]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/agent.py
fa75a8759bbcf3907ef8bd941b25b612106418f6
11 from mgr_module
import ServiceInfoT
12 from mgr_util
import verify_tls_files
, build_url
13 from orchestrator
import DaemonDescriptionStatus
, OrchestratorError
14 from orchestrator
._interface
import daemon_type_to_service
15 from ceph
.utils
import datetime_now
16 from ceph
.deployment
.inventory
import Devices
17 from ceph
.deployment
.service_spec
import ServiceSpec
, PlacementSpec
18 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
19 from cephadm
.services
.ingress
import IngressSpec
21 from datetime
import datetime
, timedelta
22 from cryptography
import x509
23 from cryptography
.x509
.oid
import NameOID
24 from cryptography
.hazmat
.primitives
.asymmetric
import rsa
25 from cryptography
.hazmat
.primitives
import hashes
, serialization
26 from cryptography
.hazmat
.backends
import default_backend
28 from typing
import Any
, Dict
, List
, Set
, Tuple
, \
29 TYPE_CHECKING
, Optional
, cast
, Collection
32 from cephadm
.module
import CephadmOrchestrator
def cherrypy_filter(record: logging.LogRecord) -> int:
    """Logging filter that suppresses known-noisy CherryPy error records.

    :param record: the log record about to be emitted.
    :return: a falsy value (record dropped) when the formatted message
        contains one of the blocked substrings, a truthy value otherwise.
    """
    blocked = (
        'TLSV1_ALERT_DECRYPT_ERROR',
    )
    msg = record.getMessage()
    # A generator lets any() stop at the first hit instead of first
    # materialising a list of every matching substring.
    return not any(m in msg for m in blocked)
# Module import side effects: keep CherryPy access-log records from
# propagating to ancestor loggers, and attach cherrypy_filter to the
# 'cherrypy.error' logger.
cherrypy.log.access_log.propagate = False
logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
class CherryPyThread(threading.Thread):
    """Thread that runs the cephadm HTTPS endpoint on top of CherryPy.

    The thread loads (or generates) the root credentials, writes a freshly
    signed server cert/key pair to temporary files, starts the CherryPy
    engine and then blocks until ``shutdown()`` is called.
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        """Record the orchestrator and the initial listen address/port."""
        self.mgr = mgr
        self.cherrypy_shutdown_event = threading.Event()
        self.ssl_certs = SSLCerts(self.mgr)
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()
        super().__init__(target=self.run)

    def configure_cherrypy(self) -> None:
        """Apply the server settings and mount the route dispatcher.

        Expects ``self.cert_tmp`` and ``self.key_tmp`` to have been created
        already (``run()`` does this before calling here).
        """
        server_settings = {
            'environment': 'production',
            'server.socket_host': self.server_addr,
            'server.socket_port': self.server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': 'builtin',
            'server.ssl_certificate': self.cert_tmp.name,
            'server.ssl_private_key': self.key_tmp.name,
        }
        cherrypy.config.update(server_settings)

        # configure routes
        root = Root(self.mgr)
        host_data = HostData(self.mgr)
        dispatcher = cherrypy.dispatch.RoutesDispatcher()
        dispatcher.connect(name='index', route='/', controller=root.index)
        dispatcher.connect(name='sd-config', route='/prometheus/sd-config',
                           controller=root.get_sd_config)
        dispatcher.connect(name='rules', route='/prometheus/rules',
                           controller=root.get_prometheus_rules)
        dispatcher.connect(name='host-data', route='/data', controller=host_data.POST,
                           conditions=dict(method=['POST']))

        cherrypy.tree.mount(None, "/", config={'/': {'request.dispatch': dispatcher}})

    def run(self) -> None:
        """Thread body: set up TLS material, serve until the shutdown event fires.

        Any exception is logged through ``self.mgr.log.error`` and swallowed.
        """
        try:
            # Reuse the stored root credentials when they exist and load
            # cleanly; otherwise create a brand-new root cert.
            try:
                stored = self.mgr.get_store('cephadm_endpoint_credentials')
                if not stored:
                    raise OrchestratorError('No old credentials for cephadm endpoint found')
                stored_dict = json.loads(stored)
                stored_key = stored_dict['key']
                stored_cert = stored_dict['cert']
                self.ssl_certs.load_root_credentials(stored_cert, stored_key)
            except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
                self.ssl_certs.generate_root_cert()

            cert, key = self.ssl_certs.generate_cert()

            # The temp files are kept on self so they are not gc'ed
            # (and thereby deleted) while the server is using them.
            self.key_tmp = tempfile.NamedTemporaryFile()
            self.key_tmp.write(key.encode('utf-8'))
            self.key_tmp.flush()

            self.cert_tmp = tempfile.NamedTemporaryFile()
            self.cert_tmp.write(cert.encode('utf-8'))
            self.cert_tmp.flush()

            verify_tls_files(self.cert_tmp.name, self.key_tmp.name)
            self.configure_cherrypy()

            self.mgr.log.debug('Starting cherrypy engine...')
            self.start_engine()
            self.mgr.log.debug('Cherrypy engine started.')

            # Persist the root credentials now in use.
            root_creds = {
                'cert': self.ssl_certs.get_root_cert(),
                'key': self.ssl_certs.get_root_key()
            }
            self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(root_creds))
            self.mgr._kick_serve_loop()

            # wait for the shutdown event
            self.cherrypy_shutdown_event.wait()
            self.cherrypy_shutdown_event.clear()
            cherrypy.engine.stop()
            self.mgr.log.debug('Cherrypy engine stopped.')
        except Exception as e:
            self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')

    def start_engine(self) -> None:
        """Start the CherryPy engine, trying up to 150 consecutive ports.

        Each ``ChannelFailures`` bumps ``self.server_port`` by one and
        retries; if every attempt fails an error is logged.
        """
        for _attempt in range(150):
            try:
                cherrypy.engine.start()
            except cherrypy.process.wspbus.ChannelFailures as e:
                self.mgr.log.debug(
                    f'{e}. Trying next port.')
                self.server_port += 1
                # reset the http server before retrying on the next port
                cherrypy.server.httpserver = None
                cherrypy.config.update({
                    'server.socket_port': self.server_port
                })
            else:
                self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}')
                return
        self.mgr.log.error(
            'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start')

    def shutdown(self) -> None:
        """Signal ``run()`` to stop the engine and return."""
        self.mgr.log.debug('Stopping cherrypy engine...')
        self.cherrypy_shutdown_event.set()
def _cp_dispatch(self, vpath: str) -> 'Root':
    """CherryPy dispatch hook: collapse every request path to '/'.

    Clears ``cherrypy.request.path`` and hands dispatch back to this
    same object.
    """
    cherrypy.request.path = ''
    return self
156 def __init__(self
, mgr
: "CephadmOrchestrator"):
160 def index(self
) -> str:
161 return '''<!DOCTYPE html>
163 <head><title>Cephadm HTTP Endpoint</title></head>
165 <h2>Cephadm Service Discovery Endpoints</h2>
166 <p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
167 <p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
168 <p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
169 <p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
170 <p><a href='prometheus/rules'>Prometheus rules</a></p>
@cherrypy.tools.json_out()
def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
    """Return <http_sd_config> compatible prometheus config for the specified service."""
    # Map each supported service name to the method that builds its config.
    builders = {
        'mgr-prometheus': self.prometheus_sd_config,
        'alertmanager': self.alertmgr_sd_config,
        'node-exporter': self.node_exporter_sd_config,
        'haproxy': self.haproxy_sd_config,
    }
    builder = builders.get(service)
    if builder is None:
        # unknown service name: nothing to discover
        return []
    return builder()
def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
    """Return <http_sd_config> compatible prometheus config for prometheus service."""
    targets = []
    for server in self.mgr.list_servers():
        hostname = server.get('hostname', '')
        services = cast(List[ServiceInfoT], server.get('services', []))
        for service in services:
            # only mgr services contribute a target
            if service['type'] == 'mgr':
                port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
                targets.append(f'{hostname}:{port}')
    return [{"targets": targets, "labels": {}}]
def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
    """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
    targets = []
    for daemon in self.mgr.cache.get_daemons_by_service('alertmanager'):
        assert daemon.hostname is not None
        # fall back to the inventory address when the daemon has no ip
        addr = daemon.ip or self.mgr.inventory.get_addr(daemon.hostname)
        # 9093 is used when the daemon reports no ports
        port = daemon.ports[0] if daemon.ports else 9093
        url = build_url(host=addr, port=port).lstrip('/')
        targets.append('{}'.format(url))
    return [{"targets": targets, "labels": {}}]
def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
    """Return <http_sd_config> compatible prometheus config for node-exporter service."""
    entries = []
    for daemon in self.mgr.cache.get_daemons_by_service('node-exporter'):
        assert daemon.hostname is not None
        # fall back to the inventory address when the daemon has no ip
        addr = daemon.ip or self.mgr.inventory.get_addr(daemon.hostname)
        # 9100 is used when the daemon reports no ports
        port = daemon.ports[0] if daemon.ports else 9100
        target = build_url(host=addr, port=port).lstrip('/')
        # one entry per daemon, labelled with the host it runs on
        entries.append({
            'targets': [target],
            'labels': {'instance': daemon.hostname}
        })
    return entries
def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
    """Return <http_sd_config> compatible prometheus config for haproxy service."""
    entries = []
    for daemon in self.mgr.cache.get_daemons_by_type('ingress'):
        # skip daemons whose service has no stored spec
        if daemon.service_name() not in self.mgr.spec_store:
            continue
        spec = cast(IngressSpec, self.mgr.spec_store[daemon.service_name()].spec)
        assert daemon.hostname is not None
        if daemon.daemon_type != 'haproxy':
            continue
        addr = self.mgr.inventory.get_addr(daemon.hostname)
        entries.append({
            'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"],
            'labels': {'instance': daemon.service_name()}
        })
    return entries
@cherrypy.expose(alias='prometheus/rules')
def get_prometheus_rules(self) -> str:
    """Return currently configured prometheus rules as Yaml."""
    cherrypy.response.headers['Content-Type'] = 'text/plain'
    rules_path = self.mgr.prometheus_alerts_path
    with open(rules_path, 'r', encoding='utf-8') as rules_file:
        rules = rules_file.read()
    return rules
251 def __init__(self
, mgr
: "CephadmOrchestrator"):
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
def POST(self) -> Dict[str, Any]:
    """Handle metadata posted by an agent and return a ``{'result': ...}`` dict.

    The JSON request body is validated with ``check_request_fields``; a
    validation failure is reported in the result and logged as a warning.
    """
    data: Dict[str, Any] = cherrypy.request.json
    results: Dict[str, Any] = {}
    try:
        self.check_request_fields(data)
    except Exception as e:
        results['result'] = f'Bad metadata: {e}'
        self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        return results

    # if we got here, we've already verified the keyring of the agent. If
    # host agent is reporting on is marked offline, it shouldn't be any more
    self.mgr.offline_hosts_remove(data['host'])
    results['result'] = self.handle_metadata(data)
    return results
def check_request_fields(self, data: Dict[str, Any]) -> None:
    """Validate the fields of a metadata request sent by an agent.

    :param data: decoded JSON body of the request.
    :raises Exception: when ``host``, ``keyring``, ``port`` or ``ack`` is
        missing, the host is unknown, no keyring is stored for the host,
        the keyring does not match, or ``ack`` is not convertible to int.

    Missing metadata sections (``ls``, ``networks``, ``facts``,
    ``volume``) only produce a warning.
    """
    import hmac

    fields = '{' + ', '.join(data) + '}'
    if 'host' not in data:
        raise Exception(
            f'No host in metadata from agent ("host" field). Only received fields {fields}')
    host = data['host']
    if host not in self.mgr.cache.get_hosts():
        raise Exception(f'Received metadata from agent on unknown hostname {host}')
    if 'keyring' not in data:
        raise Exception(
            f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
    if host not in self.mgr.agent_cache.agent_keys:
        raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')

    # The keyring comes from an unauthenticated request, so compare it in
    # constant time to avoid leaking how much of the secret matched.
    expected = self.mgr.agent_cache.agent_keys[host]
    received = data['keyring']
    if isinstance(received, str) and isinstance(expected, str):
        keyring_ok = hmac.compare_digest(received.encode('utf-8'), expected.encode('utf-8'))
    else:
        # non-string values keep the plain equality semantics
        keyring_ok = received == expected
    if not keyring_ok:
        raise Exception(f'Got wrong keyring from agent on host {host}.')

    if 'port' not in data:
        raise Exception(
            f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
    if 'ack' not in data:
        raise Exception(
            f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
    try:
        int(data['ack'])
    except Exception as e:
        raise Exception(
            f'Counter value from agent on host {host} could not be converted to an integer: {e}')

    metadata_types = ['ls', 'networks', 'facts', 'volume']
    metadata_types_str = '{' + ', '.join(metadata_types) + '}'
    if not all(item in data for item in metadata_types):
        self.mgr.log.warning(
            f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')
def handle_metadata(self, data: Dict[str, Any]) -> str:
    """Apply metadata reported by an agent to the mgr caches.

    :param data: decoded request body; ``host``, ``port`` and ``ack`` are
        read unconditionally, ``ls``/``networks``/``facts``/``volume``
        only when present and non-empty.
    :return: a human-readable status string. Failures are logged as a
        warning and returned as the string rather than raised.
    """
    # Bound before the try block so the except handler can always format
    # its message, even when reading data['host'] is what failed.
    host = data.get('host', '<unknown>')
    try:
        host = data['host']
        self.mgr.agent_cache.agent_ports[host] = int(data['port'])
        if host not in self.mgr.agent_cache.agent_counter:
            self.mgr.agent_cache.agent_counter[host] = 1
            self.mgr.agent_helpers._request_agent_acks({host})
            res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
            self.mgr.log.debug(res)
            return res

        # update timestamp of most recent agent update
        self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

        # snapshot the daemon state so a change can be detected afterwards
        error_daemons_old = {dd.name() for dd in self.mgr.cache.get_error_daemons()}
        daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

        up_to_date = False

        int_ack = int(data['ack'])
        if int_ack == self.mgr.agent_cache.agent_counter[host]:
            up_to_date = True
        else:
            # we got old counter value with message, inform agent of new timestamp
            if not self.mgr.agent_cache.messaging_agent(host):
                self.mgr.agent_helpers._request_agent_acks({host})
            self.mgr.log.debug(
                f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

        if 'ls' in data and data['ls']:
            self.mgr._process_ls_output(host, data['ls'])
            self.mgr.update_failed_daemon_health_check()
        if 'networks' in data and data['networks']:
            self.mgr.cache.update_host_networks(host, data['networks'])
        if 'facts' in data and data['facts']:
            self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
        if 'volume' in data and data['volume']:
            ret = Devices.from_json(json.loads(data['volume']))
            self.mgr.cache.update_host_devices(host, ret.devices)

        if (
            error_daemons_old != {dd.name() for dd in self.mgr.cache.get_error_daemons()}
            or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
        ):
            self.mgr.log.debug(
                f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
            self.mgr._kick_serve_loop()

        if up_to_date and ('ls' in data and data['ls']):
            was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
            self.mgr.cache.metadata_up_to_date[host] = True
            if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                self.mgr.log.debug(
                    'New metadata from agent has made all hosts up to date. Kicking serve loop')
                self.mgr._kick_serve_loop()
            self.mgr.log.debug(
                f'Received up-to-date metadata from agent on host {host}.')

        self.mgr.agent_cache.save_agent(host)
        return 'Successfully processed metadata.'

    except Exception as e:
        err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
        self.mgr.log.warning(err_str)
        return err_str
class AgentMessageThread(threading.Thread):
    """Thread that delivers one JSON message to the agent on a host over TLS.

    The wire format is a 10-character, zero-padded decimal byte length
    followed by the JSON payload.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """
        :param host: host whose agent receives the message.
        :param port: port the agent listens on.
        :param data: payload; serialised with ``json.dumps`` immediately.
        :param mgr: the orchestrator module.
        :param daemon_spec: when given, successful delivery is recorded via
            ``agent_cache.agent_config_successfully_delivered``.
        """
        self.mgr = mgr
        self.host = host
        # use the inventory address when the host is known, else the name itself
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super(AgentMessageThread, self).__init__(target=self.run)

    def run(self) -> None:
        """Send the message, keeping ``sending_agent_message[host]`` set while busy."""
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            self._send_message()
        finally:
            # single place to clear the flag, whatever path ended the send
            self.mgr.agent_cache.sending_agent_message[self.host] = False

    def _build_ssl_context(self) -> Optional[ssl.SSLContext]:
        """Create the TLS client context; log and return None on failure."""
        try:
            assert self.mgr.cherrypy_thread
            root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert()

            # The files are only needed while the context loads them, so
            # close (and thereby delete) them as soon as that is done.
            with tempfile.NamedTemporaryFile() as root_cert_tmp, \
                    tempfile.NamedTemporaryFile() as cert_tmp, \
                    tempfile.NamedTemporaryFile() as key_tmp:
                root_cert_tmp.write(root_cert.encode('utf-8'))
                root_cert_tmp.flush()
                cert_tmp.write(cert.encode('utf-8'))
                cert_tmp.flush()
                key_tmp.write(key.encode('utf-8'))
                key_tmp.flush()

                ssl_ctx = ssl.create_default_context(
                    ssl.Purpose.SERVER_AUTH, cafile=root_cert_tmp.name)
                ssl_ctx.verify_mode = ssl.CERT_REQUIRED
                ssl_ctx.check_hostname = True
                ssl_ctx.load_cert_chain(cert_tmp.name, key_tmp.name)
            return ssl_ctx
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            return None

    def _length_prefix(self) -> Optional[str]:
        """Return the 10-character length header; log and return None on failure."""
        try:
            bytes_len: str = str(len(self.data.encode('utf-8')))
            if len(bytes_len.encode('utf-8')) > 10:
                raise Exception(
                    f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
            return bytes_len.zfill(10)
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            return None

    def _send_message(self) -> None:
        """Connect to the agent and send the message, retrying on connection errors."""
        ssl_ctx = self._build_ssl_context()
        if ssl_ctx is None:
            return
        bytes_len = self._length_prefix()
        if bytes_len is None:
            return

        for retry_wait in [3, 5]:
            try:
                # context managers close both sockets on every exit path
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as agent_socket:
                    with ssl_ctx.wrap_socket(
                            agent_socket, server_hostname=self.addr) as secure_agent_socket:
                        secure_agent_socket.connect((self.addr, self.port))
                        msg = (bytes_len + self.data)
                        secure_agent_socket.sendall(msg.encode('utf-8'))
                        agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
class CephadmAgentHelpers:
    """Helpers the orchestrator uses to message, monitor and reconfigure agents."""

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Send each host's agent its current counter (and optionally a config).

        :param hosts: hosts whose agents should be messaged.
        :param increment: when True, mark the host metadata as out of date
            and bump an already-known counter by one.
        :param daemon_spec: when given, its ``final_config`` is included in
            the payload under ``'config'``.
        """
        for host in hosts:
            counters = self.mgr.agent_cache.agent_counter
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in counters:
                counters[host] = 1
            elif increment:
                counters[host] = counters[host] + 1
            payload: Dict[str, Any] = {'counter': counters[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            agent_port = self.mgr.agent_cache.agent_ports[host]
            AgentMessageThread(host, agent_port, payload, self.mgr, daemon_spec).start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Message every known-port agent whose host metadata is stale and
        that is not already being messaged."""
        stale_hosts = {
            h for h in self.mgr.cache.get_hosts()
            if (not self.mgr.cache.host_metadata_up_to_date(h)
                and h in self.mgr.agent_cache.agent_ports
                and not self.mgr.agent_cache.messaging_agent(h))
        }
        self.mgr.agent_helpers._request_agent_acks(stale_hosts)

    def _agent_down(self, host: str) -> bool:
        """Return True when the agent on ``host`` has not reported in time."""
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        non_draining = [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
        if host not in non_draining:
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        timestamps = self.mgr.agent_cache.agent_timestamp
        if host not in timestamps:
            if host in self.mgr.offline_hosts:
                return False
            timestamps[host] = datetime_now()
        # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        silent_for = datetime_now() - timestamps[host]
        return silent_for.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate)

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Replace the CEPHADM_AGENT_DOWN health warning for the given hosts.

        The existing warning is always removed; a new one is raised (and
        the matching agent daemons are marked as errored) only when
        ``down_agent_hosts`` is non-empty.
        """
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if not down_agent_hosts:
            return
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        detail: List[str] = [
            (f'Cephadm agent on host {agent} has not reported in '
             f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
             'down and host may be offline.')
            for agent in down_agent_hosts
        ]
        for dd in self.mgr.cache.get_daemons_by_type('agent'):
            if dd.hostname in down_agent_hosts:
                dd.status = DaemonDescriptionStatus.error
        self.mgr.set_health_warning(
            'CEPHADM_AGENT_DOWN',
            summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                len(down_agent_hosts)),
            count=len(down_agent_hosts),
            detail=detail,
        )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save an 'agent' service spec placed on every host (``'*'``)."""
        everywhere = PlacementSpec(host_pattern='*')
        self.mgr.spec_store.save(ServiceSpec(service_type='agent', placement=everywhere))

    def _handle_use_agent_setting(self) -> bool:
        """Reconcile the 'agent' spec with ``mgr.use_agent``.

        :return: True when the agent spec was added or removed.
        """
        need_apply = False
        if not self.mgr.use_agent:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            # agents are disabled: forget everything tracked about them
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
            return need_apply

        # on the off chance there are still agents hanging around from
        # when we turned the config option off, we need to redeploy them
        # we can tell they're in that state if we don't have a keyring for
        # them in the host cache
        for agent in self.mgr.cache.get_daemons_by_service('agent'):
            if agent.hostname not in self.mgr.agent_cache.agent_keys:
                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
        if 'agent' not in self.mgr.spec_store:
            self.mgr.agent_helpers._apply_agent()
            need_apply = True
        return need_apply

    def _check_agent(self, host: str) -> bool:
        """Check the agent on ``host`` and reconfigure it if its deps changed.

        :return: True when ``_agent_down(host)`` considered the agent down,
            False otherwise (including when the check is postponed because
            the endpoint has no root cert yet).
        """
        down = False
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                can_use_http = (host in self.mgr.agent_cache.agent_ports
                                and root_cert_match and not down)
                if can_use_http:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        # run any daemon action that was scheduled for this agent
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down
614 def __init__(self
, mgr
: "CephadmOrchestrator") -> None:
def generate_root_cert(self) -> Tuple[str, str]:
    """Generate a new self-signed root CA and store it on the instance.

    Sets ``self.root_key`` and ``self.root_cert``. The certificate carries
    the mgr IP (IPv4 or IPv6) as its subject alternative name.

    :return: ``(cert_pem, key_pem)`` as UTF-8 strings.
    """
    from datetime import timezone

    self.root_key = rsa.generate_private_key(
        public_exponent=65537, key_size=4096, backend=default_backend())
    root_public_key = self.root_key.public_key()

    # self-signed: subject and issuer are the same name
    root_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
    ])

    # Certificate validity is expressed in UTC; a naive local timestamp
    # would shift the validity window by the local UTC offset.
    now = datetime.now(timezone.utc)

    root_builder = x509.CertificateBuilder()
    root_builder = root_builder.subject_name(root_name)
    root_builder = root_builder.issuer_name(root_name)
    root_builder = root_builder.not_valid_before(now)
    root_builder = root_builder.not_valid_after(now + timedelta(days=(365 * 10 + 3)))
    root_builder = root_builder.serial_number(x509.random_serial_number())
    root_builder = root_builder.public_key(root_public_key)
    root_builder = root_builder.add_extension(
        x509.SubjectAlternativeName(
            # ip_address() accepts both IPv4 and IPv6 mgr addresses
            [x509.IPAddress(ipaddress.ip_address(str(self.mgr.get_mgr_ip())))]
        ),
        critical=False
    )
    root_builder = root_builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True,
    )

    self.root_cert = root_builder.sign(
        private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
    )

    cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
    key_str = self.root_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    ).decode('utf-8')

    return (cert_str, key_str)
def generate_cert(self, addr: str = '') -> Tuple[str, str]:
    """Generate a certificate and key signed by the stored root key.

    :param addr: address the certificate is issued for. When it parses as
        an IPv4/IPv6 address it is also added as a subject alternative
        name; any other value is used as the common name only. When empty,
        the mgr IP is used for both.
    :return: ``(cert_pem, key_pem)`` as UTF-8 strings.
    """
    from datetime import timezone

    san_ip = None
    if addr:
        try:
            san_ip = x509.IPAddress(ipaddress.ip_address(addr))
        except ValueError:
            # not an IP literal: no SAN entry is added
            san_ip = None
    else:
        # ip_address() accepts both IPv4 and IPv6 mgr addresses
        san_ip = x509.IPAddress(ipaddress.ip_address(str(self.mgr.get_mgr_ip())))

    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=4096, backend=default_backend())
    public_key = private_key.public_key()

    # Certificate validity is expressed in UTC; a naive local timestamp
    # would shift the validity window by the local UTC offset.
    now = datetime.now(timezone.utc)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
    ]))
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
    ]))
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(now + timedelta(days=(365 * 10 + 3)))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(public_key)
    if san_ip is not None:
        builder = builder.add_extension(
            x509.SubjectAlternativeName(
                [san_ip]
            ),
            critical=False
        )
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )

    cert = builder.sign(
        private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
    )

    cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
    key_str = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    ).decode('utf-8')

    return (cert_str, key_str)
def get_root_cert(self) -> str:
    """Return the root certificate as PEM text.

    An ``AttributeError`` (e.g. no root cert set yet) yields ``''``.
    """
    try:
        pem_text = self.root_cert.public_bytes(
            encoding=serialization.Encoding.PEM).decode('utf-8')
    except AttributeError:
        pem_text = ''
    return pem_text
def get_root_key(self) -> str:
    """Return the unencrypted root private key as PEM text.

    An ``AttributeError`` (e.g. no root key set yet) yields ``''``.
    """
    try:
        pem_text = self.root_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        ).decode('utf-8')
    except AttributeError:
        pem_text = ''
    return pem_text
def load_root_credentials(self, cert: str, priv_key: str) -> None:
    """Load a previously stored root certificate and private key.

    :param cert: PEM-encoded root certificate.
    :param priv_key: PEM-encoded, unencrypted root private key.
    :raises OrchestratorError: when the certificate has expired.

    Sets ``self.root_cert`` and ``self.root_key``.
    """
    from datetime import timezone

    given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())

    # Compare expiry in UTC. Newer cryptography exposes a tz-aware value;
    # the older naive attribute is expressed in UTC, so it is tagged as
    # such instead of being compared against naive local time.
    expires = getattr(given_cert, 'not_valid_after_utc', None)
    if expires is None:
        expires = given_cert.not_valid_after.replace(tzinfo=timezone.utc)
    if datetime.now(timezone.utc) >= expires:
        raise OrchestratorError('Given cert is expired')

    self.root_cert = given_cert
    self.root_key = serialization.load_pem_private_key(
        data=priv_key.encode('utf-8'), backend=default_backend(), password=None)