6 from collections
import defaultdict
7 from configparser
import ConfigParser
8 from contextlib
import contextmanager
9 from functools
import wraps
10 from tempfile
import TemporaryDirectory
11 from threading
import Event
14 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, \
15 Any
, Set
, TYPE_CHECKING
, cast
, Iterator
, Union
, NamedTuple
22 import multiprocessing
.pool
25 from ceph
.deployment
import inventory
26 from ceph
.deployment
.drive_group
import DriveGroupSpec
27 from ceph
.deployment
.service_spec
import \
28 NFSServiceSpec
, ServiceSpec
, PlacementSpec
, assert_valid_host
, \
29 CustomContainerSpec
, HostPlacementSpec
30 from ceph
.utils
import str_to_datetime
, datetime_to_str
, datetime_now
31 from cephadm
.serve
import CephadmServe
32 from cephadm
.services
.cephadmservice
import CephadmDaemonSpec
34 from mgr_module
import MgrModule
, HandleCommandResult
36 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
37 CLICommandMeta
, OrchestratorEvent
, set_exception_subject
, DaemonDescription
38 from orchestrator
._interface
import GenericSpec
42 from .migrations
import Migrations
43 from .services
.cephadmservice
import MonService
, MgrService
, MdsService
, RgwService
, \
44 RbdMirrorService
, CrashService
, CephadmService
45 from .services
.container
import CustomContainerService
46 from .services
.iscsi
import IscsiService
47 from .services
.nfs
import NFSService
48 from .services
.osd
import RemoveUtil
, OSDRemovalQueue
, OSDService
, OSD
, NotFoundError
49 from .services
.monitoring
import GrafanaService
, AlertmanagerService
, PrometheusService
, \
51 from .schedule
import HostAssignment
52 from .inventory
import Inventory
, SpecStore
, HostCache
, EventStore
53 from .upgrade
import CEPH_UPGRADE_ORDER
, CephadmUpgrade
54 from .template
import TemplateMgr
55 from .utils
import forall_hosts
, CephadmNoImage
, cephadmNoImage
59 # NOTE(mattoliverau) Patch remoto until remoto PR
60 # (https://github.com/alfredodeza/remoto/pull/56) lands
61 from distutils
.version
import StrictVersion
62 if StrictVersion(remoto
.__version
__) <= StrictVersion('1.2'):
63 def remoto_has_connection(self
: Any
) -> bool:
64 return self
.gateway
.hasreceiver()
66 from remoto
.backends
import BaseConnection
67 BaseConnection
.has_connection
= remoto_has_connection
69 import execnet
.gateway_bootstrap
70 except ImportError as e
:
72 remoto_import_error
= str(e
)
75 from typing
import List
79 logger
= logging
.getLogger(__name__
)
83 DEFAULT_SSH_CONFIG
= """
86 StrictHostKeyChecking no
87 UserKnownHostsFile /dev/null
91 CEPH_TYPES
= set(CEPH_UPGRADE_ORDER
)
94 class CephadmCompletion(orchestrator
.Completion
[T
]):
95 def evaluate(self
) -> None:
99 def trivial_completion(f
: Callable
[..., T
]) -> Callable
[..., CephadmCompletion
[T
]]:
101 Decorator to make CephadmCompletion methods return
102 a completion object that executes themselves.
106 def wrapper(*args
: Any
, **kwargs
: Any
) -> CephadmCompletion
:
107 return CephadmCompletion(on_complete
=lambda _
: f(*args
, **kwargs
))
112 class ContainerInspectInfo(NamedTuple
):
114 ceph_version
: Optional
[str]
115 repo_digest
: Optional
[str]
118 @six.add_metaclass(CLICommandMeta
)
119 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
):
121 _STORE_HOST_PREFIX
= "host"
124 NATIVE_OPTIONS
= [] # type: List[Any]
125 MODULE_OPTIONS
: List
[dict] = [
127 'name': 'ssh_config_file',
130 'desc': 'customized SSH config file to connect to managed hosts',
133 'name': 'device_cache_timeout',
136 'desc': 'seconds to cache device inventory',
139 'name': 'daemon_cache_timeout',
142 'desc': 'seconds to cache service (daemon) inventory',
145 'name': 'facts_cache_timeout',
148 'desc': 'seconds to cache host facts data',
151 'name': 'host_check_interval',
154 'desc': 'how frequently to perform a host check',
159 'enum_allowed': ['root', 'cephadm-package'],
161 'desc': 'mode for remote execution of cephadm',
164 'name': 'container_image_base',
165 'default': 'docker.io/ceph/ceph',
166 'desc': 'Container image name, without the tag',
170 'name': 'container_image_prometheus',
171 'default': 'docker.io/prom/prometheus:v2.18.1',
172 'desc': 'Prometheus container image',
175 'name': 'container_image_grafana',
176 'default': 'docker.io/ceph/ceph-grafana:6.6.2',
177 'desc': 'Prometheus container image',
180 'name': 'container_image_alertmanager',
181 'default': 'docker.io/prom/alertmanager:v0.20.0',
182 'desc': 'Prometheus container image',
185 'name': 'container_image_node_exporter',
186 'default': 'docker.io/prom/node-exporter:v0.18.1',
187 'desc': 'Prometheus container image',
190 'name': 'warn_on_stray_hosts',
193 'desc': 'raise a health warning if daemons are detected on a host '
194 'that is not managed by cephadm',
197 'name': 'warn_on_stray_daemons',
200 'desc': 'raise a health warning if daemons are detected '
201 'that are not managed by cephadm',
204 'name': 'warn_on_failed_host_check',
207 'desc': 'raise a health warning if the host check fails',
210 'name': 'log_to_cluster',
213 'desc': 'log to the "cephadm" cluster log channel"',
216 'name': 'allow_ptrace',
219 'desc': 'allow SYS_PTRACE capability on ceph containers',
220 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
221 'process with gdb or strace. Enabling this options '
222 'can allow debugging daemons that encounter problems '
226 'name': 'container_init',
229 'desc': 'Run podman/docker with `--init`',
232 'name': 'prometheus_alerts_path',
234 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
235 'desc': 'location of alerts to include in prometheus deployments',
238 'name': 'migration_current',
241 'desc': 'internal - do not modify',
242 # used to track track spec and other data migrations.
245 'name': 'config_dashboard',
248 'desc': 'manage configs like API endpoints in Dashboard.'
251 'name': 'manage_etc_ceph_ceph_conf',
254 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
257 'name': 'registry_url',
260 'desc': 'Custom repository url'
263 'name': 'registry_username',
266 'desc': 'Custom repository username'
269 'name': 'registry_password',
272 'desc': 'Custom repository password'
275 'name': 'use_repo_digest',
278 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
282 def __init__(self
, *args
: Any
, **kwargs
: Any
):
283 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
284 self
._cluster
_fsid
= self
.get('mon_map')['fsid']
285 self
.last_monmap
: Optional
[datetime
.datetime
] = None
291 if self
.get_store('pause'):
296 # for mypy which does not run the code
298 self
.ssh_config_file
= None # type: Optional[str]
299 self
.device_cache_timeout
= 0
300 self
.daemon_cache_timeout
= 0
301 self
.facts_cache_timeout
= 0
302 self
.host_check_interval
= 0
304 self
.container_image_base
= ''
305 self
.container_image_prometheus
= ''
306 self
.container_image_grafana
= ''
307 self
.container_image_alertmanager
= ''
308 self
.container_image_node_exporter
= ''
309 self
.warn_on_stray_hosts
= True
310 self
.warn_on_stray_daemons
= True
311 self
.warn_on_failed_host_check
= True
312 self
.allow_ptrace
= False
313 self
.container_init
= False
314 self
.prometheus_alerts_path
= ''
315 self
.migration_current
: Optional
[int] = None
316 self
.config_dashboard
= True
317 self
.manage_etc_ceph_ceph_conf
= True
318 self
.registry_url
: Optional
[str] = None
319 self
.registry_username
: Optional
[str] = None
320 self
.registry_password
: Optional
[str] = None
321 self
.use_repo_digest
= False
323 self
._cons
: Dict
[str, Tuple
[remoto
.backends
.BaseConnection
,
324 remoto
.backends
.LegacyModuleExecute
]] = {}
326 self
.notify('mon_map', None)
329 path
= self
.get_ceph_option('cephadm_path')
331 with
open(path
, 'r') as f
:
332 self
._cephadm
= f
.read()
333 except (IOError, TypeError) as e
:
334 raise RuntimeError("unable to read cephadm at '%s': %s" % (
337 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
341 CephadmOrchestrator
.instance
= self
343 self
.upgrade
= CephadmUpgrade(self
)
345 self
.health_checks
: Dict
[str, dict] = {}
347 self
.all_progress_references
= list() # type: List[orchestrator.ProgressReference]
349 self
.inventory
= Inventory(self
)
351 self
.cache
= HostCache(self
)
354 self
.to_remove_osds
= OSDRemovalQueue(self
)
355 self
.to_remove_osds
.load_from_store()
357 self
.spec_store
= SpecStore(self
)
358 self
.spec_store
.load()
360 # ensure the host lists are in sync
361 for h
in self
.inventory
.keys():
362 if h
not in self
.cache
.daemons
:
363 self
.cache
.prime_empty_host(h
)
364 for h
in self
.cache
.get_hosts():
365 if h
not in self
.inventory
:
366 self
.cache
.rm_host(h
)
369 self
.events
= EventStore(self
)
370 self
.offline_hosts
: Set
[str] = set()
372 self
.migration
= Migrations(self
)
375 self
.osd_service
= OSDService(self
)
376 self
.nfs_service
= NFSService(self
)
377 self
.mon_service
= MonService(self
)
378 self
.mgr_service
= MgrService(self
)
379 self
.mds_service
= MdsService(self
)
380 self
.rgw_service
= RgwService(self
)
381 self
.rbd_mirror_service
= RbdMirrorService(self
)
382 self
.grafana_service
= GrafanaService(self
)
383 self
.alertmanager_service
= AlertmanagerService(self
)
384 self
.prometheus_service
= PrometheusService(self
)
385 self
.node_exporter_service
= NodeExporterService(self
)
386 self
.crash_service
= CrashService(self
)
387 self
.iscsi_service
= IscsiService(self
)
388 self
.container_service
= CustomContainerService(self
)
389 self
.cephadm_services
= {
390 'mon': self
.mon_service
,
391 'mgr': self
.mgr_service
,
392 'osd': self
.osd_service
,
393 'mds': self
.mds_service
,
394 'rgw': self
.rgw_service
,
395 'rbd-mirror': self
.rbd_mirror_service
,
396 'nfs': self
.nfs_service
,
397 'grafana': self
.grafana_service
,
398 'alertmanager': self
.alertmanager_service
,
399 'prometheus': self
.prometheus_service
,
400 'node-exporter': self
.node_exporter_service
,
401 'crash': self
.crash_service
,
402 'iscsi': self
.iscsi_service
,
403 'container': self
.container_service
,
406 self
.template
= TemplateMgr(self
)
408 self
.requires_post_actions
: Set
[str] = set()
410 def shutdown(self
) -> None:
411 self
.log
.debug('shutdown')
412 self
._worker
_pool
.close()
413 self
._worker
_pool
.join()
417 def _get_cephadm_service(self
, service_type
: str) -> CephadmService
:
418 assert service_type
in ServiceSpec
.KNOWN_SERVICE_TYPES
419 return self
.cephadm_services
[service_type
]
421 def _kick_serve_loop(self
) -> None:
422 self
.log
.debug('_kick_serve_loop')
425 # function responsible for logging single host into custom registry
426 def _registry_login(self
, host
: str, url
: Optional
[str], username
: Optional
[str], password
: Optional
[str]) -> Optional
[str]:
427 self
.log
.debug(f
"Attempting to log host {host} into custom registry @ {url}")
428 # want to pass info over stdin rather than through normal list of args
429 args_str
= json
.dumps({
431 'username': username
,
432 'password': password
,
434 out
, err
, code
= self
._run
_cephadm
(
435 host
, 'mon', 'registry-login',
436 ['--registry-json', '-'], stdin
=args_str
, error_ok
=True)
438 return f
"Host {host} failed to login to {url} as {username} with given password"
441 def serve(self
) -> None:
443 The main loop of cephadm.
445 A command handler will typically change the declarative state
446 of cephadm. This loop will then attempt to apply this new state.
448 serve
= CephadmServe(self
)
451 def set_container_image(self
, entity
: str, image
: str) -> None:
452 self
.check_mon_command({
453 'prefix': 'config set',
454 'name': 'container_image',
459 def config_notify(self
) -> None:
461 This method is called whenever one of our config options is changed.
463 TODO: this method should be moved into mgr_module.py
465 for opt
in self
.MODULE_OPTIONS
:
467 opt
['name'], # type: ignore
468 self
.get_module_option(opt
['name'])) # type: ignore
469 self
.log
.debug(' mgr option %s = %s',
470 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
471 for opt
in self
.NATIVE_OPTIONS
:
474 self
.get_ceph_option(opt
))
475 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
479 def notify(self
, notify_type
: str, notify_id
: Optional
[str]) -> None:
480 if notify_type
== "mon_map":
481 # get monmap mtime so we can refresh configs when mons change
482 monmap
= self
.get('mon_map')
483 self
.last_monmap
= str_to_datetime(monmap
['modified'])
484 if self
.last_monmap
and self
.last_monmap
> datetime_now():
485 self
.last_monmap
= None # just in case clocks are skewed
486 if getattr(self
, 'manage_etc_ceph_ceph_conf', False):
487 # getattr, due to notify() being called before config_notify()
488 self
._kick
_serve
_loop
()
489 if notify_type
== "pg_summary":
490 self
._trigger
_osd
_removal
()
492 def _trigger_osd_removal(self
) -> None:
493 data
= self
.get("osd_stats")
494 for osd
in data
.get('osd_stats', []):
495 if osd
.get('num_pgs') == 0:
496 # if _ANY_ osd that is currently in the queue appears to be empty,
497 # start the removal process
498 if int(osd
.get('osd')) in self
.to_remove_osds
.as_osd_ids():
499 self
.log
.debug(f
"Found empty osd. Starting removal process")
500 # if the osd that is now empty is also part of the removal queue
502 self
._kick
_serve
_loop
()
504 def pause(self
) -> None:
506 self
.log
.info('Paused')
507 self
.set_store('pause', 'true')
509 # wake loop so we update the health status
510 self
._kick
_serve
_loop
()
512 def resume(self
) -> None:
514 self
.log
.info('Resumed')
516 self
.set_store('pause', None)
517 # unconditionally wake loop so that 'orch resume' can be used to kick
519 self
._kick
_serve
_loop
()
521 def get_unique_name(self
, daemon_type
, host
, existing
, prefix
=None,
523 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
525 Generate a unique random service name
527 suffix
= daemon_type
not in [
528 'mon', 'crash', 'nfs',
529 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
533 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
534 raise orchestrator
.OrchestratorValidationError(
535 f
'name {daemon_type}.{forcename} already in use')
539 host
= host
.split('.')[0]
547 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
549 if len([d
for d
in existing
if d
.daemon_id
== name
]):
551 raise orchestrator
.OrchestratorValidationError(
552 f
'name {daemon_type}.{name} already in use')
553 self
.log
.debug('name %s exists, trying again', name
)
557 def _reconfig_ssh(self
) -> None:
558 temp_files
= [] # type: list
559 ssh_options
= [] # type: List[str]
562 ssh_config_fname
= self
.ssh_config_file
563 ssh_config
= self
.get_store("ssh_config")
564 if ssh_config
is not None or ssh_config_fname
is None:
566 ssh_config
= DEFAULT_SSH_CONFIG
567 f
= tempfile
.NamedTemporaryFile(prefix
='cephadm-conf-')
568 os
.fchmod(f
.fileno(), 0o600)
569 f
.write(ssh_config
.encode('utf-8'))
570 f
.flush() # make visible to other processes
572 ssh_config_fname
= f
.name
574 self
.validate_ssh_config_fname(ssh_config_fname
)
575 ssh_options
+= ['-F', ssh_config_fname
]
576 self
.ssh_config
= ssh_config
579 ssh_key
= self
.get_store("ssh_identity_key")
580 ssh_pub
= self
.get_store("ssh_identity_pub")
581 self
.ssh_pub
= ssh_pub
582 self
.ssh_key
= ssh_key
583 if ssh_key
and ssh_pub
:
584 tkey
= tempfile
.NamedTemporaryFile(prefix
='cephadm-identity-')
585 tkey
.write(ssh_key
.encode('utf-8'))
586 os
.fchmod(tkey
.fileno(), 0o600)
587 tkey
.flush() # make visible to other processes
588 tpub
= open(tkey
.name
+ '.pub', 'w')
589 os
.fchmod(tpub
.fileno(), 0o600)
591 tpub
.flush() # make visible to other processes
592 temp_files
+= [tkey
, tpub
]
593 ssh_options
+= ['-i', tkey
.name
]
595 self
._temp
_files
= temp_files
597 self
._ssh
_options
= ' '.join(ssh_options
) # type: Optional[str]
599 self
._ssh
_options
= None
601 if self
.mode
== 'root':
602 self
.ssh_user
= self
.get_store('ssh_user', default
='root')
603 elif self
.mode
== 'cephadm-package':
604 self
.ssh_user
= 'cephadm'
608 def validate_ssh_config_content(self
, ssh_config
: Optional
[str]) -> None:
609 if ssh_config
is None or len(ssh_config
.strip()) == 0:
610 raise OrchestratorValidationError('ssh_config cannot be empty')
611 # StrictHostKeyChecking is [yes|no] ?
612 l
= re
.findall(r
'StrictHostKeyChecking\s+.*', ssh_config
)
614 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
616 if 'ask' in s
.lower():
617 raise OrchestratorValidationError(f
'ssh_config cannot contain: \'{s}\'')
619 def validate_ssh_config_fname(self
, ssh_config_fname
: str) -> None:
620 if not os
.path
.isfile(ssh_config_fname
):
621 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
624 def _reset_con(self
, host
: str) -> None:
625 conn
, r
= self
._cons
.get(host
, (None, None))
627 self
.log
.debug('_reset_con close %s' % host
)
631 def _reset_cons(self
) -> None:
632 for host
, conn_and_r
in self
._cons
.items():
633 self
.log
.debug('_reset_cons close %s' % host
)
638 def offline_hosts_remove(self
, host
: str) -> None:
639 if host
in self
.offline_hosts
:
640 self
.offline_hosts
.remove(host
)
643 def can_run() -> Tuple
[bool, str]:
644 if remoto
is not None:
647 return False, "loading remoto library:{}".format(
650 def available(self
) -> Tuple
[bool, str]:
652 The cephadm orchestrator is always available.
654 ok
, err
= self
.can_run()
657 if not self
.ssh_key
or not self
.ssh_pub
:
658 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
661 def process(self
, completions
: List
[CephadmCompletion
]) -> None:
663 Does nothing, as completions are processed in another thread.
666 self
.log
.debug("process: completions={0}".format(
667 orchestrator
.pretty_print(completions
)))
669 for p
in completions
:
672 @orchestrator._cli
_write
_command
(
673 prefix
='cephadm set-ssh-config',
674 desc
='Set the ssh_config file (use -i <ssh_config>)')
675 def _set_ssh_config(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
677 Set an ssh_config file provided from stdin
679 if inbuf
== self
.ssh_config
:
680 return 0, "value unchanged", ""
681 self
.validate_ssh_config_content(inbuf
)
682 self
.set_store("ssh_config", inbuf
)
683 self
.log
.info('Set ssh_config')
687 @orchestrator._cli
_write
_command
(
688 prefix
='cephadm clear-ssh-config',
689 desc
='Clear the ssh_config file')
690 def _clear_ssh_config(self
) -> Tuple
[int, str, str]:
692 Clear the ssh_config file provided from stdin
694 self
.set_store("ssh_config", None)
695 self
.ssh_config_tmp
= None
696 self
.log
.info('Cleared ssh_config')
700 @orchestrator._cli
_read
_command
(
701 prefix
='cephadm get-ssh-config',
702 desc
='Returns the ssh config as used by cephadm'
704 def _get_ssh_config(self
) -> HandleCommandResult
:
705 if self
.ssh_config_file
:
706 self
.validate_ssh_config_fname(self
.ssh_config_file
)
707 with
open(self
.ssh_config_file
) as f
:
708 return HandleCommandResult(stdout
=f
.read())
709 ssh_config
= self
.get_store("ssh_config")
711 return HandleCommandResult(stdout
=ssh_config
)
712 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
714 @orchestrator._cli
_write
_command
(
715 'cephadm generate-key',
716 desc
='Generate a cluster SSH key (if not present)')
717 def _generate_key(self
) -> Tuple
[int, str, str]:
718 if not self
.ssh_pub
or not self
.ssh_key
:
719 self
.log
.info('Generating ssh key...')
720 tmp_dir
= TemporaryDirectory()
721 path
= tmp_dir
.name
+ '/key'
723 subprocess
.check_call([
724 '/usr/bin/ssh-keygen',
725 '-C', 'ceph-%s' % self
._cluster
_fsid
,
729 with
open(path
, 'r') as f
:
731 with
open(path
+ '.pub', 'r') as f
:
735 os
.unlink(path
+ '.pub')
737 self
.set_store('ssh_identity_key', secret
)
738 self
.set_store('ssh_identity_pub', pub
)
742 @orchestrator._cli
_write
_command
(
743 'cephadm set-priv-key',
744 desc
='Set cluster SSH private key (use -i <private_key>)')
745 def _set_priv_key(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
746 if inbuf
is None or len(inbuf
) == 0:
747 return -errno
.EINVAL
, "", "empty private ssh key provided"
748 if inbuf
== self
.ssh_key
:
749 return 0, "value unchanged", ""
750 self
.set_store("ssh_identity_key", inbuf
)
751 self
.log
.info('Set ssh private key')
755 @orchestrator._cli
_write
_command
(
756 'cephadm set-pub-key',
757 desc
='Set cluster SSH public key (use -i <public_key>)')
758 def _set_pub_key(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
759 if inbuf
is None or len(inbuf
) == 0:
760 return -errno
.EINVAL
, "", "empty public ssh key provided"
761 if inbuf
== self
.ssh_pub
:
762 return 0, "value unchanged", ""
763 self
.set_store("ssh_identity_pub", inbuf
)
764 self
.log
.info('Set ssh public key')
768 @orchestrator._cli
_write
_command
(
770 desc
='Clear cluster SSH key')
771 def _clear_key(self
) -> Tuple
[int, str, str]:
772 self
.set_store('ssh_identity_key', None)
773 self
.set_store('ssh_identity_pub', None)
775 self
.log
.info('Cleared cluster SSH key')
778 @orchestrator._cli
_read
_command
(
779 'cephadm get-pub-key',
780 desc
='Show SSH public key for connecting to cluster hosts')
781 def _get_pub_key(self
) -> Tuple
[int, str, str]:
783 return 0, self
.ssh_pub
, ''
785 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
787 @orchestrator._cli
_read
_command
(
789 desc
='Show user for SSHing to cluster hosts')
790 def _get_user(self
) -> Tuple
[int, str, str]:
791 return 0, self
.ssh_user
, ''
793 @orchestrator._cli
_read
_command
(
795 'name=user,type=CephString',
796 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
797 def set_ssh_user(self
, user
: str) -> Tuple
[int, str, str]:
798 current_user
= self
.ssh_user
799 if user
== current_user
:
800 return 0, "value unchanged", ""
802 self
.set_store('ssh_user', user
)
805 host
= self
.cache
.get_hosts()[0]
806 r
= CephadmServe(self
)._check
_host
(host
)
808 # connection failed reset user
809 self
.set_store('ssh_user', current_user
)
811 return -errno
.EINVAL
, '', 'ssh connection %s@%s failed' % (user
, host
)
813 msg
= 'ssh user set to %s' % user
815 msg
+= ' sudo will be used'
819 @orchestrator._cli
_read
_command
(
820 'cephadm registry-login',
821 "name=url,type=CephString,req=false "
822 "name=username,type=CephString,req=false "
823 "name=password,type=CephString,req=false",
824 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
825 def registry_login(self
, url
: Optional
[str] = None, username
: Optional
[str] = None, password
: Optional
[str] = None, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
826 # if password not given in command line, get it through file input
827 if not (url
and username
and password
) and (inbuf
is None or len(inbuf
) == 0):
828 return -errno
.EINVAL
, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
829 "or -i <login credentials json file>")
830 elif not (url
and username
and password
):
831 assert isinstance(inbuf
, str)
832 login_info
= json
.loads(inbuf
)
833 if "url" in login_info
and "username" in login_info
and "password" in login_info
:
834 url
= login_info
["url"]
835 username
= login_info
["username"]
836 password
= login_info
["password"]
838 return -errno
.EINVAL
, "", ("json provided for custom registry login did not include all necessary fields. "
839 "Please setup json file as\n"
841 " \"url\": \"REGISTRY_URL\",\n"
842 " \"username\": \"REGISTRY_USERNAME\",\n"
843 " \"password\": \"REGISTRY_PASSWORD\"\n"
845 # verify login info works by attempting login on random host
847 for host_name
in self
.inventory
.keys():
851 raise OrchestratorError('no hosts defined')
852 r
= self
._registry
_login
(host
, url
, username
, password
)
855 # if logins succeeded, store info
856 self
.log
.debug("Host logins successful. Storing login info.")
857 self
.set_module_option('registry_url', url
)
858 self
.set_module_option('registry_username', username
)
859 self
.set_module_option('registry_password', password
)
860 # distribute new login info to all hosts
861 self
.cache
.distribute_new_registry_login_info()
862 return 0, "registry login scheduled", ''
864 @orchestrator._cli
_read
_command
(
865 'cephadm check-host',
866 'name=host,type=CephString '
867 'name=addr,type=CephString,req=false',
868 'Check whether we can access and manage a remote host')
869 def check_host(self
, host
: str, addr
: Optional
[str] = None) -> Tuple
[int, str, str]:
871 out
, err
, code
= self
._run
_cephadm
(host
, cephadmNoImage
, 'check-host',
872 ['--expect-hostname', host
],
874 error_ok
=True, no_fsid
=True)
876 return 1, '', ('check-host failed:\n' + '\n'.join(err
))
877 except OrchestratorError
as e
:
878 self
.log
.exception(f
"check-host failed for '{host}'")
879 return 1, '', ('check-host failed:\n' +
880 f
"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
881 # if we have an outstanding health alert for this host, give the
882 # serve thread a kick
883 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
884 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
885 if item
.startswith('host %s ' % host
):
887 return 0, '%s (%s) ok' % (host
, addr
), '\n'.join(err
)
889 @orchestrator._cli
_read
_command
(
890 'cephadm prepare-host',
891 'name=host,type=CephString '
892 'name=addr,type=CephString,req=false',
893 'Prepare a remote host for use with cephadm')
894 def _prepare_host(self
, host
: str, addr
: Optional
[str] = None) -> Tuple
[int, str, str]:
895 out
, err
, code
= self
._run
_cephadm
(host
, cephadmNoImage
, 'prepare-host',
896 ['--expect-hostname', host
],
898 error_ok
=True, no_fsid
=True)
900 return 1, '', ('prepare-host failed:\n' + '\n'.join(err
))
901 # if we have an outstanding health alert for this host, give the
902 # serve thread a kick
903 if 'CEPHADM_HOST_CHECK_FAILED' in self
.health_checks
:
904 for item
in self
.health_checks
['CEPHADM_HOST_CHECK_FAILED']['detail']:
905 if item
.startswith('host %s ' % host
):
907 return 0, '%s (%s) ok' % (host
, addr
), '\n'.join(err
)
909 @orchestrator._cli
_write
_command
(
910 prefix
='cephadm set-extra-ceph-conf',
911 desc
="Text that is appended to all daemon's ceph.conf.\n"
912 "Mainly a workaround, till `config generate-minimal-conf` generates\n"
913 "a complete ceph.conf.\n\n"
914 "Warning: this is a dangerous operation.")
915 def _set_extra_ceph_conf(self
, inbuf
: Optional
[str] = None) -> HandleCommandResult
:
919 cp
.read_string(inbuf
, source
='<infile>')
921 self
.set_store("extra_ceph_conf", json
.dumps({
923 'last_modified': datetime_to_str(datetime_now())
925 self
.log
.info('Set extra_ceph_conf')
926 self
._kick
_serve
_loop
()
927 return HandleCommandResult()
929 @orchestrator._cli
_read
_command
(
930 'cephadm get-extra-ceph-conf',
931 desc
='Get extra ceph conf that is appended')
932 def _get_extra_ceph_conf(self
) -> HandleCommandResult
:
933 return HandleCommandResult(stdout
=self
.extra_ceph_conf().conf
)
935 class ExtraCephConf(NamedTuple
):
937 last_modified
: Optional
[datetime
.datetime
]
939 def extra_ceph_conf(self
) -> 'CephadmOrchestrator.ExtraCephConf':
940 data
= self
.get_store('extra_ceph_conf')
942 return CephadmOrchestrator
.ExtraCephConf('', None)
946 msg
= 'Unable to load extra_ceph_conf: Cannot decode JSON'
947 self
.log
.exception('%s: \'%s\'', msg
, data
)
948 return CephadmOrchestrator
.ExtraCephConf('', None)
949 return CephadmOrchestrator
.ExtraCephConf(j
['conf'], str_to_datetime(j
['last_modified']))
951 def extra_ceph_conf_is_newer(self
, dt
: datetime
.datetime
) -> bool:
952 conf
= self
.extra_ceph_conf()
953 if not conf
.last_modified
:
955 return conf
.last_modified
> dt
957 def _get_connection(self
, host
: str) -> Tuple
['remoto.backends.BaseConnection',
958 'remoto.backends.LegacyModuleExecute']:
960 Setup a connection for running commands on remote host.
962 conn
, r
= self
._cons
.get(host
, (None, None))
964 if conn
.has_connection():
965 self
.log
.debug('Have connection to %s' % host
)
968 self
._reset
_con
(host
)
969 n
= self
.ssh_user
+ '@' + host
970 self
.log
.debug("Opening connection to {} with ssh options '{}'".format(
971 n
, self
._ssh
_options
))
972 child_logger
= self
.log
.getChild(n
)
973 child_logger
.setLevel('WARNING')
974 conn
= remoto
.Connection(
977 ssh_options
=self
._ssh
_options
,
978 sudo
=True if self
.ssh_user
!= 'root' else False)
980 r
= conn
.import_module(remotes
)
981 self
._cons
[host
] = conn
, r
985 def _executable_path(self
, conn
: 'remoto.backends.BaseConnection', executable
: str) -> str:
987 Remote validator that accepts a connection object to ensure that a certain
988 executable is available returning its full path if so.
990 Otherwise an exception with thorough details will be raised, informing the
991 user that the executable was not found.
993 executable_path
= conn
.remote_module
.which(executable
)
994 if not executable_path
:
995 raise RuntimeError("Executable '{}' not found on host '{}'".format(
996 executable
, conn
.hostname
))
997 self
.log
.debug("Found executable '{}' at path '{}'".format(executable
,
999 return executable_path
1002 def _remote_connection(self
,
1004 addr
: Optional
[str] = None,
1005 ) -> Iterator
[Tuple
["BaseConnection", Any
]]:
1006 if not addr
and host
in self
.inventory
:
1007 addr
= self
.inventory
.get_addr(host
)
1009 self
.offline_hosts_remove(host
)
1014 raise OrchestratorError("host address is empty")
1015 conn
, connr
= self
._get
_connection
(addr
)
1016 except OSError as e
:
1017 self
._reset
_con
(host
)
1018 msg
= f
"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1019 raise execnet
.gateway_bootstrap
.HostNotFound(msg
)
1023 except execnet
.gateway_bootstrap
.HostNotFound
as e
:
1024 # this is a misleading exception as it seems to be thrown for
1025 # any sort of connection failure, even those having nothing to
1026 # do with "host not found" (e.g., ssh key permission denied).
1027 self
.offline_hosts
.add(host
)
1028 self
._reset
_con
(host
)
1030 user
= self
.ssh_user
if self
.mode
== 'root' else 'cephadm'
1031 if str(e
).startswith("Can't communicate"):
1034 msg
= f
'''Failed to connect to {host} ({addr}).
1035 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1037 To add the cephadm SSH key to the host:
1038 > ceph cephadm get-pub-key > ~/ceph.pub
1039 > ssh-copy-id -f -i ~/ceph.pub {user}@{host}
1041 To check that the host is reachable:
1042 > ceph cephadm get-ssh-config > ssh_config
1043 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1044 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
1045 raise OrchestratorError(msg
) from e
1046 except Exception as ex
:
1047 self
.log
.exception(ex
)
1050 def _get_container_image(self
, daemon_name
: str) -> Optional
[str]:
1051 daemon_type
= daemon_name
.split('.', 1)[0] # type: ignore
1052 if daemon_type
in CEPH_TYPES
or \
1053 daemon_type
== 'nfs' or \
1054 daemon_type
== 'iscsi':
1055 # get container image
1056 ret
, image
, err
= self
.check_mon_command({
1057 'prefix': 'config get',
1058 'who': utils
.name_to_config_section(daemon_name
),
1059 'key': 'container_image',
1061 image
= image
.strip() # type: ignore
1062 elif daemon_type
== 'prometheus':
1063 image
= self
.container_image_prometheus
1064 elif daemon_type
== 'grafana':
1065 image
= self
.container_image_grafana
1066 elif daemon_type
== 'alertmanager':
1067 image
= self
.container_image_alertmanager
1068 elif daemon_type
== 'node-exporter':
1069 image
= self
.container_image_node_exporter
1070 elif daemon_type
== CustomContainerService
.TYPE
:
1071 # The image can't be resolved, the necessary information
1072 # is only available when a container is deployed (given
1076 assert False, daemon_type
1078 self
.log
.debug('%s container image %s' % (daemon_name
, image
))
1082 def _run_cephadm(self
,
1084 entity
: Union
[CephadmNoImage
, str],
1087 addr
: Optional
[str] = "",
1088 stdin
: Optional
[str] = "",
1089 no_fsid
: Optional
[bool] = False,
1090 error_ok
: Optional
[bool] = False,
1091 image
: Optional
[str] = "",
1092 env_vars
: Optional
[List
[str]] = None,
1093 ) -> Tuple
[List
[str], List
[str], int]:
1095 Run cephadm on the remote host with the given command + args
1097 :env_vars: in format -> [KEY=VALUE, ..]
1099 with self
._remote
_connection
(host
, addr
) as tpl
:
1101 assert image
or entity
1102 if not image
and entity
is not cephadmNoImage
:
1103 image
= self
._get
_container
_image
(entity
)
1108 for env_var_pair
in env_vars
:
1109 final_args
.extend(['--env', env_var_pair
])
1112 final_args
.extend(['--image', image
])
1113 final_args
.append(command
)
1116 final_args
+= ['--fsid', self
._cluster
_fsid
]
1118 if self
.container_init
:
1119 final_args
+= ['--container-init']
1123 self
.log
.debug('args: %s' % (' '.join(final_args
)))
1124 if self
.mode
== 'root':
1126 self
.log
.debug('stdin: %s' % stdin
)
1127 script
= 'injected_argv = ' + json
.dumps(final_args
) + '\n'
1129 script
+= 'injected_stdin = ' + json
.dumps(stdin
) + '\n'
1130 script
+= self
._cephadm
1131 python
= connr
.choose_python()
1134 'unable to find python on %s (tried %s in %s)' % (
1135 host
, remotes
.PYTHONS
, remotes
.PATH
))
1137 out
, err
, code
= remoto
.process
.check(
1140 stdin
=script
.encode('utf-8'))
1141 except RuntimeError as e
:
1142 self
._reset
_con
(host
)
1144 return [], [str(e
)], 1
1146 elif self
.mode
== 'cephadm-package':
1148 out
, err
, code
= remoto
.process
.check(
1150 ['sudo', '/usr/bin/cephadm'] + final_args
,
1152 except RuntimeError as e
:
1153 self
._reset
_con
(host
)
1155 return [], [str(e
)], 1
1158 assert False, 'unsupported mode'
1160 self
.log
.debug('code: %d' % code
)
1162 self
.log
.debug('out: %s' % '\n'.join(out
))
1164 self
.log
.debug('err: %s' % '\n'.join(err
))
1165 if code
and not error_ok
:
1166 raise OrchestratorError(
1167 'cephadm exited with an error code: %d, stderr:%s' % (
1168 code
, '\n'.join(err
)))
1169 return out
, err
, code
1171 def _hosts_with_daemon_inventory(self
) -> List
[HostSpec
]:
1173 Returns all hosts that went through _refresh_host_daemons().
1175 This mitigates a potential race, where new host was added *after*
1176 ``_refresh_host_daemons()`` was called, but *before*
1177 ``_apply_all_specs()`` was called. thus we end up with a hosts
1178 where daemons might be running, but we have not yet detected them.
1181 h
for h
in self
.inventory
.all_specs()
1182 if self
.cache
.host_had_daemon_refresh(h
.hostname
)
1185 def _add_host(self
, spec
):
1186 # type: (HostSpec) -> str
1188 Add a host to be managed by the orchestrator.
1190 :param host: host name
1192 assert_valid_host(spec
.hostname
)
1193 out
, err
, code
= self
._run
_cephadm
(spec
.hostname
, cephadmNoImage
, 'check-host',
1194 ['--expect-hostname', spec
.hostname
],
1196 error_ok
=True, no_fsid
=True)
1198 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1199 spec
.hostname
, spec
.addr
, err
))
1201 self
.inventory
.add_host(spec
)
1202 self
.cache
.prime_empty_host(spec
.hostname
)
1203 self
.offline_hosts_remove(spec
.hostname
)
1204 self
.event
.set() # refresh stray health check
1205 self
.log
.info('Added host %s' % spec
.hostname
)
1206 return "Added host '{}'".format(spec
.hostname
)
1209 def add_host(self
, spec
: HostSpec
) -> str:
1210 return self
._add
_host
(spec
)
1213 def remove_host(self
, host
):
1214 # type: (str) -> str
1216 Remove a host from orchestrator management.
1218 :param host: host name
1220 self
.inventory
.rm_host(host
)
1221 self
.cache
.rm_host(host
)
1222 self
._reset
_con
(host
)
1223 self
.event
.set() # refresh stray health check
1224 self
.log
.info('Removed host %s' % host
)
1225 return "Removed host '{}'".format(host
)
1228 def update_host_addr(self
, host
: str, addr
: str) -> str:
1229 self
.inventory
.set_addr(host
, addr
)
1230 self
._reset
_con
(host
)
1231 self
.event
.set() # refresh stray health check
1232 self
.log
.info('Set host %s addr to %s' % (host
, addr
))
1233 return "Updated host '{}' addr to '{}'".format(host
, addr
)
1236 def get_hosts(self
):
1237 # type: () -> List[orchestrator.HostSpec]
1239 Return a list of hosts managed by the orchestrator.
1242 - skip async: manager reads from cache.
1244 return list(self
.inventory
.all_specs())
1247 def add_host_label(self
, host
: str, label
: str) -> str:
1248 self
.inventory
.add_label(host
, label
)
1249 self
.log
.info('Added label %s to host %s' % (label
, host
))
1250 return 'Added label %s to host %s' % (label
, host
)
1253 def remove_host_label(self
, host
: str, label
: str) -> str:
1254 self
.inventory
.rm_label(host
, label
)
1255 self
.log
.info('Removed label %s to host %s' % (label
, host
))
1256 return 'Removed label %s from host %s' % (label
, host
)
1259 def host_ok_to_stop(self
, hostname
: str) -> str:
1260 if hostname
not in self
.cache
.get_hosts():
1261 raise OrchestratorError(f
'Cannot find host "{hostname}"')
1263 daemons
= self
.cache
.get_daemons()
1264 daemon_map
= defaultdict(lambda: [])
1266 if dd
.hostname
== hostname
:
1267 daemon_map
[dd
.daemon_type
].append(dd
.daemon_id
)
1269 for daemon_type
, daemon_ids
in daemon_map
.items():
1270 r
= self
.cephadm_services
[daemon_type
].ok_to_stop(daemon_ids
)
1272 self
.log
.error(f
'It is NOT safe to stop host {hostname}')
1273 raise orchestrator
.OrchestratorError(
1277 msg
= f
'It is presumed safe to stop host {hostname}'
1281 def get_minimal_ceph_conf(self
) -> str:
1282 _
, config
, _
= self
.check_mon_command({
1283 "prefix": "config generate-minimal-conf",
1285 extra
= self
.extra_ceph_conf().conf
1287 config
+= '\n\n' + extra
.strip() + '\n'
1290 def _invalidate_daemons_and_kick_serve(self
, filter_host
: Optional
[str] = None) -> None:
1292 self
.cache
.invalidate_host_daemons(filter_host
)
1294 for h
in self
.cache
.get_hosts():
1295 # Also discover daemons deployed manually
1296 self
.cache
.invalidate_host_daemons(h
)
1298 self
._kick
_serve
_loop
()
1301 def describe_service(self
, service_type
: Optional
[str] = None, service_name
: Optional
[str] = None,
1302 refresh
: bool = False) -> List
[orchestrator
.ServiceDescription
]:
1304 self
._invalidate
_daemons
_and
_kick
_serve
()
1305 self
.log
.info('Kicked serve() loop to refresh all services')
1308 sm
: Dict
[str, orchestrator
.ServiceDescription
] = {}
1310 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1311 for name
, dd
in dm
.items():
1312 if service_type
and service_type
!= dd
.daemon_type
:
1314 n
: str = dd
.service_name()
1315 if service_name
and service_name
!= n
:
1317 if dd
.daemon_type
== 'osd':
1319 OSDs do not know the affinity to their spec out of the box.
1321 n
= f
"osd.{dd.osdspec_affinity}"
1322 if not dd
.osdspec_affinity
:
1323 # If there is no osdspec_affinity, the spec should suffice for displaying
1325 if n
in self
.spec_store
.specs
:
1326 spec
= self
.spec_store
.specs
[n
]
1330 service_type
=dd
.daemon_type
,
1331 service_id
=dd
.service_id(),
1332 placement
=PlacementSpec(
1337 sm
[n
] = orchestrator
.ServiceDescription(
1338 last_refresh
=dd
.last_refresh
,
1339 container_image_id
=dd
.container_image_id
,
1340 container_image_name
=dd
.container_image_name
,
1342 events
=self
.events
.get_for_service(spec
.service_name()),
1344 if n
in self
.spec_store
.specs
:
1345 if dd
.daemon_type
== 'osd':
1347 The osd count can't be determined by the Placement spec.
1348 Showing an actual/expected representation cannot be determined
1349 here. So we're setting running = size for now.
1352 sm
[n
].size
= osd_count
1354 sm
[n
].size
= spec
.placement
.get_host_selection_size(
1355 self
.inventory
.all_specs())
1357 sm
[n
].created
= self
.spec_store
.spec_created
[n
]
1358 if service_type
== 'nfs':
1359 spec
= cast(NFSServiceSpec
, spec
)
1360 sm
[n
].rados_config_location
= spec
.rados_config_location()
1365 if not sm
[n
].last_refresh
or not dd
.last_refresh
or dd
.last_refresh
< sm
[n
].last_refresh
: # type: ignore
1366 sm
[n
].last_refresh
= dd
.last_refresh
1367 if sm
[n
].container_image_id
!= dd
.container_image_id
:
1368 sm
[n
].container_image_id
= 'mix'
1369 if sm
[n
].container_image_name
!= dd
.container_image_name
:
1370 sm
[n
].container_image_name
= 'mix'
1371 for n
, spec
in self
.spec_store
.specs
.items():
1374 if service_type
is not None and service_type
!= spec
.service_type
:
1376 if service_name
is not None and service_name
!= n
:
1378 sm
[n
] = orchestrator
.ServiceDescription(
1380 size
=spec
.placement
.get_host_selection_size(self
.inventory
.all_specs()),
1382 events
=self
.events
.get_for_service(spec
.service_name()),
1384 if service_type
== 'nfs':
1385 spec
= cast(NFSServiceSpec
, spec
)
1386 sm
[n
].rados_config_location
= spec
.rados_config_location()
1387 return list(sm
.values())
1390 def list_daemons(self
,
1391 service_name
: Optional
[str] = None,
1392 daemon_type
: Optional
[str] = None,
1393 daemon_id
: Optional
[str] = None,
1394 host
: Optional
[str] = None,
1395 refresh
: bool = False) -> List
[orchestrator
.DaemonDescription
]:
1397 self
._invalidate
_daemons
_and
_kick
_serve
(host
)
1398 self
.log
.info('Kicked serve() loop to refresh all daemons')
1401 for h
, dm
in self
.cache
.get_daemons_with_volatile_status():
1402 if host
and h
!= host
:
1404 for name
, dd
in dm
.items():
1405 if daemon_type
is not None and daemon_type
!= dd
.daemon_type
:
1407 if daemon_id
is not None and daemon_id
!= dd
.daemon_id
:
1409 if service_name
is not None and service_name
!= dd
.service_name():
1415 def service_action(self
, action
: str, service_name
: str) -> List
[str]:
1416 dds
: List
[DaemonDescription
] = self
.cache
.get_daemons_by_service(service_name
)
1417 self
.log
.info('%s service %s' % (action
.capitalize(), service_name
))
1419 self
._schedule
_daemon
_action
(dd
.name(), action
)
1423 def _daemon_action(self
, daemon_type
: str, daemon_id
: str, host
: str, action
: str, image
: Optional
[str] = None) -> str:
1424 daemon_spec
: CephadmDaemonSpec
= CephadmDaemonSpec(
1426 daemon_id
=daemon_id
,
1427 daemon_type
=daemon_type
,
1430 self
._daemon
_action
_set
_image
(action
, image
, daemon_type
, daemon_id
)
1432 if action
== 'redeploy':
1433 if self
.daemon_is_self(daemon_type
, daemon_id
):
1434 self
.mgr_service
.fail_over()
1435 return '' # unreachable
1436 # stop, recreate the container+unit, then restart
1437 return self
._create
_daemon
(daemon_spec
)
1438 elif action
== 'reconfig':
1439 return self
._create
_daemon
(daemon_spec
, reconfig
=True)
1442 'start': ['reset-failed', 'start'],
1444 'restart': ['reset-failed', 'restart'],
1446 name
= daemon_spec
.name()
1447 for a
in actions
[action
]:
1449 out
, err
, code
= self
._run
_cephadm
(
1451 ['--name', name
, a
])
1453 self
.log
.exception(f
'`{host}: cephadm unit {name} {a}` failed')
1454 self
.cache
.invalidate_host_daemons(daemon_spec
.host
)
1455 msg
= "{} {} from host '{}'".format(action
, name
, daemon_spec
.host
)
1456 self
.events
.for_daemon(name
, 'INFO', msg
)
1459 def _daemon_action_set_image(self
, action
: str, image
: Optional
[str], daemon_type
: str, daemon_id
: str) -> None:
1460 if image
is not None:
1461 if action
!= 'redeploy':
1462 raise OrchestratorError(
1463 f
'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1464 if daemon_type
not in CEPH_TYPES
:
1465 raise OrchestratorError(
1466 f
'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1467 f
'types are: {", ".join(CEPH_TYPES)}')
1469 self
.check_mon_command({
1470 'prefix': 'config set',
1471 'name': 'container_image',
1473 'who': utils
.name_to_config_section(daemon_type
+ '.' + daemon_id
),
1477 def daemon_action(self
, action
: str, daemon_name
: str, image
: Optional
[str] = None) -> str:
1478 d
= self
.cache
.get_daemon(daemon_name
)
1480 if action
== 'redeploy' and self
.daemon_is_self(d
.daemon_type
, d
.daemon_id
) \
1481 and not self
.mgr_service
.mgr_map_has_standby():
1482 raise OrchestratorError(
1483 f
'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1485 self
._daemon
_action
_set
_image
(action
, image
, d
.daemon_type
, d
.daemon_id
)
1487 self
.log
.info(f
'Schedule {action} daemon {daemon_name}')
1488 return self
._schedule
_daemon
_action
(daemon_name
, action
)
1490 def daemon_is_self(self
, daemon_type
: str, daemon_id
: str) -> bool:
1491 return daemon_type
== 'mgr' and daemon_id
== self
.get_mgr_id()
1493 def _schedule_daemon_action(self
, daemon_name
: str, action
: str) -> str:
1494 dd
= self
.cache
.get_daemon(daemon_name
)
1495 if action
== 'redeploy' and self
.daemon_is_self(dd
.daemon_type
, dd
.daemon_id
) \
1496 and not self
.mgr_service
.mgr_map_has_standby():
1497 raise OrchestratorError(
1498 f
'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1499 self
.cache
.schedule_daemon_action(dd
.hostname
, dd
.name(), action
)
1500 msg
= "Scheduled to {} {} on host '{}'".format(action
, daemon_name
, dd
.hostname
)
1501 self
._kick
_serve
_loop
()
1505 def remove_daemons(self
, names
):
1506 # type: (List[str]) -> List[str]
1508 for host
, dm
in self
.cache
.daemons
.items():
1511 args
.append((name
, host
))
1513 raise OrchestratorError('Unable to find daemon(s) %s' % (names
))
1514 self
.log
.info('Remove daemons %s' % [a
[0] for a
in args
])
1515 return self
._remove
_daemons
(args
)
1518 def remove_service(self
, service_name
: str) -> str:
1519 self
.log
.info('Remove service %s' % service_name
)
1520 self
._trigger
_preview
_refresh
(service_name
=service_name
)
1521 found
= self
.spec_store
.rm(service_name
)
1523 self
._kick
_serve
_loop
()
1524 return 'Removed service %s' % service_name
1526 # must be idempotent: still a success.
1527 return f
'Failed to remove service. <{service_name}> was not found.'
1530 def get_inventory(self
, host_filter
: Optional
[orchestrator
.InventoryFilter
] = None, refresh
: bool = False) -> List
[orchestrator
.InventoryHost
]:
1532 Return the storage inventory of hosts matching the given filter.
1534 :param host_filter: host filter
1537 - add filtering by label
1540 if host_filter
and host_filter
.hosts
:
1541 for h
in host_filter
.hosts
:
1542 self
.cache
.invalidate_host_devices(h
)
1544 for h
in self
.cache
.get_hosts():
1545 self
.cache
.invalidate_host_devices(h
)
1548 self
.log
.info('Kicked serve() loop to refresh devices')
1551 for host
, dls
in self
.cache
.devices
.items():
1552 if host_filter
and host_filter
.hosts
and host
not in host_filter
.hosts
:
1554 result
.append(orchestrator
.InventoryHost(host
,
1555 inventory
.Devices(dls
)))
1559 def zap_device(self
, host
: str, path
: str) -> str:
1560 self
.log
.info('Zap device %s:%s' % (host
, path
))
1561 out
, err
, code
= self
._run
_cephadm
(
1562 host
, 'osd', 'ceph-volume',
1563 ['--', 'lvm', 'zap', '--destroy', path
],
1565 self
.cache
.invalidate_host_devices(host
)
1567 raise OrchestratorError('Zap failed: %s' % '\n'.join(out
+ err
))
1568 return '\n'.join(out
+ err
)
1571 def blink_device_light(self
, ident_fault
: str, on
: bool, locs
: List
[orchestrator
.DeviceLightLoc
]) -> List
[str]:
1573 Blink a device light. Calling something like::
1575 lsmcli local-disk-ident-led-on --path $path
1577 If you must, you can customize this via::
1579 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
1580 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
1582 See templates/blink_device_light_cmd.j2
1585 def blink(host
: str, dev
: str, path
: str) -> str:
1586 cmd_line
= self
.template
.render('blink_device_light_cmd.j2',
1589 'ident_fault': ident_fault
,
1594 cmd_args
= shlex
.split(cmd_line
)
1596 out
, err
, code
= self
._run
_cephadm
(
1597 host
, 'osd', 'shell', ['--'] + cmd_args
,
1600 raise OrchestratorError(
1601 'Unable to affect %s light for %s:%s. Command: %s' % (
1602 ident_fault
, host
, dev
, ' '.join(cmd_args
)))
1603 self
.log
.info('Set %s light for %s:%s %s' % (
1604 ident_fault
, host
, dev
, 'on' if on
else 'off'))
1605 return "Set %s light for %s:%s %s" % (
1606 ident_fault
, host
, dev
, 'on' if on
else 'off')
1610 def get_osd_uuid_map(self
, only_up
=False):
1611 # type: (bool) -> Dict[str, str]
1612 osd_map
= self
.get('osd_map')
1614 for o
in osd_map
['osds']:
1615 # only include OSDs that have ever started in this map. this way
1616 # an interrupted osd create can be repeated and succeed the second
1618 osd_id
= o
.get('osd')
1620 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1621 if not only_up
or (o
['up_from'] > 0):
1622 r
[str(osd_id
)] = o
.get('uuid', '')
1625 def _trigger_preview_refresh(self
,
1626 specs
: Optional
[List
[DriveGroupSpec
]] = None,
1627 service_name
: Optional
[str] = None,
1629 # Only trigger a refresh when a spec has changed
1633 preview_spec
= self
.spec_store
.spec_preview
.get(spec
.service_name())
1634 # the to-be-preview spec != the actual spec, this means we need to
1635 # trigger a refresh, if the spec has been removed (==None) we need to
1637 if not preview_spec
or spec
!= preview_spec
:
1638 trigger_specs
.append(spec
)
1640 trigger_specs
= [cast(DriveGroupSpec
, self
.spec_store
.spec_preview
.get(service_name
))]
1641 if not any(trigger_specs
):
1644 refresh_hosts
= self
.osd_service
.resolve_hosts_for_osdspecs(specs
=trigger_specs
)
1645 for host
in refresh_hosts
:
1646 self
.log
.info(f
"Marking host: {host} for OSDSpec preview refresh.")
1647 self
.cache
.osdspec_previews_refresh_queue
.append(host
)
1650 def apply_drivegroups(self
, specs
: List
[DriveGroupSpec
]) -> List
[str]:
1652 Deprecated. Please use `apply()` instead.
1654 Keeping this around to be compapatible to mgr/dashboard
1656 return [self
._apply
(spec
) for spec
in specs
]
1659 def create_osds(self
, drive_group
: DriveGroupSpec
) -> str:
1660 return self
.osd_service
.create_from_spec(drive_group
)
1662 def _preview_osdspecs(self
,
1663 osdspecs
: Optional
[List
[DriveGroupSpec
]] = None
1666 return {'n/a': [{'error': True,
1667 'message': 'No OSDSpec or matching hosts found.'}]}
1668 matching_hosts
= self
.osd_service
.resolve_hosts_for_osdspecs(specs
=osdspecs
)
1669 if not matching_hosts
:
1670 return {'n/a': [{'error': True,
1671 'message': 'No OSDSpec or matching hosts found.'}]}
1672 # Is any host still loading previews or still in the queue to be previewed
1673 pending_hosts
= {h
for h
in self
.cache
.loading_osdspec_preview
if h
in matching_hosts
}
1674 if pending_hosts
or any(item
in self
.cache
.osdspec_previews_refresh_queue
for item
in matching_hosts
):
1675 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1676 return {'n/a': [{'error': True,
1677 'message': 'Preview data is being generated.. '
1678 'Please re-run this command in a bit.'}]}
1679 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
1680 previews_for_specs
= {}
1681 for host
, raw_reports
in self
.cache
.osdspec_previews
.items():
1682 if host
not in matching_hosts
:
1685 for osd_report
in raw_reports
:
1686 if osd_report
.get('osdspec') in [x
.service_id
for x
in osdspecs
]:
1687 osd_reports
.append(osd_report
)
1688 previews_for_specs
.update({host
: osd_reports
})
1689 return previews_for_specs
1691 def _calc_daemon_deps(self
, daemon_type
: str, daemon_id
: str) -> List
[str]:
1693 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1694 'grafana': ['prometheus'],
1695 'alertmanager': ['mgr', 'alertmanager'],
1698 for dep_type
in need
.get(daemon_type
, []):
1699 for dd
in self
.cache
.get_daemons_by_service(dep_type
):
1700 deps
.append(dd
.name())
1703 def _create_daemon(self
,
1704 daemon_spec
: CephadmDaemonSpec
,
1705 reconfig
: bool = False,
1706 osd_uuid_map
: Optional
[Dict
[str, Any
]] = None,
1709 with
set_exception_subject('service', orchestrator
.DaemonDescription(
1710 daemon_type
=daemon_spec
.daemon_type
,
1711 daemon_id
=daemon_spec
.daemon_id
,
1712 hostname
=daemon_spec
.host
,
1713 ).service_id(), overwrite
=True):
1716 start_time
= datetime_now()
1717 ports
: List
[int] = daemon_spec
.ports
if daemon_spec
.ports
else []
1719 if daemon_spec
.daemon_type
== 'container':
1720 spec
: Optional
[CustomContainerSpec
] = daemon_spec
.spec
1722 # Exit here immediately because the required service
1723 # spec to create a daemon is not provided. This is only
1724 # provided when a service is applied via 'orch apply'
1726 msg
= "Failed to {} daemon {} on {}: Required " \
1727 "service specification not provided".format(
1728 'reconfigure' if reconfig
else 'deploy',
1729 daemon_spec
.name(), daemon_spec
.host
)
1734 ports
.extend(spec
.ports
)
1736 cephadm_config
, deps
= self
.cephadm_services
[daemon_spec
.daemon_type
].generate_config(
1739 # TCP port to open in the host firewall
1741 daemon_spec
.extra_args
.extend([
1742 '--tcp-ports', ' '.join(map(str, ports
))
1745 # osd deployments needs an --osd-uuid arg
1746 if daemon_spec
.daemon_type
== 'osd':
1747 if not osd_uuid_map
:
1748 osd_uuid_map
= self
.get_osd_uuid_map()
1749 osd_uuid
= osd_uuid_map
.get(daemon_spec
.daemon_id
)
1751 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec
.daemon_id
)
1752 daemon_spec
.extra_args
.extend(['--osd-fsid', osd_uuid
])
1755 daemon_spec
.extra_args
.append('--reconfig')
1756 if self
.allow_ptrace
:
1757 daemon_spec
.extra_args
.append('--allow-ptrace')
1759 if self
.cache
.host_needs_registry_login(daemon_spec
.host
) and self
.registry_url
:
1760 self
._registry
_login
(daemon_spec
.host
, self
.registry_url
,
1761 self
.registry_username
, self
.registry_password
)
1763 daemon_spec
.extra_args
.extend(['--config-json', '-'])
1765 self
.log
.info('%s daemon %s on %s' % (
1766 'Reconfiguring' if reconfig
else 'Deploying',
1767 daemon_spec
.name(), daemon_spec
.host
))
1769 out
, err
, code
= self
._run
_cephadm
(
1770 daemon_spec
.host
, daemon_spec
.name(), 'deploy',
1772 '--name', daemon_spec
.name(),
1773 ] + daemon_spec
.extra_args
,
1774 stdin
=json
.dumps(cephadm_config
),
1776 if not code
and daemon_spec
.host
in self
.cache
.daemons
:
1777 # prime cached service state with what we (should have)
1779 sd
= orchestrator
.DaemonDescription()
1780 sd
.daemon_type
= daemon_spec
.daemon_type
1781 sd
.daemon_id
= daemon_spec
.daemon_id
1782 sd
.hostname
= daemon_spec
.host
1784 sd
.status_desc
= 'starting'
1785 self
.cache
.add_daemon(daemon_spec
.host
, sd
)
1786 if daemon_spec
.daemon_type
in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1787 self
.requires_post_actions
.add(daemon_spec
.daemon_type
)
1788 self
.cache
.invalidate_host_daemons(daemon_spec
.host
)
1789 self
.cache
.update_daemon_config_deps(
1790 daemon_spec
.host
, daemon_spec
.name(), deps
, start_time
)
1791 self
.cache
.save_host(daemon_spec
.host
)
1792 msg
= "{} {} on host '{}'".format(
1793 'Reconfigured' if reconfig
else 'Deployed', daemon_spec
.name(), daemon_spec
.host
)
1795 self
.events
.for_daemon(daemon_spec
.name(), OrchestratorEvent
.INFO
, msg
)
1797 what
= 'reconfigure' if reconfig
else 'deploy'
1798 self
.events
.for_daemon(
1799 daemon_spec
.name(), OrchestratorEvent
.ERROR
, f
'Failed to {what}: {err}')
1803 def _remove_daemons(self
, name
: str, host
: str) -> str:
1804 return self
._remove
_daemon
(name
, host
)
1806 def _remove_daemon(self
, name
: str, host
: str) -> str:
1810 (daemon_type
, daemon_id
) = name
.split('.', 1)
1811 daemon
= orchestrator
.DaemonDescription(
1812 daemon_type
=daemon_type
,
1813 daemon_id
=daemon_id
,
1816 with
set_exception_subject('service', daemon
.service_id(), overwrite
=True):
1818 self
.cephadm_services
[daemon_type
].pre_remove(daemon
)
1820 args
= ['--name', name
, '--force']
1821 self
.log
.info('Removing daemon %s from %s' % (name
, host
))
1822 out
, err
, code
= self
._run
_cephadm
(
1823 host
, name
, 'rm-daemon', args
)
1825 # remove item from cache
1826 self
.cache
.rm_daemon(host
, name
)
1827 self
.cache
.invalidate_host_daemons(host
)
1829 self
.cephadm_services
[daemon_type
].post_remove(daemon
)
1831 return "Removed {} from host '{}'".format(name
, host
)
1833 def _check_pool_exists(self
, pool
: str, service_name
: str) -> None:
1834 logger
.info(f
'Checking pool "{pool}" exists for service {service_name}')
1835 if not self
.rados
.pool_exists(pool
):
1836 raise OrchestratorError(f
'Cannot find pool "{pool}" for '
1837 f
'service {service_name}')
1839 def _add_daemon(self
,
1842 create_func
: Callable
[..., CephadmDaemonSpec
],
1843 config_func
: Optional
[Callable
] = None) -> List
[str]:
1845 Add (and place) a daemon. Require explicit host placement. Do not
1846 schedule, and do not apply the related scheduling limitations.
1848 self
.log
.debug('_add_daemon %s spec %s' % (daemon_type
, spec
.placement
))
1849 if not spec
.placement
.hosts
:
1850 raise OrchestratorError('must specify host(s) to deploy on')
1851 count
= spec
.placement
.count
or len(spec
.placement
.hosts
)
1852 daemons
= self
.cache
.get_daemons_by_service(spec
.service_name())
1853 return self
._create
_daemons
(daemon_type
, spec
, daemons
,
1854 spec
.placement
.hosts
, count
,
1855 create_func
, config_func
)
1857 def _create_daemons(self
,
1860 daemons
: List
[DaemonDescription
],
1861 hosts
: List
[HostPlacementSpec
],
1863 create_func
: Callable
[..., CephadmDaemonSpec
],
1864 config_func
: Optional
[Callable
] = None) -> List
[str]:
1865 if count
> len(hosts
):
1866 raise OrchestratorError('too few hosts: want %d, have %s' % (
1871 args
= [] # type: List[CephadmDaemonSpec]
1872 for host
, network
, name
in hosts
:
1873 daemon_id
= self
.get_unique_name(daemon_type
, host
, daemons
,
1874 prefix
=spec
.service_id
,
1877 if not did_config
and config_func
:
1878 if daemon_type
== 'rgw':
1879 config_func(spec
, daemon_id
)
1884 daemon_spec
= self
.cephadm_services
[daemon_type
].make_daemon_spec(
1885 host
, daemon_id
, network
, spec
)
1886 self
.log
.debug('Placing %s.%s on host %s' % (
1887 daemon_type
, daemon_id
, host
))
1888 args
.append(daemon_spec
)
1890 # add to daemon list so next name(s) will also be unique
1891 sd
= orchestrator
.DaemonDescription(
1893 daemon_type
=daemon_type
,
1894 daemon_id
=daemon_id
,
1899 def create_func_map(*args
: Any
) -> str:
1900 daemon_spec
= create_func(*args
)
1901 return self
._create
_daemon
(daemon_spec
)
1903 return create_func_map(args
)
1906 def apply_mon(self
, spec
: ServiceSpec
) -> str:
1907 return self
._apply
(spec
)
1910 def add_mon(self
, spec
):
1911 # type: (ServiceSpec) -> List[str]
1912 return self
._add
_daemon
('mon', spec
, self
.mon_service
.prepare_create
)
1915 def add_mgr(self
, spec
):
1916 # type: (ServiceSpec) -> List[str]
1917 return self
._add
_daemon
('mgr', spec
, self
.mgr_service
.prepare_create
)
1919 def _apply(self
, spec
: GenericSpec
) -> str:
1920 if spec
.service_type
== 'host':
1921 return self
._add
_host
(cast(HostSpec
, spec
))
1923 if spec
.service_type
== 'osd':
1924 # _trigger preview refresh needs to be smart and
1925 # should only refresh if a change has been detected
1926 self
._trigger
_preview
_refresh
(specs
=[cast(DriveGroupSpec
, spec
)])
1928 return self
._apply
_service
_spec
(cast(ServiceSpec
, spec
))
1930 def _plan(self
, spec
: ServiceSpec
) -> dict:
1931 if spec
.service_type
== 'osd':
1932 return {'service_name': spec
.service_name(),
1933 'service_type': spec
.service_type
,
1934 'data': self
._preview
_osdspecs
(osdspecs
=[cast(DriveGroupSpec
, spec
)])}
1936 ha
= HostAssignment(
1938 hosts
=self
._hosts
_with
_daemon
_inventory
(),
1939 get_daemons_func
=self
.cache
.get_daemons_by_service
,
1944 add_daemon_hosts
= ha
.add_daemon_hosts(hosts
)
1945 remove_daemon_hosts
= ha
.remove_daemon_hosts(hosts
)
1948 'service_name': spec
.service_name(),
1949 'service_type': spec
.service_type
,
1950 'add': [hs
.hostname
for hs
in add_daemon_hosts
],
1951 'remove': [d
.hostname
for d
in remove_daemon_hosts
]
1955 def plan(self
, specs
: List
[GenericSpec
]) -> List
:
1956 results
= [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
1957 'to the current inventory setup. If any on these conditions changes, the \n'
1958 'preview will be invalid. Please make sure to have a minimal \n'
1959 'timeframe between planning and applying the specs.'}]
1960 if any([spec
.service_type
== 'host' for spec
in specs
]):
1961 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
1963 results
.append(self
._plan
(cast(ServiceSpec
, spec
)))
1966 def _apply_service_spec(self
, spec
: ServiceSpec
) -> str:
1967 if spec
.placement
.is_empty():
1968 # fill in default placement
1970 'mon': PlacementSpec(count
=5),
1971 'mgr': PlacementSpec(count
=2),
1972 'mds': PlacementSpec(count
=2),
1973 'rgw': PlacementSpec(count
=2),
1974 'iscsi': PlacementSpec(count
=1),
1975 'rbd-mirror': PlacementSpec(count
=2),
1976 'nfs': PlacementSpec(count
=1),
1977 'grafana': PlacementSpec(count
=1),
1978 'alertmanager': PlacementSpec(count
=1),
1979 'prometheus': PlacementSpec(count
=1),
1980 'node-exporter': PlacementSpec(host_pattern
='*'),
1981 'crash': PlacementSpec(host_pattern
='*'),
1982 'container': PlacementSpec(count
=1),
1984 spec
.placement
= defaults
[spec
.service_type
]
1985 elif spec
.service_type
in ['mon', 'mgr'] and \
1986 spec
.placement
.count
is not None and \
1987 spec
.placement
.count
< 1:
1988 raise OrchestratorError('cannot scale %s service below 1' % (
1993 hosts
=self
.inventory
.all_specs(), # All hosts, even those without daemon refresh
1994 get_daemons_func
=self
.cache
.get_daemons_by_service
,
1997 self
.log
.info('Saving service %s spec with placement %s' % (
1998 spec
.service_name(), spec
.placement
.pretty_str()))
1999 self
.spec_store
.save(spec
)
2000 self
._kick
_serve
_loop
()
2001 return "Scheduled %s update..." % spec
.service_name()
2004 def apply(self
, specs
: List
[GenericSpec
]) -> List
[str]:
2007 results
.append(self
._apply
(spec
))
2011 def apply_mgr(self
, spec
: ServiceSpec
) -> str:
2012 return self
._apply
(spec
)
2015 def add_mds(self
, spec
: ServiceSpec
) -> List
[str]:
2016 return self
._add
_daemon
('mds', spec
, self
.mds_service
.prepare_create
, self
.mds_service
.config
)
2019 def apply_mds(self
, spec
: ServiceSpec
) -> str:
2020 return self
._apply
(spec
)
2023 def add_rgw(self
, spec
: ServiceSpec
) -> List
[str]:
2024 return self
._add
_daemon
('rgw', spec
, self
.rgw_service
.prepare_create
, self
.rgw_service
.config
)
2027 def apply_rgw(self
, spec
: ServiceSpec
) -> str:
2028 return self
._apply
(spec
)
2031 def add_iscsi(self
, spec
):
2032 # type: (ServiceSpec) -> List[str]
2033 return self
._add
_daemon
('iscsi', spec
, self
.iscsi_service
.prepare_create
, self
.iscsi_service
.config
)
2036 def apply_iscsi(self
, spec
: ServiceSpec
) -> str:
2037 return self
._apply
(spec
)
2040 def add_rbd_mirror(self
, spec
: ServiceSpec
) -> List
[str]:
2041 return self
._add
_daemon
('rbd-mirror', spec
, self
.rbd_mirror_service
.prepare_create
)
2044 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> str:
2045 return self
._apply
(spec
)
2048 def add_nfs(self
, spec
: ServiceSpec
) -> List
[str]:
2049 return self
._add
_daemon
('nfs', spec
, self
.nfs_service
.prepare_create
, self
.nfs_service
.config
)
2052 def apply_nfs(self
, spec
: ServiceSpec
) -> str:
2053 return self
._apply
(spec
)
2055 def _get_dashboard_url(self
):
2057 return self
.get('mgr_map').get('services', {}).get('dashboard', '')
2060 def add_prometheus(self
, spec
: ServiceSpec
) -> List
[str]:
2061 return self
._add
_daemon
('prometheus', spec
, self
.prometheus_service
.prepare_create
)
2064 def apply_prometheus(self
, spec
: ServiceSpec
) -> str:
2065 return self
._apply
(spec
)
2068 def add_node_exporter(self
, spec
):
2069 # type: (ServiceSpec) -> List[str]
2070 return self
._add
_daemon
('node-exporter', spec
,
2071 self
.node_exporter_service
.prepare_create
)
2074 def apply_node_exporter(self
, spec
: ServiceSpec
) -> str:
2075 return self
._apply
(spec
)
2078 def add_crash(self
, spec
):
2079 # type: (ServiceSpec) -> List[str]
2080 return self
._add
_daemon
('crash', spec
,
2081 self
.crash_service
.prepare_create
)
2084 def apply_crash(self
, spec
: ServiceSpec
) -> str:
2085 return self
._apply
(spec
)
2088 def add_grafana(self
, spec
):
2089 # type: (ServiceSpec) -> List[str]
2090 return self
._add
_daemon
('grafana', spec
, self
.grafana_service
.prepare_create
)
2093 def apply_grafana(self
, spec
: ServiceSpec
) -> str:
2094 return self
._apply
(spec
)
2097 def add_alertmanager(self
, spec
):
2098 # type: (ServiceSpec) -> List[str]
2099 return self
._add
_daemon
('alertmanager', spec
, self
.alertmanager_service
.prepare_create
)
2102 def apply_alertmanager(self
, spec
: ServiceSpec
) -> str:
2103 return self
._apply
(spec
)
2106 def add_container(self
, spec
: ServiceSpec
) -> List
[str]:
2107 return self
._add
_daemon
('container', spec
,
2108 self
.container_service
.prepare_create
)
2111 def apply_container(self
, spec
: ServiceSpec
) -> str:
2112 return self
._apply
(spec
)
2114 def _get_container_image_info(self
, image_name
: str) -> ContainerInspectInfo
:
2115 # pick a random host...
2117 for host_name
in self
.inventory
.keys():
2121 raise OrchestratorError('no hosts defined')
2122 if self
.cache
.host_needs_registry_login(host
) and self
.registry_url
:
2123 self
._registry
_login
(host
, self
.registry_url
,
2124 self
.registry_username
, self
.registry_password
)
2125 out
, err
, code
= self
._run
_cephadm
(
2126 host
, '', 'pull', [],
2131 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2132 image_name
, host
, '\n'.join(out
)))
2134 j
= json
.loads('\n'.join(out
))
2135 r
= ContainerInspectInfo(
2137 j
.get('ceph_version'),
2138 j
.get('repo_digest')
2140 self
.log
.debug(f
'image {image_name} -> {r}')
2142 except (ValueError, KeyError) as _
:
2143 msg
= 'Failed to pull %s on %s: Cannot decode JSON' % (image_name
, host
)
2144 self
.log
.exception('%s: \'%s\'' % (msg
, '\n'.join(out
)))
2145 raise OrchestratorError(msg
)
2148 def upgrade_check(self
, image
: str, version
: str) -> str:
2150 target_name
= self
.container_image_base
+ ':v' + version
2154 raise OrchestratorError('must specify either image or version')
2156 image_info
= self
._get
_container
_image
_info
(target_name
)
2157 self
.log
.debug(f
'image info {image} -> {image_info}')
2159 'target_name': target_name
,
2160 'target_id': image_info
.image_id
,
2161 'target_version': image_info
.ceph_version
,
2162 'needs_update': dict(),
2163 'up_to_date': list(),
2165 for host
, dm
in self
.cache
.daemons
.items():
2166 for name
, dd
in dm
.items():
2167 if image_info
.image_id
== dd
.container_image_id
:
2168 r
['up_to_date'].append(dd
.name())
2170 r
['needs_update'][dd
.name()] = {
2171 'current_name': dd
.container_image_name
,
2172 'current_id': dd
.container_image_id
,
2173 'current_version': dd
.version
,
2175 if self
.use_repo_digest
:
2176 r
['target_digest'] = image_info
.repo_digest
2178 return json
.dumps(r
, indent
=4, sort_keys
=True)
2181 def upgrade_status(self
) -> orchestrator
.UpgradeStatusSpec
:
2182 return self
.upgrade
.upgrade_status()
2185 def upgrade_start(self
, image
: str, version
: str) -> str:
2186 return self
.upgrade
.upgrade_start(image
, version
)
2189 def upgrade_pause(self
) -> str:
2190 return self
.upgrade
.upgrade_pause()
2193 def upgrade_resume(self
) -> str:
2194 return self
.upgrade
.upgrade_resume()
2197 def upgrade_stop(self
) -> str:
2198 return self
.upgrade
.upgrade_stop()
2201 def remove_osds(self
, osd_ids
: List
[str],
2202 replace
: bool = False,
2203 force
: bool = False) -> str:
2205 Takes a list of OSDs and schedules them for removal.
2206 The function that takes care of the actual removal is
2207 process_removal_queue().
2210 daemons
: List
[orchestrator
.DaemonDescription
] = self
.cache
.get_daemons_by_type('osd')
2211 to_remove_daemons
= list()
2212 for daemon
in daemons
:
2213 if daemon
.daemon_id
in osd_ids
:
2214 to_remove_daemons
.append(daemon
)
2215 if not to_remove_daemons
:
2216 return f
"Unable to find OSDs: {osd_ids}"
2218 for daemon
in to_remove_daemons
:
2220 self
.to_remove_osds
.enqueue(OSD(osd_id
=int(daemon
.daemon_id
),
2223 hostname
=daemon
.hostname
,
2224 fullname
=daemon
.name(),
2225 process_started_at
=datetime_now(),
2226 remove_util
=self
.to_remove_osds
.rm_util
))
2227 except NotFoundError
:
2228 return f
"Unable to find OSDs: {osd_ids}"
2230 # trigger the serve loop to initiate the removal
2231 self
._kick
_serve
_loop
()
2232 return "Scheduled OSD(s) for removal"
2235 def stop_remove_osds(self
, osd_ids
: List
[str]) -> str:
2237 Stops a `removal` process for a List of OSDs.
2238 This will revert their weight and remove it from the osds_to_remove queue
2240 for osd_id
in osd_ids
:
2242 self
.to_remove_osds
.rm(OSD(osd_id
=int(osd_id
),
2243 remove_util
=self
.to_remove_osds
.rm_util
))
2244 except (NotFoundError
, KeyError):
2245 return f
'Unable to find OSD in the queue: {osd_id}'
2247 # trigger the serve loop to halt the removal
2248 self
._kick
_serve
_loop
()
2249 return "Stopped OSD(s) removal"
2252 def remove_osds_status(self
) -> List
[OSD
]:
2254 The CLI call to retrieve an osd removal report
2256 return self
.to_remove_osds
.all_osds()