4 from collections
import defaultdict
5 from functools
import wraps
6 from tempfile
import TemporaryDirectory
7 from threading
import Event
10 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, \
11 Any
, Set
, TYPE_CHECKING
, cast
18 import multiprocessing
.pool
22 from ceph
.deployment
import inventory
23 from ceph
.deployment
.drive_group
import DriveGroupSpec
24 from ceph
.deployment
.service_spec
import \
25 NFSServiceSpec
, ServiceSpec
, PlacementSpec
, assert_valid_host
27 from mgr_module
import MgrModule
, HandleCommandResult
29 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
31 from orchestrator
._interface
import GenericSpec
35 from .services
.cephadmservice
import MonService
, MgrService
, MdsService
, RgwService
, \
36 RbdMirrorService
, CrashService
, CephadmService
37 from .services
.iscsi
import IscsiService
38 from .services
.nfs
import NFSService
39 from .services
.osd
import RemoveUtil
, OSDRemoval
, OSDService
40 from .services
.monitoring
import GrafanaService
, AlertmanagerService
, PrometheusService
, \
42 from .schedule
import HostAssignment
43 from .inventory
import Inventory
, SpecStore
, HostCache
44 from .upgrade
import CEPH_UPGRADE_ORDER
, CephadmUpgrade
45 from .template
import TemplateMgr
49 # NOTE(mattoliverau) Patch remoto until remoto PR
50 # (https://github.com/alfredodeza/remoto/pull/56) lands
51 from distutils
.version
import StrictVersion
52 if StrictVersion(remoto
.__version
__) <= StrictVersion('1.2'):
53 def remoto_has_connection(self
):
54 return self
.gateway
.hasreceiver()
56 from remoto
.backends
import BaseConnection
57 BaseConnection
.has_connection
= remoto_has_connection
59 import execnet
.gateway_bootstrap
60 except ImportError as e
:
62 remoto_import_error
= str(e
)
65 from typing
import List
69 logger
= logging
.getLogger(__name__
)
73 DEFAULT_SSH_CONFIG
= """
76 StrictHostKeyChecking no
77 UserKnownHostsFile /dev/null
81 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%f'
82 CEPH_DATEFMT
= '%Y-%m-%dT%H:%M:%S.%fZ'
84 CEPH_TYPES
= set(CEPH_UPGRADE_ORDER
)
87 def forall_hosts(f
: Callable
[..., T
]) -> Callable
[..., List
[T
]]:
89 def forall_hosts_wrapper(*args
) -> List
[T
]:
91 # Some weird logic to make calling functions with multiple arguments work.
98 assert 'either f([...]) or self.f([...])'
101 if not isinstance(arg
, tuple):
107 except Exception as e
:
108 logger
.exception(f
'executing {f.__name__}({args}) failed.')
111 assert CephadmOrchestrator
.instance
is not None
112 return CephadmOrchestrator
.instance
._worker
_pool
.map(do_work
, vals
)
115 return forall_hosts_wrapper
118 class CephadmCompletion(orchestrator
.Completion
):
122 def trivial_completion(f
: Callable
) -> Callable
[..., CephadmCompletion
]:
124 Decorator to make CephadmCompletion methods return
125 a completion object that executes themselves.
129 def wrapper(*args
, **kwargs
):
130 return CephadmCompletion(on_complete
=lambda _
: f(*args
, **kwargs
))
135 @six.add_metaclass(CLICommandMeta
)
136 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
):
138 _STORE_HOST_PREFIX
= "host"
141 NATIVE_OPTIONS
= [] # type: List[Any]
144 'name': 'ssh_config_file',
147 'desc': 'customized SSH config file to connect to managed hosts',
150 'name': 'device_cache_timeout',
153 'desc': 'seconds to cache device inventory',
156 'name': 'daemon_cache_timeout',
159 'desc': 'seconds to cache service (daemon) inventory',
162 'name': 'host_check_interval',
165 'desc': 'how frequently to perform a host check',
170 'enum_allowed': ['root', 'cephadm-package'],
172 'desc': 'mode for remote execution of cephadm',
175 'name': 'container_image_base',
176 'default': 'docker.io/ceph/ceph',
177 'desc': 'Container image name, without the tag',
181 'name': 'container_image_prometheus',
182 'default': 'prom/prometheus:v2.18.1',
183 'desc': 'Prometheus container image',
186 'name': 'container_image_grafana',
187 'default': 'ceph/ceph-grafana:latest',
188 'desc': 'Prometheus container image',
191 'name': 'container_image_alertmanager',
192 'default': 'prom/alertmanager:v0.20.0',
193 'desc': 'Prometheus container image',
196 'name': 'container_image_node_exporter',
197 'default': 'prom/node-exporter:v0.18.1',
198 'desc': 'Prometheus container image',
201 'name': 'warn_on_stray_hosts',
204 'desc': 'raise a health warning if daemons are detected on a host '
205 'that is not managed by cephadm',
208 'name': 'warn_on_stray_daemons',
211 'desc': 'raise a health warning if daemons are detected '
212 'that are not managed by cephadm',
215 'name': 'warn_on_failed_host_check',
218 'desc': 'raise a health warning if the host check fails',
221 'name': 'log_to_cluster',
224 'desc': 'log to the "cephadm" cluster log channel"',
227 'name': 'allow_ptrace',
230 'desc': 'allow SYS_PTRACE capability on ceph containers',
231 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
232 'process with gdb or strace. Enabling this options '
233 'can allow debugging daemons that encounter problems '
237 'name': 'prometheus_alerts_path',
239 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
240 'desc': 'location of alerts to include in prometheus deployments',
244 def __init__(self
, *args
, **kwargs
):
245 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
246 self
._cluster
_fsid
= self
.get('mon_map')['fsid']
252 if self
.get_store('pause'):
257 # for mypy which does not run the code
259 self
.ssh_config_file
= None # type: Optional[str]
260 self
.device_cache_timeout
= 0
261 self
.daemon_cache_timeout
= 0
262 self
.host_check_interval
= 0
264 self
.container_image_base
= ''
265 self
.container_image_prometheus
= ''
266 self
.container_image_grafana
= ''
267 self
.container_image_alertmanager
= ''
268 self
.container_image_node_exporter
= ''
269 self
.warn_on_stray_hosts
= True
270 self
.warn_on_stray_daemons
= True
271 self
.warn_on_failed_host_check
= True
272 self
.allow_ptrace
= False
273 self
.prometheus_alerts_path
= ''
275 self
._cons
= {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
279 path
= self
.get_ceph_option('cephadm_path')
281 with
open(path
, 'r') as f
:
282 self
._cephadm
= f
.read()
283 except (IOError, TypeError) as e
:
284 raise RuntimeError("unable to read cephadm at '%s': %s" % (
287 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
291 CephadmOrchestrator
.instance
= self
293 self
.upgrade
= CephadmUpgrade(self
)
295 self
.health_checks
= {}
297 self
.all_progress_references
= list() # type: List[orchestrator.ProgressReference]
299 self
.inventory
= Inventory(self
)
301 self
.cache
= HostCache(self
)
303 self
.rm_util
= RemoveUtil(self
)
305 self
.spec_store
= SpecStore(self
)
306 self
.spec_store
.load()
308 # ensure the host lists are in sync
309 for h
in self
.inventory
.keys():
310 if h
not in self
.cache
.daemons
:
311 self
.cache
.prime_empty_host(h
)
312 for h
in self
.cache
.get_hosts():
313 if h
not in self
.inventory
:
314 self
.cache
.rm_host(h
)
317 self
.offline_hosts
: Set
[str] = set()
320 self
.osd_service
= OSDService(self
)
321 self
.nfs_service
= NFSService(self
)
322 self
.mon_service
= MonService(self
)
323 self
.mgr_service
= MgrService(self
)
324 self
.mds_service
= MdsService(self
)
325 self
.rgw_service
= RgwService(self
)
326 self
.rbd_mirror_service
= RbdMirrorService(self
)
327 self
.grafana_service
= GrafanaService(self
)
328 self
.alertmanager_service
= AlertmanagerService(self
)
329 self
.prometheus_service
= PrometheusService(self
)
330 self
.node_exporter_service
= NodeExporterService(self
)
331 self
.crash_service
= CrashService(self
)
332 self
.iscsi_service
= IscsiService(self
)
333 self
.cephadm_services
= {
334 'mon': self
.mon_service
,
335 'mgr': self
.mgr_service
,
336 'osd': self
.osd_service
,
337 'mds': self
.mds_service
,
338 'rgw': self
.rgw_service
,
339 'rbd-mirror': self
.rbd_mirror_service
,
340 'nfs': self
.nfs_service
,
341 'grafana': self
.grafana_service
,
342 'alertmanager': self
.alertmanager_service
,
343 'prometheus': self
.prometheus_service
,
344 'node-exporter': self
.node_exporter_service
,
345 'crash': self
.crash_service
,
346 'iscsi': self
.iscsi_service
,
349 self
.template
= TemplateMgr()
352 self
.log
.debug('shutdown')
353 self
._worker
_pool
.close()
354 self
._worker
_pool
.join()
def _get_cephadm_service(self, service_type: str) -> CephadmService:
    """Return the CephadmService handler registered for *service_type*.

    The type must be one of ServiceSpec.KNOWN_SERVICE_TYPES; anything
    else is a programming error, hence the assert rather than a raised
    OrchestratorError.
    """
    assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
    service = self.cephadm_services[service_type]
    return service
362 def _kick_serve_loop(self
):
363 self
.log
.debug('_kick_serve_loop')
366 def _check_safe_to_destroy_mon(self
, mon_id
):
367 # type: (str) -> None
368 ret
, out
, err
= self
.check_mon_command({
369 'prefix': 'quorum_status',
373 except Exception as e
:
374 raise OrchestratorError('failed to parse quorum status')
376 mons
= [m
['name'] for m
in j
['monmap']['mons']]
377 if mon_id
not in mons
:
378 self
.log
.info('Safe to remove mon.%s: not in monmap (%s)' % (
381 new_mons
= [m
for m
in mons
if m
!= mon_id
]
382 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
383 if len(new_quorum
) > len(new_mons
) / 2:
384 self
.log
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id
, new_quorum
, new_mons
))
386 raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
388 def _check_host(self
, host
):
389 if host
not in self
.inventory
:
391 self
.log
.debug(' checking %s' % host
)
393 out
, err
, code
= self
._run
_cephadm
(
394 host
, 'client', 'check-host', [],
395 error_ok
=True, no_fsid
=True)
396 self
.cache
.update_last_host_check(host
)
397 self
.cache
.save_host(host
)
399 self
.log
.debug(' host %s failed check' % host
)
400 if self
.warn_on_failed_host_check
:
401 return 'host %s failed check: %s' % (host
, err
)
403 self
.log
.debug(' host %s ok' % host
)
404 except Exception as e
:
405 self
.log
.debug(' host %s failed check' % host
)
406 return 'host %s failed check: %s' % (host
, e
)
408 def _check_for_strays(self
):
409 self
.log
.debug('_check_for_strays')
410 for k
in ['CEPHADM_STRAY_HOST',
411 'CEPHADM_STRAY_DAEMON']:
412 if k
in self
.health_checks
:
413 del self
.health_checks
[k
]
414 if self
.warn_on_stray_hosts
or self
.warn_on_stray_daemons
:
415 ls
= self
.list_servers()
416 managed
= self
.cache
.get_daemon_names()
417 host_detail
= [] # type: List[str]
419 daemon_detail
= [] # type: List[str]
421 host
= item
.get('hostname')
422 daemons
= item
.get('services') # misnomer!
425 name
= '%s.%s' % (s
.get('type'), s
.get('id'))
426 if host
not in self
.inventory
:
427 missing_names
.append(name
)
428 host_num_daemons
+= 1
429 if name
not in managed
:
430 daemon_detail
.append(
431 'stray daemon %s on host %s not managed by cephadm' % (name
, host
))
434 'stray host %s has %d stray daemons: %s' % (
435 host
, len(missing_names
), missing_names
))
436 if self
.warn_on_stray_hosts
and host_detail
:
437 self
.health_checks
['CEPHADM_STRAY_HOST'] = {
438 'severity': 'warning',
439 'summary': '%d stray host(s) with %s daemon(s) '
440 'not managed by cephadm' % (
441 len(host_detail
), host_num_daemons
),
442 'count': len(host_detail
),
443 'detail': host_detail
,
445 if self
.warn_on_stray_daemons
and daemon_detail
:
446 self
.health_checks
['CEPHADM_STRAY_DAEMON'] = {
447 'severity': 'warning',
448 'summary': '%d stray daemons(s) not managed by cephadm' % (
450 'count': len(daemon_detail
),
451 'detail': daemon_detail
,
453 self
.set_health_checks(self
.health_checks
)
455 def _serve_sleep(self
):
457 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
458 ret
= self
.event
.wait(sleep_interval
)
463 self
.log
.debug("serve starting")
467 self
.log
.debug('refreshing hosts')
470 for host
in self
.cache
.get_hosts():
471 if self
.cache
.host_needs_check(host
):
472 r
= self
._check
_host
(host
)
475 if self
.cache
.host_needs_daemon_refresh(host
):
476 self
.log
.debug('refreshing %s daemons' % host
)
477 r
= self
._refresh
_host
_daemons
(host
)
480 if self
.cache
.host_needs_device_refresh(host
):
481 self
.log
.debug('refreshing %s devices' % host
)
482 r
= self
._refresh
_host
_devices
(host
)
486 if self
.cache
.host_needs_osdspec_preview_refresh(host
):
487 self
.log
.debug(f
"refreshing OSDSpec previews for {host}")
488 r
= self
._refresh
_host
_osdspec
_previews
(host
)
492 health_changed
= False
493 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
494 del self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']
495 health_changed
= True
497 self
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
498 'severity': 'warning',
499 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
500 'count': len(bad_hosts
),
503 health_changed
= True
505 self
.health_checks
['CEPHADM_REFRESH_FAILED'] = {
506 'severity': 'warning',
507 'summary': 'failed to probe daemons or devices',
508 'count': len(failures
),
511 health_changed
= True
512 elif 'CEPHADM_REFRESH_FAILED' in self
.health_checks
:
513 del self
.health_checks
['CEPHADM_REFRESH_FAILED']
514 health_changed
= True
516 self
.set_health_checks(self
.health_checks
)
518 self
._check
_for
_strays
()
521 self
.health_checks
['CEPHADM_PAUSED'] = {
522 'severity': 'warning',
523 'summary': 'cephadm background work is paused',
525 'detail': ["'ceph orch resume' to resume"],
527 self
.set_health_checks(self
.health_checks
)
529 if 'CEPHADM_PAUSED' in self
.health_checks
:
530 del self
.health_checks
['CEPHADM_PAUSED']
531 self
.set_health_checks(self
.health_checks
)
533 self
.rm_util
._remove
_osds
_bg
()
535 if self
._apply
_all
_services
():
536 continue # did something, refresh
538 self
._check
_daemons
()
540 if self
.upgrade
.continue_upgrade():
544 self
.log
.debug("serve exit")
546 def config_notify(self
):
548 This method is called whenever one of our config options is changed.
550 for opt
in self
.MODULE_OPTIONS
:
552 opt
['name'], # type: ignore
553 self
.get_module_option(opt
['name'])) # type: ignore
554 self
.log
.debug(' mgr option %s = %s',
555 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
556 for opt
in self
.NATIVE_OPTIONS
:
559 self
.get_ceph_option(opt
))
560 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
564 def notify(self
, notify_type
, notify_id
):
569 self
.log
.info('Paused')
570 self
.set_store('pause', 'true')
572 # wake loop so we update the health status
573 self
._kick
_serve
_loop
()
577 self
.log
.info('Resumed')
579 self
.set_store('pause', None)
580 # unconditionally wake loop so that 'orch resume' can be used to kick
582 self
._kick
_serve
_loop
()
584 def get_unique_name(self
, daemon_type
, host
, existing
, prefix
=None,
586 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
588 Generate a unique random service name
590 suffix
= daemon_type
not in [
591 'mon', 'crash', 'nfs',
592 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
595 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
596 raise orchestrator
.OrchestratorValidationError('name %s already in use', forcename
)
600 host
= host
.split('.')[0]
608 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
610 if len([d
for d
in existing
if d
.daemon_id
== name
]):
612 raise orchestrator
.OrchestratorValidationError('name %s already in use', name
)
613 self
.log
.debug('name %s exists, trying again', name
)
617 def _reconfig_ssh(self
):
618 temp_files
= [] # type: list
619 ssh_options
= [] # type: List[str]
622 ssh_config_fname
= self
.ssh_config_file
623 ssh_config
= self
.get_store("ssh_config")
624 if ssh_config
is not None or ssh_config_fname
is None:
626 ssh_config
= DEFAULT_SSH_CONFIG
627 f
= tempfile
.NamedTemporaryFile(prefix
='cephadm-conf-')
628 os
.fchmod(f
.fileno(), 0o600)
629 f
.write(ssh_config
.encode('utf-8'))
630 f
.flush() # make visible to other processes
632 ssh_config_fname
= f
.name
634 self
.validate_ssh_config_fname(ssh_config_fname
)
635 ssh_options
+= ['-F', ssh_config_fname
]
638 ssh_key
= self
.get_store("ssh_identity_key")
639 ssh_pub
= self
.get_store("ssh_identity_pub")
640 self
.ssh_pub
= ssh_pub
641 self
.ssh_key
= ssh_key
642 if ssh_key
and ssh_pub
:
643 tkey
= tempfile
.NamedTemporaryFile(prefix
='cephadm-identity-')
644 tkey
.write(ssh_key
.encode('utf-8'))
645 os
.fchmod(tkey
.fileno(), 0o600)
646 tkey
.flush() # make visible to other processes
647 tpub
= open(tkey
.name
+ '.pub', 'w')
648 os
.fchmod(tpub
.fileno(), 0o600)
650 tpub
.flush() # make visible to other processes
651 temp_files
+= [tkey
, tpub
]
652 ssh_options
+= ['-i', tkey
.name
]
654 self
._temp
_files
= temp_files
656 self
._ssh
_options
= ' '.join(ssh_options
) # type: Optional[str]
658 self
._ssh
_options
= None
660 if self
.mode
== 'root':
661 self
.ssh_user
= 'root'
662 elif self
.mode
== 'cephadm-package':
663 self
.ssh_user
= 'cephadm'
667 def validate_ssh_config_fname(self
, ssh_config_fname
):
668 if not os
.path
.isfile(ssh_config_fname
):
669 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
672 def _reset_con(self
, host
):
673 conn
, r
= self
._cons
.get(host
, (None, None))
675 self
.log
.debug('_reset_con close %s' % host
)
679 def _reset_cons(self
):
680 for host
, conn_and_r
in self
._cons
.items():
681 self
.log
.debug('_reset_cons close %s' % host
)
def offline_hosts_remove(self, host):
    # type: (str) -> None
    """Remove *host* from the offline-host set, if present.

    set.discard() performs the original membership-test-then-remove in a
    single idiomatic call and is a no-op for hosts not in the set.
    """
    self.offline_hosts.discard(host)
693 if remoto
is not None:
696 return False, "loading remoto library:{}".format(
701 The cephadm orchestrator is always available.
703 return self
.can_run()
705 def process(self
, completions
):
707 Does nothing, as completions are processed in another thread.
710 self
.log
.debug("process: completions={0}".format(orchestrator
.pretty_print(completions
)))
712 for p
in completions
:
715 @orchestrator._cli
_write
_command
(
716 prefix
='cephadm set-ssh-config',
717 desc
='Set the ssh_config file (use -i <ssh_config>)')
718 def _set_ssh_config(self
, inbuf
=None):
720 Set an ssh_config file provided from stdin
725 if inbuf
is None or len(inbuf
) == 0:
726 return -errno
.EINVAL
, "", "empty ssh config provided"
727 self
.set_store("ssh_config", inbuf
)
728 self
.log
.info('Set ssh_config')
731 @orchestrator._cli
_write
_command
(
732 prefix
='cephadm clear-ssh-config',
733 desc
='Clear the ssh_config file')
734 def _clear_ssh_config(self
):
736 Clear the ssh_config file provided from stdin
738 self
.set_store("ssh_config", None)
739 self
.ssh_config_tmp
= None
740 self
.log
.info('Cleared ssh_config')
743 @orchestrator._cli
_read
_command
(
744 prefix
='cephadm get-ssh-config',
745 desc
='Returns the ssh config as used by cephadm'
747 def _get_ssh_config(self
):
748 if self
.ssh_config_file
:
749 self
.validate_ssh_config_fname(self
.ssh_config_file
)
750 with
open(self
.ssh_config_file
) as f
:
751 return HandleCommandResult(stdout
=f
.read())
752 ssh_config
= self
.get_store("ssh_config")
754 return HandleCommandResult(stdout
=ssh_config
)
755 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
758 @orchestrator._cli
_write
_command
(
759 'cephadm generate-key',
760 desc
='Generate a cluster SSH key (if not present)')
761 def _generate_key(self
):
762 if not self
.ssh_pub
or not self
.ssh_key
:
763 self
.log
.info('Generating ssh key...')
764 tmp_dir
= TemporaryDirectory()
765 path
= tmp_dir
.name
+ '/key'
767 subprocess
.check_call([
768 '/usr/bin/ssh-keygen',
769 '-C', 'ceph-%s' % self
._cluster
_fsid
,
773 with
open(path
, 'r') as f
:
775 with
open(path
+ '.pub', 'r') as f
:
779 os
.unlink(path
+ '.pub')
781 self
.set_store('ssh_identity_key', secret
)
782 self
.set_store('ssh_identity_pub', pub
)
786 @orchestrator._cli
_write
_command
(
787 'cephadm set-priv-key',
788 desc
='Set cluster SSH private key (use -i <private_key>)')
789 def _set_priv_key(self
, inbuf
=None):
790 if inbuf
is None or len(inbuf
) == 0:
791 return -errno
.EINVAL
, "", "empty private ssh key provided"
792 self
.set_store("ssh_identity_key", inbuf
)
793 self
.log
.info('Set ssh private key')
797 @orchestrator._cli
_write
_command
(
798 'cephadm set-pub-key',
799 desc
='Set cluster SSH public key (use -i <public_key>)')
800 def _set_pub_key(self
, inbuf
=None):
801 if inbuf
is None or len(inbuf
) == 0:
802 return -errno
.EINVAL
, "", "empty public ssh key provided"
803 self
.set_store("ssh_identity_pub", inbuf
)
804 self
.log
.info('Set ssh public key')
808 @orchestrator._cli
_write
_command
(
810 desc
='Clear cluster SSH key')
811 def _clear_key(self
):
812 self
.set_store('ssh_identity_key', None)
813 self
.set_store('ssh_identity_pub', None)
815 self
.log
.info('Cleared cluster SSH key')
818 @orchestrator._cli
_read
_command
(
819 'cephadm get-pub-key',
820 desc
='Show SSH public key for connecting to cluster hosts')
821 def _get_pub_key(self
):
823 return 0, self
.ssh_pub
, ''
825 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
827 @orchestrator._cli
_read
_command
(
829 desc
='Show user for SSHing to cluster hosts')
831 return 0, self
.ssh_user
, ''
833 @orchestrator._cli
_read
_command
(
834 'cephadm check-host',
835 'name=host,type=CephString '
836 'name=addr,type=CephString,req=false',
837 'Check whether we can access and manage a remote host')
838 def check_host(self
, host
, addr
=None):
839 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'check-host',
840 ['--expect-hostname', host
],
842 error_ok
=True, no_fsid
=True)
844 return 1, '', ('check-host failed:\n' + '\n'.join(err
))
845 # if we have an outstanding health alert for this host, give the
846 # serve thread a kick
847 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
848 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
849 if item
.startswith('host %s ' % host
):
851 return 0, '%s (%s) ok' % (host
, addr
), err
853 @orchestrator._cli
_read
_command
(
854 'cephadm prepare-host',
855 'name=host,type=CephString '
856 'name=addr,type=CephString,req=false',
857 'Prepare a remote host for use with cephadm')
858 def _prepare_host(self
, host
, addr
=None):
859 out
, err
, code
= self
._run
_cephadm
(host
, 'client', 'prepare-host',
860 ['--expect-hostname', host
],
862 error_ok
=True, no_fsid
=True)
864 return 1, '', ('prepare-host failed:\n' + '\n'.join(err
))
865 # if we have an outstanding health alert for this host, give the
866 # serve thread a kick
867 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
868 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
869 if item
.startswith('host %s ' % host
):
871 return 0, '%s (%s) ok' % (host
, addr
), err
873 def _get_connection(self
, host
):
875 Setup a connection for running commands on remote host.
877 conn
, r
= self
._cons
.get(host
, (None, None))
879 if conn
.has_connection():
880 self
.log
.debug('Have connection to %s' % host
)
883 self
._reset
_con
(host
)
884 n
= self
.ssh_user
+ '@' + host
885 self
.log
.debug("Opening connection to {} with ssh options '{}'".format(
886 n
, self
._ssh
_options
))
887 child_logger
=self
.log
.getChild(n
)
888 child_logger
.setLevel('WARNING')
889 conn
= remoto
.Connection(
892 ssh_options
=self
._ssh
_options
)
894 r
= conn
.import_module(remotes
)
895 self
._cons
[host
] = conn
, r
899 def _executable_path(self
, conn
, executable
):
901 Remote validator that accepts a connection object to ensure that a certain
902 executable is available returning its full path if so.
904 Otherwise an exception with thorough details will be raised, informing the
905 user that the executable was not found.
907 executable_path
= conn
.remote_module
.which(executable
)
908 if not executable_path
:
909 raise RuntimeError("Executable '{}' not found on host '{}'".format(
910 executable
, conn
.hostname
))
911 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
913 return executable_path
915 def _run_cephadm(self
,
917 entity
: Optional
[str],
920 addr
: Optional
[str] = None,
921 stdin
: Optional
[str] = None,
924 image
: Optional
[str] = None,
925 env_vars
: Optional
[List
[str]] = None,
926 ) -> Tuple
[List
[str], List
[str], int]:
928 Run cephadm on the remote host with the given command + args
930 :env_vars: in format -> [KEY=VALUE, ..]
932 if not addr
and host
in self
.inventory
:
933 addr
= self
.inventory
.get_addr(host
)
935 self
.offline_hosts_remove(host
)
939 conn
, connr
= self
._get
_connection
(addr
)
942 self
.log
.exception('failed to establish ssh connection')
943 return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
944 raise execnet
.gateway_bootstrap
.HostNotFound(str(e
)) from e
946 assert image
or entity
948 daemon_type
= entity
.split('.', 1)[0] # type: ignore
949 if daemon_type
in CEPH_TYPES
or \
950 daemon_type
== 'nfs' or \
951 daemon_type
== 'iscsi':
952 # get container image
953 ret
, image
, err
= self
.check_mon_command({
954 'prefix': 'config get',
955 'who': utils
.name_to_config_section(entity
),
956 'key': 'container_image',
958 image
= image
.strip() # type: ignore
959 elif daemon_type
== 'prometheus':
960 image
= self
.container_image_prometheus
961 elif daemon_type
== 'grafana':
962 image
= self
.container_image_grafana
963 elif daemon_type
== 'alertmanager':
964 image
= self
.container_image_alertmanager
965 elif daemon_type
== 'node-exporter':
966 image
= self
.container_image_node_exporter
968 self
.log
.debug('%s container image %s' % (entity
, image
))
973 for env_var_pair
in env_vars
:
974 final_args
.extend(['--env', env_var_pair
])
977 final_args
.extend(['--image', image
])
978 final_args
.append(command
)
981 final_args
+= ['--fsid', self
._cluster
_fsid
]
984 self
.log
.debug('args: %s' % (' '.join(final_args
)))
985 if self
.mode
== 'root':
987 self
.log
.debug('stdin: %s' % stdin
)
988 script
= 'injected_argv = ' + json
.dumps(final_args
) + '\n'
990 script
+= 'injected_stdin = ' + json
.dumps(stdin
) + '\n'
991 script
+= self
._cephadm
992 python
= connr
.choose_python()
995 'unable to find python on %s (tried %s in %s)' % (
996 host
, remotes
.PYTHONS
, remotes
.PATH
))
998 out
, err
, code
= remoto
.process
.check(
1001 stdin
=script
.encode('utf-8'))
1002 except RuntimeError as e
:
1003 self
._reset
_con
(host
)
1005 return [], [str(e
)], 1
1007 elif self
.mode
== 'cephadm-package':
1009 out
, err
, code
= remoto
.process
.check(
1011 ['sudo', '/usr/bin/cephadm'] + final_args
,
1013 except RuntimeError as e
:
1014 self
._reset
_con
(host
)
1016 return [], [str(e
)], 1
1019 assert False, 'unsupported mode'
1021 self
.log
.debug('code: %d' % code
)
1023 self
.log
.debug('out: %s' % '\n'.join(out
))
1025 self
.log
.debug('err: %s' % '\n'.join(err
))
1026 if code
and not error_ok
:
1028 'cephadm exited with an error code: %d, stderr:%s' % (
1029 code
, '\n'.join(err
)))
1030 return out
, err
, code
1032 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1033 # this is a misleading exception as it seems to be thrown for
1034 # any sort of connection failure, even those having nothing to
1035 # do with "host not found" (e.g., ssh key permission denied).
1036 self
.offline_hosts
.add(host
)
1037 user
= 'root' if self
.mode
== 'root' else 'cephadm'
1038 msg
= f
'''Failed to connect to {host} ({addr}).
1039 Check that the host is reachable and accepts connections using the cephadm SSH key
1041 you may want to run:
1042 > ceph cephadm get-ssh-config > ssh_config
1043 > ceph config-key get mgr/cephadm/ssh_identity_key > key
1044 > ssh -F ssh_config -i key {user}@{host}'''
1045 raise OrchestratorError(msg
) from e
1046 except Exception as ex
:
1047 self
.log
.exception(ex
)
1050 def _get_hosts(self
, label
: Optional
[str] = '', as_hostspec
: bool = False) -> List
:
1051 return list(self
.inventory
.filter_by_label(label
=label
, as_hostspec
=as_hostspec
))
1053 def _add_host(self
, spec
):
1054 # type: (HostSpec) -> str
1056 Add a host to be managed by the orchestrator.
1058 :param host: host name
1060 assert_valid_host(spec
.hostname
)
1061 out
, err
, code
= self
._run
_cephadm
(spec
.hostname
, 'client', 'check-host',
1062 ['--expect-hostname', spec
.hostname
],
1064 error_ok
=True, no_fsid
=True)
1066 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1067 spec
.hostname
, spec
.addr
, err
))
1069 self
.inventory
.add_host(spec
)
1070 self
.cache
.prime_empty_host(spec
.hostname
)
1071 self
.offline_hosts_remove(spec
.hostname
)
1072 self
.event
.set() # refresh stray health check
1073 self
.log
.info('Added host %s' % spec
.hostname
)
1074 return "Added host '{}'".format(spec
.hostname
)
def add_host(self, spec: HostSpec) -> str:
    """Public entry point for adding a managed host.

    Thin wrapper: all validation and inventory bookkeeping happens in
    _add_host(); this method only delegates.
    """
    return self._add_host(spec)
1081 def remove_host(self
, host
):
1082 # type: (str) -> str
1084 Remove a host from orchestrator management.
1086 :param host: host name
1088 self
.inventory
.rm_host(host
)
1089 self
.cache
.rm_host(host
)
1090 self
._reset
_con
(host
)
1091 self
.event
.set() # refresh stray health check
1092 self
.log
.info('Removed host %s' % host
)
1093 return "Removed host '{}'".format(host
)
def update_host_addr(self, host, addr):
    # type: (str, str) -> str
    """Record a new address for *host* and drop any cached connection.

    The cached SSH connection is reset because it may be bound to the
    old address; the serve-loop event is set so the stray-host health
    check refreshes promptly.
    """
    self.inventory.set_addr(host, addr)
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    message = 'Set host %s addr to %s' % (host, addr)
    self.log.info(message)
    return "Updated host '{}' addr to '{}'".format(host, addr)
1104 def get_hosts(self
):
1105 # type: () -> List[orchestrator.HostSpec]
1107 Return a list of hosts managed by the orchestrator.
1110 - skip async: manager reads from cache.
1112 return list(self
.inventory
.all_specs())
def add_host_label(self, host, label):
    # type: (str, str) -> str
    """Attach *label* to *host* in the inventory and report the change.

    The log line and the returned status string are identical, so one
    local message is formatted and used for both.
    """
    self.inventory.add_label(host, label)
    message = 'Added label %s to host %s' % (label, host)
    self.log.info(message)
    return message
def remove_host_label(self, host, label):
    # type: (str, str) -> str
    """Detach *label* from *host* in the inventory and report the change.

    Bug fix: the log line previously read 'Removed label %s to host %s'
    while the returned status correctly said 'from host'; the log now
    uses the same 'from host' wording, and both share one message.
    """
    self.inventory.rm_label(host, label)
    message = 'Removed label %s from host %s' % (label, host)
    self.log.info(message)
    return message
1126 def update_osdspec_previews(self
, search_host
: str = ''):
1127 # Set global 'pending' flag for host
1128 self
.cache
.loading_osdspec_preview
.add(search_host
)
1130 # query OSDSpecs for host <search host> and generate/get the preview
1131 # There can be multiple previews for one host due to multiple OSDSpecs.
1132 previews
.extend(self
.osd_service
.get_previews(search_host
))
1133 self
.log
.debug(f
"Loading OSDSpec previews to HostCache")
1134 self
.cache
.osdspec_previews
[search_host
] = previews
1135 # Unset global 'pending' flag for host
1136 self
.cache
.loading_osdspec_preview
.remove(search_host
)
1138 def _refresh_host_osdspec_previews(self
, host
) -> bool:
1139 self
.update_osdspec_previews(host
)
1140 self
.cache
.save_host(host
)
1141 self
.log
.debug(f
'Refreshed OSDSpec previews for host <{host}>')
1144 def _refresh_host_daemons(self
, host
):
1146 out
, err
, code
= self
._run
_cephadm
(
1147 host
, 'mon', 'ls', [], no_fsid
=True)
1149 return 'host %s cephadm ls returned %d: %s' % (
1151 except Exception as e
:
1152 return 'host %s scrape failed: %s' % (host
, e
)
1153 ls
= json
.loads(''.join(out
))
1156 if not d
['style'].startswith('cephadm'):
1158 if d
['fsid'] != self
._cluster
_fsid
:
1160 if '.' not in d
['name']:
1162 sd
= orchestrator
.DaemonDescription()
1163 sd
.last_refresh
= datetime
.datetime
.utcnow()
1164 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
1167 setattr(sd
, k
, datetime
.datetime
.strptime(d
[k
], DATEFMT
))
1168 sd
.daemon_type
= d
['name'].split('.')[0]
1169 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
1171 sd
.container_id
= d
.get('container_id')
1174 sd
.container_id
= sd
.container_id
[0:12]
1175 sd
.container_image_name
= d
.get('container_image_name')
1176 sd
.container_image_id
= d
.get('container_image_id')
1177 sd
.version
= d
.get('version')
1178 if sd
.daemon_type
== 'osd':
1179 sd
.osdspec_affinity
= self
.osd_service
.get_osdspec_affinity(sd
.daemon_id
)
1181 sd
.status_desc
= d
['state']
1189 sd
.status_desc
= 'unknown'
1192 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
1193 self
.cache
.update_host_daemons(host
, dm
)
1194 self
.cache
.save_host(host
)
1197 def _refresh_host_devices(self
, host
):
1199 out
, err
, code
= self
._run
_cephadm
(
1202 ['--', 'inventory', '--format=json'])
1204 return 'host %s ceph-volume inventory returned %d: %s' % (
1206 except Exception as e
:
1207 return 'host %s ceph-volume inventory failed: %s' % (host
, e
)
1208 devices
= json
.loads(''.join(out
))
1210 out
, err
, code
= self
._run
_cephadm
(
1216 return 'host %s list-networks returned %d: %s' % (
1218 except Exception as e
:
1219 return 'host %s list-networks failed: %s' % (host
, e
)
1220 networks
= json
.loads(''.join(out
))
1221 self
.log
.debug('Refreshed host %s devices (%d) networks (%s)' % (
1222 host
, len(devices
), len(networks
)))
1223 devices
= inventory
.Devices
.from_json(devices
)
1224 self
.cache
.update_host_devices_networks(host
, devices
.devices
, networks
)
1225 self
.update_osdspec_previews(host
)
1226 self
.cache
.save_host(host
)
1230 def describe_service(self
, service_type
=None, service_name
=None,
1233 # ugly sync path, FIXME someday perhaps?
1234 for host
in self
.inventory
.keys():
1235 self
._refresh
_host
_daemons
(host
)
1237 sm
= {} # type: Dict[str, orchestrator.ServiceDescription]
1239 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1240 for name
, dd
in dm
.items():
1241 if service_type
and service_type
!= dd
.daemon_type
:
1243 n
: str = dd
.service_name()
1244 if service_name
and service_name
!= n
:
1246 if dd
.daemon_type
== 'osd':
1248 OSDs do not know the affinity to their spec out of the box.
1250 n
= f
"osd.{dd.osdspec_affinity}"
1251 if n
in self
.spec_store
.specs
:
1252 spec
= self
.spec_store
.specs
[n
]
1256 service_type
=dd
.daemon_type
,
1257 service_id
=dd
.service_id(),
1258 placement
=PlacementSpec(
1263 sm
[n
] = orchestrator
.ServiceDescription(
1264 last_refresh
=dd
.last_refresh
,
1265 container_image_id
=dd
.container_image_id
,
1266 container_image_name
=dd
.container_image_name
,
1269 if n
in self
.spec_store
.specs
:
1270 if dd
.daemon_type
== 'osd':
1272 The osd count can't be determined by the Placement spec.
1273 It's rather pointless to show a actual/expected representation
1274 here. So we're setting running = size for now.
1277 sm
[n
].size
= osd_count
1279 sm
[n
].size
= spec
.placement
.get_host_selection_size(self
._get
_hosts
)
1281 sm
[n
].created
= self
.spec_store
.spec_created
[n
]
1282 if service_type
== 'nfs':
1283 spec
= cast(NFSServiceSpec
, spec
)
1284 sm
[n
].rados_config_location
= spec
.rados_config_location()
1289 if not sm
[n
].last_refresh
or not dd
.last_refresh
or dd
.last_refresh
< sm
[n
].last_refresh
: # type: ignore
1290 sm
[n
].last_refresh
= dd
.last_refresh
1291 if sm
[n
].container_image_id
!= dd
.container_image_id
:
1292 sm
[n
].container_image_id
= 'mix'
1293 if sm
[n
].container_image_name
!= dd
.container_image_name
:
1294 sm
[n
].container_image_name
= 'mix'
1295 for n
, spec
in self
.spec_store
.specs
.items():
1298 if service_type
is not None and service_type
!= spec
.service_type
:
1300 if service_name
is not None and service_name
!= n
:
1302 sm
[n
] = orchestrator
.ServiceDescription(
1304 size
=spec
.placement
.get_host_selection_size(self
._get
_hosts
),
1307 if service_type
== 'nfs':
1308 spec
= cast(NFSServiceSpec
, spec
)
1309 sm
[n
].rados_config_location
= spec
.rados_config_location()
1310 return list(sm
.values())
1313 def list_daemons(self
, service_name
=None, daemon_type
=None, daemon_id
=None,
1314 host
=None, refresh
=False):
1316 # ugly sync path, FIXME someday perhaps?
1318 self
._refresh
_host
_daemons
(host
)
1320 for hostname
in self
.inventory
.keys():
1321 self
._refresh
_host
_daemons
(hostname
)
1323 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1324 if host
and h
!= host
:
1326 for name
, dd
in dm
.items():
1327 if daemon_type
is not None and daemon_type
!= dd
.daemon_type
:
1329 if daemon_id
is not None and daemon_id
!= dd
.daemon_id
:
1331 if service_name
is not None and service_name
!= dd
.service_name():
1337 def service_action(self
, action
, service_name
):
1339 for host
, dm
in self
.cache
.daemons
.items():
1340 for name
, d
in dm
.items():
1341 if d
.matches_service(service_name
):
1342 args
.append((d
.daemon_type
, d
.daemon_id
,
1343 d
.hostname
, action
))
1344 self
.log
.info('%s service %s' % (action
.capitalize(), service_name
))
1345 return self
._daemon
_actions
(args
)
def _daemon_actions(self, daemon_type, daemon_id, host, action):
    """Apply *action* to a single daemon; thin wrapper over _daemon_action."""
    outcome = self._daemon_action(daemon_type, daemon_id, host, action)
    return outcome
1351 def _daemon_action(self
, daemon_type
, daemon_id
, host
, action
):
1352 if action
== 'redeploy':
1353 # stop, recreate the container+unit, then restart
1354 return self
._create
_daemon
(daemon_type
, daemon_id
, host
)
1355 elif action
== 'reconfig':
1356 return self
._create
_daemon
(daemon_type
, daemon_id
, host
,
1360 'start': ['reset-failed', 'start'],
1362 'restart': ['reset-failed', 'restart'],
1364 name
= '%s.%s' % (daemon_type
, daemon_id
)
1365 for a
in actions
[action
]:
1366 out
, err
, code
= self
._run
_cephadm
(
1368 ['--name', name
, a
],
1370 self
.cache
.invalidate_host_daemons(host
)
1371 return "{} {} from host '{}'".format(action
, name
, host
)
1374 def daemon_action(self
, action
, daemon_type
, daemon_id
):
1376 for host
, dm
in self
.cache
.daemons
.items():
1377 for name
, d
in dm
.items():
1378 if d
.daemon_type
== daemon_type
and d
.daemon_id
== daemon_id
:
1379 args
.append((d
.daemon_type
, d
.daemon_id
,
1380 d
.hostname
, action
))
1382 raise orchestrator
.OrchestratorError(
1383 'Unable to find %s.%s daemon(s)' % (
1384 daemon_type
, daemon_id
))
1385 self
.log
.info('%s daemons %s' % (
1386 action
.capitalize(),
1387 ','.join(['%s.%s' % (a
[0], a
[1]) for a
in args
])))
1388 return self
._daemon
_actions
(args
)
1391 def remove_daemons(self
, names
):
1392 # type: (List[str]) -> List[str]
1394 for host
, dm
in self
.cache
.daemons
.items():
1397 args
.append((name
, host
))
1399 raise OrchestratorError('Unable to find daemon(s) %s' % (names
))
1400 self
.log
.info('Remove daemons %s' % [a
[0] for a
in args
])
1401 return self
._remove
_daemons
(args
)
1404 def remove_service(self
, service_name
):
1405 self
.log
.info('Remove service %s' % service_name
)
1406 self
._trigger
_preview
_refresh
(service_name
=service_name
)
1407 found
= self
.spec_store
.rm(service_name
)
1409 self
._kick
_serve
_loop
()
1410 return ['Removed service %s' % service_name
]
1412 # must be idempotent: still a success.
1413 return [f
'Failed to remove service. <{service_name}> was not found.']
1416 def get_inventory(self
, host_filter
=None, refresh
=False):
1418 Return the storage inventory of hosts matching the given filter.
1420 :param host_filter: host filter
1423 - add filtering by label
1426 # ugly sync path, FIXME someday perhaps?
1428 for host
in host_filter
.hosts
:
1429 self
._refresh
_host
_devices
(host
)
1431 for host
in self
.inventory
.keys():
1432 self
._refresh
_host
_devices
(host
)
1435 for host
, dls
in self
.cache
.devices
.items():
1436 if host_filter
and host
not in host_filter
.hosts
:
1438 result
.append(orchestrator
.InventoryHost(host
,
1439 inventory
.Devices(dls
)))
1443 def zap_device(self
, host
, path
):
1444 self
.log
.info('Zap device %s:%s' % (host
, path
))
1445 out
, err
, code
= self
._run
_cephadm
(
1446 host
, 'osd', 'ceph-volume',
1447 ['--', 'lvm', 'zap', '--destroy', path
],
1449 self
.cache
.invalidate_host_devices(host
)
1451 raise OrchestratorError('Zap failed: %s' % '\n'.join(out
+ err
))
1452 return '\n'.join(out
+ err
)
1455 def blink_device_light(self
, ident_fault
, on
, locs
):
1457 def blink(host
, dev
, path
):
1460 'local-disk-%s-led-%s' % (
1462 'on' if on
else 'off'),
1463 '--path', path
or dev
,
1465 out
, err
, code
= self
._run
_cephadm
(
1466 host
, 'osd', 'shell', ['--'] + cmd
,
1470 'Unable to affect %s light for %s:%s. Command: %s' % (
1471 ident_fault
, host
, dev
, ' '.join(cmd
)))
1472 self
.log
.info('Set %s light for %s:%s %s' % (
1473 ident_fault
, host
, dev
, 'on' if on
else 'off'))
1474 return "Set %s light for %s:%s %s" % (
1475 ident_fault
, host
, dev
, 'on' if on
else 'off')
1479 def get_osd_uuid_map(self
, only_up
=False):
1480 # type: (bool) -> Dict[str, str]
1481 osd_map
= self
.get('osd_map')
1483 for o
in osd_map
['osds']:
1484 # only include OSDs that have ever started in this map. this way
1485 # an interrupted osd create can be repeated and succeed the second
1487 osd_id
= o
.get('osd')
1489 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1490 if not only_up
or (o
['up_from'] > 0):
1491 r
[str(osd_id
)] = o
.get('uuid', '')
1494 def resolve_hosts_for_osdspecs(self
,
1495 specs
: Optional
[List
[DriveGroupSpec
]] = None,
1496 service_name
: Optional
[str] = None
1500 self
.log
.debug(f
"Looking for OSDSpec with service_name: {service_name}")
1501 osdspecs
= self
.spec_store
.find(service_name
=service_name
)
1502 self
.log
.debug(f
"Found OSDSpecs: {osdspecs}")
1504 osdspecs
= [cast(DriveGroupSpec
, spec
) for spec
in specs
]
1505 if not service_name
and not specs
:
1506 # if neither parameters are fulfilled, search for all available osdspecs
1507 osdspecs
= self
.spec_store
.find(service_name
='osd')
1508 self
.log
.debug(f
"Found OSDSpecs: {osdspecs}")
1510 self
.log
.debug("No OSDSpecs found")
1512 return sum([spec
.placement
.filter_matching_hosts(self
._get
_hosts
) for spec
in osdspecs
], [])
def resolve_osdspecs_for_host(self, host):
    """Return every stored 'osd' spec whose placement matches *host*.

    Fix: ``matching_specs`` was appended to and returned without ever being
    initialized, which would raise NameError on the first call.
    """
    matching_specs = []
    self.log.debug(f"Finding OSDSpecs for host: <{host}>")
    for spec in self.spec_store.find('osd'):
        if host in spec.placement.filter_matching_hosts(self._get_hosts):
            self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
            matching_specs.append(spec)
    return matching_specs
def _trigger_preview_refresh(self,
                             specs: Optional[List[DriveGroupSpec]] = None,
                             service_name: Optional[str] = None):
    """Queue every host matched by the given OSDSpecs (or *service_name*)
    for an OSDSpec preview refresh."""
    for host in self.resolve_hosts_for_osdspecs(specs=specs,
                                                service_name=service_name):
        self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
        self.cache.osdspec_previews_refresh_queue.append(host)
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
    """Apply each DriveGroupSpec, after queueing a preview refresh for the
    hosts the specs match."""
    self._trigger_preview_refresh(specs=specs)
    results = []
    for spec in specs:
        results.append(self._apply(spec))
    return results
def create_osds(self, drive_group: DriveGroupSpec):
    """Hand OSD creation for *drive_group* to the OSD service."""
    osd_service = self.osd_service
    return osd_service.create(drive_group)
1541 def preview_osdspecs(self
,
1542 osdspec_name
: Optional
[str] = None,
1543 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
1545 matching_hosts
= self
.resolve_hosts_for_osdspecs(specs
=osdspecs
, service_name
=osdspec_name
)
1546 if not matching_hosts
:
1547 return {'n/a': [{'error': True,
1548 'message': 'No OSDSpec or matching hosts found.'}]}
1549 # Is any host still loading previews
1550 pending_hosts
= {h
for h
in self
.cache
.loading_osdspec_preview
if h
in matching_hosts
}
1552 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1553 return {'n/a': [{'error': True,
1554 'message': 'Preview data is being generated.. '
1555 'Please try again in a bit.'}]}
1556 # drop all keys that are not in search_hosts and return preview struct
1557 return {k
: v
for (k
, v
) in self
.cache
.osdspec_previews
.items() if k
in matching_hosts
}
1559 def _calc_daemon_deps(self
, daemon_type
, daemon_id
):
1561 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1562 'grafana': ['prometheus'],
1563 'alertmanager': ['mgr', 'alertmanager'],
1566 for dep_type
in need
.get(daemon_type
, []):
1567 for dd
in self
.cache
.get_daemons_by_service(dep_type
):
1568 deps
.append(dd
.name())
1571 def _get_config_and_keyring(self
, daemon_type
, daemon_id
,
1573 extra_ceph_config
=None):
1574 # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
1577 ename
= utils
.name_to_auth_entity(daemon_type
+ '.' + daemon_id
)
1578 ret
, keyring
, err
= self
.check_mon_command({
1579 'prefix': 'auth get',
1584 ret
, config
, err
= self
.check_mon_command({
1585 "prefix": "config generate-minimal-conf",
1587 if extra_ceph_config
:
1588 config
+= extra_ceph_config
1595 def _create_daemon(self
,
1599 keyring
: Optional
[str] = None,
1600 extra_args
: Optional
[List
[str]] = None,
1601 extra_config
: Optional
[Dict
[str, Any
]] = None,
1603 osd_uuid_map
: Optional
[Dict
[str, Any
]] = None,
1609 if not extra_config
:
1611 name
= '%s.%s' % (daemon_type
, daemon_id
)
1613 start_time
= datetime
.datetime
.utcnow()
1614 deps
= [] # type: List[str]
1615 cephadm_config
= {} # type: Dict[str, Any]
1616 if daemon_type
== 'prometheus':
1617 cephadm_config
, deps
= self
.prometheus_service
.generate_config()
1618 extra_args
.extend(['--config-json', '-'])
1619 elif daemon_type
== 'grafana':
1620 cephadm_config
, deps
= self
.grafana_service
.generate_config()
1621 extra_args
.extend(['--config-json', '-'])
1622 elif daemon_type
== 'nfs':
1623 cephadm_config
, deps
= \
1624 self
.nfs_service
._generate
_nfs
_config
(daemon_type
, daemon_id
, host
)
1625 extra_args
.extend(['--config-json', '-'])
1626 elif daemon_type
== 'alertmanager':
1627 cephadm_config
, deps
= self
.alertmanager_service
.generate_config()
1628 extra_args
.extend(['--config-json', '-'])
1629 elif daemon_type
== 'node-exporter':
1630 cephadm_config
, deps
= self
.node_exporter_service
.generate_config()
1631 extra_args
.extend(['--config-json', '-'])
1633 # Ceph.daemons (mon, mgr, mds, osd, etc)
1634 cephadm_config
= self
._get
_config
_and
_keyring
(
1635 daemon_type
, daemon_id
,
1637 extra_ceph_config
=extra_config
.pop('config', ''))
1639 cephadm_config
.update({'files': extra_config
})
1640 extra_args
.extend(['--config-json', '-'])
1642 # osd deployments needs an --osd-uuid arg
1643 if daemon_type
== 'osd':
1644 if not osd_uuid_map
:
1645 osd_uuid_map
= self
.get_osd_uuid_map()
1646 osd_uuid
= osd_uuid_map
.get(daemon_id
)
1648 raise OrchestratorError('osd.%s not in osdmap' % daemon_id
)
1649 extra_args
.extend(['--osd-fsid', osd_uuid
])
1652 extra_args
.append('--reconfig')
1653 if self
.allow_ptrace
:
1654 extra_args
.append('--allow-ptrace')
1656 self
.log
.info('%s daemon %s on %s' % (
1657 'Reconfiguring' if reconfig
else 'Deploying',
1660 out
, err
, code
= self
._run
_cephadm
(
1661 host
, name
, 'deploy',
1665 stdin
=json
.dumps(cephadm_config
))
1666 if not code
and host
in self
.cache
.daemons
:
1667 # prime cached service state with what we (should have)
1669 sd
= orchestrator
.DaemonDescription()
1670 sd
.daemon_type
= daemon_type
1671 sd
.daemon_id
= daemon_id
1674 sd
.status_desc
= 'starting'
1675 self
.cache
.add_daemon(host
, sd
)
1676 self
.cache
.invalidate_host_daemons(host
)
1677 self
.cache
.update_daemon_config_deps(host
, name
, deps
, start_time
)
1678 self
.cache
.save_host(host
)
1679 return "{} {} on host '{}'".format(
1680 'Reconfigured' if reconfig
else 'Deployed', name
, host
)
def _remove_daemons(self, name, host) -> str:
    """Remove one daemon from *host*; thin wrapper over _remove_daemon."""
    result = self._remove_daemon(name, host)
    return result
1686 def _remove_daemon(self
, name
, host
) -> str:
1690 (daemon_type
, daemon_id
) = name
.split('.', 1)
1691 if daemon_type
== 'mon':
1692 self
._check
_safe
_to
_destroy
_mon
(daemon_id
)
1694 # remove mon from quorum before we destroy the daemon
1695 self
.log
.info('Removing monitor %s from monmap...' % name
)
1696 ret
, out
, err
= self
.check_mon_command({
1701 args
= ['--name', name
, '--force']
1702 self
.log
.info('Removing daemon %s from %s' % (name
, host
))
1703 out
, err
, code
= self
._run
_cephadm
(
1704 host
, name
, 'rm-daemon', args
)
1706 # remove item from cache
1707 self
.cache
.rm_daemon(host
, name
)
1708 self
.cache
.invalidate_host_daemons(host
)
1709 return "Removed {} from host '{}'".format(name
, host
)
1711 def _create_fn(self
, service_type
: str) -> Callable
[..., str]:
1713 d
: Dict
[str, function
] = {
1714 'mon': self
.mon_service
.create
,
1715 'mgr': self
.mgr_service
.create
,
1716 'osd': self
.osd_service
.create
,
1717 'mds': self
.mds_service
.create
,
1718 'rgw': self
.rgw_service
.create
,
1719 'rbd-mirror': self
.rbd_mirror_service
.create
,
1720 'nfs': self
.nfs_service
.create
,
1721 'grafana': self
.grafana_service
.create
,
1722 'alertmanager': self
.alertmanager_service
.create
,
1723 'prometheus': self
.prometheus_service
.create
,
1724 'node-exporter': self
.node_exporter_service
.create
,
1725 'crash': self
.crash_service
.create
,
1726 'iscsi': self
.iscsi_service
.create
,
1728 return d
[service_type
] # type: ignore
1730 self
.log
.exception(f
'unknown service type {service_type}')
1731 raise OrchestratorError(f
'unknown service type {service_type}') from e
1733 def _config_fn(self
, service_type
) -> Optional
[Callable
[[ServiceSpec
], None]]:
1735 'mds': self
.mds_service
.config
,
1736 'rgw': self
.rgw_service
.config
,
1737 'nfs': self
.nfs_service
.config
,
1738 'iscsi': self
.iscsi_service
.config
,
1741 def _apply_service(self
, spec
: ServiceSpec
) -> bool:
1743 Schedule a service. Deploy new daemons or remove old ones, depending
1744 on the target label and count specified in the placement.
1746 daemon_type
= spec
.service_type
1747 service_name
= spec
.service_name()
1749 self
.log
.debug('Skipping unmanaged service %s spec' % service_name
)
1751 self
.log
.debug('Applying service %s spec' % service_name
)
1753 create_func
= self
._create
_fn
(daemon_type
)
1754 config_func
= self
._config
_fn
(daemon_type
)
1756 if daemon_type
== 'osd':
1758 # TODO: return True would result in a busy loop
1761 daemons
= self
.cache
.get_daemons_by_service(service_name
)
1763 public_network
= None
1764 if daemon_type
== 'mon':
1765 ret
, out
, err
= self
.check_mon_command({
1766 'prefix': 'config get',
1768 'key': 'public_network',
1771 public_network
= out
.strip()
1772 self
.log
.debug('mon public_network is %s' % public_network
)
1774 def matches_network(host
):
1775 # type: (str) -> bool
1776 if not public_network
:
1778 # make sure we have 1 or more IPs for that network on that
1780 return len(self
.cache
.networks
[host
].get(public_network
, [])) > 0
1782 hosts
= HostAssignment(
1784 get_hosts_func
=self
._get
_hosts
,
1785 get_daemons_func
=self
.cache
.get_daemons_by_service
,
1786 filter_new_host
=matches_network
if daemon_type
== 'mon' else None,
1792 if daemon_type
in ['mon', 'mgr'] and len(hosts
) < 1:
1793 self
.log
.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts
)
1798 hosts_with_daemons
= {d
.hostname
for d
in daemons
}
1799 self
.log
.debug('hosts with daemons: %s' % hosts_with_daemons
)
1800 for host
, network
, name
in hosts
:
1801 if host
not in hosts_with_daemons
:
1802 if not did_config
and config_func
:
1805 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
1806 prefix
=spec
.service_id
,
1808 self
.log
.debug('Placing %s.%s on host %s' % (
1809 daemon_type
, daemon_id
, host
))
1810 if daemon_type
== 'mon':
1811 create_func(daemon_id
, host
, network
) # type: ignore
1812 elif daemon_type
in ['nfs', 'iscsi']:
1813 create_func(daemon_id
, host
, spec
) # type: ignore
1815 create_func(daemon_id
, host
) # type: ignore
1817 # add to daemon list so next name(s) will also be unique
1818 sd
= orchestrator
.DaemonDescription(
1820 daemon_type
=daemon_type
,
1821 daemon_id
=daemon_id
,
1827 target_hosts
= [h
.hostname
for h
in hosts
]
1829 if d
.hostname
not in target_hosts
:
1830 # NOTE: we are passing the 'force' flag here, which means
1831 # we can delete a mon instances data.
1832 self
._remove
_daemon
(d
.name(), d
.hostname
)
1837 def _apply_all_services(self
):
1839 specs
= [] # type: List[ServiceSpec]
1840 for sn
, spec
in self
.spec_store
.specs
.items():
1844 if self
._apply
_service
(spec
):
1846 except Exception as e
:
1847 self
.log
.exception('Failed to apply %s spec %s: %s' % (
1848 spec
.service_name(), spec
, e
))
def _check_pool_exists(self, pool, service_name):
    """Raise OrchestratorError if *pool* does not exist in the cluster."""
    logger.info(f'Checking pool "{pool}" exists for service {service_name}')
    if self.rados.pool_exists(pool):
        return
    raise OrchestratorError(f'Cannot find pool "{pool}" for '
                            f'service {service_name}')
1857 def _check_daemons(self
):
1858 # get monmap mtime so we can refresh configs when mons change
1859 monmap
= self
.get('mon_map')
1860 last_monmap
: Optional
[datetime
.datetime
] = datetime
.datetime
.strptime(
1861 monmap
['modified'], CEPH_DATEFMT
)
1862 if last_monmap
and last_monmap
> datetime
.datetime
.utcnow():
1863 last_monmap
= None # just in case clocks are skewed
1865 daemons
= self
.cache
.get_daemons()
1866 daemons_post
= defaultdict(list)
1869 spec
= self
.spec_store
.specs
.get(dd
.service_name(), None)
1870 if not spec
and dd
.daemon_type
not in ['mon', 'mgr', 'osd']:
1871 # (mon and mgr specs should always exist; osds aren't matched
1872 # to a service spec)
1873 self
.log
.info('Removing orphan daemon %s...' % dd
.name())
1874 self
._remove
_daemon
(dd
.name(), dd
.hostname
)
1876 # ignore unmanaged services
1877 if spec
and spec
.unmanaged
:
1880 # These daemon types require additional configs after creation
1881 if dd
.daemon_type
in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1882 daemons_post
[dd
.daemon_type
].append(dd
)
1884 deps
= self
._calc
_daemon
_deps
(dd
.daemon_type
, dd
.daemon_id
)
1885 last_deps
, last_config
= self
.cache
.get_daemon_last_config_deps(
1886 dd
.hostname
, dd
.name())
1887 if last_deps
is None:
1891 self
.log
.info('Reconfiguring %s (unknown last config time)...'% (
1894 elif last_deps
!= deps
:
1895 self
.log
.debug('%s deps %s -> %s' % (dd
.name(), last_deps
,
1897 self
.log
.info('Reconfiguring %s (dependencies changed)...' % (
1900 elif last_monmap
and \
1901 last_monmap
> last_config
and \
1902 dd
.daemon_type
in CEPH_TYPES
:
1903 self
.log
.info('Reconfiguring %s (monmap changed)...' % dd
.name())
1906 self
._create
_daemon
(dd
.daemon_type
, dd
.daemon_id
,
1907 dd
.hostname
, reconfig
=True)
1909 # do daemon post actions
1910 for daemon_type
, daemon_descs
in daemons_post
.items():
1911 self
._get
_cephadm
_service
(daemon_type
).daemon_check_post(daemon_descs
)
def _add_daemon(self, daemon_type, spec,
                create_func: Callable[..., T], config_func=None) -> List[T]:
    """
    Add (and place) a daemon. Require explicit host placement. Do not
    schedule, and do not apply the related scheduling limitations.
    """
    self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
    placement = spec.placement
    if not placement.hosts:
        raise OrchestratorError('must specify host(s) to deploy on')
    want_count = placement.count or len(placement.hosts)
    existing = self.cache.get_daemons_by_service(spec.service_name())
    return self._create_daemons(daemon_type, spec, existing,
                                placement.hosts, want_count,
                                create_func, config_func)
1928 def _create_daemons(self
, daemon_type
, spec
, daemons
,
1930 create_func
: Callable
[..., T
], config_func
=None) -> List
[T
]:
1931 if count
> len(hosts
):
1932 raise OrchestratorError('too few hosts: want %d, have %s' % (
1938 args
= [] # type: List[tuple]
1939 for host
, network
, name
in hosts
:
1940 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
1941 prefix
=spec
.service_id
,
1943 self
.log
.debug('Placing %s.%s on host %s' % (
1944 daemon_type
, daemon_id
, host
))
1945 if daemon_type
== 'mon':
1946 args
.append((daemon_id
, host
, network
)) # type: ignore
1947 elif daemon_type
in ['nfs', 'iscsi']:
1948 args
.append((daemon_id
, host
, spec
)) # type: ignore
1950 args
.append((daemon_id
, host
)) # type: ignore
1952 # add to daemon list so next name(s) will also be unique
1953 sd
= orchestrator
.DaemonDescription(
1955 daemon_type
=daemon_type
,
1956 daemon_id
=daemon_id
,
1961 def create_func_map(*args
):
1962 return create_func(*args
)
1964 return create_func_map(args
)
def apply_mon(self, spec):
    """Schedule mon daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_mon(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place mon daemons on the hosts named in *spec*."""
    create = self.mon_service.create
    return self._add_daemon('mon', spec, create)
def add_mgr(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place mgr daemons on the hosts named in *spec*."""
    create = self.mgr_service.create
    return self._add_daemon('mgr', spec, create)
def _apply(self, spec: GenericSpec) -> str:
    """Dispatch a spec: 'host' specs go to _add_host, all service specs to
    the service-spec apply path."""
    if spec.service_type == 'host':
        host_spec = cast(HostSpec, spec)
        return self._add_host(host_spec)
    service_spec = cast(ServiceSpec, spec)
    return self._apply_service_spec(service_spec)
1986 def _apply_service_spec(self
, spec
: ServiceSpec
) -> str:
1987 if spec
.placement
.is_empty():
1988 # fill in default placement
1990 'mon': PlacementSpec(count
=5),
1991 'mgr': PlacementSpec(count
=2),
1992 'mds': PlacementSpec(count
=2),
1993 'rgw': PlacementSpec(count
=2),
1994 'iscsi': PlacementSpec(count
=1),
1995 'rbd-mirror': PlacementSpec(count
=2),
1996 'nfs': PlacementSpec(count
=1),
1997 'grafana': PlacementSpec(count
=1),
1998 'alertmanager': PlacementSpec(count
=1),
1999 'prometheus': PlacementSpec(count
=1),
2000 'node-exporter': PlacementSpec(host_pattern
='*'),
2001 'crash': PlacementSpec(host_pattern
='*'),
2003 spec
.placement
= defaults
[spec
.service_type
]
2004 elif spec
.service_type
in ['mon', 'mgr'] and \
2005 spec
.placement
.count
is not None and \
2006 spec
.placement
.count
< 1:
2007 raise OrchestratorError('cannot scale %s service below 1' % (
2012 get_hosts_func
=self
._get
_hosts
,
2013 get_daemons_func
=self
.cache
.get_daemons_by_service
,
2016 self
.log
.info('Saving service %s spec with placement %s' % (
2017 spec
.service_name(), spec
.placement
.pretty_str()))
2018 self
.spec_store
.save(spec
)
2019 self
._kick
_serve
_loop
()
2020 return "Scheduled %s update..." % spec
.service_name()
2023 def apply(self
, specs
: List
[GenericSpec
]):
2026 results
.append(self
._apply
(spec
))
def apply_mgr(self, spec):
    """Schedule mgr daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_mds(self, spec: ServiceSpec):
    """Explicitly place mds daemons on the hosts named in *spec*."""
    svc = self.mds_service
    return self._add_daemon('mds', spec, svc.create, svc.config)
def apply_mds(self, spec: ServiceSpec):
    """Schedule mds daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_rgw(self, spec):
    """Explicitly place rgw daemons on the hosts named in *spec*."""
    svc = self.rgw_service
    return self._add_daemon('rgw', spec, svc.create, svc.config)
def apply_rgw(self, spec):
    """Schedule rgw daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_iscsi(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place iscsi daemons on the hosts named in *spec*."""
    svc = self.iscsi_service
    return self._add_daemon('iscsi', spec, svc.create, svc.config)
def apply_iscsi(self, spec):
    """Schedule iscsi daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_rbd_mirror(self, spec):
    """Explicitly place rbd-mirror daemons on the hosts named in *spec*."""
    create = self.rbd_mirror_service.create
    return self._add_daemon('rbd-mirror', spec, create)
def apply_rbd_mirror(self, spec):
    """Schedule rbd-mirror daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_nfs(self, spec):
    """Explicitly place nfs daemons on the hosts named in *spec*."""
    svc = self.nfs_service
    return self._add_daemon('nfs', spec, svc.create, svc.config)
def apply_nfs(self, spec):
    """Schedule nfs daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def _get_dashboard_url(self):
    # type: () -> str
    """Return the dashboard service URI from the mgr map ('' if absent)."""
    mgr_map = self.get('mgr_map')
    return mgr_map.get('services', {}).get('dashboard', '')
def add_prometheus(self, spec):
    """Explicitly place prometheus daemons on the hosts named in *spec*."""
    create = self.prometheus_service.create
    return self._add_daemon('prometheus', spec, create)
def apply_prometheus(self, spec):
    """Schedule prometheus daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_node_exporter(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place node-exporter daemons on the hosts named in *spec*."""
    create = self.node_exporter_service.create
    return self._add_daemon('node-exporter', spec, create)
def apply_node_exporter(self, spec):
    """Schedule node-exporter daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_crash(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place crash daemons on the hosts named in *spec*."""
    create = self.crash_service.create
    return self._add_daemon('crash', spec, create)
def apply_crash(self, spec):
    """Schedule crash daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_grafana(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place grafana daemons on the hosts named in *spec*."""
    create = self.grafana_service.create
    return self._add_daemon('grafana', spec, create)
def apply_grafana(self, spec: ServiceSpec):
    """Schedule grafana daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
def add_alertmanager(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place alertmanager daemons on the hosts named in *spec*."""
    create = self.alertmanager_service.create
    return self._add_daemon('alertmanager', spec, create)
def apply_alertmanager(self, spec: ServiceSpec):
    """Schedule alertmanager daemons according to *spec* via the generic apply path."""
    return self._apply(spec)
2124 def _get_container_image_id(self
, image_name
):
2125 # pick a random host...
2127 for host_name
in self
.inventory
.keys():
2131 raise OrchestratorError('no hosts defined')
2132 out
, err
, code
= self
._run
_cephadm
(
2133 host
, None, 'pull', [],
2138 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2139 image_name
, host
, '\n'.join(out
)))
2140 j
= json
.loads('\n'.join(out
))
2141 image_id
= j
.get('image_id')
2142 ceph_version
= j
.get('ceph_version')
2143 self
.log
.debug('image %s -> id %s version %s' %
2144 (image_name
, image_id
, ceph_version
))
2145 return image_id
, ceph_version
2148 def upgrade_check(self
, image
, version
):
2150 target_name
= self
.container_image_base
+ ':v' + version
2154 raise OrchestratorError('must specify either image or version')
2156 target_id
, target_version
= self
._get
_container
_image
_id
(target_name
)
2157 self
.log
.debug('Target image %s id %s version %s' % (
2158 target_name
, target_id
, target_version
))
2160 'target_name': target_name
,
2161 'target_id': target_id
,
2162 'target_version': target_version
,
2163 'needs_update': dict(),
2164 'up_to_date': list(),
2166 for host
, dm
in self
.cache
.daemons
.items():
2167 for name
, dd
in dm
.items():
2168 if target_id
== dd
.container_image_id
:
2169 r
['up_to_date'].append(dd
.name())
2171 r
['needs_update'][dd
.name()] = {
2172 'current_name': dd
.container_image_name
,
2173 'current_id': dd
.container_image_id
,
2174 'current_version': dd
.version
,
2176 return json
.dumps(r
, indent
=4, sort_keys
=True)
def upgrade_status(self):
    """Report the state of any in-progress upgrade (delegates to the upgrade manager)."""
    mgr = self.upgrade
    return mgr.upgrade_status()
def upgrade_start(self, image, version):
    """Start an upgrade to the given container *image* or ceph *version*
    (delegates to the upgrade manager)."""
    mgr = self.upgrade
    return mgr.upgrade_start(image, version)
def upgrade_pause(self):
    """Pause an in-progress upgrade (delegates to the upgrade manager)."""
    mgr = self.upgrade
    return mgr.upgrade_pause()
def upgrade_resume(self):
    """Resume a paused upgrade (delegates to the upgrade manager)."""
    mgr = self.upgrade
    return mgr.upgrade_resume()
def upgrade_stop(self):
    """Stop/abort an in-progress upgrade (delegates to the upgrade manager)."""
    mgr = self.upgrade
    return mgr.upgrade_stop()
2199 def remove_osds(self
, osd_ids
: List
[str],
2200 replace
: bool = False,
2201 force
: bool = False):
2203 Takes a list of OSDs and schedules them for removal.
2204 The function that takes care of the actual removal is
2208 daemons
= self
.cache
.get_daemons_by_service('osd')
2209 found
: Set
[OSDRemoval
] = set()
2210 for daemon
in daemons
:
2211 if daemon
.daemon_id
not in osd_ids
:
2213 found
.add(OSDRemoval(daemon
.daemon_id
, replace
, force
,
2214 daemon
.hostname
, daemon
.name(),
2215 datetime
.datetime
.utcnow(), -1))
2217 not_found
= {osd_id
for osd_id
in osd_ids
if osd_id
not in [x
.osd_id
for x
in found
]}
2219 raise OrchestratorError('Unable to find OSD: %s' % not_found
)
2221 self
.rm_util
.queue_osds_for_removal(found
)
2223 # trigger the serve loop to initiate the removal
2224 self
._kick
_serve
_loop
()
2225 return "Scheduled OSD(s) for removal"
def remove_osds_status(self):
    """
    The CLI call to retrieve an osd removal report
    """
    report = self.rm_util.report
    return report