]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/ssh.py
e874ba6d78fe860b6ed969f1c6d15a7c71d13193
[ceph.git] / ceph / src / pybind / mgr / cephadm / ssh.py
1 import logging
2 import os
3 import asyncio
4 from tempfile import NamedTemporaryFile
5 from threading import Thread
6 from contextlib import contextmanager
7 from io import StringIO
8 from shlex import quote
9 from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union
10 from orchestrator import OrchestratorError
11
12 try:
13 import asyncssh
14 except ImportError:
15 asyncssh = None # type: ignore
16
17 if TYPE_CHECKING:
18 from cephadm.module import CephadmOrchestrator
19 from asyncssh.connection import SSHClientConnection
20
# Generic result type of the awaitable passed to EventLoopThread.get_result().
T = TypeVar('T')


logger = logging.getLogger(__name__)

# asyncssh's logger is detached from the logging hierarchy (propagate = False);
# SSHManager.redirect_log() attaches a temporary handler to it to capture the
# library's output while a connection is being opened.
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False

# Fallback ssh_config content written to a temp file by
# SSHManager._reconfig_ssh() when no "ssh_config" value is stored.
DEFAULT_SSH_CONFIG = """
Host *
User root
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ConnectTimeout=30
"""
36
37
class EventLoopThread(Thread):
    """Thread whose target is ``run_forever`` of a private asyncio event loop.

    The thread is started from the constructor, so an instance is ready to
    accept coroutines as soon as it has been created.
    """

    def __init__(self) -> None:
        """Create the loop, register it via ``set_event_loop`` and start the thread.

        ``asyncio.set_event_loop`` is invoked here, i.e. in the thread that
        constructs the object, before the worker thread is started.
        """
        loop = asyncio.new_event_loop()
        self._loop = loop
        asyncio.set_event_loop(loop)

        super().__init__(target=loop.run_forever)
        self.start()

    def get_result(self, coro: Awaitable[T]) -> T:
        """Schedule *coro* on the loop thread and block until it finishes.

        Returns the coroutine's result; an exception raised by the coroutine
        is re-raised by ``Future.result()`` in the calling thread.
        """
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()
49
50
class SSHManager:
    """Open, cache and use asyncssh connections to hosts on behalf of cephadm.

    Coroutine methods are prefixed with ``_``; each has a synchronous twin of
    the same name (without the underscore) that hands the coroutine to
    ``mgr.wait_async``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        # Open connections keyed by host name. Entries are removed by
        # _reset_con()/_reset_cons() so that the next call reconnects.
        self.cons: Dict[str, "SSHClientConnection"] = {}

    async def _remote_connection(self,
                                 host: str,
                                 addr: Optional[str] = None,
                                 ) -> "SSHClientConnection":
        """Return a connection to *host*, opening one if none is cached.

        :param host: host name, used as the cache key
        :param addr: address to connect to; looked up in ``mgr.inventory``
            when omitted and the host is known there
        :raises OrchestratorError: if no address can be determined or the
            connection attempt fails (see :meth:`redirect_log`)
        """
        if not self.cons.get(host) or host not in self.mgr.inventory:
            if not addr and host in self.mgr.inventory:
                addr = self.mgr.inventory.get_addr(host)

            if not addr:
                raise OrchestratorError("host address is empty")

            assert self.mgr.ssh_user
            n = self.mgr.ssh_user + '@' + addr
            logger.debug("Opening connection to {} with ssh options '{}'".format(
                n, self.mgr._ssh_options))

            asyncssh.set_log_level('DEBUG')
            asyncssh.set_debug_level(3)

            # redirect_log() converts every failure raised in its body into
            # an OrchestratorError, so no local exception handling is needed.
            with self.redirect_log(host, addr):
                ssh_options = asyncssh.SSHClientConnectionOptions(
                    keepalive_interval=7, keepalive_count_max=3)
                conn = await asyncssh.connect(addr, username=self.mgr.ssh_user,
                                              client_keys=[self.mgr.tkey.name],
                                              known_hosts=None,
                                              config=[self.mgr.ssh_config_fname],
                                              preferred_auth=['publickey'],
                                              options=ssh_options)
            self.cons[host] = conn

        self.mgr.offline_hosts_remove(host)

        return self.cons[host]

    @contextmanager
    def redirect_log(self, host: str, addr: str) -> Iterator[None]:
        """Capture asyncssh log output while the ``with`` body runs.

        Any exception raised in the body marks *host* offline and is
        re-raised as an ``OrchestratorError``; for non-``OSError`` failures
        the captured asyncssh log is appended to the message.
        """
        log_string = StringIO()
        ch = logging.StreamHandler(log_string)
        ch.setLevel(logging.INFO)
        asyncssh_logger.addHandler(ch)

        try:
            yield
        except OSError as e:
            self.mgr.offline_hosts.add(host)
            msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there. {str(e)}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e
        except asyncssh.Error as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            msg = f'Failed to connect to {host} ({addr}). {str(e)}' + '\n' + f'Log: {log_content}'
            logger.debug(msg)
            raise OrchestratorError(msg) from e
        except Exception as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            logger.exception(str(e))
            raise OrchestratorError(
                f'Failed to connect to {host} ({addr}): {repr(e)}' + '\n' f'Log: {log_content}') from e
        finally:
            log_string.flush()
            asyncssh_logger.removeHandler(ch)

    def remote_connection(self,
                          host: str,
                          addr: Optional[str] = None,
                          ) -> "SSHClientConnection":
        """Synchronous wrapper around :meth:`_remote_connection`."""
        return self.mgr.wait_async(self._remote_connection(host, addr))

    async def _execute_command(self,
                               host: str,
                               cmd: List[str],
                               stdin: Optional[str] = None,
                               addr: Optional[str] = None,
                               ) -> Tuple[str, str, int]:
        """Run *cmd* on *host* and return ``(stdout, stderr, returncode)``.

        Each argument is shell-quoted and the command is prefixed with
        ``sudo`` when the ssh user is not root. Trailing newlines are
        stripped from both output streams and a missing return code is
        reported as 0.

        :raises OrchestratorError: if the host cannot be reached or the
            command cannot be run; the cached connection is dropped and the
            host is marked offline in that case
        """
        conn = await self._remote_connection(host, addr)
        sudo_prefix = "sudo " if self.mgr.ssh_user != 'root' else ""
        cmd_str = sudo_prefix + " ".join(quote(x) for x in cmd)
        logger.debug(f'Running command: {cmd_str}')
        try:
            # Quick host check. It uses the same prefix as the real command so
            # that a root login does not depend on sudo being installed.
            r = await conn.run(sudo_prefix + 'true', check=True, timeout=5)
            r = await conn.run(cmd_str, input=stdin)
        # handle these Exceptions otherwise you might get a weird error like
        # TypeError: __init__() missing 1 required positional argument: 'reason'
        # (due to the asyncssh error interacting with raise_if_exception)
        except Exception as e:
            # SSH connection closed or broken, will create new connection next call
            logger.debug(f'Connection to {host} failed. {str(e)}')
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise OrchestratorError(f'Unable to reach remote host {host}. {str(e)}') from e

        def _rstrip(v: Union[bytes, str, None]) -> str:
            # Normalise a process stream to str without trailing newlines.
            if not v:
                return ''
            if isinstance(v, str):
                return v.rstrip('\n')
            if isinstance(v, bytes):
                return v.decode().rstrip('\n')
            raise OrchestratorError(
                f'Unable to parse ssh output with type {type(v)} from remote host {host}')

        out = _rstrip(r.stdout)
        err = _rstrip(r.stderr)
        rc = r.returncode if r.returncode else 0

        return out, err, rc

    def execute_command(self,
                        host: str,
                        cmd: List[str],
                        stdin: Optional[str] = None,
                        addr: Optional[str] = None,
                        ) -> Tuple[str, str, int]:
        """Synchronous wrapper around :meth:`_execute_command`."""
        return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr))

    async def _check_execute_command(self,
                                     host: str,
                                     cmd: List[str],
                                     stdin: Optional[str] = None,
                                     addr: Optional[str] = None,
                                     ) -> str:
        """Run *cmd* on *host* and return its stdout.

        :raises OrchestratorError: if the command exits with a non-zero code
        """
        out, err, code = await self._execute_command(host, cmd, stdin, addr)
        if code != 0:
            msg = f'Command {cmd} failed. {err}'
            logger.debug(msg)
            raise OrchestratorError(msg)
        return out

    def check_execute_command(self,
                              host: str,
                              cmd: List[str],
                              stdin: Optional[str] = None,
                              addr: Optional[str] = None,
                              ) -> str:
        """Synchronous wrapper around :meth:`_check_execute_command`."""
        return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr))

    async def _write_remote_file(self,
                                 host: str,
                                 path: str,
                                 content: bytes,
                                 mode: Optional[int] = None,
                                 uid: Optional[int] = None,
                                 gid: Optional[int] = None,
                                 addr: Optional[str] = None,
                                 ) -> None:
        """Write *content* to *path* on *host*.

        The data is copied with scp to ``/tmp<path>.new`` and then moved into
        place with ``mv``.

        :param mode: permission bits applied to the file when given
        :param uid: owner applied (together with *gid*) when both are given
        :param gid: group applied (together with *uid*) when both are given
        :raises OrchestratorError: if any step fails
        """
        try:
            # NOTE(review): the staging path is built by plain concatenation,
            # which presumably expects *path* to be absolute - confirm.
            dirname = os.path.dirname(path)
            await self._check_execute_command(host, ['mkdir', '-p', dirname], addr=addr)
            await self._check_execute_command(host, ['mkdir', '-p', '/tmp' + dirname], addr=addr)
            tmp_path = '/tmp' + path + '.new'
            await self._check_execute_command(host, ['touch', tmp_path], addr=addr)
            if self.mgr.ssh_user != 'root':
                assert self.mgr.ssh_user
                # The staging file was created through sudo; hand it to the
                # ssh user so that the scp below can write to it.
                await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, tmp_path], addr=addr)
                await self._check_execute_command(host, ['chmod', str(644), tmp_path], addr=addr)
            with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
                os.fchmod(f.fileno(), 0o600)
                f.write(content)
                f.flush()
                conn = await self._remote_connection(host, addr)
                await asyncssh.scp(f.name, (conn, tmp_path))
            # shlex quote takes str or byte object, not int
            if uid is not None and gid is not None:
                await self._check_execute_command(host, ['chown', '-R', str(uid) + ':' + str(gid), tmp_path], addr=addr)
            if mode is not None:
                await self._check_execute_command(host, ['chmod', oct(mode)[2:], tmp_path], addr=addr)
            await self._check_execute_command(host, ['mv', tmp_path, path], addr=addr)
        except Exception as e:
            msg = f"Unable to write {host}:{path}: {e}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e

    def write_remote_file(self,
                          host: str,
                          path: str,
                          content: bytes,
                          mode: Optional[int] = None,
                          uid: Optional[int] = None,
                          gid: Optional[int] = None,
                          addr: Optional[str] = None,
                          ) -> None:
        """Synchronous wrapper around :meth:`_write_remote_file`."""
        self.mgr.wait_async(self._write_remote_file(
            host, path, content, mode, uid, gid, addr))

    async def _reset_con(self, host: str) -> None:
        """Close and forget the cached connection to *host*, if there is one."""
        conn = self.cons.get(host)
        if conn:
            logger.debug(f'_reset_con close {host}')
            conn.close()
            del self.cons[host]

    def reset_con(self, host: str) -> None:
        """Synchronous wrapper around :meth:`_reset_con`."""
        self.mgr.wait_async(self._reset_con(host))

    def _reset_cons(self) -> None:
        """Close every cached connection and empty the cache."""
        for host, conn in self.cons.items():
            logger.debug(f'_reset_cons close {host}')
            conn.close()
        self.cons = {}

    def _reconfig_ssh(self) -> None:
        """Rebuild the ssh config/identity temp files and related mgr state.

        Reads ``ssh_config``, ``ssh_identity_key`` and ``ssh_identity_pub``
        from the mgr store, materialises them as temporary files, updates
        ``mgr.ssh_config_fname``, ``mgr.ssh_config``, ``mgr.ssh_key``,
        ``mgr.ssh_pub``, ``mgr.tkey``, ``mgr._temp_files``,
        ``mgr._ssh_options`` and ``mgr.ssh_user``, and finally drops all
        cached connections.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        self.mgr.ssh_config_fname = self.mgr.ssh_config_file
        ssh_config = self.mgr.get_store("ssh_config")
        if ssh_config is not None or self.mgr.ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            self.mgr.ssh_config_fname = f.name
        if self.mgr.ssh_config_fname:
            self.mgr.validate_ssh_config_fname(self.mgr.ssh_config_fname)
            ssh_options += ['-F', self.mgr.ssh_config_fname]
        self.mgr.ssh_config = ssh_config

        # identity
        ssh_key = self.mgr.get_store("ssh_identity_key")
        ssh_pub = self.mgr.get_store("ssh_identity_pub")
        self.mgr.ssh_pub = ssh_pub
        self.mgr.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            self.mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
            self.mgr.tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(self.mgr.tkey.fileno(), 0o600)
            self.mgr.tkey.flush()  # make visible to other processes
            # NOTE(review): unlike the NamedTemporaryFile objects, this plain
            # file does not appear to be removed from disk when closed.
            tpub = open(self.mgr.tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [self.mgr.tkey, tpub]
            ssh_options += ['-i', self.mgr.tkey.name]

        # Keep the file objects referenced for as long as they are in use.
        self.mgr._temp_files = temp_files
        if ssh_options:
            self.mgr._ssh_options = ' '.join(ssh_options)
        else:
            self.mgr._ssh_options = None

        if self.mgr.mode == 'root':
            self.mgr.ssh_user = self.mgr.get_store('ssh_user', default='root')
        elif self.mgr.mode == 'cephadm-package':
            self.mgr.ssh_user = 'cephadm'

        self._reset_cons()