]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | try: |
2 | import cherrypy | |
3 | from cherrypy._cpserver import Server | |
4 | except ImportError: | |
5 | # to avoid sphinx build crash | |
6 | class Server: # type: ignore | |
7 | pass | |
8 | ||
20effc67 TL |
9 | import json |
10 | import logging | |
11 | import socket | |
12 | import ssl | |
13 | import tempfile | |
14 | import threading | |
15 | import time | |
16 | ||
1e59de90 | 17 | from orchestrator import DaemonDescriptionStatus |
20effc67 TL |
18 | from orchestrator._interface import daemon_type_to_service |
19 | from ceph.utils import datetime_now | |
20 | from ceph.deployment.inventory import Devices | |
21 | from ceph.deployment.service_spec import ServiceSpec, PlacementSpec | |
1e59de90 TL |
22 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec |
23 | from cephadm.ssl_cert_utils import SSLCerts | |
24 | from mgr_util import test_port_allocation, PortAlreadyInUse | |
20effc67 | 25 | |
1e59de90 | 26 | from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional |
20effc67 TL |
27 | |
28 | if TYPE_CHECKING: | |
29 | from cephadm.module import CephadmOrchestrator | |
30 | ||
31 | ||
def cherrypy_filter(record: logging.LogRecord) -> int:
    """Logging filter that drops known-noisy cherrypy error records.

    A record is dropped when its rendered message contains any of the blocked
    substrings (currently the TLS ``TLSV1_ALERT_DECRYPT_ERROR`` alert).

    :param record: record being considered by the logger this filter is attached to.
    :returns: a falsy value to drop the record, a truthy value to keep it.
    """
    blocked = ('TLSV1_ALERT_DECRYPT_ERROR',)
    msg = record.getMessage()
    # generator short-circuits on the first match instead of materializing a list
    return not any(m in msg for m in blocked)
38 | ||
39 | ||
# Attach cherrypy_filter to cherrypy's error logger so the blocked, noisy
# records are discarded, and stop cherrypy access-log records from being
# propagated to ancestor loggers.
logging.getLogger(name='cherrypy.error').addFilter(cherrypy_filter)
cherrypy.log.access_log.propagate = False
42 | ||
43 | ||
1e59de90 TL |
class AgentEndpoint:
    """Mgr-side HTTPS endpoint that cephadm agents POST their metadata to.

    The agent root certificate and key are kept in the mgr key/value store and
    reused when present. The listening server is a :class:`HostData` instance
    bound to the mgr IP.
    """

    # mgr key/value store keys holding the agent root certificate and key
    KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
    KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.ssl_certs = SSLCerts()
        # first port probed by find_free_port()
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()

    def configure_routes(self) -> None:
        """Route ``POST /data/`` to ``self.host_data.POST`` and mount it at ``/``.

        Requires ``self.host_data``, which is created by :meth:`configure`.
        """
        dispatcher = cherrypy.dispatch.RoutesDispatcher()
        dispatcher.connect(
            name='host-data',
            route='/data/',
            controller=self.host_data.POST,
            conditions={'method': ['POST']},
        )
        cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': dispatcher}})

    def configure_tls(self, server: Server) -> None:
        """Load or create the agent root credentials and give *server* a cert.

        If both root cert and key are in the key/value store they are loaded.
        Otherwise a new root cert is generated for the mgr IP and both are
        written back to the store. A server certificate/key pair for this
        mgr's hostname and IP is then generated and assigned to *server*.
        """
        stored_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
        stored_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
        if not (stored_cert and stored_key):
            self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())
        else:
            self.ssl_certs.load_root_credentials(stored_cert, stored_key)

        cert_file, key_file = self.ssl_certs.generate_cert_files(
            self.mgr.get_hostname(), self.mgr.get_mgr_ip())
        server.ssl_certificate = cert_file
        server.ssl_private_key = key_file

    def find_free_port(self) -> None:
        """Advance ``server_port`` to the first free port in a 151-port window.

        Ports ``server_port`` through ``server_port + 150`` (inclusive) are
        probed in order. On success the HostData server is pointed at that
        port. If every port is taken, an error is logged and
        ``host_data.socket_port`` is left unchanged.
        """
        upper = self.server_port + 150
        while self.server_port <= upper:
            try:
                test_port_allocation(self.server_addr, self.server_port)
            except PortAlreadyInUse:
                self.server_port += 1
                continue
            self.host_data.socket_port = self.server_port
            self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
            return
        self.mgr.log.error(f'Cephadm agent could not find free port in range {upper - 150}-{upper} and failed to start')

    def configure(self) -> None:
        """Create the HostData server, then set up its TLS, routes and port.

        The route and port steps read ``self.host_data``, so it is created first.
        """
        server = HostData(self.mgr, self.server_port, self.server_addr)
        self.host_data = server
        self.configure_tls(server)
        self.configure_routes()
        self.find_free_port()
20effc67 | 93 | |
33c7a0ef | 94 | |
class HostData(Server):
    """cherrypy server that accepts metadata POSTed by cephadm agents.

    Each report must name a known host and present the keyring stored for that
    host's agent. Accepted metadata is applied to the orchestrator's host and
    daemon caches.
    """

    # cherrypy attribute marking this object as exposed to request dispatch
    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
        self.mgr = mgr
        super().__init__()
        self.socket_port = port
        self.socket_host = host
        self.subscribe()

    def stop(self) -> None:
        # we must call unsubscribe before stopping the server,
        # otherwise the port is not released and we will get
        # an exception when trying to restart it
        self.unsubscribe()
        super().stop()

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        """Handle one agent report.

        :returns: ``{'result': <message>}``. The message says either why the
            report was rejected or how it was processed.
        """
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # The agent's keyring has been verified at this point, so if the
            # host it reports on is marked offline, it should no longer be.
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    @staticmethod
    def _keyring_matches(received: Any, expected: Any) -> bool:
        """Compare an agent-supplied keyring with the stored one.

        String keyrings are compared with ``hmac.compare_digest`` so the
        comparison time does not reveal how many leading characters matched.
        Any other types fall back to plain equality.
        """
        import hmac
        if isinstance(received, str) and isinstance(expected, str):
            return hmac.compare_digest(received.encode('utf-8'), expected.encode('utf-8'))
        return bool(received == expected)

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        """Validate an incoming agent report before any of it is applied.

        Checks, in order:

        - the body is a JSON object;
        - ``host`` is present and is a host known to the cache;
        - ``keyring`` is present, a keyring is stored for that host, and the two match;
        - ``port`` and ``ack`` are present;
        - ``ack`` is int-convertible.

        Missing metadata sections (``ls``, ``networks``, ``facts``, ``volume``)
        only produce a warning.

        :raises Exception: with a message describing the first problem found.
        """
        if not isinstance(data, dict):
            raise Exception(
                f'Expected a JSON object from agent, got {type(data).__name__}')
        fields = '{' + ', '.join(data) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        # the keyring is a shared secret arriving from the network: compare it
        # in constant time rather than with `!=`
        if not self._keyring_matches(data['keyring'], self.mgr.agent_cache.agent_keys[host]):
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}')
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

    def handle_metadata(self, data: Dict[str, Any]) -> str:
        """Apply one validated agent report to the orchestrator caches.

        Expects ``data`` to have passed :meth:`check_request_fields`.

        1. Record the agent's listener port.
        2. If no counter is known for the host, start it at 1, request fresh
           metadata, and stop.
        3. Otherwise compare ``ack`` with the stored counter. A mismatch means
           the report is stale, and fresh metadata is requested.
        4. Apply any non-empty ``ls``, ``networks``, ``facts`` and ``volume``
           payloads.
        5. Kick the serve loop when error daemons or this host's daemon count
           changed, or when this report made every host's metadata up to date.

        :returns: a human-readable status string. Processing errors are logged
            and returned as the status rather than raised.
        :raises KeyError: if ``data`` has no ``host`` field.
        """
        # Bind host outside the try so the error handler below can always use it.
        host = data['host']
        try:
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            # snapshot daemon state so changes caused by this report can be detected
            error_daemons_old = {dd.name() for dd in self.mgr.cache.get_error_daemons()}
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # we got an old counter value with the message; ask the agent for fresh metadata
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if data.get('ls'):
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if data.get('networks'):
                self.mgr.cache.update_host_networks(host, data['networks'])
            if data.get('facts'):
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if data.get('volume'):
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != {dd.name() for dd in self.mgr.cache.get_error_daemons()}
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and data.get('ls'):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str
226 | ||
227 | ||
class AgentMessageThread(threading.Thread):
    """Thread that delivers one JSON payload to the cephadm agent on a host.

    Connection details:

    - The payload is sent over TLS.
    - The mgr verifies the agent's certificate, including its hostname,
      against the agent root certificate.
    - The mgr presents a certificate generated by the endpoint's SSLCerts.

    On the wire the message is a 10-character, zero-padded decimal byte count
    followed by the UTF-8 encoded JSON. While ``run`` executes,
    ``agent_cache.sending_agent_message[host]`` is True.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.agent = mgr.http_server.agent
        self.host = host
        # connect to the inventory address when the host is known, else to the name as given
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        # when set, a successful delivery is recorded against this daemon spec
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super().__init__(target=self.run)

    def run(self) -> None:
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            self._deliver()
        finally:
            # Clear the in-flight flag on every exit path, including unexpected
            # exceptions, so it can never stay stuck at True.
            self.mgr.agent_cache.sending_agent_message[self.host] = False

    def _make_ssl_context(self) -> ssl.SSLContext:
        """Build the client TLS context used to talk to the agent."""
        assert self.agent
        root_cert = self.agent.ssl_certs.get_root_cert()
        cert, key = self.agent.ssl_certs.generate_cert(
            self.mgr.get_hostname(), self.mgr.get_mgr_ip())
        # The ssl calls below take file paths and read the files immediately,
        # so the temp files (deleted on close) only need to live inside this block.
        with tempfile.NamedTemporaryFile() as root_cert_tmp, \
                tempfile.NamedTemporaryFile() as cert_tmp, \
                tempfile.NamedTemporaryFile() as key_tmp:
            for tmp, pem in ((root_cert_tmp, root_cert), (cert_tmp, cert), (key_tmp, key)):
                tmp.write(pem.encode('utf-8'))
                tmp.flush()
            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_tmp.name)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_tmp.name, key_tmp.name)
        return ssl_ctx

    def _length_header(self) -> str:
        """Return the payload's UTF-8 byte count as a 10-character zero-padded string.

        :raises Exception: if the byte count needs more than 10 digits.
        """
        bytes_len = str(len(self.data.encode('utf-8')))
        if len(bytes_len) > 10:
            raise Exception(
                f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
        return bytes_len.zfill(10)

    def _deliver(self) -> None:
        """Send the framed payload.

        Connection errors are retried once, after a 3s wait. A 5s wait follows
        the second failure before giving up. Any other error gives up
        immediately.
        """
        try:
            ssl_ctx = self._make_ssl_context()
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            return
        try:
            bytes_len = self._length_header()
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            return
        # json.dumps escapes non-ASCII by default, so this encoding cannot fail
        msg = (bytes_len + self.data).encode('utf-8')
        for retry_wait in [3, 5]:
            try:
                # both sockets are closed when the with block exits, on success or failure
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as agent_socket, \
                        ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr) as secure_agent_socket:
                    secure_agent_socket.connect((self.addr, self.port))
                    secure_agent_socket.sendall(msg)
                    agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                # NOTE(review): no socket timeout is set, so an unresponsive agent can
                # block this thread indefinitely in connect()/recv().
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
309 | ||
310 | ||
class CephadmAgentHelpers:
    """Orchestrator-side helpers for managing cephadm agents.

    Covers messaging agents, judging whether an agent is down, the
    CEPHADM_AGENT_DOWN health warning, the agent service spec, and pushing
    config changes to agents.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.agent = mgr.http_server.agent

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Send each host's agent its current counter, each in a new AgentMessageThread.

        :param hosts: hosts to message. Each must have an entry in
            ``agent_cache.agent_ports``, otherwise a KeyError is raised.
        :param increment: if True, mark the host's metadata as not up to date
            and bump its counter (a missing counter is started at 1 instead).
        :param daemon_spec: if given, its ``final_config`` is sent as ``config``.
        """
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
            elif increment:
                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            message_thread = AgentMessageThread(
                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Request acks from every host whose metadata is stale.

        Only hosts with a known agent port that are not already being
        messaged are included.
        """
        self.mgr.agent_helpers._request_agent_acks(
            set([h for h in self.mgr.cache.get_hosts() if
                 (not self.mgr.cache.host_metadata_up_to_date(h)
                  and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))

    def _agent_down(self, host: str) -> bool:
        """Return True if the agent on *host* has not reported recently enough.

        The threshold is ``max(agent_down_multiplier, 1.5) * agent_refresh_rate``
        seconds since the last recorded report.
        """
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if self.mgr.cache.is_host_draining(host):
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * its refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
            return True
        return False

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Rebuild the CEPHADM_AGENT_DOWN health warning from *down_agent_hosts*.

        The warning is always removed first and only re-set when the list is
        non-empty. Agent daemon descriptions on those hosts get their status
        set to error. Whether that updates the cache in place depends on
        get_daemons_by_type returning cached objects — TODO confirm.
        """
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if down_agent_hosts:
            detail: List[str] = []
            down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
            for agent in down_agent_hosts:
                detail.append((f'Cephadm agent on host {agent} has not reported in '
                               f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
                               'down and host may be offline.'))
            for dd in [d for d in self.mgr.cache.get_daemons_by_type('agent') if d.hostname in down_agent_hosts]:
                dd.status = DaemonDescriptionStatus.error
            self.mgr.set_health_warning(
                'CEPHADM_AGENT_DOWN',
                summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                    len(down_agent_hosts)),
                count=len(down_agent_hosts),
                detail=detail,
            )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save an 'agent' service spec placed on every host (host_pattern '*')."""
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

    def _handle_use_agent_setting(self) -> bool:
        """Reconcile the agent service spec with the ``use_agent`` setting.

        When enabled, schedule a redeploy for any agent daemon without a
        stored keyring and add the spec if it is missing. When disabled,
        remove the spec and clear the agent counters, timestamps, keys and
        ports.

        :returns: True if the spec was added or removed.
        """
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them
            # we can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
        return need_apply

    def _check_agent(self, host: str) -> bool:
        """Check the agent on *host* and push config changes or scheduled actions.

        If there is no config yet, or the deps have changed, the agent is
        reconfigured. The new config is sent over the agent's own connection
        when its port is known, the root cert is among its last deps, and it
        is not down. Otherwise a 'reconfig' daemon action is used. If no
        reconfig was needed (or the deps check failed), any scheduled daemon
        action for the agent is run.

        :returns: True if the agent is considered down (see ``_agent_down``).
        """
        down = False
        try:
            assert self.agent
            assert self.agent.ssl_certs.get_root_cert()
        except Exception:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            assert agent.daemon_id is not None
            assert agent.hostname is not None
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.agent.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception:
                    pass
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                # a reconfig was issued; skip scheduled actions this round
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down