import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple

import datetime
import six
import os
import random
import tempfile
import multiprocessing.pool
import subprocess

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
    CustomContainerSpec
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec

from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, \
    str_to_datetime, datetime_to_str

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self):
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

try:
    from typing import List
except ImportError:
    pass
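
# remoto is left as None when the import fails; can_run() below surfaces
# remoto_import_error in that case, so the module still loads on systems
# without remoto installed.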

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
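
# CEPH_DATEFMT parses the monmap 'modified' timestamp in notify() below,
# e.g. '2020-11-02T10:23:45.123456Z' (illustrative value).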


class CephadmCompletion(orchestrator.Completion[T]):
    def evaluate(self):
        self.finalize(None)


def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """
    Decorator to make CephadmCompletion methods return
    a completion object that executes themselves.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper
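
# A method decorated with @trivial_completion (e.g. add_host() further down)
# returns a CephadmCompletion immediately; the wrapped body only runs when the
# completion is evaluated via process().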


class ContainerInspectInfo(NamedTuple):
    image_id: str
    ceph_version: Optional[str]
    repo_digest: Optional[str]


@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'enum_allowed': ['root', 'cephadm-package'],
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
        },
        {
            'name': 'container_image_prometheus',
            'default': 'docker.io/prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'docker.io/ceph/ceph-grafana:6.6.2',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'docker.io/prom/alertmanager:v0.20.0',
            'desc': 'Prometheus alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'docker.io/prom/node-exporter:v0.18.1',
            'desc': 'Prometheus node-exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace. Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'container_init',
            'desc': 'Run podman/docker with `--init`',
        },
        {
            'name': 'prometheus_alerts_path',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'desc': 'internal - do not modify',
            # used to track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'desc': 'manage configs like API endpoints in Dashboard.',
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'desc': 'Custom repository url',
        },
        {
            'name': 'registry_username',
            'desc': 'Custom repository username',
        },
        {
            'name': 'registry_password',
            'desc': 'Custom repository password',
        },
        {
            'name': 'use_repo_digest',
            'desc': 'Automatically convert image tags to image digest. '
                    'Make sure all daemons use the same image',
        },
    ]
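
    # config_notify() below mirrors every MODULE_OPTIONS entry onto an instance
    # attribute of the same name, e.g. self.container_image_base, so options can
    # be read directly throughout this module.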

    def __init__(self, *args, **kwargs):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDQueue()
        self.rm_util.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }

        self.template = TemplateMgr(self)

        self.requires_post_actions = set()
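
        # cephadm_services maps a service_type string (as used in ServiceSpec and
        # daemon names like 'mgr.x') to the CephadmService subclass that knows how
        # to create and configure daemons of that type; see _get_cephadm_service().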

    def shutdown(self):
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self):
        self.log.debug('_kick_serve_loop')
        self.event.set()

    # function responsible for logging single host into custom registry
    def _registry_login(self, host, url, username, password):
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
                    " \"password\": \"" + password + "\"}")
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return None

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image):
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type, notify_id):
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = datetime.datetime.strptime(
                monmap['modified'], CEPH_DATEFMT)
            if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self):
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal
                    # queue, start the process
                    self.rm_util.process_removal_queue()

    def pause(self):
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self):
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]

        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
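
    # e.g. get_unique_name('mds', 'myhost', [], prefix='myfs') can return
    # 'myfs.myhost.ihlenw' (hypothetical random suffix), while suffix-less types
    # such as 'mon' or 'crash' simply return 'myhost'.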

    def _reconfig_ssh(self):
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config):
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        matches = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not matches:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in matches:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname):
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host):
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self):
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host):
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self):
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''

    def process(self, completions):
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf=None):
        """
        Set an ssh_config file provided from stdin
        """
        if inbuf == self.ssh_config:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self):
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self):
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self):
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self):
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self):
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self):
        return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user):
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        host = self.cache.get_hosts()[0]
        r = CephadmServe(self)._check_host(host)
        if r is not None:
            # connection failed reset user
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += ' sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url=None, username=None, password=None, inbuf=None):
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host, addr=None):
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host, addr=None):
        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemon's ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime.datetime.utcnow())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            self.log.exception('unable to load extra_ceph_conf')
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt
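
    # extra_ceph_conf is persisted in the config-key store as JSON, e.g.
    #   {"conf": "[global]\n...", "last_modified": "2020-11-02T10:23:45.123456Z"}
    # (illustrative values), matching what _set_extra_ceph_conf() stores above.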

    def _get_connection(self, host: str):
        """
        Setup a connection for running commands on remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn, executable):
        """
        Remote validator that accepts a connection object to ensure that a certain
        executable is available returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)
        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise

    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        if daemon_type in CEPH_TYPES or \
                daemon_type == 'nfs' or \
                daemon_type == 'iscsi':
            # get container image
            ret, image, err = self.check_mon_command({
                'prefix': 'config get',
                'who': utils.name_to_config_section(daemon_name),
                'key': 'container_image',
            })
            image = image.strip()  # type: ignore
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved, the necessary information
            # is only available when a container is deployed (given
            # via spec).
            image = None
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            if self.container_init:
                final_args += ['--container-init']

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
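
    # In 'root' mode, _run_cephadm() does not require cephadm to be installed on
    # the remote host: the cephadm source read in __init__ is piped to the remote
    # python over stdin, with the arguments prepended as `injected_argv` (and any
    # stdin payload as `injected_stdin`).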

    def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
        """
        Returns all hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we end up with hosts
        where daemons might be running, but we have not yet detected them.
        """
        return [
            h for h in self.inventory.all_specs()
            if self.cache.host_had_daemon_refresh(h.hostname)
        ]

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)

    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)

    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)

    @trivial_completion
    def update_host_addr(self, host, addr) -> str:
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @trivial_completion
    def add_host_label(self, host, label) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        return 'Added label %s to host %s' % (label, host)

    @trivial_completion
    def remove_host_label(self, host, label) -> str:
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        return 'Removed label %s from host %s' % (label, host)

    @trivial_completion
    def host_ok_to_stop(self, hostname: str):
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        daemons = self.cache.get_daemons()
        daemon_map = defaultdict(lambda: [])
        for dd in daemons:
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            if r.retval:
                self.log.error(f'It is NOT safe to stop host {hostname}')
                raise orchestrator.OrchestratorError(
                    r.stderr,
                    errno=r.retval)

        msg = f'It is presumed safe to stop host {hostname}'
        self.log.info(msg)
        return msg

    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            config += '\n\n' + extra.strip() + '\n'
        return config

    def _invalidate_daemons_and_kick_serve(self, filter_host=None):
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()

    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        Showing an actual/expected representation cannot be determined
                        here. So we're setting running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())
                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0

                sm[n].running += 1
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())
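
    # A service's container_image_id/name collapse to 'mix' above when its
    # daemons are running different images (e.g. while an upgrade is in flight).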

    @trivial_completion
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve(host)
            self.log.info('Kicked serve() loop to refresh all daemons')

        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                result.append(dd)
        return result

    @trivial_completion
    def service_action(self, action, service_name) -> List[str]:
        args = []
        for host, dm in self.cache.daemons.items():
            for name, d in dm.items():
                if d.matches_service(service_name):
                    args.append((d.daemon_type, d.daemon_id,
                                 d.hostname, action))
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return self._daemon_actions(args)

    @forall_hosts
    def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
        with set_exception_subject('daemon', DaemonDescription(
                daemon_type=daemon_type,
                daemon_id=daemon_id,
        ).name()):
            return self._daemon_action(daemon_type, daemon_id, host, action)

    def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
            host=host,
            daemon_id=daemon_id,
            daemon_type=daemon_type,
        )

        self._daemon_action_set_image(action, image, daemon_type, daemon_id)

        if action == 'redeploy':
            if self.daemon_is_self(daemon_type, daemon_id):
                self.mgr_service.fail_over()
                return ''  # unreachable
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_spec)
        elif action == 'reconfig':
            return self._create_daemon(daemon_spec, reconfig=True)

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self._run_cephadm(
                    host, name, 'unit',
                    ['--name', name, a])
            except Exception:
                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg

    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str):
        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_TYPES)}')

            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })

    @trivial_completion
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
        d = self.cache.get_daemon(daemon_name)

        if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

        self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

        self.log.info(f'Schedule {action} daemon {daemon_name}')
        return self._schedule_daemon_action(daemon_name, action)

    def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
        return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()

    def _schedule_daemon_action(self, daemon_name: str, action: str):
        dd = self.cache.get_daemon(daemon_name)
        if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
        self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
        msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
        self._kick_serve_loop()
        return msg

    @trivial_completion
    def remove_daemons(self, names):
        # type: (List[str]) -> List[str]
        args = []
        for host, dm in self.cache.daemons.items():
            for name in names:
                if name in dm:
                    args.append((name, host))
        if not args:
            raise OrchestratorError('Unable to find daemon(s) %s' % (names))
        self.log.info('Remove daemons %s' % [a[0] for a in args])
        return self._remove_daemons(args)

    @trivial_completion
    def remove_service(self, service_name) -> str:
        self.log.info('Remove service %s' % service_name)
        self._trigger_preview_refresh(service_name=service_name)
        found = self.spec_store.rm(service_name)
        if found:
            self._kick_serve_loop()
            return 'Removed service %s' % service_name
        else:
            # must be idempotent: still a success.
            return f'Failed to remove service. <{service_name}> was not found.'

    @trivial_completion
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
          - add filtering by label
        """
        if refresh:
            if host_filter and host_filter.hosts:
                for h in host_filter.hosts:
                    self.cache.invalidate_host_devices(h)
            else:
                for h in self.cache.get_hosts():
                    self.cache.invalidate_host_devices(h)

            self._kick_serve_loop()
            self.log.info('Kicked serve() loop to refresh devices')

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result

    @trivial_completion
    def zap_device(self, host, path) -> str:
        self.log.info('Zap device %s:%s' % (host, path))
        out, err, code = self._run_cephadm(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'zap', '--destroy', path],
            error_ok=True)
        self.cache.invalidate_host_devices(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        return '\n'.join(out + err)

    @trivial_completion
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """
        Blink a device light. Calling something like::

          lsmcli local-disk-ident-led-on --path $path

        If you must, you can customize this via::

          ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
          ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

        See templates/blink_device_light_cmd.j2
        """
        @forall_hosts
        def blink(host, dev, path):
            cmd_line = self.template.render('blink_device_light_cmd.j2',
                                            {
                                                'on': on,
                                                'ident_fault': ident_fault,
                                                'dev': dev,
                                                'path': path
                                            },
                                            host=host)
            cmd_args = shlex.split(cmd_line)

            out, err, code = self._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd_args,
                error_ok=True)
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd_args)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)

    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up or (o['up_from'] > 0):
                r[str(osd_id)] = o.get('uuid', '')
        return r

    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ):
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # the to-be-preview spec != the actual spec, this means we need to
                # trigger a refresh, if the spec has been removed (==None) we need to
                # refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        elif service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)

    @trivial_completion
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible to mgr/dashboard
        """
        return [self._apply(spec) for spec in specs]

    @trivial_completion
    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        return self.osd_service.create_from_spec(drive_group)

    def _preview_osdspecs(self,
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                          ):
        if not osdspecs:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts:
            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated.. '
                                        'Please re-run this command in a bit.'}]}
        # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
        previews_for_specs = {}
        for host, raw_reports in self.cache.osdspec_previews.items():
            if host not in matching_hosts:
                continue
            osd_reports = []
            for osd_report in raw_reports:
                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                    osd_reports.append(osd_report)
            previews_for_specs.update({host: osd_reports})
        return previews_for_specs

    def _calc_daemon_deps(self, daemon_type, daemon_id):
        need = {
            'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
            'grafana': ['prometheus'],
            'alertmanager': ['mgr', 'alertmanager'],
        }
        deps = []
        for dep_type in need.get(daemon_type, []):
            for dd in self.cache.get_daemons_by_service(dep_type):
                deps.append(dd.name())
        return sorted(deps)
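
    # e.g. a grafana daemon depends on every deployed prometheus daemon; the
    # resulting list is recorded via update_daemon_config_deps() below and used
    # to decide when a daemon needs to be reconfigured.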

    def _create_daemon(self,
                       daemon_spec: CephadmDaemonSpec,
                       reconfig=False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            start_time = datetime.datetime.utcnow()
            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

            if daemon_spec.daemon_type == 'container':
                spec: Optional[CustomContainerSpec] = daemon_spec.spec
                if spec is None:
                    # Exit here immediately because the required service
                    # spec to create a daemon is not provided. This is only
                    # provided when a service is applied via 'orch apply'
                    # command.
                    msg = "Failed to {} daemon {} on {}: Required " \
                          "service specification not provided".format(
                              'reconfigure' if reconfig else 'deploy',
                              daemon_spec.name(), daemon_spec.host)
                    self.log.error(msg)
                    self.events.for_daemon(daemon_spec.name(),
                                           OrchestratorEvent.ERROR, msg)
                    return msg
                ports.extend(spec.ports)

            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
                daemon_spec)

            # TCP port to open in the host firewall
            if ports:
                daemon_spec.extra_args.extend([
                    '--tcp-ports', ' '.join(map(str, ports))
                ])

            # osd deployments need an --osd-uuid arg
            if daemon_spec.daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

            if reconfig:
                daemon_spec.extra_args.append('--reconfig')
            if self.allow_ptrace:
                daemon_spec.extra_args.append('--allow-ptrace')

            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
                self._registry_login(daemon_spec.host, self.registry_url,
                                     self.registry_username, self.registry_password)

            daemon_spec.extra_args.extend(['--config-json', '-'])

            self.log.info('%s daemon %s on %s' % (
                'Reconfiguring' if reconfig else 'Deploying',
                daemon_spec.name(), daemon_spec.host))

            out, err, code = self._run_cephadm(
                daemon_spec.host, daemon_spec.name(), 'deploy',
                [
                    '--name', daemon_spec.name(),
                ] + daemon_spec.extra_args,
                stdin=json.dumps(cephadm_config),
            )
            if not code and daemon_spec.host in self.cache.daemons:
                # prime cached service state with what we (should have)
                # just created
                sd = orchestrator.DaemonDescription()
                sd.daemon_type = daemon_spec.daemon_type
                sd.daemon_id = daemon_spec.daemon_id
                sd.hostname = daemon_spec.host
                sd.status = 1
                sd.status_desc = 'starting'
                self.cache.add_daemon(daemon_spec.host, sd)
                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
                    self.requires_post_actions.add(daemon_spec.daemon_type)
            self.cache.invalidate_host_daemons(daemon_spec.host)
            self.cache.update_daemon_config_deps(
                daemon_spec.host, daemon_spec.name(), deps, start_time)
            self.cache.save_host(daemon_spec.host)
            msg = "{} {} on host '{}'".format(
                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
            if not code:
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
            else:
                what = 'reconfigure' if reconfig else 'deploy'
                self.events.for_daemon(
                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
            return msg

    @forall_hosts
    def _remove_daemons(self, name, host) -> str:
        return self._remove_daemon(name, host)

    def _remove_daemon(self, name, host) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.cephadm_services[daemon_type].pre_remove(daemon)

            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.cache.rm_daemon(host, name)
            self.cache.invalidate_host_daemons(host)

            self.cephadm_services[daemon_type].post_remove(daemon)

            return "Removed {} from host '{}'".format(name, host)

    def _check_pool_exists(self, pool, service_name):
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')

    def _add_daemon(self, daemon_type, spec,
                    create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
        """
        Add (and place) a daemon. Require explicit host placement. Do not
        schedule, and do not apply the related scheduling limitations.
        """
        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count,
                                    create_func, config_func)
    def _create_daemons(self, daemon_type, spec, daemons,
                        hosts, count,
                        create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False

        args = []  # type: List[CephadmDaemonSpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config and config_func:
                if daemon_type == 'rgw':
                    config_func(spec, daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args):
            daemon_spec = create_func(*args)
            return self._create_daemon(daemon_spec)

        return create_func_map(args)
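    # Note on the pattern above: @forall_hosts (imported from .utils) maps the
    # wrapped function over the list of daemon specs, so the individual
    # _create_daemon() calls are fanned out through the module's worker pool
    # rather than executed one host at a time.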
    @trivial_completion
    def apply_mon(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mon(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mon', spec, self.mon_service.prepare_create)

    @trivial_completion
    def add_mgr(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
    def _apply(self, spec: GenericSpec) -> str:
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger_preview_refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))
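    # Dispatch summary for _apply(): host specs are added immediately, OSD
    # (drive group) specs additionally refresh the cached previews, and every
    # other service spec is persisted through _apply_service_spec() below.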
    def _plan(self, spec: ServiceSpec):
        if spec.service_type == 'osd':
            return {'service_name': spec.service_name(),
                    'service_type': spec.service_type,
                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

        ha = HostAssignment(
            spec=spec,
            hosts=self._hosts_with_daemon_inventory(),
            get_daemons_func=self.cache.get_daemons_by_service,
        )
        ha.validate()
        hosts = ha.place()

        add_daemon_hosts = ha.add_daemon_hosts(hosts)
        remove_daemon_hosts = ha.remove_daemon_hosts(hosts)

        return {
            'service_name': spec.service_name(),
            'service_type': spec.service_type,
            'add': [hs.hostname for hs in add_daemon_hosts],
            'remove': [d.hostname for d in remove_daemon_hosts]
        }
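    # Example of the shape _plan() returns for a non-OSD spec (hostname
    # hypothetical):
    #
    #   {'service_name': 'mgr', 'service_type': 'mgr',
    #    'add': ['host3'], 'remove': []}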
    @trivial_completion
    def plan(self, specs: List[GenericSpec]) -> List:
        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                               'to the current inventory setup. If any of these conditions change, the \n'
                               'preview will be invalid. Please make sure to have a minimal \n'
                               'timeframe between planning and applying the specs.'}]
        if any([spec.service_type == 'host' for spec in specs]):
            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
        for spec in specs:
            results.append(self._plan(cast(ServiceSpec, spec)))
        return results
    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            get_daemons_func=self.cache.get_daemons_by_service,
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()
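    # For example, `ceph orch apply mon` with no explicit placement lands here
    # and picks up the PlacementSpec(count=5) default above; nothing is
    # deployed synchronously, the saved spec is acted on by the serve loop.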
    @trivial_completion
    def apply(self, specs: List[GenericSpec]) -> List[str]:
        results = []
        for spec in specs:
            results.append(self._apply(spec))
        return results
    @trivial_completion
    def apply_mgr(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mds(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)

    @trivial_completion
    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rgw(self, spec) -> List[str]:
        return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)

    @trivial_completion
    def apply_rgw(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_iscsi(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)

    @trivial_completion
    def apply_iscsi(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rbd_mirror(self, spec) -> List[str]:
        return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)

    @trivial_completion
    def apply_rbd_mirror(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_nfs(self, spec) -> List[str]:
        return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)

    @trivial_completion
    def apply_nfs(self, spec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    @trivial_completion
    def add_prometheus(self, spec) -> List[str]:
        return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)

    @trivial_completion
    def apply_prometheus(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('node-exporter', spec,
                                self.node_exporter_service.prepare_create)

    @trivial_completion
    def apply_node_exporter(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_crash(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('crash', spec,
                                self.crash_service.prepare_create)

    @trivial_completion
    def apply_crash(self, spec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_grafana(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)

    @trivial_completion
    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)

    @trivial_completion
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_container(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('container', spec,
                                self.container_service.prepare_create)

    @trivial_completion
    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)
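    # All of the add_*/apply_* wrappers above follow one pattern: add_* places
    # daemons on explicitly listed hosts via _add_daemon(), while apply_* only
    # persists the spec via _apply() and lets the scheduler converge on it.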
    def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
        # pick a random host...
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.cache.host_needs_registry_login(host) and self.registry_url:
            self._registry_login(host, self.registry_url,
                                 self.registry_username, self.registry_password)
        out, err, code = self._run_cephadm(
            host, '', 'pull', [],
            image=image_name,
            no_fsid=True,
            error_ok=True)
        if code:
            raise OrchestratorError('Failed to pull %s on %s: %s' % (
                image_name, host, '\n'.join(out)))
        try:
            j = json.loads('\n'.join(out))
            r = ContainerInspectInfo(
                j['image_id'],
                j.get('ceph_version'),
                j.get('repo_digest')
            )
            self.log.debug(f'image {image_name} -> {r}')
            return r
        except (ValueError, KeyError):
            msg = 'Failed to pull %s on %s: %s' % (image_name, host, '\n'.join(out))
            self.log.exception(msg)
            raise OrchestratorError(msg)
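    # The JSON emitted by `cephadm pull` is assumed to look roughly like this
    # (values hypothetical), mapping 1:1 onto ContainerInspectInfo:
    #
    #   {"image_id": "f3a1...",
    #    "ceph_version": "ceph version 15.2.x (...)",
    #    "repo_digest": "docker.io/ceph/ceph@sha256:..."}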
    @trivial_completion
    def upgrade_check(self, image, version) -> str:
        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        image_info = self._get_container_image_info(target_name)
        self.log.debug(f'image info {image} -> {image_info}')
        r = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                if image_info.image_id == dd.container_image_id:
                    r['up_to_date'].append(dd.name())
                else:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
        if self.use_repo_digest:
            r['target_digest'] = image_info.repo_digest

        return json.dumps(r, indent=4, sort_keys=True)
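    # Sketch of the report upgrade_check() returns (values hypothetical; the
    # keys match the dict built above):
    #
    #   {
    #       "needs_update": {"osd.2": {"current_name": "...",
    #                                  "current_id": "...",
    #                                  "current_version": "..."}},
    #       "target_id": "f3a1...",
    #       "target_name": "docker.io/ceph/ceph:v15.2.9",
    #       "target_version": "ceph version 15.2.9 (...)",
    #       "up_to_date": ["mgr.a"]
    #   }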
    @trivial_completion
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    @trivial_completion
    def upgrade_start(self, image, version) -> str:
        return self.upgrade.upgrade_start(image, version)

    @trivial_completion
    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    @trivial_completion
    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    @trivial_completion
    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()
    @trivial_completion
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """
        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                hostname=daemon.hostname,
                                                fullname=daemon.name(),
                                                process_started_at=datetime.datetime.utcnow(),
                                                remove_util=self.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        return "Scheduled OSD(s) for removal"
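    # This backs `ceph orch osd rm <osd_id>... [--replace] [--force]`; the
    # queue is drained asynchronously by process_removal_queue() in the serve
    # loop, as the docstring above notes.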
    @trivial_completion
    def stop_remove_osds(self, osd_ids: List[str]):
        """
        Stops a `removal` process for a List of OSDs.
        This will revert their weight and remove them from the osds_to_remove queue.
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.rm_util))
            except (NotFoundError, KeyError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"
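    # Counterpart to remove_osds() above; surfaced via
    # `ceph orch osd rm stop <osd_id>...`.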
    @trivial_completion
    def remove_osds_status(self):
        """
        The CLI call to retrieve an OSD removal report.
        """
        return self.to_remove_osds.all_osds()