]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | import cherrypy |
2 | import ipaddress | |
3 | import json | |
4 | import logging | |
5 | import socket | |
6 | import ssl | |
7 | import tempfile | |
8 | import threading | |
9 | import time | |
10 | ||
33c7a0ef TL |
11 | from mgr_module import ServiceInfoT |
12 | from mgr_util import verify_tls_files, build_url | |
20effc67 TL |
13 | from orchestrator import DaemonDescriptionStatus, OrchestratorError |
14 | from orchestrator._interface import daemon_type_to_service | |
15 | from ceph.utils import datetime_now | |
16 | from ceph.deployment.inventory import Devices | |
17 | from ceph.deployment.service_spec import ServiceSpec, PlacementSpec | |
39ae355f | 18 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephExporterService |
33c7a0ef | 19 | from cephadm.services.ingress import IngressSpec |
20effc67 TL |
20 | |
21 | from datetime import datetime, timedelta | |
22 | from cryptography import x509 | |
23 | from cryptography.x509.oid import NameOID | |
24 | from cryptography.hazmat.primitives.asymmetric import rsa | |
25 | from cryptography.hazmat.primitives import hashes, serialization | |
26 | from cryptography.hazmat.backends import default_backend | |
27 | ||
28 | from typing import Any, Dict, List, Set, Tuple, \ | |
33c7a0ef | 29 | TYPE_CHECKING, Optional, cast, Collection |
20effc67 TL |
30 | |
31 | if TYPE_CHECKING: | |
32 | from cephadm.module import CephadmOrchestrator | |
33 | ||
34 | ||
def cherrypy_filter(record: logging.LogRecord) -> int:
    """Logging filter that suppresses known-noisy cherrypy error records.

    Returns a falsy value (record dropped) when the formatted message
    contains any blocked substring, truthy (record kept) otherwise.
    """
    blocked = [
        'TLSV1_ALERT_DECRYPT_ERROR'
    ]
    msg = record.getMessage()
    # generator instead of a throwaway list: short-circuits on first match
    return not any(m in msg for m in blocked)
41 | ||
42 | ||
# Drop noisy-but-expected TLS alert errors from cherrypy's error log
logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
# Keep per-request access-log records out of the root logger's handlers
cherrypy.log.access_log.propagate = False
45 | ||
46 | ||
class CherryPyThread(threading.Thread):
    """Thread hosting the cephadm HTTPS endpoint.

    Serves the service-discovery/rules routes (``Root``) and the agent
    metadata ingestion route (``HostData``) over TLS, using certificates
    managed by ``SSLCerts``.
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        # set() by shutdown() to tell run() to stop the cherrypy engine
        self.cherrypy_shutdown_event = threading.Event()
        self.ssl_certs = SSLCerts(self.mgr)
        # starting port; start_engine() probes upward from here if taken
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()
        super(CherryPyThread, self).__init__(target=self.run)

    def configure_cherrypy(self) -> None:
        """Apply global cherrypy config and mount the endpoint's routes."""
        cherrypy.config.update({
            'environment': 'production',
            'server.socket_host': self.server_addr,
            'server.socket_port': self.server_port,
            'engine.autoreload.on': False,
            'server.ssl_module': 'builtin',
            # cert/key temp files are created by run() before this is called
            'server.ssl_certificate': self.cert_tmp.name,
            'server.ssl_private_key': self.key_tmp.name,
        })

        # configure routes
        root = Root(self.mgr)
        host_data = HostData(self.mgr)
        d = cherrypy.dispatch.RoutesDispatcher()
        d.connect(name='index', route='/', controller=root.index)
        d.connect(name='sd-config', route='/prometheus/sd-config', controller=root.get_sd_config)
        d.connect(name='rules', route='/prometheus/rules', controller=root.get_prometheus_rules)
        d.connect(name='host-data', route='/data', controller=host_data.POST,
                  conditions=dict(method=['POST']))

        conf = {'/': {'request.dispatch': d}}
        cherrypy.tree.mount(None, "/", config=conf)

    def run(self) -> None:
        """Set up TLS material, start cherrypy and block until shutdown()."""
        try:
            try:
                # try to reuse root cert/key persisted by a previous mgr
                old_creds = self.mgr.get_store('cephadm_endpoint_credentials')
                if not old_creds:
                    raise OrchestratorError('No old credentials for cephadm endpoint found')
                old_creds_dict = json.loads(old_creds)
                old_key = old_creds_dict['key']
                old_cert = old_creds_dict['cert']
                self.ssl_certs.load_root_credentials(old_cert, old_key)
            except (OrchestratorError, json.decoder.JSONDecodeError, KeyError, ValueError):
                # stored creds missing/corrupt/expired: start a fresh root CA
                self.ssl_certs.generate_root_cert()

            cert, key = self.ssl_certs.generate_cert()

            # temp files are kept as attributes so they outlive this method
            self.key_tmp = tempfile.NamedTemporaryFile()
            self.key_tmp.write(key.encode('utf-8'))
            self.key_tmp.flush()  # pkey_tmp must not be gc'ed
            key_fname = self.key_tmp.name

            self.cert_tmp = tempfile.NamedTemporaryFile()
            self.cert_tmp.write(cert.encode('utf-8'))
            self.cert_tmp.flush()  # cert_tmp must not be gc'ed
            cert_fname = self.cert_tmp.name

            verify_tls_files(cert_fname, key_fname)
            self.configure_cherrypy()

            self.mgr.log.debug('Starting cherrypy engine...')
            self.start_engine()
            self.mgr.log.debug('Cherrypy engine started.')
            # persist root creds so a future mgr instance can reuse them
            cephadm_endpoint_creds = {
                'cert': self.ssl_certs.get_root_cert(),
                'key': self.ssl_certs.get_root_key()
            }
            self.mgr.set_store('cephadm_endpoint_credentials', json.dumps(cephadm_endpoint_creds))
            self.mgr._kick_serve_loop()
            # wait for the shutdown event
            self.cherrypy_shutdown_event.wait()
            self.cherrypy_shutdown_event.clear()
            cherrypy.engine.stop()
            self.mgr.log.debug('Cherrypy engine stopped.')
        except Exception as e:
            self.mgr.log.error(f'Failed to run cephadm cherrypy endpoint: {e}')

    def start_engine(self) -> None:
        """Start cherrypy, probing ports upward from 7150 until one is free."""
        port_connect_attempts = 0
        while port_connect_attempts < 150:
            try:
                cherrypy.engine.start()
                self.mgr.log.debug(f'Cephadm endpoint connected to port {self.server_port}')
                return
            except cherrypy.process.wspbus.ChannelFailures as e:
                # port in use (or bind failure): move to the next port
                self.mgr.log.debug(
                    f'{e}. Trying next port.')
                self.server_port += 1
                # force cherrypy to build a new http server on the new port
                cherrypy.server.httpserver = None
                cherrypy.config.update({
                    'server.socket_port': self.server_port
                })
            port_connect_attempts += 1
        self.mgr.log.error(
            'Cephadm Endpoint could not find free port in range 7150-7300 and failed to start')

    def shutdown(self) -> None:
        """Signal run() to stop the cherrypy engine (asynchronous)."""
        self.mgr.log.debug('Stopping cherrypy engine...')
        self.cherrypy_shutdown_event.set()
147 | ||
148 | ||
33c7a0ef TL |
class Root(object):
    """cherrypy controller for the cephadm endpoint's GET routes.

    Serves a small index page, prometheus ``<http_sd_config>`` compatible
    service-discovery payloads, and the configured prometheus rules.
    """

    # collapse everything to '/'
    def _cp_dispatch(self, vpath: str) -> 'Root':
        cherrypy.request.path = ''
        return self

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.expose
    def index(self) -> str:
        """Static landing page linking to the discovery endpoints."""
        return '''<!DOCTYPE html>
<html>
<head><title>Cephadm HTTP Endpoint</title></head>
<body>
<h2>Cephadm Service Discovery Endpoints</h2>
<p><a href='prometheus/sd-config?service=mgr-prometheus'>mgr/Prometheus http sd-config</a></p>
<p><a href='prometheus/sd-config?service=alertmanager'>Alertmanager http sd-config</a></p>
<p><a href='prometheus/sd-config?service=node-exporter'>Node exporter http sd-config</a></p>
<p><a href='prometheus/sd-config?service=haproxy'>HAProxy http sd-config</a></p>
<p><a href='prometheus/rules'>Prometheus rules</a></p>
</body>
</html>'''

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_sd_config(self, service: str) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for the specified service."""
        if service == 'mgr-prometheus':
            return self.prometheus_sd_config()
        elif service == 'alertmanager':
            return self.alertmgr_sd_config()
        elif service == 'node-exporter':
            return self.node_exporter_sd_config()
        elif service == 'haproxy':
            return self.haproxy_sd_config()
        elif service == 'ceph-exporter':
            return self.ceph_exporter_sd_config()
        else:
            # unknown service names yield an empty target list
            return []

    def prometheus_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for prometheus service."""
        # the prometheus module port is a mgr-wide option; fetch it once
        # instead of once per mgr service in the loop
        port = self.mgr.get_module_option_ex('prometheus', 'server_port', 9283)
        servers = self.mgr.list_servers()
        targets = []
        for server in servers:
            hostname = server.get('hostname', '')
            for service in cast(List[ServiceInfoT], server.get('services', [])):
                if service['type'] != 'mgr':
                    continue
                targets.append(f'{hostname}:{port}')
        return [{"targets": targets, "labels": {}}]

    def alertmgr_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for mgr alertmanager service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('alertmanager'):
            assert dd.hostname is not None
            # prefer the daemon's bound IP, fall back to the inventory address
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9093
            # build_url yields '//host:port'; strip the leading slashes
            # (previously wrapped in a redundant '{}'.format(...))
            srv_entries.append(build_url(host=addr, port=port).lstrip('/'))
        return [{"targets": srv_entries, "labels": {}}]

    def node_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for node-exporter service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('node-exporter'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else 9100
            # one entry per daemon so each target is labeled with its host
            srv_entries.append({
                'targets': [build_url(host=addr, port=port).lstrip('/')],
                'labels': {'instance': dd.hostname}
            })
        return srv_entries

    def haproxy_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for haproxy service."""
        srv_entries = []
        # only the haproxy daemons of ingress services expose a monitor port
        for dd in self.mgr.cache.get_daemons_by_type('ingress'):
            if dd.service_name() in self.mgr.spec_store:
                spec = cast(IngressSpec, self.mgr.spec_store[dd.service_name()].spec)
                assert dd.hostname is not None
                if dd.daemon_type == 'haproxy':
                    addr = self.mgr.inventory.get_addr(dd.hostname)
                    srv_entries.append({
                        'targets': [f"{build_url(host=addr, port=spec.monitor_port).lstrip('/')}"],
                        'labels': {'instance': dd.service_name()}
                    })
        return srv_entries

    def ceph_exporter_sd_config(self) -> List[Dict[str, Collection[str]]]:
        """Return <http_sd_config> compatible prometheus config for ceph-exporter service."""
        srv_entries = []
        for dd in self.mgr.cache.get_daemons_by_service('ceph-exporter'):
            assert dd.hostname is not None
            addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
            port = dd.ports[0] if dd.ports else CephExporterService.DEFAULT_SERVICE_PORT
            srv_entries.append({
                'targets': [build_url(host=addr, port=port).lstrip('/')],
                'labels': {'instance': dd.hostname}
            })
        return srv_entries

    @cherrypy.expose(alias='prometheus/rules')
    def get_prometheus_rules(self) -> str:
        """Return currently configured prometheus rules as Yaml."""
        cherrypy.response.headers['Content-Type'] = 'text/plain'
        with open(self.mgr.prometheus_alerts_path, 'r', encoding='utf-8') as f:
            return f.read()
261 | ||
20effc67 TL |
262 | |
class HostData:
    """cherrypy controller receiving metadata POSTs from cephadm agents."""
    exposed = True  # required by cherrypy so this handler can be served

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        """Validate an agent's report and fold it into the mgr's caches.

        Always returns a JSON object with a human-readable 'result' string;
        validation failures are reported there rather than as HTTP errors.
        """
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # if we got here, we've already verified the keyring of the agent. If
            # host agent is reporting on is marked offline, it shouldn't be any more
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        """Validate an agent report, raising Exception on the first problem.

        Requires: a known 'host', a matching 'keyring', a 'port' and an
        integer-convertible 'ack'. Missing metadata sections ('ls',
        'networks', 'facts', 'volume') only produce a warning.
        """
        fields = '{' + ', '.join([key for key in data.keys()]) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}')
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data.keys() for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

    def handle_metadata(self, data: Dict[str, Any]) -> str:
        """Apply a validated agent report to the mgr caches.

        Returns a human-readable result string (also used on failure; this
        method never raises to the caller).
        """
        try:
            # 'host' is guaranteed present: POST validated it first
            host = data['host']
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                # first report from this agent: seed the counter and ask it
                # to send metadata tagged with that counter
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            # snapshot daemon state so we can detect changes afterwards
            error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # we got old counter value with message, inform agent of new timestamp
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            # fold each reported section into the relevant cache
            if 'ls' in data and data['ls']:
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if 'networks' in data and data['networks']:
                self.mgr.cache.update_host_networks(host, data['networks'])
            if 'facts' in data and data['facts']:
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if 'volume' in data and data['volume']:
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            # kick the serve loop if the report changed daemon state
            if (
                error_daemons_old != set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and ('ls' in data and data['ls']):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str
383 | ||
384 | ||
class AgentMessageThread(threading.Thread):
    """One-shot thread that delivers a JSON payload to a host's agent over TLS.

    Wire format: a 10-character zero-padded decimal byte length followed by
    the JSON body. On success, if ``daemon_spec`` was given, the delivery is
    recorded in the agent cache as that agent's applied config.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.host = host
        # prefer the inventory address; fall back to the raw host string
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super(AgentMessageThread, self).__init__(target=self.run)

    def run(self) -> None:
        """Build the mTLS context, frame the payload and send it (with retries)."""
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            assert self.mgr.cherrypy_thread
            root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            root_cert_tmp = tempfile.NamedTemporaryFile()
            root_cert_tmp.write(root_cert.encode('utf-8'))
            root_cert_tmp.flush()
            root_cert_fname = root_cert_tmp.name

            cert, key = self.mgr.cherrypy_thread.ssl_certs.generate_cert()

            cert_tmp = tempfile.NamedTemporaryFile()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            cert_fname = cert_tmp.name

            key_tmp = tempfile.NamedTemporaryFile()
            key_tmp.write(key.encode('utf-8'))
            key_tmp.flush()
            key_fname = key_tmp.name

            # mutual TLS: verify the agent against our root CA and present a
            # client cert signed by that same root
            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_fname)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_fname, key_fname)
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        try:
            # fixed-width (10 char, zero-padded) length prefix so the agent
            # knows exactly how many bytes of JSON to read
            bytes_len: str = str(len(self.data.encode('utf-8')))
            if len(bytes_len.encode('utf-8')) > 10:
                raise Exception(
                    f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
            while len(bytes_len.encode('utf-8')) < 10:
                bytes_len = '0' + bytes_len
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        for retry_wait in [3, 5]:
            try:
                agent_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    secure_agent_socket = ssl_ctx.wrap_socket(
                        agent_socket, server_hostname=self.addr)
                except Exception:
                    # wrap_socket failed before taking ownership of the fd
                    agent_socket.close()
                    raise
                # close the TLS socket on every exit path (previously the
                # sockets were never closed and leaked on each send/retry)
                with secure_agent_socket:
                    secure_agent_socket.connect((self.addr, self.port))
                    msg = (bytes_len + self.data)
                    secure_agent_socket.sendall(msg.encode('utf-8'))
                    agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = False
        return
464 | ||
465 | ||
class CephadmAgentHelpers:
    """Helper routines the orchestrator uses to drive and monitor agents."""

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Message agents on the given hosts, asking them to report metadata.

        If ``increment`` is set, bump each host's counter so metadata the
        agent already sent is treated as stale. ``daemon_spec``'s final
        config, when given, is piggybacked on the message.
        """
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
            elif increment:
                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            # delivery happens asynchronously on a dedicated thread
            message_thread = AgentMessageThread(
                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Request acks from hosts with stale metadata that are reachable
        (known agent port) and not already being messaged."""
        self.mgr.agent_helpers._request_agent_acks(
            set([h for h in self.mgr.cache.get_hosts() if
                 (not self.mgr.cache.host_metadata_up_to_date(h)
                  and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))

    def _agent_down(self, host: str) -> bool:
        """Return True if the agent on ``host`` has missed its reporting window."""
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]:
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
            return True
        return False

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Replace the CEPHADM_AGENT_DOWN health warning from the given host list."""
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if down_agent_hosts:
            detail: List[str] = []
            down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
            for agent in down_agent_hosts:
                detail.append((f'Cephadm agent on host {agent} has not reported in '
                               f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
                               'down and host may be offline.'))
            # also flag the down agents' daemon descriptions as errored
            for dd in [d for d in self.mgr.cache.get_daemons_by_type('agent') if d.hostname in down_agent_hosts]:
                dd.status = DaemonDescriptionStatus.error
            self.mgr.set_health_warning(
                'CEPHADM_AGENT_DOWN',
                summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                    len(down_agent_hosts)),
                count=len(down_agent_hosts),
                detail=detail,
            )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save a service spec placing an agent on every host."""
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

    def _handle_use_agent_setting(self) -> bool:
        """Reconcile agent deployment with the ``use_agent`` config option.

        Returns True when the spec store changed and a re-apply is needed.
        """
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them
            # we can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            # agents disabled: drop all per-agent state
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
        return need_apply

    def _check_agent(self, host: str) -> bool:
        """Check one host's agent: liveness, config/deps drift, pending actions.

        Returns True if the agent is considered down.
        """
        down = False
        try:
            # nothing to check until the endpoint has a root cert to hand out
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        # finally, run any daemon action scheduled for this agent
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down
626 | ||
627 | ||
class SSLCerts:
    """Self-signed root CA plus leaf cert/key generation for cephadm.

    The root cert/key are either generated (``generate_root_cert``) or
    loaded from previously persisted PEM strings
    (``load_root_credentials``); leaf certificates signed by that root are
    produced by ``generate_cert``.
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        # set by generate_root_cert() or load_root_credentials(); the
        # getters below return '' while these are still unset
        self.root_cert: Any
        self.root_key: Any

    @staticmethod
    def _key_to_pem(key: Any) -> str:
        """Serialize an RSA private key to an unencrypted PEM string."""
        return key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        ).decode('utf-8')

    def generate_root_cert(self) -> Tuple[str, str]:
        """Create a new self-signed root CA valid for ~10 years.

        Stores the cert/key on self and returns (cert_pem, key_pem).
        """
        self.root_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        root_public_key = self.root_key.public_key()

        # self-signed: subject and issuer are the same name
        name = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ])

        root_builder = x509.CertificateBuilder()
        root_builder = root_builder.subject_name(name)
        root_builder = root_builder.issuer_name(name)
        root_builder = root_builder.not_valid_before(datetime.now())
        root_builder = root_builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        root_builder = root_builder.serial_number(x509.random_serial_number())
        root_builder = root_builder.public_key(root_public_key)
        # ip_address() (rather than IPv4Address()) so an IPv6 mgr IP no
        # longer raises ValueError here
        root_builder = root_builder.add_extension(
            x509.SubjectAlternativeName(
                [x509.IPAddress(ipaddress.ip_address(str(self.mgr.get_mgr_ip())))]
            ),
            critical=False
        )
        root_builder = root_builder.add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )

        self.root_cert = root_builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        return (cert_str, self._key_to_pem(self.root_key))

    def generate_cert(self, addr: str = '') -> Tuple[str, str]:
        """Generate a leaf cert/key pair signed by the root CA.

        ``addr`` may be an IPv4/IPv6 address or a hostname; it defaults to
        the mgr IP. A SubjectAlternativeName extension is only added when
        the value parses as an IP address.

        Returns (cert_pem, key_pem). Requires the root key to exist.
        """
        have_ip = True
        if addr:
            try:
                # handles both IPv4 and IPv6 in one call
                ip = x509.IPAddress(ipaddress.ip_address(addr))
            except ValueError:
                # not an IP (e.g. a hostname): skip the SAN extension
                have_ip = False
        else:
            ip = x509.IPAddress(ipaddress.ip_address(str(self.mgr.get_mgr_ip())))

        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=4096, backend=default_backend())
        public_key = private_key.public_key()

        builder = x509.CertificateBuilder()

        builder = builder.subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, addr if addr else str(self.mgr.get_mgr_ip())),
        ]))

        builder = builder.issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u'cephadm-root'),
        ]))

        builder = builder.not_valid_before(datetime.now())
        builder = builder.not_valid_after(datetime.now() + timedelta(days=(365 * 10 + 3)))
        builder = builder.serial_number(x509.random_serial_number())
        builder = builder.public_key(public_key)
        if have_ip:
            builder = builder.add_extension(
                x509.SubjectAlternativeName(
                    [ip]
                ),
                critical=False
            )
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        )

        cert = builder.sign(
            private_key=self.root_key, algorithm=hashes.SHA256(), backend=default_backend()
        )

        cert_str = cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        return (cert_str, self._key_to_pem(private_key))

    def get_root_cert(self) -> str:
        """Return the root cert PEM, or '' if no root cert exists yet."""
        try:
            return self.root_cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
        except AttributeError:
            # root cert has not been generated/loaded yet
            return ''

    def get_root_key(self) -> str:
        """Return the root key PEM, or '' if no root key exists yet."""
        try:
            return self._key_to_pem(self.root_key)
        except AttributeError:
            return ''

    def load_root_credentials(self, cert: str, priv_key: str) -> None:
        """Adopt an existing root cert/key from PEM strings.

        Raises OrchestratorError if the given cert is already expired.
        """
        given_cert = x509.load_pem_x509_certificate(cert.encode('utf-8'), backend=default_backend())
        # compare in the cert's own tz-awareness to avoid naive/aware mixing
        tz = given_cert.not_valid_after.tzinfo
        if datetime.now(tz) >= given_cert.not_valid_after:
            raise OrchestratorError('Given cert is expired')
        self.root_cert = given_cert
        self.root_key = serialization.load_pem_private_key(
            data=priv_key.encode('utf-8'), backend=default_backend(), password=None)