]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/agent.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / cephadm / agent.py
CommitLineData
1e59de90
TL
1try:
2 import cherrypy
3 from cherrypy._cpserver import Server
4except ImportError:
5 # to avoid sphinx build crash
6 class Server: # type: ignore
7 pass
8
20effc67
TL
9import json
10import logging
11import socket
12import ssl
13import tempfile
14import threading
15import time
16
1e59de90 17from orchestrator import DaemonDescriptionStatus
20effc67
TL
18from orchestrator._interface import daemon_type_to_service
19from ceph.utils import datetime_now
20from ceph.deployment.inventory import Devices
21from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
1e59de90
TL
22from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
23from cephadm.ssl_cert_utils import SSLCerts
24from mgr_util import test_port_allocation, PortAlreadyInUse
20effc67 25
1e59de90 26from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
20effc67
TL
27
28if TYPE_CHECKING:
29 from cephadm.module import CephadmOrchestrator
30
31
def cherrypy_filter(record: logging.LogRecord) -> int:
    """Logging filter that drops records mentioning a blocked substring.

    :param record: the log record under consideration
    :return: a falsy value when the formatted message contains any of the
        blocked substrings (the record is discarded), a truthy value
        otherwise (the record is kept)
    """
    suppressed = (
        'TLSV1_ALERT_DECRYPT_ERROR',
    )
    text = record.getMessage()
    for needle in suppressed:
        if needle in text:
            return False
    return True
38
39
# Install the filter on the 'cherrypy.error' logger so records containing a
# blocked substring (see cherrypy_filter) are discarded.
logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
# Stop cherrypy's access log records from propagating to ancestor loggers.
cherrypy.log.access_log.propagate = False
42
43
1e59de90
TL
class AgentEndpoint:
    """Sets up the cherrypy server that receives metadata POSTs from agents.

    Owns the SSL certificate helper, the listening address/port and, once
    configure() has run, the HostData server instance (self.host_data).
    """

    # keys in the mgr key/value store under which the root cert and root
    # key used for the agent endpoint are persisted
    KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
    KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.ssl_certs = SSLCerts()
        # first port that find_free_port() will try
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()

    def configure_routes(self) -> None:
        """Mount a routes dispatcher that sends POST /data/ to HostData.POST."""
        dispatcher = cherrypy.dispatch.RoutesDispatcher()
        dispatcher.connect(
            name='host-data',
            route='/data/',
            controller=self.host_data.POST,
            conditions={'method': ['POST']},
        )
        mount_config = {'/': {'request.dispatch': dispatcher}}
        cherrypy.tree.mount(None, '/', config=mount_config)

    def configure_tls(self, server: Server) -> None:
        """Load or create the root credentials and give *server* a certificate.

        If both a root cert and a root key are found in the mgr store they are
        loaded; otherwise a new root cert is generated for the mgr IP and both
        parts are written back to the store. Afterwards the result of
        generate_cert_files() for this mgr's hostname and IP is assigned to
        the server's ssl_certificate / ssl_private_key attributes.
        """
        stored_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
        stored_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
        if not (stored_cert and stored_key):
            self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())
        else:
            self.ssl_certs.load_root_credentials(stored_cert, stored_key)

        hostname = self.mgr.get_hostname()
        mgr_ip = self.mgr.get_mgr_ip()
        certificate, private_key = self.ssl_certs.generate_cert_files(hostname, mgr_ip)
        server.ssl_certificate = certificate
        server.ssl_private_key = private_key

    def find_free_port(self) -> None:
        """Probe up to 151 consecutive ports, starting at self.server_port.

        The first port for which test_port_allocation() does not raise
        PortAlreadyInUse is assigned to self.host_data.socket_port. When every
        port in the range is taken, an error is logged and self.server_port is
        left one past the end of the range.
        """
        max_port = self.server_port + 150
        while self.server_port <= max_port:
            try:
                test_port_allocation(self.server_addr, self.server_port)
            except PortAlreadyInUse:
                # taken, move on to the next candidate
                self.server_port += 1
                continue
            self.host_data.socket_port = self.server_port
            self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
            return
        self.mgr.log.error(f'Cephadm agent could not find free port in range {max_port - 150}-{max_port} and failed to start')

    def configure(self) -> None:
        """Create the HostData server, then set up TLS, routes and its port."""
        self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
        self.configure_tls(self.host_data)
        self.configure_routes()
        self.find_free_port()
20effc67 93
33c7a0ef 94
class HostData(Server):
    """cherrypy server whose POST handler ingests metadata sent by agents."""

    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
        self.mgr = mgr
        super().__init__()
        self.socket_port = port
        self.socket_host = host
        self.subscribe()

    def stop(self) -> None:
        # we must call unsubscribe before stopping the server,
        # otherwise the port is not released and we will get
        # an exception when trying to restart it
        self.unsubscribe()
        super().stop()

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        """Validate and process one metadata report from an agent.

        :return: a dict with a single 'result' key holding either a
            'Bad metadata: ...' message (validation failed) or the status
            string produced by handle_metadata()
        """
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # if we got here, we've already verified the keyring of the agent. If
            # host agent is reporting on is marked offline, it shouldn't be any more
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        """Check that an agent report has the required fields and a valid keyring.

        :param data: decoded JSON body of the request
        :raises Exception: if 'host', 'keyring', 'port' or 'ack' is missing,
            the host is unknown, no keyring is stored for the host, the
            keyring does not match, or 'ack' cannot be converted to an int.
            Missing 'ls'/'networks'/'facts'/'volume' entries only produce
            a warning in the log.
        """
        fields = '{' + ', '.join(data.keys()) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            # chain the original error so the traceback shows the real cause
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}') from e
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

    def handle_metadata(self, data: Dict[str, Any]) -> str:
        """Apply an agent's report to the mgr caches.

        Records the agent's port and timestamp, compares the reported counter
        ('ack') with the stored one, feeds any of 'ls', 'networks', 'facts'
        and 'volume' that are present and non-empty into the caches, and
        kicks the serve loop when the daemon state changed or all hosts
        became up to date.

        :param data: decoded JSON body of the request
        :return: a human readable status string; failures are reported
            through the returned string (and a log warning), not raised
        """
        # bound before the try block so the except handler below can always
        # build its message, even when reading data['host'] is what failed
        host = '<unknown>'
        try:
            host = data['host']
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            # snapshot taken before processing so changes can be detected below
            error_daemons_old = {dd.name() for dd in self.mgr.cache.get_error_daemons()}
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # we got old counter value with message, inform agent of new timestamp
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if 'ls' in data and data['ls']:
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if 'networks' in data and data['networks']:
                self.mgr.cache.update_host_networks(host, data['networks'])
            if 'facts' in data and data['facts']:
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if 'volume' in data and data['volume']:
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != {dd.name() for dd in self.mgr.cache.get_error_daemons()}
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and ('ls' in data and data['ls']):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str
226
227
class AgentMessageThread(threading.Thread):
    """Thread that delivers one JSON message to the agent on a host over TLS.

    The wire format is a 10 character, zero padded decimal byte count
    followed by the UTF-8 encoded JSON payload.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """
        :param host: hostname of the agent to contact
        :param port: port the agent is listening on
        :param data: message content; serialized to JSON here
        :param mgr: the cephadm orchestrator module
        :param daemon_spec: if given, it is passed to
            agent_cache.agent_config_successfully_delivered() once the
            agent has answered
        """
        self.mgr = mgr
        self.agent = mgr.http_server.agent
        self.host = host
        # fall back to the raw host string when it is not in the inventory
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super().__init__(target=self.run)

    def _build_ssl_context(self) -> ssl.SSLContext:
        """Create the client SSL context used to talk to the agent.

        The root cert and a freshly generated cert/key pair are written to
        temporary files only because the ssl module loads them by file name.
        The files are closed as soon as the context has loaded them.
        """
        assert self.agent
        root_cert = self.agent.ssl_certs.get_root_cert()
        cert, key = self.agent.ssl_certs.generate_cert(
            self.mgr.get_hostname(), self.mgr.get_mgr_ip())

        with tempfile.NamedTemporaryFile() as root_cert_tmp, \
                tempfile.NamedTemporaryFile() as cert_tmp, \
                tempfile.NamedTemporaryFile() as key_tmp:
            for tmp_file, content in (
                (root_cert_tmp, root_cert),
                (cert_tmp, cert),
                (key_tmp, key),
            ):
                tmp_file.write(content.encode('utf-8'))
                tmp_file.flush()

            ssl_ctx = ssl.create_default_context(
                ssl.Purpose.SERVER_AUTH, cafile=root_cert_tmp.name)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_tmp.name, key_tmp.name)
        return ssl_ctx

    def run(self) -> None:
        """Send the message, making a second attempt after a connection error.

        sending_agent_message[host] is set for the duration of the call and
        is cleared again on every exit path, including unexpected exceptions.
        Failures are logged, never raised.
        """
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            try:
                ssl_ctx = self._build_ssl_context()
            except Exception as e:
                self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
                return

            try:
                payload = self.data.encode('utf-8')
                bytes_len: str = str(len(payload))
                if len(bytes_len) > 10:
                    raise Exception(
                        f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
                # fixed width header: left pad the byte count with zeros
                bytes_len = bytes_len.zfill(10)
            except Exception as e:
                self.mgr.log.error(f'Failed to get length of json payload: {e}')
                return

            for retry_wait in [3, 5]:
                try:
                    # context managers make sure the sockets are closed on
                    # success as well as on every failed attempt
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as agent_socket, \
                            ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr) as secure_agent_socket:
                        secure_agent_socket.connect((self.addr, self.port))
                        secure_agent_socket.sendall(bytes_len.encode('utf-8') + payload)
                        agent_response = secure_agent_socket.recv(1024).decode()
                    self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                    if self.daemon_spec:
                        self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                    return
                except ConnectionError as e:
                    # if it's a connection error, possibly try to connect again.
                    # We could have just deployed agent and it might not be ready
                    self.mgr.log.debug(
                        f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                    time.sleep(retry_wait)
                except Exception as e:
                    # if it's not a connection error, something has gone wrong. Give up.
                    self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                    return
            self.mgr.log.error(f'Could not connect to agent on host {self.host}')
        finally:
            self.mgr.agent_cache.sending_agent_message[self.host] = False
309
310
class CephadmAgentHelpers:
    """Helper routines the cephadm mgr module uses to manage its agents."""

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.agent = mgr.http_server.agent

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Start one AgentMessageThread per host carrying the host's counter.

        :param hosts: hosts whose agents should be messaged; each must have
            an entry in agent_cache.agent_ports
        :param increment: mark the host's metadata as out of date and bump
            an already existing counter by one
        :param daemon_spec: if given, its final_config is sent along under
            the 'config' key
        """
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            counters = self.mgr.agent_cache.agent_counter
            if host not in counters:
                counters[host] = 1
            elif increment:
                counters[host] = counters[host] + 1
            message: Dict[str, Any] = {'counter': counters[host]}
            if daemon_spec:
                message['config'] = daemon_spec.final_config
            agent_port = self.mgr.agent_cache.agent_ports[host]
            AgentMessageThread(host, agent_port, message, self.mgr, daemon_spec).start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Message every host that is out of date, has a known agent port
        and is not currently being messaged."""
        pending = {
            h for h in self.mgr.cache.get_hosts()
            if not self.mgr.cache.host_metadata_up_to_date(h)
            and h in self.mgr.agent_cache.agent_ports
            and not self.mgr.agent_cache.messaging_agent(h)
        }
        self.mgr.agent_helpers._request_agent_acks(pending)

    def _agent_down(self, host: str) -> bool:
        """Tell whether the agent on *host* has gone silent for too long.

        :return: True when the last report is older than
            max(agent_down_multiplier, 1.5) * agent_refresh_rate seconds
        """
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if self.mgr.cache.is_host_draining(host):
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        timestamps = self.mgr.agent_cache.agent_timestamp
        if host not in timestamps:
            if host in self.mgr.offline_hosts:
                return False
            timestamps[host] = datetime_now()
        # agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        silent_for = (datetime_now() - timestamps[host]).total_seconds()
        return silent_for > down_mult * float(self.mgr.agent_refresh_rate)

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Replace the CEPHADM_AGENT_DOWN health warning.

        The existing warning is always removed. If any hosts are listed, the
        agent daemons on them are set to error status and a new warning with
        one detail line per host is raised.
        """
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if not down_agent_hosts:
            return

        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        detail: List[str] = [
            (f'Cephadm agent on host {agent} has not reported in '
             f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
             'down and host may be offline.')
            for agent in down_agent_hosts
        ]
        for dd in self.mgr.cache.get_daemons_by_type('agent'):
            if dd.hostname in down_agent_hosts:
                dd.status = DaemonDescriptionStatus.error
        host_count = len(down_agent_hosts)
        self.mgr.set_health_warning(
            'CEPHADM_AGENT_DOWN',
            summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                host_count),
            count=host_count,
            detail=detail,
        )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save an 'agent' service spec placed on every host ('*')."""
        everywhere = PlacementSpec(host_pattern='*')
        self.mgr.spec_store.save(ServiceSpec(service_type='agent', placement=everywhere))

    def _handle_use_agent_setting(self) -> bool:
        """Bring the spec store and agent cache in line with mgr.use_agent.

        :return: True if the 'agent' spec was added or removed
        """
        if not self.mgr.use_agent:
            had_spec = 'agent' in self.mgr.spec_store
            if had_spec:
                self.mgr.spec_store.rm('agent')
            # agents are disabled, so forget everything cached about them
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
            return had_spec

        # on the off chance there are still agents hanging around from
        # when we turned the config option off, we need to redeploy them
        # we can tell they're in that state if we don't have a keyring for
        # them in the host cache
        for agent in self.mgr.cache.get_daemons_by_service('agent'):
            if agent.hostname not in self.mgr.agent_cache.agent_keys:
                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
        if 'agent' in self.mgr.spec_store:
            return False
        self.mgr.agent_helpers._apply_agent()
        return True

    def _root_cert_in_deps(self, last_deps: Any) -> bool:
        """Return True if the current root cert is among *last_deps*.

        Any error while fetching or comparing the cert counts as no match.
        """
        try:
            root_cert = self.agent.ssl_certs.get_root_cert()
            return bool(last_deps and root_cert in last_deps)
        except Exception:
            return False

    def _check_agent(self, host: str) -> bool:
        """Check the agent on *host* and reconfigure it when necessary.

        If the stored config/deps of the agent are missing or stale, a
        reconfig is triggered: via a message to the agent when its port is
        known, the root cert is unchanged and the agent is not down,
        otherwise via mgr._daemon_action(). Without such a reconfig, a
        scheduled daemon action for the agent, if any, is carried out.

        :return: True when the agent on the host is considered down
        """
        try:
            assert self.agent
            assert self.agent.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return False

        down = bool(self.mgr.agent_helpers._agent_down(host))

        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down

        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = self._root_cert_in_deps(last_deps)
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                reconfig_over_http = (
                    host in self.mgr.agent_cache.agent_ports and root_cert_match and not down
                )
                if reconfig_over_http:
                    service = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)]
                    daemon_spec = service.prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')

        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down