import cherrypy
import ipaddress
import json
import logging
import socket
import ssl
import tempfile
import threading
import time

from mgr_module import ServiceInfoT
from mgr_util import verify_tls_files, build_url
from orchestrator import DaemonDescriptionStatus, OrchestratorError
from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.services.ingress import IngressSpec

from datetime import datetime, timedelta
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend

from typing import Any, Dict, List, Set, Tuple, \
    TYPE_CHECKING, Optional, cast, Collection

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator


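# Logging filter that drops records mentioning TLSV1_ALERT_DECRYPT_ERROR from
# cherrypy's error logger, so failed TLS handshakes against the endpoint do not
# flood the mgr log.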
def cherrypy_filter(record: logging.LogRecord) -> int:
    blocked = [
        'TLSV1_ALERT_DECRYPT_ERROR'
    ]
    msg = record.getMessage()
    return not any(m in msg for m in blocked)


logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
cherrypy.log.access_log.propagate = False


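# Background thread that runs the cephadm HTTPS endpoint: it terminates TLS
# with certificates minted by SSLCerts, and serves the Prometheus service
# discovery and rules routes (Root) plus the agent metadata ingestion route
# (HostData).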
class CherryPyThread(threading.Thread):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.cherrypy_shutdown_event = threading.Event()
        self.ssl_certs = SSLCerts(self.mgr)
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()
        super(CherryPyThread, self).__init__(target=self.run)

    def configure_cherrypy(self) -> None:
        cherrypy.config.update({
            'environment': 'production',
            'server.socket_host': self.server_addr,
            'server.socket_port': self.server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': 'builtin',
            'server.ssl_certificate': self.cert_tmp.name,
            'server.ssl_private_key': self.key_tmp.name,
        })

        # configure routes
        root = Root(self.mgr)
        host_data = HostData(self.mgr)
        d = cherrypy.dispatch.RoutesDispatcher()
        d.connect(name='index', route='/', controller=root.index)
        d.connect(name='sd-config', route='/prometheus/sd-config', controller=root.get_sd_config)
        d.connect(name='rules', route='/prometheus/rules', controller=root.get_prometheus_rules)
        d.connect(name='host-data', route='/data', controller=host_data.POST,
                  conditions=dict(method=['POST']))

        conf = {'/': {'request.dispatch': d}}
        cherrypy.tree.mount(None, "/", config=conf)

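    # On startup: reuse the root cert/key persisted in the mgr store if they are
    # still valid, otherwise generate a new root; write the server cert/key to
    # temp files for cherrypy, start the engine, persist the (possibly new) root
    # credentials, then block until shutdown() sets the shutdown event.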
    def run(self) -> None:
        try:
            try:
                old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
                if not old_creds:
                    raise OrchestratorError('No old credentials for cephadm endpoint found')
                old_creds_dict = json.loads(old_creds)
                old_key = old_creds_dict['key']
                old_cert = old_creds_dict['cert']
                self.ssl_certs.load_root_credentials(old_cert, old_key)
            except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
                self.ssl_certs.generate_root_cert()

            cert, key = self.ssl_certs.generate_cert()

            self.key_tmp = tempfile.NamedTemporaryFile()
            self.key_tmp.write(key.encode('utf-8'))
            self.key_tmp.flush()  # key_tmp must not be gc'ed
            key_fname = self.key_tmp.name

            self.cert_tmp = tempfile.NamedTemporaryFile()
            self.cert_tmp.write(cert.encode('utf-8'))
            self.cert_tmp.flush()  # cert_tmp must not be gc'ed
            cert_fname = self.cert_tmp.name

            verify_tls_files(cert_fname, key_fname)
            self.configure_cherrypy()

            self.mgr.log.debug('Starting cherrypy engine...')
            self.start_engine()
            self.mgr.log.debug('Cherrypy engine started.')
            cephadm_endpoint_creds = {
                'cert': self.ssl_certs.get_root_cert(),
                'key': self.ssl_certs.get_root_key()
            }
            self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
            self.mgr._kick_serve_loop()
            # wait for the shutdown event
            self.cherrypy_shutdown_event.wait()
            self.cherrypy_shutdown_event.clear()
            cherrypy.engine.stop()
            self.mgr.log.debug('Cherrypy engine stopped.')
        except Exception as e:
            self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')

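    # Try to bind cherrypy to self.server_port; if the port is taken, walk up one
    # port at a time (up to 150 attempts, i.e. the 7150-7300 range) before giving
    # up and logging an error.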
    def start_engine(self) -> None:
        port_connect_attempts = 0
        while port_connect_attempts < 150:
            try:
                cherrypy.engine.start()
                self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}')
                return
            except cherrypy.process.wspbus.ChannelFailures as e:
                self.mgr.log.debug(
                    f'{e}. Trying next port.')
                self.server_port += 1
                cherrypy.server.httpserver = None
                cherrypy.config.update({
                    'server.socket_port': self.server_port
                })
                port_connect_attempts += 1
        self.mgr.log.error(
            'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start')

    def shutdown(self) -> None:
        self.mgr.log.debug('Stopping cherrypy engine...')
        self.cherrypy_shutdown_event.set()


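# Controller for the human-facing index page and the Prometheus integration
# endpoints: /prometheus/sd-config returns <http_sd_config> compatible target
# lists for the requested service, and /prometheus/rules serves the currently
# configured alert rules.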
class Root(object):

    # collapse everything to '/'
    def _cp_dispatch(self, vpath: str) -> 'Root':
        cherrypy.request.path = ''
        return self

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.expose
    def index(self) -> str:
        return '''<!DOCTYPE html>
<html>
<head><title>Cephadm HTTP Endpoint</title></head>
<body>
<h2>Cephadm Service Discovery Endpoints</h2>
<p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
<p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
<p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
<p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
<p><a href='prometheus/rules'>Prometheus rules</a></p>
</body>
</html>'''

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for the specified service."""
        if service == 'mgr-prometheus':
            return self.prometheus_sd_config()
        elif service == 'alertmanager':
            return self.alertmgr_sd_config()
        elif service == 'node-exporter':
            return self.node_exporter_sd_config()
        elif service == 'haproxy':
            return self.haproxy_sd_config()
        else:
            return []

    def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for prometheus service."""
        servers = self.mgr.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
                targets.append(f'{hostname}:{port}')
        return [{"targets": targets, "labels": {}}]

    def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9093
            srv_entries.append(build_url(host=addr, port=port).lstrip('/'))
        return [{"targets": srv_entries, "labels": {}}]

    def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for node-exporter service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9100
            srv_entries.append({
                'targets': [build_url(host=addr, port=port).lstrip('/')],
                'labels': {'instance': dd.hostname}
            })
        return srv_entries

    def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for haproxy service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_type('ingress'):
            if dd.service_name() in self.mgr.spec_store:
                spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
                assert dd.hostname is not None
                if dd.daemon_type == 'haproxy':
                    addr = self.mgr.inventory.get_addr(dd.hostname)
                    srv_entries.append({
                        'targets': [build_url(host=addr, port=spec.monitor_port).lstrip('/')],
                        'labels': {'instance': dd.service_name()}
                    })
        return srv_entries

    @cherrypy.expose(alias='prometheus/rules')
    def get_prometheus_rules(self) -> str:
        """Return currently configured prometheus rules as Yaml."""
        cherrypy.response.headers['Content-Type'] = 'text/plain'
        with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
            return f.read()


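# Handler for POSTs from cephadm agents to /data. Each payload is validated and
# authenticated against the keyring cephadm stored for that host, then the
# reported 'ls', 'networks', 'facts' and 'volume' metadata is folded into the
# mgr's caches.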
class HostData:
    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # if we got here, we've already verified the keyring of the agent. If the
            # host the agent is reporting on was marked offline, it shouldn't be any more
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        fields = '{' + ', '.join(data.keys()) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" field). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}')
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data.keys() for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

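    # Process a validated agent payload. The 'ack' counter the agent echoes back
    # is compared with the counter cephadm last sent it: a match means the
    # metadata is current, a mismatch triggers another ack request so the agent
    # re-reports. Cache updates that change daemon state kick the serve loop.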
    def handle_metadata(self, data: Dict[str, Any]) -> str:
        try:
            host = data['host']
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # the agent sent an old counter value; ask it for up-to-date metadata
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if 'ls' in data and data['ls']:
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if 'networks' in data and data['networks']:
                self.mgr.cache.update_host_networks(host, data['networks'])
            if 'facts' in data and data['facts']:
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if 'volume' in data and data['volume']:
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and ('ls' in data and data['ls']):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to process metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str


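# One-shot worker thread that delivers a single JSON message (the current ack
# counter, optionally bundled with a fresh daemon config) to one agent's
# listener port over TLS.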
class AgentMessageThread(threading.Thread):
    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.host = host
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super(AgentMessageThread, self).__init__(target=self.run)

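    # Build an SSL context that trusts the cephadm root cert and presents a
    # client cert signed by it, then frame and send the JSON payload to the
    # agent's listener, retrying on connection errors since the agent may have
    # only just been deployed.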
    def run(self) -> None:
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            assert self.mgr.cherrypy_thread
            root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            root_cert_tmp = tempfile.NamedTemporaryFile()
            root_cert_tmp.write(root_cert.encode('utf-8'))
            root_cert_tmp.flush()
            root_cert_fname = root_cert_tmp.name

            cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert()

            cert_tmp = tempfile.NamedTemporaryFile()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            cert_fname = cert_tmp.name

            key_tmp = tempfile.NamedTemporaryFile()
            key_tmp.write(key.encode('utf-8'))
            key_tmp.flush()
            key_fname = key_tmp.name

            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_fname)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_fname, key_fname)
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        try:
            # frame the payload: prepend a zero-padded, fixed 10-character length
            # header so the receiver knows how many bytes to read
            bytes_len: str = str(len(self.data.encode('utf-8')))
            if len(bytes_len.encode('utf-8')) > 10:
                raise Exception(
                    f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
            while len(bytes_len.encode('utf-8')) < 10:
                bytes_len = '0' + bytes_len
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        for retry_wait in [3, 5]:
            try:
                agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                secure_agent_socket = ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr)
                secure_agent_socket.connect((self.addr, self.port))
                msg = (bytes_len + self.data)
                secure_agent_socket.sendall(msg.encode('utf-8'))
                agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed the agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = False
        return


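# Agent-related helpers the orchestrator's serve loop calls into: requesting
# acks/metadata from agents, detecting down agents and raising the
# CEPHADM_AGENT_DOWN health warning, and applying/removing the agent spec when
# the use_agent setting is toggled.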
class CephadmAgentHelpers:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

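    # Push the current (optionally incremented) counter to each host's agent via
    # an AgentMessageThread; if a daemon_spec is given, its final config rides
    # along so the agent can be reconfigured without an ssh connection.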
    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
            elif increment:
                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            message_thread = AgentMessageThread(
                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        self.mgr.agent_helpers._request_agent_acks(
            set([h for h in self.mgr.cache.get_hosts() if
                 (not self.mgr.cache.host_metadata_up_to_date(h)
                  and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))

    def _agent_down(self, host: str) -> bool:
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]:
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr failover.
        # just set the timestamp to now. However, if the host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
            return True
        return False

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if down_agent_hosts:
            detail: List[str] = []
            down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
            for agent in down_agent_hosts:
                detail.append((f'Cephadm agent on host {agent} has not reported in '
                               f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
                               'down and host may be offline.'))
            for dd in [d for d in self.mgr.cache.get_daemons_by_type('agent') if d.hostname in down_agent_hosts]:
                dd.status = DaemonDescriptionStatus.error
            self.mgr.set_health_warning(
                'CEPHADM_AGENT_DOWN',
                summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                    len(down_agent_hosts)),
                count=len(down_agent_hosts),
                detail=detail,
            )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or which services were set up would have to be individually changed to expect an agent
    # service or daemons, OR we can put this in its own function and then mock the function
    def _apply_agent(self) -> None:
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

    def _handle_use_agent_setting(self) -> bool:
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them.
            # we can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
        return need_apply

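    # Per-host agent check run from the serve loop; returns whether the agent is
    # considered down. If the agent's config or deps changed, prefer pushing the
    # new config over the existing HTTP channel (possible only when we know the
    # agent's port, the root cert dep is unchanged and the agent is up);
    # otherwise fall back to a full reconfig over ssh. Any scheduled daemon
    # action for the agent is applied here as well.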
    def _check_agent(self, host: str) -> bool:
        down = False
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down


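# Minimal in-memory certificate authority for the cephadm endpoint and agent
# messaging: it holds a self-signed 'cephadm-root' cert/key and uses it to sign
# the leaf certificates handed to the HTTPS server and the agent message
# threads (all valid for roughly ten years).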
class SSLCerts:
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.root_cert: Any
        self.root_key: Any

    def generate_root_cert(self) -> Tuple[str, str]:
        self.root_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        root_public_key = self.root_key.public_key()

        root_builder = x509.CertificateBuilder()

        root_builder = root_builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        root_builder = root_builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        root_builder = root_builder.not_valid_before(datetime.now())
        root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        root_builder = root_builder.serial_number(x509.random_serial_number())
        root_builder = root_builder.public_key(root_public_key)
        root_builder = root_builder.add_extension(
            x509.SubjectAlternativeName(
                [x509.IPAddress(ipaddress.IPv4Address(str(self.mgr.get_mgr_ip())))]
            ),
            critical=False
        )
        root_builder = root_builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )

        self.root_cert = root_builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        key_str = self.root_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ).decode('utf-8')

        return (cert_str, key_str)

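    # Generate a leaf cert/key pair signed by the root key. The subject CN is the
    # given address (or the mgr IP when none is supplied); if the address parses
    # as an IPv4/IPv6 address it is also added as a SubjectAlternativeName.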
    def generate_cert(self, addr: str = '') -> Tuple[str, str]:
        have_ip = True
        if addr:
            try:
                ip = x509.IPAddress(ipaddress.IPv4Address(addr))
            except Exception:
                try:
                    ip = x509.IPAddress(ipaddress.IPv6Address(addr))
                except Exception:
                    have_ip = False
        else:
            ip = x509.IPAddress(ipaddress.IPv4Address(self.mgr.get_mgr_ip()))

        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        public_key = private_key.public_key()

        builder = x509.CertificateBuilder()

        builder = builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
        ]))

        builder = builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        builder = builder.not_valid_before(datetime.now())
        builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.public_key(public_key)
        if have_ip:
            builder = builder.add_extension(
                x509.SubjectAlternativeName(
                    [ip]
                ),
                critical=False
            )
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )

        cert = builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        key_str = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ).decode('utf-8')

        return (cert_str, key_str)

    def get_root_cert(self) -> str:
        try:
            return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        except AttributeError:
            return ''

    def get_root_key(self) -> str:
        try:
            return self.root_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            ).decode('utf-8')
        except AttributeError:
            return ''

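    # Restore a previously generated root cert/key (as persisted in the mgr
    # key/value store) instead of generating a new one, refusing to load a
    # certificate that has already expired.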
    def load_root_credentials(self, cert: str, priv_key: str) -> None:
        given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
        tz = given_cert.not_valid_after.tzinfo
        if datetime.now(tz) >= given_cert.not_valid_after:
            raise OrchestratorError('Given cert is expired')
        self.root_cert = given_cert
        self.root_key = serialization.load_pem_private_key(
            data=priv_key.encode('utf-8'), backend=default_backend(), password=None)