]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/agent.py
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / pybind / mgr / cephadm / agent.py
CommitLineData
1e59de90
TL
1try:
2 import cherrypy
3 from cherrypy._cpserver import Server
4except ImportError:
5 # to avoid sphinx build crash
6 class Server: # type: ignore
7 pass
8
20effc67
TL
9import json
10import logging
11import socket
12import ssl
13import tempfile
14import threading
15import time
16
1e59de90 17from orchestrator import DaemonDescriptionStatus
20effc67 18from orchestrator._interface import daemon_type_to_service
f38dd50b 19from ceph.utils import datetime_now, http_req
20effc67
TL
20from ceph.deployment.inventory import Devices
21from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
1e59de90
TL
22from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
23from cephadm.ssl_cert_utils import SSLCerts
24from mgr_util import test_port_allocation, PortAlreadyInUse
20effc67 25
f38dd50b
TL
26from urllib.error import HTTPError, URLError
27from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping
20effc67
TL
28
29if TYPE_CHECKING:
30 from cephadm.module import CephadmOrchestrator
31
32
def cherrypy_filter(record: logging.LogRecord) -> bool:
    """
    Logging filter for cherrypy's error log that drops known-noisy messages.

    :param record: the log record about to be emitted.
    :return: False (drop the record) if its formatted message contains any of
        the blocked substrings, True (keep it) otherwise.
    """
    blocked = (
        'TLSV1_ALERT_DECRYPT_ERROR',
    )
    msg = record.getMessage()
    # a generator lets any() stop at the first match instead of building a list
    return not any(m in msg for m in blocked)
39
40
# Apply cherrypy_filter to cherrypy's error log, and stop records from
# cherrypy's access log from propagating to ancestor loggers.
_cherrypy_error_log = logging.getLogger('cherrypy.error')
_cherrypy_error_log.addFilter(cherrypy_filter)
cherrypy.log.access_log.propagate = False
43
44
1e59de90
TL
class AgentEndpoint:
    """
    TLS endpoint on the mgr that cephadm agents and node-proxy daemons talk to.

    ``configure()`` builds a ``HostData`` cherrypy server, installs a
    certificate signed by the agent root CA on it, and mounts two handlers:
    ``/data`` (agent metadata, ``HostData``) and ``/node-proxy``
    (``NodeProxyEndpoint``).
    """

    KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
    KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.ssl_certs = SSLCerts()
        # first port tried by find_free_port()
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()

    def configure_routes(self) -> None:
        """Mount the agent and node-proxy handlers on the cherrypy tree."""
        conf = {'/': {'tools.trailing_slash.on': False}}

        cherrypy.tree.mount(self.host_data, '/data', config=conf)
        cherrypy.tree.mount(self.node_proxy_endpoint, '/node-proxy', config=conf)

    def configure_tls(self, server: Server) -> None:
        """
        Install a TLS certificate/key on ``server``.

        The agent root CA is loaded from the mgr KV store. If it is not there
        yet, a new root is generated and persisted. A certificate for this
        mgr's hostname/IP signed by that root is then written to files and
        assigned to the server.

        :param server: the cherrypy server to configure.
        """
        old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
        old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
        if old_cert and old_key:
            self.ssl_certs.load_root_credentials(old_cert, old_key)
        else:
            self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())

        host = self.mgr.get_hostname()
        addr = self.mgr.get_mgr_ip()
        server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)

    def find_free_port(self, port_range: int = 150) -> None:
        """
        Probe ports upward from ``self.server_port`` until a free one is found.

        On success, ``self.server_port`` and ``self.host_data.socket_port``
        both hold the free port. If every port in
        ``[start, start + port_range]`` is in use, an error is logged and
        ``host_data.socket_port`` is left untouched.

        :param port_range: how many ports past the starting port to try.
        """
        start_port = self.server_port
        max_port = start_port + port_range
        while self.server_port <= max_port:
            try:
                test_port_allocation(self.server_addr, self.server_port)
            except PortAlreadyInUse:
                self.server_port += 1
                continue
            self.host_data.socket_port = self.server_port
            self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
            return
        self.mgr.log.error(f'Cephadm agent could not find free port in range {start_port}-{max_port} and failed to start')

    def configure(self) -> None:
        """
        Create the server and handlers, set up TLS, mount routes and pick a port.

        The order matters: NodeProxyEndpoint reads the root certificate, so
        it is created only after configure_tls() has loaded or generated it.
        """
        self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
        self.configure_tls(self.host_data)
        self.node_proxy_endpoint = NodeProxyEndpoint(self.mgr)
        self.configure_routes()
        self.find_free_port()
20effc67 94
33c7a0ef 95
f38dd50b
TL
class NodeProxyEndpoint:
    """
    Cherrypy handler mounted at ``/node-proxy``.

    node-proxy daemons POST hardware reports (``data``) and fetch their
    out-of-band management details (``oob``). The remaining GET endpoints
    expose the cached hardware data, and ``led`` forwards enclosure-LED
    requests to the node-proxy daemon running on the target host.
    """

    # node-proxy daemons identify themselves as "node-proxy.<hostname>"
    _CEPHX_NAME_PREFIX = 'node-proxy.'

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr = mgr
        self.ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
        # NOTE(review): this context does not verify peer certificates and is
        # not used by this class; led() builds its own verifying context.
        self.ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.ssl_ctx.check_hostname = False
        self.ssl_ctx.verify_mode = ssl.CERT_NONE
        self.redfish_token: str = ''
        self.redfish_session_location: str = ''

    @classmethod
    def _hostname_from_data(cls, data: Dict[str, Any]) -> str:
        """
        Return the hostname part of ``data['cephx']['name']``.

        The name is expected to be "node-proxy.<hostname>". The prefix length
        is sliced off without checking the prefix itself.
        """
        return data['cephx']['name'][len(cls._CEPHX_NAME_PREFIX):]

    def _cp_dispatch(self, vpath: List[str]) -> "NodeProxyEndpoint":
        """
        Turn path segments into request params.

        This lets ``/{hostname}/<endpoint>``, ``/{hostname}/led/{type}`` and
        ``/{hostname}/led/drive/{id}`` resolve to the exposed handlers below.
        """
        if len(vpath) > 1:  # /{hostname}/<endpoint>
            hostname = vpath.pop(0)  # /<endpoint>
            cherrypy.request.params['hostname'] = hostname
            # /{hostname}/led/{type}/{drive} eg: /{hostname}/led/chassis or /{hostname}/led/drive/{id}
            if vpath[0] == 'led' and len(vpath) > 1:  # /led/{type}/{id}
                _type = vpath.pop(1)  # /led/{id} or /led
                cherrypy.request.params['type'] = _type
                if _type == 'drive' and len(vpath) > 1:  # /led/{id}
                    cherrypy.request.params['id'] = vpath.pop(1)  # /led
        # /<endpoint>
        return self

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def oob(self) -> Dict[str, Any]:
        """
        Return the out-of-band management details stored for the caller's host.

        :return: ``{'result': <oob details>}``.
        :rtype: dict
        :raises cherrypy.HTTPError 400: if the request is malformed or the
            host has no out-of-band details.
        :raises cherrypy.HTTPError 403/502: see ``validate_node_proxy_data``.
        """
        data: Dict[str, Any] = cherrypy.request.json
        self.validate_node_proxy_data(data)

        hostname = self._hostname_from_data(data)
        oob_details = self.mgr.node_proxy_cache.oob.get(hostname, '')
        if not oob_details:
            raise cherrypy.HTTPError(400, 'The provided host has no iDrac details.')
        return {'result': oob_details}

    def validate_node_proxy_data(self, data: Dict[str, Any]) -> None:
        """
        Check the ``cephx`` credentials sent by a node-proxy daemon.

        :param data: decoded JSON request body.
        :type data: dict

        :raises cherrypy.HTTPError 400: if fields are missing or malformed.
        :raises cherrypy.HTTPError 502: if no keyring is known for the host.
        :raises cherrypy.HTTPError 403: if the secret provided is wrong.
        """
        cherrypy.response.status = 200
        try:
            if 'cephx' not in data.keys():
                raise cherrypy.HTTPError(400, 'The field \'cephx\' must be provided.')
            cephx = data['cephx']
            if 'name' not in cephx.keys():
                raise cherrypy.HTTPError(400, 'The field \'name\' must be provided.')
            hostname = self._hostname_from_data(data)
            if 'secret' not in cephx.keys():
                raise cherrypy.HTTPError(400, 'The node-proxy keyring must be provided.')
            expected_secret = self.mgr.node_proxy_cache.keyrings.get(hostname, '')
            if not expected_secret:
                raise cherrypy.HTTPError(502, f'Make sure the node-proxy is running on {hostname}')
            if cephx['secret'] != expected_secret:
                raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {hostname}.')
        except (AttributeError, TypeError):
            # non-dict containers or a non-string name: reject as client error
            raise cherrypy.HTTPError(400, 'Malformed data received.')

    # TODO(guits): refactor this
    # TODO(guits): use self.node_proxy.get_critical_from_host() ?
    def get_nok_members(self,
                        data: Dict[str, Any]) -> List[Dict[str, str]]:
        """
        Return the members of one component whose health is not 'ok'.

        :param data: mapping of member name to its report. Each report must
            provide ``['status']['health']`` and ``['status']['state']``.
        :type data: dict

        :return: one dict per unhealthy member with keys ``member``,
            ``status`` (the lower-cased health) and ``state``.
        :rtype: List[Dict[str, str]]
        """
        nok_members: List[Dict[str, str]] = []
        for member, report in data.items():
            health = report['status']['health'].lower()
            if health != 'ok':
                nok_members.append({
                    'member': member,
                    'status': health,
                    'state': report['status']['state'],
                })
        return nok_members

    def raise_alert(self, data: Dict[str, Any]) -> None:
        """
        Refresh the ``HARDWARE_<COMPONENT>`` health warnings from a report.

        For each component in ``data['patch']['status']``, any existing
        warning is removed first. The warning is then set again if
        ``get_nok_members`` finds members that are not 'ok'. Its detail lists
        each such member as "<member> is <status>: <state>".

        :param data: node-proxy report containing ``patch.status``.
        :type data: dict
        """
        for component, members in data['patch']['status'].items():
            alert_name = f"HARDWARE_{component.upper()}"
            self.mgr.remove_health_warning(alert_name)
            nok_members = self.get_nok_members(members)
            if not nok_members:
                continue
            count = len(nok_members)
            self.mgr.set_health_warning(
                alert_name,
                summary=f'{count} {component} member{"s" if count > 1 else ""} {"are" if count > 1 else "is"} not ok',
                count=count,
                detail=[f"{member['member']} is {member['status']}: {member['state']}" for member in nok_members],
            )

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def data(self) -> None:
        """
        Accept a hardware report POSTed by a node-proxy daemon.

        The request is validated, the report's ``patch`` is saved into the
        node-proxy cache for the sender's host, and health warnings are
        refreshed through ``raise_alert``.

        :raises cherrypy.HTTPError 400: if the report has no ``patch`` field
            (see also ``validate_node_proxy_data``).
        """
        data: Dict[str, Any] = cherrypy.request.json
        self.validate_node_proxy_data(data)
        if 'patch' not in data.keys():
            raise cherrypy.HTTPError(400, 'Malformed data received.')
        host = self._hostname_from_data(data)
        self.mgr.node_proxy_cache.save(host, data['patch'])
        self.raise_alert(data)

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET', 'PATCH'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def led(self, **kw: Any) -> Dict[str, Any]:
        """
        Read (GET) or set (PATCH) an enclosure LED through the host's node-proxy.

        PATCH bodies must carry ``state`` and the host's node-proxy
        ``keyring``. The request is forwarded to the node-proxy on port 9456
        over TLS, verified against the agent root CA.

        :param kw: request params: ``hostname``, ``type`` ('chassis' or
            'drive') and, for drives, ``id``.
        :type kw: dict

        :return: the JSON body returned by node-proxy.
        :rtype: dict[str, Any]

        :raises cherrypy.HTTPError 501: if no hostname is given.
        :raises cherrypy.HTTPError 400: on missing/unknown type, drive,
            hostname or PATCH fields.
        :raises cherrypy.HTTPError 401: if the PATCH keyring is wrong.
        :raises cherrypy.HTTPError 502: if node-proxy cannot be reached.
        """
        method: str = cherrypy.request.method
        hostname: Optional[str] = kw.get('hostname')
        led_type: Optional[str] = kw.get('type')
        id_drive: Optional[str] = kw.get('id')
        payload: Optional[Dict[str, str]] = None

        if not hostname:
            msg: str = "listing enclosure LED status for all nodes is not implemented."
            self.mgr.log.debug(msg)
            raise cherrypy.HTTPError(501, msg)

        if not led_type:
            msg = "the led type must be provided (either 'chassis' or 'drive')."
            self.mgr.log.debug(msg)
            raise cherrypy.HTTPError(400, msg)

        if led_type == 'drive' and not id_drive:
            msg = "the id of the drive must be provided when type is 'drive'."
            self.mgr.log.debug(msg)
            raise cherrypy.HTTPError(400, msg)

        if hostname not in self.mgr.node_proxy_cache.data.keys():
            # TODO(guits): update unit test for this
            msg = f"'{hostname}' not found."
            self.mgr.log.debug(msg)
            raise cherrypy.HTTPError(400, msg)

        addr: str = self.mgr.inventory.get_addr(hostname)

        if method == 'PATCH':
            # TODO(guits): need to check the request is authorized
            # allowing a specific keyring only ? (client.admin or client.agent.. ?)
            data: Dict[str, Any] = cherrypy.request.json
            if 'state' not in data.keys():
                msg = "'state' key not provided."
                raise cherrypy.HTTPError(400, msg)
            if 'keyring' not in data.keys():
                msg = "'keyring' key must be provided."
                raise cherrypy.HTTPError(400, msg)
            expected_keyring = self.mgr.node_proxy_cache.keyrings.get(hostname)
            # a missing stored keyring must never match (e.g. a JSON null keyring)
            if not expected_keyring or data['keyring'] != expected_keyring:
                msg = 'wrong keyring provided.'
                raise cherrypy.HTTPError(401, msg)
            payload = {'state': data['state']}

        if led_type == 'drive':
            if id_drive not in self.mgr.node_proxy_cache.data[hostname]['status']['storage'].keys():
                # TODO(guits): update unit test for this
                msg = f"'{id_drive}' not found."
                self.mgr.log.debug(msg)
                raise cherrypy.HTTPError(400, msg)

        endpoint_parts: List[str] = ['led', led_type]
        if led_type == 'drive':
            # id_drive was checked to be non-empty above
            endpoint_parts.append(id_drive or '')
        endpoint: str = f'/{"/".join(endpoint_parts)}'
        header: MutableMapping[str, str] = self.mgr.node_proxy.generate_auth_header(hostname)

        # node-proxy must present a certificate signed by the agent root CA
        ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = True
        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
        ssl_ctx.load_verify_locations(cadata=ssl_root_crt)

        try:
            _, result, _ = http_req(hostname=addr,
                                    port='9456',
                                    headers=header,
                                    method=method,
                                    data=json.dumps(payload),
                                    endpoint=endpoint,
                                    ssl_ctx=ssl_ctx)
        except HTTPError as e:
            # HTTPError subclasses URLError, so it must be handled first.
            # Previously it was only logged, and the unbound response was
            # returned (UnboundLocalError / HTTP 500).
            self.mgr.log.debug(e)
            status = e.code if 400 <= e.code <= 599 else 502
            raise cherrypy.HTTPError(status, f'node-proxy on {hostname} returned an error: {e.reason}')
        except URLError:
            raise cherrypy.HTTPError(502, f'Make sure the node-proxy agent is deployed and running on {hostname}')

        return json.loads(result)

    def _query_cache(self, query: Any, hostname: Optional[str]) -> Dict[str, Any]:
        """
        Run ``query()`` (a zero-argument call into NodeProxyCache).

        A KeyError is mapped to HTTP 404 ("<hostname> not found.").
        """
        try:
            results: Dict[str, Any] = query()
        except KeyError:
            raise cherrypy.HTTPError(404, f"{hostname} not found.")
        return results

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def fullreport(self, **kw: Any) -> Dict[str, Any]:
        """
        Return the full report from ``NodeProxyCache.fullreport``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.fullreport(**kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def criticals(self, **kw: Any) -> Dict[str, Any]:
        """
        Return critical information from ``NodeProxyCache.criticals``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.criticals(**kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def summary(self, **kw: Any) -> Dict[str, Any]:
        """
        Return summary information from ``NodeProxyCache.summary``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.summary(**kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def memory(self, **kw: Any) -> Dict[str, Any]:
        """
        Return memory data via ``NodeProxyCache.common('memory', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('memory', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def network(self, **kw: Any) -> Dict[str, Any]:
        """
        Return network data via ``NodeProxyCache.common('network', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('network', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def processors(self, **kw: Any) -> Dict[str, Any]:
        """
        Return processor data via ``NodeProxyCache.common('processors', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('processors', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def storage(self, **kw: Any) -> Dict[str, Any]:
        """
        Return storage data via ``NodeProxyCache.common('storage', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('storage', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def power(self, **kw: Any) -> Dict[str, Any]:
        """
        Return power data via ``NodeProxyCache.common('power', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('power', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def fans(self, **kw: Any) -> Dict[str, Any]:
        """
        Return fan data via ``NodeProxyCache.common('fans', ...)``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.common('fans', **kw), kw.get('hostname'))

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def firmwares(self, **kw: Any) -> Dict[str, Any]:
        """
        Return firmware information from ``NodeProxyCache.firmwares``.

        :param kw: request params (e.g. ``hostname``).
        :raises cherrypy.HTTPError 404: If the passed hostname is not found.
        """
        return self._query_cache(lambda: self.mgr.node_proxy_cache.firmwares(**kw), kw.get('hostname'))
623
624
class HostData(Server):
    """
    Cherrypy server (mounted at ``/data``) that receives metadata POSTed by
    cephadm agents and feeds it into the orchestrator caches.
    """

    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
        self.mgr = mgr
        super().__init__()
        self.socket_port = port
        self.socket_host = host
        self.subscribe()

    def stop(self) -> None:
        # we must call unsubscribe before stopping the server,
        # otherwise the port is not released and we will get
        # an exception when trying to restart it
        self.unsubscribe()
        super().stop()

    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.expose
    def index(self) -> Dict[str, Any]:
        """
        Validate and process one metadata report from an agent.

        :return: ``{'result': <message>}``. Validation failures are reported
            in the message (and logged) rather than as HTTP errors.
        """
        data: Dict[str, Any] = cherrypy.request.json
        try:
            self.check_request_fields(data)
        except Exception as e:
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
            return {'result': f'Bad metadata: {e}'}
        # if we got here, we've already verified the keyring of the agent. If
        # host agent is reporting on is marked offline, it shouldn't be any more
        self.mgr.offline_hosts_remove(data['host'])
        return {'result': self.handle_metadata(data)}

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        """
        Validate the required fields and the keyring of an agent report.

        :param data: decoded JSON body sent by the agent.
        :raises Exception: with a human-readable reason if the host is missing
            or unknown, the keyring is missing or wrong, or ``port``/``ack``
            are missing or ``ack`` is not an integer. Missing optional
            metadata sections only produce a warning.
        """
        fields = '{' + ', '.join(data.keys()) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}') from e
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

    def handle_metadata(self, data: Dict[str, Any]) -> str:
        """
        Apply a validated agent report to the orchestrator caches.

        Records the agent's port and timestamp. Then updates the daemon
        ("ls"), network, facts and device ("volume") caches from whichever
        sections are present and non-empty, and kicks the serve loop when
        daemon state changed or all hosts became up to date. If the report's
        ``ack`` does not match the stored counter, fresh metadata is
        requested from the agent.

        :param data: report already checked by ``check_request_fields``.
        :return: a status message. Failures are logged and returned as the
            message instead of being raised.
        """
        # bound before the try so the error handler can always reference it
        host = data.get('host')
        try:
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            error_daemons_old = {dd.name() for dd in self.mgr.cache.get_error_daemons()}
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = int(data['ack']) == self.mgr.agent_cache.agent_counter[host]
            if not up_to_date:
                # we got old counter value with message, inform agent of new timestamp
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if data.get('ls'):
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if data.get('networks'):
                self.mgr.cache.update_host_networks(host, data['networks'])
            if data.get('facts'):
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if data.get('volume'):
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != {dd.name() for dd in self.mgr.cache.get_error_daemons()}
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and data.get('ls'):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str
758
759
class AgentMessageThread(threading.Thread):
    """
    Thread that sends one JSON message to the cephadm agent at ``host:port``.

    The connection uses TLS with a client certificate: the agent must present
    a certificate signed by the agent root CA, and the mgr presents its own.
    On the wire, the payload's UTF-8 byte length is sent as a 10-digit,
    zero-padded decimal string, followed by the JSON payload. Connection
    errors are retried twice, after 3s and 5s.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.agent = mgr.http_server.agent
        self.host = host
        # use the inventory address for known hosts, else the value as given
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        self.agent_response: str = ''
        super().__init__(target=self.run)

    def run(self) -> None:
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            self._send_message()
        finally:
            # clear the flag however sending ended, including unexpected errors
            self.mgr.agent_cache.sending_agent_message[self.host] = False

    def _build_ssl_context(self) -> ssl.SSLContext:
        """Create the client TLS context (trust root CA, present mgr cert)."""
        assert self.agent
        root_cert = self.agent.ssl_certs.get_root_cert()
        cert, key = self.agent.ssl_certs.generate_cert(
            self.mgr.get_hostname(), self.mgr.get_mgr_ip())
        # The ssl module loads the CA file and the cert chain from paths, and
        # reads them immediately. The temp files are removed once the context
        # is built.
        with tempfile.NamedTemporaryFile() as root_cert_tmp, \
                tempfile.NamedTemporaryFile() as cert_tmp, \
                tempfile.NamedTemporaryFile() as key_tmp:
            root_cert_tmp.write(root_cert.encode('utf-8'))
            root_cert_tmp.flush()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            key_tmp.write(key.encode('utf-8'))
            key_tmp.flush()

            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_tmp.name)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_tmp.name, key_tmp.name)
        return ssl_ctx

    def _length_prefix(self) -> str:
        """Return the payload's UTF-8 byte length as a 10-digit, zero-padded string."""
        bytes_len: str = str(len(self.data.encode('utf-8')))
        if len(bytes_len) > 10:
            raise Exception(
                f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
        return bytes_len.zfill(10)

    def _send_message(self) -> None:
        """Build the TLS context and framed payload, then deliver it with retries."""
        try:
            ssl_ctx = self._build_ssl_context()
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            return
        try:
            msg = (self._length_prefix() + self.data).encode('utf-8')
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            return
        for retry_wait in [3, 5]:
            try:
                # context managers close the sockets on every path (previously leaked)
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as agent_socket, \
                        ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr) as secure_agent_socket:
                    secure_agent_socket.connect((self.addr, self.port))
                    secure_agent_socket.sendall(msg)
                    self.agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{self.agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')

    def get_agent_response(self) -> str:
        """Return the (up to 1024-byte) reply received from the agent, or ''."""
        return self.agent_response
845
20effc67
TL
846
class CephadmAgentHelpers:
    """Helpers used by the cephadm orchestrator to track and message cephadm agents."""

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        # agent endpoint object exposed by the mgr's HTTP server; its
        # ssl_certs are consulted by _check_agent
        self.agent = mgr.http_server.agent

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Ask the agent on each host in ``hosts`` to acknowledge a message.

        For every host an ``AgentMessageThread`` is started carrying the
        host's current counter value and, if ``daemon_spec`` is given, that
        spec's ``final_config``.

        :param hosts: hosts whose agents should be messaged. Each host must
            already have an entry in ``agent_cache.agent_ports``.
        :param increment: when True, mark the host's metadata as not up to
            date and bump its counter (a first-time counter starts at 1).
        :param daemon_spec: optional deploy spec whose config is delivered
            to the agent together with the counter.
        """
        agent_cache = self.mgr.agent_cache
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in agent_cache.agent_counter:
                agent_cache.agent_counter[host] = 1
            elif increment:
                agent_cache.agent_counter[host] += 1
            payload: Dict[str, Any] = {'counter': agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            message_thread = AgentMessageThread(
                host, agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Request acks from every agent whose host metadata is stale.

        Hosts are skipped when their agent port is unknown or when a message
        to their agent is already being sent.
        """
        self.mgr.agent_helpers._request_agent_acks({
            h for h in self.mgr.cache.get_hosts()
            if (not self.mgr.cache.host_metadata_up_to_date(h)
                and h in self.mgr.agent_cache.agent_ports
                and not self.mgr.agent_cache.messaging_agent(h))
        })

    def _agent_down(self, host: str) -> bool:
        """Return True if the agent on ``host`` has stopped reporting.

        An agent counts as down once more than
        ``max(agent_down_multiplier, 1.5) * agent_refresh_rate`` seconds have
        passed since its last recorded timestamp. Draining hosts, hosts with
        no deployed agent, and previously offline hosts with no timestamp are
        never reported as down.
        """
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if self.mgr.cache.is_host_draining(host):
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        return time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate)

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Refresh the CEPHADM_AGENT_DOWN health warning.

        The warning is always cleared first. It is re-raised only when
        ``down_agent_hosts`` is non-empty, in which case every agent daemon
        on those hosts is also given an error status.
        """
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if not down_agent_hosts:
            return
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        detail: List[str] = [
            (f'Cephadm agent on host {agent} has not reported in '
             f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
             'down and host may be offline.')
            for agent in down_agent_hosts
        ]
        down_hosts = set(down_agent_hosts)
        for dd in self.mgr.cache.get_daemons_by_type('agent'):
            if dd.hostname in down_hosts:
                dd.status = DaemonDescriptionStatus.error
        self.mgr.set_health_warning(
            'CEPHADM_AGENT_DOWN',
            summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                len(down_agent_hosts)),
            count=len(down_agent_hosts),
            detail=detail,
        )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save an 'agent' service spec placed on every host (host_pattern '*')."""
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

    def _handle_use_agent_setting(self) -> bool:
        """Reconcile the agent service spec with the ``use_agent`` setting.

        :returns: True if the spec store was changed (agent spec added or
            removed), meaning services need to be re-applied.
        """
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them
            # we can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            # only forget per-agent state once no agent daemons remain
            if not self.mgr.cache.get_daemons_by_service('agent'):
                self.mgr.agent_cache.agent_counter = {}
                self.mgr.agent_cache.agent_timestamp = {}
                self.mgr.agent_cache.agent_keys = {}
                self.mgr.agent_cache.agent_ports = {}
        return need_apply

    def _check_agent(self, host: str) -> bool:
        """Check the agent on ``host`` and reconcile its config if needed.

        If the agent's dependencies changed since its last config, a reconfig
        is requested over HTTP when possible (port known, root cert unchanged,
        agent not down), otherwise via a full ``_daemon_action`` reconfig.
        Any scheduled daemon action for the agent is also executed.

        :returns: True if the agent on ``host`` is considered down.
        """
        down = False
        # Explicit checks instead of ``assert``: asserts are stripped under
        # ``python -O``, which would silently remove this readiness gate.
        try:
            root_cert_ready = bool(self.agent and self.agent.ssl_certs.get_root_cert())
        except Exception:
            root_cert_ready = False
        if not root_cert_ready:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            if agent.daemon_id is None or agent.hostname is None:
                raise ValueError('agent daemon is missing its daemon_id or hostname')
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.agent.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down