import cherrypy
import ipaddress
import json
import logging
import socket
import ssl
import tempfile
import threading
import time

from mgr_module import ServiceInfoT
from mgr_util import verify_tls_files, build_url
from orchestrator import DaemonDescriptionStatus, OrchestratorError
from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephExporterService
from cephadm.services.ingress import IngressSpec

from datetime import datetime, timedelta
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend

from typing import Any, Dict, List, Set, Tuple, \
    TYPE_CHECKING, Optional, cast, Collection

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator


def cherrypy_filter(record: logging.LogRecord) -> int:
    blocked = [
        'TLSV1_ALERT_DECRYPT_ERROR'
    ]
    msg = record.getMessage()
    return not any([m for m in blocked if m in msg])


logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
cherrypy.log.access_log.propagate = False


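# The cephadm mgr module runs this small cherrypy HTTPS endpoint for two purposes:
# serving Prometheus service discovery data and alert rules (Root) and receiving
# metadata POSTed by the per-host cephadm agents (HostData). TLS material is minted
# by SSLCerts, and AgentMessageThread is used to push messages back to the agents.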
class CherryPyThread(threading.Thread):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.cherrypy_shutdown_event = threading.Event()
        self.ssl_certs = SSLCerts(self.mgr)
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()
        super(CherryPyThread, self).__init__(target=self.run)

    def configure_cherrypy(self) -> None:
        cherrypy.config.update({
            'environment': 'production',
            'server.socket_host': self.server_addr,
            'server.socket_port': self.server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': 'builtin',
            'server.ssl_certificate': self.cert_tmp.name,
            'server.ssl_private_key': self.key_tmp.name,
        })

        # configure routes
        root = Root(self.mgr)
        host_data = HostData(self.mgr)
        d = cherrypy.dispatch.RoutesDispatcher()
        d.connect(name='index', route='/', controller=root.index)
        d.connect(name='sd-config', route='/prometheus/sd-config', controller=root.get_sd_config)
        d.connect(name='rules', route='/prometheus/rules', controller=root.get_prometheus_rules)
        d.connect(name='host-data', route='/data', controller=host_data.POST,
                  conditions=dict(method=['POST']))

        conf = {'/': {'request.dispatch': d}}
        cherrypy.tree.mount(None, "/", config=conf)

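    # run() reuses the root cert/key persisted in the mgr store under
    # 'cephadm_endpoint_credentials' when possible, so agents keep trusting the
    # endpoint across mgr failovers; a fresh root certificate is generated only
    # if nothing usable is stored.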
    def run(self) -> None:
        try:
            try:
                old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
                if not old_creds:
                    raise OrchestratorError('No old credentials for cephadm endpoint found')
                old_creds_dict = json.loads(old_creds)
                old_key = old_creds_dict['key']
                old_cert = old_creds_dict['cert']
                self.ssl_certs.load_root_credentials(old_cert, old_key)
            except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
                self.ssl_certs.generate_root_cert()

            cert, key = self.ssl_certs.generate_cert()

            self.key_tmp = tempfile.NamedTemporaryFile()
            self.key_tmp.write(key.encode('utf-8'))
            self.key_tmp.flush()  # key_tmp must not be gc'ed
            key_fname = self.key_tmp.name

            self.cert_tmp = tempfile.NamedTemporaryFile()
            self.cert_tmp.write(cert.encode('utf-8'))
            self.cert_tmp.flush()  # cert_tmp must not be gc'ed
            cert_fname = self.cert_tmp.name

            verify_tls_files(cert_fname, key_fname)
            self.configure_cherrypy()

            self.mgr.log.debug('Starting cherrypy engine...')
            self.start_engine()
            self.mgr.log.debug('Cherrypy engine started.')
            cephadm_endpoint_creds = {
                'cert': self.ssl_certs.get_root_cert(),
                'key': self.ssl_certs.get_root_key()
            }
            self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
            self.mgr._kick_serve_loop()
            # wait for the shutdown event
            self.cherrypy_shutdown_event.wait()
            self.cherrypy_shutdown_event.clear()
            cherrypy.engine.stop()
            self.mgr.log.debug('Cherrypy engine stopped.')
        except Exception as e:
            self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')

    def start_engine(self) -> None:
        port_connect_attempts = 0
        while port_connect_attempts < 150:
            try:
                cherrypy.engine.start()
                self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}')
                return
            except cherrypy.process.wspbus.ChannelFailures as e:
                self.mgr.log.debug(
                    f'{e}. Trying next port.')
                self.server_port += 1
                cherrypy.server.httpserver = None
                cherrypy.config.update({
                    'server.socket_port': self.server_port
                })
                port_connect_attempts += 1
        self.mgr.log.error(
            'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start')

    def shutdown(self) -> None:
        self.mgr.log.debug('Stopping cherrypy engine...')
        self.cherrypy_shutdown_event.set()


class Root(object):

    # collapse everything to '/'
    def _cp_dispatch(self, vpath: str) -> 'Root':
        cherrypy.request.path = ''
        return self

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.expose
    def index(self) -> str:
        return '''<!DOCTYPE html>
<html>
<head><title>Cephadm HTTP Endpoint</title></head>
<body>
<h2>Cephadm Service Discovery Endpoints</h2>
<p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
<p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
<p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
<p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
<p><a href='prometheus/rules'>Prometheus rules</a></p>
</body>
</html>'''

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for the specified service."""
        if service == 'mgr-prometheus':
            return self.prometheus_sd_config()
        elif service == 'alertmanager':
            return self.alertmgr_sd_config()
        elif service == 'node-exporter':
            return self.node_exporter_sd_config()
        elif service == 'haproxy':
            return self.haproxy_sd_config()
        elif service == 'ceph-exporter':
            return self.ceph_exporter_sd_config()
        else:
            return []

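    # Each *_sd_config helper below returns a list shaped like Prometheus'
    # http_sd_config target groups, e.g.
    # [{'targets': ['10.0.0.1:9100'], 'labels': {'instance': 'host1'}}]
    # (the address shown here is only an illustration).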
    def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for prometheus service."""
        servers = self.mgr.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
                targets.append(f'{hostname}:{port}')
        return [{"targets": targets, "labels": {}}]

    def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9093
            srv_entries.append('{}'.format(build_url(host=addr, port=port).lstrip('/')))
        return [{"targets": srv_entries, "labels": {}}]

    def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for node-exporter service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9100
            srv_entries.append({
                'targets': [build_url(host=addr, port=port).lstrip('/')],
                'labels': {'instance': dd.hostname}
            })
        return srv_entries

    def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for haproxy service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_type('ingress'):
            if dd.service_name() in self.mgr.spec_store:
                spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
                assert dd.hostname is not None
                if dd.daemon_type == 'haproxy':
                    addr = self.mgr.inventory.get_addr(dd.hostname)
                    srv_entries.append({
                        'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"],
                        'labels': {'instance': dd.service_name()}
                    })
        return srv_entries

    def ceph_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for ceph-exporter service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('ceph-exporter'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else CephExporterService.DEFAULT_SERVICE_PORT
            srv_entries.append({
                'targets': [build_url(host=addr, port=port).lstrip('/')],
                'labels': {'instance': dd.hostname}
            })
        return srv_entries

    @cherrypy.expose(alias='prometheus/rules')
    def get_prometheus_rules(self) -> str:
        """Return currently configured prometheus rules as Yaml."""
        cherrypy.response.headers['Content-Type'] = 'text/plain'
        with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
            return f.read()


class HostData:
    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # if we got here, we've already verified the agent's keyring. If the host
            # the agent is reporting on was marked offline, it shouldn't be any more.
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        fields = '{' + ', '.join([key for key in data.keys()]) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" field). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}')
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data.keys() for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

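    # For reference, a well-formed agent payload looks roughly like (values illustrative):
    # {'host': 'host1', 'keyring': '<agent keyring>', 'port': '7151', 'ack': '3',
    #  'ls': [...], 'networks': {...}, 'facts': '{...}', 'volume': '[...]'}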
    def handle_metadata(self, data: Dict[str, Any]) -> str:
        try:
            host = data['host']
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # we got old counter value with message, inform agent of new timestamp
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if 'ls' in data and data['ls']:
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if 'networks' in data and data['networks']:
                self.mgr.cache.update_host_networks(host, data['networks'])
            if 'facts' in data and data['facts']:
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if 'volume' in data and data['volume']:
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and ('ls' in data and data['ls']):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str


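# AgentMessageThread pushes a single message to one agent's listener port over
# mutually authenticated TLS: it presents a client cert/key signed by the cephadm
# root cert and verifies the agent against that same root (CERT_REQUIRED plus
# hostname checking).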
class AgentMessageThread(threading.Thread):
    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.host = host
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super(AgentMessageThread, self).__init__(target=self.run)

    def run(self) -> None:
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            assert self.mgr.cherrypy_thread
            root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            root_cert_tmp = tempfile.NamedTemporaryFile()
            root_cert_tmp.write(root_cert.encode('utf-8'))
            root_cert_tmp.flush()
            root_cert_fname = root_cert_tmp.name

            cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert()

            cert_tmp = tempfile.NamedTemporaryFile()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            cert_fname = cert_tmp.name

            key_tmp = tempfile.NamedTemporaryFile()
            key_tmp.write(key.encode('utf-8'))
            key_tmp.flush()
            key_fname = key_tmp.name

            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_fname)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_fname, key_fname)
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        try:
            bytes_len: str = str(len(self.data.encode('utf-8')))
            if len(bytes_len.encode('utf-8')) > 10:
                raise Exception(
                    f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
            while len(bytes_len.encode('utf-8')) < 10:
                bytes_len = '0' + bytes_len
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
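        # The payload is framed as a 10-character, zero-padded ASCII byte length
        # followed by the JSON itself, e.g. a 123-byte payload goes out as
        # '0000000123{...}'.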
        for retry_wait in [3, 5]:
            try:
                agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                secure_agent_socket = ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr)
                secure_agent_socket.connect((self.addr, self.port))
                msg = (bytes_len + self.data)
                secure_agent_socket.sendall(msg.encode('utf-8'))
                agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed the agent and it might not be ready yet.
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = False
        return


class CephadmAgentHelpers:
    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

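    # The mgr keeps a per-host counter; sending it to an agent asks the agent to
    # report back with that counter as its 'ack'. handle_metadata() treats metadata
    # whose ack matches the current counter as up to date, so incrementing the
    # counter is how the mgr requests fresh metadata.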
    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
            elif increment:
                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            message_thread = AgentMessageThread(
                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        self.mgr.agent_helpers._request_agent_acks(
            set([h for h in self.mgr.cache.get_hosts() if
                 (not self.mgr.cache.host_metadata_up_to_date(h)
                  and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))

    def _agent_down(self, host: str) -> bool:
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]:
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr failover.
        # just set the timestamp to now. However, if the host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
            return True
        return False

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if down_agent_hosts:
            detail: List[str] = []
            down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
            for agent in down_agent_hosts:
                detail.append((f'Cephadm agent on host {agent} has not reported in '
                               f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
                               'down and host may be offline.'))
            for dd in [d for d in self.mgr.cache.get_daemons_by_type('agent') if d.hostname in down_agent_hosts]:
                dd.status = DaemonDescriptionStatus.error
            self.mgr.set_health_warning(
                'CEPHADM_AGENT_DOWN',
                summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                    len(down_agent_hosts)),
                count=len(down_agent_hosts),
                detail=detail,
            )

    # this function may seem unnecessary, but it makes it considerably easier to get
    # the unit tests working. All unit tests that check which daemons were deployed
    # or which services were set up would otherwise have to be individually changed
    # to expect an agent service or daemons; instead we keep this in its own function
    # and mock it.
    def _apply_agent(self) -> None:
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

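    # Keeps the 'agent' service spec in sync with the mgr/cephadm/use_agent setting:
    # when it is on, make sure the spec exists and redeploy agents that lost their
    # keyring; when it is off, remove the spec and clear all cached agent state.
    # Returns whether specs need to be re-applied.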
    def _handle_use_agent_setting(self) -> bool:
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them.
            # We can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
        return need_apply

    def _check_agent(self, host: str) -> bool:
        down = False
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down


class SSLCerts:
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.root_cert: Any
        self.root_key: Any

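    # Typical lifecycle: generate_root_cert() (or load_root_credentials() with the
    # persisted PEM pair) is called once by the cherrypy thread; generate_cert()
    # then issues per-endpoint certificates signed by that root, valid for roughly
    # ten years, with the mgr IP (or a given address) as the SAN.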
    def generate_root_cert(self) -> Tuple[str, str]:
        self.root_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        root_public_key = self.root_key.public_key()

        root_builder = x509.CertificateBuilder()

        root_builder = root_builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        root_builder = root_builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        root_builder = root_builder.not_valid_before(datetime.now())
        root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        root_builder = root_builder.serial_number(x509.random_serial_number())
        root_builder = root_builder.public_key(root_public_key)
        root_builder = root_builder.add_extension(
            x509.SubjectAlternativeName(
                [x509.IPAddress(ipaddress.IPv4Address(str(self.mgr.get_mgr_ip())))]
            ),
            critical=False
        )
        root_builder = root_builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )

        self.root_cert = root_builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        key_str = self.root_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ).decode('utf-8')

        return (cert_str, key_str)

    def generate_cert(self, addr: str = '') -> Tuple[str, str]:
        have_ip = True
        if addr:
            try:
                ip = x509.IPAddress(ipaddress.IPv4Address(addr))
            except Exception:
                try:
                    ip = x509.IPAddress(ipaddress.IPv6Address(addr))
                except Exception:
                    have_ip = False
        else:
            ip = x509.IPAddress(ipaddress.IPv4Address(self.mgr.get_mgr_ip()))

        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        public_key = private_key.public_key()

        builder = x509.CertificateBuilder()

        builder = builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
        ]))

        builder = builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        builder = builder.not_valid_before(datetime.now())
        builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.public_key(public_key)
        if have_ip:
            builder = builder.add_extension(
                x509.SubjectAlternativeName(
                    [ip]
                ),
                critical=False
            )
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )

        cert = builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        key_str = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ).decode('utf-8')

        return (cert_str, key_str)

    def get_root_cert(self) -> str:
        try:
            return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        except AttributeError:
            return ''

    def get_root_key(self) -> str:
        try:
            return self.root_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption(),
            ).decode('utf-8')
        except AttributeError:
            return ''

    def load_root_credentials(self, cert: str, priv_key: str) -> None:
        given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
        tz = given_cert.not_valid_after.tzinfo
        if datetime.now(tz) >= given_cert.not_valid_after:
            raise OrchestratorError('Given cert is expired')
        self.root_cert = given_cert
        self.root_key = serialization.load_pem_private_key(
            data=priv_key.encode('utf-8'), backend=default_backend(), password=None)