]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | import logging |
2 | import os | |
3 | import asyncio | |
4 | from tempfile import NamedTemporaryFile | |
5 | from threading import Thread | |
6 | from contextlib import contextmanager | |
7 | from io import StringIO | |
8 | from shlex import quote | |
9 | from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union | |
10 | from orchestrator import OrchestratorError | |
11 | ||
12 | try: | |
13 | import asyncssh | |
14 | except ImportError: | |
15 | asyncssh = None # type: ignore | |
16 | ||
17 | if TYPE_CHECKING: | |
18 | from cephadm.module import CephadmOrchestrator | |
19 | from asyncssh.connection import SSHClientConnection | |
20 | ||
# Generic result type used by EventLoopThread.get_result.
T = TypeVar('T')


logger = logging.getLogger(__name__)

# asyncssh output is collected per connection attempt by
# SSHManager.redirect_log (which attaches its own handler), so keep it from
# also propagating to the parent loggers.
asyncssh_logger = logging.getLogger('asyncssh')
setattr(asyncssh_logger, 'propagate', False)

# Fallback ssh_config content written to a temporary file by
# SSHManager._reconfig_ssh when no usable config is stored.
DEFAULT_SSH_CONFIG = '\n'.join([
    '',
    'Host *',
    'User root',
    'StrictHostKeyChecking no',
    'UserKnownHostsFile /dev/null',
    'ConnectTimeout=30',
    '',
])
36 | ||
37 | ||
class EventLoopThread(Thread):
    """Background thread that runs a private asyncio event loop forever.

    The thread is started from the constructor.  Other threads submit
    coroutines to it with :meth:`get_result` and block until they finish.
    """

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        # NOTE: this registers the loop as the current loop of the thread
        # that constructs this object (not of the worker thread, which only
        # runs it via run_forever below).
        asyncio.set_event_loop(self._loop)

        super().__init__(target=self._loop.run_forever)
        self.start()

    def get_result(self, coro: "Awaitable[T]", timeout: Optional[float] = None) -> "T":
        """Run ``coro`` on the background loop and return its result.

        :param coro: coroutine to schedule on the loop.
        :param timeout: maximum number of seconds to wait; ``None`` (the
            default) waits indefinitely.
        :returns: the value the coroutine returned.
        :raises concurrent.futures.TimeoutError: if ``timeout`` elapses.
            The scheduled task is cancelled before the error is re-raised,
            so it does not keep running on the loop unobserved.
        :raises: whatever the coroutine itself raises.
        """
        from concurrent.futures import TimeoutError as FutureTimeoutError

        # run_coroutine_threadsafe returns a concurrent.futures.Future
        # (not an asyncio.Future), whose result() accepts a timeout.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout)
        except FutureTimeoutError:
            future.cancel()
            raise
50 | ||
class SSHManager:
    """SSH connection handling for the cephadm orchestrator module.

    Open asyncssh connections are cached per host name in ``self.cons``.
    The coroutine methods (``_remote_connection``, ``_execute_command``,
    ``_check_execute_command``, ``_write_remote_file``, ``_reset_con``) do
    the work; their public counterparts without the leading underscore run
    them to completion through ``self.mgr.wait_async``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        # Open connections, keyed by host name.
        self.cons: Dict[str, "SSHClientConnection"] = {}

    async def _remote_connection(self,
                                 host: str,
                                 addr: Optional[str] = None,
                                 ) -> "SSHClientConnection":
        """Return an SSH connection to ``host``, opening one if needed.

        A cached connection is reused only for hosts that are in the
        inventory; for other hosts a new connection is opened on every call,
        and the previously cached one (if any) is closed so it is not leaked.
        On success the host is removed from the offline set.

        :param host: host name; also the key into the connection cache.
        :param addr: address to connect to.  Defaults to the inventory
            address of ``host`` when the host is in the inventory.
        :raises OrchestratorError: if no address can be determined, or
            (via :meth:`redirect_log`) if the connection attempt fails.
        """
        if not self.cons.get(host) or host not in self.mgr.inventory:
            if not addr and host in self.mgr.inventory:
                addr = self.mgr.inventory.get_addr(host)

            if not addr:
                raise OrchestratorError("host address is empty")

            assert self.mgr.ssh_user
            n = self.mgr.ssh_user + '@' + addr
            logger.debug("Opening connection to {} with ssh options '{}'".format(
                n, self.mgr._ssh_options))

            asyncssh.set_log_level('DEBUG')
            asyncssh.set_debug_level(3)

            # redirect_log marks the host offline and re-raises any failure
            # as OrchestratorError, attaching the captured asyncssh log.
            with self.redirect_log(host, addr):
                ssh_options = asyncssh.SSHClientConnectionOptions(
                    keepalive_interval=7, keepalive_count_max=3)
                conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                              known_hosts=None, config=[self.mgr.ssh_config_fname],
                                              preferred_auth=['publickey'], options=ssh_options)

            # Overwriting a cached connection without closing it would leak
            # it; close the old one only after the new one is established.
            stale = self.cons.get(host)
            self.cons[host] = conn
            if stale is not None:
                logger.debug(f'_remote_connection close stale connection to {host}')
                stale.close()

        self.mgr.offline_hosts_remove(host)

        return self.cons[host]

    @contextmanager
    def redirect_log(self, host: str, addr: str) -> Iterator[None]:
        """Capture asyncssh logging and translate failures for ``host``.

        While the context is active, INFO-and-above records from the
        ``asyncssh`` logger are written to an in-memory buffer.  Any
        exception raised inside the context adds ``host`` to
        ``self.mgr.offline_hosts`` and is re-raised as
        :class:`OrchestratorError`.  For asyncssh errors and unexpected
        exceptions the captured log is included in the message.  The handler
        is always detached on exit.
        """
        log_string = StringIO()
        ch = logging.StreamHandler(log_string)
        ch.setLevel(logging.INFO)
        asyncssh_logger.addHandler(ch)

        try:
            yield
        except OSError as e:
            self.mgr.offline_hosts.add(host)
            msg = f"Can't communicate with remote host `{addr}`, possibly because the host is not reachable or python3 is not installed on the host. {str(e)}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e
        except asyncssh.Error as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            msg = f'Failed to connect to {host} ({addr}). {str(e)}' + '\n' + f'Log: {log_content}'
            logger.debug(msg)
            raise OrchestratorError(msg) from e
        except Exception as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            logger.exception(str(e))
            raise OrchestratorError(
                f'Failed to connect to {host} ({addr}): {repr(e)}' + '\n' f'Log: {log_content}') from e
        finally:
            log_string.flush()
            asyncssh_logger.removeHandler(ch)

    def remote_connection(self,
                          host: str,
                          addr: Optional[str] = None,
                          ) -> "SSHClientConnection":
        """Synchronous wrapper around :meth:`_remote_connection`."""
        return self.mgr.wait_async(self._remote_connection(host, addr))

    async def _execute_command(self,
                               host: str,
                               cmd: List[str],
                               stdin: Optional[str] = None,
                               addr: Optional[str] = None,
                               ) -> Tuple[str, str, int]:
        """Run ``cmd`` on ``host`` and return ``(stdout, stderr, returncode)``.

        Each argument is shell-quoted; the command is prefixed with ``sudo``
        when the ssh user is not root.  Trailing newlines are stripped from
        the output, and a missing/zero return code is reported as 0.  A
        non-zero exit status is NOT an error here (see
        :meth:`_check_execute_command`).

        :raises OrchestratorError: if the command could not be run at all;
            the cached connection is then dropped and the host is marked
            offline.
        """
        conn = await self._remote_connection(host, addr)
        use_sudo = self.mgr.ssh_user != 'root'
        sudo_prefix = "sudo " if use_sudo else ""
        command = sudo_prefix + " ".join(quote(x) for x in cmd)
        logger.debug(f'Running command: {command}')
        try:
            if use_sudo:
                # Presumably verifies non-interactive sudo works (check=True,
                # 5s timeout) before the real command.  Root does not go
                # through sudo, so this must not run for root: sudo may not
                # even be installed on the host.
                await conn.run('sudo true', check=True, timeout=5)
            r = await conn.run(command, input=stdin)
        # handle these Exceptions otherwise you might get a weird error like TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
        except Exception as e:
            # SSH connection closed or broken, will create new connection next call
            logger.debug(f'Connection to {host} failed. {str(e)}')
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise OrchestratorError(f'Unable to reach remote host {host}. {str(e)}') from e

        def _rstrip(v: Union[bytes, str, None]) -> str:
            # Normalize asyncssh output (str, bytes or None) to a str
            # without trailing newlines.
            if not v:
                return ''
            if isinstance(v, str):
                return v.rstrip('\n')
            if isinstance(v, bytes):
                return v.decode().rstrip('\n')
            raise OrchestratorError(
                f'Unable to parse ssh output with type {type(v)} from remote host {host}')

        out = _rstrip(r.stdout)
        err = _rstrip(r.stderr)
        rc = r.returncode if r.returncode else 0

        return out, err, rc

    def execute_command(self,
                        host: str,
                        cmd: List[str],
                        stdin: Optional[str] = None,
                        addr: Optional[str] = None,
                        ) -> Tuple[str, str, int]:
        """Synchronous wrapper around :meth:`_execute_command`."""
        return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr))

    async def _check_execute_command(self,
                                     host: str,
                                     cmd: List[str],
                                     stdin: Optional[str] = None,
                                     addr: Optional[str] = None,
                                     ) -> str:
        """Run ``cmd`` on ``host`` and return its stdout.

        :raises OrchestratorError: if the command exits non-zero (the
            message includes its stderr) or the host cannot be reached.
        """
        out, err, code = await self._execute_command(host, cmd, stdin, addr)
        if code != 0:
            msg = f'Command {cmd} failed. {err}'
            logger.debug(msg)
            raise OrchestratorError(msg)
        return out

    def check_execute_command(self,
                              host: str,
                              cmd: List[str],
                              stdin: Optional[str] = None,
                              addr: Optional[str] = None,
                              ) -> str:
        """Synchronous wrapper around :meth:`_check_execute_command`."""
        return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr))

    async def _write_remote_file(self,
                                 host: str,
                                 path: str,
                                 content: bytes,
                                 mode: Optional[int] = None,
                                 uid: Optional[int] = None,
                                 gid: Optional[int] = None,
                                 addr: Optional[str] = None,
                                 ) -> None:
        """Write ``content`` to ``path`` on ``host``.

        The data is staged in ``/tmp<path>.new`` on the remote host (copied
        there with scp), given the requested ownership and/or mode, and then
        moved into place with ``mv``.

        :param mode: permission bits to apply (e.g. ``0o600``), if given.
        :param uid: owner uid; applied together with ``gid`` when both are
            given.
        :param gid: owner gid; applied together with ``uid`` when both are
            given.
        :raises OrchestratorError: if any step fails.
        """
        try:
            dirname = os.path.dirname(path)
            await self._check_execute_command(host, ['mkdir', '-p', dirname], addr=addr)
            await self._check_execute_command(host, ['mkdir', '-p', '/tmp' + dirname], addr=addr)
            tmp_path = '/tmp' + path + '.new'
            await self._check_execute_command(host, ['touch', tmp_path], addr=addr)
            if self.mgr.ssh_user != 'root':
                # The staging file was created via sudo; hand it to the ssh
                # user so the scp below (authenticated as that user) can
                # overwrite it.
                assert self.mgr.ssh_user
                await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, tmp_path], addr=addr)
                await self._check_execute_command(host, ['chmod', str(644), tmp_path], addr=addr)
            with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
                os.fchmod(f.fileno(), 0o600)
                f.write(content)
                f.flush()
                conn = await self._remote_connection(host, addr)
                await asyncssh.scp(f.name, (conn, tmp_path))
            # Ownership and mode are independent: previously each was only
            # applied when uid, gid AND mode were all given.
            # shlex quote takes str or byte object, not int
            if uid is not None and gid is not None:
                await self._check_execute_command(host, ['chown', '-R', str(uid) + ':' + str(gid), tmp_path], addr=addr)
            if mode is not None:
                await self._check_execute_command(host, ['chmod', oct(mode)[2:], tmp_path], addr=addr)
            await self._check_execute_command(host, ['mv', tmp_path, path], addr=addr)
        except Exception as e:
            msg = f"Unable to write {host}:{path}: {e}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e

    def write_remote_file(self,
                          host: str,
                          path: str,
                          content: bytes,
                          mode: Optional[int] = None,
                          uid: Optional[int] = None,
                          gid: Optional[int] = None,
                          addr: Optional[str] = None,
                          ) -> None:
        """Synchronous wrapper around :meth:`_write_remote_file`."""
        self.mgr.wait_async(self._write_remote_file(
            host, path, content, mode, uid, gid, addr))

    async def _reset_con(self, host: str) -> None:
        """Close and forget the cached connection to ``host``, if any."""
        conn = self.cons.get(host)
        if conn:
            logger.debug(f'_reset_con close {host}')
            conn.close()
            del self.cons[host]

    def reset_con(self, host: str) -> None:
        """Synchronous wrapper around :meth:`_reset_con`."""
        self.mgr.wait_async(self._reset_con(host))

    def _reset_cons(self) -> None:
        """Close every cached connection and empty the cache."""
        for host, conn in self.cons.items():
            logger.debug(f'_reset_cons close {host}')
            conn.close()
        self.cons = {}

    def _reconfig_ssh(self) -> None:
        """Rebuild the ssh settings from the mgr store and drop connections.

        Writes the ssh config (the stored one, or ``DEFAULT_SSH_CONFIG``)
        and the identity key pair to temporary files, records the matching
        ssh command-line options in ``self.mgr._ssh_options``, selects
        ``self.mgr.ssh_user`` according to ``self.mgr.mode`` and finally
        closes all cached connections so later ones use the new settings.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        self.mgr.ssh_config_fname = self.mgr.ssh_config_file
        ssh_config = self.mgr.get_store("ssh_config")
        if ssh_config is not None or self.mgr.ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            self.mgr.ssh_config_fname = f.name
        if self.mgr.ssh_config_fname:
            self.mgr.validate_ssh_config_fname(self.mgr.ssh_config_fname)
            ssh_options += ['-F', self.mgr.ssh_config_fname]
        self.mgr.ssh_config = ssh_config

        # identity
        ssh_key = self.mgr.get_store("ssh_identity_key")
        ssh_pub = self.mgr.get_store("ssh_identity_pub")
        self.mgr.ssh_pub = ssh_pub
        self.mgr.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            self.mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
            # Restrict permissions before the private key is written.
            os.fchmod(self.mgr.tkey.fileno(), 0o600)
            self.mgr.tkey.write(ssh_key.encode('utf-8'))
            self.mgr.tkey.flush()  # make visible to other processes
            # NOTE(review): unlike the NamedTemporaryFile objects, this
            # plain file is not deleted on close -- confirm it is cleaned
            # up elsewhere.
            tpub = open(self.mgr.tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [self.mgr.tkey, tpub]
            ssh_options += ['-i', self.mgr.tkey.name]

        self.mgr._temp_files = temp_files
        if ssh_options:
            self.mgr._ssh_options = ' '.join(ssh_options)
        else:
            self.mgr._ssh_options = None

        if self.mgr.mode == 'root':
            self.mgr.ssh_user = self.mgr.get_store('ssh_user', default='root')
        elif self.mgr.mode == 'cephadm-package':
            self.mgr.ssh_user = 'cephadm'

        self._reset_cons()