5 from collections
import defaultdict
6 from contextlib
import contextmanager
7 from typing
import TYPE_CHECKING
, Optional
, List
, cast
, Dict
, Any
, Union
, Tuple
, Iterator
9 from cephadm
import remotes
13 import execnet
.gateway_bootstrap
17 from ceph
.deployment
import inventory
18 from ceph
.deployment
.drive_group
import DriveGroupSpec
19 from ceph
.deployment
.service_spec
import ServiceSpec
, CustomContainerSpec
, PlacementSpec
20 from ceph
.utils
import str_to_datetime
, datetime_now
23 from orchestrator
import OrchestratorError
, set_exception_subject
, OrchestratorEvent
, \
24 DaemonDescriptionStatus
, daemon_type_to_service
25 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
26 from cephadm
.schedule
import HostAssignment
27 from cephadm
.autotune
import MemoryAutotuner
28 from cephadm
.utils
import forall_hosts
, cephadmNoImage
, is_repo_digest
, \
29 CephadmNoImage
, CEPH_TYPES
, ContainerInspectInfo
30 from mgr_module
import MonCommandFailed
31 from mgr_util
import format_bytes
36 from cephadm
.module
import CephadmOrchestrator
37 from remoto
.backends
import BaseConnection
# Module-level logger for this module.
39 logger
= logging
.getLogger(__name__
)
# Daemon types that require additional configuration actions after deployment
# (consumed by the serve loop's post-action handling).
41 REQUIRES_POST_ACTIONS
= ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
46 This module contains functions that are executed in the
47 serve() thread. Thus they don't block the CLI.
49 Please see the `Note regarding network calls from CLI handlers`
50 chapter in the cephadm developer guide.
52 On the other hand, these functions should *not* be called from
53 CLI handlers, to avoid blocking the CLI.
56 def __init__(self
, mgr
: "CephadmOrchestrator"):
57 self
.mgr
: "CephadmOrchestrator" = mgr
60 def serve(self
) -> None:
62 The main loop of cephadm.
64 A command handler will typically change the declarative state
65 of cephadm. This loop will then attempt to apply this new state.
67 self
.log
.debug("serve starting")
68 self
.mgr
.config_checker
.load_network_config()
74 self
.convert_tags_to_repo_digest()
77 self
.log
.debug('refreshing hosts and daemons')
78 self
._refresh
_hosts
_and
_daemons
()
80 self
._check
_for
_strays
()
82 self
._update
_paused
_health
()
84 if self
.mgr
.need_connect_dashboard_rgw
and self
.mgr
.config_dashboard
:
85 self
.mgr
.need_connect_dashboard_rgw
= False
86 if 'dashboard' in self
.mgr
.get('mgr_map')['modules']:
87 self
.log
.info('Checking dashboard <-> RGW credentials')
88 self
.mgr
.remote('dashboard', 'set_rgw_credentials')
90 if not self
.mgr
.paused
:
91 self
.mgr
.to_remove_osds
.process_removal_queue()
93 self
.mgr
.migration
.migrate()
94 if self
.mgr
.migration
.is_migration_ongoing():
97 if self
._apply
_all
_services
():
98 continue # did something, refresh
100 self
._check
_daemons
()
102 self
._purge
_deleted
_services
()
104 if self
.mgr
.upgrade
.continue_upgrade():
107 except OrchestratorError
as e
:
109 self
.mgr
.events
.from_orch_error(e
)
112 self
.log
.debug("serve exit")
114 def _serve_sleep(self
) -> None:
115 sleep_interval
= max(
118 self
.mgr
.host_check_interval
,
119 self
.mgr
.facts_cache_timeout
,
120 self
.mgr
.daemon_cache_timeout
,
121 self
.mgr
.device_cache_timeout
,
124 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
125 self
.mgr
.event
.wait(sleep_interval
)
126 self
.mgr
.event
.clear()
128 def _update_paused_health(self
) -> None:
130 self
.mgr
.health_checks
['CEPHADM_PAUSED'] = {
131 'severity': 'warning',
132 'summary': 'cephadm background work is paused',
134 'detail': ["'ceph orch resume' to resume"],
136 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
138 if 'CEPHADM_PAUSED' in self
.mgr
.health_checks
:
139 del self
.mgr
.health_checks
['CEPHADM_PAUSED']
140 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
142 def _autotune_host_memory(self
, host
: str) -> None:
143 total_mem
= self
.mgr
.cache
.get_facts(host
).get('memory_total_kb', 0)
147 total_mem
*= 1024 # kb -> bytes
148 total_mem
*= self
.mgr
.autotune_memory_target_ratio
150 daemons
=self
.mgr
.cache
.get_daemons_by_host(host
),
151 config_get
=self
.mgr
.get_foreign_ceph_option
,
157 if self
.mgr
.get_foreign_ceph_option(o
, 'osd_memory_target') != val
:
158 self
.mgr
.check_mon_command({
159 'prefix': 'config rm',
161 'name': 'osd_memory_target',
167 f
'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
169 ret
, out
, err
= self
.mgr
.mon_command({
170 'prefix': 'config set',
171 'who': f
'osd/host:{host}',
172 'name': 'osd_memory_target',
177 f
'Unable to set osd_memory_target on {host} to {val}: {err}'
180 self
.mgr
.check_mon_command({
181 'prefix': 'config rm',
182 'who': f
'osd/host:{host}',
183 'name': 'osd_memory_target',
185 self
.mgr
.cache
.update_autotune(host
)
187 def _refresh_hosts_and_daemons(self
) -> None:
191 # host -> path -> (mode, uid, gid, content, digest)
192 client_files
: Dict
[str, Dict
[str, Tuple
[int, int, int, bytes
, str]]] = {}
195 if self
.mgr
.manage_etc_ceph_ceph_conf
or self
.mgr
.keys
.keys
:
196 config
= self
.mgr
.get_minimal_ceph_conf().encode('utf-8')
197 config_digest
= ''.join('%02x' % c
for c
in hashlib
.sha256(config
).digest())
199 if self
.mgr
.manage_etc_ceph_ceph_conf
:
201 pspec
= PlacementSpec
.from_string(self
.mgr
.manage_etc_ceph_ceph_conf_hosts
)
203 spec
=ServiceSpec('mon', placement
=pspec
),
204 hosts
=self
.mgr
._schedulable
_hosts
(),
205 unreachable_hosts
=self
.mgr
._unreachable
_hosts
(),
207 networks
=self
.mgr
.cache
.networks
,
209 all_slots
, _
, _
= ha
.place()
210 for host
in {s
.hostname
for s
in all_slots
}:
211 if host
not in client_files
:
212 client_files
[host
] = {}
213 client_files
[host
]['/etc/ceph/ceph.conf'] = (
214 0o644, 0, 0, bytes(config
), str(config_digest
)
216 except Exception as e
:
217 self
.mgr
.log
.warning(
218 f
'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
221 for ks
in self
.mgr
.keys
.keys
.values():
225 ret
, keyring
, err
= self
.mgr
.mon_command({
226 'prefix': 'auth get',
230 self
.log
.warning(f
'unable to fetch keyring for {ks.entity}')
232 digest
= ''.join('%02x' % c
for c
in hashlib
.sha256(
233 keyring
.encode('utf-8')).digest())
235 spec
=ServiceSpec('mon', placement
=ks
.placement
),
236 hosts
=self
.mgr
._schedulable
_hosts
(),
237 unreachable_hosts
=self
.mgr
._unreachable
_hosts
(),
239 networks
=self
.mgr
.cache
.networks
,
241 all_slots
, _
, _
= ha
.place()
242 for host
in {s
.hostname
for s
in all_slots
}:
243 if host
not in client_files
:
244 client_files
[host
] = {}
245 client_files
[host
]['/etc/ceph/ceph.conf'] = (
246 0o644, 0, 0, bytes(config
), str(config_digest
)
248 client_files
[host
][ks
.path
] = (
249 ks
.mode
, ks
.uid
, ks
.gid
, keyring
.encode('utf-8'), digest
251 except Exception as e
:
253 f
'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
256 def refresh(host
: str) -> None:
258 # skip hosts that are in maintenance - they could be powered off
259 if self
.mgr
.inventory
._inventory
[host
].get("status", "").lower() == "maintenance":
262 if self
.mgr
.cache
.host_needs_check(host
):
263 r
= self
._check
_host
(host
)
266 if self
.mgr
.cache
.host_needs_daemon_refresh(host
):
267 self
.log
.debug('refreshing %s daemons' % host
)
268 r
= self
._refresh
_host
_daemons
(host
)
272 if self
.mgr
.cache
.host_needs_registry_login(host
) and self
.mgr
.registry_url
:
273 self
.log
.debug(f
"Logging `{host}` into custom registry")
274 r
= self
._registry
_login
(host
, self
.mgr
.registry_url
,
275 self
.mgr
.registry_username
, self
.mgr
.registry_password
)
279 if self
.mgr
.cache
.host_needs_device_refresh(host
):
280 self
.log
.debug('refreshing %s devices' % host
)
281 r
= self
._refresh
_host
_devices
(host
)
285 if self
.mgr
.cache
.host_needs_facts_refresh(host
):
286 self
.log
.debug(('Refreshing %s facts' % host
))
287 r
= self
._refresh
_facts
(host
)
291 if self
.mgr
.cache
.host_needs_osdspec_preview_refresh(host
):
292 self
.log
.debug(f
"refreshing OSDSpec previews for {host}")
293 r
= self
._refresh
_host
_osdspec
_previews
(host
)
298 self
.mgr
.cache
.host_needs_autotune_memory(host
)
299 and not self
.mgr
.inventory
.has_label(host
, '_no_autotune_memory')
301 self
.log
.debug(f
"autotuning memory for {host}")
302 self
._autotune
_host
_memory
(host
)
305 updated_files
= False
306 old_files
= self
.mgr
.cache
.get_host_client_files(host
).copy()
307 for path
, m
in client_files
.get(host
, {}).items():
308 mode
, uid
, gid
, content
, digest
= m
309 if path
in old_files
:
310 match
= old_files
[path
] == (digest
, mode
, uid
, gid
)
314 self
.log
.info(f
'Updating {host}:{path}')
315 self
._write
_remote
_file
(host
, path
, content
, mode
, uid
, gid
)
316 self
.mgr
.cache
.update_client_file(host
, path
, digest
, mode
, uid
, gid
)
318 for path
in old_files
.keys():
319 self
.log
.info(f
'Removing {host}:{path}')
320 with self
._remote
_connection
(host
) as tpl
:
322 out
, err
, code
= remoto
.process
.check(
326 self
.mgr
.cache
.removed_client_file(host
, path
)
328 self
.mgr
.cache
.save_host(host
)
330 refresh(self
.mgr
.cache
.get_hosts())
332 self
.mgr
.config_checker
.run_checks()
334 health_changed
= False
336 'CEPHADM_HOST_CHECK_FAILED',
337 'CEPHADM_FAILED_DAEMON',
338 'CEPHADM_REFRESH_FAILED',
340 if k
in self
.mgr
.health_checks
:
341 del self
.mgr
.health_checks
[k
]
342 health_changed
= True
344 self
.mgr
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
345 'severity': 'warning',
346 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
347 'count': len(bad_hosts
),
350 health_changed
= True
352 self
.mgr
.health_checks
['CEPHADM_REFRESH_FAILED'] = {
353 'severity': 'warning',
354 'summary': 'failed to probe daemons or devices',
355 'count': len(failures
),
358 health_changed
= True
360 for dd
in self
.mgr
.cache
.get_daemons():
361 if dd
.status
is not None and dd
.status
== DaemonDescriptionStatus
.error
:
362 failed_daemons
.append('daemon %s on %s is in %s state' % (
363 dd
.name(), dd
.hostname
, dd
.status_desc
366 self
.mgr
.health_checks
['CEPHADM_FAILED_DAEMON'] = {
367 'severity': 'warning',
368 'summary': '%d failed cephadm daemon(s)' % len(failed_daemons
),
369 'count': len(failed_daemons
),
370 'detail': failed_daemons
,
372 health_changed
= True
374 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
376 def _check_host(self
, host
: str) -> Optional
[str]:
377 if host
not in self
.mgr
.inventory
:
379 self
.log
.debug(' checking %s' % host
)
381 addr
= self
.mgr
.inventory
.get_addr(host
) if host
in self
.mgr
.inventory
else host
382 out
, err
, code
= self
._run
_cephadm
(
383 host
, cephadmNoImage
, 'check-host', [],
384 error_ok
=True, no_fsid
=True)
385 self
.mgr
.cache
.update_last_host_check(host
)
386 self
.mgr
.cache
.save_host(host
)
388 self
.log
.debug(' host %s (%s) failed check' % (host
, addr
))
389 if self
.mgr
.warn_on_failed_host_check
:
390 return 'host %s (%s) failed check: %s' % (host
, addr
, err
)
392 self
.log
.debug(' host %s (%s) ok' % (host
, addr
))
393 except Exception as e
:
394 self
.log
.debug(' host %s (%s) failed check' % (host
, addr
))
395 return 'host %s (%s) failed check: %s' % (host
, addr
, e
)
398 def _refresh_host_daemons(self
, host
: str) -> Optional
[str]:
400 ls
= self
._run
_cephadm
_json
(host
, 'mon', 'ls', [], no_fsid
=True)
401 except OrchestratorError
as e
:
405 if not d
['style'].startswith('cephadm'):
407 if d
['fsid'] != self
.mgr
._cluster
_fsid
:
409 if '.' not in d
['name']:
411 sd
= orchestrator
.DaemonDescription()
412 sd
.last_refresh
= datetime_now()
413 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
416 setattr(sd
, k
, str_to_datetime(d
[k
]))
417 sd
.daemon_type
= d
['name'].split('.')[0]
418 if sd
.daemon_type
not in orchestrator
.KNOWN_DAEMON_TYPES
:
419 logger
.warning(f
"Found unknown daemon type {sd.daemon_type} on host {host}")
422 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
424 sd
.container_id
= d
.get('container_id')
427 sd
.container_id
= sd
.container_id
[0:12]
428 sd
.container_image_name
= d
.get('container_image_name')
429 sd
.container_image_id
= d
.get('container_image_id')
430 sd
.container_image_digests
= d
.get('container_image_digests')
431 sd
.memory_usage
= d
.get('memory_usage')
432 sd
.memory_request
= d
.get('memory_request')
433 sd
.memory_limit
= d
.get('memory_limit')
434 sd
._service
_name
= d
.get('service_name')
435 sd
.deployed_by
= d
.get('deployed_by')
436 sd
.version
= d
.get('version')
437 sd
.ports
= d
.get('ports')
439 sd
.rank
= int(d
['rank']) if d
.get('rank') is not None else None
440 sd
.rank_generation
= int(d
['rank_generation']) if d
.get(
441 'rank_generation') is not None else None
442 if sd
.daemon_type
== 'osd':
443 sd
.osdspec_affinity
= self
.mgr
.osd_service
.get_osdspec_affinity(sd
.daemon_id
)
445 sd
.status_desc
= d
['state']
447 'running': DaemonDescriptionStatus
.running
,
448 'stopped': DaemonDescriptionStatus
.stopped
,
449 'error': DaemonDescriptionStatus
.error
,
450 'unknown': DaemonDescriptionStatus
.error
,
453 sd
.status_desc
= 'unknown'
456 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
457 self
.mgr
.cache
.update_host_daemons(host
, dm
)
458 self
.mgr
.cache
.save_host(host
)
def _refresh_facts(self, host: str) -> Optional[str]:
    """Gather facts for *host* via cephadm and store them in the host cache.

    Returns an error string on failure and None on success; the serve
    loop's refresh pass collects these strings into a health warning.
    """
    try:
        val = self._run_cephadm_json(
            host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
    except OrchestratorError as e:
        return str(e)

    self.mgr.cache.update_host_facts(host, val)

    return None
471 def _refresh_host_devices(self
, host
: str) -> Optional
[str]:
473 with_lsm
= self
.mgr
.get_module_option('device_enhanced_scan')
474 inventory_args
= ['--', 'inventory',
476 '--filter-for-batch']
478 inventory_args
.insert(-1, "--with-lsm")
482 devices
= self
._run
_cephadm
_json
(host
, 'osd', 'ceph-volume',
484 except OrchestratorError
as e
:
485 if 'unrecognized arguments: --filter-for-batch' in str(e
):
486 rerun_args
= inventory_args
.copy()
487 rerun_args
.remove('--filter-for-batch')
488 devices
= self
._run
_cephadm
_json
(host
, 'osd', 'ceph-volume',
493 networks
= self
._run
_cephadm
_json
(host
, 'mon', 'list-networks', [], no_fsid
=True)
494 except OrchestratorError
as e
:
497 self
.log
.debug('Refreshed host %s devices (%d) networks (%s)' % (
498 host
, len(devices
), len(networks
)))
499 ret
= inventory
.Devices
.from_json(devices
)
500 self
.mgr
.cache
.update_host_devices_networks(host
, ret
.devices
, networks
)
501 self
.update_osdspec_previews(host
)
502 self
.mgr
.cache
.save_host(host
)
505 def _refresh_host_osdspec_previews(self
, host
: str) -> Optional
[str]:
506 self
.update_osdspec_previews(host
)
507 self
.mgr
.cache
.save_host(host
)
508 self
.log
.debug(f
'Refreshed OSDSpec previews for host <{host}>')
def update_osdspec_previews(self, search_host: str = '') -> None:
    """Compute OSDSpec previews for *search_host* and store them in the cache.

    A 'pending' marker is kept in ``loading_osdspec_preview`` for the
    duration of the computation so readers can tell the preview is stale.
    """
    # Set global 'pending' flag for host
    self.mgr.cache.loading_osdspec_preview.add(search_host)
    previews = []  # type: List[Dict[str, Any]]
    # query OSDSpecs for host <search host> and generate/get the preview
    # There can be multiple previews for one host due to multiple OSDSpecs.
    previews.extend(self.mgr.osd_service.get_previews(search_host))
    self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
    self.mgr.cache.osdspec_previews[search_host] = previews
    # Unset global 'pending' flag for host
    self.mgr.cache.loading_osdspec_preview.remove(search_host)
523 def _check_for_strays(self
) -> None:
524 self
.log
.debug('_check_for_strays')
525 for k
in ['CEPHADM_STRAY_HOST',
526 'CEPHADM_STRAY_DAEMON']:
527 if k
in self
.mgr
.health_checks
:
528 del self
.mgr
.health_checks
[k
]
529 if self
.mgr
.warn_on_stray_hosts
or self
.mgr
.warn_on_stray_daemons
:
530 ls
= self
.mgr
.list_servers()
531 managed
= self
.mgr
.cache
.get_daemon_names()
532 host_detail
= [] # type: List[str]
534 daemon_detail
= [] # type: List[str]
536 host
= item
.get('hostname')
537 assert isinstance(host
, str)
538 daemons
= item
.get('services') # misnomer!
539 assert isinstance(daemons
, list)
542 daemon_id
= s
.get('id')
544 name
= '%s.%s' % (s
.get('type'), daemon_id
)
545 if s
.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
546 metadata
= self
.mgr
.get_metadata(
547 cast(str, s
.get('type')), daemon_id
, {})
548 assert metadata
is not None
550 if s
.get('type') == 'rgw-nfs':
551 # https://tracker.ceph.com/issues/49573
552 name
= metadata
['id'][:-4]
554 name
= '%s.%s' % (s
.get('type'), metadata
['id'])
555 except (KeyError, TypeError):
557 "Failed to find daemon id for %s service %s" % (
558 s
.get('type'), s
.get('id')
562 if host
not in self
.mgr
.inventory
:
563 missing_names
.append(name
)
564 host_num_daemons
+= 1
565 if name
not in managed
:
566 daemon_detail
.append(
567 'stray daemon %s on host %s not managed by cephadm' % (name
, host
))
570 'stray host %s has %d stray daemons: %s' % (
571 host
, len(missing_names
), missing_names
))
572 if self
.mgr
.warn_on_stray_hosts
and host_detail
:
573 self
.mgr
.health_checks
['CEPHADM_STRAY_HOST'] = {
574 'severity': 'warning',
575 'summary': '%d stray host(s) with %s daemon(s) '
576 'not managed by cephadm' % (
577 len(host_detail
), host_num_daemons
),
578 'count': len(host_detail
),
579 'detail': host_detail
,
581 if self
.mgr
.warn_on_stray_daemons
and daemon_detail
:
582 self
.mgr
.health_checks
['CEPHADM_STRAY_DAEMON'] = {
583 'severity': 'warning',
584 'summary': '%d stray daemon(s) not managed by cephadm' % (
586 'count': len(daemon_detail
),
587 'detail': daemon_detail
,
589 self
.mgr
.set_health_checks(self
.mgr
.health_checks
)
591 def _apply_all_services(self
) -> bool:
593 specs
= [] # type: List[ServiceSpec]
594 for sn
, spec
in self
.mgr
.spec_store
.active_specs
.items():
598 if self
._apply
_service
(spec
):
600 except Exception as e
:
601 self
.log
.exception('Failed to apply %s spec %s: %s' % (
602 spec
.service_name(), spec
, e
))
603 self
.mgr
.events
.for_service(spec
, 'ERROR', 'Failed to apply: ' + str(e
))
607 def _apply_service_config(self
, spec
: ServiceSpec
) -> None:
609 section
= utils
.name_to_config_section(spec
.service_name())
610 for k
, v
in spec
.config
.items():
612 current
= self
.mgr
.get_foreign_ceph_option(section
, k
)
615 f
'Ignoring invalid {spec.service_name()} config option {k}'
617 self
.mgr
.events
.for_service(
618 spec
, OrchestratorEvent
.ERROR
, f
'Invalid config option {k}'
622 self
.log
.debug(f
'setting [{section}] {k} = {v}')
624 self
.mgr
.check_mon_command({
625 'prefix': 'config set',
630 except MonCommandFailed
as e
:
632 f
'Failed to set {spec.service_name()} option {k}: {e}'
635 def _apply_service(self
, spec
: ServiceSpec
) -> bool:
637 Schedule a service. Deploy new daemons or remove old ones, depending
638 on the target label and count specified in the placement.
640 self
.mgr
.migration
.verify_no_migration()
642 service_type
= spec
.service_type
643 service_name
= spec
.service_name()
645 self
.log
.debug('Skipping unmanaged service %s' % service_name
)
647 if spec
.preview_only
:
648 self
.log
.debug('Skipping preview_only service %s' % service_name
)
650 self
.log
.debug('Applying service %s spec' % service_name
)
652 self
._apply
_service
_config
(spec
)
654 if service_type
== 'osd':
655 self
.mgr
.osd_service
.create_from_spec(cast(DriveGroupSpec
, spec
))
656 # TODO: return True would result in a busy loop
657 # can't know if daemon count changed; create_from_spec doesn't
658 # return a solid indication
661 svc
= self
.mgr
.cephadm_services
[service_type
]
662 daemons
= self
.mgr
.cache
.get_daemons_by_service(service_name
)
664 public_networks
: List
[str] = []
665 if service_type
== 'mon':
666 out
= str(self
.mgr
.get_foreign_ceph_option('mon', 'public_network'))
668 public_networks
= [x
.strip() for x
in out
.split(',')]
669 self
.log
.debug('mon public_network(s) is %s' % public_networks
)
671 def matches_network(host
):
672 # type: (str) -> bool
673 # make sure we have 1 or more IPs for any of those networks on that
675 for network
in public_networks
:
676 if len(self
.mgr
.cache
.networks
[host
].get(network
, [])) > 0:
679 f
"Filtered out host {host}: does not belong to mon public_network"
680 f
" ({','.join(public_networks)})"
686 rank_map
= self
.mgr
.spec_store
[spec
.service_name()].rank_map
or {}
689 hosts
=self
.mgr
._schedulable
_hosts
(),
690 unreachable_hosts
=self
.mgr
._unreachable
_hosts
(),
692 networks
=self
.mgr
.cache
.networks
,
694 matches_network
if service_type
== 'mon'
697 allow_colo
=svc
.allow_colo(),
698 primary_daemon_type
=svc
.primary_daemon_type(),
699 per_host_daemon_type
=svc
.per_host_daemon_type(),
704 all_slots
, slots_to_add
, daemons_to_remove
= ha
.place()
705 daemons_to_remove
= [d
for d
in daemons_to_remove
if (d
.hostname
and self
.mgr
.inventory
._inventory
[d
.hostname
].get(
706 'status', '').lower() not in ['maintenance', 'offline'] and d
.hostname
not in self
.mgr
.offline_hosts
)]
707 self
.log
.debug('Add %s, remove %s' % (slots_to_add
, daemons_to_remove
))
708 except OrchestratorError
as e
:
709 self
.log
.error('Failed to apply %s spec %s: %s' % (
710 spec
.service_name(), spec
, e
))
711 self
.mgr
.events
.for_service(spec
, 'ERROR', 'Failed to apply: ' + str(e
))
717 final_count
= len(daemons
) + len(slots_to_add
) - len(daemons_to_remove
)
718 if service_type
in ['mon', 'mgr'] and final_count
< 1:
719 self
.log
.debug('cannot scale mon|mgr below 1)')
723 progress_id
= str(uuid
.uuid4())
724 delta
: List
[str] = []
726 delta
+= [f
'+{len(slots_to_add)}']
727 if daemons_to_remove
:
728 delta
+= [f
'-{len(daemons_to_remove)}']
729 progress_title
= f
'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
730 progress_total
= len(slots_to_add
) + len(daemons_to_remove
)
733 def update_progress() -> None:
735 'progress', 'update', progress_id
,
736 ev_msg
=progress_title
,
737 ev_progress
=(progress_done
/ progress_total
),
747 self
.log
.debug('Hosts that will receive new daemons: %s' % slots_to_add
)
748 self
.log
.debug('Daemons that will be removed: %s' % daemons_to_remove
)
752 for i
in range(len(slots_to_add
)):
753 slot
= slots_to_add
[i
]
754 slot
= slot
.assign_name(self
.mgr
.get_unique_name(
758 prefix
=spec
.service_id
,
761 rank_generation
=slot
.rank_generation
,
763 slots_to_add
[i
] = slot
764 if rank_map
is not None:
765 assert slot
.rank
is not None
766 assert slot
.rank_generation
is not None
767 assert rank_map
[slot
.rank
][slot
.rank_generation
] is None
768 rank_map
[slot
.rank
][slot
.rank_generation
] = slot
.name
771 # record the rank_map before we make changes so that if we fail the
772 # next mgr will clean up.
773 self
.mgr
.spec_store
.save_rank_map(spec
.service_name(), rank_map
)
775 # remove daemons now, since we are going to fence them anyway
776 for d
in daemons_to_remove
:
777 assert d
.hostname
is not None
778 self
._remove
_daemon
(d
.name(), d
.hostname
)
779 daemons_to_remove
= []
782 svc
.fence_old_ranks(spec
, rank_map
, len(all_slots
))
785 for slot
in slots_to_add
:
786 # first remove daemon on conflicting port?
788 for d
in daemons_to_remove
:
789 if d
.hostname
!= slot
.hostname
:
791 if not (set(d
.ports
or []) & set(slot
.ports
)):
793 if d
.ip
and slot
.ip
and d
.ip
!= slot
.ip
:
796 f
'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
798 # NOTE: we don't check ok-to-stop here to avoid starvation if
799 # there is only 1 gateway.
800 self
._remove
_daemon
(d
.name(), d
.hostname
)
801 daemons_to_remove
.remove(d
)
806 daemon_id
= slot
.name
811 daemon_spec
= svc
.make_daemon_spec(
812 slot
.hostname
, daemon_id
, slot
.network
, spec
,
813 daemon_type
=slot
.daemon_type
,
817 rank_generation
=slot
.rank_generation
,
819 self
.log
.debug('Placing %s.%s on host %s' % (
820 slot
.daemon_type
, daemon_id
, slot
.hostname
))
823 daemon_spec
= svc
.prepare_create(daemon_spec
)
824 self
._create
_daemon
(daemon_spec
)
828 except (RuntimeError, OrchestratorError
) as e
:
829 msg
= (f
"Failed while placing {slot.daemon_type}.{daemon_id} "
830 f
"on {slot.hostname}: {e}")
831 self
.mgr
.events
.for_service(spec
, 'ERROR', msg
)
832 self
.mgr
.log
.error(msg
)
833 # only return "no change" if no one else has already succeeded.
834 # later successes will also change to True
841 # add to daemon list so next name(s) will also be unique
842 sd
= orchestrator
.DaemonDescription(
843 hostname
=slot
.hostname
,
844 daemon_type
=slot
.daemon_type
,
850 def _ok_to_stop(remove_daemons
: List
[orchestrator
.DaemonDescription
]) -> bool:
851 daemon_ids
= [d
.daemon_id
for d
in remove_daemons
]
852 assert None not in daemon_ids
853 # setting force flag retains previous behavior
854 r
= svc
.ok_to_stop(cast(List
[str], daemon_ids
), force
=True)
857 while daemons_to_remove
and not _ok_to_stop(daemons_to_remove
):
858 # let's find a subset that is ok-to-stop
859 daemons_to_remove
.pop()
860 for d
in daemons_to_remove
:
862 assert d
.hostname
is not None
863 self
._remove
_daemon
(d
.name(), d
.hostname
)
868 self
.mgr
.remote('progress', 'complete', progress_id
)
869 except Exception as e
:
870 self
.mgr
.remote('progress', 'fail', progress_id
, str(e
))
877 def _check_daemons(self
) -> None:
879 daemons
= self
.mgr
.cache
.get_daemons()
880 daemons_post
: Dict
[str, List
[orchestrator
.DaemonDescription
]] = defaultdict(list)
883 spec
= self
.mgr
.spec_store
.active_specs
.get(dd
.service_name(), None)
884 assert dd
.hostname
is not None
885 assert dd
.daemon_type
is not None
886 assert dd
.daemon_id
is not None
887 if not spec
and dd
.daemon_type
not in ['mon', 'mgr', 'osd']:
888 # (mon and mgr specs should always exist; osds aren't matched
890 self
.log
.info('Removing orphan daemon %s...' % dd
.name())
891 self
._remove
_daemon
(dd
.name(), dd
.hostname
)
893 # ignore unmanaged services
894 if spec
and spec
.unmanaged
:
897 # ignore daemons for deleted services
898 if dd
.service_name() in self
.mgr
.spec_store
.spec_deleted
:
901 # These daemon types require additional configs after creation
902 if dd
.daemon_type
in REQUIRES_POST_ACTIONS
:
903 daemons_post
[dd
.daemon_type
].append(dd
)
905 if self
.mgr
.cephadm_services
[daemon_type_to_service(dd
.daemon_type
)].get_active_daemon(
906 self
.mgr
.cache
.get_daemons_by_service(dd
.service_name())).daemon_id
== dd
.daemon_id
:
911 deps
= self
.mgr
._calc
_daemon
_deps
(spec
, dd
.daemon_type
, dd
.daemon_id
)
912 last_deps
, last_config
= self
.mgr
.cache
.get_daemon_last_config_deps(
913 dd
.hostname
, dd
.name())
914 if last_deps
is None:
916 action
= self
.mgr
.cache
.get_scheduled_daemon_action(dd
.hostname
, dd
.name())
918 self
.log
.info('Reconfiguring %s (unknown last config time)...' % (
921 elif last_deps
!= deps
:
922 self
.log
.debug('%s deps %s -> %s' % (dd
.name(), last_deps
,
924 self
.log
.info('Reconfiguring %s (dependencies changed)...' % (
927 elif self
.mgr
.last_monmap
and \
928 self
.mgr
.last_monmap
> last_config
and \
929 dd
.daemon_type
in CEPH_TYPES
:
930 self
.log
.info('Reconfiguring %s (monmap changed)...' % dd
.name())
932 elif self
.mgr
.extra_ceph_conf_is_newer(last_config
) and \
933 dd
.daemon_type
in CEPH_TYPES
:
934 self
.log
.info('Reconfiguring %s (extra config changed)...' % dd
.name())
937 if self
.mgr
.cache
.get_scheduled_daemon_action(dd
.hostname
, dd
.name()) == 'redeploy' \
938 and action
== 'reconfig':
941 daemon_spec
= CephadmDaemonDeploySpec
.from_daemon_description(dd
)
942 self
.mgr
._daemon
_action
(daemon_spec
, action
=action
)
943 self
.mgr
.cache
.rm_scheduled_daemon_action(dd
.hostname
, dd
.name())
944 except OrchestratorError
as e
:
945 self
.mgr
.events
.from_orch_error(e
)
946 if dd
.daemon_type
in daemons_post
:
947 del daemons_post
[dd
.daemon_type
]
949 except Exception as e
:
950 self
.mgr
.events
.for_daemon_from_exception(dd
.name(), e
)
951 if dd
.daemon_type
in daemons_post
:
952 del daemons_post
[dd
.daemon_type
]
955 # do daemon post actions
956 for daemon_type
, daemon_descs
in daemons_post
.items():
957 if daemon_type
in self
.mgr
.requires_post_actions
:
958 self
.mgr
.requires_post_actions
.remove(daemon_type
)
959 self
.mgr
._get
_cephadm
_service
(daemon_type_to_service(
960 daemon_type
)).daemon_check_post(daemon_descs
)
962 def _purge_deleted_services(self
) -> None:
963 existing_services
= self
.mgr
.spec_store
.all_specs
.items()
964 for service_name
, spec
in list(existing_services
):
965 if service_name
not in self
.mgr
.spec_store
.spec_deleted
:
967 if self
.mgr
.cache
.get_daemons_by_service(service_name
):
969 if spec
.service_type
in ['mon', 'mgr']:
972 logger
.info(f
'Purge service {service_name}')
974 self
.mgr
.cephadm_services
[spec
.service_type
].purge(service_name
)
975 self
.mgr
.spec_store
.finally_rm(service_name
)
def convert_tags_to_repo_digest(self) -> None:
    """Rewrite configured container image tags to repo digests.

    No-op unless ``mgr.use_repo_digest`` is set.  Each distinct image
    reference is inspected only once; every config entity still
    pointing at a tag is then switched to the image's first repo
    digest so later pulls are pinned to exact content.
    """
    if not self.mgr.use_repo_digest:
        return
    settings = self.mgr.upgrade.get_distinct_container_image_settings()
    digests: Dict[str, ContainerInspectInfo] = {}
    for container_image_ref in set(settings.values()):
        if not is_repo_digest(container_image_ref):
            image_info = self._get_container_image_info(container_image_ref)
            if image_info.repo_digests:
                # FIXME: we assume the first digest here is the best
                assert is_repo_digest(image_info.repo_digests[0]), image_info
            digests[container_image_ref] = image_info

    for entity, container_image_ref in settings.items():
        if not is_repo_digest(container_image_ref):
            image_info = digests[container_image_ref]
            if image_info.repo_digests:
                # FIXME: we assume the first digest here is the best
                self.mgr.set_container_image(entity, image_info.repo_digests[0])
997 def _create_daemon(self
,
998 daemon_spec
: CephadmDaemonDeploySpec
,
999 reconfig
: bool = False,
1000 osd_uuid_map
: Optional
[Dict
[str, Any
]] = None,
1003 with
set_exception_subject('service', orchestrator
.DaemonDescription(
1004 daemon_type
=daemon_spec
.daemon_type
,
1005 daemon_id
=daemon_spec
.daemon_id
,
1006 hostname
=daemon_spec
.host
,
1007 ).service_id(), overwrite
=True):
1011 start_time
= datetime_now()
1012 ports
: List
[int] = daemon_spec
.ports
if daemon_spec
.ports
else []
1014 if daemon_spec
.daemon_type
== 'container':
1015 spec
= cast(CustomContainerSpec
,
1016 self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
1019 ports
.extend(spec
.ports
)
1021 if daemon_spec
.daemon_type
== 'cephadm-exporter':
1023 assert daemon_spec
.host
1024 self
._deploy
_cephadm
_binary
(daemon_spec
.host
)
1026 # TCP port to open in the host firewall
1028 daemon_spec
.extra_args
.extend([
1029 '--tcp-ports', ' '.join(map(str, ports
))
1032 # osd deployments needs an --osd-uuid arg
1033 if daemon_spec
.daemon_type
== 'osd':
1034 if not osd_uuid_map
:
1035 osd_uuid_map
= self
.mgr
.get_osd_uuid_map()
1036 osd_uuid
= osd_uuid_map
.get(daemon_spec
.daemon_id
)
1038 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec
.daemon_id
)
1039 daemon_spec
.extra_args
.extend(['--osd-fsid', osd_uuid
])
1042 daemon_spec
.extra_args
.append('--reconfig')
1043 if self
.mgr
.allow_ptrace
:
1044 daemon_spec
.extra_args
.append('--allow-ptrace')
1046 if self
.mgr
.cache
.host_needs_registry_login(daemon_spec
.host
) and self
.mgr
.registry_url
:
1047 self
._registry
_login
(daemon_spec
.host
, self
.mgr
.registry_url
,
1048 self
.mgr
.registry_username
, self
.mgr
.registry_password
)
1050 self
.log
.info('%s daemon %s on %s' % (
1051 'Reconfiguring' if reconfig
else 'Deploying',
1052 daemon_spec
.name(), daemon_spec
.host
))
1054 out
, err
, code
= self
._run
_cephadm
(
1055 daemon_spec
.host
, daemon_spec
.name(), 'deploy',
1057 '--name', daemon_spec
.name(),
1058 '--meta-json', json
.dumps({
1059 'service_name': daemon_spec
.service_name
,
1060 'ports': daemon_spec
.ports
,
1061 'ip': daemon_spec
.ip
,
1062 'deployed_by': self
.mgr
.get_active_mgr_digests(),
1063 'rank': daemon_spec
.rank
,
1064 'rank_generation': daemon_spec
.rank_generation
,
1066 '--config-json', '-',
1067 ] + daemon_spec
.extra_args
,
1068 stdin
=json
.dumps(daemon_spec
.final_config
),
1071 # refresh daemon state? (ceph daemon reconfig does not need it)
1072 if not reconfig
or daemon_spec
.daemon_type
not in CEPH_TYPES
:
1073 if not code
and daemon_spec
.host
in self
.mgr
.cache
.daemons
:
1074 # prime cached service state with what we (should have)
1076 sd
= daemon_spec
.to_daemon_description(
1077 DaemonDescriptionStatus
.running
, 'starting')
1078 self
.mgr
.cache
.add_daemon(daemon_spec
.host
, sd
)
1079 if daemon_spec
.daemon_type
in REQUIRES_POST_ACTIONS
:
1080 self
.mgr
.requires_post_actions
.add(daemon_spec
.daemon_type
)
1081 self
.mgr
.cache
.invalidate_host_daemons(daemon_spec
.host
)
1083 self
.mgr
.cache
.update_daemon_config_deps(
1084 daemon_spec
.host
, daemon_spec
.name(), daemon_spec
.deps
, start_time
)
1085 self
.mgr
.cache
.save_host(daemon_spec
.host
)
1086 msg
= "{} {} on host '{}'".format(
1087 'Reconfigured' if reconfig
else 'Deployed', daemon_spec
.name(), daemon_spec
.host
)
1089 self
.mgr
.events
.for_daemon(daemon_spec
.name(), OrchestratorEvent
.INFO
, msg
)
1091 what
= 'reconfigure' if reconfig
else 'deploy'
1092 self
.mgr
.events
.for_daemon(
1093 daemon_spec
.name(), OrchestratorEvent
.ERROR
, f
'Failed to {what}: {err}')
1095 except OrchestratorError
:
1096 redeploy
= daemon_spec
.name() in self
.mgr
.cache
.get_daemon_names()
1097 if not reconfig
and not redeploy
:
1098 # we have to clean up the daemon. E.g. keyrings.
1099 servict_type
= daemon_type_to_service(daemon_spec
.daemon_type
)
1100 dd
= daemon_spec
.to_daemon_description(DaemonDescriptionStatus
.error
, 'failed')
1101 self
.mgr
.cephadm_services
[servict_type
].post_remove(dd
)
1104 def _remove_daemon(self
, name
: str, host
: str) -> str:
1108 (daemon_type
, daemon_id
) = name
.split('.', 1)
1109 daemon
= orchestrator
.DaemonDescription(
1110 daemon_type
=daemon_type
,
1111 daemon_id
=daemon_id
,
1114 with
set_exception_subject('service', daemon
.service_id(), overwrite
=True):
1116 self
.mgr
.cephadm_services
[daemon_type_to_service(daemon_type
)].pre_remove(daemon
)
1118 # NOTE: we are passing the 'force' flag here, which means
1119 # we can delete a mon instances data.
1120 args
= ['--name', name
, '--force']
1121 self
.log
.info('Removing daemon %s from %s' % (name
, host
))
1122 out
, err
, code
= self
._run
_cephadm
(
1123 host
, name
, 'rm-daemon', args
)
1125 # remove item from cache
1126 self
.mgr
.cache
.rm_daemon(host
, name
)
1127 self
.mgr
.cache
.invalidate_host_daemons(host
)
1129 self
.mgr
.cephadm_services
[daemon_type_to_service(daemon_type
)].post_remove(daemon
)
1131 return "Removed {} from host '{}'".format(name
, host
)
1133 def _run_cephadm_json(self
,
1135 entity
: Union
[CephadmNoImage
, str],
1138 no_fsid
: Optional
[bool] = False,
1139 image
: Optional
[str] = "",
1142 out
, err
, code
= self
._run
_cephadm
(
1143 host
, entity
, command
, args
, no_fsid
=no_fsid
, image
=image
)
1145 raise OrchestratorError(f
'host {host} `cephadm {command}` returned {code}: {err}')
1146 except Exception as e
:
1147 raise OrchestratorError(f
'host {host} `cephadm {command}` failed: {e}')
1149 return json
.loads(''.join(out
))
1150 except (ValueError, KeyError):
1151 msg
= f
'host {host} `cephadm {command}` failed: Cannot decode JSON'
1152 self
.log
.exception(f
'{msg}: {"".join(out)}')
1153 raise OrchestratorError(msg
)
    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]

        Returns (stdout lines, stderr lines, exit code).  When ``error_ok``
        is true, failures are reported through the return value instead of
        raising.
        """
        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # entities whose deployment does not use a ceph container image
        bypass_image = ('cephadm-exporter',)

        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            # Skip the image check for daemons deployed that are not ceph containers
            if not str(entity).startswith(bypass_image):
                if not image and entity is not cephadmNoImage:
                    image = self.mgr._get_container_image(entity)

            final_args = []

            # global args (must precede the subcommand on the cephadm CLI)
            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])

            if not self.mgr.container_init:
                final_args += ['--no-container-init']

            # subcommand
            final_args.append(command)

            # subcommand args
            if not no_fsid:
                final_args += ['--fsid', self.mgr._cluster_fsid]

            final_args += args

            # exec
            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mgr.mode == 'root':
                # 'root' mode: copy our bundled cephadm binary to the host
                # and run it with whatever python is available there
                if stdin:
                    self.log.debug('stdin: %s' % stdin)

                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, self.mgr.cephadm_binary_path] + final_args,
                        stdin=stdin.encode('utf-8') if stdin is not None else None)
                    # NOTE(review): exit code 2 is taken to mean the cephadm
                    # binary may be missing on the host; verify with `ls`,
                    # redeploy it, and retry once -- confirm against cephadm's
                    # exit-code conventions
                    if code == 2:
                        out_ls, err_ls, code_ls = remoto.process.check(
                            conn, ['ls', self.mgr.cephadm_binary_path])
                        if code_ls == 2:
                            self._deploy_cephadm_binary_conn(conn, host)
                            out, err, code = remoto.process.check(
                                conn,
                                [python, self.mgr.cephadm_binary_path] + final_args,
                                stdin=stdin.encode('utf-8') if stdin is not None else None)
                except RuntimeError as e:
                    # connection-level failure: drop the cached connection
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise

            elif self.mgr.mode == 'cephadm-package':
                # 'cephadm-package' mode: cephadm is installed on the host;
                # run the packaged binary via sudo
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
1266 def _get_container_image_info(self
, image_name
: str) -> ContainerInspectInfo
:
1267 # pick a random host...
1269 for host_name
in self
.mgr
.inventory
.keys():
1273 raise OrchestratorError('no hosts defined')
1274 if self
.mgr
.cache
.host_needs_registry_login(host
) and self
.mgr
.registry_url
:
1275 self
._registry
_login
(host
, self
.mgr
.registry_url
,
1276 self
.mgr
.registry_username
, self
.mgr
.registry_password
)
1278 j
= self
._run
_cephadm
_json
(host
, '', 'pull', [], image
=image_name
, no_fsid
=True)
1280 r
= ContainerInspectInfo(
1282 j
.get('ceph_version'),
1283 j
.get('repo_digests')
1285 self
.log
.debug(f
'image {image_name} -> {r}')
1288 # function responsible for logging single host into custom registry
1289 def _registry_login(self
, host
: str, url
: Optional
[str], username
: Optional
[str], password
: Optional
[str]) -> Optional
[str]:
1290 self
.log
.debug(f
"Attempting to log host {host} into custom registry @ {url}")
1291 # want to pass info over stdin rather than through normal list of args
1292 args_str
= json
.dumps({
1294 'username': username
,
1295 'password': password
,
1297 out
, err
, code
= self
._run
_cephadm
(
1298 host
, 'mon', 'registry-login',
1299 ['--registry-json', '-'], stdin
=args_str
, error_ok
=True)
1301 return f
"Host {host} failed to login to {url} as {username} with given password"
1304 def _deploy_cephadm_binary(self
, host
: str) -> None:
1305 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1306 self
.log
.info(f
"Deploying cephadm binary to {host}")
1307 with self
._remote
_connection
(host
) as tpl
:
1309 return self
._deploy
_cephadm
_binary
_conn
(conn
, host
)
1311 def _deploy_cephadm_binary_conn(self
, conn
: "BaseConnection", host
: str) -> None:
1312 _out
, _err
, code
= remoto
.process
.check(
1314 ['mkdir', '-p', f
'/var/lib/ceph/{self.mgr._cluster_fsid}'])
1316 msg
= f
"Unable to deploy the cephadm binary to {host}: {_err}"
1317 self
.log
.warning(msg
)
1318 raise OrchestratorError(msg
)
1319 _out
, _err
, code
= remoto
.process
.check(
1321 ['tee', '-', self
.mgr
.cephadm_binary_path
],
1322 stdin
=self
.mgr
._cephadm
.encode('utf-8'))
1324 msg
= f
"Unable to deploy the cephadm binary to {host}: {_err}"
1325 self
.log
.warning(msg
)
1326 raise OrchestratorError(msg
)
1328 def _write_remote_file(self
,
1335 with self
._remote
_connection
(host
) as tpl
:
1338 errmsg
= connr
.write_file(path
, content
, mode
, uid
, gid
)
1339 if errmsg
is not None:
1340 raise OrchestratorError(errmsg
)
1341 except Exception as e
:
1342 msg
= f
"Unable to write {host}:{path}: {e}"
1343 self
.log
.warning(msg
)
1344 raise OrchestratorError(msg
)
1347 def _remote_connection(self
,
1349 addr
: Optional
[str] = None,
1350 ) -> Iterator
[Tuple
["BaseConnection", Any
]]:
1351 if not addr
and host
in self
.mgr
.inventory
:
1352 addr
= self
.mgr
.inventory
.get_addr(host
)
1354 self
.mgr
.offline_hosts_remove(host
)
1359 raise OrchestratorError("host address is empty")
1360 conn
, connr
= self
.mgr
._get
_connection
(addr
)
1361 except OSError as e
:
1362 self
.mgr
._reset
_con
(host
)
1363 msg
= f
"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1364 raise execnet
.gateway_bootstrap
.HostNotFound(msg
)
1368 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1369 # this is a misleading exception as it seems to be thrown for
1370 # any sort of connection failure, even those having nothing to
1371 # do with "host not found" (e.g., ssh key permission denied).
1372 self
.mgr
.offline_hosts
.add(host
)
1373 self
.mgr
._reset
_con
(host
)
1375 user
= self
.mgr
.ssh_user
if self
.mgr
.mode
== 'root' else 'cephadm'
1376 if str(e
).startswith("Can't communicate"):
1379 msg
= f
'''Failed to connect to {host} ({addr}).
1380 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1382 To add the cephadm SSH key to the host:
1383 > ceph cephadm get-pub-key > ~/ceph.pub
1384 > ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
1386 To check that the host is reachable open a new shell with the --no-hosts flag:
1387 > cephadm shell --no-hosts
1389 Then run the following:
1390 > ceph cephadm get-ssh-config > ssh_config
1391 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1392 > chmod 0600 ~/cephadm_private_key
1393 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
1394 raise OrchestratorError(msg
) from e
1395 except Exception as ex
:
1396 self
.log
.exception(ex
)