import hashlib
import ipaddress
import json
import logging
import os
import uuid
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
    DefaultDict

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec, RGWSpec
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
    DaemonDescriptionStatus, daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.schedule import HostAssignment
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
from mgr_module import MonCommandFailed
from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException

from . import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

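# Daemon types that need a follow-up configuration step after deployment:
# _create_daemon() records them in mgr.requires_post_actions and
# _check_daemons() later runs the owning service's daemon_check_post().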
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']


class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:
            self.log.debug("serve loop start")

            try:
                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                if not self.mgr.paused:
                    self._run_async_actions()

                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    if self.mgr.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    self._check_certificates()

                    self._purge_deleted_services()

                    self._check_for_moved_osds()

                    if self.mgr.agent_helpers._handle_use_agent_setting():
                        continue

                    if self.mgr.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self.log.debug("serve loop sleep")
            self._serve_sleep()
            self.log.debug("serve loop wake")
        self.log.debug("serve exit")

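    # Note: only grafana certificate/key pairs are validated here. A bad
    # cert raises a CEPHADM_CERT_ERROR health warning with remediation
    # commands in the warning detail rather than aborting the serve loop.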
    def _check_certificates(self) -> None:
        for d in self.mgr.cache.get_daemons_by_type('grafana'):
            cert = self.mgr.get_store(f'{d.hostname}/grafana_crt')
            key = self.mgr.get_store(f'{d.hostname}/grafana_key')
            if (not cert or not cert.strip()) and (not key or not key.strip()):
                # certificate/key are empty... nothing to check
                continue

            try:
                get_cert_issuer_info(cert)
                verify_tls(cert, key)
                self.mgr.remove_health_warning('CEPHADM_CERT_ERROR')
            except ServerConfigException as e:
                err_msg = f"""
                Detected invalid grafana certificates. Please, use the following commands:

                > ceph config-key set mgr/cephadm/{d.hostname}/grafana_crt -i <path-to-crt-file>
                > ceph config-key set mgr/cephadm/{d.hostname}/grafana_key -i <path-to-key-file>

                to set valid key and certificate or reset their value to an empty string
                in case you want cephadm to generate self-signed Grafana certificates.

                Once done, run the following command to reconfig the daemon:

                > ceph orch daemon reconfig grafana.{d.hostname}
                """
                self.log.error(f'Detected invalid grafana certificate on host {d.hostname}: {e}')
                self.mgr.set_health_warning('CEPHADM_CERT_ERROR',
                                            f'Invalid grafana certificate on host {d.hostname}: {e}',
                                            1, [err_msg])

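    # The wait below is interruptible: other threads set self.mgr.event
    # (e.g. via _kick_serve_loop()) so that config changes are picked up
    # before the full sleep interval elapses.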
    def _serve_sleep(self) -> None:
        sleep_interval = max(
            30,
            min(
                self.mgr.host_check_interval,
                self.mgr.facts_cache_timeout,
                self.mgr.daemon_cache_timeout,
                self.mgr.device_cache_timeout,
            )
        )
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        self.mgr.event.wait(sleep_interval)
        self.mgr.event.clear()

    def _update_paused_health(self) -> None:
        self.log.debug('_update_paused_health')
        if self.mgr.paused:
            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
                                        "'ceph orch resume' to resume"])
        else:
            self.mgr.remove_health_warning('CEPHADM_PAUSED')

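    # The autotune budget is the host's memory_total_kb scaled by
    # autotune_memory_target_ratio; MemoryAutotuner subtracts the needs of
    # the non-OSD daemons on the host and spreads the remainder across OSDs
    # via a per-host osd_memory_target config option.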
    def _autotune_host_memory(self, host: str) -> None:
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            # if osd memory autotuning is off, we don't want to remove these config
            # options as users may be using them. Since there is no way to set autotuning
            # on/off at a host level, best we can do is check if it is globally on.
            if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
                self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                })
        self.mgr.cache.update_autotune(host)

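    # The nested refresh() below is decorated with @forall_hosts, so it runs
    # once per cached host (the calls are issued in parallel) and the
    # bad_hosts/failures lists it fills are turned into health warnings
    # at the end of the method.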
    def _refresh_hosts_and_daemons(self) -> None:
        self.log.debug('_refresh_hosts_and_daemons')
        bad_hosts = []
        failures = []
        agents_down: List[str] = []

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.use_agent:
                if self.mgr.agent_helpers._check_agent(host):
                    agents_down.append(host)

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)

            if (
                    not self.mgr.use_agent
                    or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
                    or host in agents_down
            ):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_facts_refresh(host):
                    self.log.debug(('Refreshing %s facts' % host))
                    r = self._refresh_facts(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_network_refresh(host):
                    self.log.debug(('Refreshing %s networks' % host))
                    r = self._refresh_host_networks(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True
            elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                self.log.debug(f"Logging `{host}` into custom registry")
                with self.mgr.async_timeout_handler(host, 'cephadm registry-login'):
                    r = self.mgr.wait_async(self._registry_login(
                        host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

        refresh(self.mgr.cache.get_hosts())

        self._write_all_client_files()

        self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
        self.mgr.http_server.config_update()

        self.mgr.config_checker.run_checks()

        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning(
                'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning(
                'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
        self.mgr.update_failed_daemon_health_check()

    def _check_host(self, host: str) -> Optional[str]:
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
            with self.mgr.async_timeout_handler(host, 'cephadm check-host'):
                out, err, code = self.mgr.wait_async(self._run_cephadm(
                    host, cephadmNoImage, 'check-host', [],
                    error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s (%s) failed check' % (host, addr))
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s (%s) failed check: %s' % (host, addr, err)
            else:
                self.log.debug(' host %s (%s) ok' % (host, addr))
        except Exception as e:
            self.log.debug(' host %s (%s) failed check' % (host, addr))
            return 'host %s (%s) failed check: %s' % (host, addr, e)
        return None

    def _refresh_host_daemons(self, host: str) -> Optional[str]:
        try:
            with self.mgr.async_timeout_handler(host, 'cephadm ls'):
                ls = self.mgr.wait_async(self._run_cephadm_json(
                    host, 'mon', 'ls', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)
        self.mgr._process_ls_output(host, ls)
        return None

    def _refresh_facts(self, host: str) -> Optional[str]:
        try:
            with self.mgr.async_timeout_handler(host, 'cephadm gather-facts'):
                val = self.mgr.wait_async(self._run_cephadm_json(
                    host, cephadmNoImage, 'gather-facts', [],
                    no_fsid=True, log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)
        self.mgr.cache.update_host_facts(host, val)
        return None

    def _refresh_host_devices(self, host: str) -> Optional[str]:
        with_lsm = self.mgr.device_enhanced_scan
        inventory_args = ['--', 'inventory',
                          '--format=json-pretty',
                          '--filter-for-batch']
        if with_lsm:
            inventory_args.insert(-1, "--with-lsm")

        try:
            try:
                with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
                    devices = self.mgr.wait_async(self._run_cephadm_json(
                        host, 'osd', 'ceph-volume', inventory_args,
                        log_output=self.mgr.log_refresh_metadata))
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    with self.mgr.async_timeout_handler(host, 'cephadm ceph-volume -- inventory'):
                        devices = self.mgr.wait_async(self._run_cephadm_json(
                            host, 'osd', 'ceph-volume', rerun_args,
                            log_output=self.mgr.log_refresh_metadata))
                else:
                    raise
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d)' % (
            host, len(devices)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices(host, ret.devices)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_networks(self, host: str) -> Optional[str]:
        try:
            with self.mgr.async_timeout_handler(host, 'cephadm list-networks'):
                networks = self.mgr.wait_async(self._run_cephadm_json(
                    host, 'mon', 'list-networks', [], no_fsid=True,
                    log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s networks (%s)' % (
            host, len(networks)))
        self.mgr.cache.update_host_networks(host, networks)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None

    def update_osdspec_previews(self, search_host: str = '') -> None:
        # Set global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.mgr.osd_service.get_previews(search_host))
        self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
        self.mgr.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.remove(search_host)

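    # Drain deferred work queued by other code paths; for example,
    # _remove_daemon() schedules a post_remove() callback here when it cannot
    # run it inline, then kicks the serve loop.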
    def _run_async_actions(self) -> None:
        while self.mgr.scheduled_async_actions:
            (self.mgr.scheduled_async_actions.pop(0))()

    def _check_for_strays(self) -> None:
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []     # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )
                    if s.get('type') == 'tcmu-runner':
                        # because we don't track tcmu-runner daemons in the host cache
                        # and don't have a way to check if the daemon is part of iscsi service
                        # we assume that all tcmu-runner daemons are managed by cephadm
                        continue
                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)

    def _check_for_moved_osds(self) -> None:
        self.log.debug('_check_for_moved_osds')
        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in self.mgr.cache.get_daemons_by_type('osd'):
            assert dd.daemon_id
            all_osds[int(dd.daemon_id)].append(dd)
        for osd_id, dds in all_osds.items():
            if len(dds) <= 1:
                continue
            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
            logger.info(msg)
            if len(running) != 1:
                continue
            osd = self.mgr.get_osd_by_id(osd_id)
            if not osd or not osd['up']:
                continue
            for e in error:
                assert e.hostname
                try:
                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
                    self.mgr.events.for_daemon(
                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
                except OrchestratorError as ex:
                    self.mgr.events.from_orch_error(ex)
                    logger.exception(f'failed to remove duplicated daemon {e}')

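    # Returns True if any spec changed something (daemons added or removed),
    # in which case the serve loop immediately starts another pass.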
    def _apply_all_services(self) -> bool:
        self.log.debug('_apply_all_services')
        r = False
        specs = []  # type: List[ServiceSpec]
        # if metadata is not up to date, we still need to apply spec for agent
        # since the agent is the one who gathers the metadata. If we don't we
        # end up stuck between wanting metadata to be up to date to apply specs
        # and needing to apply the agent spec to get up to date metadata
        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
            try:
                specs.append(self.mgr.spec_store['agent'].spec)
            except Exception as e:
                self.log.debug(f'Failed to find agent spec: {e}')
                self.mgr.agent_helpers._apply_agent()
                return r
        else:
            _specs: List[ServiceSpec] = []
            for sn, spec in self.mgr.spec_store.active_specs.items():
                _specs.append(spec)
            # apply specs that don't use count first since their placement is deterministic
            # and not dependent on other daemons' placements in any way
            specs = [s for s in _specs if not s.placement.count] + [
                s for s in _specs if s.placement.count]

        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)

        self.mgr.update_watched_hosts()
        self.mgr.tuned_profile_utils._write_all_tuned_profiles()
        return r

    def _apply_service_config(self, spec: ServiceSpec) -> None:
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
                    invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
                    options_failed_to_set), options_failed_to_set)

    def _update_rgw_endpoints(self, rgw_spec: RGWSpec) -> None:

        if not rgw_spec.update_endpoints or rgw_spec.rgw_realm_token is None:
            return

        ep = []
        protocol = 'https' if rgw_spec.ssl else 'http'
        for s in self.mgr.cache.get_daemons_by_service(rgw_spec.service_name()):
            if s.ports:
                for p in s.ports:
                    ep.append(f'{protocol}://{s.hostname}:{p}')
        zone_update_cmd = {
            'prefix': 'rgw zone modify',
            'realm_name': rgw_spec.rgw_realm,
            'zonegroup_name': rgw_spec.rgw_zonegroup,
            'zone_name': rgw_spec.rgw_zone,
            'realm_token': rgw_spec.rgw_realm_token,
            'zone_endpoints': ep,
        }
        self.log.debug(f'rgw cmd: {zone_update_cmd}')
        rc, out, err = self.mgr.mon_command(zone_update_cmd)
        rgw_spec.update_endpoints = (rc != 0)  # keep trying on failure
        if rc != 0:
            self.log.error(f'Error when trying to update rgw zone: {err}')
            self.mgr.set_health_warning('CEPHADM_RGW', f'Cannot update rgw endpoints, error: {err}', 1,
                                        [f'Cannot update rgw endpoints for daemon {rgw_spec.service_name()}, error: {err}'])
        else:
            self.mgr.remove_health_warning('CEPHADM_RGW')

    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
        """
        self.mgr.migration.verify_no_migration()

        service_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        if service_type == 'agent':
            try:
                assert self.mgr.http_server.agent
                assert self.mgr.http_server.agent.ssl_certs.get_root_cert()
            except Exception:
                self.log.info(
                    'Delaying applying agent spec until cephadm endpoint root cert created')
                return False

        self._apply_service_config(spec)

        if service_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        svc = self.mgr.cephadm_services[service_type]
        daemons = self.mgr.cache.get_daemons_by_service(service_name)
        related_service_daemons = self.mgr.cache.get_related_service_daemons(spec)

        public_networks: List[str] = []
        if service_type == 'mon':
            out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
            if '/' in out:
                public_networks = [x.strip() for x in out.split(',')]
                self.log.debug('mon public_network(s) is %s' % public_networks)

        def matches_network(host):
            # type: (str) -> bool
            # make sure the host has at least one network that belongs to some configured public network(s)
            for pn in public_networks:
                public_network = ipaddress.ip_network(pn)
                for hn in self.mgr.cache.networks[host]:
                    host_network = ipaddress.ip_network(hn)
                    if host_network.overlaps(public_network):
                        return True

            host_networks = ','.join(self.mgr.cache.networks[host])
            pub_networks = ','.join(public_networks)
            self.log.info(
                f"Filtered out host {host}: does not belong to mon public_network(s): "
                f" {pub_networks}, host network(s): {host_networks}"
            )
            return False

        rank_map = None
        if svc.ranked():
            rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}

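        # HostAssignment computes the target placement for this spec given the
        # current daemons and host states; rank_map is only populated for
        # ranked services (e.g. nfs) so ranks stay stable across redeploys.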
        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
            ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
            unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
            draining_hosts=self.mgr.cache.get_draining_hosts(),
            daemons=daemons,
            related_service_daemons=related_service_daemons,
            networks=self.mgr.cache.networks,
            filter_new_host=(
                matches_network if service_type == 'mon'
                else None
            ),
            allow_colo=svc.allow_colo(),
            primary_daemon_type=svc.primary_daemon_type(spec),
            per_host_daemon_type=svc.per_host_daemon_type(spec),
            rank_map=rank_map,
        )

        try:
            all_slots, slots_to_add, daemons_to_remove = ha.place()
            daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
                'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
            self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
        except OrchestratorError as e:
            msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
            self.log.error(msg)
            self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
            self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
            warnings = []
            for x in self.mgr.apply_spec_fails:
                warnings.append(f'{x[0]}: {x[1]}')
            self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                        f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                        len(self.mgr.apply_spec_fails),
                                        warnings)
            return False

        r = None

        # sanity check
        final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
        if service_type in ['mon', 'mgr'] and final_count < 1:
            self.log.debug('cannot scale mon|mgr below 1')
            return False

        # progress
        progress_id = str(uuid.uuid4())
        delta: List[str] = []
        if slots_to_add:
            delta += [f'+{len(slots_to_add)}']
        if daemons_to_remove:
            delta += [f'-{len(daemons_to_remove)}']
        progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
        progress_total = len(slots_to_add) + len(daemons_to_remove)
        progress_done = 0

        def update_progress() -> None:
            self.mgr.remote(
                'progress', 'update', progress_id,
                ev_msg=progress_title,
                ev_progress=(progress_done / progress_total),
            )

        if progress_total:
            update_progress()

        self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
        self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)

        hosts_altered: Set[str] = set()

        try:
            # assign names
            for i in range(len(slots_to_add)):
                slot = slots_to_add[i]
                slot = slot.assign_name(self.mgr.get_unique_name(
                    slot.daemon_type,
                    slot.hostname,
                    [d for d in daemons if d not in daemons_to_remove],
                    prefix=spec.service_id,
                    forcename=slot.name,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                ))
                slots_to_add[i] = slot
                if rank_map is not None:
                    assert slot.rank is not None
                    assert slot.rank_generation is not None
                    assert rank_map[slot.rank][slot.rank_generation] is None
                    rank_map[slot.rank][slot.rank_generation] = slot.name

            if rank_map:
                # record the rank_map before we make changes so that if we fail the
                # next mgr will clean up.
                self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

                # remove daemons now, since we are going to fence them anyway
                for d in daemons_to_remove:
                    assert d.hostname is not None
                    self._remove_daemon(d.name(), d.hostname)
                daemons_to_remove = []

                # fence them
                svc.fence_old_ranks(spec, rank_map, len(all_slots))

            # create daemons
            daemon_place_fails = []
            for slot in slots_to_add:
                # first remove daemon with conflicting port or name?
                if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
                    for d in daemons_to_remove:
                        if (
                            d.hostname != slot.hostname
                            or not (set(d.ports or []) & set(slot.ports))
                            or (d.ip and slot.ip and d.ip != slot.ip)
                        ) and d.name() != slot.name:
                            continue
                        if d.name() != slot.name:
                            self.log.info(
                                f'Removing {d.name()} before deploying to {slot} to avoid a port or name conflict'
                            )
                        # NOTE: we don't check ok-to-stop here to avoid starvation if
                        # there is only 1 gateway.
                        self._remove_daemon(d.name(), d.hostname)
                        daemons_to_remove.remove(d)
                        progress_done += 1
                        hosts_altered.add(d.hostname)
                        break

                # deploy new daemon
                daemon_id = slot.name

                daemon_spec = svc.make_daemon_spec(
                    slot.hostname, daemon_id, slot.network, spec,
                    daemon_type=slot.daemon_type,
                    ports=slot.ports,
                    ip=slot.ip,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                )
                self.log.debug('Placing %s.%s on host %s' % (
                    slot.daemon_type, daemon_id, slot.hostname))

                try:
                    daemon_spec = svc.prepare_create(daemon_spec)
                    with self.mgr.async_timeout_handler(slot.hostname, f'cephadm deploy ({daemon_spec.daemon_type} type daemon)'):
                        self.mgr.wait_async(self._create_daemon(daemon_spec))
                    r = True
                    progress_done += 1
                    update_progress()
                    hosts_altered.add(daemon_spec.host)
                    self.mgr.spec_store.mark_needs_configuration(spec.service_name())
                except (RuntimeError, OrchestratorError) as e:
                    msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                           f"on {slot.hostname}: {e}")
                    self.mgr.events.for_service(spec, 'ERROR', msg)
                    self.mgr.log.error(msg)
                    daemon_place_fails.append(msg)
                    # only return "no change" if no one else has already succeeded.
                    # later successes will also change to True
                    if r is None:
                        r = False
                    progress_done += 1
                    update_progress()
                    continue

                # add to daemon list so next name(s) will also be unique
                sd = orchestrator.DaemonDescription(
                    hostname=slot.hostname,
                    daemon_type=slot.daemon_type,
                    daemon_id=daemon_id,
                    service_name=spec.service_name()
                )
                daemons.append(sd)
                self.mgr.cache.append_tmp_daemon(slot.hostname, sd)

            if daemon_place_fails:
                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
                    daemon_place_fails), daemon_place_fails)

            if service_type == 'mgr':
                active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
                if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
                    # We can't just remove the active mgr like any other daemon.
                    # Need to fail over later so it can be removed on next pass.
                    # This can be accomplished by scheduling a restart of the active mgr.
                    self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')

            if service_type == 'rgw':
                self._update_rgw_endpoints(cast(RGWSpec, spec))

            # remove any?
            def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
                daemon_ids = [d.daemon_id for d in remove_daemons]
                assert None not in daemon_ids
                # setting force flag retains previous behavior
                r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
                return not r.retval

            while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
                # let's find a subset that is ok-to-stop
                daemons_to_remove.pop()
            for d in daemons_to_remove:
                r = True
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)

                progress_done += 1
                update_progress()
                hosts_altered.add(d.hostname)
                self.mgr.spec_store.mark_needs_configuration(spec.service_name())

            self.mgr.remote('progress', 'complete', progress_id)
        except Exception as e:
            self.mgr.remote('progress', 'fail', progress_id, str(e))
            raise
        finally:
            if self.mgr.spec_store.needs_configuration(spec.service_name()):
                svc.config(spec)
                self.mgr.spec_store.mark_configured(spec.service_name())
            if self.mgr.use_agent:
                # can only send ack to agents if we know for sure port they bound to
                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
                                    h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
                self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)

        if r is None:
            r = False
        return r

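    # Reconcile every cached daemon against its spec: 'reconfig' rewrites the
    # daemon's config files, while 'redeploy' recreates the container (needed
    # e.g. when extra_container_args or extra_entrypoint_args change).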
    def _check_daemons(self) -> None:
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None

            # any action we can try will fail for a daemon on an offline host,
            # including removing the daemon
            if dd.hostname in self.mgr.offline_hosts:
                continue

            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            if dd.daemon_type == 'agent':
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug(f'{dd.name()} deps {last_deps} -> {deps}')
                self.log.info(f'Reconfiguring {dd.name()} (dependencies changed)...')
                action = 'reconfig'
                # we need only redeploy if secure_monitoring_stack value has changed:
                if dd.daemon_type in ['prometheus', 'node-exporter', 'alertmanager']:
                    diff = list(set(last_deps) - set(deps))
                    if any('secure_monitoring_stack' in e for e in diff):
                        action = 'redeploy'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif spec is not None and hasattr(spec, 'extra_entrypoint_args') and dd.extra_entrypoint_args != spec.extra_entrypoint_args:
                self.log.info(f'Redeploying {dd.name()}, (entrypoint args changed) . . .')
                self.log.debug(
                    f'{dd.name()} daemon entrypoint args {dd.extra_entrypoint_args} -> {spec.extra_entrypoint_args}')
                dd.extra_entrypoint_args = spec.extra_entrypoint_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.log.exception(e)
                    self.mgr.events.from_orch_error(e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                except Exception as e:
                    self.log.exception(e)
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)

    def _purge_deleted_services(self) -> None:
        self.log.debug('_purge_deleted_services')
        existing_services = self.mgr.spec_store.all_specs.items()
        for service_name, spec in list(existing_services):
            if service_name not in self.mgr.spec_store.spec_deleted:
                continue
            if self.mgr.cache.get_daemons_by_service(service_name):
                continue
            if spec.service_type in ['mon', 'mgr']:
                continue

            logger.info(f'Purge service {service_name}')

            self.mgr.cephadm_services[spec.service_type].purge(service_name)
            self.mgr.spec_store.finally_rm(service_name)

    def convert_tags_to_repo_digest(self) -> None:
        if not self.mgr.use_repo_digest:
            return
        settings = self.mgr.upgrade.get_distinct_container_image_settings()
        digests: Dict[str, ContainerInspectInfo] = {}
        for container_image_ref in set(settings.values()):
            if not is_repo_digest(container_image_ref):
                with self.mgr.async_timeout_handler(cmd=f'cephadm inspect-image (image {container_image_ref})'):
                    image_info = self.mgr.wait_async(
                        self._get_container_image_info(container_image_ref))
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    assert is_repo_digest(image_info.repo_digests[0]), image_info
                digests[container_image_ref] = image_info

        for entity, container_image_ref in settings.items():
            if not is_repo_digest(container_image_ref):
                image_info = digests[container_image_ref]
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    self.mgr.set_container_image(entity, image_info.repo_digests[0])

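    # Client files come in two flavors: a minimal ceph.conf (when
    # manage_etc_ceph_ceph_conf is set) and user-managed client keyrings; both
    # reuse HostAssignment to decide which hosts should carry the files.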
    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
        cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'

        if self.mgr.manage_etc_ceph_ceph_conf:
            try:
                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=pspec),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
            except Exception as e:
                self.mgr.log.warning(
                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

        # client keyrings
        for ks in self.mgr.keys.keys.values():
            try:
                ret, keyring, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': ks.entity,
                })
                if ret:
                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
                    continue
                digest = ''.join('%02x' % c for c in hashlib.sha256(
                    keyring.encode('utf-8')).digest())
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=ks.placement),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
                    ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
                    client_files[host][ks.path] = ceph_admin_key
                    client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
            except Exception as e:
                self.log.warning(
                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
        return client_files

    def _write_all_client_files(self) -> None:
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            client_files = self._calc_client_files()
        else:
            client_files = {}

        @forall_hosts
        def _write_files(host: str) -> None:
            self._write_client_files(client_files, host)

        _write_files(self.mgr.cache.get_hosts())

    def _write_client_files(self,
                            client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
                            host: str) -> None:
        updated_files = False
        if host in self.mgr.offline_hosts:
            return
        old_files = self.mgr.cache.get_host_client_files(host).copy()
        for path, m in client_files.get(host, {}).items():
            mode, uid, gid, content, digest = m
            if path in old_files:
                match = old_files[path] == (digest, mode, uid, gid)
                del old_files[path]
                if match:
                    continue
            self.log.info(f'Updating {host}:{path}')
            self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
            self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
            updated_files = True
        for path in old_files.keys():
            if path == '/etc/ceph/ceph.conf':
                continue
            self.log.info(f'Removing {host}:{path}')
            cmd = ['rm', '-f', path]
            self.mgr.ssh.check_execute_command(host, cmd)
            updated_files = True
            self.mgr.cache.removed_client_file(host, path)
        if updated_files:
            self.mgr.cache.save_host(host)

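    # Deployment hands the cephadm binary two JSON payloads: a --meta-json
    # blob describing the daemon (service name, ports, rank, ...) and the
    # daemon's config/keyring as --config-json over stdin.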
    async def _create_daemon(self,
                             daemon_spec: CephadmDaemonDeploySpec,
                             reconfig: bool = False,
                             osd_uuid_map: Optional[Dict[str, Any]] = None,
                             ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                if daemon_spec.daemon_type == 'container':
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                # TCP port to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments need an --osd-uuid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)

                if daemon_spec.service_name in self.mgr.spec_store:
                    configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
                    if configs is not None:
                        daemon_spec.final_config.update(
                            {'custom_config_files': [c.to_json() for c in configs]})

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

                out, err, code = await self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                            'extra_container_args': extra_container_args,
                            'extra_entrypoint_args': extra_entrypoint_args
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image,
                )

                if daemon_spec.daemon_type == 'agent':
                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1

                # refresh daemon state?  (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.starting, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                            self.mgr.requires_post_actions.add(daemon_spec.name())
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                if daemon_spec.daemon_type != 'agent':
                    self.mgr.cache.update_daemon_config_deps(
                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                    self.mgr.cache.save_host(daemon_spec.host)
                else:
                    self.mgr.agent_cache.update_agent_config_deps(
                        daemon_spec.host, daemon_spec.deps, start_time)
                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    service_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
                raise

    def _setup_extra_deployment_args(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[CephadmDaemonDeploySpec, Optional[List[str]], Optional[List[str]]]:
        # this function is for handling any potential user specified
        # (in the service spec) extra runtime or entrypoint args for a daemon
        # we are going to deploy. Effectively just adds a set of extra args to
        # pass to the cephadm binary to indicate the daemon being deployed
        # needs extra runtime/entrypoint args. Returns the modified daemon spec
        # as well as what args were added (as those are included in unit.meta file)
        try:
            eca = daemon_spec.extra_container_args
            if eca:
                for a in eca:
                    # args with spaces need to be split into multiple args
                    # in order to work properly
                    args = a.split(' ')
                    for arg in args:
                        if arg:
                            daemon_spec.extra_args.append(f'--extra-container-args={arg}')
        except AttributeError:
            eca = None
        try:
            eea = daemon_spec.extra_entrypoint_args
            if eea:
                for a in eea:
                    # args with spaces need to be split into multiple args
                    # in order to work properly
                    args = a.split(' ')
                    for arg in args:
                        if arg:
                            daemon_spec.extra_args.append(f'--extra-entrypoint-args={arg}')
        except AttributeError:
            eea = None
        return daemon_spec, eca, eea

    def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instance's data.
            dd = self.mgr.cache.get_daemon(daemon.daemon_name)
            if dd.ports:
                args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
            else:
                args = ['--name', name, '--force']

            self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
            with self.mgr.async_timeout_handler(host, f'cephadm rm-daemon (daemon {name})'):
                out, err, code = self.mgr.wait_async(self._run_cephadm(
                    host, name, 'rm-daemon', args))

            # remove item from cache
            self.mgr.cache.rm_daemon(host, name)
            self.mgr.cache.invalidate_host_daemons(host)

            if not no_post_remove:
                if daemon_type not in ['iscsi']:
                    self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False)
                else:
                    self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False))
                    self.mgr._kick_serve_loop()

            return "Removed {} from host '{}'".format(name, host)

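    # Thin wrapper around _run_cephadm() for commands that print JSON on
    # stdout; any failure surfaces as an OrchestratorError.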
    async def _run_cephadm_json(self,
                                host: str,
                                entity: Union[CephadmNoImage, str],
                                command: str,
                                args: List[str],
                                no_fsid: Optional[bool] = False,
                                error_ok: Optional[bool] = False,
                                image: Optional[str] = "",
                                log_output: Optional[bool] = True,
                                ) -> Any:
        try:
            out, err, code = await self._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok,
                image=image, log_output=log_output)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)

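    # In 'root' mode the mgr ships its bundled cephadm binary to the host on
    # demand (exit code 2 from python3 indicates the script is missing) and
    # retries; in 'cephadm-package' mode a preinstalled /usr/bin/cephadm is
    # used instead.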
    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           log_output: Optional[bool] = True,
                           timeout: Optional[int] = None,  # timeout in seconds
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]
        """

        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons deployed that are not ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        if not self.mgr.cgroups_split:
            final_args += ['--no-cgroups-split']

        if not timeout:
            # default global timeout if no timeout was passed
            timeout = self.mgr.default_cephadm_command_timeout
            # put a lower bound of 60 seconds in case users
            # accidentally set it to something unreasonable.
            # For example if they thought it was in minutes
            # rather than seconds
            if timeout < 60:
                self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
                timeout = 60
            # subtract a small amount to give this timeout
            # in the binary a chance to actually happen over
            # the asyncio based timeout in the mgr module
            timeout -= 5
        final_args += ['--timeout', str(timeout)]

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # agent has cephadm binary as an extra file which is
            # therefore passed over stdin. Even for debug logs it's too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                if code == 2:
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
                                                                                  log_command=log_output)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # version of cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        if log_output:
            self.log.debug(f'code: {code}')
            if out:
                self.log.debug(f'out: {out}')
            if err:
                self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code

    async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick a random host...
        host = None
        for host_name in self.mgr.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
            await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))

        try:
            j = await self._run_cephadm_json(host, '', 'inspect-image', [],
                                             image=image_name, no_fsid=True,
                                             error_ok=True)
        except OrchestratorError:
            # image may not be present locally yet; try pulling it
            pullargs: List[str] = []
            if self.mgr.registry_insecure:
                pullargs.append("--insecure")

            j = await self._run_cephadm_json(host, '', 'pull', pullargs,
                                             image=image_name, no_fsid=True)
        r = ContainerInspectInfo(
            j['image_id'],
            j.get('ceph_version'),
            j.get('repo_digests')
        )
        self.log.debug(f'image {image_name} -> {r}')
        return r

    # function responsible for logging a single host into a custom registry
    async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
        self.log.debug(
            f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
        # want to pass info over stdin rather than through normal list of args
        out, err, code = await self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
        if code:
            return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
        return None

    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        self.log.info(f"Deploying cephadm binary to {host}")
        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
                                              self.mgr._cephadm, addr=addr)