]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/ssh.py
e874ba6d78fe860b6ed969f1c6d15a7c71d13193
4 from tempfile
import NamedTemporaryFile
5 from threading
import Thread
6 from contextlib
import contextmanager
7 from io
import StringIO
8 from shlex
import quote
9 from typing
import TYPE_CHECKING
, Optional
, List
, Tuple
, Dict
, Iterator
, TypeVar
, Awaitable
, Union
10 from orchestrator
import OrchestratorError
15 asyncssh
= None # type: ignore
18 from cephadm
. module
import CephadmOrchestrator
19 from asyncssh
. connection
import SSHClientConnection
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Dedicated handle on the 'asyncssh' library logger. Propagation is switched
# off so its records are not passed on to handlers of ancestor loggers; only
# handlers attached directly to this logger will receive them.
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False
29 DEFAULT_SSH_CONFIG
= """
32 StrictHostKeyChecking no
33 UserKnownHostsFile /dev/null
38 class EventLoopThread ( Thread
):
40 def __init__ ( self
) -> None :
41 self
._l oop
= asyncio
. new_event_loop ()
42 asyncio
. set_event_loop ( self
._l oop
)
44 super () .__ init
__ ( target
= self
._l oop
. run_forever
)
def get_result(self, coro: Awaitable[T]) -> T:
    """Run *coro* on this object's event loop and block until it finishes.

    The coroutine is submitted to ``self._loop`` with
    ``asyncio.run_coroutine_threadsafe`` and the calling thread waits on the
    returned future.

    :param coro: awaitable to schedule on ``self._loop``.
    :return: whatever the awaitable returns.
    :raises: any exception raised by the awaitable is re-raised here by
        ``Future.result()``.
    """
    future = asyncio.run_coroutine_threadsafe(coro, self._loop)
    return future.result()
def __init__(self, mgr: "CephadmOrchestrator"):
    """Remember the owning orchestrator and start with no cached connections.

    :param mgr: the orchestrator module instance this object works for.
    """
    self.mgr: "CephadmOrchestrator" = mgr
    # Cache of open SSH connections, keyed by a string (the other methods in
    # this file index it by host name).
    self.cons: Dict[str, "SSHClientConnection"] = {}
57 async def _remote_connection ( self
,
59 addr
: Optional
[ str ] = None ,
60 ) -> "SSHClientConnection" :
61 if not self
. cons
. get ( host
) or host
not in self
. mgr
. inventory
:
62 if not addr
and host
in self
. mgr
. inventory
:
63 addr
= self
. mgr
. inventory
. get_addr ( host
)
66 raise OrchestratorError ( "host address is empty" )
68 assert self
. mgr
. ssh_user
69 n
= self
. mgr
. ssh_user
+ '@' + addr
70 logger
. debug ( "Opening connection to {} with ssh options '{}'" . format (
71 n
, self
. mgr
._ ssh
_ options
))
73 asyncssh
. set_log_level ( 'DEBUG' )
74 asyncssh
. set_debug_level ( 3 )
76 with self
. redirect_log ( host
, addr
):
78 ssh_options
= asyncssh
. SSHClientConnectionOptions (
79 keepalive_interval
= 7 , keepalive_count_max
= 3 )
80 conn
= await asyncssh
. connect ( addr
, username
= self
. mgr
. ssh_user
, client_keys
=[ self
. mgr
. tkey
. name
],
81 known_hosts
= None , config
=[ self
. mgr
. ssh_config_fname
],
82 preferred_auth
=[ 'publickey' ], options
= ssh_options
)
85 except asyncssh
. Error
:
89 self
. cons
[ host
] = conn
91 self
. mgr
. offline_hosts_remove ( host
)
93 return self
. cons
[ host
]
96 def redirect_log ( self
, host
: str , addr
: str ) -> Iterator
[ None ]:
97 log_string
= StringIO ()
98 ch
= logging
. StreamHandler ( log_string
)
99 ch
. setLevel ( logging
. INFO
)
100 asyncssh_logger
. addHandler ( ch
)
105 self
. mgr
. offline_hosts
. add ( host
)
106 log_content
= log_string
. getvalue ()
107 msg
= f
"Can't communicate with remote host ` {addr} `, possibly because python3 is not installed there. {str(e)}"
108 logger
. exception ( msg
)
109 raise OrchestratorError ( msg
)
110 except asyncssh
. Error
as e
:
111 self
. mgr
. offline_hosts
. add ( host
)
112 log_content
= log_string
. getvalue ()
113 msg
= f
'Failed to connect to {host} ( {addr} ). {str(e)}' + ' \n ' + f
'Log: {log_content} '
115 raise OrchestratorError ( msg
)
116 except Exception as e
:
117 self
. mgr
. offline_hosts
. add ( host
)
118 log_content
= log_string
. getvalue ()
119 logger
. exception ( str ( e
))
120 raise OrchestratorError (
121 f
'Failed to connect to {host} ( {addr} ): {repr(e)}' + ' \n ' f
'Log: {log_content} ' )
124 asyncssh_logger
. removeHandler ( ch
)
126 def remote_connection ( self
,
128 addr
: Optional
[ str ] = None ,
129 ) -> "SSHClientConnection" :
130 return self
. mgr
. wait_async ( self
._ remote
_ connection
( host
, addr
))
132 async def _execute_command ( self
,
135 stdin
: Optional
[ str ] = None ,
136 addr
: Optional
[ str ] = None ,
137 ) -> Tuple
[ str , str , int ]:
138 conn
= await self
._ remote
_ connection
( host
, addr
)
139 sudo_prefix
= "sudo " if self
. mgr
. ssh_user
!= 'root' else ""
140 cmd
= sudo_prefix
+ " " . join ( quote ( x
) for x
in cmd
)
141 logger
. debug ( f
'Running command: {cmd} ' )
143 r
= await conn
. run ( 'sudo true' , check
= True , timeout
= 5 )
144 r
= await conn
. run ( cmd
, input = stdin
)
145 # handle these Exceptions otherwise you might get a weird error like TypeError: __init__() missing 1 required positional argument: 'reason' (due to the asyncssh error interacting with raise_if_exception)
146 except ( asyncssh
. ChannelOpenError
, asyncssh
. ProcessError
, Exception ) as e
:
147 # SSH connection closed or broken, will create new connection next call
148 logger
. debug ( f
'Connection to {host} failed. {str(e)}' )
149 await self
._ reset
_ con
( host
)
150 self
. mgr
. offline_hosts
. add ( host
)
151 raise OrchestratorError ( f
'Unable to reach remote host {host} . {str(e)}' )
153 def _rstrip ( v
: Union
[ bytes
, str , None ]) -> str :
156 if isinstance ( v
, str ):
157 return v
. rstrip ( ' \n ' )
158 if isinstance ( v
, bytes
):
159 return v
. decode (). rstrip ( ' \n ' )
160 raise OrchestratorError (
161 f
'Unable to parse ssh output with type {type(v)} from remote host {host} ' )
163 out
= _rstrip ( r
. stdout
)
164 err
= _rstrip ( r
. stderr
)
165 rc
= r
. returncode
if r
. returncode
else 0
169 def execute_command ( self
,
172 stdin
: Optional
[ str ] = None ,
173 addr
: Optional
[ str ] = None ,
174 ) -> Tuple
[ str , str , int ]:
175 return self
. mgr
. wait_async ( self
._ execute
_ command
( host
, cmd
, stdin
, addr
))
177 async def _check_execute_command ( self
,
180 stdin
: Optional
[ str ] = None ,
181 addr
: Optional
[ str ] = None ,
183 out
, err
, code
= await self
._ execute
_ command
( host
, cmd
, stdin
, addr
)
185 msg
= f
'Command {cmd} failed. {err} '
187 raise OrchestratorError ( msg
)
190 def check_execute_command ( self
,
193 stdin
: Optional
[ str ] = None ,
194 addr
: Optional
[ str ] = None ,
196 return self
. mgr
. wait_async ( self
._ check
_ execute
_ command
( host
, cmd
, stdin
, addr
))
198 async def _write_remote_file ( self
,
202 mode
: Optional
[ int ] = None ,
203 uid
: Optional
[ int ] = None ,
204 gid
: Optional
[ int ] = None ,
205 addr
: Optional
[ str ] = None ,
208 dirname
= os
. path
. dirname ( path
)
209 await self
._ check
_ execute
_ command
( host
, [ 'mkdir' , '-p' , dirname
], addr
= addr
)
210 await self
._ check
_ execute
_ command
( host
, [ 'mkdir' , '-p' , '/tmp' + dirname
], addr
= addr
)
211 tmp_path
= '/tmp' + path
+ '.new'
212 await self
._ check
_ execute
_ command
( host
, [ 'touch' , tmp_path
], addr
= addr
)
213 if self
. mgr
. ssh_user
!= 'root' :
214 assert self
. mgr
. ssh_user
215 await self
._ check
_ execute
_ command
( host
, [ 'chown' , '-R' , self
. mgr
. ssh_user
, tmp_path
], addr
= addr
)
216 await self
._ check
_ execute
_ command
( host
, [ 'chmod' , str ( 644 ), tmp_path
], addr
= addr
)
217 with
NamedTemporaryFile ( prefix
= 'cephadm-write-remote-file-' ) as f
:
218 os
. fchmod ( f
. fileno (), 0o600 )
221 conn
= await self
._ remote
_ connection
( host
, addr
)
222 await asyncssh
. scp ( f
. name
, ( conn
, tmp_path
))
223 if uid
is not None and gid
is not None and mode
is not None :
224 # shlex quote takes str or byte object, not int
225 await self
._ check
_ execute
_ command
( host
, [ 'chown' , '-R' , str ( uid
) + ':' + str ( gid
), tmp_path
], addr
= addr
)
226 await self
._ check
_ execute
_ command
( host
, [ 'chmod' , oct ( mode
)[ 2 :], tmp_path
], addr
= addr
)
227 await self
._ check
_ execute
_ command
( host
, [ 'mv' , tmp_path
, path
], addr
= addr
)
228 except Exception as e
:
229 msg
= f
"Unable to write {host} : {path} : {e} "
230 logger
. exception ( msg
)
231 raise OrchestratorError ( msg
)
233 def write_remote_file ( self
,
237 mode
: Optional
[ int ] = None ,
238 uid
: Optional
[ int ] = None ,
239 gid
: Optional
[ int ] = None ,
240 addr
: Optional
[ str ] = None ,
242 self
. mgr
. wait_async ( self
._ write
_ remote
_ file
(
243 host
, path
, content
, mode
, uid
, gid
, addr
))
245 async def _reset_con ( self
, host
: str ) -> None :
246 conn
= self
. cons
. get ( host
)
248 logger
. debug ( f
'_reset_con close {host} ' )
def reset_con(self, host: str) -> None:
    """Synchronous wrapper around the ``_reset_con`` coroutine.

    Builds the ``_reset_con(host)`` coroutine and hands it to
    ``self.mgr.wait_async``; nothing is returned to the caller.

    :param host: host whose connection should be reset.
    """
    self.mgr.wait_async(self._reset_con(host))
255 def _reset_cons ( self
) -> None :
256 for host
, conn
in self
. cons
. items ():
257 logger
. debug ( f
'_reset_cons close {host} ' )
261 def _reconfig_ssh ( self
) -> None :
262 temp_files
= [] # type: list
263 ssh_options
= [] # type: List[str]
266 self
. mgr
. ssh_config_fname
= self
. mgr
. ssh_config_file
267 ssh_config
= self
. mgr
. get_store ( "ssh_config" )
268 if ssh_config
is not None or self
. mgr
. ssh_config_fname
is None :
270 ssh_config
= DEFAULT_SSH_CONFIG
271 f
= NamedTemporaryFile ( prefix
= 'cephadm-conf-' )
272 os
. fchmod ( f
. fileno (), 0o600 )
273 f
. write ( ssh_config
. encode ( 'utf-8' ))
274 f
. flush () # make visible to other processes
276 self
. mgr
. ssh_config_fname
= f
. name
277 if self
. mgr
. ssh_config_fname
:
278 self
. mgr
. validate_ssh_config_fname ( self
. mgr
. ssh_config_fname
)
279 ssh_options
+= [ '-F' , self
. mgr
. ssh_config_fname
]
280 self
. mgr
. ssh_config
= ssh_config
283 ssh_key
= self
. mgr
. get_store ( "ssh_identity_key" )
284 ssh_pub
= self
. mgr
. get_store ( "ssh_identity_pub" )
285 self
. mgr
. ssh_pub
= ssh_pub
286 self
. mgr
. ssh_key
= ssh_key
287 if ssh_key
and ssh_pub
:
288 self
. mgr
. tkey
= NamedTemporaryFile ( prefix
= 'cephadm-identity-' )
289 self
. mgr
. tkey
. write ( ssh_key
. encode ( 'utf-8' ))
290 os
. fchmod ( self
. mgr
. tkey
. fileno (), 0o600 )
291 self
. mgr
. tkey
. flush () # make visible to other processes
292 tpub
= open ( self
. mgr
. tkey
. name
+ '.pub' , 'w' )
293 os
. fchmod ( tpub
. fileno (), 0o600 )
295 tpub
. flush () # make visible to other processes
296 temp_files
+= [ self
. mgr
. tkey
, tpub
]
297 ssh_options
+= [ '-i' , self
. mgr
. tkey
. name
]
299 self
. mgr
._ temp
_ files
= temp_files
301 self
. mgr
._ ssh
_ options
= ' ' . join ( ssh_options
)
303 self
. mgr
._ ssh
_ options
= None
305 if self
. mgr
. mode
== 'root' :
306 self
. mgr
. ssh_user
= self
. mgr
. get_store ( 'ssh_user' , default
= 'root' )
307 elif self
. mgr
. mode
== 'cephadm-package' :
308 self
. mgr
. ssh_user
= 'cephadm'