# Source: ceph/src/pybind/mgr/cephadm/agent.py (ceph.git, as published on git.proxmox.com)
import json
import logging
import socket
import ssl
import tempfile
import threading
import time
from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional

try:
    import cherrypy
    from cherrypy._cpserver import Server
except ImportError:
    # to avoid sphinx build crash
    class Server:  # type: ignore
        pass

from orchestrator import DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from ceph.utils import datetime_now
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.ssl_cert_utils import SSLCerts
from mgr_util import test_port_allocation, PortAlreadyInUse

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator
def cherrypy_filter(record: logging.LogRecord) -> int:
    """Logging filter that drops cherrypy error records matching a blocklist.

    Returns a falsy value (record is discarded) when the formatted message
    contains any blocked substring, and a truthy value otherwise.
    """
    blocked = (
        'TLSV1_ALERT_DECRYPT_ERROR',
    )
    msg = record.getMessage()
    return not any(m in msg for m in blocked)
# Drop cherrypy error-log records whose message contains a blocked substring
# (see cherrypy_filter), and stop cherrypy access-log records from propagating
# to ancestor loggers.
logging.getLogger('cherrypy.error').addFilter(cherrypy_filter)
cherrypy.log.access_log.propagate = False
class AgentEndpoint:
    """Sets up the mgr-side HTTPS endpoint that cephadm agents POST metadata to.

    NOTE(review): this class header was lost when the file was extracted; the
    name ``AgentEndpoint`` is restored from upstream ceph -- confirm.
    """

    # mgr key/value store keys holding the agent root certificate and key, so a
    # root created by one configure_tls() call is reused by later calls.
    KV_STORE_AGENT_ROOT_CERT = 'cephadm_agent/root/cert'
    KV_STORE_AGENT_ROOT_KEY = 'cephadm_agent/root/key'

    # number of ports above the starting port that find_free_port() will try
    _PORT_SCAN_RANGE = 150

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr = mgr
        self.ssl_certs = SSLCerts()
        self.server_port = 7150
        self.server_addr = self.mgr.get_mgr_ip()

    def configure_routes(self) -> None:
        """Mount a routes dispatcher sending ``POST /data/`` to ``HostData.POST``."""
        d = cherrypy.dispatch.RoutesDispatcher()
        d.connect(name='host-data', route='/data/',
                  controller=self.host_data.POST,
                  conditions=dict(method=['POST']))
        cherrypy.tree.mount(None, '/', config={'/': {'request.dispatch': d}})

    def configure_tls(self, server: Server) -> None:
        """Load (or create and persist) the agent root CA, then give *server* a cert.

        The root cert/key are read from the mgr store; if either is missing a new
        root is generated for the mgr IP and both halves are written back.
        """
        old_cert = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_CERT)
        old_key = self.mgr.get_store(self.KV_STORE_AGENT_ROOT_KEY)
        if old_cert and old_key:
            self.ssl_certs.load_root_credentials(old_cert, old_key)
        else:
            self.ssl_certs.generate_root_cert(self.mgr.get_mgr_ip())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_CERT, self.ssl_certs.get_root_cert())
            self.mgr.set_store(self.KV_STORE_AGENT_ROOT_KEY, self.ssl_certs.get_root_key())

        host = self.mgr.get_hostname()
        addr = self.mgr.get_mgr_ip()
        server.ssl_certificate, server.ssl_private_key = self.ssl_certs.generate_cert_files(host, addr)

    def find_free_port(self) -> None:
        """Bind the endpoint to the first free port in [server_port, server_port + 150].

        On success both ``self.server_port`` and ``self.host_data.socket_port``
        hold the chosen port. If every port is in use an error is logged and
        ``self.server_port`` is left one past the end of the range.
        """
        min_port = self.server_port
        max_port = min_port + self._PORT_SCAN_RANGE
        while self.server_port <= max_port:
            try:
                test_port_allocation(self.server_addr, self.server_port)
            except PortAlreadyInUse:
                self.server_port += 1
                continue
            self.host_data.socket_port = self.server_port
            self.mgr.log.debug(f'Cephadm agent endpoint using {self.server_port}')
            return
        self.mgr.log.error(f'Cephadm agent could not find free port in range {min_port}-{max_port} and failed to start')

    def configure(self) -> None:
        """Create the HostData server, then set up its TLS, routes and port."""
        self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
        self.configure_tls(self.host_data)
        self.configure_routes()
        # NOTE(review): this call was on a line lost in extraction; restored
        # from upstream ceph -- confirm it is not also made by the caller.
        self.find_free_port()
class HostData(Server):
    """CherryPy server receiving metadata reports POSTed by cephadm agents."""

    # NOTE(review): restored from upstream (line lost in extraction); marks
    # this object's handlers as exposed to CherryPy.
    exposed = True

    def __init__(self, mgr: "CephadmOrchestrator", port: int, host: str):
        self.mgr = mgr
        super().__init__()
        self.socket_port = port
        self.socket_host = host
        self.subscribe()

    def stop(self) -> None:
        """Unsubscribe from the CherryPy engine bus, then stop the server."""
        # we must call unsubscribe before stopping the server,
        # otherwise the port is not released and we will get
        # an exception when trying to restart it
        self.unsubscribe()
        super().stop()

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self) -> Dict[str, Any]:
        """Validate and process one agent report.

        Returns ``{'result': <message>}`` describing either the validation
        failure or the outcome of handle_metadata().
        """
        data: Dict[str, Any] = cherrypy.request.json
        results: Dict[str, Any] = {}
        try:
            self.check_request_fields(data)
        except Exception as e:
            results['result'] = f'Bad metadata: {e}'
            self.mgr.log.warning(f'Received bad metadata from an agent: {e}')
        else:
            # if we got here, we've already verified the keyring of the agent. If
            # host agent is reporting on is marked offline, it shouldn't be any more
            self.mgr.offline_hosts_remove(data['host'])
            results['result'] = self.handle_metadata(data)
        return results

    def check_request_fields(self, data: Dict[str, Any]) -> None:
        """Raise ``Exception`` unless *data* is a well-formed, authenticated report.

        Required: a known ``host``, a ``keyring`` equal to the one stored for that
        host, a ``port``, and an ``ack`` convertible to int. Missing metadata
        sections (ls/networks/facts/volume) only produce a warning.
        """
        fields = '{' + ', '.join(data) + '}'
        if 'host' not in data:
            raise Exception(
                f'No host in metadata from agent ("host" field). Only received fields {fields}')
        host = data['host']
        if host not in self.mgr.cache.get_hosts():
            raise Exception(f'Received metadata from agent on unknown hostname {host}')
        if 'keyring' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its keyring for validation ("keyring" field). Only received fields {fields}')
        if host not in self.mgr.agent_cache.agent_keys:
            raise Exception(f'No agent keyring stored for host {host}. Cannot verify agent')
        if data['keyring'] != self.mgr.agent_cache.agent_keys[host]:
            raise Exception(f'Got wrong keyring from agent on host {host}.')
        if 'port' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its listener port ("port" fields). Only received fields {fields}')
        if 'ack' not in data:
            raise Exception(
                f'Agent on host {host} not reporting its counter value ("ack" field). Only received fields {fields}')
        try:
            int(data['ack'])
        except Exception as e:
            raise Exception(
                f'Counter value from agent on host {host} could not be converted to an integer: {e}') from e
        metadata_types = ['ls', 'networks', 'facts', 'volume']
        metadata_types_str = '{' + ', '.join(metadata_types) + '}'
        if not all(item in data for item in metadata_types):
            self.mgr.log.warning(
                f'Agent on host {host} reported incomplete metadata. Not all of {metadata_types_str} were present. Received fields {fields}')

    def handle_metadata(self, data: Dict[str, Any]) -> str:
        """Fold a validated agent report into the mgr caches.

        Returns a human-readable status string; failures are logged and their
        message returned rather than raised.
        """
        try:
            host = data['host']
            self.mgr.agent_cache.agent_ports[host] = int(data['port'])
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
                self.mgr.agent_helpers._request_agent_acks({host})
                res = f'Got metadata from agent on host {host} with no known counter entry. Starting counter at 1 and requesting new metadata'
                self.mgr.log.debug(res)
                return res

            # update timestamp of most recent agent update
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()

            # snapshot daemon state so a change can be detected after the update
            error_daemons_old = set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
            daemon_count_old = len(self.mgr.cache.get_daemons_by_host(host))

            up_to_date = False

            # the agent echoes back the last counter it received; a match means
            # this report answers the mgr's most recent request
            int_ack = int(data['ack'])
            if int_ack == self.mgr.agent_cache.agent_counter[host]:
                up_to_date = True
            else:
                # we got old counter value with message, inform agent of new timestamp
                if not self.mgr.agent_cache.messaging_agent(host):
                    self.mgr.agent_helpers._request_agent_acks({host})
                self.mgr.log.debug(
                    f'Received old metadata from agent on host {host}. Requested up-to-date metadata.')

            if 'ls' in data and data['ls']:
                self.mgr._process_ls_output(host, data['ls'])
                self.mgr.update_failed_daemon_health_check()
            if 'networks' in data and data['networks']:
                self.mgr.cache.update_host_networks(host, data['networks'])
            if 'facts' in data and data['facts']:
                self.mgr.cache.update_host_facts(host, json.loads(data['facts']))
            if 'volume' in data and data['volume']:
                ret = Devices.from_json(json.loads(data['volume']))
                self.mgr.cache.update_host_devices(host, ret.devices)

            if (
                error_daemons_old != set([dd.name() for dd in self.mgr.cache.get_error_daemons()])
                or daemon_count_old != len(self.mgr.cache.get_daemons_by_host(host))
            ):
                self.mgr.log.debug(
                    f'Change detected in state of daemons from {host} agent metadata. Kicking serve loop')
                self.mgr._kick_serve_loop()

            if up_to_date and ('ls' in data and data['ls']):
                was_out_of_date = not self.mgr.cache.all_host_metadata_up_to_date()
                self.mgr.cache.metadata_up_to_date[host] = True
                if was_out_of_date and self.mgr.cache.all_host_metadata_up_to_date():
                    self.mgr.log.debug(
                        'New metadata from agent has made all hosts up to date. Kicking serve loop')
                    self.mgr._kick_serve_loop()
                self.mgr.log.debug(
                    f'Received up-to-date metadata from agent on host {host}.')

            self.mgr.agent_cache.save_agent(host)
            return 'Successfully processed metadata.'

        except Exception as e:
            err_str = f'Failed to update metadata with metadata from agent on host {host}: {e}'
            self.mgr.log.warning(err_str)
            return err_str
class AgentMessageThread(threading.Thread):
    """Thread that sends one JSON payload to the cephadm agent on a host over TLS.

    The wire format is a 10-character zero-padded decimal byte length followed by
    the UTF-8 JSON body. Up to two attempts are made, with 3 s and 5 s back-off
    after a ``ConnectionError``.
    """

    def __init__(self, host: str, port: int, data: Dict[Any, Any], mgr: "CephadmOrchestrator", daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        self.mgr = mgr
        self.agent = mgr.http_server.agent
        self.host = host
        self.addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
        self.port = port
        self.data: str = json.dumps(data)
        self.daemon_spec: Optional[CephadmDaemonDeploySpec] = daemon_spec
        super().__init__(target=self.run)

    def _build_ssl_context(self) -> ssl.SSLContext:
        """Create a client context trusting the agent root CA and presenting a fresh mgr cert."""
        root_cert = self.agent.ssl_certs.get_root_cert()
        cert, key = self.agent.ssl_certs.generate_cert(
            self.mgr.get_hostname(), self.mgr.get_mgr_ip())
        # load_cert_chain() only accepts file paths, so the PEM material is staged
        # in temp files; they are closed (and removed) once the context has read
        # them instead of lingering for the rest of the thread's lifetime.
        with tempfile.NamedTemporaryFile() as root_cert_tmp, \
                tempfile.NamedTemporaryFile() as cert_tmp, \
                tempfile.NamedTemporaryFile() as key_tmp:
            root_cert_tmp.write(root_cert.encode('utf-8'))
            root_cert_tmp.flush()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            key_tmp.write(key.encode('utf-8'))
            key_tmp.flush()

            ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=root_cert_tmp.name)
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED
            ssl_ctx.check_hostname = True
            ssl_ctx.load_cert_chain(cert_tmp.name, key_tmp.name)
        return ssl_ctx

    def _length_prefix(self) -> str:
        """Return the payload's UTF-8 byte length as a 10-digit zero-padded string.

        Raises ``Exception`` if the length does not fit in 10 digits.
        """
        bytes_len: str = str(len(self.data.encode('utf-8')))
        if len(bytes_len) > 10:
            raise Exception(
                f'Message is too big to send to agent. Message size is {bytes_len} bytes!')
        return bytes_len.zfill(10)

    def run(self) -> None:
        """Deliver the payload; ``sending_agent_message[host]`` is True while this runs."""
        self.mgr.log.debug(f'Sending message to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = True
        try:
            ssl_ctx = self._build_ssl_context()
        except Exception as e:
            self.mgr.log.error(f'Failed to get certs for connecting to agent: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        try:
            bytes_len = self._length_prefix()
        except Exception as e:
            self.mgr.log.error(f'Failed to get length of json payload: {e}')
            self.mgr.agent_cache.sending_agent_message[self.host] = False
            return
        for retry_wait in [3, 5]:
            try:
                # both sockets are closed on every exit path (the original code
                # never closed them)
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as agent_socket, \
                        ssl_ctx.wrap_socket(agent_socket, server_hostname=self.addr) as secure_agent_socket:
                    secure_agent_socket.connect((self.addr, self.port))
                    msg = (bytes_len + self.data)
                    secure_agent_socket.sendall(msg.encode('utf-8'))
                    agent_response = secure_agent_socket.recv(1024).decode()
                self.mgr.log.debug(f'Received "{agent_response}" from agent on host {self.host}')
                if self.daemon_spec:
                    self.mgr.agent_cache.agent_config_successfully_delivered(self.daemon_spec)
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
            except ConnectionError as e:
                # if it's a connection error, possibly try to connect again.
                # We could have just deployed agent and it might not be ready
                self.mgr.log.debug(
                    f'Retrying connection to agent on {self.host} in {str(retry_wait)} seconds. Connection failed with: {e}')
                time.sleep(retry_wait)
            except Exception as e:
                # if it's not a connection error, something has gone wrong. Give up.
                self.mgr.log.error(f'Failed to contact agent on host {self.host}: {e}')
                self.mgr.agent_cache.sending_agent_message[self.host] = False
                return
        self.mgr.log.error(f'Could not connect to agent on host {self.host}')
        self.mgr.agent_cache.sending_agent_message[self.host] = False
class CephadmAgentHelpers:
    """Mgr-side helpers for messaging, monitoring and (re)configuring cephadm agents."""

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.agent = mgr.http_server.agent

    def _request_agent_acks(self, hosts: Set[str], increment: bool = False, daemon_spec: Optional[CephadmDaemonDeploySpec] = None) -> None:
        """Send each host's agent its counter (and optional config) on a background thread.

        :param hosts: hostnames to message; each needs an ``agent_cache.agent_ports`` entry.
        :param increment: mark the host's metadata stale and bump its counter first.
        :param daemon_spec: if given, its ``final_config`` is sent as ``config``.
        """
        for host in hosts:
            if increment:
                self.mgr.cache.metadata_up_to_date[host] = False
            if host not in self.mgr.agent_cache.agent_counter:
                self.mgr.agent_cache.agent_counter[host] = 1
            elif increment:
                self.mgr.agent_cache.agent_counter[host] = self.mgr.agent_cache.agent_counter[host] + 1
            payload: Dict[str, Any] = {'counter': self.mgr.agent_cache.agent_counter[host]}
            if daemon_spec:
                payload['config'] = daemon_spec.final_config
            message_thread = AgentMessageThread(
                host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
            message_thread.start()

    def _request_ack_all_not_up_to_date(self) -> None:
        """Message every host whose metadata is stale, whose agent port is known,
        and whose agent is not already being messaged."""
        self.mgr.agent_helpers._request_agent_acks(
            set([h for h in self.mgr.cache.get_hosts() if
                 (not self.mgr.cache.host_metadata_up_to_date(h)
                 and h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))]))

    def _agent_down(self, host: str) -> bool:
        """Return True if the agent on *host* has not reported within
        ``max(agent_down_multiplier, 1.5) * agent_refresh_rate`` seconds."""
        # if host is draining or drained (has _no_schedule label) there should not
        # be an agent deployed there and therefore we should return False
        if self.mgr.cache.is_host_draining(host):
            return False
        # if we haven't deployed an agent on the host yet, don't say an agent is down
        if not self.mgr.cache.get_daemons_by_type('agent', host=host):
            return False
        # if we don't have a timestamp, it's likely because of a mgr fail over.
        # just set the timestamp to now. However, if host was offline before, we
        # should not allow creating a new timestamp to cause it to be marked online
        if host not in self.mgr.agent_cache.agent_timestamp:
            if host in self.mgr.offline_hosts:
                return False
            self.mgr.agent_cache.agent_timestamp[host] = datetime_now()
        # agent hasn't reported in down multiplier * it's refresh rate. Something is likely wrong with it.
        down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
        time_diff = datetime_now() - self.mgr.agent_cache.agent_timestamp[host]
        if time_diff.total_seconds() > down_mult * float(self.mgr.agent_refresh_rate):
            return True
        return False

    def _update_agent_down_healthcheck(self, down_agent_hosts: List[str]) -> None:
        """Replace the CEPHADM_AGENT_DOWN warning and mark those hosts' agent daemons as error."""
        self.mgr.remove_health_warning('CEPHADM_AGENT_DOWN')
        if down_agent_hosts:
            detail: List[str] = []
            down_mult: float = max(self.mgr.agent_down_multiplier, 1.5)
            for agent in down_agent_hosts:
                detail.append((f'Cephadm agent on host {agent} has not reported in '
                               f'{down_mult * self.mgr.agent_refresh_rate} seconds. Agent is assumed '
                               'down and host may be offline.'))
            for dd in [d for d in self.mgr.cache.get_daemons_by_type('agent') if d.hostname in down_agent_hosts]:
                dd.status = DaemonDescriptionStatus.error
            self.mgr.set_health_warning(
                'CEPHADM_AGENT_DOWN',
                summary='%d Cephadm Agent(s) are not reporting. Hosts may be offline' % (
                    len(down_agent_hosts)),
                count=len(down_agent_hosts),
                detail=detail,
            )

    # this function probably seems very unnecessary, but it makes it considerably easier
    # to get the unit tests working. All unit tests that check which daemons were deployed
    # or services setup would have to be individually changed to expect an agent service or
    # daemons, OR we can put this in its own function then mock the function
    def _apply_agent(self) -> None:
        """Save an ``agent`` service spec placed on every host (``host_pattern='*'``)."""
        spec = ServiceSpec(
            service_type='agent',
            placement=PlacementSpec(host_pattern='*')
        )
        self.mgr.spec_store.save(spec)

    def _handle_use_agent_setting(self) -> bool:
        """Reconcile the agent spec and caches with ``mgr.use_agent``.

        Returns True when the agent spec was added to or removed from the spec store.
        """
        need_apply = False
        if self.mgr.use_agent:
            # on the off chance there are still agents hanging around from
            # when we turned the config option off, we need to redeploy them
            # we can tell they're in that state if we don't have a keyring for
            # them in the host cache
            for agent in self.mgr.cache.get_daemons_by_service('agent'):
                if agent.hostname not in self.mgr.agent_cache.agent_keys:
                    self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
            if 'agent' not in self.mgr.spec_store:
                self.mgr.agent_helpers._apply_agent()
                need_apply = True
        else:
            if 'agent' in self.mgr.spec_store:
                self.mgr.spec_store.rm('agent')
                need_apply = True
            self.mgr.agent_cache.agent_counter = {}
            self.mgr.agent_cache.agent_timestamp = {}
            self.mgr.agent_cache.agent_keys = {}
            self.mgr.agent_cache.agent_ports = {}
        return need_apply

    def _check_agent(self, host: str) -> bool:
        """Check the agent on *host*: reconfigure it if its deps changed and run any
        scheduled daemon action for it.

        Returns True if the agent is considered down (see _agent_down).
        """
        down = False
        # explicit checks instead of asserts: asserts vanish under ``python -O``
        try:
            root_cert_ready = bool(self.agent and self.agent.ssl_certs.get_root_cert())
        except Exception:
            root_cert_ready = False
        if not root_cert_ready:
            self.mgr.log.debug(
                f'Delaying checking agent on {host} until cephadm endpoint finished creating root cert')
            return down
        if self.mgr.agent_helpers._agent_down(host):
            down = True
        try:
            agent = self.mgr.cache.get_daemons_by_type('agent', host=host)[0]
            if agent.daemon_id is None or agent.hostname is None:
                raise ValueError('agent daemon description has no daemon_id or hostname')
        except Exception as e:
            self.mgr.log.debug(
                f'Could not retrieve agent on host {host} from daemon cache: {e}')
            return down
        try:
            spec = self.mgr.spec_store.active_specs.get('agent', None)
            deps = self.mgr._calc_daemon_deps(spec, 'agent', agent.daemon_id)
            last_deps, last_config = self.mgr.agent_cache.get_agent_last_config_deps(host)
            if not last_config or last_deps != deps:
                # if root cert is the dep that changed, we must use ssh to reconfig
                # so it's necessary to check this one specifically
                root_cert_match = False
                try:
                    root_cert = self.agent.ssl_certs.get_root_cert()
                    if last_deps and root_cert in last_deps:
                        root_cert_match = True
                except Exception as e:
                    # best effort: fall back to an ssh reconfig below
                    self.mgr.log.debug(f'Could not compare agent root cert for host {host}: {e}')
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                # we need to know the agent port to try to reconfig w/ http
                # otherwise there is no choice but a full ssh reconfig
                if host in self.mgr.agent_cache.agent_ports and root_cert_match and not down:
                    daemon_spec = self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_spec.daemon_type)].prepare_create(daemon_spec)
                    self.mgr.agent_helpers._request_agent_acks(
                        hosts={daemon_spec.host},
                        increment=True,
                        daemon_spec=daemon_spec,
                    )
                else:
                    self.mgr._daemon_action(daemon_spec, action='reconfig')
                return down
        except Exception as e:
            self.mgr.log.debug(
                f'Agent on host {host} not ready to have config and deps checked: {e}')
        action = self.mgr.cache.get_scheduled_daemon_action(agent.hostname, agent.name())
        if action:
            try:
                daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(agent)
                self.mgr._daemon_action(daemon_spec, action=action)
                self.mgr.cache.rm_scheduled_daemon_action(agent.hostname, agent.name())
            except Exception as e:
                self.mgr.log.debug(
                    f'Agent on host {host} not ready to {action}: {e}')
        return down