]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | import logging |
2 | import os | |
3 | import asyncio | |
4 | from tempfile import NamedTemporaryFile | |
5 | from threading import Thread | |
6 | from contextlib import contextmanager | |
7 | from io import StringIO | |
8 | from shlex import quote | |
9 | from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union | |
10 | from orchestrator import OrchestratorError | |
11 | ||
12 | try: | |
13 | import asyncssh | |
14 | except ImportError: | |
15 | asyncssh = None # type: ignore | |
16 | ||
17 | if TYPE_CHECKING: | |
18 | from cephadm.module import CephadmOrchestrator | |
19 | from asyncssh.connection import SSHClientConnection | |
20 | ||
# Generic result type: EventLoopThread.get_result returns whatever the
# awaitable it is given resolves to.
T = TypeVar('T')


# Module logger used by everything in this file.
logger = logging.getLogger(__name__)

# asyncssh's records are not propagated to ancestor loggers;
# SSHManager.redirect_log attaches a temporary handler to this logger
# to capture them while a connection is being opened.
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False
28 | ||
39ae355f TL |
29 | |
class HostConnectionError(OrchestratorError):
    """Orchestrator error describing a failure to talk to a remote host.

    In addition to the message passed to the base class, the error keeps
    the name and the address of the host that was being contacted in the
    ``hostname`` and ``addr`` attributes.
    """

    def __init__(self, message: str, hostname: str, addr: str) -> None:
        super().__init__(message)
        self.hostname, self.addr = hostname, addr
35 | ||
36 | ||
20effc67 TL |
# ssh_config content that SSHManager._reconfig_ssh writes to a temporary
# file when the "ssh_config" store entry is empty.
DEFAULT_SSH_CONFIG = """
Host *
User root
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ConnectTimeout=30
"""
44 | ||
45 | ||
class EventLoopThread(Thread):
    """Thread that owns a dedicated asyncio event loop and runs it forever.

    The loop is created and the thread is started by the constructor.
    Other threads submit coroutines to the loop with :meth:`get_result`.
    """

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

        super().__init__(target=self._loop.run_forever)
        self.start()

    def get_result(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
        """Run ``coro`` on this thread's loop and block until it finishes.

        :param coro: awaitable to schedule on the loop
        :param timeout: seconds to wait for the result; ``None`` waits
            without a limit
        :return: the value ``coro`` resolves to
        :raises: the timeout error raised by ``Future.result`` when the
            result is not available in time (after trying to cancel the
            scheduled task), or whatever exception ``coro`` raised
        """
        # Local import: keeps this block independent of the module's
        # import list.
        import concurrent.futures

        # useful to note: This "run_coroutine_threadsafe" returns a
        # concurrent.futures.Future, rather than an asyncio.Future. They are
        # fairly similar but have a few differences, notably in our case
        # that the result function of a concurrent.futures.Future accepts
        # a timeout argument
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout)
        except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
            # Future.result() raises concurrent.futures.TimeoutError, which
            # is a different class from asyncio.TimeoutError on
            # Python 3.8 - 3.10, so both have to be listed for the
            # cancellation below to actually happen.
            # try to cancel the task before raising the exception further up
            future.cancel()
            raise
20effc67 TL |
68 | |
69 | ||
70 | class SSHManager: | |
71 | ||
def __init__(self, mgr: "CephadmOrchestrator"):
    """Remember the owning orchestrator and start with no open connections."""
    # open SSH connections, keyed by host name
    self.cons: Dict[str, "SSHClientConnection"] = {}
    self.mgr: "CephadmOrchestrator" = mgr
75 | ||
async def _remote_connection(self,
                             host: str,
                             addr: Optional[str] = None,
                             ) -> "SSHClientConnection":
    """Return the SSH connection for ``host``, opening it when necessary.

    A new connection is opened when none is cached for ``host`` or when
    ``host`` is not part of the inventory. When ``addr`` is not given it
    is looked up in the inventory. The host is removed from the
    orchestrator's offline hosts before the connection is returned.

    :param host: host name, also the key of the connection cache
    :param addr: address to connect to; optional for inventory hosts
    :raises OrchestratorError: if no address is available for the host
    :raises HostConnectionError: raised by ``redirect_log`` when opening
        the connection fails
    """
    if not self.cons.get(host) or host not in self.mgr.inventory:
        if not addr and host in self.mgr.inventory:
            addr = self.mgr.inventory.get_addr(host)

        if not addr:
            raise OrchestratorError("host address is empty")

        assert self.mgr.ssh_user
        n = self.mgr.ssh_user + '@' + addr
        logger.debug("Opening connection to {} with ssh options '{}'".format(
            n, self.mgr._ssh_options))

        asyncssh.set_log_level('DEBUG')
        asyncssh.set_debug_level(3)

        # redirect_log translates any failure raised in its body into a
        # HostConnectionError, so no local exception handling is needed
        # (the former handlers here only re-raised).
        with self.redirect_log(host, addr):
            ssh_options = asyncssh.SSHClientConnectionOptions(
                keepalive_interval=7, keepalive_count_max=3)
            conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                          known_hosts=None, config=[self.mgr.ssh_config_fname],
                                          preferred_auth=['publickey'], options=ssh_options)
        self.cons[host] = conn

    self.mgr.offline_hosts_remove(host)

    return self.cons[host]
113 | ||
@contextmanager
def redirect_log(self, host: str, addr: str) -> Iterator[None]:
    """Capture asyncssh log output while the body runs and wrap failures.

    A temporary INFO-level handler writing to an in-memory buffer is
    attached to the asyncssh logger for the duration of the ``with``
    body and always removed afterwards. Any exception raised by the body
    adds ``host`` to the orchestrator's offline hosts and is re-raised
    as :class:`HostConnectionError`, chained to the original exception.

    :param host: host name being connected to
    :param addr: address being connected to
    :raises HostConnectionError: for any exception raised in the body
    """
    log_string = StringIO()
    ch = logging.StreamHandler(log_string)
    ch.setLevel(logging.INFO)
    asyncssh_logger.addHandler(ch)

    try:
        yield
    except OSError as e:
        self.mgr.offline_hosts.add(host)
        # the captured asyncssh log is not part of this message
        msg = f"Can't communicate with remote host `{addr}`, possibly because the host is not reachable or python3 is not installed on the host. {str(e)}"
        logger.exception(msg)
        raise HostConnectionError(msg, host, addr) from e
    except asyncssh.Error as e:
        self.mgr.offline_hosts.add(host)
        log_content = log_string.getvalue()
        msg = f'Failed to connect to {host} ({addr}). {str(e)}' + '\n' + f'Log: {log_content}'
        logger.debug(msg)
        raise HostConnectionError(msg, host, addr) from e
    except Exception as e:
        self.mgr.offline_hosts.add(host)
        log_content = log_string.getvalue()
        logger.exception(str(e))
        raise HostConnectionError(
            f'Failed to connect to {host} ({addr}): {repr(e)}' + '\n' f'Log: {log_content}', host, addr) from e
    finally:
        log_string.flush()
        asyncssh_logger.removeHandler(ch)
144 | ||
def remote_connection(self,
                      host: str,
                      addr: Optional[str] = None,
                      ) -> "SSHClientConnection":
    """Blocking wrapper around ``_remote_connection``.

    The coroutine is handed to ``mgr.wait_async`` inside the
    ``mgr.async_timeout_handler`` context and its result is returned.
    """
    description = f'ssh {host} (addr {addr})'
    with self.mgr.async_timeout_handler(host, description):
        pending = self._remote_connection(host, addr)
        return self.mgr.wait_async(pending)
20effc67 TL |
151 | |
async def _execute_command(self,
                           host: str,
                           cmd_components: List[str],
                           stdin: Optional[str] = None,
                           addr: Optional[str] = None,
                           log_command: Optional[bool] = True,
                           ) -> Tuple[str, str, int]:
    """Run a command on ``host`` over SSH.

    The components are shell-quoted and joined with spaces, prefixed
    with ``sudo`` when the ssh user is not ``root``. A short ``true``
    command is run first as a quick check of the host. On any failure
    of either command the cached connection is reset, the host is added
    to the offline hosts and a :class:`HostConnectionError` is raised.

    :param host: host to run the command on
    :param cmd_components: command and its arguments, unquoted
    :param stdin: text passed to the command as input
    :param addr: address of the host, if it should not be looked up
    :param log_command: whether the command line is logged at debug level
    :return: ``(stdout, stderr, return code)``; the streams have trailing
        newlines stripped and a missing return code is reported as ``0``
    """
    def _clean(stream: Union[bytes, str, None]) -> str:
        # turn a captured stream into text without trailing newlines
        if not stream:
            return ''
        if isinstance(stream, bytes):
            return stream.decode().rstrip('\n')
        if isinstance(stream, str):
            return stream.rstrip('\n')
        raise OrchestratorError(
            f'Unable to parse ssh output with type {type(stream)} from remote host {host}')

    conn = await self._remote_connection(host, addr)

    if self.mgr.ssh_user != 'root':
        sudo_prefix = "sudo "
    else:
        sudo_prefix = ""
    cmd = sudo_prefix + " ".join(map(quote, cmd_components))

    # the address is only used when reporting errors; fall back to the
    # host name if it cannot be determined
    try:
        address = addr or self.mgr.inventory.get_addr(host)
    except Exception:
        address = host

    if log_command:
        logger.debug(f'Running command: {cmd}')

    try:
        # host quick check
        await conn.run(f'{sudo_prefix}true', check=True, timeout=5)
        result = await conn.run(cmd, input=stdin)
    # handle these Exceptions otherwise you might get a weird error like
    # TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
    except asyncssh.ChannelOpenError as exc:
        # SSH connection closed or broken, will create new connection next call
        logger.debug(f'Connection to {host} failed. {str(exc)}')
        await self._reset_con(host)
        self.mgr.offline_hosts.add(host)
        raise HostConnectionError(f'Unable to reach remote host {host}. {str(exc)}', host, address)
    except asyncssh.ProcessError as exc:
        msg = f"Cannot execute the command '{cmd}' on the {host}. {str(exc.stderr)}."
        logger.debug(msg)
        await self._reset_con(host)
        self.mgr.offline_hosts.add(host)
        raise HostConnectionError(msg, host, address)
    except Exception as exc:
        msg = f"Generic error while executing command '{cmd}' on the host {host}. {str(exc)}."
        logger.debug(msg)
        await self._reset_con(host)
        self.mgr.offline_hosts.add(host)
        raise HostConnectionError(msg, host, address)

    stdout_text = _clean(result.stdout)
    stderr_text = _clean(result.stderr)
    return stdout_text, stderr_text, result.returncode or 0
208 | ||
def execute_command(self,
                    host: str,
                    cmd: List[str],
                    stdin: Optional[str] = None,
                    addr: Optional[str] = None,
                    log_command: Optional[bool] = True
                    ) -> Tuple[str, str, int]:
    """Blocking wrapper around ``_execute_command``.

    The coroutine is handed to ``mgr.wait_async`` inside the
    ``mgr.async_timeout_handler`` context, which is given the host and
    the space-joined command; the coroutine's result is returned.
    """
    command_line = " ".join(cmd)
    with self.mgr.async_timeout_handler(host, command_line):
        pending = self._execute_command(host, cmd, stdin, addr, log_command)
        return self.mgr.wait_async(pending)
20effc67 TL |
218 | |
async def _check_execute_command(self,
                                 host: str,
                                 cmd: List[str],
                                 stdin: Optional[str] = None,
                                 addr: Optional[str] = None,
                                 log_command: Optional[bool] = True
                                 ) -> str:
    """Run a command via ``_execute_command`` and insist that it succeeds.

    :return: the command's standard output
    :raises OrchestratorError: if the command's return code is not ``0``
    """
    out, err, code = await self._execute_command(host, cmd, stdin, addr, log_command)
    if code == 0:
        return out

    # non-zero exit status: report it together with the captured stderr
    msg = f'Command {cmd} failed. {err}'
    logger.debug(msg)
    raise OrchestratorError(msg)
232 | ||
def check_execute_command(self,
                          host: str,
                          cmd: List[str],
                          stdin: Optional[str] = None,
                          addr: Optional[str] = None,
                          log_command: Optional[bool] = True,
                          ) -> str:
    """Blocking wrapper around ``_check_execute_command``.

    The coroutine is handed to ``mgr.wait_async`` inside the
    ``mgr.async_timeout_handler`` context, which is given the host and
    the space-joined command; the coroutine's result is returned.
    """
    command_line = " ".join(cmd)
    with self.mgr.async_timeout_handler(host, command_line):
        pending = self._check_execute_command(host, cmd, stdin, addr, log_command)
        return self.mgr.wait_async(pending)
20effc67 TL |
242 | |
async def _write_remote_file(self,
                             host: str,
                             path: str,
                             content: bytes,
                             mode: Optional[int] = None,
                             uid: Optional[int] = None,
                             gid: Optional[int] = None,
                             addr: Optional[str] = None,
                             ) -> None:
    """Write ``content`` to ``path`` on ``host``.

    The content is first uploaded over SFTP to ``<path>.new`` below a
    per-cluster directory in ``/tmp`` on the remote host and then moved
    to ``path`` with ``mv``.

    :param host: host to write the file on
    :param path: destination path on the remote host
    :param content: bytes to write
    :param mode: permission bits for the file
    :param uid: owner of the file
    :param gid: group of the file
    :param addr: address of the host, if it should not be looked up
    :raises OrchestratorError: if any step fails
    """
    try:
        staging_root = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
        target_dir = os.path.dirname(path)
        staged_path = staging_root + path + '.new'

        await self._check_execute_command(host, ['mkdir', '-p', target_dir], addr=addr)
        await self._check_execute_command(host, ['mkdir', '-p', staging_root + target_dir], addr=addr)
        await self._check_execute_command(host, ['touch', staged_path], addr=addr)
        if self.mgr.ssh_user != 'root':
            assert self.mgr.ssh_user
            await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, staging_root], addr=addr)
            await self._check_execute_command(host, ['chmod', '644', staged_path], addr=addr)
        with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as local_copy:
            os.fchmod(local_copy.fileno(), 0o600)
            local_copy.write(content)
            local_copy.flush()
            conn = await self._remote_connection(host, addr)
            async with conn.start_sftp_client() as sftp:
                await sftp.put(local_copy.name, staged_path)
            # NOTE(review): ownership and mode are only applied when uid,
            # gid and mode are all given; a mode passed on its own is
            # ignored - confirm this is intended.
            if None not in (uid, gid, mode):
                # shlex quote takes str or byte object, not int
                owner = str(uid) + ':' + str(gid)
                await self._check_execute_command(host, ['chown', '-R', owner, staged_path], addr=addr)
                await self._check_execute_command(host, ['chmod', oct(mode)[2:], staged_path], addr=addr)
            await self._check_execute_command(host, ['mv', staged_path, path], addr=addr)
    except Exception as e:
        msg = f"Unable to write {host}:{path}: {e}"
        logger.exception(msg)
        raise OrchestratorError(msg)
279 | ||
def write_remote_file(self,
                      host: str,
                      path: str,
                      content: bytes,
                      mode: Optional[int] = None,
                      uid: Optional[int] = None,
                      gid: Optional[int] = None,
                      addr: Optional[str] = None,
                      ) -> None:
    """Blocking wrapper around ``_write_remote_file``.

    The coroutine is handed to ``mgr.wait_async`` inside the
    ``mgr.async_timeout_handler`` context; nothing is returned.
    """
    description = f'writing file {path}'
    with self.mgr.async_timeout_handler(host, description):
        pending = self._write_remote_file(host, path, content, mode, uid, gid, addr)
        self.mgr.wait_async(pending)
20effc67 TL |
292 | |
async def _reset_con(self, host: str) -> None:
    """Close the cached connection to ``host``, if any, and forget it."""
    conn = self.cons.get(host)
    if not conn:
        # nothing cached for this host
        return
    logger.debug(f'_reset_con close {host}')
    conn.close()
    del self.cons[host]
299 | ||
def reset_con(self, host: str) -> None:
    """Blocking wrapper around ``_reset_con``.

    The coroutine is handed to ``mgr.wait_async`` inside the
    ``mgr.async_timeout_handler`` context; nothing is returned.
    """
    description = f'resetting ssh connection to {host}'
    with self.mgr.async_timeout_handler(cmd=description):
        pending = self._reset_con(host)
        self.mgr.wait_async(pending)
20effc67 TL |
303 | |
def _reset_cons(self) -> None:
    """Close every cached connection and start over with an empty cache."""
    for host in self.cons:
        logger.debug(f'_reset_cons close {host}')
        self.cons[host].close()
    # rebind rather than clear: the cache becomes a fresh dict
    self.cons = {}
309 | ||
def _reconfig_ssh(self) -> None:
    """Rebuild the orchestrator's SSH configuration from its store.

    Writes the ssh config and the identity key (plus public key and
    certificate when stored) to temporary files, records the resulting
    file names, ssh options and ssh user on ``self.mgr``, and finally
    closes all cached connections.
    """
    mgr = self.mgr
    temp_files = []  # type: list
    ssh_options = []  # type: List[str]

    # ssh_config
    mgr.ssh_config_fname = mgr.ssh_config_file
    ssh_config = mgr.get_store("ssh_config")
    if ssh_config is not None or mgr.ssh_config_fname is None:
        if not ssh_config:
            ssh_config = DEFAULT_SSH_CONFIG
        conf_file = NamedTemporaryFile(prefix='cephadm-conf-')
        os.fchmod(conf_file.fileno(), 0o600)
        conf_file.write(ssh_config.encode('utf-8'))
        conf_file.flush()  # make visible to other processes
        temp_files.append(conf_file)
        mgr.ssh_config_fname = conf_file.name
    if mgr.ssh_config_fname:
        mgr.validate_ssh_config_fname(mgr.ssh_config_fname)
        ssh_options.extend(['-F', mgr.ssh_config_fname])
    mgr.ssh_config = ssh_config

    # identity
    ssh_key = mgr.get_store("ssh_identity_key")
    ssh_pub = mgr.get_store("ssh_identity_pub")
    ssh_cert = mgr.get_store("ssh_identity_cert")
    mgr.ssh_pub = ssh_pub
    mgr.ssh_key = ssh_key
    mgr.ssh_cert = ssh_cert
    if ssh_key:
        mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
        mgr.tkey.write(ssh_key.encode('utf-8'))
        os.fchmod(mgr.tkey.fileno(), 0o600)
        mgr.tkey.flush()  # make visible to other processes
        temp_files.append(mgr.tkey)
        # the public key and the certificate live next to the private key,
        # named after it with a fixed suffix
        # NOTE(review): these two files are created with open(), and
        # nothing in this method removes them - confirm cleanup elsewhere.
        for material, suffix in ((ssh_pub, '.pub'), (ssh_cert, '-cert.pub')):
            if not material:
                continue
            companion = open(mgr.tkey.name + suffix, 'w')
            os.fchmod(companion.fileno(), 0o600)
            companion.write(material)
            companion.flush()  # make visible to other processes
            temp_files.append(companion)
        ssh_options.extend(['-i', mgr.tkey.name])

    # keep references to the open files on the orchestrator
    mgr._temp_files = temp_files
    mgr._ssh_options = ' '.join(ssh_options) if ssh_options else None

    if mgr.mode == 'root':
        mgr.ssh_user = mgr.get_store('ssh_user', default='root')
    elif mgr.mode == 'cephadm-package':
        mgr.ssh_user = 'cephadm'

    self._reset_cons()