]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/ssh.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / cephadm / ssh.py
CommitLineData
20effc67
TL
1import logging
2import os
3import asyncio
4from tempfile import NamedTemporaryFile
5from threading import Thread
6from contextlib import contextmanager
7from io import StringIO
8from shlex import quote
9from typing import TYPE_CHECKING, Optional, List, Tuple, Dict, Iterator, TypeVar, Awaitable, Union
10from orchestrator import OrchestratorError
11
12try:
13 import asyncssh
14except ImportError:
15 asyncssh = None # type: ignore
16
17if TYPE_CHECKING:
18 from cephadm.module import CephadmOrchestrator
19 from asyncssh.connection import SSHClientConnection
20
# Result type of the coroutine passed to EventLoopThread.get_result.
T = TypeVar('T')


logger = logging.getLogger(__name__)

# asyncssh log records are not propagated to ancestor loggers;
# SSHManager.redirect_log attaches a temporary handler to this logger to
# capture them while a connection is being opened.
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False

# ssh_config contents that SSHManager._reconfig_ssh falls back to when no
# usable ssh_config is stored: connect to every host as root and skip host-key
# verification (no strict checking, known_hosts file is /dev/null).
DEFAULT_SSH_CONFIG = """
Host *
 User root
 StrictHostKeyChecking no
 UserKnownHostsFile /dev/null
 ConnectTimeout=30
"""
36
37
class EventLoopThread(Thread):
    """Thread that runs a private asyncio event loop forever.

    The thread is started by the constructor.  Other threads hand coroutines
    to the loop with :meth:`get_result` and block until they finish.
    """

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        # NOTE: this registers the loop as the current event loop of the
        # thread *calling the constructor*; the new thread only drives the
        # loop through run_forever.
        asyncio.set_event_loop(self._loop)

        super().__init__(target=self._loop.run_forever)
        self.start()

    def get_result(self, coro: Awaitable[T], timeout: Optional[float] = None) -> T:
        """Run *coro* on this thread's loop and block until it completes.

        :param coro: coroutine to schedule on the loop.
        :param timeout: maximum number of seconds to wait for the result;
            ``None`` (the default) waits indefinitely.
        :returns: the value returned by *coro*.
        :raises: whatever *coro* raises, or ``concurrent.futures.TimeoutError``
            when *timeout* expires (the coroutine is cancelled in that case).
        """
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        try:
            return future.result(timeout=timeout)
        except Exception:
            # On a timeout the coroutine is still running on the loop; cancel
            # it so it does not keep working after the caller has given up.
            if not future.done():
                future.cancel()
            raise
49
50
class SSHManager:
    """Open and cache asyncssh connections to cluster hosts and run commands on them.

    The public synchronous methods (``remote_connection``, ``execute_command``,
    ``check_execute_command``, ``write_remote_file``, ``reset_con``) run the
    matching ``_``-prefixed coroutine via ``self.mgr.wait_async``.  Open
    connections are cached per host name in ``self.cons``.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        # host name -> cached SSH connection
        self.cons: Dict[str, "SSHClientConnection"] = {}

    async def _remote_connection(self,
                                 host: str,
                                 addr: Optional[str] = None,
                                 ) -> "SSHClientConnection":
        """Return the connection for *host*, opening a new one when needed.

        A new connection is opened when none is cached for *host* or when *host*
        is not in the inventory.  On return, *host* is removed from the offline
        hosts.

        :param host: host name; used as the cache key.
        :param addr: address to connect to; defaults to the inventory address.
        :raises OrchestratorError: if no address is known or connecting fails
            (see :meth:`redirect_log`).
        """
        if not self.cons.get(host) or host not in self.mgr.inventory:
            if not addr and host in self.mgr.inventory:
                addr = self.mgr.inventory.get_addr(host)

            if not addr:
                raise OrchestratorError("host address is empty")

            assert self.mgr.ssh_user
            n = self.mgr.ssh_user + '@' + addr
            logger.debug("Opening connection to {} with ssh options '{}'".format(
                n, self.mgr._ssh_options))

            asyncssh.set_log_level('DEBUG')
            asyncssh.set_debug_level(3)

            # redirect_log marks the host offline and converts any failure into
            # OrchestratorError, so no local exception handling is needed here.
            with self.redirect_log(host, addr):
                ssh_options = asyncssh.SSHClientConnectionOptions(
                    keepalive_interval=7, keepalive_count_max=3)
                conn = await asyncssh.connect(addr, username=self.mgr.ssh_user, client_keys=[self.mgr.tkey.name],
                                              known_hosts=None, config=[self.mgr.ssh_config_fname],
                                              preferred_auth=['publickey'], options=ssh_options)
            self.cons[host] = conn

        self.mgr.offline_hosts_remove(host)

        return self.cons[host]

    @contextmanager
    def redirect_log(self, host: str, addr: str) -> Iterator[None]:
        """Capture asyncssh log output and translate errors raised in the body.

        While the ``with`` body runs, records of level INFO and above emitted on
        the asyncssh logger are collected in memory.  Any exception raised by the
        body adds *host* to the offline hosts and is re-raised as
        OrchestratorError; for asyncssh errors and unexpected exceptions the
        captured log is appended to the message.
        """
        log_string = StringIO()
        ch = logging.StreamHandler(log_string)
        ch.setLevel(logging.INFO)
        asyncssh_logger.addHandler(ch)

        try:
            yield
        except OSError as e:
            self.mgr.offline_hosts.add(host)
            msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there. {str(e)}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e
        except asyncssh.Error as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            msg = f'Failed to connect to {host} ({addr}). {str(e)}' + '\n' + f'Log: {log_content}'
            logger.debug(msg)
            raise OrchestratorError(msg) from e
        except Exception as e:
            self.mgr.offline_hosts.add(host)
            log_content = log_string.getvalue()
            logger.exception(str(e))
            raise OrchestratorError(
                f'Failed to connect to {host} ({addr}): {repr(e)}' + '\n' f'Log: {log_content}') from e
        finally:
            # Detach and release the temporary handler whatever happened.
            asyncssh_logger.removeHandler(ch)
            ch.close()

    def remote_connection(self,
                          host: str,
                          addr: Optional[str] = None,
                          ) -> "SSHClientConnection":
        """Synchronous wrapper around :meth:`_remote_connection`."""
        return self.mgr.wait_async(self._remote_connection(host, addr))

    async def _execute_command(self,
                               host: str,
                               cmd: List[str],
                               stdin: Optional[str] = None,
                               addr: Optional[str] = None,
                               ) -> Tuple[str, str, int]:
        """Run *cmd* on *host* and return ``(stdout, stderr, returncode)``.

        Each argument is shell-quoted and the command is prefixed with ``sudo``
        unless the SSH user is root.  Before the real command, a ``true`` probe
        with the same prefix is run (checked, with a 5 second timeout).

        A non-zero exit status of *cmd* itself is not an error here; it is
        returned in the tuple.  stdout/stderr have trailing newlines stripped.

        :raises OrchestratorError: if the probe or the command cannot be run
            over SSH; the cached connection is then dropped and the host is
            added to the offline hosts.
        """
        conn = await self._remote_connection(host, addr)
        sudo_prefix = "sudo " if self.mgr.ssh_user != 'root' else ""
        cmd_str = sudo_prefix + " ".join(quote(x) for x in cmd)
        logger.debug(f'Running command: {cmd_str}')
        try:
            # Probe with the same prefix as the real command: root users do not
            # need sudo, and it may not even be installed on the host.
            await conn.run(f'{sudo_prefix}true', check=True, timeout=5)
            r = await conn.run(cmd_str, input=stdin)
        # handle these Exceptions otherwise you might get a weird error like TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
        # (asyncssh.ChannelOpenError and asyncssh.ProcessError are covered by Exception.)
        except Exception as e:
            # SSH connection closed or broken, will create new connection next call
            logger.debug(f'Connection to {host} failed. {str(e)}')
            await self._reset_con(host)
            self.mgr.offline_hosts.add(host)
            raise OrchestratorError(f'Unable to reach remote host {host}. {str(e)}') from e

        def _rstrip(v: Union[bytes, str, None]) -> str:
            # Normalize command output to str without trailing newlines.
            if not v:
                return ''
            if isinstance(v, str):
                return v.rstrip('\n')
            if isinstance(v, bytes):
                return v.decode().rstrip('\n')
            raise OrchestratorError(
                f'Unable to parse ssh output with type {type(v)} from remote host {host}')

        out = _rstrip(r.stdout)
        err = _rstrip(r.stderr)
        # A missing (None) return code is reported as 0.
        rc = r.returncode if r.returncode else 0

        return out, err, rc

    def execute_command(self,
                        host: str,
                        cmd: List[str],
                        stdin: Optional[str] = None,
                        addr: Optional[str] = None,
                        ) -> Tuple[str, str, int]:
        """Synchronous wrapper around :meth:`_execute_command`."""
        return self.mgr.wait_async(self._execute_command(host, cmd, stdin, addr))

    async def _check_execute_command(self,
                                     host: str,
                                     cmd: List[str],
                                     stdin: Optional[str] = None,
                                     addr: Optional[str] = None,
                                     ) -> str:
        """Run *cmd* on *host* and return its stdout.

        :raises OrchestratorError: if the command exits non-zero (the message
            includes stderr) or cannot be run at all.
        """
        out, err, code = await self._execute_command(host, cmd, stdin, addr)
        if code != 0:
            msg = f'Command {cmd} failed. {err}'
            logger.debug(msg)
            raise OrchestratorError(msg)
        return out

    def check_execute_command(self,
                              host: str,
                              cmd: List[str],
                              stdin: Optional[str] = None,
                              addr: Optional[str] = None,
                              ) -> str:
        """Synchronous wrapper around :meth:`_check_execute_command`."""
        return self.mgr.wait_async(self._check_execute_command(host, cmd, stdin, addr))

    async def _write_remote_file(self,
                                 host: str,
                                 path: str,
                                 content: bytes,
                                 mode: Optional[int] = None,
                                 uid: Optional[int] = None,
                                 gid: Optional[int] = None,
                                 addr: Optional[str] = None,
                                 ) -> None:
        """Write *content* to *path* on *host*.

        The content is copied with scp to ``/tmp<path>.new`` on the host,
        ownership/permissions are adjusted there, and the file is then moved to
        *path*.

        :param mode: permission bits applied with chmod, when given.
        :param uid: owner applied with chown, when given together with *gid*.
        :param gid: group applied with chown, when given together with *uid*.
        :raises OrchestratorError: if any step fails.
        """
        try:
            dirname = os.path.dirname(path)
            await self._check_execute_command(host, ['mkdir', '-p', dirname], addr=addr)
            await self._check_execute_command(host, ['mkdir', '-p', '/tmp' + dirname], addr=addr)
            tmp_path = '/tmp' + path + '.new'
            await self._check_execute_command(host, ['touch', tmp_path], addr=addr)
            if self.mgr.ssh_user != 'root':
                # Commands run through sudo, so the temp file is created by root;
                # hand it to the ssh user so scp (which runs as that user) can write it.
                assert self.mgr.ssh_user
                await self._check_execute_command(host, ['chown', '-R', self.mgr.ssh_user, tmp_path], addr=addr)
                await self._check_execute_command(host, ['chmod', '644', tmp_path], addr=addr)
            with NamedTemporaryFile(prefix='cephadm-write-remote-file-') as f:
                os.fchmod(f.fileno(), 0o600)
                f.write(content)
                f.flush()
                conn = await self._remote_connection(host, addr)
                await asyncssh.scp(f.name, (conn, tmp_path))
            # Ownership and mode are applied independently, so a caller that
            # only passes `mode` still gets it.
            if uid is not None and gid is not None:
                # shlex quote takes str or byte object, not int
                await self._check_execute_command(host, ['chown', '-R', f'{uid}:{gid}', tmp_path], addr=addr)
            if mode is not None:
                await self._check_execute_command(host, ['chmod', oct(mode)[2:], tmp_path], addr=addr)
            await self._check_execute_command(host, ['mv', tmp_path, path], addr=addr)
        except Exception as e:
            msg = f"Unable to write {host}:{path}: {e}"
            logger.exception(msg)
            raise OrchestratorError(msg) from e

    def write_remote_file(self,
                          host: str,
                          path: str,
                          content: bytes,
                          mode: Optional[int] = None,
                          uid: Optional[int] = None,
                          gid: Optional[int] = None,
                          addr: Optional[str] = None,
                          ) -> None:
        """Synchronous wrapper around :meth:`_write_remote_file`."""
        self.mgr.wait_async(self._write_remote_file(
            host, path, content, mode, uid, gid, addr))

    async def _reset_con(self, host: str) -> None:
        """Close and forget the cached connection for *host*, if any."""
        conn = self.cons.get(host)
        if conn:
            logger.debug(f'_reset_con close {host}')
            conn.close()
            del self.cons[host]

    def reset_con(self, host: str) -> None:
        """Synchronous wrapper around :meth:`_reset_con`."""
        self.mgr.wait_async(self._reset_con(host))

    def _reset_cons(self) -> None:
        """Close every cached connection and empty the cache."""
        for host, conn in self.cons.items():
            logger.debug(f'_reset_cons close {host}')
            conn.close()
        self.cons = {}

    def _reconfig_ssh(self) -> None:
        """Rebuild SSH settings from the mgr store and drop cached connections.

        Writes the ssh config (stored value, or DEFAULT_SSH_CONFIG as fallback)
        and the identity key pair to temporary files, builds the ssh option
        string, sets the ssh user according to ``self.mgr.mode``, and finally
        closes all cached connections so new ones use the new settings.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        self.mgr.ssh_config_fname = self.mgr.ssh_config_file
        ssh_config = self.mgr.get_store("ssh_config")
        if ssh_config is not None or self.mgr.ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            self.mgr.ssh_config_fname = f.name
        if self.mgr.ssh_config_fname:
            self.mgr.validate_ssh_config_fname(self.mgr.ssh_config_fname)
            ssh_options += ['-F', self.mgr.ssh_config_fname]
        self.mgr.ssh_config = ssh_config

        # identity
        ssh_key = self.mgr.get_store("ssh_identity_key")
        ssh_pub = self.mgr.get_store("ssh_identity_pub")
        self.mgr.ssh_pub = ssh_pub
        self.mgr.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            self.mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
            self.mgr.tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(self.mgr.tkey.fileno(), 0o600)
            self.mgr.tkey.flush()  # make visible to other processes
            tpub = open(self.mgr.tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            # Keep references so the temp files stay alive (and open) while in use.
            temp_files += [self.mgr.tkey, tpub]
            ssh_options += ['-i', self.mgr.tkey.name]

        self.mgr._temp_files = temp_files
        if ssh_options:
            self.mgr._ssh_options = ' '.join(ssh_options)
        else:
            self.mgr._ssh_options = None

        if self.mgr.mode == 'root':
            self.mgr.ssh_user = self.mgr.get_store('ssh_user', default='root')
        elif self.mgr.mode == 'cephadm-package':
            self.mgr.ssh_user = 'cephadm'

        self._reset_cons()