4 from collections
import defaultdict
5 from contextlib
import contextmanager
6 from functools
import wraps
7 from tempfile
import TemporaryDirectory
8 from threading
import Event
11 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, \
12 Any
, Set
, TYPE_CHECKING
, cast
, Iterator
, Union
19 import multiprocessing
.pool
23 from ceph
.deployment
import inventory
24 from ceph
.deployment
.drive_group
import DriveGroupSpec
25 from ceph
.deployment
.service_spec
import \
26 NFSServiceSpec
, RGWSpec
, ServiceSpec
, PlacementSpec
, assert_valid_host
27 from cephadm
.services
.cephadmservice
import CephadmDaemonSpec
29 from mgr_module
import MgrModule
, HandleCommandResult
31 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
32 CLICommandMeta
, OrchestratorEvent
, set_exception_subject
, DaemonDescription
33 from orchestrator
._interface
import GenericSpec
37 from .migrations
import Migrations
38 from .services
.cephadmservice
import MonService
, MgrService
, MdsService
, RgwService
, \
39 RbdMirrorService
, CrashService
, CephadmService
40 from .services
.iscsi
import IscsiService
41 from .services
.nfs
import NFSService
42 from .services
.osd
import RemoveUtil
, OSDQueue
, OSDService
, OSD
, NotFoundError
43 from .services
.monitoring
import GrafanaService
, AlertmanagerService
, PrometheusService
, \
45 from .schedule
import HostAssignment
, HostPlacementSpec
46 from .inventory
import Inventory
, SpecStore
, HostCache
, EventStore
47 from .upgrade
import CEPH_UPGRADE_ORDER
, CephadmUpgrade
48 from .template
import TemplateMgr
49 from .utils
import forall_hosts
, CephadmNoImage
, cephadmNoImage
53 # NOTE(mattoliverau) Patch remoto until remoto PR
54 # (https://github.com/alfredodeza/remoto/pull/56) lands
55 from distutils
.version
import StrictVersion
56 if StrictVersion(remoto
.__version
__) <= StrictVersion('1.2'):
57 def remoto_has_connection(self
):
58 return self
.gateway
.hasreceiver()
60 from remoto
.backends
import BaseConnection
61 BaseConnection
.has_connection
= remoto_has_connection
63 import execnet
.gateway_bootstrap
64 except ImportError as e
:
66 remoto_import_error
= str(e
)
69 from typing
import List
73 logger
= logging
.getLogger(__name__
)
77 DEFAULT_SSH_CONFIG
= """
80 StrictHostKeyChecking no
81 UserKnownHostsFile /dev/null
85 DATEFMT
= '%Y-%m-%dT%H:%M:%S.%f'
86 CEPH_DATEFMT
= '%Y-%m-%dT%H:%M:%S.%fZ'
88 CEPH_TYPES
= set(CEPH_UPGRADE_ORDER
)
91 class CephadmCompletion(orchestrator
.Completion
[T
]):
95 def trivial_completion(f
: Callable
[..., T
]) -> Callable
[..., CephadmCompletion
[T
]]:
97 Decorator to make CephadmCompletion methods return
98 a completion object that executes themselves.
102 def wrapper(*args
, **kwargs
):
103 return CephadmCompletion(on_complete
=lambda _
: f(*args
, **kwargs
))
108 @six.add_metaclass(CLICommandMeta
)
109 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
):
111 _STORE_HOST_PREFIX
= "host"
114 NATIVE_OPTIONS
= [] # type: List[Any]
115 MODULE_OPTIONS
: List
[dict] = [
117 'name': 'ssh_config_file',
120 'desc': 'customized SSH config file to connect to managed hosts',
123 'name': 'device_cache_timeout',
126 'desc': 'seconds to cache device inventory',
129 'name': 'daemon_cache_timeout',
132 'desc': 'seconds to cache service (daemon) inventory',
135 'name': 'host_check_interval',
138 'desc': 'how frequently to perform a host check',
143 'enum_allowed': ['root', 'cephadm-package'],
145 'desc': 'mode for remote execution of cephadm',
148 'name': 'container_image_base',
149 'default': 'docker.io/ceph/ceph',
150 'desc': 'Container image name, without the tag',
154 'name': 'container_image_prometheus',
155 'default': 'prom/prometheus:v2.18.1',
156 'desc': 'Prometheus container image',
159 'name': 'container_image_grafana',
160 'default': 'ceph/ceph-grafana:6.6.2',
161 'desc': 'Prometheus container image',
164 'name': 'container_image_alertmanager',
165 'default': 'prom/alertmanager:v0.20.0',
166 'desc': 'Prometheus container image',
169 'name': 'container_image_node_exporter',
170 'default': 'prom/node-exporter:v0.18.1',
171 'desc': 'Prometheus container image',
174 'name': 'warn_on_stray_hosts',
177 'desc': 'raise a health warning if daemons are detected on a host '
178 'that is not managed by cephadm',
181 'name': 'warn_on_stray_daemons',
184 'desc': 'raise a health warning if daemons are detected '
185 'that are not managed by cephadm',
188 'name': 'warn_on_failed_host_check',
191 'desc': 'raise a health warning if the host check fails',
194 'name': 'log_to_cluster',
197 'desc': 'log to the "cephadm" cluster log channel"',
200 'name': 'allow_ptrace',
203 'desc': 'allow SYS_PTRACE capability on ceph containers',
204 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
205 'process with gdb or strace. Enabling this options '
206 'can allow debugging daemons that encounter problems '
210 'name': 'prometheus_alerts_path',
212 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
213 'desc': 'location of alerts to include in prometheus deployments',
216 'name': 'migration_current',
219 'desc': 'internal - do not modify',
220 # used to track spec and other data migrations.
223 'name': 'config_dashboard',
226 'desc': 'manage configs like API endpoints in Dashboard.'
229 'name': 'manage_etc_ceph_ceph_conf',
232 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
235 'name': 'registry_url',
238 'desc': 'Custom repository url'
241 'name': 'registry_username',
244 'desc': 'Custom repository username'
247 'name': 'registry_password',
250 'desc': 'Custom repository password'
254 def __init__(self
, *args
, **kwargs
):
255 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
256 self
._cluster
_fsid
= self
.get('mon_map')['fsid']
257 self
.last_monmap
: Optional
[datetime
.datetime
] = None
263 if self
.get_store('pause'):
268 # for mypy which does not run the code
270 self
.ssh_config_file
= None # type: Optional[str]
271 self
.device_cache_timeout
= 0
272 self
.daemon_cache_timeout
= 0
273 self
.host_check_interval
= 0
275 self
.container_image_base
= ''
276 self
.container_image_prometheus
= ''
277 self
.container_image_grafana
= ''
278 self
.container_image_alertmanager
= ''
279 self
.container_image_node_exporter
= ''
280 self
.warn_on_stray_hosts
= True
281 self
.warn_on_stray_daemons
= True
282 self
.warn_on_failed_host_check
= True
283 self
.allow_ptrace
= False
284 self
.prometheus_alerts_path
= ''
285 self
.migration_current
= None
286 self
.config_dashboard
= True
287 self
.manage_etc_ceph_ceph_conf
= True
288 self
.registry_url
: Optional
[str] = None
289 self
.registry_username
: Optional
[str] = None
290 self
.registry_password
: Optional
[str] = None
292 self
._cons
= {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
295 self
.notify('mon_map', None)
298 path
= self
.get_ceph_option('cephadm_path')
300 with
open(path
, 'r') as f
:
301 self
._cephadm
= f
.read()
302 except (IOError, TypeError) as e
:
303 raise RuntimeError("unable to read cephadm at '%s': %s" % (
306 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
310 CephadmOrchestrator
.instance
= self
312 self
.upgrade
= CephadmUpgrade(self
)
314 self
.health_checks
= {}
316 self
.all_progress_references
= list() # type: List[orchestrator.ProgressReference]
318 self
.inventory
= Inventory(self
)
320 self
.cache
= HostCache(self
)
323 self
.rm_util
= RemoveUtil(self
)
324 self
.to_remove_osds
= OSDQueue()
325 self
.rm_util
.load_from_store()
327 self
.spec_store
= SpecStore(self
)
328 self
.spec_store
.load()
330 # ensure the host lists are in sync
331 for h
in self
.inventory
.keys():
332 if h
not in self
.cache
.daemons
:
333 self
.cache
.prime_empty_host(h
)
334 for h
in self
.cache
.get_hosts():
335 if h
not in self
.inventory
:
336 self
.cache
.rm_host(h
)
340 self
.events
= EventStore(self
)
341 self
.offline_hosts
: Set
[str] = set()
343 self
.migration
= Migrations(self
)
346 self
.osd_service
= OSDService(self
)
347 self
.nfs_service
= NFSService(self
)
348 self
.mon_service
= MonService(self
)
349 self
.mgr_service
= MgrService(self
)
350 self
.mds_service
= MdsService(self
)
351 self
.rgw_service
= RgwService(self
)
352 self
.rbd_mirror_service
= RbdMirrorService(self
)
353 self
.grafana_service
= GrafanaService(self
)
354 self
.alertmanager_service
= AlertmanagerService(self
)
355 self
.prometheus_service
= PrometheusService(self
)
356 self
.node_exporter_service
= NodeExporterService(self
)
357 self
.crash_service
= CrashService(self
)
358 self
.iscsi_service
= IscsiService(self
)
359 self
.cephadm_services
= {
360 'mon': self
.mon_service
,
361 'mgr': self
.mgr_service
,
362 'osd': self
.osd_service
,
363 'mds': self
.mds_service
,
364 'rgw': self
.rgw_service
,
365 'rbd-mirror': self
.rbd_mirror_service
,
366 'nfs': self
.nfs_service
,
367 'grafana': self
.grafana_service
,
368 'alertmanager': self
.alertmanager_service
,
369 'prometheus': self
.prometheus_service
,
370 'node-exporter': self
.node_exporter_service
,
371 'crash': self
.crash_service
,
372 'iscsi': self
.iscsi_service
,
375 self
.template
= TemplateMgr()
377 self
.requires_post_actions
= set()
380 self
.log
.debug('shutdown')
381 self
._worker
_pool
.close()
382 self
._worker
_pool
.join()
386 def _get_cephadm_service(self
, service_type
: str) -> CephadmService
:
387 assert service_type
in ServiceSpec
.KNOWN_SERVICE_TYPES
388 return self
.cephadm_services
[service_type
]
390 def _kick_serve_loop(self
):
391 self
.log
.debug('_kick_serve_loop')
394 # function responsible for logging single host into custom registry
395 def _registry_login(self
, host
, url
, username
, password
):
396 self
.log
.debug(f
"Attempting to log host {host} into custom registry @ {url}")
397 # want to pass info over stdin rather than through normal list of args
398 args_str
= ("{\"url\": \"" + url
+ "\", \"username\": \"" + username
+ "\", "
399 " \"password\": \"" + password
+ "\"}")
400 out
, err
, code
= self
._run
_cephadm
(
401 host
, 'mon', 'registry-login',
402 ['--registry-json', '-'], stdin
=args_str
, error_ok
=True)
404 return f
"Host {host} failed to login to {url} as {username} with given password"
408 def _check_host(self
, host
):
409 if host
not in self
.inventory
:
411 self
.log
.debug(' checking %s' % host
)
413 out
, err
, code
= self
._run
_cephadm
(
414 host
, cephadmNoImage
, 'check-host', [],
415 error_ok
=True, no_fsid
=True)
416 self
.cache
.update_last_host_check(host
)
417 self
.cache
.save_host(host
)
419 self
.log
.debug(' host %s failed check' % host
)
420 if self
.warn_on_failed_host_check
:
421 return 'host %s failed check: %s' % (host
, err
)
423 self
.log
.debug(' host %s ok' % host
)
424 except Exception as e
:
425 self
.log
.debug(' host %s failed check' % host
)
426 return 'host %s failed check: %s' % (host
, e
)
428 def _check_for_strays(self
):
429 self
.log
.debug('_check_for_strays')
430 for k
in ['CEPHADM_STRAY_HOST',
431 'CEPHADM_STRAY_DAEMON']:
432 if k
in self
.health_checks
:
433 del self
.health_checks
[k
]
434 if self
.warn_on_stray_hosts
or self
.warn_on_stray_daemons
:
435 ls
= self
.list_servers()
436 managed
= self
.cache
.get_daemon_names()
437 host_detail
= [] # type: List[str]
439 daemon_detail
= [] # type: List[str]
441 host
= item
.get('hostname')
442 daemons
= item
.get('services') # misnomer!
445 name
= '%s.%s' % (s
.get('type'), s
.get('id'))
446 if host
not in self
.inventory
:
447 missing_names
.append(name
)
448 host_num_daemons
+= 1
449 if name
not in managed
:
450 daemon_detail
.append(
451 'stray daemon %s on host %s not managed by cephadm' % (name
, host
))
454 'stray host %s has %d stray daemons: %s' % (
455 host
, len(missing_names
), missing_names
))
456 if self
.warn_on_stray_hosts
and host_detail
:
457 self
.health_checks
['CEPHADM_STRAY_HOST'] = {
458 'severity': 'warning',
459 'summary': '%d stray host(s) with %s daemon(s) '
460 'not managed by cephadm' % (
461 len(host_detail
), host_num_daemons
),
462 'count': len(host_detail
),
463 'detail': host_detail
,
465 if self
.warn_on_stray_daemons
and daemon_detail
:
466 self
.health_checks
['CEPHADM_STRAY_DAEMON'] = {
467 'severity': 'warning',
468 'summary': '%d stray daemons(s) not managed by cephadm' % (
470 'count': len(daemon_detail
),
471 'detail': daemon_detail
,
473 self
.set_health_checks(self
.health_checks
)
475 def _serve_sleep(self
):
477 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
478 ret
= self
.event
.wait(sleep_interval
)
481 def serve(self
) -> None:
483 The main loop of cephadm.
485 A command handler will typically change the declarative state
486 of cephadm. This loop will then attempt to apply this new state.
488 self
.log
.debug("serve starting")
494 self
.log
.debug('refreshing hosts and daemons')
495 self
._refresh
_hosts
_and
_daemons
()
497 self
._check
_for
_strays
()
499 self
._update
_paused
_health
()
502 self
.rm_util
.process_removal_queue()
504 self
.migration
.migrate()
505 if self
.migration
.is_migration_ongoing():
508 if self
._apply
_all
_services
():
509 continue # did something, refresh
511 self
._check
_daemons
()
513 if self
.upgrade
.continue_upgrade():
516 except OrchestratorError
as e
:
518 self
.events
.from_orch_error(e
)
521 self
.log
.debug("serve exit")
523 def _update_paused_health(self
):
525 self
.health_checks
['CEPHADM_PAUSED'] = {
526 'severity': 'warning',
527 'summary': 'cephadm background work is paused',
529 'detail': ["'ceph orch resume' to resume"],
531 self
.set_health_checks(self
.health_checks
)
533 if 'CEPHADM_PAUSED' in self
.health_checks
:
534 del self
.health_checks
['CEPHADM_PAUSED']
535 self
.set_health_checks(self
.health_checks
)
537 def config_notify(self
):
539 This method is called whenever one of our config options is changed.
541 TODO: this method should be moved into mgr_module.py
543 for opt
in self
.MODULE_OPTIONS
:
545 opt
['name'], # type: ignore
546 self
.get_module_option(opt
['name'])) # type: ignore
547 self
.log
.debug(' mgr option %s = %s',
548 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
549 for opt
in self
.NATIVE_OPTIONS
:
552 self
.get_ceph_option(opt
))
553 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
557 def notify(self
, notify_type
, notify_id
):
558 if notify_type
== "mon_map":
559 # get monmap mtime so we can refresh configs when mons change
560 monmap
= self
.get('mon_map')
561 self
.last_monmap
= datetime
.datetime
.strptime(
562 monmap
['modified'], CEPH_DATEFMT
)
563 if self
.last_monmap
and self
.last_monmap
> datetime
.datetime
.utcnow():
564 self
.last_monmap
= None # just in case clocks are skewed
565 if notify_type
== "pg_summary":
566 self
._trigger
_osd
_removal
()
568 def _trigger_osd_removal(self
):
569 data
= self
.get("osd_stats")
570 for osd
in data
.get('osd_stats', []):
571 if osd
.get('num_pgs') == 0:
572 # if _ANY_ osd that is currently in the queue appears to be empty,
573 # start the removal process
574 if int(osd
.get('osd')) in self
.to_remove_osds
.as_osd_ids():
575 self
.log
.debug(f
"Found empty osd. Starting removal process")
576 # if the osd that is now empty is also part of the removal queue
578 self
.rm_util
.process_removal_queue()
582 self
.log
.info('Paused')
583 self
.set_store('pause', 'true')
585 # wake loop so we update the health status
586 self
._kick
_serve
_loop
()
590 self
.log
.info('Resumed')
592 self
.set_store('pause', None)
593 # unconditionally wake loop so that 'orch resume' can be used to kick
595 self
._kick
_serve
_loop
()
597 def get_unique_name(self
, daemon_type
, host
, existing
, prefix
=None,
599 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
601 Generate a unique random service name
603 suffix
= daemon_type
not in [
604 'mon', 'crash', 'nfs',
605 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
608 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
609 raise orchestrator
.OrchestratorValidationError(f
'name {daemon_type}.{forcename} already in use')
613 host
= host
.split('.')[0]
621 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
623 if len([d
for d
in existing
if d
.daemon_id
== name
]):
625 raise orchestrator
.OrchestratorValidationError(f
'name {daemon_type}.{name} already in use')
626 self
.log
.debug('name %s exists, trying again', name
)
630 def _reconfig_ssh(self
):
631 temp_files
= [] # type: list
632 ssh_options
= [] # type: List[str]
635 ssh_config_fname
= self
.ssh_config_file
636 ssh_config
= self
.get_store("ssh_config")
637 if ssh_config
is not None or ssh_config_fname
is None:
639 ssh_config
= DEFAULT_SSH_CONFIG
640 f
= tempfile
.NamedTemporaryFile(prefix
='cephadm-conf-')
641 os
.fchmod(f
.fileno(), 0o600)
642 f
.write(ssh_config
.encode('utf-8'))
643 f
.flush() # make visible to other processes
645 ssh_config_fname
= f
.name
647 self
.validate_ssh_config_fname(ssh_config_fname
)
648 ssh_options
+= ['-F', ssh_config_fname
]
649 self
.ssh_config
= ssh_config
652 ssh_key
= self
.get_store("ssh_identity_key")
653 ssh_pub
= self
.get_store("ssh_identity_pub")
654 self
.ssh_pub
= ssh_pub
655 self
.ssh_key
= ssh_key
656 if ssh_key
and ssh_pub
:
657 tkey
= tempfile
.NamedTemporaryFile(prefix
='cephadm-identity-')
658 tkey
.write(ssh_key
.encode('utf-8'))
659 os
.fchmod(tkey
.fileno(), 0o600)
660 tkey
.flush() # make visible to other processes
661 tpub
= open(tkey
.name
+ '.pub', 'w')
662 os
.fchmod(tpub
.fileno(), 0o600)
664 tpub
.flush() # make visible to other processes
665 temp_files
+= [tkey
, tpub
]
666 ssh_options
+= ['-i', tkey
.name
]
668 self
._temp
_files
= temp_files
670 self
._ssh
_options
= ' '.join(ssh_options
) # type: Optional[str]
672 self
._ssh
_options
= None
674 if self
.mode
== 'root':
675 self
.ssh_user
= self
.get_store('ssh_user', default
='root')
676 elif self
.mode
== 'cephadm-package':
677 self
.ssh_user
= 'cephadm'
681 def validate_ssh_config_fname(self
, ssh_config_fname
):
682 if not os
.path
.isfile(ssh_config_fname
):
683 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
686 def _reset_con(self
, host
):
687 conn
, r
= self
._cons
.get(host
, (None, None))
689 self
.log
.debug('_reset_con close %s' % host
)
693 def _reset_cons(self
):
694 for host
, conn_and_r
in self
._cons
.items():
695 self
.log
.debug('_reset_cons close %s' % host
)
700 def offline_hosts_remove(self
, host
):
701 if host
in self
.offline_hosts
:
702 self
.offline_hosts
.remove(host
)
707 if remoto
is not None:
710 return False, "loading remoto library:{}".format(
715 The cephadm orchestrator is always available.
717 ok
, err
= self
.can_run()
720 if not self
.ssh_key
or not self
.ssh_pub
:
721 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
724 def process(self
, completions
):
726 Does nothing, as completions are processed in another thread.
729 self
.log
.debug("process: completions={0}".format(orchestrator
.pretty_print(completions
)))
731 for p
in completions
:
734 @orchestrator._cli
_write
_command
(
735 prefix
='cephadm set-ssh-config',
736 desc
='Set the ssh_config file (use -i <ssh_config>)')
737 def _set_ssh_config(self
, inbuf
=None):
739 Set an ssh_config file provided from stdin
744 if inbuf
is None or len(inbuf
) == 0:
745 return -errno
.EINVAL
, "", "empty ssh config provided"
746 if inbuf
== self
.ssh_config
:
747 return 0, "value unchanged", ""
748 self
.set_store("ssh_config", inbuf
)
749 self
.log
.info('Set ssh_config')
753 @orchestrator._cli
_write
_command
(
754 prefix
='cephadm clear-ssh-config',
755 desc
='Clear the ssh_config file')
756 def _clear_ssh_config(self
):
758 Clear the ssh_config file provided from stdin
760 self
.set_store("ssh_config", None)
761 self
.ssh_config_tmp
= None
762 self
.log
.info('Cleared ssh_config')
766 @orchestrator._cli
_read
_command
(
767 prefix
='cephadm get-ssh-config',
768 desc
='Returns the ssh config as used by cephadm'
770 def _get_ssh_config(self
):
771 if self
.ssh_config_file
:
772 self
.validate_ssh_config_fname(self
.ssh_config_file
)
773 with
open(self
.ssh_config_file
) as f
:
774 return HandleCommandResult(stdout
=f
.read())
775 ssh_config
= self
.get_store("ssh_config")
777 return HandleCommandResult(stdout
=ssh_config
)
778 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
781 @orchestrator._cli
_write
_command
(
782 'cephadm generate-key',
783 desc
='Generate a cluster SSH key (if not present)')
784 def _generate_key(self
):
785 if not self
.ssh_pub
or not self
.ssh_key
:
786 self
.log
.info('Generating ssh key...')
787 tmp_dir
= TemporaryDirectory()
788 path
= tmp_dir
.name
+ '/key'
790 subprocess
.check_call([
791 '/usr/bin/ssh-keygen',
792 '-C', 'ceph-%s' % self
._cluster
_fsid
,
796 with
open(path
, 'r') as f
:
798 with
open(path
+ '.pub', 'r') as f
:
802 os
.unlink(path
+ '.pub')
804 self
.set_store('ssh_identity_key', secret
)
805 self
.set_store('ssh_identity_pub', pub
)
809 @orchestrator._cli
_write
_command
(
810 'cephadm set-priv-key',
811 desc
='Set cluster SSH private key (use -i <private_key>)')
812 def _set_priv_key(self
, inbuf
=None):
813 if inbuf
is None or len(inbuf
) == 0:
814 return -errno
.EINVAL
, "", "empty private ssh key provided"
815 if inbuf
== self
.ssh_key
:
816 return 0, "value unchanged", ""
817 self
.set_store("ssh_identity_key", inbuf
)
818 self
.log
.info('Set ssh private key')
822 @orchestrator._cli
_write
_command
(
823 'cephadm set-pub-key',
824 desc
='Set cluster SSH public key (use -i <public_key>)')
825 def _set_pub_key(self
, inbuf
=None):
826 if inbuf
is None or len(inbuf
) == 0:
827 return -errno
.EINVAL
, "", "empty public ssh key provided"
828 if inbuf
== self
.ssh_pub
:
829 return 0, "value unchanged", ""
830 self
.set_store("ssh_identity_pub", inbuf
)
831 self
.log
.info('Set ssh public key')
835 @orchestrator._cli
_write
_command
(
837 desc
='Clear cluster SSH key')
838 def _clear_key(self
):
839 self
.set_store('ssh_identity_key', None)
840 self
.set_store('ssh_identity_pub', None)
842 self
.log
.info('Cleared cluster SSH key')
845 @orchestrator._cli
_read
_command
(
846 'cephadm get-pub-key',
847 desc
='Show SSH public key for connecting to cluster hosts')
848 def _get_pub_key(self
):
850 return 0, self
.ssh_pub
, ''
852 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
854 @orchestrator._cli
_read
_command
(
856 desc
='Show user for SSHing to cluster hosts')
858 return 0, self
.ssh_user
, ''
860 @orchestrator._cli
_read
_command
(
862 'name=user,type=CephString',
863 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
864 def set_ssh_user(self
, user
):
865 current_user
= self
.ssh_user
866 if user
== current_user
:
867 return 0, "value unchanged", ""
869 self
.set_store('ssh_user', user
)
872 host
= self
.cache
.get_hosts()[0]
873 r
= self
._check
_host
(host
)
875 #connection failed reset user
876 self
.set_store('ssh_user', current_user
)
878 return -errno
.EINVAL
, '', 'ssh connection %s@%s failed' % (user
, host
)
880 msg
= 'ssh user set to %s' % user
882 msg
+= ' sudo will be used'
886 @orchestrator._cli
_read
_command
(
887 'cephadm registry-login',
888 "name=url,type=CephString,req=false "
889 "name=username,type=CephString,req=false "
890 "name=password,type=CephString,req=false",
891 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
892 def registry_login(self
, url
=None, username
=None, password
=None, inbuf
=None):
893 # if password not given in command line, get it through file input
894 if not (url
and username
and password
) and (inbuf
is None or len(inbuf
) == 0):
895 return -errno
.EINVAL
, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
896 "or -i <login credentials json file>")
897 elif not (url
and username
and password
):
898 login_info
= json
.loads(inbuf
)
899 if "url" in login_info
and "username" in login_info
and "password" in login_info
:
900 url
= login_info
["url"]
901 username
= login_info
["username"]
902 password
= login_info
["password"]
904 return -errno
.EINVAL
, "", ("json provided for custom registry login did not include all necessary fields. "
905 "Please setup json file as\n"
907 " \"url\": \"REGISTRY_URL\",\n"
908 " \"username\": \"REGISTRY_USERNAME\",\n"
909 " \"password\": \"REGISTRY_PASSWORD\"\n"
911 # verify login info works by attempting login on random host
913 for host_name
in self
.inventory
.keys():
917 raise OrchestratorError('no hosts defined')
918 r
= self
._registry
_login
(host
, url
, username
, password
)
921 # if logins succeeded, store info
922 self
.log
.debug("Host logins successful. Storing login info.")
923 self
.set_module_option('registry_url', url
)
924 self
.set_module_option('registry_username', username
)
925 self
.set_module_option('registry_password', password
)
926 # distribute new login info to all hosts
927 self
.cache
.distribute_new_registry_login_info()
928 return 0, "registry login scheduled", ''
930 @orchestrator._cli
_read
_command
(
931 'cephadm check-host',
932 'name=host,type=CephString '
933 'name=addr,type=CephString,req=false',
934 'Check whether we can access and manage a remote host')
935 def check_host(self
, host
, addr
=None):
937 out
, err
, code
= self
._run
_cephadm
(host
, cephadmNoImage
, 'check-host',
938 ['--expect-hostname', host
],
940 error_ok
=True, no_fsid
=True)
942 return 1, '', ('check-host failed:\n' + '\n'.join(err
))
943 except OrchestratorError
as e
:
944 self
.log
.exception(f
"check-host failed for '{host}'")
945 return 1, '', ('check-host failed:\n' +
946 f
"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
947 # if we have an outstanding health alert for this host, give the
948 # serve thread a kick
949 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
950 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
951 if item
.startswith('host %s ' % host
):
953 return 0, '%s (%s) ok' % (host
, addr
), err
955 @orchestrator._cli
_read
_command
(
956 'cephadm prepare-host',
957 'name=host,type=CephString '
958 'name=addr,type=CephString,req=false',
959 'Prepare a remote host for use with cephadm')
960 def _prepare_host(self
, host
, addr
=None):
961 out
, err
, code
= self
._run
_cephadm
(host
, cephadmNoImage
, 'prepare-host',
962 ['--expect-hostname', host
],
964 error_ok
=True, no_fsid
=True)
966 return 1, '', ('prepare-host failed:\n' + '\n'.join(err
))
967 # if we have an outstanding health alert for this host, give the
968 # serve thread a kick
969 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
970 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
971 if item
.startswith('host %s ' % host
):
973 return 0, '%s (%s) ok' % (host
, addr
), err
975 def _get_connection(self
, host
: str):
977 Setup a connection for running commands on remote host.
979 conn
, r
= self
._cons
.get(host
, (None, None))
981 if conn
.has_connection():
982 self
.log
.debug('Have connection to %s' % host
)
985 self
._reset
_con
(host
)
986 n
= self
.ssh_user
+ '@' + host
987 self
.log
.debug("Opening connection to {} with ssh options '{}'".format(
988 n
, self
._ssh
_options
))
989 child_logger
=self
.log
.getChild(n
)
990 child_logger
.setLevel('WARNING')
991 conn
= remoto
.Connection(
994 ssh_options
=self
._ssh
_options
,
995 sudo
=True if self
.ssh_user
!= 'root' else False)
997 r
= conn
.import_module(remotes
)
998 self
._cons
[host
] = conn
, r
1002 def _executable_path(self
, conn
, executable
):
1004 Remote validator that accepts a connection object to ensure that a certain
1005 executable is available returning its full path if so.
1007 Otherwise an exception with thorough details will be raised, informing the
1008 user that the executable was not found.
1010 executable_path
= conn
.remote_module
.which(executable
)
1011 if not executable_path
:
1012 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1013 executable
, conn
.hostname
))
1014 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
1016 return executable_path
1019 def _remote_connection(self
,
1021 addr
: Optional
[str]=None,
1022 ) -> Iterator
[Tuple
["BaseConnection", Any
]]:
1023 if not addr
and host
in self
.inventory
:
1024 addr
= self
.inventory
.get_addr(host
)
1026 self
.offline_hosts_remove(host
)
1031 raise OrchestratorError("host address is empty")
1032 conn
, connr
= self
._get
_connection
(addr
)
1033 except OSError as e
:
1034 self
._reset
_con
(host
)
1035 msg
= f
"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1036 raise execnet
.gateway_bootstrap
.HostNotFound(msg
)
1040 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1041 # this is a misleading exception as it seems to be thrown for
1042 # any sort of connection failure, even those having nothing to
1043 # do with "host not found" (e.g., ssh key permission denied).
1044 self
.offline_hosts
.add(host
)
1045 self
._reset
_con
(host
)
1047 user
= self
.ssh_user
if self
.mode
== 'root' else 'cephadm'
1048 msg
= f
'''Failed to connect to {host} ({addr}).
1049 Check that the host is reachable and accepts connections using the cephadm SSH key
1051 you may want to run:
1052 > ceph cephadm get-ssh-config > ssh_config
1053 > ceph config-key get mgr/cephadm/ssh_identity_key > key
1054 > ssh -F ssh_config -i key {user}@{host}'''
1055 raise OrchestratorError(msg
) from e
1056 except Exception as ex
:
1057 self
.log
.exception(ex
)
1060 def _get_container_image(self
, daemon_name
: str) -> str:
1061 daemon_type
= daemon_name
.split('.', 1)[0] # type: ignore
1062 if daemon_type
in CEPH_TYPES
or \
1063 daemon_type
== 'nfs' or \
1064 daemon_type
== 'iscsi':
1065 # get container image
1066 ret
, image
, err
= self
.check_mon_command({
1067 'prefix': 'config get',
1068 'who': utils
.name_to_config_section(daemon_name
),
1069 'key': 'container_image',
1071 image
= image
.strip() # type: ignore
1072 elif daemon_type
== 'prometheus':
1073 image
= self
.container_image_prometheus
1074 elif daemon_type
== 'grafana':
1075 image
= self
.container_image_grafana
1076 elif daemon_type
== 'alertmanager':
1077 image
= self
.container_image_alertmanager
1078 elif daemon_type
== 'node-exporter':
1079 image
= self
.container_image_node_exporter
1081 assert False, daemon_type
1083 self
.log
.debug('%s container image %s' % (daemon_name
, image
))
1087 def _run_cephadm(self
,
1089 entity
: Union
[CephadmNoImage
, str],
1092 addr
: Optional
[str] = "",
1093 stdin
: Optional
[str] = "",
1094 no_fsid
: Optional
[bool] = False,
1095 error_ok
: Optional
[bool] = False,
1096 image
: Optional
[str] = "",
1097 env_vars
: Optional
[List
[str]]= None,
1098 ) -> Tuple
[List
[str], List
[str], int]:
1100 Run cephadm on the remote host with the given command + args
1102 :env_vars: in format -> [KEY=VALUE, ..]
1104 with self
._remote
_connection
(host
, addr
) as tpl
:
1106 assert image
or entity
1107 if not image
and entity
is not cephadmNoImage
:
1108 image
= self
._get
_container
_image
(entity
)
1113 for env_var_pair
in env_vars
:
1114 final_args
.extend(['--env', env_var_pair
])
1117 final_args
.extend(['--image', image
])
1118 final_args
.append(command
)
1121 final_args
+= ['--fsid', self
._cluster
_fsid
]
1124 self
.log
.debug('args: %s' % (' '.join(final_args
)))
1125 if self
.mode
== 'root':
1127 self
.log
.debug('stdin: %s' % stdin
)
1128 script
= 'injected_argv = ' + json
.dumps(final_args
) + '\n'
1130 script
+= 'injected_stdin = ' + json
.dumps(stdin
) + '\n'
1131 script
+= self
._cephadm
1132 python
= connr
.choose_python()
1135 'unable to find python on %s (tried %s in %s)' % (
1136 host
, remotes
.PYTHONS
, remotes
.PATH
))
1138 out
, err
, code
= remoto
.process
.check(
1141 stdin
=script
.encode('utf-8'))
1142 except RuntimeError as e
:
1143 self
._reset
_con
(host
)
1145 return [], [str(e
)], 1
1147 elif self
.mode
== 'cephadm-package':
1149 out
, err
, code
= remoto
.process
.check(
1151 ['sudo', '/usr/bin/cephadm'] + final_args
,
1153 except RuntimeError as e
:
1154 self
._reset
_con
(host
)
1156 return [], [str(e
)], 1
1159 assert False, 'unsupported mode'
1161 self
.log
.debug('code: %d' % code
)
1163 self
.log
.debug('out: %s' % '\n'.join(out
))
1165 self
.log
.debug('err: %s' % '\n'.join(err
))
1166 if code
and not error_ok
:
1167 raise OrchestratorError(
1168 'cephadm exited with an error code: %d, stderr:%s' % (
1169 code
, '\n'.join(err
)))
1170 return out
, err
, code
def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
    """
    Return the hosts known to the inventory, optionally restricted to a label.

    :param label: only include hosts carrying this label ('' selects all)
    :param as_hostspec: passed through to the inventory filter
        # NOTE(review): presumably switches the element type from hostname
        # strings to HostSpec objects — confirm against Inventory.filter_by_label
    :return: list of hosts matching the filter
    """
    return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
1176 def _add_host(self
, spec
):
1177 # type: (HostSpec) -> str
1179 Add a host to be managed by the orchestrator.
1181 :param host: host name
1183 assert_valid_host(spec
.hostname
)
1184 out
, err
, code
= self
._run
_cephadm
(spec
.hostname
, cephadmNoImage
, 'check-host',
1185 ['--expect-hostname', spec
.hostname
],
1187 error_ok
=True, no_fsid
=True)
1189 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1190 spec
.hostname
, spec
.addr
, err
))
1192 self
.inventory
.add_host(spec
)
1193 self
.cache
.prime_empty_host(spec
.hostname
)
1194 self
.offline_hosts_remove(spec
.hostname
)
1195 self
.event
.set() # refresh stray health check
1196 self
.log
.info('Added host %s' % spec
.hostname
)
1197 return "Added host '{}'".format(spec
.hostname
)
def add_host(self, spec: HostSpec) -> str:
    """Add a host to be managed by the orchestrator (delegates to _add_host)."""
    return self._add_host(spec)
def remove_host(self, host):
    # type: (str) -> str
    """
    Remove a host from orchestrator management.

    Drops the host from the inventory and the daemon/device cache,
    resets its cached connection, and wakes the serve loop so stray
    health checks are refreshed.

    :param host: host name
    :return: human-readable confirmation message
    """
    self.inventory.rm_host(host)
    self.cache.rm_host(host)
    # NOTE(review): presumably discards any cached remoto connection
    # for this host — confirm against _reset_con
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Removed host %s' % host)
    return "Removed host '{}'".format(host)
def update_host_addr(self, host: str, addr: str) -> str:
    """
    Record a new address for an existing host.

    The cached connection is reset so the next remote operation
    reconnects using the new address.

    :param host: host name
    :param addr: new address (IP or resolvable name)
    :return: human-readable confirmation message
    """
    self.inventory.set_addr(host, addr)
    self._reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Set host %s addr to %s' % (host, addr))
    return "Updated host '{}' addr to '{}'".format(host, addr)
def get_hosts(self):
    # type: () -> List[orchestrator.HostSpec]
    """
    Return a list of hosts managed by the orchestrator.

    Notes:
      - skip async: manager reads from cache.
    """
    return list(self.inventory.all_specs())
def add_host_label(self, host, label) -> str:
    """
    Attach *label* to *host* in the inventory and confirm the change.

    :param host: host name
    :param label: label to add
    :return: human-readable confirmation message
    """
    self.inventory.add_label(host, label)
    # Build the confirmation once and use it for both the log and the return.
    msg = 'Added label %s to host %s' % (label, host)
    self.log.info(msg)
    return msg
def remove_host_label(self, host: str, label: str) -> str:
    """
    Remove *label* from *host* in the inventory and confirm the change.

    :param host: host name
    :param label: label to remove
    :return: human-readable confirmation message
    """
    self.inventory.rm_label(host, label)
    # Fixed: the log previously said 'Removed label %s to host %s', which is
    # ungrammatical and inconsistent with the returned confirmation below.
    self.log.info('Removed label %s from host %s' % (label, host))
    return 'Removed label %s from host %s' % (label, host)
1250 def host_ok_to_stop(self
, hostname
: str):
1251 if hostname
not in self
.cache
.get_hosts():
1252 raise OrchestratorError(f
'Cannot find host "{hostname}"')
1254 daemons
= self
.cache
.get_daemons()
1255 daemon_map
= defaultdict(lambda: [])
1257 if dd
.hostname
== hostname
:
1258 daemon_map
[dd
.daemon_type
].append(dd
.daemon_id
)
1260 for daemon_type
,daemon_ids
in daemon_map
.items():
1261 r
= self
.cephadm_services
[daemon_type
].ok_to_stop(daemon_ids
)
1263 self
.log
.error(f
'It is NOT safe to stop host {hostname}')
1264 raise orchestrator
.OrchestratorError(
1268 msg
= f
'It is presumed safe to stop host {hostname}'
1272 def update_osdspec_previews(self
, search_host
: str = ''):
1273 # Set global 'pending' flag for host
1274 self
.cache
.loading_osdspec_preview
.add(search_host
)
1276 # query OSDSpecs for host <search host> and generate/get the preview
1277 # There can be multiple previews for one host due to multiple OSDSpecs.
1278 previews
.extend(self
.osd_service
.get_previews(search_host
))
1279 self
.log
.debug(f
"Loading OSDSpec previews to HostCache")
1280 self
.cache
.osdspec_previews
[search_host
] = previews
1281 # Unset global 'pending' flag for host
1282 self
.cache
.loading_osdspec_preview
.remove(search_host
)
1284 def _refresh_host_osdspec_previews(self
, host
) -> bool:
1285 self
.update_osdspec_previews(host
)
1286 self
.cache
.save_host(host
)
1287 self
.log
.debug(f
'Refreshed OSDSpec previews for host <{host}>')
1290 def _refresh_hosts_and_daemons(self
) -> None:
1296 if self
.cache
.host_needs_check(host
):
1297 r
= self
._check
_host
(host
)
1300 if self
.cache
.host_needs_daemon_refresh(host
):
1301 self
.log
.debug('refreshing %s daemons' % host
)
1302 r
= self
._refresh
_host
_daemons
(host
)
1306 if self
.cache
.host_needs_registry_login(host
) and self
.registry_url
:
1307 self
.log
.debug(f
"Logging `{host}` into custom registry")
1308 r
= self
._registry
_login
(host
, self
.registry_url
, self
.registry_username
, self
.registry_password
)
1312 if self
.cache
.host_needs_device_refresh(host
):
1313 self
.log
.debug('refreshing %s devices' % host
)
1314 r
= self
._refresh
_host
_devices
(host
)
1318 if self
.cache
.host_needs_osdspec_preview_refresh(host
):
1319 self
.log
.debug(f
"refreshing OSDSpec previews for {host}")
1320 r
= self
._refresh
_host
_osdspec
_previews
(host
)
1324 if self
.cache
.host_needs_new_etc_ceph_ceph_conf(host
):
1325 self
.log
.debug(f
"deploying new /etc/ceph/ceph.conf on `{host}`")
1326 r
= self
._deploy
_etc
_ceph
_ceph
_conf
(host
)
1330 refresh(self
.cache
.get_hosts())
1332 health_changed
= False
1333 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
1334 del self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']
1335 health_changed
= True
1337 self
.health_checks
['CEPHADM_HOST_CHECK_FAILED'] = {
1338 'severity': 'warning',
1339 'summary': '%d hosts fail cephadm check' % len(bad_hosts
),
1340 'count': len(bad_hosts
),
1341 'detail': bad_hosts
,
1343 health_changed
= True
1345 self
.health_checks
['CEPHADM_REFRESH_FAILED'] = {
1346 'severity': 'warning',
1347 'summary': 'failed to probe daemons or devices',
1348 'count': len(failures
),
1351 health_changed
= True
1352 elif 'CEPHADM_REFRESH_FAILED' in self
.health_checks
:
1353 del self
.health_checks
['CEPHADM_REFRESH_FAILED']
1354 health_changed
= True
1356 self
.set_health_checks(self
.health_checks
)
1358 def _refresh_host_daemons(self
, host
) -> Optional
[str]:
1360 out
, err
, code
= self
._run
_cephadm
(
1361 host
, 'mon', 'ls', [], no_fsid
=True)
1363 return 'host %s cephadm ls returned %d: %s' % (
1365 except Exception as e
:
1366 return 'host %s scrape failed: %s' % (host
, e
)
1367 ls
= json
.loads(''.join(out
))
1370 if not d
['style'].startswith('cephadm'):
1372 if d
['fsid'] != self
._cluster
_fsid
:
1374 if '.' not in d
['name']:
1376 sd
= orchestrator
.DaemonDescription()
1377 sd
.last_refresh
= datetime
.datetime
.utcnow()
1378 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
1381 setattr(sd
, k
, datetime
.datetime
.strptime(d
[k
], DATEFMT
))
1382 sd
.daemon_type
= d
['name'].split('.')[0]
1383 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
1385 sd
.container_id
= d
.get('container_id')
1388 sd
.container_id
= sd
.container_id
[0:12]
1389 sd
.container_image_name
= d
.get('container_image_name')
1390 sd
.container_image_id
= d
.get('container_image_id')
1391 sd
.version
= d
.get('version')
1392 if sd
.daemon_type
== 'osd':
1393 sd
.osdspec_affinity
= self
.osd_service
.get_osdspec_affinity(sd
.daemon_id
)
1395 sd
.status_desc
= d
['state']
1403 sd
.status_desc
= 'unknown'
1406 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
1407 self
.cache
.update_host_daemons(host
, dm
)
1408 self
.cache
.save_host(host
)
1411 def _refresh_host_devices(self
, host
) -> Optional
[str]:
1413 out
, err
, code
= self
._run
_cephadm
(
1416 ['--', 'inventory', '--format=json'])
1418 return 'host %s ceph-volume inventory returned %d: %s' % (
1420 except Exception as e
:
1421 return 'host %s ceph-volume inventory failed: %s' % (host
, e
)
1422 devices
= json
.loads(''.join(out
))
1424 out
, err
, code
= self
._run
_cephadm
(
1430 return 'host %s list-networks returned %d: %s' % (
1432 except Exception as e
:
1433 return 'host %s list-networks failed: %s' % (host
, e
)
1434 networks
= json
.loads(''.join(out
))
1435 self
.log
.debug('Refreshed host %s devices (%d) networks (%s)' % (
1436 host
, len(devices
), len(networks
)))
1437 devices
= inventory
.Devices
.from_json(devices
)
1438 self
.cache
.update_host_devices_networks(host
, devices
.devices
, networks
)
1439 self
.update_osdspec_previews(host
)
1440 self
.cache
.save_host(host
)
1443 def _deploy_etc_ceph_ceph_conf(self
, host
: str) -> Optional
[str]:
1444 ret
, config
, err
= self
.check_mon_command({
1445 "prefix": "config generate-minimal-conf",
1449 with self
._remote
_connection
(host
) as tpl
:
1451 out
, err
, code
= remoto
.process
.check(
1453 ['mkdir', '-p', '/etc/ceph'])
1455 return f
'failed to create /etc/ceph on {host}: {err}'
1456 out
, err
, code
= remoto
.process
.check(
1458 ['dd', 'of=/etc/ceph/ceph.conf'],
1459 stdin
=config
.encode('utf-8')
1462 return f
'failed to create /etc/ceph/ceph.conf on {host}: {err}'
1463 self
.cache
.update_last_etc_ceph_ceph_conf(host
)
1464 self
.cache
.save_host(host
)
1465 except OrchestratorError
as e
:
1466 return f
'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
1469 def _invalidate_daemons_and_kick_serve(self
, filter_host
=None):
1471 self
.cache
.invalidate_host_daemons(filter_host
)
1473 for h
in self
.cache
.get_hosts():
1474 # Also discover daemons deployed manually
1475 self
.cache
.invalidate_host_daemons(h
)
1477 self
._kick
_serve
_loop
()
1480 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None,
1481 refresh
: bool = False) -> List
[orchestrator
.ServiceDescription
]:
1483 self
._invalidate
_daemons
_and
_kick
_serve
()
1484 self
.log
.info('Kicked serve() loop to refresh all services')
1487 sm
: Dict
[str, orchestrator
.ServiceDescription
] = {}
1489 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1490 for name
, dd
in dm
.items():
1491 if service_type
and service_type
!= dd
.daemon_type
:
1493 n
: str = dd
.service_name()
1494 if service_name
and service_name
!= n
:
1496 if dd
.daemon_type
== 'osd':
1498 OSDs do not know the affinity to their spec out of the box.
1500 n
= f
"osd.{dd.osdspec_affinity}"
1501 if not dd
.osdspec_affinity
:
1502 # If there is no osdspec_affinity, the spec should suffice for displaying
1504 if n
in self
.spec_store
.specs
:
1505 spec
= self
.spec_store
.specs
[n
]
1509 service_type
=dd
.daemon_type
,
1510 service_id
=dd
.service_id(),
1511 placement
=PlacementSpec(
1516 sm
[n
] = orchestrator
.ServiceDescription(
1517 last_refresh
=dd
.last_refresh
,
1518 container_image_id
=dd
.container_image_id
,
1519 container_image_name
=dd
.container_image_name
,
1521 events
=self
.events
.get_for_service(spec
.service_name()),
1523 if n
in self
.spec_store
.specs
:
1524 if dd
.daemon_type
== 'osd':
1526 The osd count can't be determined by the Placement spec.
1527 Showing an actual/expected representation cannot be determined
1528 here. So we're setting running = size for now.
1531 sm
[n
].size
= osd_count
1533 sm
[n
].size
= spec
.placement
.get_host_selection_size(
1534 self
.inventory
.all_specs())
1536 sm
[n
].created
= self
.spec_store
.spec_created
[n
]
1537 if service_type
== 'nfs':
1538 spec
= cast(NFSServiceSpec
, spec
)
1539 sm
[n
].rados_config_location
= spec
.rados_config_location()
1544 if not sm
[n
].last_refresh
or not dd
.last_refresh
or dd
.last_refresh
< sm
[n
].last_refresh
: # type: ignore
1545 sm
[n
].last_refresh
= dd
.last_refresh
1546 if sm
[n
].container_image_id
!= dd
.container_image_id
:
1547 sm
[n
].container_image_id
= 'mix'
1548 if sm
[n
].container_image_name
!= dd
.container_image_name
:
1549 sm
[n
].container_image_name
= 'mix'
1550 for n
, spec
in self
.spec_store
.specs
.items():
1553 if service_type
is not None and service_type
!= spec
.service_type
:
1555 if service_name
is not None and service_name
!= n
:
1557 sm
[n
] = orchestrator
.ServiceDescription(
1559 size
=spec
.placement
.get_host_selection_size(self
.inventory
.all_specs()),
1561 events
=self
.events
.get_for_service(spec
.service_name()),
1563 if service_type
== 'nfs':
1564 spec
= cast(NFSServiceSpec
, spec
)
1565 sm
[n
].rados_config_location
= spec
.rados_config_location()
1566 return list(sm
.values())
1569 def list_daemons(self
,
1570 service_name
: Optional
[str] = None,
1571 daemon_type
: Optional
[str] = None,
1572 daemon_id
: Optional
[str] = None,
1573 host
: Optional
[str] = None,
1574 refresh
: bool = False) -> List
[orchestrator
.DaemonDescription
]:
1576 self
._invalidate
_daemons
_and
_kick
_serve
(host
)
1577 self
.log
.info('Kicked serve() loop to refresh all daemons')
1580 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1581 if host
and h
!= host
:
1583 for name
, dd
in dm
.items():
1584 if daemon_type
is not None and daemon_type
!= dd
.daemon_type
:
1586 if daemon_id
is not None and daemon_id
!= dd
.daemon_id
:
1588 if service_name
is not None and service_name
!= dd
.service_name():
1594 def service_action(self
, action
, service_name
) -> List
[str]:
1596 for host
, dm
in self
.cache
.daemons
.items():
1597 for name
, d
in dm
.items():
1598 if d
.matches_service(service_name
):
1599 args
.append((d
.daemon_type
, d
.daemon_id
,
1600 d
.hostname
, action
))
1601 self
.log
.info('%s service %s' % (action
.capitalize(), service_name
))
1602 return self
._daemon
_actions
(args
)
1605 def _daemon_actions(self
, daemon_type
, daemon_id
, host
, action
) -> str:
1606 with
set_exception_subject('daemon', DaemonDescription(
1607 daemon_type
=daemon_type
,
1610 return self
._daemon
_action
(daemon_type
, daemon_id
, host
, action
)
1612 def _daemon_action(self
, daemon_type
, daemon_id
, host
, action
, image
=None):
1613 daemon_spec
: CephadmDaemonSpec
= CephadmDaemonSpec(
1615 daemon_id
=daemon_id
,
1616 daemon_type
=daemon_type
,
1619 if image
is not None:
1620 if action
!= 'redeploy':
1621 raise OrchestratorError(
1622 f
'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1623 if daemon_type
not in CEPH_TYPES
:
1624 raise OrchestratorError(
1625 f
'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1626 f
'types are: {", ".join(CEPH_TYPES)}')
1628 self
.check_mon_command({
1629 'prefix': 'config set',
1630 'name': 'container_image',
1632 'who': utils
.name_to_config_section(daemon_type
+ '.' + daemon_id
),
1635 if action
== 'redeploy':
1636 # stop, recreate the container+unit, then restart
1637 return self
._create
_daemon
(daemon_spec
)
1638 elif action
== 'reconfig':
1639 return self
._create
_daemon
(daemon_spec
, reconfig
=True)
1642 'start': ['reset-failed', 'start'],
1644 'restart': ['reset-failed', 'restart'],
1646 name
= daemon_spec
.name()
1647 for a
in actions
[action
]:
1649 out
, err
, code
= self
._run
_cephadm
(
1651 ['--name', name
, a
])
1653 self
.log
.exception(f
'`{host}: cephadm unit {name} {a}` failed')
1654 self
.cache
.invalidate_host_daemons(daemon_spec
.host
)
1655 msg
= "{} {} from host '{}'".format(action
, name
, daemon_spec
.host
)
1656 self
.events
.for_daemon(name
, 'INFO', msg
)
1660 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str]=None) -> str:
1661 d
= self
.cache
.get_daemon(daemon_name
)
1663 self
.log
.info(f
'{action} daemon {daemon_name}')
1664 return self
._daemon
_action
(d
.daemon_type
, d
.daemon_id
,
1665 d
.hostname
, action
, image
=image
)
1668 def remove_daemons(self
, names
):
1669 # type: (List[str]) -> List[str]
1671 for host
, dm
in self
.cache
.daemons
.items():
1674 args
.append((name
, host
))
1676 raise OrchestratorError('Unable to find daemon(s) %s' % (names
))
1677 self
.log
.info('Remove daemons %s' % [a
[0] for a
in args
])
1678 return self
._remove
_daemons
(args
)
1681 def remove_service(self
, service_name
) -> str:
1682 self
.log
.info('Remove service %s' % service_name
)
1683 self
._trigger
_preview
_refresh
(service_name
=service_name
)
1684 found
= self
.spec_store
.rm(service_name
)
1686 self
._kick
_serve
_loop
()
1687 return 'Removed service %s' % service_name
1689 # must be idempotent: still a success.
1690 return f
'Failed to remove service. <{service_name}> was not found.'
1693 def get_inventory(self
, host_filter
: Optional
[orchestrator
.InventoryFilter
] = None, refresh
=False) -> List
[orchestrator
.InventoryHost
]:
1695 Return the storage inventory of hosts matching the given filter.
1697 :param host_filter: host filter
1700 - add filtering by label
1703 if host_filter
and host_filter
.hosts
:
1704 for h
in host_filter
.hosts
:
1705 self
.cache
.invalidate_host_devices(h
)
1707 for h
in self
.cache
.get_hosts():
1708 self
.cache
.invalidate_host_devices(h
)
1711 self
.log
.info('Kicked serve() loop to refresh devices')
1714 for host
, dls
in self
.cache
.devices
.items():
1715 if host_filter
and host_filter
.hosts
and host
not in host_filter
.hosts
:
1717 result
.append(orchestrator
.InventoryHost(host
,
1718 inventory
.Devices(dls
)))
1722 def zap_device(self
, host
, path
) -> str:
1723 self
.log
.info('Zap device %s:%s' % (host
, path
))
1724 out
, err
, code
= self
._run
_cephadm
(
1725 host
, 'osd', 'ceph-volume',
1726 ['--', 'lvm', 'zap', '--destroy', path
],
1728 self
.cache
.invalidate_host_devices(host
)
1730 raise OrchestratorError('Zap failed: %s' % '\n'.join(out
+ err
))
1731 return '\n'.join(out
+ err
)
1734 def blink_device_light(self
, ident_fault
, on
, locs
) -> List
[str]:
1736 def blink(host
, dev
, path
):
1739 'local-disk-%s-led-%s' % (
1741 'on' if on
else 'off'),
1742 '--path', path
or dev
,
1744 out
, err
, code
= self
._run
_cephadm
(
1745 host
, 'osd', 'shell', ['--'] + cmd
,
1748 raise OrchestratorError(
1749 'Unable to affect %s light for %s:%s. Command: %s' % (
1750 ident_fault
, host
, dev
, ' '.join(cmd
)))
1751 self
.log
.info('Set %s light for %s:%s %s' % (
1752 ident_fault
, host
, dev
, 'on' if on
else 'off'))
1753 return "Set %s light for %s:%s %s" % (
1754 ident_fault
, host
, dev
, 'on' if on
else 'off')
1758 def get_osd_uuid_map(self
, only_up
=False):
1759 # type: (bool) -> Dict[str, str]
1760 osd_map
= self
.get('osd_map')
1762 for o
in osd_map
['osds']:
1763 # only include OSDs that have ever started in this map. this way
1764 # an interrupted osd create can be repeated and succeed the second
1766 osd_id
= o
.get('osd')
1768 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1769 if not only_up
or (o
['up_from'] > 0):
1770 r
[str(osd_id
)] = o
.get('uuid', '')
1773 def _trigger_preview_refresh(self
,
1774 specs
: Optional
[List
[DriveGroupSpec
]] = None,
1775 service_name
: Optional
[str] = None,
1777 # Only trigger a refresh when a spec has changed
1781 preview_spec
= self
.spec_store
.spec_preview
.get(spec
.service_name())
1782 # the to-be-preview spec != the actual spec, this means we need to
1783 # trigger a refresh, if the spec has been removed (==None) we need to
1785 if not preview_spec
or spec
!= preview_spec
:
1786 trigger_specs
.append(spec
)
1788 trigger_specs
= [cast(DriveGroupSpec
, self
.spec_store
.spec_preview
.get(service_name
))]
1789 if not any(trigger_specs
):
1792 refresh_hosts
= self
.osd_service
.resolve_hosts_for_osdspecs(specs
=trigger_specs
)
1793 for host
in refresh_hosts
:
1794 self
.log
.info(f
"Marking host: {host} for OSDSpec preview refresh.")
1795 self
.cache
.osdspec_previews_refresh_queue
.append(host
)
def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
    """
    Deprecated. Please use `apply()` instead.

    Kept for backward compatibility with mgr/dashboard.
    """
    return [self._apply(spec) for spec in specs]
def create_osds(self, drive_group: DriveGroupSpec) -> str:
    """Create OSDs from a drive group spec (delegates to the OSD service)."""
    return self.osd_service.create_from_spec(drive_group)
1810 def _preview_osdspecs(self
,
1811 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
1814 return {'n/a': [{'error': True,
1815 'message': 'No OSDSpec or matching hosts found.'}]}
1816 matching_hosts
= self
.osd_service
.resolve_hosts_for_osdspecs(specs
=osdspecs
)
1817 if not matching_hosts
:
1818 return {'n/a': [{'error': True,
1819 'message': 'No OSDSpec or matching hosts found.'}]}
1820 # Is any host still loading previews
1821 pending_hosts
= {h
for h
in self
.cache
.loading_osdspec_preview
if h
in matching_hosts
}
1823 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1824 return {'n/a': [{'error': True,
1825 'message': 'Preview data is being generated.. '
1826 'Please re-run this command in a bit.'}]}
1827 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
1828 previews_for_specs
= {}
1829 for host
, raw_reports
in self
.cache
.osdspec_previews
.items():
1830 if host
not in matching_hosts
:
1833 for osd_report
in raw_reports
:
1834 if osd_report
.get('osdspec') in [x
.service_id
for x
in osdspecs
]:
1835 osd_reports
.append(osd_report
)
1836 previews_for_specs
.update({host
: osd_reports
})
1837 return previews_for_specs
1839 def _calc_daemon_deps(self
, daemon_type
, daemon_id
):
1841 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1842 'grafana': ['prometheus'],
1843 'alertmanager': ['mgr', 'alertmanager'],
1846 for dep_type
in need
.get(daemon_type
, []):
1847 for dd
in self
.cache
.get_daemons_by_service(dep_type
):
1848 deps
.append(dd
.name())
1851 def _get_config_and_keyring(self
, daemon_type
, daemon_id
, host
,
1853 extra_ceph_config
=None):
1854 # type: (str, str, str, Optional[str], Optional[str]) -> Dict[str, Any]
1857 ename
= utils
.name_to_auth_entity(daemon_type
, daemon_id
, host
=host
)
1858 ret
, keyring
, err
= self
.check_mon_command({
1859 'prefix': 'auth get',
1864 ret
, config
, err
= self
.check_mon_command({
1865 "prefix": "config generate-minimal-conf",
1867 if extra_ceph_config
:
1868 config
+= extra_ceph_config
1875 def _create_daemon(self
,
1876 daemon_spec
: CephadmDaemonSpec
,
1878 osd_uuid_map
: Optional
[Dict
[str, Any
]] = None,
1883 with
set_exception_subject('service', orchestrator
.DaemonDescription(
1884 daemon_type
=daemon_spec
.daemon_type
,
1885 daemon_id
=daemon_spec
.daemon_id
,
1886 hostname
=daemon_spec
.host
,
1887 ).service_id(), overwrite
=True):
1889 start_time
= datetime
.datetime
.utcnow()
1890 cephadm_config
, deps
= self
.cephadm_services
[daemon_spec
.daemon_type
].generate_config(daemon_spec
)
1892 daemon_spec
.extra_args
.extend(['--config-json', '-'])
1894 # TCP port to open in the host firewall
1895 if daemon_spec
.ports
:
1896 daemon_spec
.extra_args
.extend(['--tcp-ports', ' '.join(map(str,daemon_spec
.ports
))])
1898 # osd deployments needs an --osd-uuid arg
1899 if daemon_spec
.daemon_type
== 'osd':
1900 if not osd_uuid_map
:
1901 osd_uuid_map
= self
.get_osd_uuid_map()
1902 osd_uuid
= osd_uuid_map
.get(daemon_spec
.daemon_id
)
1904 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec
.daemon_id
)
1905 daemon_spec
.extra_args
.extend(['--osd-fsid', osd_uuid
])
1908 daemon_spec
.extra_args
.append('--reconfig')
1909 if self
.allow_ptrace
:
1910 daemon_spec
.extra_args
.append('--allow-ptrace')
1912 if self
.cache
.host_needs_registry_login(daemon_spec
.host
) and self
.registry_url
:
1913 self
._registry
_login
(daemon_spec
.host
, self
.registry_url
, self
.registry_username
, self
.registry_password
)
1915 self
.log
.info('%s daemon %s on %s' % (
1916 'Reconfiguring' if reconfig
else 'Deploying',
1917 daemon_spec
.name(), daemon_spec
.host
))
1919 out
, err
, code
= self
._run
_cephadm
(
1920 daemon_spec
.host
, daemon_spec
.name(), 'deploy',
1922 '--name', daemon_spec
.name(),
1923 ] + daemon_spec
.extra_args
,
1924 stdin
=json
.dumps(cephadm_config
))
1925 if not code
and daemon_spec
.host
in self
.cache
.daemons
:
1926 # prime cached service state with what we (should have)
1928 sd
= orchestrator
.DaemonDescription()
1929 sd
.daemon_type
= daemon_spec
.daemon_type
1930 sd
.daemon_id
= daemon_spec
.daemon_id
1931 sd
.hostname
= daemon_spec
.host
1933 sd
.status_desc
= 'starting'
1934 self
.cache
.add_daemon(daemon_spec
.host
, sd
)
1935 if daemon_spec
.daemon_type
in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
1936 self
.requires_post_actions
.add(daemon_spec
.daemon_type
)
1937 self
.cache
.invalidate_host_daemons(daemon_spec
.host
)
1938 self
.cache
.update_daemon_config_deps(daemon_spec
.host
, daemon_spec
.name(), deps
, start_time
)
1939 self
.cache
.save_host(daemon_spec
.host
)
1940 msg
= "{} {} on host '{}'".format(
1941 'Reconfigured' if reconfig
else 'Deployed', daemon_spec
.name(), daemon_spec
.host
)
1943 self
.events
.for_daemon(daemon_spec
.name(), OrchestratorEvent
.INFO
, msg
)
1945 what
= 'reconfigure' if reconfig
else 'deploy'
1946 self
.events
.for_daemon(daemon_spec
.name(), OrchestratorEvent
.ERROR
, f
'Failed to {what}: {err}')
def _remove_daemons(self, name, host) -> str:
    """Thin wrapper around _remove_daemon for a single (name, host) pair."""
    return self._remove_daemon(name, host)
1953 def _remove_daemon(self
, name
, host
) -> str:
1957 (daemon_type
, daemon_id
) = name
.split('.', 1)
1959 with
set_exception_subject('service', orchestrator
.DaemonDescription(
1960 daemon_type
=daemon_type
,
1961 daemon_id
=daemon_id
,
1963 ).service_id(), overwrite
=True):
1966 self
.cephadm_services
[daemon_type
].pre_remove(daemon_id
)
1968 args
= ['--name', name
, '--force']
1969 self
.log
.info('Removing daemon %s from %s' % (name
, host
))
1970 out
, err
, code
= self
._run
_cephadm
(
1971 host
, name
, 'rm-daemon', args
)
1973 # remove item from cache
1974 self
.cache
.rm_daemon(host
, name
)
1975 self
.cache
.invalidate_host_daemons(host
)
1976 return "Removed {} from host '{}'".format(name
, host
)
1978 def _config_fn(self
, service_type
) -> Optional
[Callable
[[ServiceSpec
], None]]:
1980 'mds': self
.mds_service
.config
,
1981 'rgw': self
.rgw_service
.config
,
1982 'nfs': self
.nfs_service
.config
,
1983 'iscsi': self
.iscsi_service
.config
,
1985 return cast(Callable
[[ServiceSpec
], None], fn
)
1987 def _apply_service(self
, spec
: ServiceSpec
) -> bool:
1989 Schedule a service. Deploy new daemons or remove old ones, depending
1990 on the target label and count specified in the placement.
1992 daemon_type
= spec
.service_type
1993 service_name
= spec
.service_name()
1995 self
.log
.debug('Skipping unmanaged service %s' % service_name
)
1997 if spec
.preview_only
:
1998 self
.log
.debug('Skipping preview_only service %s' % service_name
)
2000 self
.log
.debug('Applying service %s spec' % service_name
)
2002 config_func
= self
._config
_fn
(daemon_type
)
2004 if daemon_type
== 'osd':
2005 self
.osd_service
.create_from_spec(cast(DriveGroupSpec
, spec
))
2006 # TODO: return True would result in a busy loop
2009 daemons
= self
.cache
.get_daemons_by_service(service_name
)
2011 public_network
= None
2012 if daemon_type
== 'mon':
2013 ret
, out
, err
= self
.check_mon_command({
2014 'prefix': 'config get',
2016 'key': 'public_network',
2019 public_network
= out
.strip()
2020 self
.log
.debug('mon public_network is %s' % public_network
)
2022 def matches_network(host
):
2023 # type: (str) -> bool
2024 if not public_network
:
2026 # make sure we have 1 or more IPs for that network on that
2028 return len(self
.cache
.networks
[host
].get(public_network
, [])) > 0
2030 ha
= HostAssignment(
2032 get_hosts_func
=self
._get
_hosts
,
2033 get_daemons_func
=self
.cache
.get_daemons_by_service
,
2034 filter_new_host
=matches_network
if daemon_type
== 'mon' else None,
2037 hosts
: List
[HostPlacementSpec
] = ha
.place()
2038 self
.log
.debug('Usable hosts: %s' % hosts
)
2043 if daemon_type
in ['mon', 'mgr'] and len(hosts
) < 1:
2044 self
.log
.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts
)
2050 add_daemon_hosts
: Set
[HostPlacementSpec
] = ha
.add_daemon_hosts(hosts
)
2051 self
.log
.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts
)
2053 remove_daemon_hosts
: Set
[orchestrator
.DaemonDescription
] = ha
.remove_daemon_hosts(hosts
)
2054 self
.log
.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts
)
2056 for host
, network
, name
in add_daemon_hosts
:
2057 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
2058 prefix
=spec
.service_id
,
2061 if not did_config
and config_func
:
2062 if daemon_type
== 'rgw':
2063 rgw_config_func
= cast(Callable
[[RGWSpec
, str], None], config_func
)
2064 rgw_config_func(cast(RGWSpec
, spec
), daemon_id
)
2069 daemon_spec
= self
.cephadm_services
[daemon_type
].make_daemon_spec(host
, daemon_id
, network
, spec
)
2070 self
.log
.debug('Placing %s.%s on host %s' % (
2071 daemon_type
, daemon_id
, host
))
2073 self
.cephadm_services
[daemon_type
].create(daemon_spec
)
2075 # add to daemon list so next name(s) will also be unique
2076 sd
= orchestrator
.DaemonDescription(
2078 daemon_type
=daemon_type
,
2079 daemon_id
=daemon_id
,
2085 def _ok_to_stop(remove_daemon_hosts
: Set
[orchestrator
.DaemonDescription
]) -> bool:
2086 daemon_ids
= [d
.daemon_id
for d
in remove_daemon_hosts
]
2087 r
= self
.cephadm_services
[daemon_type
].ok_to_stop(daemon_ids
)
2090 while remove_daemon_hosts
and not _ok_to_stop(remove_daemon_hosts
):
2091 # let's find a subset that is ok-to-stop
2092 remove_daemon_hosts
.pop()
2093 for d
in remove_daemon_hosts
:
2094 # NOTE: we are passing the 'force' flag here, which means
2095 # we can delete a mon instances data.
2096 self
._remove
_daemon
(d
.name(), d
.hostname
)
2101 def _apply_all_services(self
):
2103 specs
= [] # type: List[ServiceSpec]
2104 for sn
, spec
in self
.spec_store
.specs
.items():
2108 if self
._apply
_service
(spec
):
2110 except Exception as e
:
2111 self
.log
.exception('Failed to apply %s spec %s: %s' % (
2112 spec
.service_name(), spec
, e
))
2113 self
.events
.for_service(spec
, 'ERROR', 'Failed to apply: ' + str(e
))
def _check_pool_exists(self, pool, service_name):
    """
    Verify that a RADOS pool required by a service exists.

    :param pool: pool name to check
    :param service_name: service that requires the pool (used in messages)
    :raises OrchestratorError: if the pool does not exist
    """
    logger.info(f'Checking pool "{pool}" exists for service {service_name}')
    if not self.rados.pool_exists(pool):
        raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                f'service {service_name}')
# NOTE(review): this chunk is line-wrapped/mangled; the tokens below are kept
# byte-identical to the source. Gaps in the embedded original line numbers
# (e.g. 2127-2128, 2146-2147, 2154-2159, 2170-2172) mean source lines are
# missing from this view — including the `for dd in daemons:` loop header and
# the `try:` that pairs with the except clauses further down. Do not assume
# the visible fragments form the complete function.
#
# Purpose (from the visible code): walk all cached daemons, remove orphans
# whose service spec no longer exists, and reconfigure daemons whose config
# dependencies or the monmap changed; finally run per-daemon-type post actions.
2123 def _check_daemons(self
):
# All daemons currently known to the host cache.
2125 daemons
= self
.cache
.get_daemons()
# Daemons that need a post-creation check, grouped by daemon type.
2126 daemons_post
: Dict
[str, List
[orchestrator
.DaemonDescription
]] = defaultdict(list)
# (missing loop header here — `dd` below is the per-daemon loop variable)
# Look up the service spec this daemon belongs to, if any.
2129 spec
= self
.spec_store
.specs
.get(dd
.service_name(), None)
2130 if not spec
and dd
.daemon_type
not in ['mon', 'mgr', 'osd']:
2131 # (mon and mgr specs should always exist; osds aren't matched
2132 # to a service spec)
2133 self
.log
.info('Removing orphan daemon %s...' % dd
.name())
2134 self
._remove
_daemon
(dd
.name(), dd
.hostname
)
2136 # ignore unmanaged services
2137 if spec
and spec
.unmanaged
:
2140 # These daemon types require additional configs after creation
2141 if dd
.daemon_type
in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
2142 daemons_post
[dd
.daemon_type
].append(dd
)
# Mark whether this daemon is the service's active daemon.
# NOTE(review): only the `= False` arm is visible (line 2148); the matching
# assignment for the equality branch (2146-2147) is missing from this view.
2144 if self
.cephadm_services
[dd
.daemon_type
].get_active_daemon(
2145 self
.cache
.get_daemons_by_service(dd
.service_name())).daemon_id
== dd
.daemon_id
:
2148 dd
.is_active
= False
# Compare current config dependencies against the last deployed ones.
2150 deps
= self
._calc
_daemon
_deps
(dd
.daemon_type
, dd
.daemon_id
)
2151 last_deps
, last_config
= self
.cache
.get_daemon_last_config_deps(
2152 dd
.hostname
, dd
.name())
2153 if last_deps
is None:
# Reconfigure when we have no record of the last config time...
2157 self
.log
.info('Reconfiguring %s (unknown last config time)...'% (
# ...or when the dependency list changed...
2160 elif last_deps
!= deps
:
2161 self
.log
.debug('%s deps %s -> %s' % (dd
.name(), last_deps
,
2163 self
.log
.info('Reconfiguring %s (dependencies changed)...' % (
# ...or when the monmap is newer than the last config and this is a
# core ceph daemon type.
2166 elif self
.last_monmap
and \
2167 self
.last_monmap
> last_config
and \
2168 dd
.daemon_type
in CEPH_TYPES
:
2169 self
.log
.info('Reconfiguring %s (monmap changed)...' % dd
.name())
# Redeploy/reconfig the daemon (the enclosing `try:` — lines 2170-2172 —
# is missing from this view).
2173 self
._create
_daemon
(
2176 daemon_id
=dd
.daemon_id
,
2177 daemon_type
=dd
.daemon_type
),
# Orchestrator failures are recorded as service events and cancel the
# post-action for this daemon type.
2179 except OrchestratorError
as e
:
2180 self
.events
.from_orch_error(e
)
2181 if dd
.daemon_type
in daemons_post
:
2182 del daemons_post
[dd
.daemon_type
]
# Any other failure is recorded against the daemon itself.
2184 except Exception as e
:
2185 self
.events
.for_daemon_from_exception(dd
.name(), e
)
2186 if dd
.daemon_type
in daemons_post
:
2187 del daemons_post
[dd
.daemon_type
]
2190 # do daemon post actions
2191 for daemon_type
, daemon_descs
in daemons_post
.items():
2192 if daemon_type
in self
.requires_post_actions
:
2193 self
.requires_post_actions
.remove(daemon_type
)
2194 self
._get
_cephadm
_service
(daemon_type
).daemon_check_post(daemon_descs
)
def _add_daemon(self, daemon_type, spec,
                create_func: Callable[..., T], config_func=None) -> List[T]:
    """
    Add (and place) a daemon. Require explicit host placement. Do not
    schedule, and do not apply the related scheduling limitations.
    """
    # NOTE(review): docstring delimiters were missing from the mangled
    # source and have been restored; all statements were present.
    self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
    hosts = spec.placement.hosts
    if not hosts:
        raise OrchestratorError('must specify host(s) to deploy on')
    # Default the count to "one per explicitly listed host".
    count = spec.placement.count or len(hosts)
    existing = self.cache.get_daemons_by_service(spec.service_name())
    return self._create_daemons(daemon_type, spec, existing,
                                hosts, count,
                                create_func, config_func)
def _create_daemons(self, daemon_type, spec, daemons,
                    hosts, count,
                    create_func: Callable[..., T], config_func=None) -> List[T]:
    """Create *count* daemons of *daemon_type* on the given placement hosts.

    NOTE(review): several lines were missing from the mangled source and
    were restored from context (raise arguments, the did_config flag, the
    non-rgw config_func branch, the forall_hosts decorator) — verify
    against upstream.
    """
    if count > len(hosts):
        raise OrchestratorError('too few hosts: want %d, have %s' % (
            count, hosts))

    # Run the service-level config_func at most once per batch.
    did_config = False

    args = []  # type: List[CephadmDaemonSpec]
    for host, network, name in hosts:
        daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                         prefix=spec.service_id,
                                         forcename=name)

        if not did_config and config_func:
            # rgw's config hook also needs the daemon id.
            if daemon_type == 'rgw':
                config_func(spec, daemon_id)
            else:
                config_func(spec)
            did_config = True

        daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
        self.log.debug('Placing %s.%s on host %s' % (
            daemon_type, daemon_id, host))
        args.append(daemon_spec)

        # add to daemon list so next name(s) will also be unique
        sd = orchestrator.DaemonDescription(
            hostname=host,
            daemon_type=daemon_type,
            daemon_id=daemon_id,
        )
        daemons.append(sd)

    @forall_hosts
    def create_func_map(*args):
        return create_func(*args)

    return create_func_map(args)
def apply_mon(self, spec) -> str:
    """Schedule the mon service to be updated to match *spec*."""
    return self._apply(spec)
def add_mon(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place mon daemons on the hosts named in *spec*."""
    create = self.mon_service.create
    return self._add_daemon('mon', spec, create)
def add_mgr(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place mgr daemons on the hosts named in *spec*."""
    create = self.mgr_service.create
    return self._add_daemon('mgr', spec, create)
def _apply(self, spec: GenericSpec) -> str:
    """Dispatch a single spec to the appropriate apply path."""
    self.migration.verify_no_migration()

    # Host specs are handled directly, not via the service machinery.
    if spec.service_type == 'host':
        return self._add_host(cast(HostSpec, spec))

    if spec.service_type == 'osd':
        # _trigger preview refresh needs to be smart and
        # should only refresh if a change has been detected
        self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

    return self._apply_service_spec(cast(ServiceSpec, spec))
def _plan(self, spec: ServiceSpec):
    """Compute the dry-run placement delta for a single service spec.

    NOTE(review): the HostAssignment close/validate/place lines were
    missing from the mangled source and were restored from context —
    verify against upstream.
    """
    # OSD specs are previewed by the drive-group machinery instead.
    if spec.service_type == 'osd':
        return {'service_name': spec.service_name(),
                'service_type': spec.service_type,
                'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

    ha = HostAssignment(
        spec=spec,
        get_hosts_func=self._get_hosts,
        get_daemons_func=self.cache.get_daemons_by_service,
    )
    ha.validate()
    hosts = ha.place()

    add_daemon_hosts = ha.add_daemon_hosts(hosts)
    remove_daemon_hosts = ha.remove_daemon_hosts(hosts)

    return {
        'service_name': spec.service_name(),
        'service_type': spec.service_type,
        'add': [hs.hostname for hs in add_daemon_hosts],
        'remove': [d.hostname for d in remove_daemon_hosts]
    }
def plan(self, specs: List[GenericSpec]) -> List:
    """Dry-run: report what applying *specs* would change, without applying.

    Returns a list of per-spec preview dicts, prefixed with a warning that
    the preview is only a snapshot. Host specs are not previewable.

    Fix: the user-facing warning said "If any on these conditions" —
    corrected to "of". NOTE(review): the per-spec loop and final return
    were missing from the mangled source and restored from context.
    """
    results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                           'to the current inventory setup. If any of these conditions changes, the \n'
                           'preview will be invalid. Please make sure to have a minimal \n'
                           'timeframe between planning and applying the specs.'}]
    if any([spec.service_type == 'host' for spec in specs]):
        return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
    for spec in specs:
        results.append(self._plan(cast(ServiceSpec, spec)))
    return results
def _apply_service_spec(self, spec: ServiceSpec) -> str:
    """Save *spec* and kick the serve loop to converge toward it.

    NOTE(review): the defaults-dict braces, the raise argument and the
    HostAssignment validation call were missing from the mangled source
    and were restored from context — verify against upstream.
    """
    if spec.placement.is_empty():
        # fill in default placement
        defaults = {
            'mon': PlacementSpec(count=5),
            'mgr': PlacementSpec(count=2),
            'mds': PlacementSpec(count=2),
            'rgw': PlacementSpec(count=2),
            'iscsi': PlacementSpec(count=1),
            'rbd-mirror': PlacementSpec(count=2),
            'nfs': PlacementSpec(count=1),
            'grafana': PlacementSpec(count=1),
            'alertmanager': PlacementSpec(count=1),
            'prometheus': PlacementSpec(count=1),
            'node-exporter': PlacementSpec(host_pattern='*'),
            'crash': PlacementSpec(host_pattern='*'),
        }
        spec.placement = defaults[spec.service_type]
    elif spec.service_type in ['mon', 'mgr'] and \
            spec.placement.count is not None and \
            spec.placement.count < 1:
        raise OrchestratorError('cannot scale %s service below 1' % (
            spec.service_type))

    HostAssignment(
        spec=spec,
        get_hosts_func=self._get_hosts,
        get_daemons_func=self.cache.get_daemons_by_service,
    ).validate()

    self.log.info('Saving service %s spec with placement %s' % (
        spec.service_name(), spec.placement.pretty_str()))
    self.spec_store.save(spec)
    self._kick_serve_loop()
    return "Scheduled %s update..." % spec.service_name()
def apply(self, specs: List[GenericSpec]) -> List[str]:
    """Apply each spec in turn, collecting the per-spec status strings.

    NOTE(review): the accumulator initialisation, loop header and return
    were missing from the mangled source and restored from context.
    """
    results = []
    for spec in specs:
        results.append(self._apply(spec))
    return results
def apply_mgr(self, spec) -> str:
    """Schedule the mgr service to be updated to match *spec*."""
    return self._apply(spec)
def add_mds(self, spec: ServiceSpec) -> List[str]:
    """Explicitly place mds daemons; runs the mds config hook first."""
    create = self.mds_service.create
    configure = self.mds_service.config
    return self._add_daemon('mds', spec, create, configure)
def apply_mds(self, spec: ServiceSpec) -> str:
    """Schedule the mds service to be updated to match *spec*."""
    return self._apply(spec)
def add_rgw(self, spec) -> List[str]:
    """Explicitly place rgw daemons; runs the rgw config hook first."""
    create = self.rgw_service.create
    configure = self.rgw_service.config
    return self._add_daemon('rgw', spec, create, configure)
def apply_rgw(self, spec) -> str:
    """Schedule the rgw service to be updated to match *spec*."""
    return self._apply(spec)
def add_iscsi(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place iscsi daemons; runs the iscsi config hook first."""
    create = self.iscsi_service.create
    configure = self.iscsi_service.config
    return self._add_daemon('iscsi', spec, create, configure)
def apply_iscsi(self, spec) -> str:
    """Schedule the iscsi service to be updated to match *spec*."""
    return self._apply(spec)
def add_rbd_mirror(self, spec) -> List[str]:
    """Explicitly place rbd-mirror daemons on the hosts in *spec*."""
    create = self.rbd_mirror_service.create
    return self._add_daemon('rbd-mirror', spec, create)
def apply_rbd_mirror(self, spec) -> str:
    """Schedule the rbd-mirror service to be updated to match *spec*."""
    return self._apply(spec)
def add_nfs(self, spec) -> List[str]:
    """Explicitly place nfs daemons; runs the nfs config hook first."""
    create = self.nfs_service.create
    configure = self.nfs_service.config
    return self._add_daemon('nfs', spec, create, configure)
def apply_nfs(self, spec) -> str:
    """Schedule the nfs service to be updated to match *spec*."""
    return self._apply(spec)
2403 def _get_dashboard_url(self
):
2405 return self
.get('mgr_map').get('services', {}).get('dashboard', '')
def add_prometheus(self, spec) -> List[str]:
    """Explicitly place prometheus daemons on the hosts in *spec*."""
    create = self.prometheus_service.create
    return self._add_daemon('prometheus', spec, create)
def apply_prometheus(self, spec) -> str:
    """Schedule the prometheus service to be updated to match *spec*."""
    return self._apply(spec)
def add_node_exporter(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place node-exporter daemons on the hosts in *spec*."""
    create = self.node_exporter_service.create
    return self._add_daemon('node-exporter', spec, create)
def apply_node_exporter(self, spec) -> str:
    """Schedule the node-exporter service to be updated to match *spec*."""
    return self._apply(spec)
def add_crash(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place crash daemons on the hosts in *spec*."""
    create = self.crash_service.create
    return self._add_daemon('crash', spec, create)
def apply_crash(self, spec) -> str:
    """Schedule the crash service to be updated to match *spec*."""
    return self._apply(spec)
def add_grafana(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place grafana daemons on the hosts in *spec*."""
    create = self.grafana_service.create
    return self._add_daemon('grafana', spec, create)
def apply_grafana(self, spec: ServiceSpec) -> str:
    """Schedule the grafana service to be updated to match *spec*."""
    return self._apply(spec)
def add_alertmanager(self, spec):
    # type: (ServiceSpec) -> List[str]
    """Explicitly place alertmanager daemons on the hosts in *spec*."""
    create = self.alertmanager_service.create
    return self._add_daemon('alertmanager', spec, create)
def apply_alertmanager(self, spec: ServiceSpec) -> str:
    """Schedule the alertmanager service to be updated to match *spec*."""
    return self._apply(spec)
2453 def _get_container_image_id(self
, image_name
):
2454 # pick a random host...
2456 for host_name
in self
.inventory
.keys():
2460 raise OrchestratorError('no hosts defined')
2461 if self
.cache
.host_needs_registry_login(host
) and self
.registry_url
:
2462 self
._registry
_login
(host
, self
.registry_url
, self
.registry_username
, self
.registry_password
)
2463 out
, err
, code
= self
._run
_cephadm
(
2464 host
, '', 'pull', [],
2469 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2470 image_name
, host
, '\n'.join(out
)))
2471 j
= json
.loads('\n'.join(out
))
2472 image_id
= j
.get('image_id')
2473 ceph_version
= j
.get('ceph_version')
2474 self
.log
.debug('image %s -> id %s version %s' %
2475 (image_name
, image_id
, ceph_version
))
2476 return image_id
, ceph_version
def upgrade_check(self, image, version) -> str:
    """Report which daemons would be upgraded to *image*/*version*.

    Returns a JSON string with the target image info plus per-daemon
    'up_to_date' / 'needs_update' classification. NOTE(review): the
    if/elif branch headers and the result-dict braces were missing from
    the mangled source and were restored from context.
    """
    if version:
        target_name = self.container_image_base + ':v' + version
    elif image:
        target_name = image
    else:
        raise OrchestratorError('must specify either image or version')

    target_id, target_version = self._get_container_image_id(target_name)
    self.log.debug('Target image %s id %s version %s' % (
        target_name, target_id, target_version))
    r = {
        'target_name': target_name,
        'target_id': target_id,
        'target_version': target_version,
        'needs_update': dict(),
        'up_to_date': list(),
    }
    for host, dm in self.cache.daemons.items():
        for name, dd in dm.items():
            if target_id == dd.container_image_id:
                r['up_to_date'].append(dd.name())
            else:
                r['needs_update'][dd.name()] = {
                    'current_name': dd.container_image_name,
                    'current_id': dd.container_image_id,
                    'current_version': dd.version,
                }
    return json.dumps(r, indent=4, sort_keys=True)
def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
    """Return the current upgrade state from the upgrade manager."""
    return self.upgrade.upgrade_status()
def upgrade_start(self, image, version) -> str:
    """Begin an upgrade to the given image or version."""
    return self.upgrade.upgrade_start(image, version)
def upgrade_pause(self) -> str:
    """Pause the in-progress upgrade."""
    return self.upgrade.upgrade_pause()
def upgrade_resume(self) -> str:
    """Resume a paused upgrade."""
    return self.upgrade.upgrade_resume()
def upgrade_stop(self) -> str:
    """Abort the in-progress upgrade."""
    return self.upgrade.upgrade_stop()
def remove_osds(self, osd_ids: List[str],
                replace: bool = False,
                force: bool = False) -> str:
    """
    Takes a list of OSDs and schedules them for removal.
    The function that takes care of the actual removal is
    process_removal_queue().
    """
    # NOTE(review): the try: and the replace/force keyword lines were
    # missing from the mangled source and were restored from context.

    daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
    to_remove_daemons = list()
    for daemon in daemons:
        if daemon.daemon_id in osd_ids:
            to_remove_daemons.append(daemon)
    if not to_remove_daemons:
        return f"Unable to find OSDs: {osd_ids}"

    for daemon in to_remove_daemons:
        try:
            self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                            replace=replace,
                                            force=force,
                                            hostname=daemon.hostname,
                                            fullname=daemon.name(),
                                            process_started_at=datetime.datetime.utcnow(),
                                            remove_util=self.rm_util))
        except NotFoundError:
            return f"Unable to find OSDs: {osd_ids}"

    # trigger the serve loop to initiate the removal
    self._kick_serve_loop()
    return "Scheduled OSD(s) for removal"
def stop_remove_osds(self, osd_ids: List[str]):
    """
    Stops a `removal` process for a List of OSDs.
    This will revert their weight and remove it from the osds_to_remove queue
    """
    # NOTE(review): the docstring quotes and the try: were missing from
    # the mangled source and were restored from context.
    for osd_id in osd_ids:
        try:
            self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                       remove_util=self.rm_util))
        except (NotFoundError, KeyError):
            return f'Unable to find OSD in the queue: {osd_id}'

    # trigger the serve loop to halt the removal
    self._kick_serve_loop()
    return "Stopped OSD(s) removal"
def remove_osds_status(self):
    """
    The CLI call to retrieve an osd removal report
    """
    queue = self.to_remove_osds
    return queue.all_osds()