import datetime
import errno
import json
import logging
import os
import random
import re
import shlex
import string
import subprocess
import tempfile

import six

from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
import multiprocessing.pool

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
    CustomContainerSpec, HostPlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
import orchestrator
from mgr_module import MgrModule, HandleCommandResult
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
from . import remotes
from . import utils
try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self: Any) -> bool:
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

from typing import List  # noqa: F401
logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
class CephadmCompletion(orchestrator.Completion[T]):
    def evaluate(self) -> None:
        self.finalize(None)


def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """
    Decorator to make CephadmCompletion methods return
    a completion object that executes themselves.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper
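
# Usage sketch (illustrative, not part of the module): calling a method
# decorated with @trivial_completion does not run it immediately; it returns
# a CephadmCompletion whose on_complete callback invokes the wrapped
# function, roughly:
#
#     @trivial_completion
#     def get_hosts(self): ...
#
#     completion = mgr.get_hosts()   # nothing has executed yet
#     completion.evaluate()          # finalizes, running get_hosts()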
class ContainerInspectInfo(NamedTuple):
    image_id: str
    ceph_version: Optional[str]
    repo_digest: Optional[str]
@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'facts_cache_timeout',
            'desc': 'seconds to cache host facts data',
        },
        {
            'name': 'host_check_interval',
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'enum_allowed': ['root', 'cephadm-package'],
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
        },
        {
            'name': 'container_image_prometheus',
            'default': 'docker.io/prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'docker.io/ceph/ceph-grafana:6.7.4',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'docker.io/prom/alertmanager:v0.20.0',
            'desc': 'Alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'docker.io/prom/node-exporter:v0.18.1',
            'desc': 'Node exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace. Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'container_init',
            'desc': 'Run podman/docker with `--init`',
        },
        {
            'name': 'prometheus_alerts_path',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'desc': 'internal - do not modify',
            # used to track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'desc': 'manage configs like API endpoints in Dashboard.',
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'desc': 'Custom repository url',
        },
        {
            'name': 'registry_username',
            'desc': 'Custom repository username',
        },
        {
            'name': 'registry_password',
            'desc': 'Custom repository password',
        },
        {
            'name': 'use_repo_digest',
            'desc': 'Automatically convert image tags to image digest. '
                    'Make sure all daemons use the same image',
        },
    ]
    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        self.paused = False
        if self.get_store('pause'):
            self.paused = True
        if TYPE_CHECKING:
            # for mypy which does not run the code
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}
        self.notify('mon_map', None)

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))
        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()
        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()
    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()
    # function responsible for logging a single host into a custom registry
    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = json.dumps({
            'url': url,
            'username': username,
            'password': password,
        })
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return None
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()
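
    # Illustrative example (not part of the module): after config_notify()
    # runs, every MODULE_OPTIONS entry is mirrored onto the instance, so a
    # stored option such as `mgr/cephadm/device_cache_timeout = 30` becomes
    # readable as `self.device_cache_timeout`.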
    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()
    def _trigger_osd_removal(self) -> None:
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()
    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()
    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
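
    # Example of the generated names (illustrative): for daemon types that
    # take a random suffix, get_unique_name('mgr', 'host1.example.com', [])
    # yields something like 'host1.kxovjj'; for suffix-less types such as
    # 'mon' or 'crash' it simply returns 'host1'.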
    def _reconfig_ssh(self) -> None:
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()
    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not l:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in l:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
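
    # For example (illustrative): a config containing
    # 'StrictHostKeyChecking ask' is rejected, while the shipped
    # DEFAULT_SSH_CONFIG ('StrictHostKeyChecking no') passes validation.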
    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))
    def _reset_con(self, host: str) -> None:
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self) -> None:
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}
    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if remoto is not None:
            return True, ''
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)
    def available(self) -> Tuple[bool, str]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''

    def process(self, completions: List[CephadmCompletion]) -> None:
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()
    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set an ssh_config file provided from stdin
        """
        if inbuf == self.ssh_config:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self) -> HandleCommandResult:
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self) -> Tuple[int, str, str]:
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''
    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self) -> Tuple[int, str, str]:
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self) -> Tuple[int, str, str]:
        return 0, self.ssh_user, ''
    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        host = self.cache.get_hosts()[0]
        r = CephadmServe(self)._check_host(host)
        if r is not None:
            # connection failed reset user
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += ' sudo will be used'
        self.log.info(msg)
        return 0, msg, ''
    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            assert isinstance(inbuf, str)
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''
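
    # CLI usage (illustrative):
    #   ceph cephadm registry-login my-registry.example.com myuser mypass
    # or, with a credentials file:
    #   ceph cephadm registry-login -i <json file>
    # where the file holds {"url": ..., "username": ..., "password": ...}.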
    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemon's ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
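
    # The stored blob (written by _set_extra_ceph_conf above) looks like,
    # e.g. (illustrative values):
    #   {"conf": "[global]\n...", "last_modified": "2020-01-01T00:00:00.000000Z"}
    # only the 'conf' and 'last_modified' keys are consumed here.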
    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt
    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
                                                  'remoto.backends.LegacyModuleExecute']:
        """
        Setup a connection for running commands on remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r
, conn
: 'remoto.backends.BaseConnection', executable
: str) -> str:
987 Remote validator that accepts a connection object to ensure that a certain
988 executable is available returning its full path if so.
990 Otherwise an exception with thorough details will be raised, informing the
991 user that the executable was not found.
993 executable_path
= conn
.remote_module
.which(executable
)
994 if not executable_path
:
995 raise RuntimeError("Executable '{}' not found on host '{}'".format(
996 executable
, conn
.hostname
))
997 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
999 return executable_path
    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)
        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise
, daemon_name
: str) -> Optional
[str]:
1052 daemon_type
= daemon_name
.split('.', 1)[0] # type: ignore
1053 if daemon_type
in CEPH_TYPES
or \
1054 daemon_type
== 'nfs' or \
1055 daemon_type
== 'iscsi':
1056 # get container image
1057 ret
, image
, err
= self
.check_mon_command({
1058 'prefix': 'config get',
1059 'who': utils
.name_to_config_section(daemon_name
),
1060 'key': 'container_image',
1062 image
= image
.strip() # type: ignore
1063 elif daemon_type
== 'prometheus':
1064 image
= self
.container_image_prometheus
1065 elif daemon_type
== 'grafana':
1066 image
= self
.container_image_grafana
1067 elif daemon_type
== 'alertmanager':
1068 image
= self
.container_image_alertmanager
1069 elif daemon_type
== 'node-exporter':
1070 image
= self
.container_image_node_exporter
1071 elif daemon_type
== CustomContainerService
.TYPE
:
1072 # The image can't be resolved, the necessary information
1073 # is only available when a container is deployed (given
1077 assert False, daemon_type
1079 self
.log
.debug('%s container image %s' % (daemon_name
, image
))
    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])

            if not self.container_init:
                final_args += ['--no-container-init']

            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
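
    # Typical invocation (illustrative; mirrors _registry_login above):
    #   out, err, code = self._run_cephadm(host, 'mon', 'registry-login',
    #                                      ['--registry-json', '-'],
    #                                      stdin=args_str, error_ok=True)
    # i.e. callers pick the entity (used to resolve the container image),
    # the cephadm sub-command, and its argument list.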
    def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
        """
        Returns all hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we end up with hosts
        where daemons might be running, but we have not yet detected them.
        """
        return [
            h for h in self.inventory.all_specs()
            if self.cache.host_had_daemon_refresh(h.hostname)
        ]
    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)

    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)
    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)

    @trivial_completion
    def update_host_addr(self, host: str, addr: str) -> str:
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())
    @trivial_completion
    def add_host_label(self, host: str, label: str) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        return 'Added label %s to host %s' % (label, host)

    @trivial_completion
    def remove_host_label(self, host: str, label: str) -> str:
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        return 'Removed label %s from host %s' % (label, host)
    @trivial_completion
    def host_ok_to_stop(self, hostname: str) -> str:
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        daemons = self.cache.get_daemons()
        daemon_map = defaultdict(lambda: [])
        for dd in daemons:
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            if r.retval:
                self.log.error(f'It is NOT safe to stop host {hostname}')
                raise orchestrator.OrchestratorError(
                    r.stderr,
                    errno=r.retval)

        msg = f'It is presumed safe to stop host {hostname}'
        self.log.info(msg)
        return msg
    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            config += '\n\n' + extra.strip() + '\n'
        return config

    def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()
    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        Showing an actual/expected representation cannot be determined
                        here. So we're setting running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0

                if dd.status == 1:
                    sm[n].running += 1

                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())
    @trivial_completion
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve(host)
            self.log.info('Kicked serve() loop to refresh all daemons')

        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                result.append(dd)
        return result
    @trivial_completion
    def service_action(self, action: str, service_name: str) -> List[str]:
        dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return [
            self._schedule_daemon_action(dd.name(), action)
            for dd in dds
        ]
    def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
            host=host,
            daemon_id=daemon_id,
            daemon_type=daemon_type,
        )

        self._daemon_action_set_image(action, image, daemon_type, daemon_id)

        if action == 'redeploy':
            if self.daemon_is_self(daemon_type, daemon_id):
                self.mgr_service.fail_over()
                return ''  # unreachable
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_spec)
        elif action == 'reconfig':
            return self._create_daemon(daemon_spec, reconfig=True)

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self._run_cephadm(
                    host, name, 'unit',
                    ['--name', name, a])
            except Exception:
                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg
    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_TYPES)}')

            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })
    @trivial_completion
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
        d = self.cache.get_daemon(daemon_name)

        if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

        self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

        self.log.info(f'Schedule {action} daemon {daemon_name}')
        return self._schedule_daemon_action(daemon_name, action)

    def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
        return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
    def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
        dd = self.cache.get_daemon(daemon_name)
        if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
        self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
        msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
        self._kick_serve_loop()
        return msg
    @trivial_completion
    def remove_daemons(self, names):
        # type: (List[str]) -> List[str]
        args = []
        for host, dm in self.cache.daemons.items():
            for name in names:
                if name in dm:
                    args.append((name, host))
        if not args:
            raise OrchestratorError('Unable to find daemon(s) %s' % (names))
        self.log.info('Remove daemons %s' % [a[0] for a in args])
        return self._remove_daemons(args)

    @trivial_completion
    def remove_service(self, service_name: str) -> str:
        self.log.info('Remove service %s' % service_name)
        self._trigger_preview_refresh(service_name=service_name)
        found = self.spec_store.rm(service_name)
        if found:
            self._kick_serve_loop()
            return 'Removed service %s' % service_name
        else:
            # must be idempotent: still a success.
            return f'Failed to remove service. <{service_name}> was not found.'
    @trivial_completion
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
          - add filtering by label
        """
        if refresh:
            if host_filter and host_filter.hosts:
                for h in host_filter.hosts:
                    self.cache.invalidate_host_devices(h)
            else:
                for h in self.cache.get_hosts():
                    self.cache.invalidate_host_devices(h)

            self.event.set()
            self.log.info('Kicked serve() loop to refresh devices')

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result
    @trivial_completion
    def zap_device(self, host: str, path: str) -> str:
        self.log.info('Zap device %s:%s' % (host, path))
        out, err, code = self._run_cephadm(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'zap', '--destroy', path],
            error_ok=True)
        self.cache.invalidate_host_devices(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        return '\n'.join(out + err)
    @trivial_completion
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """
        Blink a device light. Calling something like::

          lsmcli local-disk-ident-led-on --path $path

        If you must, you can customize this via::

          ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
          ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

        See templates/blink_device_light_cmd.j2
        """
        @forall_hosts
        def blink(host: str, dev: str, path: str) -> str:
            cmd_line = self.template.render('blink_device_light_cmd.j2',
                                            {
                                                'on': on,
                                                'ident_fault': ident_fault,
                                                'dev': dev,
                                                'path': path
                                            },
                                            host=host)
            cmd_args = shlex.split(cmd_line)

            out, err, code = self._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd_args,
                error_ok=True)
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd_args)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)
    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up or (o['up_from'] > 0):
                r[str(osd_id)] = o.get('uuid', '')
        return r
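
    # Illustrative return value: {'0': 'aaaa-...', '1': 'bbbb-...'} -- each
    # osd id (as a string) mapped to the uuid recorded in the osdmap.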
    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ) -> None:
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # the to-be-preview spec != the actual spec, this means we need to
                # trigger a refresh, if the spec has been removed (==None) we need to
                # trigger a refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        if service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)
    @trivial_completion
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible to mgr/dashboard
        """
        return [self._apply(spec) for spec in specs]

    @trivial_completion
    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        return self.osd_service.create_from_spec(drive_group)
    def _preview_osdspecs(self,
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                          ) -> dict:
        if not osdspecs:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews or still in the queue to be previewed
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated.. '
                                        'Please re-run this command in a bit.'}]}
        # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
        previews_for_specs = {}
        for host, raw_reports in self.cache.osdspec_previews.items():
            if host not in matching_hosts:
                continue
            osd_reports = []
            for osd_report in raw_reports:
                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                    osd_reports.append(osd_report)
            previews_for_specs.update({host: osd_reports})
        return previews_for_specs
    def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
        need = {
            'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
            'grafana': ['prometheus'],
            'alertmanager': ['mgr', 'alertmanager'],
        }
        deps = []
        for dep_type in need.get(daemon_type, []):
            for dd in self.cache.get_daemons_by_service(dep_type):
                deps.append(dd.name())
        return sorted(deps)
    def _create_daemon(self,
                       daemon_spec: CephadmDaemonSpec,
                       reconfig: bool = False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            image = ''
            start_time = datetime_now()
            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

            if daemon_spec.daemon_type == 'container':
                spec: Optional[CustomContainerSpec] = daemon_spec.spec
                if spec is None:
                    # Exit here immediately because the required service
                    # spec to create a daemon is not provided. This is only
                    # provided when a service is applied via 'orch apply'
                    # command.
                    msg = "Failed to {} daemon {} on {}: Required " \
                          "service specification not provided".format(
                              'reconfigure' if reconfig else 'deploy',
                              daemon_spec.name(), daemon_spec.host)
                    self.log.error(msg)
                    self.events.for_daemon(daemon_spec.name(),
                                           OrchestratorEvent.ERROR, msg)
                    return msg
                image = spec.image
                if spec.ports:
                    ports.extend(spec.ports)

            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
                daemon_spec)

            # TCP port to open in the host firewall
            if len(ports) > 0:
                daemon_spec.extra_args.extend([
                    '--tcp-ports', ' '.join(map(str, ports))
                ])

            # osd deployments needs an --osd-uuid arg
            if daemon_spec.daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

            if reconfig:
                daemon_spec.extra_args.append('--reconfig')
            if self.allow_ptrace:
                daemon_spec.extra_args.append('--allow-ptrace')

            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
                self._registry_login(daemon_spec.host, self.registry_url,
                                     self.registry_username, self.registry_password)

            daemon_spec.extra_args.extend(['--config-json', '-'])

            self.log.info('%s daemon %s on %s' % (
                'Reconfiguring' if reconfig else 'Deploying',
                daemon_spec.name(), daemon_spec.host))

            out, err, code = self._run_cephadm(
                daemon_spec.host, daemon_spec.name(), 'deploy',
                [
                    '--name', daemon_spec.name(),
                ] + daemon_spec.extra_args,
                stdin=json.dumps(cephadm_config),
                image=image)
            if not code and daemon_spec.host in self.cache.daemons:
                # prime cached service state with what we (should have)
                # just created
                sd = orchestrator.DaemonDescription()
                sd.daemon_type = daemon_spec.daemon_type
                sd.daemon_id = daemon_spec.daemon_id
                sd.hostname = daemon_spec.host
                sd.status = 1
                sd.status_desc = 'starting'
                self.cache.add_daemon(daemon_spec.host, sd)
                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
                    self.requires_post_actions.add(daemon_spec.daemon_type)
            self.cache.invalidate_host_daemons(daemon_spec.host)
            self.cache.update_daemon_config_deps(
                daemon_spec.host, daemon_spec.name(), deps, start_time)
            self.cache.save_host(daemon_spec.host)
            msg = "{} {} on host '{}'".format(
                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
            if not code:
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
            else:
                what = 'reconfigure' if reconfig else 'deploy'
                self.events.for_daemon(
                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
            return msg
    @forall_hosts
    def _remove_daemons(self, name: str, host: str) -> str:
        return self._remove_daemon(name, host)

    def _remove_daemon(self, name: str, host: str) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.cephadm_services[daemon_type].pre_remove(daemon)

            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.cache.rm_daemon(host, name)
            self.cache.invalidate_host_daemons(host)

            self.cephadm_services[daemon_type].post_remove(daemon)

            return "Removed {} from host '{}'".format(name, host)
    def _check_pool_exists(self, pool: str, service_name: str) -> None:
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')
    def _add_daemon(self,
                    daemon_type: str,
                    spec: ServiceSpec,
                    create_func: Callable[..., CephadmDaemonSpec],
                    config_func: Optional[Callable] = None) -> List[str]:
        """
        Add (and place) a daemon. Require explicit host placement. Do not
        schedule, and do not apply the related scheduling limitations.
        """
        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count,
                                    create_func, config_func)

    def _create_daemons(self,
                        daemon_type: str,
                        spec: ServiceSpec,
                        daemons: List[DaemonDescription],
                        hosts: List[HostPlacementSpec],
                        count: int,
                        create_func: Callable[..., CephadmDaemonSpec],
                        config_func: Optional[Callable] = None) -> List[str]:
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False

        args = []  # type: List[CephadmDaemonSpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config and config_func:
                if daemon_type == 'rgw':
                    config_func(spec, daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args: Any) -> str:
            daemon_spec = create_func(*args)
            return self._create_daemon(daemon_spec)

        return create_func_map(args)
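
    # Minimal sketch of the @forall_hosts pattern used above, assuming the
    # decorator maps the wrapped function over a list of argument tuples
    # (typically one per host) and returns the collected results:
    #
    #   @forall_hosts
    #   def _say(host: str, daemon_id: str) -> str:   # hypothetical helper
    #       return f'{daemon_id} on {host}'
    #
    #   _say([('host1', 'a'), ('host2', 'b')])  # -> ['a on host1', 'b on host2']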

    @trivial_completion
    def apply_mon(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mon(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mon', spec, self.mon_service.prepare_create)

    @trivial_completion
    def add_mgr(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)

    def _apply(self, spec: GenericSpec) -> str:
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger preview refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))

    def _plan(self, spec: ServiceSpec) -> dict:
        if spec.service_type == 'osd':
            return {'service_name': spec.service_name(),
                    'service_type': spec.service_type,
                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

        ha = HostAssignment(
            spec=spec,
            hosts=self._hosts_with_daemon_inventory(),
            get_daemons_func=self.cache.get_daemons_by_service,
        )
        ha.validate()
        hosts = ha.place()

        add_daemon_hosts = ha.add_daemon_hosts(hosts)
        remove_daemon_hosts = ha.remove_daemon_hosts(hosts)

        return {
            'service_name': spec.service_name(),
            'service_type': spec.service_type,
            'add': [hs.hostname for hs in add_daemon_hosts],
            'remove': [d.hostname for d in remove_daemon_hosts]
        }
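
    # The non-OSD branch of _plan() therefore yields a dict of this shape
    # (host names hypothetical):
    #
    #   {'service_name': 'mgr', 'service_type': 'mgr',
    #    'add': ['host2'], 'remove': ['host3']}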

    @trivial_completion
    def plan(self, specs: List[GenericSpec]) -> List:
        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound\n'
                               'to the current inventory setup. If any of these conditions change, the\n'
                               'preview will be invalid. Please make sure to have a minimal\n'
                               'timeframe between planning and applying the specs.'}]
        if any([spec.service_type == 'host' for spec in specs]):
            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
        for spec in specs:
            results.append(self._plan(cast(ServiceSpec, spec)))
        return results

    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            get_daemons_func=self.cache.get_daemons_by_service,
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()
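
    # Example of the default-placement fill above: a spec applied with no
    # placement at all, e.g. ServiceSpec('mgr'), is scheduled as
    # PlacementSpec(count=2), whereas an explicit count below 1 for mon/mgr
    # is rejected with 'cannot scale ... service below 1'.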

    @trivial_completion
    def apply(self, specs: List[GenericSpec]) -> List[str]:
        results = []
        for spec in specs:
            results.append(self._apply(spec))
        return results

    @trivial_completion
    def apply_mgr(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mds(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)

    @trivial_completion
    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rgw(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)

    @trivial_completion
    def apply_rgw(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_iscsi(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)

    @trivial_completion
    def apply_iscsi(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)

    @trivial_completion
    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_nfs(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)

    @trivial_completion
    def apply_nfs(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        # type: () -> str
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    @trivial_completion
    def add_prometheus(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)

    @trivial_completion
    def apply_prometheus(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('node-exporter', spec,
                                self.node_exporter_service.prepare_create)

    @trivial_completion
    def apply_node_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_crash(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('crash', spec,
                                self.crash_service.prepare_create)

    @trivial_completion
    def apply_crash(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_grafana(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)

    @trivial_completion
    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)

    @trivial_completion
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_container(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('container', spec,
                                self.container_service.prepare_create)

    @trivial_completion
    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick a random host...
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.cache.host_needs_registry_login(host) and self.registry_url:
            self._registry_login(host, self.registry_url,
                                 self.registry_username, self.registry_password)
        out, err, code = self._run_cephadm(
            host, '', 'pull', [],
            image=image_name,
            no_fsid=True,
            error_ok=True)
        if code:
            raise OrchestratorError('Failed to pull %s on %s: %s' % (
                image_name, host, '\n'.join(out)))
        try:
            j = json.loads('\n'.join(out))
            r = ContainerInspectInfo(
                j['image_id'],
                j.get('ceph_version'),
                j.get('repo_digest')
            )
            self.log.debug(f'image {image_name} -> {r}')
            return r
        except (ValueError, KeyError) as _:
            msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
            self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
            raise OrchestratorError(msg)
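
    # A successful `cephadm ... pull` is expected to print JSON along these
    # lines (values hypothetical), which feeds ContainerInspectInfo above:
    #
    #   {"image_id": "...", "ceph_version": "ceph version ...",
    #    "repo_digest": "docker.io/ceph/ceph@sha256:..."}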

    @trivial_completion
    def upgrade_check(self, image: str, version: str) -> str:
        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        image_info = self._get_container_image_info(target_name)
        self.log.debug(f'image info {image} -> {image_info}')
        r: dict = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                if image_info.image_id == dd.container_image_id:
                    r['up_to_date'].append(dd.name())
                else:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
        if self.use_repo_digest:
            r['target_digest'] = image_info.repo_digest

        return json.dumps(r, indent=4, sort_keys=True)
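
    # Sketch of the report emitted above (keys sorted by json.dumps; values
    # hypothetical):
    #
    #   {
    #       "needs_update": {"mgr.host1.abc": {"current_id": "...",
    #                                          "current_name": "...",
    #                                          "current_version": "..."}},
    #       "target_id": "...",
    #       "target_name": "docker.io/ceph/ceph:v15",
    #       "target_version": "...",
    #       "up_to_date": []
    #   }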

    @trivial_completion
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    @trivial_completion
    def upgrade_start(self, image: str, version: str) -> str:
        return self.upgrade.upgrade_start(image, version)

    @trivial_completion
    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    @trivial_completion
    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    @trivial_completion
    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()

    @trivial_completion
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                hostname=daemon.hostname,
                                                fullname=daemon.name(),
                                                process_started_at=datetime_now(),
                                                remove_util=self.to_remove_osds.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        return "Scheduled OSD(s) for removal"
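
    # Typical path into the queue above is the CLI, e.g.
    #
    #   ceph orch osd rm 3 4 --replace
    #
    # which reaches remove_osds(['3', '4'], replace=True); the actual drain
    # and removal then happen asynchronously in the serve loop via
    # process_removal_queue().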

    @trivial_completion
    def stop_remove_osds(self, osd_ids: List[str]) -> str:
        """
        Stops a `removal` process for a list of OSDs.
        This will revert their weight and remove them from the osds_to_remove queue.
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.to_remove_osds.rm_util))
            except (NotFoundError, KeyError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"

    @trivial_completion
    def remove_osds_status(self) -> List[OSD]:
        """
        The CLI call to retrieve an OSD removal report.
        """
        return self.to_remove_osds.all_osds()