]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/ssh.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / cephadm / ssh.py
CommitLineData
20effc67
TL
1import logging
2import os
3import asyncio
4from tempfile import NamedTemporaryFile
5from threading import Thread
6from contextlib import contextmanager
7from io import StringIO
8from shlex import quote
9from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union
10from orchestrator import OrchestratorError
11
12try:
13 import asyncssh
14except ImportError:
15 asyncssh = None # type: ignore
16
17if TYPE_CHECKING:
18 from cephadm.module import CephadmOrchestrator
19 from asyncssh.connection import SSHClientConnection
20
# Generic result type used by EventLoopThread.get_result.
T = TypeVar('T')


logger = logging.getLogger(__name__)

# asyncssh's own logger. SSHManager.redirect_log temporarily attaches a
# handler to it to capture connection logs; propagation is disabled so its
# records do not also reach ancestor (e.g. root) logger handlers.
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False
28
39ae355f
TL
29
class HostConnectionError(OrchestratorError):
    """Orchestrator error for a failure to reach or use a remote host.

    In addition to the message, the instance records the host name and the
    address that was being used, so handlers can tell which host failed.
    """

    def __init__(self, message: str, hostname: str, addr: str) -> None:
        super().__init__(message)
        # Address that was used when the failure happened.
        self.addr = addr
        # Host name as known to the orchestrator.
        self.hostname = hostname
35
36
20effc67
TL
# Fallback ssh_config contents. SSHManager._reconfig_ssh writes this to a
# temporary config file when no non-empty "ssh_config" is stored (and, when
# nothing is stored at all, only if no ssh_config_file is configured).
# It logs in as root and disables host-key verification: no known_hosts
# file is kept and unknown host keys are accepted.
DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""
44
45
class EventLoopThread(Thread):
    """Thread that runs a dedicated asyncio event loop forever.

    The loop is created and the thread is started from ``__init__``.
    Coroutines are submitted to the loop from other threads through
    ``get_result``.
    """

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        # NOTE(review): set_event_loop() binds the loop to the *calling*
        # thread (the one constructing this object), not to the new thread
        # that runs it; kept as-is since other code may rely on it.
        asyncio.set_event_loop(self._loop)

        super().__init__(target=self._loop.run_forever)
        self.start()

    def get_result(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
        """Run ``coro`` on the background loop and block until it finishes.

        :param coro: awaitable to schedule on the background loop.
        :param timeout: seconds to wait for the result; ``None`` waits forever.
        :returns: the awaitable's result.
        :raises concurrent.futures.TimeoutError: if ``timeout`` elapses. The
            scheduled task is cancelled before the exception propagates.
        """
        # Local import: only needed for the timeout exception type below.
        import concurrent.futures

        # run_coroutine_threadsafe returns a concurrent.futures.Future (not an
        # asyncio.Future). Its result() accepts a timeout and, on expiry,
        # raises concurrent.futures.TimeoutError. On Python 3.8-3.10 that is
        # a different class from asyncio.TimeoutError, so catch both to make
        # sure the task is actually cancelled on timeout.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout)
        except (concurrent.futures.TimeoutError, asyncio.TimeoutError):
            # try to cancel the task before raising the exception further up
            future.cancel()
            raise
20effc67
TL
68
69
class SSHManager:
    """Manage cached asyncssh connections to hosts and run commands or write
    files on them.

    Each public method is a synchronous wrapper that runs the matching
    ``_``-prefixed coroutine via ``mgr.wait_async`` inside
    ``mgr.async_timeout_handler``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        # Open connections keyed by host name.
        self.cons: Dict[str, "SSHClientConnection"] = {}

    async def _remote_connection(self,
                                 host: str,
                                 addr: Optional[str] = None,
                                 ) -> "SSHClientConnection":
        """Return a connection to ``host``, opening a new one when needed.

        A cached connection is reused only when ``host`` is in the inventory;
        for hosts not in the inventory a fresh connection is always opened.
        On success the host is removed from the offline set.

        :param host: host name; also the key of the connection cache.
        :param addr: address to connect to; defaults to the inventory address.
        :raises OrchestratorError: if no address is known for ``host``.
        :raises HostConnectionError: if connecting fails (see ``redirect_log``).
        """
        if not self.cons.get(host) or host not in self.mgr.inventory:
            if not addr and host in self.mgr.inventory:
                addr = self.mgr.inventory.get_addr(host)

            if not addr:
                raise OrchestratorError("host address is empty")

            assert self.mgr.ssh_user
            n = self.mgr.ssh_user + '@' + addr
            logger.debug("Opening connection to {} with ssh options '{}'".format(
                n, self.mgr._ssh_options))

            asyncssh.set_log_level('DEBUG')
            asyncssh.set_debug_level(3)

            # redirect_log captures asyncssh's log output and converts any
            # connection failure into HostConnectionError.
            with self.redirect_log(host, addr):
                ssh_options = asyncssh.SSHClientConnectionOptions(
                    keepalive_interval=7, keepalive_count_max=3)
                conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                              known_hosts=None, config=[self.mgr.ssh_config_fname],
                                              preferred_auth=['publickey'], options=ssh_options)
            # NOTE(review): for hosts not in the inventory, a previously cached
            # connection is replaced here without being closed.
            self.cons[host] = conn

        self.mgr.offline_hosts_remove(host)

        return self.cons[host]

    @contextmanager
    def redirect_log(self, host: str, addr: str) -> Iterator[None]:
        """Capture asyncssh log output and convert failures in the body.

        While the ``with`` body runs, asyncssh records at INFO or above are
        collected in memory. Any ``Exception`` raised by the body marks
        ``host`` offline and is re-raised as ``HostConnectionError``. For
        asyncssh and unexpected errors the captured log is included in the
        message.

        :param host: host name; added to ``mgr.offline_hosts`` on failure.
        :param addr: address used for the connection; reported in errors.
        """
        log_string = StringIO()
        ch = logging.StreamHandler(log_string)
        ch.setLevel(logging.INFO)
        asyncssh_logger.addHandler(ch)

        try:
            yield
        except OSError as e:
            self.mgr.offline_hosts.add(host)
            msg = f"Can't communicate with remote host `{addr}`, possibly because the host is not reachable or python3 is not installed on the host. {str(e)}"
            logger.exception(msg)
            raise HostConnectionError(msg, host, addr) from e
        except asyncssh.Error as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            msg = f'Failed to connect to {host} ({addr}). {str(e)}' + '\n' + f'Log: {log_content}'
            logger.debug(msg)
            raise HostConnectionError(msg, host, addr) from e
        except Exception as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            logger.exception(str(e))
            raise HostConnectionError(
                f'Failed to connect to {host} ({addr}): {repr(e)}' + '\n' f'Log: {log_content}', host, addr) from e
        finally:
            # Detach and release the temporary handler in every case.
            asyncssh_logger.removeHandler(ch)
            ch.close()

    def remote_connection(self,
                          host: str,
                          addr: Optional[str] = None,
                          ) -> "SSHClientConnection":
        """Synchronous wrapper around ``_remote_connection``."""
        with self.mgr.async_timeout_handler(host, f'ssh {host} (addr {addr})'):
            return self.mgr.wait_async(self._remote_connection(host, addr))

    async def _execute_command(self,
                               host: str,
                               cmd_components: List[str],
                               stdin: Optional[str] = None,
                               addr: Optional[str] = None,
                               log_command: Optional[bool] = True,
                               ) -> Tuple[str, str, int]:
        """Run a command on ``host`` and return ``(stdout, stderr, returncode)``.

        Each component is shell-quoted and the components are joined with
        spaces. ``sudo`` is prefixed when the SSH user is not root. A quick
        ``true`` probe with a 5 second timeout runs first to check that the
        host responds.

        :param cmd_components: argv of the command to run.
        :param stdin: optional text fed to the command's stdin.
        :param addr: address override for the connection.
        :param log_command: log the full command line at debug level.
        :returns: stdout and stderr with trailing newlines stripped, and the
            return code (0 when the remote reported none).
        :raises HostConnectionError: if the channel cannot be opened, the probe
            fails, or running the command raises. In that case the cached
            connection is dropped and the host is marked offline.
        """
        conn = await self._remote_connection(host, addr)
        sudo_prefix = "sudo " if self.mgr.ssh_user != 'root' else ""
        cmd = sudo_prefix + " ".join(quote(x) for x in cmd_components)
        try:
            address = addr or self.mgr.inventory.get_addr(host)
        except Exception:
            address = host
        if log_command:
            logger.debug(f'Running command: {cmd}')
        try:
            await conn.run(f'{sudo_prefix}true', check=True, timeout=5)  # host quick check
            r = await conn.run(cmd, input=stdin)
        # handle these Exceptions otherwise you might get a weird error like
        # TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
        except asyncssh.ChannelOpenError as e:
            # SSH connection closed or broken, will create new connection next call
            logger.debug(f'Connection to {host} failed. {str(e)}')
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise HostConnectionError(f'Unable to reach remote host {host}. {str(e)}', host, address) from e
        except asyncssh.ProcessError as e:
            msg = f"Cannot execute the command '{cmd}' on the {host}. {str(e.stderr)}."
            logger.debug(msg)
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise HostConnectionError(msg, host, address) from e
        except Exception as e:
            msg = f"Generic error while executing command '{cmd}' on the host {host}. {str(e)}."
            logger.debug(msg)
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise HostConnectionError(msg, host, address) from e

        def _rstrip(v: Union[bytes, str, None]) -> str:
            # Normalize a stdout/stderr value to str without trailing newlines.
            if not v:
                return ''
            if isinstance(v, str):
                return v.rstrip('\n')
            if isinstance(v, bytes):
                return v.decode().rstrip('\n')
            raise OrchestratorError(
                f'Unable to parse ssh output with type {type(v)} from remote host {host}')

        out = _rstrip(r.stdout)
        err = _rstrip(r.stderr)
        rc = r.returncode if r.returncode else 0

        return out, err, rc

    def execute_command(self,
                        host: str,
                        cmd: List[str],
                        stdin: Optional[str] = None,
                        addr: Optional[str] = None,
                        log_command: Optional[bool] = True
                        ) -> Tuple[str, str, int]:
        """Synchronous wrapper around ``_execute_command``."""
        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
            return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr, log_command))

    async def _check_execute_command(self,
                                     host: str,
                                     cmd: List[str],
                                     stdin: Optional[str] = None,
                                     addr: Optional[str] = None,
                                     log_command: Optional[bool] = True
                                     ) -> str:
        """Run ``cmd`` on ``host`` and return its stdout.

        :raises OrchestratorError: if the command exits with a non-zero code.
        """
        out, err, code = await self._execute_command(host, cmd, stdin, addr, log_command)
        if code != 0:
            msg = f'Command {cmd} failed. {err}'
            logger.debug(msg)
            raise OrchestratorError(msg)
        return out

    def check_execute_command(self,
                              host: str,
                              cmd: List[str],
                              stdin: Optional[str] = None,
                              addr: Optional[str] = None,
                              log_command: Optional[bool] = True,
                              ) -> str:
        """Synchronous wrapper around ``_check_execute_command``."""
        with self.mgr.async_timeout_handler(host, " ".join(cmd)):
            return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr, log_command))

    async def _write_remote_file(self,
                                 host: str,
                                 path: str,
                                 content: bytes,
                                 mode: Optional[int] = None,
                                 uid: Optional[int] = None,
                                 gid: Optional[int] = None,
                                 addr: Optional[str] = None,
                                 ) -> None:
        """Write ``content`` to ``path`` on ``host``.

        The data is uploaded over SFTP to a staging file
        ``/tmp/cephadm-<fsid><path>.new`` and then moved into place with
        ``mv``.

        :param mode: permission bits (e.g. ``0o600``) applied when given.
        :param uid: owner uid; ownership is changed only when both ``uid``
            and ``gid`` are given.
        :param gid: owner gid; see ``uid``.
        :raises OrchestratorError: if any step fails.
        """
        try:
            cephadm_tmp_dir = f"/tmp/cephadm-{self.mgr._cluster_fsid}"
            dirname = os.path.dirname(path)
            await self._check_execute_command(host, ['mkdir', '-p', dirname], addr=addr)
            await self._check_execute_command(host, ['mkdir', '-p', cephadm_tmp_dir + dirname], addr=addr)
            tmp_path = cephadm_tmp_dir + path + '.new'
            await self._check_execute_command(host, ['touch', tmp_path], addr=addr)
            if self.mgr.ssh_user != 'root':
                assert self.mgr.ssh_user
                # Let the non-root SSH user write the staging file over SFTP.
                await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, cephadm_tmp_dir], addr=addr)
                await self._check_execute_command(host, ['chmod', str(644), tmp_path], addr=addr)
            with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
                os.fchmod(f.fileno(), 0o600)
                f.write(content)
                f.flush()
                conn = await self._remote_connection(host, addr)
                async with conn.start_sftp_client() as sftp:
                    await sftp.put(f.name, tmp_path)
            # Ownership and mode are applied independently; previously a
            # requested mode was silently ignored unless uid and gid were set.
            if uid is not None and gid is not None:
                # shlex quote takes str or byte object, not int
                await self._check_execute_command(host, ['chown', '-R', str(uid) + ':' + str(gid), tmp_path], addr=addr)
            if mode is not None:
                await self._check_execute_command(host, ['chmod', oct(mode)[2:], tmp_path], addr=addr)
            await self._check_execute_command(host, ['mv', tmp_path, path], addr=addr)
        except Exception as e:
            msg = f"Unable to write {host}:{path}: {e}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e

    def write_remote_file(self,
                          host: str,
                          path: str,
                          content: bytes,
                          mode: Optional[int] = None,
                          uid: Optional[int] = None,
                          gid: Optional[int] = None,
                          addr: Optional[str] = None,
                          ) -> None:
        """Synchronous wrapper around ``_write_remote_file``."""
        with self.mgr.async_timeout_handler(host, f'writing file {path}'):
            self.mgr.wait_async(self._write_remote_file(
                host, path, content, mode, uid, gid, addr))

    async def _reset_con(self, host: str) -> None:
        """Close and forget the cached connection to ``host``, if any."""
        conn = self.cons.get(host)
        if conn:
            logger.debug(f'_reset_con close {host}')
            conn.close()
            del self.cons[host]

    def reset_con(self, host: str) -> None:
        """Synchronous wrapper around ``_reset_con``."""
        with self.mgr.async_timeout_handler(cmd=f'resetting ssh connection to {host}'):
            self.mgr.wait_async(self._reset_con(host))

    def _reset_cons(self) -> None:
        """Close every cached connection and empty the cache."""
        for host, conn in self.cons.items():
            logger.debug(f'_reset_cons close {host}')
            conn.close()
        self.cons = {}

    def _reconfig_ssh(self) -> None:
        """Rebuild SSH settings from the mgr key/value store.

        Writes the ssh_config, identity key, public key and certificate to
        temporary files, recomputes ``mgr._ssh_options``, sets
        ``mgr.ssh_user`` according to ``mgr.mode``, and closes all cached
        connections so that new ones pick up the settings.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        self.mgr.ssh_config_fname = self.mgr.ssh_config_file
        ssh_config = self.mgr.get_store("ssh_config")
        if ssh_config is not None or self.mgr.ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            self.mgr.ssh_config_fname = f.name
        if self.mgr.ssh_config_fname:
            self.mgr.validate_ssh_config_fname(self.mgr.ssh_config_fname)
            ssh_options += ['-F', self.mgr.ssh_config_fname]
        self.mgr.ssh_config = ssh_config

        # identity
        ssh_key = self.mgr.get_store("ssh_identity_key")
        ssh_pub = self.mgr.get_store("ssh_identity_pub")
        ssh_cert = self.mgr.get_store("ssh_identity_cert")
        self.mgr.ssh_pub = ssh_pub
        self.mgr.ssh_key = ssh_key
        self.mgr.ssh_cert = ssh_cert
        if ssh_key:
            self.mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
            self.mgr.tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(self.mgr.tkey.fileno(), 0o600)
            self.mgr.tkey.flush()  # make visible to other processes
            temp_files += [self.mgr.tkey]
            # NOTE(review): the .pub / -cert.pub files below are created with
            # plain open(), not NamedTemporaryFile, so they are not deleted
            # when the identity file is; they may accumulate on disk.
            if ssh_pub:
                tpub = open(self.mgr.tkey.name + '.pub', 'w')
                os.fchmod(tpub.fileno(), 0o600)
                tpub.write(ssh_pub)
                tpub.flush()  # make visible to other processes
                temp_files += [tpub]
            if ssh_cert:
                tcert = open(self.mgr.tkey.name + '-cert.pub', 'w')
                os.fchmod(tcert.fileno(), 0o600)
                tcert.write(ssh_cert)
                tcert.flush()  # make visible to other processes
                temp_files += [tcert]
            ssh_options += ['-i', self.mgr.tkey.name]

        # Keep references so the NamedTemporaryFile objects (and their files)
        # stay alive.
        self.mgr._temp_files = temp_files
        if ssh_options:
            self.mgr._ssh_options = ' '.join(ssh_options)
        else:
            self.mgr._ssh_options = None

        if self.mgr.mode == 'root':
            self.mgr.ssh_user = self.mgr.get_store('ssh_user', default='root')
        elif self.mgr.mode == 'cephadm-package':
            self.mgr.ssh_user = 'cephadm'

        self._reset_cons()