ceph/src/pybind/mgr/cephadm/serve.py
1 import ipaddress
2 import hashlib
3 import json
4 import logging
5 import uuid
6 import os
7 from collections import defaultdict
8 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
9 DefaultDict
10
11 from ceph.deployment import inventory
12 from ceph.deployment.drive_group import DriveGroupSpec
13 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec, RGWSpec
14 from ceph.utils import datetime_now
15
16 import orchestrator
17 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
18 DaemonDescriptionStatus, daemon_type_to_service
19 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
20 from cephadm.schedule import HostAssignment
21 from cephadm.autotune import MemoryAutotuner
22 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
23 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
24 from mgr_module import MonCommandFailed
25 from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
26
27 from . import utils
28
29 if TYPE_CHECKING:
30 from cephadm.module import CephadmOrchestrator
31
32 logger = logging.getLogger(__name__)
33
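# Daemon types that need a follow-up ("post") action after they are deployed or
# reconfigured (e.g. dashboard/config wiring); see daemon_check_post() in
# _check_daemons() and the bookkeeping in _create_daemon() below.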
34 REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
35
36
37 class CephadmServe:
38 """
39 This module contains functions that are executed in the
40 serve() thread. Thus they don't block the CLI.
41
42 Please see the `Note regarding network calls from CLI handlers`
43 chapter in the cephadm developer guide.
44
45 On the other hand, these functions should *not* be called from
46 CLI handlers, to avoid blocking the CLI.
47 """
48
49 def __init__(self, mgr: "CephadmOrchestrator"):
50 self.mgr: "CephadmOrchestrator" = mgr
51 self.log = logger
52
53 def serve(self) -> None:
54 """
55 The main loop of cephadm.
56
57 A command handler will typically change the declarative state
58 of cephadm. This loop will then attempt to apply this new state.
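
Each pass of the loop roughly: refreshes host and daemon state, checks
for stray hosts/daemons, applies the stored service specs, reconfigures
daemons whose dependencies changed, purges deleted services and
continues any in-progress upgrade (see the loop body below).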
59 """
60 self.log.debug("serve starting")
61 self.mgr.config_checker.load_network_config()
62
63 while self.mgr.run:
64 self.log.debug("serve loop start")
65
66 try:
67
68 self.convert_tags_to_repo_digest()
69
70 # refresh daemons
71 self.log.debug('refreshing hosts and daemons')
72 self._refresh_hosts_and_daemons()
73
74 self._check_for_strays()
75
76 self._update_paused_health()
77
78 if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
79 self.mgr.need_connect_dashboard_rgw = False
80 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
81 self.log.info('Checking dashboard <-> RGW credentials')
82 self.mgr.remote('dashboard', 'set_rgw_credentials')
83
84 if not self.mgr.paused:
85 self._run_async_actions()
86
87 self.mgr.to_remove_osds.process_removal_queue()
88
89 self.mgr.migration.migrate()
90 if self.mgr.migration.is_migration_ongoing():
91 continue
92
93 if self._apply_all_services():
94 continue # did something, refresh
95
96 self._check_daemons()
97
98 self._check_certificates()
99
100 self._purge_deleted_services()
101
102 self._check_for_moved_osds()
103
104 if self.mgr.agent_helpers._handle_use_agent_setting():
105 continue
106
107 if self.mgr.upgrade.continue_upgrade():
108 continue
109
110 except OrchestratorError as e:
111 if e.event_subject:
112 self.mgr.events.from_orch_error(e)
113
114 self.log.debug("serve loop sleep")
115 self._serve_sleep()
116 self.log.debug("serve loop wake")
117 self.log.debug("serve exit")
118
119 def _check_certificates(self) -> None:
120 for d in self.mgr.cache.get_daemons_by_type('grafana'):
121 cert = self.mgr.get_store(f'{d.hostname}/grafana_crt')
122 key = self.mgr.get_store(f'{d.hostname}/grafana_key')
123 if (not cert or not cert.strip()) and (not key or not key.strip()):
124 # certificate/key are empty... nothing to check
125 return
126
127 try:
128 get_cert_issuer_info(cert)
129 verify_tls(cert, key)
130 self.mgr.remove_health_warning('CEPHADM_CERT_ERROR')
131 except ServerConfigException as e:
132 err_msg = f"""
133 Detected invalid grafana certificates. Please use the following commands:
134
135 > ceph config-key set mgr/cephadm/{d.hostname}/grafana_crt -i <path-to-crt-file>
136 > ceph config-key set mgr/cephadm/{d.hostname}/grafana_key -i <path-to-key-file>
137
138 to set a valid key and certificate, or reset their values to an empty string
139 if you want cephadm to generate self-signed Grafana certificates.
140
141 Once done, run the following command to reconfigure the daemon:
142
143 > ceph orch daemon reconfig grafana.{d.hostname}
144
145 """
146 self.log.error(f'Detected invalid grafana certificate on host {d.hostname}: {e}')
147 self.mgr.set_health_warning('CEPHADM_CERT_ERROR',
148 f'Invalid grafana certificate on host {d.hostname}: {e}',
149 1, [err_msg])
150 break
151
152 def _serve_sleep(self) -> None:
153 sleep_interval = max(
154 30,
155 min(
156 self.mgr.host_check_interval,
157 self.mgr.facts_cache_timeout,
158 self.mgr.daemon_cache_timeout,
159 self.mgr.device_cache_timeout,
160 )
161 )
162 self.log.debug('Sleeping for %d seconds', sleep_interval)
163 self.mgr.event.wait(sleep_interval)
164 self.mgr.event.clear()
165
166 def _update_paused_health(self) -> None:
167 self.log.debug('_update_paused_health')
168 if self.mgr.paused:
169 self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
170 "'ceph orch resume' to resume"])
171 else:
172 self.mgr.remove_health_warning('CEPHADM_PAUSED')
173
174 def _autotune_host_memory(self, host: str) -> None:
175 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
176 if not total_mem:
177 val = None
178 else:
179 total_mem *= 1024 # kb -> bytes
180 total_mem *= self.mgr.autotune_memory_target_ratio
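# e.g. (illustrative numbers) with 64 GiB of RAM and an
# autotune_memory_target_ratio of 0.7, roughly 44.8 GiB is handed to
# MemoryAutotuner to split across the daemons on this host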
181 a = MemoryAutotuner(
182 daemons=self.mgr.cache.get_daemons_by_host(host),
183 config_get=self.mgr.get_foreign_ceph_option,
184 total_mem=total_mem,
185 )
186 val, osds = a.tune()
187 any_changed = False
188 for o in osds:
189 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
190 self.mgr.check_mon_command({
191 'prefix': 'config rm',
192 'who': o,
193 'name': 'osd_memory_target',
194 })
195 any_changed = True
196 if val is not None:
197 if any_changed:
198 self.mgr.log.info(
199 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
200 )
201 ret, out, err = self.mgr.mon_command({
202 'prefix': 'config set',
203 'who': f'osd/host:{host.split(".")[0]}',
204 'name': 'osd_memory_target',
205 'value': str(val),
206 })
207 if ret:
208 self.log.warning(
209 f'Unable to set osd_memory_target on {host} to {val}: {err}'
210 )
211 else:
212 # if osd memory autotuning is off, we don't want to remove these config
213 # options as users may be using them. Since there is no way to set autotuning
214 # on/off at a host level, best we can do is check if it is globally on.
215 if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
216 self.mgr.check_mon_command({
217 'prefix': 'config rm',
218 'who': f'osd/host:{host.split(".")[0]}',
219 'name': 'osd_memory_target',
220 })
221 self.mgr.cache.update_autotune(host)
222
223 def _refresh_hosts_and_daemons(self) -> None:
224 self.log.debug('_refresh_hosts_and_daemons')
225 bad_hosts = []
226 failures = []
227 agents_down: List[str] = []
228
229 @forall_hosts
230 def refresh(host: str) -> None:
231
232 # skip hosts that are in maintenance - they could be powered off
233 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
234 return
235
236 if self.mgr.use_agent:
237 if self.mgr.agent_helpers._check_agent(host):
238 agents_down.append(host)
239
240 if self.mgr.cache.host_needs_check(host):
241 r = self._check_host(host)
242 if r is not None:
243 bad_hosts.append(r)
244
245 if (
246 not self.mgr.use_agent
247 or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
248 or host in agents_down
249 ):
250 if self.mgr.cache.host_needs_daemon_refresh(host):
251 self.log.debug('refreshing %s daemons' % host)
252 r = self._refresh_host_daemons(host)
253 if r:
254 failures.append(r)
255
256 if self.mgr.cache.host_needs_facts_refresh(host):
257 self.log.debug('Refreshing %s facts' % host)
258 r = self._refresh_facts(host)
259 if r:
260 failures.append(r)
261
262 if self.mgr.cache.host_needs_network_refresh(host):
263 self.log.debug('Refreshing %s networks' % host)
264 r = self._refresh_host_networks(host)
265 if r:
266 failures.append(r)
267
268 if self.mgr.cache.host_needs_device_refresh(host):
269 self.log.debug('refreshing %s devices' % host)
270 r = self._refresh_host_devices(host)
271 if r:
272 failures.append(r)
273 self.mgr.cache.metadata_up_to_date[host] = True
274 elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
275 if self.mgr.cache.host_needs_daemon_refresh(host):
276 self.log.debug('refreshing %s daemons' % host)
277 r = self._refresh_host_daemons(host)
278 if r:
279 failures.append(r)
280 self.mgr.cache.metadata_up_to_date[host] = True
281
282 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
283 self.log.debug(f"Logging `{host}` into custom registry")
284 with self.mgr.async_timeout_handler(host, 'cephadm registry-login'):
285 r = self.mgr.wait_async(self._registry_login(
286 host, json.loads(str(self.mgr.get_store('registry_credentials')))))
287 if r:
288 bad_hosts.append(r)
289
290 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
291 self.log.debug(f"refreshing OSDSpec previews for {host}")
292 r = self._refresh_host_osdspec_previews(host)
293 if r:
294 failures.append(r)
295
296 if (
297 self.mgr.cache.host_needs_autotune_memory(host)
298 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
299 ):
300 self.log.debug(f"autotuning memory for {host}")
301 self._autotune_host_memory(host)
302
303 refresh(self.mgr.cache.get_hosts())
304
305 self._write_all_client_files()
306
307 self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
308 self.mgr.http_server.config_update()
309
310 self.mgr.config_checker.run_checks()
311
312 for k in [
313 'CEPHADM_HOST_CHECK_FAILED',
314 'CEPHADM_REFRESH_FAILED',
315 ]:
316 self.mgr.remove_health_warning(k)
317 if bad_hosts:
318 self.mgr.set_health_warning(
319 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
320 if failures:
321 self.mgr.set_health_warning(
322 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
323 self.mgr.update_failed_daemon_health_check()
324
325 def _check_host(self, host: str) -> Optional[str]:
326 if host not in self.mgr.inventory:
327 return None
328 self.log.debug(' checking %s' % host)
329 try:
330 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
331 with self.mgr.async_timeout_handler(host, 'cephadm check-host'):
332 out, err, code = self.mgr.wait_async(self._run_cephadm(
333 host, cephadmNoImage, 'check-host', [],
334 error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
335 self.mgr.cache.update_last_host_check(host)
336 self.mgr.cache.save_host(host)
337 if code:
338 self.log.debug(' host %s (%s) failed check' % (host, addr))
339 if self.mgr.warn_on_failed_host_check:
340 return 'host %s (%s) failed check: %s' % (host, addr, err)
341 else:
342 self.log.debug(' host %s (%s) ok' % (host, addr))
343 except Exception as e:
344 self.log.debug(' host %s (%s) failed check' % (host, addr))
345 return 'host %s (%s) failed check: %s' % (host, addr, e)
346 return None
347
348 def _refresh_host_daemons(self, host: str) -> Optional[str]:
349 try:
350 with self.mgr.async_timeout_handler(host, 'cephadm ls'):
351 ls = self.mgr.wait_async(self._run_cephadm_json(
352 host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
353 except OrchestratorError as e:
354 return str(e)
355 self.mgr._process_ls_output(host, ls)
356 return None
357
358 def _refresh_facts(self, host: str) -> Optional[str]:
359 try:
360 with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'):
361 val = self.mgr.wait_async(self._run_cephadm_json(
362 host, cephadmNoImage, 'gather-facts', [],
363 no_fsid=True, log_output=self.mgr.log_refresh_metadata))
364 except OrchestratorError as e:
365 return str(e)
366
367 self.mgr.cache.update_host_facts(host, val)
368
369 return None
370
371 def _refresh_host_devices(self, host: str) -> Optional[str]:
372 with_lsm = self.mgr.device_enhanced_scan
373 inventory_args = ['--', 'inventory',
374 '--format=json-pretty',
375 '--filter-for-batch']
376 if with_lsm:
377 inventory_args.insert(-1, "--with-lsm")
378
379 try:
380 try:
381 with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
382 devices = self.mgr.wait_async(self._run_cephadm_json(
383 host, 'osd', 'ceph-volume', inventory_args, log_output=self.mgr.log_refresh_metadata))
384 except OrchestratorError as e:
385 if 'unrecognized arguments: --filter-for-batch' in str(e):
386 rerun_args = inventory_args.copy()
387 rerun_args.remove('--filter-for-batch')
388 with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
389 devices = self.mgr.wait_async(self._run_cephadm_json(
390 host, 'osd', 'ceph-volume', rerun_args, log_output=self.mgr.log_refresh_metadata))
391 else:
392 raise
393
394 except OrchestratorError as e:
395 return str(e)
396
397 self.log.debug('Refreshed host %s devices (%d)' % (
398 host, len(devices)))
399 ret = inventory.Devices.from_json(devices)
400 self.mgr.cache.update_host_devices(host, ret.devices)
401 self.update_osdspec_previews(host)
402 self.mgr.cache.save_host(host)
403 return None
404
405 def _refresh_host_networks(self, host: str) -> Optional[str]:
406 try:
407 with self.mgr.async_timeout_handler(host, 'cephadm list-networks'):
408 networks = self.mgr.wait_async(self._run_cephadm_json(
409 host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
410 except OrchestratorError as e:
411 return str(e)
412
413 self.log.debug('Refreshed host %s networks (%s)' % (
414 host, len(networks)))
415 self.mgr.cache.update_host_networks(host, networks)
416 self.mgr.cache.save_host(host)
417 return None
418
419 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
420 self.update_osdspec_previews(host)
421 self.mgr.cache.save_host(host)
422 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
423 return None
424
425 def update_osdspec_previews(self, search_host: str = '') -> None:
426 # Set global 'pending' flag for host
427 self.mgr.cache.loading_osdspec_preview.add(search_host)
428 previews = []
429 # query OSDSpecs for host <search host> and generate/get the preview
430 # There can be multiple previews for one host due to multiple OSDSpecs.
431 previews.extend(self.mgr.osd_service.get_previews(search_host))
432 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
433 self.mgr.cache.osdspec_previews[search_host] = previews
434 # Unset global 'pending' flag for host
435 self.mgr.cache.loading_osdspec_preview.remove(search_host)
436
437 def _run_async_actions(self) -> None:
438 while self.mgr.scheduled_async_actions:
439 (self.mgr.scheduled_async_actions.pop(0))()
440
441 def _check_for_strays(self) -> None:
442 self.log.debug('_check_for_strays')
443 for k in ['CEPHADM_STRAY_HOST',
444 'CEPHADM_STRAY_DAEMON']:
445 self.mgr.remove_health_warning(k)
446 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
447 ls = self.mgr.list_servers()
448 self.log.debug(ls)
449 managed = self.mgr.cache.get_daemon_names()
450 host_detail = [] # type: List[str]
451 host_num_daemons = 0
452 daemon_detail = [] # type: List[str]
453 for item in ls:
454 host = item.get('hostname')
455 assert isinstance(host, str)
456 daemons = item.get('services') # misnomer!
457 assert isinstance(daemons, list)
458 missing_names = []
459 for s in daemons:
460 daemon_id = s.get('id')
461 assert daemon_id
462 name = '%s.%s' % (s.get('type'), daemon_id)
463 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
464 metadata = self.mgr.get_metadata(
465 cast(str, s.get('type')), daemon_id, {})
466 assert metadata is not None
467 try:
468 if s.get('type') == 'rgw-nfs':
469 # https://tracker.ceph.com/issues/49573
470 name = metadata['id'][:-4]
471 else:
472 name = '%s.%s' % (s.get('type'), metadata['id'])
473 except (KeyError, TypeError):
474 self.log.debug(
475 "Failed to find daemon id for %s service %s" % (
476 s.get('type'), s.get('id')
477 )
478 )
479 if s.get('type') == 'tcmu-runner':
480 # because we don't track tcmu-runner daemons in the host cache
481 # and don't have a way to check if the daemon is part of an iscsi service,
482 # we assume that all tcmu-runner daemons are managed by cephadm
483 managed.append(name)
484 if host not in self.mgr.inventory:
485 missing_names.append(name)
486 host_num_daemons += 1
487 if name not in managed:
488 daemon_detail.append(
489 'stray daemon %s on host %s not managed by cephadm' % (name, host))
490 if missing_names:
491 host_detail.append(
492 'stray host %s has %d stray daemons: %s' % (
493 host, len(missing_names), missing_names))
494 if self.mgr.warn_on_stray_hosts and host_detail:
495 self.mgr.set_health_warning(
496 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
497 if self.mgr.warn_on_stray_daemons and daemon_detail:
498 self.mgr.set_health_warning(
499 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
500
501 def _check_for_moved_osds(self) -> None:
502 self.log.debug('_check_for_moved_osds')
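# the same OSD id can temporarily appear on more than one host, e.g. after an
# OSD has been redeployed on a different node; once exactly one copy is up and
# running, remove the leftover error-state duplicates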
503 all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
504 for dd in self.mgr.cache.get_daemons_by_type('osd'):
505 assert dd.daemon_id
506 all_osds[int(dd.daemon_id)].append(dd)
507 for osd_id, dds in all_osds.items():
508 if len(dds) <= 1:
509 continue
510 running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
511 error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
512 msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
513 logger.info(msg)
514 if len(running) != 1:
515 continue
516 osd = self.mgr.get_osd_by_id(osd_id)
517 if not osd or not osd['up']:
518 continue
519 for e in error:
520 assert e.hostname
521 try:
522 self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
523 self.mgr.events.for_daemon(
524 e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
525 except OrchestratorError as ex:
526 self.mgr.events.from_orch_error(ex)
527 logger.exception(f'failed to remove duplicated daemon {e}')
528
529 def _apply_all_services(self) -> bool:
530 self.log.debug('_apply_all_services')
531 r = False
532 specs = [] # type: List[ServiceSpec]
533 # if metadata is not up to date, we still need to apply the agent spec
534 # since the agent is the one that gathers the metadata. If we don't, we
535 # end up stuck between wanting metadata to be up to date to apply specs
536 # and needing to apply the agent spec to get up-to-date metadata
537 if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
538 self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
539 try:
540 specs.append(self.mgr.spec_store['agent'].spec)
541 except Exception as e:
542 self.log.debug(f'Failed to find agent spec: {e}')
543 self.mgr.agent_helpers._apply_agent()
544 return r
545 else:
546 _specs: List[ServiceSpec] = []
547 for sn, spec in self.mgr.spec_store.active_specs.items():
548 _specs.append(spec)
549 # apply specs that don't use count first since their placement is deterministic
550 # and not dependent on other daemons' placements in any way
551 specs = [s for s in _specs if not s.placement.count] + [s for s in _specs if s.placement.count]
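# e.g. (illustrative) a spec using 'placement: hosts: [host1]' or a label is
# applied before one using 'placement: count: 3'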
552
553 for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
554 self.mgr.remove_health_warning(name)
555 self.mgr.apply_spec_fails = []
556 for spec in specs:
557 try:
558 if self._apply_service(spec):
559 r = True
560 except Exception as e:
561 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
562 self.log.exception(msg)
563 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
564 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
565 warnings = []
566 for x in self.mgr.apply_spec_fails:
567 warnings.append(f'{x[0]}: {x[1]}')
568 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
569 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
570 len(self.mgr.apply_spec_fails),
571 warnings)
572 self.mgr.update_watched_hosts()
573 self.mgr.tuned_profile_utils._write_all_tuned_profiles()
574 return r
575
576 def _apply_service_config(self, spec: ServiceSpec) -> None:
577 if spec.config:
578 section = utils.name_to_config_section(spec.service_name())
579 for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
580 self.mgr.remove_health_warning(name)
581 invalid_config_options = []
582 options_failed_to_set = []
583 for k, v in spec.config.items():
584 try:
585 current = self.mgr.get_foreign_ceph_option(section, k)
586 except KeyError:
587 msg = f'Ignoring invalid {spec.service_name()} config option {k}'
588 self.log.warning(msg)
589 self.mgr.events.for_service(
590 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
591 )
592 invalid_config_options.append(msg)
593 continue
594 if current != v:
595 self.log.debug(f'setting [{section}] {k} = {v}')
596 try:
597 self.mgr.check_mon_command({
598 'prefix': 'config set',
599 'name': k,
600 'value': str(v),
601 'who': section,
602 })
603 except MonCommandFailed as e:
604 msg = f'Failed to set {spec.service_name()} option {k}: {e}'
605 self.log.warning(msg)
606 options_failed_to_set.append(msg)
607
608 if invalid_config_options:
609 self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
610 invalid_config_options), invalid_config_options)
611 if options_failed_to_set:
612 self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
613 options_failed_to_set), options_failed_to_set)
614
615 def _update_rgw_endpoints(self, rgw_spec: RGWSpec) -> None:
616
617 if not rgw_spec.update_endpoints or rgw_spec.rgw_realm_token is None:
618 return
619
620 ep = []
621 protocol = 'https' if rgw_spec.ssl else 'http'
622 for s in self.mgr.cache.get_daemons_by_service(rgw_spec.service_name()):
623 if s.ports:
624 for p in s.ports:
625 ep.append(f'{protocol}://{s.hostname}:{p}')
626 zone_update_cmd = {
627 'prefix': 'rgw zone modify',
628 'realm_name': rgw_spec.rgw_realm,
629 'zonegroup_name': rgw_spec.rgw_zonegroup,
630 'zone_name': rgw_spec.rgw_zone,
631 'realm_token': rgw_spec.rgw_realm_token,
632 'zone_endpoints': ep,
633 }
634 self.log.debug(f'rgw cmd: {zone_update_cmd}')
635 rc, out, err = self.mgr.mon_command(zone_update_cmd)
636 rgw_spec.update_endpoints = (rc != 0) # keep trying on failure
637 if rc != 0:
638 self.log.error(f'Error when trying to update rgw zone: {err}')
639 self.mgr.set_health_warning('CEPHADM_RGW', f'Cannot update rgw endpoints, error: {err}', 1,
640 [f'Cannot update rgw endpoints for daemon {rgw_spec.service_name()}, error: {err}'])
641 else:
642 self.mgr.remove_health_warning('CEPHADM_RGW')
643
644 def _apply_service(self, spec: ServiceSpec) -> bool:
645 """
646 Schedule a service. Deploy new daemons or remove old ones, depending
647 on the target label and count specified in the placement.
648 """
649 self.mgr.migration.verify_no_migration()
650
651 service_type = spec.service_type
652 service_name = spec.service_name()
653 if spec.unmanaged:
654 self.log.debug('Skipping unmanaged service %s' % service_name)
655 return False
656 if spec.preview_only:
657 self.log.debug('Skipping preview_only service %s' % service_name)
658 return False
659 self.log.debug('Applying service %s spec' % service_name)
660
661 if service_type == 'agent':
662 try:
663 assert self.mgr.http_server.agent
664 assert self.mgr.http_server.agent.ssl_certs.get_root_cert()
665 except Exception:
666 self.log.info(
667 'Delaying applying agent spec until cephadm endpoint root cert created')
668 return False
669
670 self._apply_service_config(spec)
671
672 if service_type == 'osd':
673 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
674 # TODO: return True would result in a busy loop
675 # can't know if daemon count changed; create_from_spec doesn't
676 # return a solid indication
677 return False
678
679 svc = self.mgr.cephadm_services[service_type]
680 daemons = self.mgr.cache.get_daemons_by_service(service_name)
681 related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)
682
683 public_networks: List[str] = []
684 if service_type == 'mon':
685 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
686 if '/' in out:
687 public_networks = [x.strip() for x in out.split(',')]
688 self.log.debug('mon public_network(s) is %s' % public_networks)
689
690 def matches_network(host):
691 # type: (str) -> bool
692 # make sure the host has at least one network that belongs to some configured public network(s)
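# e.g. (illustrative values) a configured public_network of 10.1.0.0/16
# overlaps a host network of 10.1.2.0/24, so that host is kept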
693 for pn in public_networks:
694 public_network = ipaddress.ip_network(pn)
695 for hn in self.mgr.cache.networks[host]:
696 host_network = ipaddress.ip_network(hn)
697 if host_network.overlaps(public_network):
698 return True
699
700 host_networks = ','.join(self.mgr.cache.networks[host])
701 pub_networks = ','.join(public_networks)
702 self.log.info(
703 f"Filtered out host {host}: does not belong to mon public_network(s): "
704 f" {pub_networks}, host network(s): {host_networks}"
705 )
706 return False
707
708 rank_map = None
709 if svc.ranked():
710 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
711 ha = HostAssignment(
712 spec=spec,
713 hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
714 ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
715 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
716 draining_hosts=self.mgr.cache.get_draining_hosts(),
717 daemons=daemons,
718 related_service_daemons=related_service_daemons,
719 networks=self.mgr.cache.networks,
720 filter_new_host=(
721 matches_network if service_type == 'mon'
722 else None
723 ),
724 allow_colo=svc.allow_colo(),
725 primary_daemon_type=svc.primary_daemon_type(spec),
726 per_host_daemon_type=svc.per_host_daemon_type(spec),
727 rank_map=rank_map,
728 )
729
730 try:
731 all_slots, slots_to_add, daemons_to_remove = ha.place()
732 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
733 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
734 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
735 except OrchestratorError as e:
736 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
737 self.log.error(msg)
738 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
739 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
740 warnings = []
741 for x in self.mgr.apply_spec_fails:
742 warnings.append(f'{x[0]}: {x[1]}')
743 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
744 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
745 len(self.mgr.apply_spec_fails),
746 warnings)
747 return False
748
749 r = None
750
751 # sanity check
752 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
753 if service_type in ['mon', 'mgr'] and final_count < 1:
754 self.log.debug('cannot scale mon|mgr below 1')
755 return False
756
757 # progress
758 progress_id = str(uuid.uuid4())
759 delta: List[str] = []
760 if slots_to_add:
761 delta += [f'+{len(slots_to_add)}']
762 if daemons_to_remove:
763 delta += [f'-{len(daemons_to_remove)}']
764 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
765 progress_total = len(slots_to_add) + len(daemons_to_remove)
766 progress_done = 0
767
768 def update_progress() -> None:
769 self.mgr.remote(
770 'progress', 'update', progress_id,
771 ev_msg=progress_title,
772 ev_progress=(progress_done / progress_total),
773 add_to_ceph_s=True,
774 )
775
776 if progress_total:
777 update_progress()
778
779 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
780 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
781
782 hosts_altered: Set[str] = set()
783
784 try:
785 # assign names
786 for i in range(len(slots_to_add)):
787 slot = slots_to_add[i]
788 slot = slot.assign_name(self.mgr.get_unique_name(
789 slot.daemon_type,
790 slot.hostname,
791 [d for d in daemons if d not in daemons_to_remove],
792 prefix=spec.service_id,
793 forcename=slot.name,
794 rank=slot.rank,
795 rank_generation=slot.rank_generation,
796 ))
797 slots_to_add[i] = slot
798 if rank_map is not None:
799 assert slot.rank is not None
800 assert slot.rank_generation is not None
801 assert rank_map[slot.rank][slot.rank_generation] is None
802 rank_map[slot.rank][slot.rank_generation] = slot.name
803
804 if rank_map:
805 # record the rank_map before we make changes so that if we fail the
806 # next mgr will clean up.
807 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
808
809 # remove daemons now, since we are going to fence them anyway
810 for d in daemons_to_remove:
811 assert d.hostname is not None
812 self._remove_daemon(d.name(), d.hostname)
813 daemons_to_remove = []
814
815 # fence them
816 svc.fence_old_ranks(spec, rank_map, len(all_slots))
817
818 # create daemons
819 daemon_place_fails = []
820 for slot in slots_to_add:
821 # first remove daemon with conflicting port or name?
822 if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
823 for d in daemons_to_remove:
824 if (
825 d.hostname != slot.hostname
826 or not (set(d.ports or []) & set(slot.ports))
827 or (d.ip and slot.ip and d.ip != slot.ip)
828 and d.name() != slot.name
829 ):
830 continue
831 if d.name() != slot.name:
832 self.log.info(
833 f'Removing {d.name()} before deploying to {slot} to avoid a port or name conflict'
834 )
835 # NOTE: we don't check ok-to-stop here to avoid starvation if
836 # there is only 1 gateway.
837 self._remove_daemon(d.name(), d.hostname)
838 daemons_to_remove.remove(d)
839 progress_done += 1
840 hosts_altered.add(d.hostname)
841 break
842
843 # deploy new daemon
844 daemon_id = slot.name
845
846 daemon_spec = svc.make_daemon_spec(
847 slot.hostname, daemon_id, slot.network, spec,
848 daemon_type=slot.daemon_type,
849 ports=slot.ports,
850 ip=slot.ip,
851 rank=slot.rank,
852 rank_generation=slot.rank_generation,
853 )
854 self.log.debug('Placing %s.%s on host %s' % (
855 slot.daemon_type, daemon_id, slot.hostname))
856
857 try:
858 daemon_spec = svc.prepare_create(daemon_spec)
859 with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} type daemon)'):
860 self.mgr.wait_async(self._create_daemon(daemon_spec))
861 r = True
862 progress_done += 1
863 update_progress()
864 hosts_altered.add(daemon_spec.host)
865 self.mgr.spec_store.mark_needs_configuration(spec.service_name())
866 except (RuntimeError, OrchestratorError) as e:
867 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
868 f"on {slot.hostname}: {e}")
869 self.mgr.events.for_service(spec, 'ERROR', msg)
870 self.mgr.log.error(msg)
871 daemon_place_fails.append(msg)
872 # only return "no change" if no one else has already succeeded.
873 # later successes will also change to True
874 if r is None:
875 r = False
876 progress_done += 1
877 update_progress()
878 continue
879
880 # add to daemon list so next name(s) will also be unique
881 sd = orchestrator.DaemonDescription(
882 hostname=slot.hostname,
883 daemon_type=slot.daemon_type,
884 daemon_id=daemon_id,
885 service_name=spec.service_name()
886 )
887 daemons.append(sd)
888 self.mgr.cache.append_tmp_daemon(slot.hostname, sd)
889
890 if daemon_place_fails:
891 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
892 daemon_place_fails), daemon_place_fails)
893
894 if service_type == 'mgr':
895 active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
896 if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
897 # We can't just remove the active mgr like any other daemon.
898 # Need to fail over later so it can be removed on next pass.
899 # This can be accomplished by scheduling a restart of the active mgr.
900 self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')
901
902 if service_type == 'rgw':
903 self._update_rgw_endpoints(cast(RGWSpec, spec))
904
905 # remove any?
906 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
907 daemon_ids = [d.daemon_id for d in remove_daemons]
908 assert None not in daemon_ids
909 # setting force flag retains previous behavior
910 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
911 return not r.retval
912
913 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
914 # let's find a subset that is ok-to-stop
915 daemons_to_remove.pop()
916 for d in daemons_to_remove:
917 r = True
918 assert d.hostname is not None
919 self._remove_daemon(d.name(), d.hostname)
920
921 progress_done += 1
922 update_progress()
923 hosts_altered.add(d.hostname)
924 self.mgr.spec_store.mark_needs_configuration(spec.service_name())
925
926 self.mgr.remote('progress', 'complete', progress_id)
927 except Exception as e:
928 self.mgr.remote('progress', 'fail', progress_id, str(e))
929 raise
930 finally:
931 if self.mgr.spec_store.needs_configuration(spec.service_name()):
932 svc.config(spec)
933 self.mgr.spec_store.mark_configured(spec.service_name())
934 if self.mgr.use_agent:
935 # can only send ack to agents if we know for sure which port they bound to
936 hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
937 h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
938 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
939
940 if r is None:
941 r = False
942 return r
943
944 def _check_daemons(self) -> None:
945 self.log.debug('_check_daemons')
946 daemons = self.mgr.cache.get_daemons()
947 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
948 for dd in daemons:
949 # orphan?
950 spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
951 assert dd.hostname is not None
952 assert dd.daemon_type is not None
953 assert dd.daemon_id is not None
954
955 # any action we can try will fail for a daemon on an offline host,
956 # including removing the daemon
957 if dd.hostname in self.mgr.offline_hosts:
958 continue
959
960 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
961 # (mon and mgr specs should always exist; osds aren't matched
962 # to a service spec)
963 self.log.info('Removing orphan daemon %s...' % dd.name())
964 self._remove_daemon(dd.name(), dd.hostname)
965
966 # ignore unmanaged services
967 if spec and spec.unmanaged:
968 continue
969
970 # ignore daemons for deleted services
971 if dd.service_name() in self.mgr.spec_store.spec_deleted:
972 continue
973
974 if dd.daemon_type == 'agent':
975 try:
976 self.mgr.agent_helpers._check_agent(dd.hostname)
977 except Exception as e:
978 self.log.debug(
979 f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
980 continue
981
982 # These daemon types require additional configs after creation
983 if dd.daemon_type in REQUIRES_POST_ACTIONS:
984 daemons_post[dd.daemon_type].append(dd)
985
986 if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
987 self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
988 dd.is_active = True
989 else:
990 dd.is_active = False
991
992 deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
993 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
994 dd.hostname, dd.name())
995 if last_deps is None:
996 last_deps = []
997 action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
998 if not last_config:
999 self.log.info('Reconfiguring %s (unknown last config time)...' % (
1000 dd.name()))
1001 action = 'reconfig'
1002 elif last_deps != deps:
1003 self.log.debug(f'{dd.name()} deps {last_deps} -> {deps}')
1004 self.log.info(f'Reconfiguring {dd.name()} (dependencies changed)...')
1005 action = 'reconfig'
1006 # we only need to redeploy if the secure_monitoring_stack value has changed:
1007 if dd.daemon_type in ['prometheus', 'node-exporter', 'alertmanager']:
1008 diff = list(set(last_deps) - set(deps))
1009 if any('secure_monitoring_stack' in e for e in diff):
1010 action = 'redeploy'
1011
1012 elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
1013 self.log.debug(
1014 f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
1015 self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
1016 dd.extra_container_args = spec.extra_container_args
1017 action = 'redeploy'
1018 elif spec is not None and hasattr(spec, 'extra_entrypoint_args') and dd.extra_entrypoint_args != spec.extra_entrypoint_args:
1019 self.log.info(f'Redeploying {dd.name()}, (entrypoint args changed) . . .')
1020 self.log.debug(
1021 f'{dd.name()} daemon entrypoint args {dd.extra_entrypoint_args} -> {spec.extra_entrypoint_args}')
1022 dd.extra_entrypoint_args = spec.extra_entrypoint_args
1023 action = 'redeploy'
1024 elif self.mgr.last_monmap and \
1025 self.mgr.last_monmap > last_config and \
1026 dd.daemon_type in CEPH_TYPES:
1027 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
1028 action = 'reconfig'
1029 elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
1030 dd.daemon_type in CEPH_TYPES:
1031 self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
1032 action = 'reconfig'
1033 if action:
1034 if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
1035 and action == 'reconfig':
1036 action = 'redeploy'
1037 try:
1038 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
1039 self.mgr._daemon_action(daemon_spec, action=action)
1040 if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
1041 self.mgr.cache.save_host(dd.hostname)
1042 except OrchestratorError as e:
1043 self.log.exception(e)
1044 self.mgr.events.from_orch_error(e)
1045 if dd.daemon_type in daemons_post:
1046 del daemons_post[dd.daemon_type]
1047 # continue...
1048 except Exception as e:
1049 self.log.exception(e)
1050 self.mgr.events.for_daemon_from_exception(dd.name(), e)
1051 if dd.daemon_type in daemons_post:
1052 del daemons_post[dd.daemon_type]
1053 # continue...
1054
1055 # do daemon post actions
1056 for daemon_type, daemon_descs in daemons_post.items():
1057 run_post = False
1058 for d in daemon_descs:
1059 if d.name() in self.mgr.requires_post_actions:
1060 self.mgr.requires_post_actions.remove(d.name())
1061 run_post = True
1062 if run_post:
1063 self.mgr._get_cephadm_service(daemon_type_to_service(
1064 daemon_type)).daemon_check_post(daemon_descs)
1065
1066 def _purge_deleted_services(self) -> None:
1067 self.log.debug('_purge_deleted_services')
1068 existing_services = self.mgr.spec_store.all_specs.items()
1069 for service_name, spec in list(existing_services):
1070 if service_name not in self.mgr.spec_store.spec_deleted:
1071 continue
1072 if self.mgr.cache.get_daemons_by_service(service_name):
1073 continue
1074 if spec.service_type in ['mon', 'mgr']:
1075 continue
1076
1077 logger.info(f'Purge service {service_name}')
1078
1079 self.mgr.cephadm_services[spec.service_type].purge(service_name)
1080 self.mgr.spec_store.finally_rm(service_name)
1081
1082 def convert_tags_to_repo_digest(self) -> None:
1083 if not self.mgr.use_repo_digest:
1084 return
1085 settings = self.mgr.upgrade.get_distinct_container_image_settings()
1086 digests: Dict[str, ContainerInspectInfo] = {}
1087 for container_image_ref in set(settings.values()):
1088 if not is_repo_digest(container_image_ref):
1089 with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'):
1090 image_info = self.mgr.wait_async(
1091 self._get_container_image_info(container_image_ref))
1092 if image_info.repo_digests:
1093 # FIXME: we assume the first digest here is the best
1094 assert is_repo_digest(image_info.repo_digests[0]), image_info
1095 digests[container_image_ref] = image_info
1096
1097 for entity, container_image_ref in settings.items():
1098 if not is_repo_digest(container_image_ref):
1099 image_info = digests[container_image_ref]
1100 if image_info.repo_digests:
1101 # FIXME: we assume the first digest here is the best
1102 self.mgr.set_container_image(entity, image_info.repo_digests[0])
1103
1104 def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
1105 # host -> path -> (mode, uid, gid, content, digest)
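# e.g. (illustrative entry):
#   client_files['host1']['/etc/ceph/ceph.conf'] = \
#       (0o644, 0, 0, b'[global]\n...', '<sha256 hex digest>')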
1106 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
1107
1108 # ceph.conf
1109 config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
1110 config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
1111 cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'
1112
1113 if self.mgr.manage_etc_ceph_ceph_conf:
1114 try:
1115 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
1116 ha = HostAssignment(
1117 spec=ServiceSpec('mon', placement=pspec),
1118 hosts=self.mgr.cache.get_schedulable_hosts(),
1119 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1120 draining_hosts=self.mgr.cache.get_draining_hosts(),
1121 daemons=[],
1122 networks=self.mgr.cache.networks,
1123 )
1124 all_slots, _, _ = ha.place()
1125 for host in {s.hostname for s in all_slots}:
1126 if host not in client_files:
1127 client_files[host] = {}
1128 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1129 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1130 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1131 except Exception as e:
1132 self.mgr.log.warning(
1133 f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
1134
1135 # client keyrings
1136 for ks in self.mgr.keys.keys.values():
1137 try:
1138 ret, keyring, err = self.mgr.mon_command({
1139 'prefix': 'auth get',
1140 'entity': ks.entity,
1141 })
1142 if ret:
1143 self.log.warning(f'unable to fetch keyring for {ks.entity}')
1144 continue
1145 digest = ''.join('%02x' % c for c in hashlib.sha256(
1146 keyring.encode('utf-8')).digest())
1147 ha = HostAssignment(
1148 spec=ServiceSpec('mon', placement=ks.placement),
1149 hosts=self.mgr.cache.get_schedulable_hosts(),
1150 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1151 draining_hosts=self.mgr.cache.get_draining_hosts(),
1152 daemons=[],
1153 networks=self.mgr.cache.networks,
1154 )
1155 all_slots, _, _ = ha.place()
1156 for host in {s.hostname for s in all_slots}:
1157 if host not in client_files:
1158 client_files[host] = {}
1159 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1160 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1161 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1162 ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
1163 client_files[host][ks.path] = ceph_admin_key
1164 client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
1165 except Exception as e:
1166 self.log.warning(
1167 f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
1168 return client_files
1169
1170 def _write_all_client_files(self) -> None:
1171 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
1172 client_files = self._calc_client_files()
1173 else:
1174 client_files = {}
1175
1176 @forall_hosts
1177 def _write_files(host: str) -> None:
1178 self._write_client_files(client_files, host)
1179
1180 _write_files(self.mgr.cache.get_hosts())
1181
1182 def _write_client_files(self,
1183 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
1184 host: str) -> None:
1185 updated_files = False
1186 if host in self.mgr.offline_hosts:
1187 return
1188 old_files = self.mgr.cache.get_host_client_files(host).copy()
1189 for path, m in client_files.get(host, {}).items():
1190 mode, uid, gid, content, digest = m
1191 if path in old_files:
1192 match = old_files[path] == (digest, mode, uid, gid)
1193 del old_files[path]
1194 if match:
1195 continue
1196 self.log.info(f'Updating {host}:{path}')
1197 self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
1198 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
1199 updated_files = True
1200 for path in old_files.keys():
1201 if path == '/etc/ceph/ceph.conf':
1202 continue
1203 self.log.info(f'Removing {host}:{path}')
1204 cmd = ['rm', '-f', path]
1205 self.mgr.ssh.check_execute_command(host, cmd)
1206 updated_files = True
1207 self.mgr.cache.removed_client_file(host, path)
1208 if updated_files:
1209 self.mgr.cache.save_host(host)
1210
1211 async def _create_daemon(self,
1212 daemon_spec: CephadmDaemonDeploySpec,
1213 reconfig: bool = False,
1214 osd_uuid_map: Optional[Dict[str, Any]] = None,
1215 ) -> str:
1216
1217 with set_exception_subject('service', orchestrator.DaemonDescription(
1218 daemon_type=daemon_spec.daemon_type,
1219 daemon_id=daemon_spec.daemon_id,
1220 hostname=daemon_spec.host,
1221 ).service_id(), overwrite=True):
1222
1223 try:
1224 image = ''
1225 start_time = datetime_now()
1226 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1227
1228 if daemon_spec.daemon_type == 'container':
1229 spec = cast(CustomContainerSpec,
1230 self.mgr.spec_store[daemon_spec.service_name].spec)
1231 image = spec.image
1232 if spec.ports:
1233 ports.extend(spec.ports)
1234
1235 # TCP ports to open in the host firewall
1236 if len(ports) > 0:
1237 daemon_spec.extra_args.extend([
1238 '--tcp-ports', ' '.join(map(str, ports))
1239 ])
1240
1241 # osd deployments need an --osd-fsid arg (the osd's uuid)
1242 if daemon_spec.daemon_type == 'osd':
1243 if not osd_uuid_map:
1244 osd_uuid_map = self.mgr.get_osd_uuid_map()
1245 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1246 if not osd_uuid:
1247 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1248 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1249
1250 if reconfig:
1251 daemon_spec.extra_args.append('--reconfig')
1252 if self.mgr.allow_ptrace:
1253 daemon_spec.extra_args.append('--allow-ptrace')
1254
1255 daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)
1256
1257 if daemon_spec.service_name in self.mgr.spec_store:
1258 configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
1259 if configs is not None:
1260 daemon_spec.final_config.update(
1261 {'custom_config_files': [c.to_json() for c in configs]})
1262
1263 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
1264 await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1265
1266 self.log.info('%s daemon %s on %s' % (
1267 'Reconfiguring' if reconfig else 'Deploying',
1268 daemon_spec.name(), daemon_spec.host))
1269
1270 out, err, code = await self._run_cephadm(
1271 daemon_spec.host, daemon_spec.name(), 'deploy',
1272 [
1273 '--name', daemon_spec.name(),
1274 '--meta-json', json.dumps({
1275 'service_name': daemon_spec.service_name,
1276 'ports': daemon_spec.ports,
1277 'ip': daemon_spec.ip,
1278 'deployed_by': self.mgr.get_active_mgr_digests(),
1279 'rank': daemon_spec.rank,
1280 'rank_generation': daemon_spec.rank_generation,
1281 'extra_container_args': extra_container_args,
1282 'extra_entrypoint_args': extra_entrypoint_args
1283 }),
1284 '--config-json', '-',
1285 ] + daemon_spec.extra_args,
1286 stdin=json.dumps(daemon_spec.final_config),
1287 image=image,
1288 )
1289
1290 if daemon_spec.daemon_type == 'agent':
1291 self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
1292 self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
1293
1294 # refresh daemon state? (ceph daemon reconfig does not need it)
1295 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
1296 if not code and daemon_spec.host in self.mgr.cache.daemons:
1297 # prime cached service state with what we (should have)
1298 # just created
1299 sd = daemon_spec.to_daemon_description(
1300 DaemonDescriptionStatus.starting, 'starting')
1301 self.mgr.cache.add_daemon(daemon_spec.host, sd)
1302 if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
1303 self.mgr.requires_post_actions.add(daemon_spec.name())
1304 self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
1305
1306 if daemon_spec.daemon_type != 'agent':
1307 self.mgr.cache.update_daemon_config_deps(
1308 daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
1309 self.mgr.cache.save_host(daemon_spec.host)
1310 else:
1311 self.mgr.agent_cache.update_agent_config_deps(
1312 daemon_spec.host, daemon_spec.deps, start_time)
1313 self.mgr.agent_cache.save_agent(daemon_spec.host)
1314 msg = "{} {} on host '{}'".format(
1315 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1316 if not code:
1317 self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1318 else:
1319 what = 'reconfigure' if reconfig else 'deploy'
1320 self.mgr.events.for_daemon(
1321 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1322 return msg
1323 except OrchestratorError:
1324 redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
1325 if not reconfig and not redeploy:
1326 # we have to clean up the daemon. E.g. keyrings.
1327 service_type = daemon_type_to_service(daemon_spec.daemon_type)
1328 dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
1329 self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
1330 raise
1331
1332 def _setup_extra_deployment_args(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[CephadmDaemonDeploySpec, Optional[List[str]], Optional[List[str]]]:
1333 # this function handles any user-specified (in the service spec) extra
1334 # runtime or entrypoint args for a daemon we are going to deploy.
1335 # Effectively it just adds a set of extra args to pass to the cephadm binary
1336 # to indicate the daemon being deployed needs extra runtime/entrypoint args.
1337 # Returns the modified daemon spec as well as what args were added
1338 # (as those are included in the unit.meta file)
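# For example (hypothetical spec value): extra_container_args: ['--cpus 2'] is
# split on spaces and forwarded to cephadm as
# '--extra-container-args=--cpus' '--extra-container-args=2'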
1339 try:
1340 eca = daemon_spec.extra_container_args
1341 if eca:
1342 for a in eca:
1343 # args with spaces need to be split into multiple args
1344 # in order to work properly
1345 args = a.split(' ')
1346 for arg in args:
1347 if arg:
1348 daemon_spec.extra_args.append(f'--extra-container-args={arg}')
1349 except AttributeError:
1350 eca = None
1351 try:
1352 eea = daemon_spec.extra_entrypoint_args
1353 if eea:
1354 for a in eea:
1355 # args with spaces need to be split into multiple args
1356 # in order to work properly
1357 args = a.split(' ')
1358 for arg in args:
1359 if arg:
1360 daemon_spec.extra_args.append(f'--extra-entrypoint-args={arg}')
1361 except AttributeError:
1362 eea = None
1363 return daemon_spec, eca, eea
1364
1365 def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
1366 """
1367 Remove a daemon
1368 """
1369 (daemon_type, daemon_id) = name.split('.', 1)
1370 daemon = orchestrator.DaemonDescription(
1371 daemon_type=daemon_type,
1372 daemon_id=daemon_id,
1373 hostname=host)
1374
1375 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1376
1377 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1378 # NOTE: we are passing the 'force' flag here, which means
1379 # we can delete a mon instance's data.
1380 dd = self.mgr.cache.get_daemon(daemon.daemon_name)
1381 if dd.ports:
1382 args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
1383 else:
1384 args = ['--name', name, '--force']
1385
1386 self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
1387 with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
1388 out, err, code = self.mgr.wait_async(self._run_cephadm(
1389 host, name, 'rm-daemon', args))
1390 if not code:
1391 # remove item from cache
1392 self.mgr.cache.rm_daemon(host, name)
1393 self.mgr.cache.invalidate_host_daemons(host)
1394
1395 if not no_post_remove:
1396 if daemon_type not in ['iscsi']:
1397 self.mgr.cephadm_services[daemon_type_to_service(
1398 daemon_type)].post_remove(daemon, is_failed_deploy=False)
1399 else:
1400 self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
1401 daemon_type)].post_remove(daemon, is_failed_deploy=False))
1402 self.mgr._kick_serve_loop()
1403
1404 return "Removed {} from host '{}'".format(name, host)
1405
1406 async def _run_cephadm_json(self,
1407 host: str,
1408 entity: Union[CephadmNoImage, str],
1409 command: str,
1410 args: List[str],
1411 no_fsid: Optional[bool] = False,
1412 error_ok: Optional[bool] = False,
1413 image: Optional[str] = "",
1414 log_output: Optional[bool] = True,
1415 ) -> Any:
1416 try:
1417 out, err, code = await self._run_cephadm(
1418 host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok,
1419 image=image, log_output=log_output)
1420 if code:
1421 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1422 except Exception as e:
1423 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1424 try:
1425 return json.loads(''.join(out))
1426 except (ValueError, KeyError):
1427 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1428 self.log.exception(f'{msg}: {"".join(out)}')
1429 raise OrchestratorError(msg)
1430
1431 async def _run_cephadm(self,
1432 host: str,
1433 entity: Union[CephadmNoImage, str],
1434 command: str,
1435 args: List[str],
1436 addr: Optional[str] = "",
1437 stdin: Optional[str] = "",
1438 no_fsid: Optional[bool] = False,
1439 error_ok: Optional[bool] = False,
1440 image: Optional[str] = "",
1441 env_vars: Optional[List[str]] = None,
1442 log_output: Optional[bool] = True,
1443 timeout: Optional[int] = None, # timeout in seconds
1444 ) -> Tuple[List[str], List[str], int]:
1445 """
1446 Run cephadm on the remote host with the given command + args
1447
1448 Important: You probably don't want to run _run_cephadm from CLI handlers
1449
1450 :env_vars: in format -> [KEY=VALUE, ..]
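
Example (illustrative; the host name is hypothetical)::

    out, err, code = await self._run_cephadm(
        'host1', 'mon', 'ls', [], no_fsid=True)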
1451 """
1452
1453 await self.mgr.ssh._remote_connection(host, addr)
1454
1455 self.log.debug(f"_run_cephadm : command = {command}")
1456 self.log.debug(f"_run_cephadm : args = {args}")
1457
1458 bypass_image = ('agent')
1459
1460 assert image or entity
1461 # Skip the image check for deployed daemons that are not ceph containers
1462 if not str(entity).startswith(bypass_image):
1463 if not image and entity is not cephadmNoImage:
1464 image = self.mgr._get_container_image(entity)
1465
1466 final_args = []
1467
1468 # global args
1469 if env_vars:
1470 for env_var_pair in env_vars:
1471 final_args.extend(['--env', env_var_pair])
1472
1473 if image:
1474 final_args.extend(['--image', image])
1475
1476 if not self.mgr.container_init:
1477 final_args += ['--no-container-init']
1478
1479 if not self.mgr.cgroups_split:
1480 final_args += ['--no-cgroups-split']
1481
1482 if not timeout:
1483 # default global timeout if no timeout was passed
1484 timeout = self.mgr.default_cephadm_command_timeout
1485 # put a lower bound of 60 seconds in case users
1486 # accidentally set it to something unreasonable.
1487 # For example if they thought it was in minutes
1488 # rather than seconds
1489 if timeout < 60:
1490 self.log.info(f'Found default timeout set to {timeout}. Using a minimum of 60 instead.')
1491 timeout = 60
1492 # subtract a small amount to give this timeout
1493 # in the binary a chance to actually happen over
1494 # the asyncio based timeout in the mgr module
1495 timeout -= 5
1496 final_args += ['--timeout', str(timeout)]
1497
1498 # subcommand
1499 final_args.append(command)
1500
1501 # subcommand args
1502 if not no_fsid:
1503 final_args += ['--fsid', self.mgr._cluster_fsid]
1504
1505 final_args += args
1506
1507 # exec
1508 self.log.debug('args: %s' % (' '.join(final_args)))
1509 if self.mgr.mode == 'root':
1510 # the agent has the cephadm binary as an extra file, which is
1511 # therefore passed over stdin. Even for debug logs it's too much
1512 if stdin and 'agent' not in str(entity):
1513 self.log.debug('stdin: %s' % stdin)
1514
1515 cmd = ['which', 'python3']
1516 python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
1517 cmd = [python, self.mgr.cephadm_binary_path] + final_args
1518
1519 try:
1520 out, err, code = await self.mgr.ssh._execute_command(
1521 host, cmd, stdin=stdin, addr=addr)
1522 if code == 2:
1523 ls_cmd = ['ls', self.mgr.cephadm_binary_path]
1524 out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
1525 log_command=log_output)
1526 if code_ls == 2:
1527 await self._deploy_cephadm_binary(host, addr)
1528 out, err, code = await self.mgr.ssh._execute_command(
1529 host, cmd, stdin=stdin, addr=addr)
1530 # if there is an agent on this host, make sure it is using the most recent
1531 # version of cephadm binary
1532 if host in self.mgr.inventory:
1533 for agent in self.mgr.cache.get_daemons_by_type('agent', host):
1534 self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
1535
1536 except Exception as e:
1537 await self.mgr.ssh._reset_con(host)
1538 if error_ok:
1539 return [], [str(e)], 1
1540 raise
1541
1542 elif self.mgr.mode == 'cephadm-package':
1543 try:
1544 cmd = ['/usr/bin/cephadm'] + final_args
1545 out, err, code = await self.mgr.ssh._execute_command(
1546 host, cmd, stdin=stdin, addr=addr)
1547 except Exception as e:
1548 await self.mgr.ssh._reset_con(host)
1549 if error_ok:
1550 return [], [str(e)], 1
1551 raise
1552 else:
1553 assert False, 'unsupported mode'
1554
1555 if log_output:
1556 self.log.debug(f'code: {code}')
1557 if out:
1558 self.log.debug(f'out: {out}')
1559 if err:
1560 self.log.debug(f'err: {err}')
1561 if code and not error_ok:
1562 raise OrchestratorError(
1563 f'cephadm exited with an error code: {code}, stderr: {err}')
1564 return [out], [err], code
1565
1566 async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
1567 # pick a random host...
1568 host = None
1569 for host_name in self.mgr.inventory.keys():
1570 host = host_name
1571 break
1572 if not host:
1573 raise OrchestratorError('no hosts defined')
1574 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
1575 await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1576
1577 j = None
1578 try:
1579 j = await self._run_cephadm_json(host, '', 'inspect-image', [],
1580 image=image_name, no_fsid=True,
1581 error_ok=True)
1582 except OrchestratorError:
1583 pass
1584
1585 if not j:
1586 pullargs: List[str] = []
1587 if self.mgr.registry_insecure:
1588 pullargs.append("--insecure")
1589
1590 j = await self._run_cephadm_json(host, '', 'pull', pullargs,
1591 image=image_name, no_fsid=True)
1592 r = ContainerInspectInfo(
1593 j['image_id'],
1594 j.get('ceph_version'),
1595 j.get('repo_digests')
1596 )
1597 self.log.debug(f'image {image_name} -> {r}')
1598 return r
1599
1600 # function responsible for logging a single host into the custom registry
1601 async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
1602 self.log.debug(
1603 f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
1604 # want to pass info over stdin rather than through normal list of args
1605 out, err, code = await self._run_cephadm(
1606 host, 'mon', 'registry-login',
1607 ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
1608 if code:
1609 return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
1610 return None
1611
1612 async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
1613 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1614 self.log.info(f"Deploying cephadm binary to {host}")
1615 await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
1616 self.mgr._cephadm, addr=addr)