8 from collections
import defaultdict
9 from configparser
import ConfigParser
10 from contextlib
import contextmanager
11 from functools
import wraps
12 from tempfile
import TemporaryDirectory
, NamedTemporaryFile
13 from threading
import Event
15 from cephadm
.service_discovery
import ServiceDiscovery
18 from typing
import List
, Dict
, Optional
, Callable
, Tuple
, TypeVar
, \
19 Any
, Set
, TYPE_CHECKING
, cast
, NamedTuple
, Sequence
, Type
, \
25 import multiprocessing
.pool
27 from prettytable
import PrettyTable
29 from ceph
.deployment
import inventory
30 from ceph
.deployment
.drive_group
import DriveGroupSpec
31 from ceph
.deployment
.service_spec
import \
32 ServiceSpec
, PlacementSpec
, \
33 HostPlacementSpec
, IngressSpec
, \
34 TunedProfileSpec
, IscsiServiceSpec
35 from ceph
.utils
import str_to_datetime
, datetime_to_str
, datetime_now
36 from cephadm
.serve
import CephadmServe
37 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
38 from cephadm
.http_server
import CephadmHttpServer
39 from cephadm
.agent
import CephadmAgentHelpers
42 from mgr_module
import MgrModule
, HandleCommandResult
, Option
, NotifyType
44 from orchestrator
.module
import to_format
, Format
46 from orchestrator
import OrchestratorError
, OrchestratorValidationError
, HostSpec
, \
47 CLICommandMeta
, DaemonDescription
, DaemonDescriptionStatus
, handle_orch_error
, \
48 service_to_daemon_types
49 from orchestrator
._interface
import GenericSpec
50 from orchestrator
._interface
import daemon_type_to_service
54 from .migrations
import Migrations
55 from .services
.cephadmservice
import MonService
, MgrService
, MdsService
, RgwService
, \
56 RbdMirrorService
, CrashService
, CephadmService
, CephfsMirrorService
, CephadmAgent
, \
58 from .services
.ingress
import IngressService
59 from .services
.container
import CustomContainerService
60 from .services
.iscsi
import IscsiService
61 from .services
.nfs
import NFSService
62 from .services
.osd
import OSDRemovalQueue
, OSDService
, OSD
, NotFoundError
63 from .services
.monitoring
import GrafanaService
, AlertmanagerService
, PrometheusService
, \
64 NodeExporterService
, SNMPGatewayService
, LokiService
, PromtailService
65 from .services
.jaeger
import ElasticSearchService
, JaegerAgentService
, JaegerCollectorService
, JaegerQueryService
66 from .schedule
import HostAssignment
67 from .inventory
import Inventory
, SpecStore
, HostCache
, AgentCache
, EventStore
, \
68 ClientKeyringStore
, ClientKeyringSpec
, TunedProfileStore
69 from .upgrade
import CephadmUpgrade
70 from .template
import TemplateMgr
71 from .utils
import CEPH_IMAGE_TYPES
, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
, forall_hosts
, \
72 cephadmNoImage
, CEPH_UPGRADE_ORDER
73 from .configchecks
import CephadmConfigChecks
74 from .offline_watcher
import OfflineHostWatcher
75 from .tuned_profiles
import TunedProfileUtils
79 except ImportError as e
:
80 asyncssh
= None # type: ignore
81 asyncssh_import_error
= str(e
)
83 logger
= logging
.getLogger(__name__
)
87 DEFAULT_SSH_CONFIG
= """
90 StrictHostKeyChecking no
91 UserKnownHostsFile /dev/null
# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(status: int) -> None:
    """No-op stand-in for ``os._exit``.

    Installed below so that a library calling ``os._exit`` on error cannot
    terminate the whole mgr process.

    :param status: exit status requested by the caller; ignored.
    """
    # NOTE(review): original body was not visible in this view; a no-op body
    # is implied by the function name and the comment above.
    pass


os._exit = os_exit_noop  # type: ignore
# Default container images -----------------------------------------------------
# These are only fallbacks: each one is wired in as the default of the
# corresponding container_image_* module option, which the admin can override.
DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
# ------------------------------------------------------------------------------
def host_exists(hostname_position: int = 1) -> Callable:
    """Decorator factory: check that a hostname exists in the inventory.

    :param hostname_position: index of the hostname within the wrapped
        method's positional arguments (default 1, i.e. the first argument
        after ``self``).
    :raises OrchestratorError: from the wrapper, when the host is not in the
        cache; the message suggests known hosts sharing a prefix with it.
    """
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                # offer close matches to help with typos
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner
141 class CephadmOrchestrator(orchestrator
.Orchestrator
, MgrModule
,
142 metaclass
=CLICommandMeta
):
144 _STORE_HOST_PREFIX
= "host"
147 NOTIFY_TYPES
= [NotifyType
.mon_map
, NotifyType
.pg_summary
]
148 NATIVE_OPTIONS
= [] # type: List[Any]
154 desc
='customized SSH config file to connect to managed hosts',
157 'device_cache_timeout',
160 desc
='seconds to cache device inventory',
163 'device_enhanced_scan',
166 desc
='Use libstoragemgmt during device scans',
169 'daemon_cache_timeout',
172 desc
='seconds to cache service (daemon) inventory',
175 'facts_cache_timeout',
178 desc
='seconds to cache host facts data',
181 'host_check_interval',
184 desc
='how frequently to perform a host check',
189 enum_allowed
=['root', 'cephadm-package'],
191 desc
='mode for remote execution of cephadm',
194 'container_image_base',
195 default
=DEFAULT_IMAGE
,
196 desc
='Container image name, without the tag',
200 'container_image_prometheus',
201 default
=DEFAULT_PROMETHEUS_IMAGE
,
202 desc
='Prometheus container image',
205 'container_image_grafana',
206 default
=DEFAULT_GRAFANA_IMAGE
,
207 desc
='Prometheus container image',
210 'container_image_alertmanager',
211 default
=DEFAULT_ALERT_MANAGER_IMAGE
,
212 desc
='Prometheus container image',
215 'container_image_node_exporter',
216 default
=DEFAULT_NODE_EXPORTER_IMAGE
,
217 desc
='Prometheus container image',
220 'container_image_loki',
221 default
=DEFAULT_LOKI_IMAGE
,
222 desc
='Loki container image',
225 'container_image_promtail',
226 default
=DEFAULT_PROMTAIL_IMAGE
,
227 desc
='Promtail container image',
230 'container_image_haproxy',
231 default
=DEFAULT_HAPROXY_IMAGE
,
232 desc
='HAproxy container image',
235 'container_image_keepalived',
236 default
=DEFAULT_KEEPALIVED_IMAGE
,
237 desc
='Keepalived container image',
240 'container_image_snmp_gateway',
241 default
=DEFAULT_SNMP_GATEWAY_IMAGE
,
242 desc
='SNMP Gateway container image',
245 'container_image_elasticsearch',
246 default
=DEFAULT_ELASTICSEARCH_IMAGE
,
247 desc
='elasticsearch container image',
250 'container_image_jaeger_agent',
251 default
=DEFAULT_JAEGER_AGENT_IMAGE
,
252 desc
='Jaeger agent container image',
255 'container_image_jaeger_collector',
256 default
=DEFAULT_JAEGER_COLLECTOR_IMAGE
,
257 desc
='Jaeger collector container image',
260 'container_image_jaeger_query',
261 default
=DEFAULT_JAEGER_QUERY_IMAGE
,
262 desc
='Jaeger query container image',
265 'warn_on_stray_hosts',
268 desc
='raise a health warning if daemons are detected on a host '
269 'that is not managed by cephadm',
272 'warn_on_stray_daemons',
275 desc
='raise a health warning if daemons are detected '
276 'that are not managed by cephadm',
279 'warn_on_failed_host_check',
282 desc
='raise a health warning if the host check fails',
288 desc
='log to the "cephadm" cluster log channel"',
294 desc
='allow SYS_PTRACE capability on ceph containers',
295 long_desc
='The SYS_PTRACE capability is needed to attach to a '
296 'process with gdb or strace. Enabling this options '
297 'can allow debugging daemons that encounter problems '
304 desc
='Run podman/docker with `--init`'
307 'prometheus_alerts_path',
309 default
='/etc/prometheus/ceph/ceph_default_alerts.yml',
310 desc
='location of alerts to include in prometheus deployments',
316 desc
='internal - do not modify',
317 # used to track spec and other data migrations.
323 desc
='manage configs like API endpoints in Dashboard.'
326 'manage_etc_ceph_ceph_conf',
329 desc
='Manage and own /etc/ceph/ceph.conf on the hosts.',
332 'manage_etc_ceph_ceph_conf_hosts',
335 desc
='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
342 desc
='Registry url for login purposes. This is not the default registry'
348 desc
='Custom repository username. Only used for logging into a registry.'
354 desc
='Custom repository password. Only used for logging into a registry.'
361 desc
='Registry is to be considered insecure (no TLS available). Only for development purposes.'
367 desc
='Automatically convert image tags to image digest. Make sure all daemons use the same image',
370 'config_checks_enabled',
373 desc
='Enable or disable the cephadm configuration analysis',
379 desc
='Search-registry to which we should normalize unqualified image names. '
380 'This is not the default registry',
383 'max_count_per_host',
386 desc
='max number of daemons per service per host',
389 'autotune_memory_target_ratio',
392 desc
='ratio of total system memory to divide amongst autotuned daemons'
398 desc
='how frequently to autotune daemon memory'
404 desc
='Use cephadm agent on each host to gather and send metadata'
407 'agent_refresh_rate',
410 desc
='How often agent on each host will try to gather and send metadata'
413 'agent_starting_port',
416 desc
='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
419 'agent_down_multiplier',
422 desc
='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
425 'max_osd_draining_count',
428 desc
='max number of osds that will be drained simultaneously when osds are removed'
431 'service_discovery_port',
434 desc
='cephadm service discovery port'
440 desc
='Pass --cgroups=split when cephadm creates containers (currently podman only)'
443 'log_refresh_metadata',
446 desc
='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
449 'prometheus_web_user',
452 desc
='Prometheus web user'
455 'prometheus_web_password',
458 desc
='Prometheus web password'
461 'alertmanager_web_user',
464 desc
='Alertmanager web user'
467 'alertmanager_web_password',
470 desc
='Alertmanager web password'
473 'secure_monitoring_stack',
476 desc
='Enable TLS security for all the monitoring stack daemons'
479 'default_cephadm_command_timeout',
482 desc
='Default timeout applied to cephadm commands run directly on '
483 'the host (in seconds)'
487 def __init__(self
, *args
: Any
, **kwargs
: Any
):
488 super(CephadmOrchestrator
, self
).__init
__(*args
, **kwargs
)
489 self
._cluster
_fsid
: str = self
.get('mon_map')['fsid']
490 self
.last_monmap
: Optional
[datetime
.datetime
] = None
496 self
.ssh
= ssh
.SSHManager(self
)
498 if self
.get_store('pause'):
503 # for mypy which does not run the code
505 self
.ssh_config_file
= None # type: Optional[str]
506 self
.device_cache_timeout
= 0
507 self
.daemon_cache_timeout
= 0
508 self
.facts_cache_timeout
= 0
509 self
.host_check_interval
= 0
510 self
.max_count_per_host
= 0
512 self
.container_image_base
= ''
513 self
.container_image_prometheus
= ''
514 self
.container_image_grafana
= ''
515 self
.container_image_alertmanager
= ''
516 self
.container_image_node_exporter
= ''
517 self
.container_image_loki
= ''
518 self
.container_image_promtail
= ''
519 self
.container_image_haproxy
= ''
520 self
.container_image_keepalived
= ''
521 self
.container_image_snmp_gateway
= ''
522 self
.container_image_elasticsearch
= ''
523 self
.container_image_jaeger_agent
= ''
524 self
.container_image_jaeger_collector
= ''
525 self
.container_image_jaeger_query
= ''
526 self
.warn_on_stray_hosts
= True
527 self
.warn_on_stray_daemons
= True
528 self
.warn_on_failed_host_check
= True
529 self
.allow_ptrace
= False
530 self
.container_init
= True
531 self
.prometheus_alerts_path
= ''
532 self
.migration_current
: Optional
[int] = None
533 self
.config_dashboard
= True
534 self
.manage_etc_ceph_ceph_conf
= True
535 self
.manage_etc_ceph_ceph_conf_hosts
= '*'
536 self
.registry_url
: Optional
[str] = None
537 self
.registry_username
: Optional
[str] = None
538 self
.registry_password
: Optional
[str] = None
539 self
.registry_insecure
: bool = False
540 self
.use_repo_digest
= True
541 self
.default_registry
= ''
542 self
.autotune_memory_target_ratio
= 0.0
543 self
.autotune_interval
= 0
544 self
.ssh_user
: Optional
[str] = None
545 self
._ssh
_options
: Optional
[str] = None
546 self
.tkey
= NamedTemporaryFile()
547 self
.ssh_config_fname
: Optional
[str] = None
548 self
.ssh_config
: Optional
[str] = None
549 self
._temp
_files
: List
= []
550 self
.ssh_key
: Optional
[str] = None
551 self
.ssh_pub
: Optional
[str] = None
552 self
.use_agent
= False
553 self
.agent_refresh_rate
= 0
554 self
.agent_down_multiplier
= 0.0
555 self
.agent_starting_port
= 0
556 self
.service_discovery_port
= 0
557 self
.secure_monitoring_stack
= False
558 self
.prometheus_web_password
: Optional
[str] = None
559 self
.prometheus_web_user
: Optional
[str] = None
560 self
.alertmanager_web_password
: Optional
[str] = None
561 self
.alertmanager_web_user
: Optional
[str] = None
562 self
.apply_spec_fails
: List
[Tuple
[str, str]] = []
563 self
.max_osd_draining_count
= 10
564 self
.device_enhanced_scan
= False
565 self
.cgroups_split
= True
566 self
.log_refresh_metadata
= False
567 self
.default_cephadm_command_timeout
= 0
569 self
.notify(NotifyType
.mon_map
, None)
572 path
= self
.get_ceph_option('cephadm_path')
574 assert isinstance(path
, str)
575 with
open(path
, 'rb') as f
:
576 self
._cephadm
= f
.read()
577 except (IOError, TypeError) as e
:
578 raise RuntimeError("unable to read cephadm at '%s': %s" % (
581 self
.cephadm_binary_path
= self
._get
_cephadm
_binary
_path
()
583 self
._worker
_pool
= multiprocessing
.pool
.ThreadPool(10)
585 self
.ssh
._reconfig
_ssh
()
587 CephadmOrchestrator
.instance
= self
589 self
.upgrade
= CephadmUpgrade(self
)
591 self
.health_checks
: Dict
[str, dict] = {}
593 self
.inventory
= Inventory(self
)
595 self
.cache
= HostCache(self
)
598 self
.agent_cache
= AgentCache(self
)
599 self
.agent_cache
.load()
601 self
.to_remove_osds
= OSDRemovalQueue(self
)
602 self
.to_remove_osds
.load_from_store()
604 self
.spec_store
= SpecStore(self
)
605 self
.spec_store
.load()
607 self
.keys
= ClientKeyringStore(self
)
610 self
.tuned_profiles
= TunedProfileStore(self
)
611 self
.tuned_profiles
.load()
613 self
.tuned_profile_utils
= TunedProfileUtils(self
)
615 # ensure the host lists are in sync
616 for h
in self
.inventory
.keys():
617 if h
not in self
.cache
.daemons
:
618 self
.cache
.prime_empty_host(h
)
619 for h
in self
.cache
.get_hosts():
620 if h
not in self
.inventory
:
621 self
.cache
.rm_host(h
)
624 self
.events
= EventStore(self
)
625 self
.offline_hosts
: Set
[str] = set()
627 self
.migration
= Migrations(self
)
629 _service_classes
: Sequence
[Type
[CephadmService
]] = [
630 OSDService
, NFSService
, MonService
, MgrService
, MdsService
,
631 RgwService
, RbdMirrorService
, GrafanaService
, AlertmanagerService
,
632 PrometheusService
, NodeExporterService
, LokiService
, PromtailService
, CrashService
, IscsiService
,
633 IngressService
, CustomContainerService
, CephfsMirrorService
,
634 CephadmAgent
, CephExporterService
, SNMPGatewayService
, ElasticSearchService
,
635 JaegerQueryService
, JaegerAgentService
, JaegerCollectorService
638 # https://github.com/python/mypy/issues/8993
639 self
.cephadm_services
: Dict
[str, CephadmService
] = {
640 cls
.TYPE
: cls(self
) for cls
in _service_classes
} # type: ignore
642 self
.mgr_service
: MgrService
= cast(MgrService
, self
.cephadm_services
['mgr'])
643 self
.osd_service
: OSDService
= cast(OSDService
, self
.cephadm_services
['osd'])
644 self
.iscsi_service
: IscsiService
= cast(IscsiService
, self
.cephadm_services
['iscsi'])
646 self
.scheduled_async_actions
: List
[Callable
] = []
648 self
.template
= TemplateMgr(self
)
650 self
.requires_post_actions
: Set
[str] = set()
651 self
.need_connect_dashboard_rgw
= False
653 self
.config_checker
= CephadmConfigChecks(self
)
655 self
.http_server
= CephadmHttpServer(self
)
656 self
.http_server
.start()
657 self
.agent_helpers
= CephadmAgentHelpers(self
)
659 self
.agent_helpers
._apply
_agent
()
661 self
.offline_watcher
= OfflineHostWatcher(self
)
662 self
.offline_watcher
.start()
    def shutdown(self) -> None:
        """Stop this module's background services on teardown."""
        self.log.debug('shutdown')
        # stop accepting new work, then wait for outstanding pool tasks
        self._worker_pool.close()
        self._worker_pool.join()
        # then stop the HTTP server and the offline-host watcher threads
        self.http_server.shutdown()
        self.offline_watcher.shutdown()
    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        """Return the CephadmService handler registered for *service_type*.

        *service_type* must be one of ``ServiceSpec.KNOWN_SERVICE_TYPES``;
        anything else is a programming error (hence assert, not raise).
        """
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]
677 def _get_cephadm_binary_path(self
) -> str:
680 m
.update(self
._cephadm
)
681 return f
'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
683 def _kick_serve_loop(self
) -> None:
684 self
.log
.debug('_kick_serve_loop')
687 def serve(self
) -> None:
689 The main loop of cephadm.
691 A command handler will typically change the declarative state
692 of cephadm. This loop will then attempt to apply this new state.
695 self
.event_loop
= ssh
.EventLoopThread()
697 serve
= CephadmServe(self
)
700 def wait_async(self
, coro
: Awaitable
[T
], timeout
: Optional
[int] = None) -> T
:
702 timeout
= self
.default_cephadm_command_timeout
703 # put a lower bound of 60 seconds in case users
704 # accidentally set it to something unreasonable.
705 # For example if they though it was in minutes
706 # rather than seconds
708 self
.log
.info(f
'Found default timeout set to {timeout}. Instead trying minimum of 60.')
710 return self
.event_loop
.get_result(coro
, timeout
)
713 def async_timeout_handler(self
, host
: Optional
[str] = '',
714 cmd
: Optional
[str] = '',
715 timeout
: Optional
[int] = None) -> Iterator
[None]:
716 # this is meant to catch asyncio.TimeoutError and convert it into an
717 # OrchestratorError which much of the cephadm codebase is better equipped to handle.
718 # If the command being run, the host it is run on, or the timeout being used
719 # are provided, that will be included in the OrchestratorError's message
722 except asyncio
.TimeoutError
:
725 err_str
= f
'Command "{cmd}" timed out '
727 err_str
= 'Command timed out '
729 err_str
+= f
'on host {host} '
731 err_str
+= f
'(non-default {timeout} second timeout)'
733 err_str
+= (f
'(default {self.default_cephadm_command_timeout} second timeout)')
734 raise OrchestratorError(err_str
)
736 def set_container_image(self
, entity
: str, image
: str) -> None:
737 self
.check_mon_command({
738 'prefix': 'config set',
739 'name': 'container_image',
744 def config_notify(self
) -> None:
746 This method is called whenever one of our config options is changed.
748 TODO: this method should be moved into mgr_module.py
750 for opt
in self
.MODULE_OPTIONS
:
752 opt
['name'], # type: ignore
753 self
.get_module_option(opt
['name'])) # type: ignore
754 self
.log
.debug(' mgr option %s = %s',
755 opt
['name'], getattr(self
, opt
['name'])) # type: ignore
756 for opt
in self
.NATIVE_OPTIONS
:
759 self
.get_ceph_option(opt
))
760 self
.log
.debug(' native option %s = %s', opt
, getattr(self
, opt
)) # type: ignore
764 def notify(self
, notify_type
: NotifyType
, notify_id
: Optional
[str]) -> None:
765 if notify_type
== NotifyType
.mon_map
:
766 # get monmap mtime so we can refresh configs when mons change
767 monmap
= self
.get('mon_map')
768 self
.last_monmap
= str_to_datetime(monmap
['modified'])
769 if self
.last_monmap
and self
.last_monmap
> datetime_now():
770 self
.last_monmap
= None # just in case clocks are skewed
771 if getattr(self
, 'manage_etc_ceph_ceph_conf', False):
772 # getattr, due to notify() being called before config_notify()
773 self
._kick
_serve
_loop
()
774 if notify_type
== NotifyType
.pg_summary
:
775 self
._trigger
_osd
_removal
()
777 def _trigger_osd_removal(self
) -> None:
778 remove_queue
= self
.to_remove_osds
.as_osd_ids()
781 data
= self
.get("osd_stats")
782 for osd
in data
.get('osd_stats', []):
783 if osd
.get('num_pgs') == 0:
784 # if _ANY_ osd that is currently in the queue appears to be empty,
785 # start the removal process
786 if int(osd
.get('osd')) in remove_queue
:
787 self
.log
.debug('Found empty osd. Starting removal process')
788 # if the osd that is now empty is also part of the removal queue
790 self
._kick
_serve
_loop
()
792 def pause(self
) -> None:
794 self
.log
.info('Paused')
795 self
.set_store('pause', 'true')
797 # wake loop so we update the health status
798 self
._kick
_serve
_loop
()
800 def resume(self
) -> None:
802 self
.log
.info('Resumed')
804 self
.set_store('pause', None)
805 # unconditionally wake loop so that 'orch resume' can be used to kick
807 self
._kick
_serve
_loop
()
813 existing
: List
[orchestrator
.DaemonDescription
],
814 prefix
: Optional
[str] = None,
815 forcename
: Optional
[str] = None,
816 rank
: Optional
[int] = None,
817 rank_generation
: Optional
[int] = None,
820 Generate a unique random service name
822 suffix
= daemon_type
not in [
823 'mon', 'crash', 'ceph-exporter',
824 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
825 'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
826 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
829 if len([d
for d
in existing
if d
.daemon_id
== forcename
]):
830 raise orchestrator
.OrchestratorValidationError(
831 f
'name {daemon_type}.{forcename} already in use')
835 host
= host
.split('.')[0]
841 if rank
is not None and rank_generation
is not None:
842 name
+= f
'{rank}.{rank_generation}.'
845 name
+= '.' + ''.join(random
.choice(string
.ascii_lowercase
)
847 if len([d
for d
in existing
if d
.daemon_id
== name
]):
849 raise orchestrator
.OrchestratorValidationError(
850 f
'name {daemon_type}.{name} already in use')
851 self
.log
.debug('name %s exists, trying again', name
)
855 def validate_ssh_config_content(self
, ssh_config
: Optional
[str]) -> None:
856 if ssh_config
is None or len(ssh_config
.strip()) == 0:
857 raise OrchestratorValidationError('ssh_config cannot be empty')
858 # StrictHostKeyChecking is [yes|no] ?
859 res
= re
.findall(r
'StrictHostKeyChecking\s+.*', ssh_config
)
861 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
863 if 'ask' in s
.lower():
864 raise OrchestratorValidationError(f
'ssh_config cannot contain: \'{s}\'')
866 def validate_ssh_config_fname(self
, ssh_config_fname
: str) -> None:
867 if not os
.path
.isfile(ssh_config_fname
):
868 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
871 def _process_ls_output(self
, host
: str, ls
: List
[Dict
[str, Any
]]) -> None:
874 if not d
['style'].startswith('cephadm'):
876 if d
['fsid'] != self
._cluster
_fsid
:
878 if '.' not in d
['name']:
880 sd
= orchestrator
.DaemonDescription()
881 sd
.last_refresh
= datetime_now()
882 for k
in ['created', 'started', 'last_configured', 'last_deployed']:
885 setattr(sd
, k
, str_to_datetime(d
[k
]))
886 sd
.daemon_type
= d
['name'].split('.')[0]
887 if sd
.daemon_type
not in orchestrator
.KNOWN_DAEMON_TYPES
:
888 logger
.warning(f
"Found unknown daemon type {sd.daemon_type} on host {host}")
891 sd
.daemon_id
= '.'.join(d
['name'].split('.')[1:])
893 sd
.container_id
= d
.get('container_id')
896 sd
.container_id
= sd
.container_id
[0:12]
897 sd
.container_image_name
= d
.get('container_image_name')
898 sd
.container_image_id
= d
.get('container_image_id')
899 sd
.container_image_digests
= d
.get('container_image_digests')
900 sd
.memory_usage
= d
.get('memory_usage')
901 sd
.memory_request
= d
.get('memory_request')
902 sd
.memory_limit
= d
.get('memory_limit')
903 sd
.cpu_percentage
= d
.get('cpu_percentage')
904 sd
._service
_name
= d
.get('service_name')
905 sd
.deployed_by
= d
.get('deployed_by')
906 sd
.version
= d
.get('version')
907 sd
.ports
= d
.get('ports')
909 sd
.rank
= int(d
['rank']) if d
.get('rank') is not None else None
910 sd
.rank_generation
= int(d
['rank_generation']) if d
.get(
911 'rank_generation') is not None else None
912 sd
.extra_container_args
= d
.get('extra_container_args')
913 sd
.extra_entrypoint_args
= d
.get('extra_entrypoint_args')
915 sd
.status_desc
= d
['state']
917 'running': DaemonDescriptionStatus
.running
,
918 'stopped': DaemonDescriptionStatus
.stopped
,
919 'error': DaemonDescriptionStatus
.error
,
920 'unknown': DaemonDescriptionStatus
.error
,
923 sd
.status_desc
= 'unknown'
926 self
.log
.debug('Refreshed host %s daemons (%d)' % (host
, len(dm
)))
927 self
.cache
.update_host_daemons(host
, dm
)
928 self
.cache
.save_host(host
)
931 def update_watched_hosts(self
) -> None:
932 # currently, we are watching hosts with nfs daemons
933 hosts_to_watch
= [d
.hostname
for d
in self
.cache
.get_daemons(
934 ) if d
.daemon_type
in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES
]
935 self
.offline_watcher
.set_hosts(list(set([h
for h
in hosts_to_watch
if h
is not None])))
937 def offline_hosts_remove(self
, host
: str) -> None:
938 if host
in self
.offline_hosts
:
939 self
.offline_hosts
.remove(host
)
941 def update_failed_daemon_health_check(self
) -> None:
943 for dd
in self
.cache
.get_error_daemons():
944 if dd
.daemon_type
!= 'agent': # agents tracked by CEPHADM_AGENT_DOWN
945 failed_daemons
.append('daemon %s on %s is in %s state' % (
946 dd
.name(), dd
.hostname
, dd
.status_desc
948 self
.remove_health_warning('CEPHADM_FAILED_DAEMON')
950 self
.set_health_warning('CEPHADM_FAILED_DAEMON', f
'{len(failed_daemons)} failed cephadm daemon(s)', len(
951 failed_daemons
), failed_daemons
)
954 def can_run() -> Tuple
[bool, str]:
955 if asyncssh
is not None:
958 return False, "loading asyncssh library:{}".format(
959 asyncssh_import_error
)
961 def available(self
) -> Tuple
[bool, str, Dict
[str, Any
]]:
963 The cephadm orchestrator is always available.
965 ok
, err
= self
.can_run()
968 if not self
.ssh_key
or not self
.ssh_pub
:
969 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}
971 # mypy is unable to determine type for _processes since it's private
972 worker_count
: int = self
._worker
_pool
._processes
# type: ignore
974 "workers": worker_count
,
975 "paused": self
.paused
,
978 return True, err
, ret
980 def _validate_and_set_ssh_val(self
, what
: str, new
: Optional
[str], old
: Optional
[str]) -> None:
981 self
.set_store(what
, new
)
982 self
.ssh
._reconfig
_ssh
()
983 if self
.cache
.get_hosts():
984 # Can't check anything without hosts
985 host
= self
.cache
.get_hosts()[0]
986 r
= CephadmServe(self
)._check
_host
(host
)
988 # connection failed reset user
989 self
.set_store(what
, old
)
990 self
.ssh
._reconfig
_ssh
()
991 raise OrchestratorError('ssh connection %s@%s failed' % (self
.ssh_user
, host
))
992 self
.log
.info(f
'Set ssh {what}')
994 @orchestrator._cli
_write
_command
(
995 prefix
='cephadm set-ssh-config')
996 def _set_ssh_config(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
998 Set the ssh_config file (use -i <ssh_config>)
1000 # Set an ssh_config file provided from stdin
1002 old
= self
.ssh_config
1004 return 0, "value unchanged", ""
1005 self
.validate_ssh_config_content(inbuf
)
1006 self
._validate
_and
_set
_ssh
_val
('ssh_config', inbuf
, old
)
1009 @orchestrator._cli
_write
_command
('cephadm clear-ssh-config')
1010 def _clear_ssh_config(self
) -> Tuple
[int, str, str]:
1012 Clear the ssh_config file
1014 # Clear the ssh_config file provided from stdin
1015 self
.set_store("ssh_config", None)
1016 self
.ssh_config_tmp
= None
1017 self
.log
.info('Cleared ssh_config')
1018 self
.ssh
._reconfig
_ssh
()
1021 @orchestrator._cli
_read
_command
('cephadm get-ssh-config')
1022 def _get_ssh_config(self
) -> HandleCommandResult
:
1024 Returns the ssh config as used by cephadm
1026 if self
.ssh_config_file
:
1027 self
.validate_ssh_config_fname(self
.ssh_config_file
)
1028 with
open(self
.ssh_config_file
) as f
:
1029 return HandleCommandResult(stdout
=f
.read())
1030 ssh_config
= self
.get_store("ssh_config")
1032 return HandleCommandResult(stdout
=ssh_config
)
1033 return HandleCommandResult(stdout
=DEFAULT_SSH_CONFIG
)
1035 @orchestrator._cli
_write
_command
('cephadm generate-key')
1036 def _generate_key(self
) -> Tuple
[int, str, str]:
1038 Generate a cluster SSH key (if not present)
1040 if not self
.ssh_pub
or not self
.ssh_key
:
1041 self
.log
.info('Generating ssh key...')
1042 tmp_dir
= TemporaryDirectory()
1043 path
= tmp_dir
.name
+ '/key'
1045 subprocess
.check_call([
1046 '/usr/bin/ssh-keygen',
1047 '-C', 'ceph-%s' % self
._cluster
_fsid
,
1051 with
open(path
, 'r') as f
:
1053 with
open(path
+ '.pub', 'r') as f
:
1057 os
.unlink(path
+ '.pub')
1059 self
.set_store('ssh_identity_key', secret
)
1060 self
.set_store('ssh_identity_pub', pub
)
1061 self
.ssh
._reconfig
_ssh
()
1064 @orchestrator._cli
_write
_command
(
1065 'cephadm set-priv-key')
1066 def _set_priv_key(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1067 """Set cluster SSH private key (use -i <private_key>)"""
1068 if inbuf
is None or len(inbuf
) == 0:
1069 return -errno
.EINVAL
, "", "empty private ssh key provided"
1072 return 0, "value unchanged", ""
1073 self
._validate
_and
_set
_ssh
_val
('ssh_identity_key', inbuf
, old
)
1074 self
.log
.info('Set ssh private key')
1077 @orchestrator._cli
_write
_command
(
1078 'cephadm set-pub-key')
1079 def _set_pub_key(self
, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1080 """Set cluster SSH public key (use -i <public_key>)"""
1081 if inbuf
is None or len(inbuf
) == 0:
1082 return -errno
.EINVAL
, "", "empty public ssh key provided"
1085 return 0, "value unchanged", ""
1086 self
._validate
_and
_set
_ssh
_val
('ssh_identity_pub', inbuf
, old
)
1089 @orchestrator._cli
_write
_command
(
1090 'cephadm clear-key')
1091 def _clear_key(self
) -> Tuple
[int, str, str]:
1092 """Clear cluster SSH key"""
1093 self
.set_store('ssh_identity_key', None)
1094 self
.set_store('ssh_identity_pub', None)
1095 self
.ssh
._reconfig
_ssh
()
1096 self
.log
.info('Cleared cluster SSH key')
1099 @orchestrator._cli
_read
_command
(
1100 'cephadm get-pub-key')
1101 def _get_pub_key(self
) -> Tuple
[int, str, str]:
1102 """Show SSH public key for connecting to cluster hosts"""
1104 return 0, self
.ssh_pub
, ''
1106 return -errno
.ENOENT
, '', 'No cluster SSH key defined'
1108 @orchestrator._cli
_read
_command
(
1110 def _get_user(self
) -> Tuple
[int, str, str]:
1112 Show user for SSHing to cluster hosts
1114 if self
.ssh_user
is None:
1115 return -errno
.ENOENT
, '', 'No cluster SSH user configured'
1117 return 0, self
.ssh_user
, ''
1119 @orchestrator._cli
_read
_command
(
1121 def set_ssh_user(self
, user
: str) -> Tuple
[int, str, str]:
1123 Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
1125 current_user
= self
.ssh_user
1126 if user
== current_user
:
1127 return 0, "value unchanged", ""
1129 self
._validate
_and
_set
_ssh
_val
('ssh_user', user
, current_user
)
1130 current_ssh_config
= self
._get
_ssh
_config
()
1131 new_ssh_config
= re
.sub(r
"(\s{2}User\s)(.*)", r
"\1" + user
, current_ssh_config
.stdout
)
1132 self
._set
_ssh
_config
(new_ssh_config
)
1134 msg
= 'ssh user set to %s' % user
1136 msg
+= '. sudo will be used'
@orchestrator._cli_read_command(
    'cephadm registry-login')
def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """Set custom registry login info by providing url, username and password or json file with login info (-i <file>)"""
    # if password not given in command line, get it through file input
    if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
        return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                   "or -i <login credentials json file>")
    elif (url and username and password):
        registry_json = {'url': url, 'username': username, 'password': password}
    # NOTE(review): source appears truncated -- the `else:` introducing the
    # file-input path is missing before the assert below.
    assert isinstance(inbuf, str)
    registry_json = json.loads(inbuf)
    if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
        # NOTE(review): parts of this error string (and its closing
        # parenthesis) are missing in the source.
        return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                   "Please setup json file as\n"
                                   " \"url\": \"REGISTRY_URL\",\n"
                                   " \"username\": \"REGISTRY_USERNAME\",\n"
                                   " \"password\": \"REGISTRY_PASSWORD\"\n"
    # verify login info works by attempting login on random host
    # NOTE(review): the selection of `host` from the inventory and the
    # loop body are truncated in the source.
    for host_name in self.inventory.keys():
    raise OrchestratorError('no hosts defined')
    with self.async_timeout_handler(host, 'cephadm registry-login'):
        r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
    # if logins succeeded, store info
    self.log.debug("Host logins successful. Storing login info.")
    self.set_store('registry_credentials', json.dumps(registry_json))
    # distribute new login info to all hosts
    self.cache.distribute_new_registry_login_info()
    return 0, "registry login scheduled", ''
@orchestrator._cli_read_command('cephadm check-host')
def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
    """Check whether we can access and manage a remote host"""
    # NOTE(review): source appears truncated -- the `try:` that pairs with
    # the except clauses below, and the `if code:` guarding the first
    # failure return, are missing.
    with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
        out, err, code = self.wait_async(
            CephadmServe(self)._run_cephadm(
                host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
                addr=addr, error_ok=True, no_fsid=True))
    return 1, '', ('check-host failed:\n' + '\n'.join(err))
    except ssh.HostConnectionError as e:
        self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
        return 1, '', ('check-host failed:\n'
                       + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
    except OrchestratorError:
        self.log.exception(f"check-host failed for '{host}'")
        return 1, '', ('check-host failed:\n'
                       + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
    # if we have an outstanding health alert for this host, give the
    # serve thread a kick
    if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
        for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
            if item.startswith('host %s ' % host):
    # NOTE(review): the body of the inner `if` (presumably waking the serve
    # loop) is missing in the source.
    return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
@orchestrator._cli_read_command(
    'cephadm prepare-host')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
    """Prepare a remote host for use with cephadm"""
    with self.async_timeout_handler(host, 'cephadm prepare-host'):
        out, err, code = self.wait_async(
            CephadmServe(self)._run_cephadm(
                host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
                addr=addr, error_ok=True, no_fsid=True))
    # NOTE(review): source appears truncated -- the `if code:` guarding
    # this failure return is missing.
    return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
    # if we have an outstanding health alert for this host, give the
    # serve thread a kick
    if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
        for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
            if item.startswith('host %s ' % host):
    # NOTE(review): the body of the inner `if` is missing in the source.
    return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
@orchestrator._cli_write_command(
    prefix='cephadm set-extra-ceph-conf')
def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Text that is appended to all daemon's ceph.conf.
    Mainly a workaround, till `config generate-minimal-conf` generates
    a complete ceph.conf.

    Warning: this is a dangerous operation.
    """
    # NOTE(review): source appears truncated -- the ConfigParser
    # construction and the surrounding validation `try:` are missing
    # before this parse of the user-provided conf text.
    cp.read_string(inbuf, source='<infile>')

    # NOTE(review): the 'conf' entry of this stored dict and the closing
    # braces are missing in the source.
    self.set_store("extra_ceph_conf", json.dumps({
        'last_modified': datetime_to_str(datetime_now())
    self.log.info('Set extra_ceph_conf')
    self._kick_serve_loop()
    return HandleCommandResult()
@orchestrator._cli_read_command(
    'cephadm get-extra-ceph-conf')
def _get_extra_ceph_conf(self) -> HandleCommandResult:
    """
    Get extra ceph conf that is appended
    """
    return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
@orchestrator._cli_read_command('cephadm config-check ls')
def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
    """List the available configuration checks and their current state"""
    if format not in [Format.plain, Format.json, Format.json_pretty]:
        # NOTE(review): source appears truncated -- the retval argument and
        # closing parenthesis of this error result are missing.
        return HandleCommandResult(
            stderr="Requested format is not supported when listing configuration checks"
    if format in [Format.json, Format.json_pretty]:
        # NOTE(review): the remaining to_format() arguments and closing
        # parentheses are missing in the source.
        return HandleCommandResult(
            stdout=to_format(self.config_checker.health_checks,
    # Plain-text path: render the checks as a left-aligned table.
    # NOTE(review): the PrettyTable column list is missing in the source.
    table = PrettyTable(
    table.align['NAME'] = 'l'
    table.align['HEALTHCHECK'] = 'l'
    table.align['STATUS'] = 'l'
    table.align['DESCRIPTION'] = 'l'
    table.left_padding_width = 0
    table.right_padding_width = 2
    for c in self.config_checker.health_checks:
    # NOTE(review): the loop body (adding one row per check) is missing.
    return HandleCommandResult(stdout=table.get_string())
@orchestrator._cli_read_command('cephadm config-check status')
def _config_check_status(self) -> HandleCommandResult:
    """Show whether the configuration checker feature is enabled/disabled"""
    enabled = self.get_module_option('config_checks_enabled')
    if enabled:
        text = "Enabled"
    else:
        text = "Disabled"
    return HandleCommandResult(stdout=text)
@orchestrator._cli_write_command('cephadm config-check enable')
def _config_check_enable(self, check_name: str) -> HandleCommandResult:
    """Enable a specific configuration check"""
    if not self._config_check_valid(check_name):
        return HandleCommandResult(retval=1, stderr="Invalid check name")

    err, msg = self._update_config_check(check_name, 'enabled')
    # NOTE(review): source appears truncated -- the `if err:` guarding this
    # failure result (and its retval argument) are missing.
    return HandleCommandResult(
        stderr=f"Failed to enable check '{check_name}' : {msg}")

    return HandleCommandResult(stdout="ok")
@orchestrator._cli_write_command('cephadm config-check disable')
def _config_check_disable(self, check_name: str) -> HandleCommandResult:
    """Disable a specific configuration check"""
    if not self._config_check_valid(check_name):
        return HandleCommandResult(retval=1, stderr="Invalid check name")

    err, msg = self._update_config_check(check_name, 'disabled')
    # NOTE(review): source appears truncated -- the `if err:` guarding this
    # failure result is missing.
    return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
    # drop any outstanding raised healthcheck for this check
    config_check = self.config_checker.lookup_check(check_name)
    # NOTE(review): the `if config_check:` / `else:` structure around the
    # following statements is partially missing in the source.
    if config_check.healthcheck_name in self.health_checks:
        self.health_checks.pop(config_check.healthcheck_name, None)
        self.set_health_checks(self.health_checks)
    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

    return HandleCommandResult(stdout="ok")
1341 def _config_check_valid(self
, check_name
: str) -> bool:
1342 return check_name
in [chk
.name
for chk
in self
.config_checker
.health_checks
]
def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
    """Persist a new enabled/disabled *status* for one configuration check.

    Returns a (retval, message) pair; non-zero retval indicates failure.
    """
    checks_raw = self.get_store('config_checks')
    # NOTE(review): source appears truncated -- the guard (presumably
    # `if not checks_raw:`) before this error return is missing.
    return 1, "config_checks setting is not available"

    checks = json.loads(checks_raw)
    # NOTE(review): the update of checks[check_name] and the success return
    # are missing in the source.
    self.log.info(f"updated config check '{check_name}' : {status}")
    self.set_store('config_checks', json.dumps(checks))
class ExtraCephConf(NamedTuple):
    # Holds the operator-provided extra ceph.conf text together with the
    # time it was last stored.
    # NOTE(review): a field appears to be missing in the source before
    # last_modified (presumably `conf: str`).
    # last_modified: when the extra conf was stored; None when unset.
    last_modified: Optional[datetime.datetime]
def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
    """Load the stored extra ceph.conf blob, tolerating absent/corrupt data."""
    data = self.get_store('extra_ceph_conf')
    # NOTE(review): source appears truncated -- the `if not data:` guard
    # before this empty-result return, and the `try:`/`except` pair around
    # json.loads, are missing.
    return CephadmOrchestrator.ExtraCephConf('', None)
    j = json.loads(data)
    msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
    self.log.exception('%s: \'%s\'', msg, data)
    return CephadmOrchestrator.ExtraCephConf('', None)
    return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
    """Return True if the stored extra ceph conf was modified after *dt*.

    :param dt: timestamp to compare the stored conf's last_modified against.
    """
    conf = self.extra_ceph_conf()
    if not conf.last_modified:
        # No timestamp recorded (extra conf never set or unparsable):
        # comparing None against a datetime would raise TypeError, and
        # "never modified" is trivially not newer than anything.
        return False
    return conf.last_modified > dt
@orchestrator._cli_write_command(
    'cephadm osd activate'
    # NOTE(review): the decorator's closing parenthesis is missing in the
    # source.
def _osd_activate(self, host: List[str]) -> HandleCommandResult:
    """
    Start OSD containers for existing OSDs
    """
    # NOTE(review): source appears truncated -- validation of `host` before
    # the helper below is missing.

    def run(h: str) -> str:
        # Deploy osd daemons for any OSDs that already exist on host h.
        with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
            return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))

    return HandleCommandResult(stdout='\n'.join(run(host)))
@orchestrator._cli_read_command('orch client-keyring ls')
def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
    """
    List client keyrings under cephadm management
    """
    if format != Format.plain:
        output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
    # NOTE(review): source appears truncated -- the `else:` introducing the
    # table-rendering branch and parts of the PrettyTable construction are
    # missing.
    table = PrettyTable(
        ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
    table.left_padding_width = 0
    table.right_padding_width = 2
    for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
        # NOTE(review): the add_row(( call wrapping these row fields, and
        # the path column, are missing in the source.
        ks.entity, ks.placement.pretty_str(),
        utils.file_mode_to_str(ks.mode),
        f'{ks.uid}:{ks.gid}',
    output = table.get_string()
    return HandleCommandResult(stdout=output)
@orchestrator._cli_write_command('orch client-keyring set')
def _client_keyring_set(
    # NOTE(review): source appears truncated -- the leading parameters
    # (self, entity, placement) are missing from this signature.
    owner: Optional[str] = None,
    mode: Optional[str] = None,
) -> HandleCommandResult:
    """
    Add or update client keyring under cephadm management
    """
    if not entity.startswith('client.'):
        raise OrchestratorError('entity must start with client.')
    # NOTE(review): the try/except structures that pair these parses with
    # their error raises are missing in the source.
    uid, gid = map(int, owner.split(':'))
    raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
    imode = int(mode, 8)
    raise OrchestratorError('mode must be an octal mode, e.g. "600"')

    # Resolve the placement and record the keyring spec, then wake serve()
    # so the keyring gets distributed.
    pspec = PlacementSpec.from_string(placement)
    ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
    self.keys.update(ks)
    self._kick_serve_loop()
    return HandleCommandResult()
@orchestrator._cli_write_command('orch client-keyring rm')
def _client_keyring_rm(
    # NOTE(review): source appears truncated -- the parameters (self,
    # entity) are missing from this signature.
) -> HandleCommandResult:
    """
    Remove client keyring from cephadm management
    """
    self.keys.rm(entity)
    self._kick_serve_loop()
    return HandleCommandResult()
def _get_container_image(self, daemon_name: str) -> Optional[str]:
    """Resolve the container image to use for the given daemon name."""
    daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
    image: Optional[str] = None
    if daemon_type in CEPH_IMAGE_TYPES:
        # get container image
        # NOTE(review): the remaining arguments / closing parentheses of
        # this get_foreign_ceph_option call are missing in the source.
        image = str(self.get_foreign_ceph_option(
            utils.name_to_config_section(daemon_name),
    elif daemon_type == 'prometheus':
        image = self.container_image_prometheus
    elif daemon_type == 'grafana':
        image = self.container_image_grafana
    elif daemon_type == 'alertmanager':
        image = self.container_image_alertmanager
    elif daemon_type == 'node-exporter':
        image = self.container_image_node_exporter
    elif daemon_type == 'loki':
        image = self.container_image_loki
    elif daemon_type == 'promtail':
        image = self.container_image_promtail
    elif daemon_type == 'haproxy':
        image = self.container_image_haproxy
    elif daemon_type == 'keepalived':
        image = self.container_image_keepalived
    elif daemon_type == 'elasticsearch':
        image = self.container_image_elasticsearch
    elif daemon_type == 'jaeger-agent':
        image = self.container_image_jaeger_agent
    elif daemon_type == 'jaeger-collector':
        image = self.container_image_jaeger_collector
    elif daemon_type == 'jaeger-query':
        image = self.container_image_jaeger_query
    elif daemon_type == CustomContainerService.TYPE:
        # The image can't be resolved, the necessary information
        # is only available when a container is deployed (given
    elif daemon_type == 'snmp-gateway':
        image = self.container_image_snmp_gateway
    # NOTE(review): the `else:` that should introduce this assert, and the
    # final `return image`, are missing in the source.
    assert False, daemon_type

    self.log.debug('%s container image %s' % (daemon_name, image))
def _check_valid_addr(self, host: str, addr: str) -> str:
    """Resolve and sanity-check a host address before managing the host."""
    # make sure hostname is resolvable before trying to make a connection
    # NOTE(review): source appears truncated -- the `try:` pairing with the
    # except below, and the opening of the multi-line help message
    # (presumably `msg = f'''...`), are missing.
    ip_addr = utils.resolve_ip(addr)
    except OrchestratorError as e:
        You may need to supply an address for {addr}

        Please make sure that the host is reachable and accepts connections using the cephadm SSH key
        To add the cephadm SSH key to the host:
        > ceph cephadm get-pub-key > ~/ceph.pub
        > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}

        To check that the host is reachable open a new shell with the --no-hosts flag:
        > cephadm shell --no-hosts

        Then run the following:
        > ceph cephadm get-ssh-config > ssh_config
        > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
        > chmod 0600 ~/cephadm_private_key
        > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
    raise OrchestratorError(msg)

    if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
        # if this is a re-add, use old address. otherwise error
        if host not in self.inventory or self.inventory.get_addr(host) == host:
            raise OrchestratorError(
                (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
                 + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
        # NOTE(review): the logging call wrapping this message is missing.
        f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
        ip_addr = self.inventory.get_addr(host)

    # NOTE(review): the `try:` pairing with the HostConnectionError except
    # below, the `if code:` around the failure message, the `if errors:`
    # refinement, and the final `return ip_addr` are missing.
    with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
            host, cephadmNoImage, 'check-host',
            ['--expect-hostname', host],
            error_ok=True, no_fsid=True))
    msg = 'check-host failed:\n' + '\n'.join(err)
    # err will contain stdout and stderr, so we filter on the message text to
    # only show the errors
    errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
    msg = f'Host {host} ({addr}) failed check(s): {errors}'
    raise OrchestratorError(msg)
    except ssh.HostConnectionError as e:
        raise OrchestratorError(str(e))
def _add_host(self, spec):
    # type: (HostSpec) -> str
    """
    Add a host to be managed by the orchestrator.

    :param host: host name
    """
    HostSpec.validate(spec)
    ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
    # NOTE(review): source appears truncated -- the body of this branch
    # (presumably replacing spec.addr with the resolved ip) is missing.
    if spec.addr == spec.hostname and ip_addr:

    # Address changed for a known host: refresh everything we cached.
    if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
        self.cache.refresh_all_host_info(spec.hostname)

    # NOTE(review): the `if spec.location:` guard and parts of this
    # mon-command dict are missing in the source.
    self.check_mon_command({
        'prefix': 'osd crush add-bucket',
        'name': spec.hostname,
        'args': [f'{k}={v}' for k, v in spec.location.items()],

    if spec.hostname not in self.inventory:
        self.cache.prime_empty_host(spec.hostname)
    self.inventory.add_host(spec)
    self.offline_hosts_remove(spec.hostname)
    if spec.status == 'maintenance':
        self._set_maintenance_healthcheck()
    self.event.set()  # refresh stray health check
    self.log.info('Added host %s' % spec.hostname)
    return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
def add_host(self, spec: HostSpec) -> str:
    """Public orchestrator entry point for adding a host.

    All validation and bookkeeping is delegated to _add_host().
    """
    result = self._add_host(spec)
    return result
def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
    """
    Remove a host from orchestrator management.

    :param host: host name
    :param force: bypass running daemons check
    :param offline: remove offline host
    """
    # check if host is offline
    host_offline = host in self.offline_hosts

    if host_offline and not offline:
        raise OrchestratorValidationError(
            "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))

    if not host_offline and offline:
        raise OrchestratorValidationError(
            "{} is online, please remove host without --offline.".format(host))

    if offline and not force:
        raise OrchestratorValidationError("Removing an offline host requires --force")

    # check if there are daemons on the host
    # NOTE(review): source appears truncated -- the `if not force:` guard,
    # the `if daemons:` check, the daemons_table initialization, and the
    # per-daemon loop around the table rows below are missing.
    daemons = self.cache.get_daemons_by_host(host)
    self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
    daemons_table += "{:<20} {:<15}\n".format("type", "id")
    daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
    daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

    raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
                                      "The following daemons are running in the host:"
                                      "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
                                          host, daemons_table, host))

    # check, if there we're removing the last _admin host
    # NOTE(review): the `if not force:` wrapper around this last-_admin
    # check is missing in the source.
    p = PlacementSpec(label='_admin')
    admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
    if len(admin_hosts) == 1 and admin_hosts[0] == host:
        raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
                                          " label. Please add the '_admin' label to a host"
                                          " or add --force to this command")

    def run_cmd(cmd_args: dict) -> None:
        # Run a mon command and log (rather than raise on) failures, since
        # host removal is best-effort cleanup.
        # NOTE(review): the `if ret:` branch wrapping the failure message is
        # missing in the source.
        ret, out, err = self.mon_command(cmd_args)
        self.log.debug(f"ran {cmd_args} with mon_command")
        f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
        self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")

    # NOTE(review): the `if offline:` wrapper and the per-daemon loop
    # around the removal calls below are missing in the source.
    daemons = self.cache.get_daemons_by_host(host)
    self.log.info(f"removing: {d.name()}")
    if d.daemon_type != 'osd':
        self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
        self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
    # NOTE(review): the run_cmd({...}) wrappers around these mon-command
    # dicts are missing in the source.
    'prefix': 'osd purge-actual',
    'id': int(str(d.daemon_id)),
    'yes_i_really_mean_it': True
    'prefix': 'osd crush rm',

    self.inventory.rm_host(host)
    self.cache.rm_host(host)
    self.ssh.reset_con(host)
    self.offline_hosts_remove(host)  # if host was in offline host list, we should remove it now.
    self.event.set()  # refresh stray health check
    self.log.info('Removed host %s' % host)
    return "Removed {} host '{}'".format('offline' if offline else '', host)
def update_host_addr(self, host: str, addr: str) -> str:
    """Validate *addr* and record it as the management address for *host*.

    Side effects: resets the cached SSH connection for the host and pokes
    the serve loop's event so the stray-host health check re-runs.
    """
    self._check_valid_addr(host, addr)
    self.inventory.set_addr(host, addr)
    self.ssh.reset_con(host)
    self.event.set()  # refresh stray health check
    self.log.info('Set host %s addr to %s' % (host, addr))
    result = "Updated host '{}' addr to '{}'".format(host, addr)
    return result
def get_hosts(self):
    # type: () -> List[orchestrator.HostSpec]
    """
    Return a list of hosts managed by the orchestrator.

    Notes:
      - skip async: manager reads from cache.
    """
    specs = self.inventory.all_specs()
    return list(specs)
def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Return a list of hosts metadata(gather_facts) managed by the orchestrator.

    :param hostname: restrict the result to this single host; when None,
                     facts for every cached host are returned.

    Notes:
      - skip async: manager reads from cache.
    """
    # BUGFIX: narrow to a single host only when a hostname was actually
    # given; previously the single-host return ran unconditionally, making
    # the all-hosts path below unreachable.
    if hostname:
        return [self.cache.get_facts(hostname)]

    return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
def add_host_label(self, host: str, label: str) -> str:
    """Attach *label* to *host* in the inventory and wake the serve loop."""
    self.inventory.add_label(host, label)
    self.log.info('Added label %s to host %s' % (label, host))
    self._kick_serve_loop()
    message = 'Added label %s to host %s' % (label, host)
    return message
def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
    """Remove *label* from *host*.

    Removing the '_admin' label from the last host carrying it is refused
    unless *force* is set, because that host holds the cephadm-managed
    cluster config/keyring.

    :raises OrchestratorValidationError: removing '_admin' from the last
        '_admin' host without --force.
    """
    # if we remove the _admin label from the only host that has it we could end up
    # removing the only instance of the config and keyring and cause issues
    if not force and label == '_admin':
        p = PlacementSpec(label='_admin')
        admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
        if len(admin_hosts) == 1 and admin_hosts[0] == host:
            raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
                                              " label.\nRemoving the _admin label from this host could cause the removal"
                                              " of the last cluster config/keyring managed by cephadm.\n"
                                              "It is recommended to add the _admin label to another host"
                                              " before completing this operation.\nIf you're certain this is"
                                              " what you want rerun this command with --force.")
    self.inventory.rm_label(host, label)
    # BUGFIX: the log message previously read 'Removed label %s to host %s';
    # use 'from' to match the returned message.
    self.log.info('Removed label %s from host %s' % (label, host))
    self._kick_serve_loop()
    return 'Removed label %s from host %s' % (label, host)
def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
    """Check every daemon type on *hostname* for safe-to-stop; return (rc, msg)."""
    self.log.debug("running host-ok-to-stop checks")
    daemons = self.cache.get_daemons()
    daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
    # NOTE(review): source appears truncated -- the `for dd in daemons:`
    # loop header over the asserts below is missing.
    assert dd.hostname is not None
    assert dd.daemon_type is not None
    assert dd.daemon_id is not None
    if dd.hostname == hostname:
        daemon_map[dd.daemon_type].append(dd.daemon_id)

    notifications: List[str] = []
    error_notifications: List[str] = []

    for daemon_type, daemon_ids in daemon_map.items():
        r = self.cephadm_services[daemon_type_to_service(
            daemon_type)].ok_to_stop(daemon_ids, force=force)
        # NOTE(review): the guards on r's result (presumably `if r.retval:`
        # and `elif r.stdout:`) around the appends below are missing.
        # collect error notifications so user can see every daemon causing host
        # to not be okay to stop
        error_notifications.append(r.stderr)
        # if extra notifications to print for user, add them to notifications list
        notifications.append(r.stdout)

    # NOTE(review): the `if error_notifications:` / `if notifications:`
    # guards separating the three returns below are missing.
    # at least one daemon is not okay to stop
    return 1, '\n'.join(error_notifications)

    return 0, (f'It is presumed safe to stop host {hostname}. '
               + 'Note the following:\n\n' + '\n'.join(notifications))
    return 0, f'It is presumed safe to stop host {hostname}'
def host_ok_to_stop(self, hostname: str) -> str:
    """Raise OrchestratorError unless *hostname* can safely be stopped."""
    if hostname not in self.cache.get_hosts():
        raise OrchestratorError(f'Cannot find host "{hostname}"')

    rc, msg = self._host_ok_to_stop(hostname)
    # NOTE(review): source appears truncated -- the `if rc:` guard on this
    # raise and the success return of `msg` are missing.
    raise OrchestratorError(msg, errno=rc)
1794 def _set_maintenance_healthcheck(self
) -> None:
1795 """Raise/update or clear the maintenance health check as needed"""
1797 in_maintenance
= self
.inventory
.get_host_with_state("maintenance")
1798 if not in_maintenance
:
1799 self
.remove_health_warning('HOST_IN_MAINTENANCE')
1801 s
= "host is" if len(in_maintenance
) == 1 else "hosts are"
1802 self
.set_health_warning("HOST_IN_MAINTENANCE", f
"{len(in_maintenance)} {s} in maintenance mode", 1, [
1803 f
"{h} is in maintenance" for h
in in_maintenance
])
def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str:
    """ Attempt to place a cluster host in maintenance

    Placing a host into maintenance disables the cluster's ceph target in systemd
    and stops all ceph daemons. If the host is an osd host we apply the noout flag
    for the host subtree in crush to prevent data movement during a host maintenance

    :param hostname: (str) name of the host (must match an inventory hostname)

    :raises OrchestratorError: Hostname is invalid, host is already in maintenance
    """
    if yes_i_really_mean_it and not force:
        raise OrchestratorError("--force must be passed with --yes-i-really-mean-it")

    if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it:
        raise OrchestratorError("Maintenance feature is not supported on single node clusters")

    # if upgrade is active, deny
    if self.upgrade.upgrade_state and not yes_i_really_mean_it:
        raise OrchestratorError(
            f"Unable to place {hostname} in maintenance with upgrade active/paused")

    tgt_host = self.inventory._inventory[hostname]
    if tgt_host.get("status", "").lower() == "maintenance":
        raise OrchestratorError(f"Host {hostname} is already in maintenance")

    host_daemons = self.cache.get_daemon_types(hostname)
    self.log.debug("daemons on host {}".format(','.join(host_daemons)))
    # NOTE(review): source appears truncated -- the `if host_daemons:`
    # guard above this block is missing.
    # daemons on this host, so check the daemons can be stopped
    # and if so, place the host into maintenance by disabling the target
    rc, msg = self._host_ok_to_stop(hostname, force)
    if rc and not yes_i_really_mean_it:
        raise OrchestratorError(
            msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)

    # call the host-maintenance function
    # NOTE(review): the trailing arguments / closing parentheses of this
    # _run_cephadm call are missing in the source.
    with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
        _out, _err, _code = self.wait_async(
            CephadmServe(self)._run_cephadm(
                hostname, cephadmNoImage, "host-maintenance",
    returned_msg = _err[0].split('\n')[-1]
    if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
        raise OrchestratorError(
            f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")

    if "osd" in host_daemons:
        crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
        # NOTE(review): parts of this mon-command dict (e.g. the noout
        # flags entry and closing braces) are missing in the source.
        rc, out, err = self.mon_command({
            'prefix': 'osd set-group',
            'who': [crush_node],
        # NOTE(review): the log.warning / log.info wrappers around the two
        # status messages below, and the `else:` branch, are missing.
        if rc and not yes_i_really_mean_it:
            f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
            raise OrchestratorError(
                f"Unable to set the osds on {hostname} to noout (rc={rc})")
            f"maintenance mode request for {hostname} has SET the noout group")

    # update the host status in the inventory
    tgt_host["status"] = "maintenance"
    self.inventory._inventory[hostname] = tgt_host
    self.inventory.save()

    self._set_maintenance_healthcheck()
    return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
def exit_host_maintenance(self, hostname: str) -> str:
    """Exit maintenance mode and return a host to an operational state

    Returning from maintenance will enable the clusters systemd target and
    start it, and remove any noout that has been added for the host if the
    host has osd daemons

    :param hostname: (str) host name

    :raises OrchestratorError: Unable to return from maintenance, or unset the
    """
    tgt_host = self.inventory._inventory[hostname]
    if tgt_host['status'] != "maintenance":
        raise OrchestratorError(f"Host {hostname} is not in maintenance mode")

    with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
        outs, errs, _code = self.wait_async(
            CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
                                            'host-maintenance', ['exit'], error_ok=True))
    returned_msg = errs[0].split('\n')[-1]
    if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
        raise OrchestratorError(
            f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")

    if "osd" in self.cache.get_daemon_types(hostname):
        crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
        # NOTE(review): parts of this mon-command dict, the `if rc:` guard,
        # and the log.warning / else log.info wrappers around the two
        # status messages below are missing in the source.
        rc, _out, _err = self.mon_command({
            'prefix': 'osd unset-group',
            'who': [crush_node],
            f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
        raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
        f"exit maintenance request has UNSET for the noout group on host {hostname}")

    # update the host record status
    tgt_host['status'] = ""
    self.inventory._inventory[hostname] = tgt_host
    self.inventory.save()

    self._set_maintenance_healthcheck()

    return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
def rescan_host(self, hostname: str) -> str:
    """Use cephadm to issue a disk rescan on each HBA

    Some HBAs and external enclosures don't automatically register
    device insertion with the kernel, so for these scenarios we need

    :param hostname: (str) host name
    """
    self.log.info(f'disk rescan request sent to host "{hostname}"')
    with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
        _out, _err, _code = self.wait_async(
            CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
                                            [], no_fsid=True, error_ok=True))
    # NOTE(review): source appears truncated -- the `if not _err:` guard on
    # this raise is missing.
    raise OrchestratorError('Unexpected response from cephadm disk-rescan call')

    msg = _err[0].split('\n')[-1]
    log_msg = f'disk rescan: {msg}'
    if msg.upper().startswith('OK'):
        self.log.info(log_msg)
    # NOTE(review): the `else:` before the warning and the final
    # `return msg` are missing in the source.
    self.log.warning(log_msg)
def get_minimal_ceph_conf(self) -> str:
    """Build a minimal ceph.conf, appending any operator-supplied extra conf."""
    # NOTE(review): source appears truncated -- the closing brace of this
    # mon-command dict is missing.
    _, config, _ = self.check_mon_command({
        "prefix": "config generate-minimal-conf",
    extra = self.extra_ceph_conf().conf
    # NOTE(review): the `if extra:` / `try:` wrappers pairing with the
    # except below, and the final `return config`, are missing.
    config = self._combine_confs(config, extra)
    except Exception as e:
        self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}')
def _combine_confs(self, conf1: str, conf2: str) -> str:
    """Merge two ini-style conf texts, grouping options under each section."""
    section_to_option: Dict[str, List[str]] = {}
    final_conf: str = ''
    for conf in [conf1, conf2]:
        # NOTE(review): source appears truncated -- initialization of the
        # current section before this inner loop appears to be missing.
        for line in conf.split('\n'):
            # Skip comments and blank lines.
            # NOTE(review): the `continue` body of this guard, and the
            # `else:` separating section headers from option lines, are
            # missing in the source.
            if line.strip().startswith('#') or not line.strip():
            if line.strip().startswith('[') and line.strip().endswith(']'):
                section = line.strip().replace('[', '').replace(']', '')
                if section not in section_to_option:
                    section_to_option[section] = []
            section_to_option[section].append(line.strip())

    first_section = True
    for section, options in section_to_option.items():
        # NOTE(review): the blank-line separator appended between sections
        # and the final `return final_conf` are missing in the source.
        if not first_section:
        final_conf += f'[{section}]\n'
        for option in options:
            final_conf += f'{option}\n'
        first_section = False
2001 def _invalidate_daemons_and_kick_serve(self
, filter_host
: Optional
[str] = None) -> None:
2003 self
.cache
.invalidate_host_daemons(filter_host
)
2005 for h
in self
.cache
.get_hosts():
2006 # Also discover daemons deployed manually
2007 self
.cache
.invalidate_host_daemons(h
)
2009 self
._kick
_serve
_loop
()
def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.ServiceDescription]:
    """Describe managed services, folding cached daemon state into status."""
    # NOTE(review): source appears truncated throughout this method --
    # several guards ('if refresh:', 'continue' bodies, constructor
    # arguments and closing parentheses) are missing; confirm against
    # upstream before relying on the reassembled flow.
    self._invalidate_daemons_and_kick_serve()
    self.log.debug('Kicked serve() loop to refresh all services')

    sm: Dict[str, orchestrator.ServiceDescription] = {}

    # Pass 1: one ServiceDescription per declared spec.
    for nm, spec in self.spec_store.all_specs.items():
        if service_type is not None and service_type != spec.service_type:
        if service_name is not None and service_name != nm:
        if spec.service_type != 'osd':
            size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
            # osd counting is special
        sm[nm] = orchestrator.ServiceDescription(
            events=self.events.get_for_service(spec.service_name()),
            created=self.spec_store.spec_created[nm],
            deleted=self.spec_store.spec_deleted.get(nm, None),
            virtual_ip=spec.get_virtual_ip(),
            ports=spec.get_port_start(),
        if spec.service_type == 'ingress':
            # ingress has 2 daemons running per host
            # but only if it's the full ingress service, not for keepalive-only
            if not cast(IngressSpec, spec).keepalive_only:

    # factor daemons into status
    # Pass 2: walk cached daemons, creating entries for unmanaged services
    # and tracking the freshest last_refresh per service.
    for h, dm in self.cache.get_daemons_with_volatile_status():
        for name, dd in dm.items():
            assert dd.hostname is not None, f'no hostname for {dd!r}'
            assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'

            n: str = dd.service_name()
            and service_type != daemon_type_to_service(dd.daemon_type)
            if service_name and service_name != n:
            # new unmanaged service
            service_type=daemon_type_to_service(dd.daemon_type),
            service_id=dd.service_id(),
            sm[n] = orchestrator.ServiceDescription(
                last_refresh=dd.last_refresh,
                container_image_id=dd.container_image_id,
                container_image_name=dd.container_image_name,
            if dd.status == DaemonDescriptionStatus.running:
            if dd.daemon_type == 'osd':
                # The osd count can't be determined by the Placement spec.
                # Showing an actual/expected representation cannot be determined
                # here. So we're setting running = size for now.
            not sm[n].last_refresh
            or not dd.last_refresh
            or dd.last_refresh < sm[n].last_refresh  # type: ignore
            sm[n].last_refresh = dd.last_refresh

    return list(sm.values())
def list_daemons(self,
                 service_name: Optional[str] = None,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 host: Optional[str] = None,
                 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
    """Return cached DaemonDescriptions, optionally filtered by service
    name, daemon type/id, or host; ``refresh`` invalidates the cache and
    kicks the serve() loop first.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    dropped ``refresh`` guard, ``continue`` lines and result
    accumulation were restored — verify against upstream.
    """
    if refresh:
        self._invalidate_daemons_and_kick_serve(host)
        self.log.debug('Kicked serve() loop to refresh all daemons')

    result = []
    for h, dm in self.cache.get_daemons_with_volatile_status():
        if host and h != host:
            continue
        for name, dd in dm.items():
            if daemon_type is not None and daemon_type != dd.daemon_type:
                continue
            if daemon_id is not None and daemon_id != dd.daemon_id:
                continue
            if service_name is not None and service_name != dd.service_name():
                continue
            # fill in the memory target from cluster config for daemon
            # types that have a *_memory_target option
            if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
                dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
                    dd.hostname,
                    f"{dd.daemon_type}_memory_target"
                ))
            result.append(dd)
    return result
def service_action(self, action: str, service_name: str) -> List[str]:
    """Schedule ``action`` (start/stop/restart/...) for every daemon of
    a service.

    :raises OrchestratorError: unknown service name, or no daemons exist
        for it.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    dropped empty-``dds`` guard and the return comprehension were
    restored — verify against upstream.
    """
    if service_name not in self.spec_store.all_specs.keys():
        raise OrchestratorError(f'Invalid service name "{service_name}".'
                                + ' View currently running services using "ceph orch ls"')
    dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
    if not dds:
        raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
                                + ' View currently running services using "ceph orch ls"')
    # refuse to stop core services wholesale
    if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
        return [f'Stopping entire {service_name} service is prohibited.']
    self.log.info('%s service %s' % (action.capitalize(), service_name))
    return [
        self._schedule_daemon_action(dd.name(), action)
        for dd in dds
    ]
def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
    """Rotate the cephx key for one daemon: create a pending key, deploy
    a new keyring, then tell the daemon to switch to it (or restart it
    as a fallback).

    NOTE(review): reconstructed from an extraction-mangled copy; the
    json parsing of the pending key and the rc-based fallback structure
    were restored — verify against upstream.
    """
    # local import: keeps this reconstruction self-contained
    import json

    self.log.info(f'Rotating authentication key for {daemon_spec.name()}')
    rc, out, err = self.mon_command({
        'prefix': 'auth get-or-create-pending',
        'entity': daemon_spec.entity_name(),
        'format': 'json',
    })
    j = json.loads(out)
    pending_key = j[0]['pending_key']

    # deploy a new keyring file
    if daemon_spec.daemon_type != 'osd':
        daemon_spec = self.cephadm_services[daemon_type_to_service(
            daemon_spec.daemon_type)].prepare_create(daemon_spec)
    with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
        self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))

    # try to be clever, or fall back to restarting the daemon
    rc = -1
    if daemon_spec.daemon_type == 'osd':
        rc, out, err = self.tool_exec(
            args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'],
            stdin=pending_key.encode()
        )
        if not rc:
            rc, out, err = self.tool_exec(
                args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
                stdin=pending_key.encode()
            )
    elif daemon_spec.daemon_type == 'mds':
        rc, out, err = self.tool_exec(
            args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
            stdin=pending_key.encode()
        )
    elif (
            daemon_spec.daemon_type == 'mgr'
            and daemon_spec.daemon_id == self.get_mgr_id()
    ):
        rc, out, err = self.tool_exec(
            args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
            stdin=pending_key.encode()
        )
    if rc:
        # the in-place rotation did not work; restart picks up the new key
        self._daemon_action(daemon_spec, 'restart')

    return f'Rotated key for {daemon_spec.name()}'
def _daemon_action(self,
                   daemon_spec: CephadmDaemonDeploySpec,
                   action: str,
                   image: Optional[str] = None) -> str:
    """Carry out ``action`` on one daemon: fail over the active mgr,
    rotate its key, redeploy/reconfig it, or drive its systemd unit.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    ``action`` parameter line, the OSD config ``else`` branch, the
    ``actions`` dict 'stop' entry, the try/except around the unit call
    and the final ``return msg`` were restored — verify against upstream.
    """
    self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
                                  daemon_spec.daemon_id)

    if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
                                                                             daemon_spec.daemon_id):
        self.mgr_service.fail_over()
        return ''  # unreachable

    if action == 'rotate-key':
        return self._rotate_daemon_key(daemon_spec)

    if action == 'redeploy' or action == 'reconfig':
        if daemon_spec.daemon_type != 'osd':
            daemon_spec = self.cephadm_services[daemon_type_to_service(
                daemon_spec.daemon_type)].prepare_create(daemon_spec)
        else:
            # for OSDs, we still need to update config, just not carry out the full
            # prepare_create function
            daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(daemon_spec)
        with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
            return self.wait_async(
                CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))

    # map orchestrator actions to the systemd-unit sub-actions cephadm runs
    actions = {
        'start': ['reset-failed', 'start'],
        'stop': ['stop'],
        'restart': ['reset-failed', 'restart'],
    }
    name = daemon_spec.name()
    for a in actions[action]:
        try:
            with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                    daemon_spec.host, name, 'unit',
                    ['--name', name, a]))
        except Exception:
            # best effort: log and continue with the remaining sub-actions
            self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
    self.cache.invalidate_host_daemons(daemon_spec.host)
    msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
    self.events.for_daemon(name, 'INFO', msg)
    return msg
2237 def _daemon_action_set_image(self
, action
: str, image
: Optional
[str], daemon_type
: str, daemon_id
: str) -> None:
2238 if image
is not None:
2239 if action
!= 'redeploy':
2240 raise OrchestratorError(
2241 f
'Cannot execute {action} with new image. `action` needs to be `redeploy`')
2242 if daemon_type
not in CEPH_IMAGE_TYPES
:
2243 raise OrchestratorError(
2244 f
'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
2245 f
'types are: {", ".join(CEPH_IMAGE_TYPES)}')
2247 self
.check_mon_command({
2248 'prefix': 'config set',
2249 'name': 'container_image',
2251 'who': utils
.name_to_config_section(daemon_type
+ '.' + daemon_id
),
def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
    """Validate and schedule ``action`` for a single named daemon.

    :raises OrchestratorError: redeploy/restart of the active mgr with
        no standby, or key rotation for an unsupported daemon type.
    """
    d = self.cache.get_daemon(daemon_name)
    assert d.daemon_type is not None
    assert d.daemon_id is not None

    if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
            and not self.mgr_service.mgr_map_has_standby():
        raise OrchestratorError(
            f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

    if action == 'rotate-key':
        if d.daemon_type not in ['mgr', 'osd', 'mds',
                                 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
            raise OrchestratorError(
                f'key rotation not supported for {d.daemon_type}'
            )

    self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

    self.log.info(f'Schedule {action} daemon {daemon_name}')
    return self._schedule_daemon_action(daemon_name, action)
def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
    """Return True when (daemon_type, daemon_id) names this very mgr daemon."""
    if daemon_type != 'mgr':
        return False
    return daemon_id == self.get_mgr_id()
def get_active_mgr(self) -> DaemonDescription:
    """Return the DaemonDescription of the currently active mgr."""
    mgr_daemons = self.cache.get_daemons_by_type('mgr')
    return self.mgr_service.get_active_daemon(mgr_daemons)
def get_active_mgr_digests(self) -> List[str]:
    """Return the active mgr's container image digests, or [] when unset."""
    active = self.mgr_service.get_active_daemon(
        self.cache.get_daemons_by_type('mgr'))
    digests = active.container_image_digests
    if not digests:
        return []
    return digests
2288 def _schedule_daemon_action(self
, daemon_name
: str, action
: str) -> str:
2289 dd
= self
.cache
.get_daemon(daemon_name
)
2290 assert dd
.daemon_type
is not None
2291 assert dd
.daemon_id
is not None
2292 assert dd
.hostname
is not None
2293 if (action
== 'redeploy' or action
== 'restart') and self
.daemon_is_self(dd
.daemon_type
, dd
.daemon_id
) \
2294 and not self
.mgr_service
.mgr_map_has_standby():
2295 raise OrchestratorError(
2296 f
'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2297 self
.cache
.schedule_daemon_action(dd
.hostname
, dd
.name(), action
)
2298 self
.cache
.save_host(dd
.hostname
)
2299 msg
= "Scheduled to {} {} on host '{}'".format(action
, daemon_name
, dd
.hostname
)
2300 self
._kick
_serve
_loop
()
def remove_daemons(self, names):
    # type: (List[str]) -> List[str]
    """Remove the named daemons from whichever hosts the cache places them on.

    :raises OrchestratorError: none of the names matched a cached daemon.

    NOTE(review): the dropped inner name-matching loop and empty-args
    guard were restored — verify against upstream.
    """
    args = []
    for host, dm in self.cache.daemons.items():
        for name in names:
            if name in dm:
                args.append((name, host))
    if not args:
        raise OrchestratorError('Unable to find daemon(s) %s' % (names))
    self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
    return self._remove_daemons(args)
def remove_service(self, service_name: str, force: bool = False) -> str:
    """Remove a service spec (and thereby its daemons).

    mon/mgr services are refused; removing an osd service without
    ``force`` is refused while its OSDs still exist.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    invalid-service ``else`` branch and the per-host OSD message
    accumulation were restored — verify against upstream.
    """
    self.log.info('Remove service %s' % service_name)
    self._trigger_preview_refresh(service_name=service_name)
    if service_name in self.spec_store:
        if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
            return f'Unable to remove {service_name} service.\n' \
                   f'Note, you might want to mark the {service_name} service as "unmanaged"'
    else:
        return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"

    # Report list of affected OSDs?
    if not force and service_name.startswith('osd.'):
        osds_msg = {}
        for h, dm in self.cache.get_daemons_with_volatile_status():
            osds_to_remove = []
            for name, dd in dm.items():
                if dd.daemon_type == 'osd' and dd.service_name() == service_name:
                    osds_to_remove.append(str(dd.daemon_id))
            if osds_to_remove:
                osds_msg[h] = osds_to_remove
        if osds_msg:
            msg = ''
            for h, ls in osds_msg.items():
                msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
            raise OrchestratorError(
                f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')

    found = self.spec_store.rm(service_name)
    if found and service_name.startswith('osd.'):
        self.spec_store.finally_rm(service_name)
    self._kick_serve_loop()
    return f'Removed service {service_name}'
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
    """
    Return the storage inventory of hosts matching the given filter.

    :param host_filter: host filter

    TODO:
      - add filtering by label

    NOTE(review): reconstructed from an extraction-mangled copy; the
    ``refresh``/``else`` structure and the result loop were restored —
    verify against upstream.
    """
    if refresh:
        if host_filter and host_filter.hosts:
            for h in host_filter.hosts:
                self.log.debug(f'will refresh {h} devs')
                self.cache.invalidate_host_devices(h)
                self.cache.invalidate_host_networks(h)
        else:
            for h in self.cache.get_hosts():
                self.log.debug(f'will refresh {h} devs')
                self.cache.invalidate_host_devices(h)
                self.cache.invalidate_host_networks(h)

        # wake up the serve() loop so it re-scans devices
        # NOTE(review): presumably self.event.set() preceded this debug
        # line upstream — confirm.
        self.event.set()
        self.log.debug('Kicked serve() loop to refresh devices')

    result = []
    for host, dls in self.cache.devices.items():
        if host_filter and host_filter.hosts and host not in host_filter.hosts:
            continue
        result.append(orchestrator.InventoryHost(host,
                                                 inventory.Devices(dls)))
    return result
def zap_device(self, host: str, path: str) -> str:
    """Zap a device on a managed host.

    Use ceph-volume zap to return a device to an unused/free state

    Args:
        host (str): hostname of the cluster host
        path (str): device path

    Raises:
        OrchestratorError: host is not a cluster host
        OrchestratorError: host is in maintenance and therefore unavailable
        OrchestratorError: device path not found on the host
        OrchestratorError: device is known to a different ceph cluster
        OrchestratorError: device holds active osd
        OrchestratorError: device cache hasn't been populated yet..

    Returns:
        str: output from the zap command
    """
    self.log.info('Zap device %s:%s' % (host, path))

    if host not in self.inventory.keys():
        raise OrchestratorError(
            f"Host '{host}' is not a member of the cluster")

    host_info = self.inventory._inventory.get(host, {})
    if host_info.get('status', '').lower() == 'maintenance':
        raise OrchestratorError(
            f"Host '{host}' is in maintenance mode, which prevents any actions against it.")

    if host not in self.cache.devices:
        raise OrchestratorError(
            f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")

    host_devices = self.cache.devices[host]
    path_found = False
    osd_id_list: List[str] = []

    for dev in host_devices:
        if dev.path == path:
            path_found = True
            # NOTE(review): upstream also walks dev.lvs here, collecting
            # osd ids (and rejecting devices from a foreign cluster);
            # that section was lost in extraction — verify upstream.
            if getattr(dev, 'lvs', None):
                for lv in cast(List[Dict[str, str]], dev.lvs):
                    if lv.get('osd_id', ''):
                        osd_id_list.append(lv.get('osd_id', ''))
            break
    if not path_found:
        raise OrchestratorError(
            f"Device path '{path}' not found on host '{host}'")

    if osd_id_list:
        dev_name = os.path.basename(path)
        active_osds: List[str] = []
        for osd_id in osd_id_list:
            metadata = self.get_metadata('osd', str(osd_id))
            if metadata:
                if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
                    active_osds.append("osd." + osd_id)
        if active_osds:
            raise OrchestratorError(
                f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
                f"OSD{'s' if len(active_osds) > 1 else ''}"
                f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")

    cv_args = ['--', 'lvm', 'zap', '--destroy', path]
    with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
            host, 'osd', 'ceph-volume', cv_args, error_ok=True))

    self.cache.invalidate_host_devices(host)
    self.cache.invalidate_host_networks(host)
    if code:
        raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
    msg = f'zap successful for {path} on {host}'
    self.log.info(msg)
    return msg
def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
    """
    Blink a device light. Calling something like::

      lsmcli local-disk-ident-led-on --path $path

    If you must, you can customize this via::

      ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
      ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

    See templates/blink_device_light_cmd.j2
    """
    def blink(host: str, dev: str, path: str) -> str:
        # render the (possibly per-host overridden) command template
        cmd_line = self.template.render('blink_device_light_cmd.j2',
                                        {
                                            'on': on,
                                            'ident_fault': ident_fault,
                                            'dev': dev,
                                            'path': path
                                        },
                                        host=host)
        cmd_args = shlex.split(cmd_line)

        with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd_args,
                error_ok=True))
        if code:
            raise OrchestratorError(
                'Unable to affect %s light for %s:%s. Command: %s' % (
                    ident_fault, host, dev, ' '.join(cmd_args)))
        self.log.info('Set %s light for %s:%s %s' % (
            ident_fault, host, dev, 'on' if on else 'off'))
        return "Set %s light for %s:%s %s" % (
            ident_fault, host, dev, 'on' if on else 'off')

    # NOTE(review): upstream appears to map blink() over locs via a
    # decorator that was lost in extraction; an explicit sequential
    # mapping produces the same results — verify.
    return [blink(host, dev, path) for host, dev, path in locs]
def get_osd_uuid_map(self, only_up=False):
    # type: (bool) -> Dict[str, str]
    """Map osd id (as str) -> osd uuid, from the current osd_map.

    :raises OrchestratorError: an osd_map entry has no 'osd' field.

    NOTE(review): the dropped accumulator init and return were
    restored — verify against upstream.
    """
    osd_map = self.get('osd_map')
    r = {}
    for o in osd_map['osds']:
        # only include OSDs that have ever started in this map. this way
        # an interrupted osd create can be repeated and succeed the second
        # time around.
        osd_id = o.get('osd')
        if osd_id is None:
            raise OrchestratorError("Could not retrieve osd_id from osd_map")
        if not only_up:
            r[str(osd_id)] = o.get('uuid', '')
    return r
def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
    """Return the osd_map entry for ``osd_id``, or None when not exactly
    one entry matches.
    """
    osd = [x for x in self.get('osd_map')['osds']
           if x['osd'] == osd_id]

    if len(osd) != 1:
        return None

    return osd[0]
def _trigger_preview_refresh(self,
                             specs: Optional[List[DriveGroupSpec]] = None,
                             service_name: Optional[str] = None,
                             ) -> None:
    """Queue OSDSpec preview refreshes for hosts whose previewed spec
    changed (or was removed).

    NOTE(review): reconstructed from an extraction-mangled copy; the
    accumulator and early-return guard were restored — verify upstream.
    """
    # Only trigger a refresh when a spec has changed
    trigger_specs = []
    if specs:
        for spec in specs:
            preview_spec = self.spec_store.spec_preview.get(spec.service_name())
            # the to-be-preview spec != the actual spec, this means we need to
            # trigger a refresh, if the spec has been removed (==None) we need to
            # trigger a refresh as well.
            if not preview_spec or spec != preview_spec:
                trigger_specs.append(spec)
    if service_name:
        trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
    if not any(trigger_specs):
        return None

    refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
    for host in refresh_hosts:
        self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
        self.cache.osdspec_previews_refresh_queue.append(host)
def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
    """
    Deprecated. Please use `apply()` instead.

    Keeping this around to be compatible to mgr/dashboard
    """
    results = []
    for spec in specs:
        results.append(self._apply(spec))
    return results
def create_osds(self, drive_group: DriveGroupSpec) -> str:
    """Create OSDs from a drive-group spec after checking that its
    placement matches at least one known cluster host.
    """
    known_hosts: List[HostSpec] = self.inventory.all_specs()
    matched: List[str] = drive_group.placement.filter_matching_hostspecs(known_hosts)
    if not matched:
        return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
    return self.osd_service.create_from_spec(drive_group)
def _preview_osdspecs(self,
                      osdspecs: Optional[List[DriveGroupSpec]] = None
                      ) -> dict:
    """Return cached per-host OSDSpec preview reports for the given
    specs, or an error/pending placeholder dict.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    guards, ``continue`` and accumulators were restored — verify upstream.
    """
    if not osdspecs:
        return {'n/a': [{'error': True,
                         'message': 'No OSDSpec or matching hosts found.'}]}
    matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
    if not matching_hosts:
        return {'n/a': [{'error': True,
                         'message': 'No OSDSpec or matching hosts found.'}]}
    # Is any host still loading previews or still in the queue to be previewed
    pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
    if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
        # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
        return {'n/a': [{'error': True,
                         'message': 'Preview data is being generated.. '
                                    'Please re-run this command in a bit.'}]}
    # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
    previews_for_specs = {}
    for host, raw_reports in self.cache.osdspec_previews.items():
        if host not in matching_hosts:
            continue
        osd_reports = []
        for osd_report in raw_reports:
            if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                osd_reports.append(osd_report)
        previews_for_specs.update({host: osd_reports})
    return previews_for_specs
def _calc_daemon_deps(self,
                      spec: Optional[ServiceSpec],
                      daemon_type: str,
                      daemon_id: str) -> List[str]:
    """Compute the dependency list for a daemon; a change in these deps
    is what triggers a reconfig/redeploy of that daemon.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    initializers, guards, the agent try/except and the final sorted
    return were restored — verify against upstream.
    """
    def get_daemon_names(daemons: List[str]) -> List[str]:
        # names of all running daemons of the given types
        daemon_names = []
        for dt in daemons:
            for dd in self.cache.get_daemons_by_type(dt):
                daemon_names.append(dd.name())
        return daemon_names

    deps = []
    if daemon_type == 'haproxy':
        # because cephadm creates new daemon instances whenever
        # port or ip changes, identifying daemons by name is
        # sufficient to detect changes.
        if not spec:
            return []
        ingress_spec = cast(IngressSpec, spec)
        assert ingress_spec.backend_service
        daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
        deps = [d.name() for d in daemons]
    elif daemon_type == 'keepalived':
        # because cephadm creates new daemon instances whenever
        # port or ip changes, identifying daemons by name is
        # sufficient to detect changes.
        if not spec:
            return []
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
    elif daemon_type == 'agent':
        root_cert = ''
        server_port = ''
        try:
            server_port = str(self.http_server.agent.server_port)
            root_cert = self.http_server.agent.ssl_certs.get_root_cert()
        except Exception:
            # agent endpoint may not be up yet; deps fall back to ''
            pass
        deps = sorted([self.get_mgr_ip(), server_port, root_cert,
                       str(self.device_enhanced_scan)])
    elif daemon_type == 'iscsi':
        if spec:
            iscsi_spec = cast(IscsiServiceSpec, spec)
            deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
        else:
            deps = [self.get_mgr_ip()]
    elif daemon_type == 'prometheus':
        # for prometheus we add the active mgr as an explicit dependency,
        # this way we force a redeploy after a mgr failover
        deps.append(self.get_active_mgr().name())
        deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
        deps.append(str(self.service_discovery_port))
        # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
        # a scrape_configs section for each service type. This should be included only
        # when at least one daemon of the corresponding service is running. Therefore,
        # an explicit dependency is added for each service-type to force a reconfig
        # whenever the number of daemons for those service-type changes from 0 to greater
        # than zero and vice versa.
        deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
        if len(self.cache.get_daemons_by_type('ingress')) > 0:
            deps.append('ingress')
        # add dependency on ceph-exporter daemons
        deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
        if self.secure_monitoring_stack:
            if self.prometheus_web_user and self.prometheus_web_password:
                deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
            if self.alertmanager_web_user and self.alertmanager_web_password:
                deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
    elif daemon_type == 'grafana':
        deps += get_daemon_names(['prometheus', 'loki'])
        if self.secure_monitoring_stack and self.prometheus_web_user and self.prometheus_web_password:
            deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
    elif daemon_type == 'alertmanager':
        deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
        if self.secure_monitoring_stack and self.alertmanager_web_user and self.alertmanager_web_password:
            deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
    elif daemon_type == 'promtail':
        deps += get_daemon_names(['loki'])
    else:
        # TODO(redo): some error message!
        pass

    if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']:
        # redeploy monitoring stack daemons whenever the security mode flips
        deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}')

    return sorted(deps)
def _remove_daemons(self, name: str, host: str) -> str:
    """Remove one named daemon from one host via the serve machinery.

    NOTE(review): remove_daemons() passes a list of (name, host) tuples
    to this method — presumably a mapping decorator was lost in
    extraction; verify against upstream.
    """
    serve = CephadmServe(self)
    return serve._remove_daemon(name, host)
def _check_pool_exists(self, pool: str, service_name: str) -> None:
    """Raise OrchestratorError when the given RADOS pool does not exist."""
    logger.info(f'Checking pool "{pool}" exists for service {service_name}')
    if self.rados.pool_exists(pool):
        return
    raise OrchestratorError(f'Cannot find pool "{pool}" for '
                            f'service {service_name}')
def _add_daemon(self,
                daemon_type: str,
                spec: ServiceSpec) -> List[str]:
    """
    Add (and place) a daemon. Require explicit host placement. Do not
    schedule, and do not apply the related scheduling limitations.

    NOTE(review): the dropped ``daemon_type`` parameter line was
    restored (it is used below and by callers) — verify upstream.
    """
    if spec.service_name() not in self.spec_store:
        raise OrchestratorError('Unable to add a Daemon without Service.\n'
                                'Please use `ceph orch apply ...` to create a Service.\n'
                                'Note, you might want to create the service with "unmanaged=true"')

    self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
    if not spec.placement.hosts:
        raise OrchestratorError('must specify host(s) to deploy on')
    count = spec.placement.count or len(spec.placement.hosts)
    daemons = self.cache.get_daemons_by_service(spec.service_name())
    return self._create_daemons(daemon_type, spec, daemons,
                                spec.placement.hosts, count)
def _create_daemons(self,
                    daemon_type: str,
                    spec: ServiceSpec,
                    daemons: List[DaemonDescription],
                    hosts: List[HostPlacementSpec],
                    count: int) -> List[str]:
    """Place and create ``count`` daemons on the given hosts.

    NOTE(review): reconstructed from an extraction-mangled copy; the
    one-time config() call, unique-name bookkeeping and the creation
    mapping were restored (sequential, since the original mapping
    decorator line was lost) — verify against upstream.
    """
    if count > len(hosts):
        raise OrchestratorError('too few hosts: want %d, have %s' % (
            count, hosts))

    did_config = False
    service_type = daemon_type_to_service(daemon_type)

    args = []  # type: List[CephadmDaemonDeploySpec]
    for host, network, name in hosts:
        daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                         prefix=spec.service_id,
                                         forcename=name)

        if not did_config:
            # per-service config only needs to happen once
            self.cephadm_services[service_type].config(spec)
            did_config = True

        daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
            host, daemon_id, network, spec,
            # NOTE: this does not consider port conflicts!
            ports=spec.get_port_start())
        self.log.debug('Placing %s.%s on host %s' % (
            daemon_type, daemon_id, host))
        args.append(daemon_spec)

        # add to daemon list so next name(s) will also be unique
        sd = orchestrator.DaemonDescription(
            hostname=host,
            daemon_type=daemon_type,
            daemon_id=daemon_id,
        )
        daemons.append(sd)

    def _create_one(daemon_spec: CephadmDaemonDeploySpec) -> str:
        # one-line purpose: finalize the deploy spec and create the daemon
        daemon_spec = self.cephadm_services[service_type].prepare_create(daemon_spec)
        with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))

    return [_create_one(ds) for ds in args]
def add_daemon(self, spec: ServiceSpec) -> List[str]:
    """Add daemons for every daemon type belonging to the spec's
    service type; failures are recorded as service events and re-raised.

    NOTE(review): the dropped accumulator init / try / re-raise lines
    were restored — verify against upstream.
    """
    ret: List[str] = []
    try:
        with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
            for d_type in service_to_daemon_types(spec.service_type):
                ret.extend(self._add_daemon(d_type, spec))
            return ret
    except OrchestratorError as e:
        self.events.from_orch_error(e)
        raise
def get_prometheus_access_info(self) -> Dict[str, str]:
    """Return prometheus web credentials plus the service-discovery root cert."""
    cert = self.http_server.service_discovery.ssl_certs.get_root_cert()
    return {
        'user': self.prometheus_web_user or '',
        'password': self.prometheus_web_password or '',
        'certificate': cert,
    }
def get_alertmanager_access_info(self) -> Dict[str, str]:
    """Return alertmanager web credentials plus the service-discovery root cert."""
    cert = self.http_server.service_discovery.ssl_certs.get_root_cert()
    return {
        'user': self.alertmanager_web_user or '',
        'password': self.alertmanager_web_password or '',
        'certificate': cert,
    }
def apply_mon(self, spec: ServiceSpec) -> str:
    """Apply a mon service spec (delegates to the generic _apply path)."""
    result = self._apply(spec)
    return result
def _apply(self, spec: GenericSpec) -> str:
    """Route a generic spec: host specs are added directly, osd specs
    additionally trigger a preview refresh, everything else is applied
    as a service spec.
    """
    stype = spec.service_type
    if stype == 'host':
        return self._add_host(cast(HostSpec, spec))

    if stype == 'osd':
        # the preview refresh is smart and only refreshes when an
        # actual change has been detected
        self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

    return self._apply_service_spec(cast(ServiceSpec, spec))
def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
    """Return a list of candidate hosts according to the placement specification.

    Draining hosts are always excluded from the result.

    NOTE(review): the dropped ``candidates = []`` init and
    ``if placement.hosts:`` branch head were restored — verify upstream.
    """
    all_hosts = self.cache.get_schedulable_hosts()
    draining_hosts = [dh.hostname for dh in self.cache.get_draining_hosts()]
    candidates = []
    if placement.hosts:
        candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
    elif placement.label:
        candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
    elif placement.host_pattern:
        candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
    elif (placement.count is not None or placement.count_per_host is not None):
        candidates = [x.hostname for x in all_hosts]
    return [h for h in candidates if h not in draining_hosts]
def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
    """Validate placement specification for TunedProfileSpec and ClientKeyringSpec.

    :raises OrchestratorError: count/count_per_host given, unknown hosts
        listed, or no host matches the placement at all.

    NOTE(review): the dropped ``if spec.hosts:`` / ``if invalid_hosts:``
    branch heads were restored; the user-facing "is no supported" typo
    was fixed to "is not supported".
    """
    if spec.count is not None:
        raise OrchestratorError(
            "Placement 'count' field is not supported for this specification.")
    if spec.count_per_host is not None:
        raise OrchestratorError(
            "Placement 'count_per_host' field is not supported for this specification.")
    if spec.hosts:
        all_hosts = [h.hostname for h in self.inventory.all_specs()]
        invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
        if invalid_hosts:
            raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
                                    f"Please check 'ceph orch host ls' for available hosts.")
    elif not self._get_candidate_hosts(spec):
        raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
                                "Please check 'ceph orch host ls' for available hosts.\n"
                                "Note: draining hosts are excluded from the candidate list.")
def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
    """Return, per candidate host, the spec's sysctl options that the
    host does not expose (per its cached facts)."""
    hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
    bad_options: Dict[str, List[str]] = {}
    for host in hosts:
        known = self.cache.get_facts(host).get('sysctl_options', {})
        bad_options[host] = [opt for opt in spec.settings if opt not in known]
    return bad_options
def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
    """Reject tuned-profile specs with empty settings, invalid placement,
    or sysctl options unknown to any candidate host."""
    if not spec.settings:
        raise OrchestratorError("Invalid spec: settings section cannot be empty.")
    self._validate_one_shot_placement_spec(spec.placement)
    invalid_options = self._validate_tunedprofile_settings(spec)
    if any(bad for bad in invalid_options.values()):
        raise OrchestratorError(
            f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')
def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
    """Validate and store tuned profile specs; with ``no_overwrite``,
    existing profiles are left untouched and reported instead.

    NOTE(review): the dropped ``outs`` accumulator, loop head and
    ``else`` branch were restored — verify against upstream.
    """
    outs = []
    for spec in specs:
        self._validate_tuned_profile_spec(spec)
        if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
            outs.append(
                f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
        else:
            # done, let's save the specs
            self.tuned_profiles.add_profile(spec)
            outs.append(f'Saved tuned profile {spec.profile_name}')
    self._kick_serve_loop()
    return '\n'.join(outs)
def rm_tuned_profile(self, profile_name: str) -> str:
    """Delete a stored tuned profile and wake the serve() loop."""
    if profile_name in self.tuned_profiles:
        self.tuned_profiles.rm_profile(profile_name)
        self._kick_serve_loop()
        return f'Removed tuned profile {profile_name}'
    raise OrchestratorError(
        f'Tuned profile {profile_name} does not exist. Nothing to remove.')
def tuned_profile_ls(self) -> List[TunedProfileSpec]:
    """List every stored tuned profile spec."""
    profiles = self.tuned_profiles.list_profiles()
    return profiles
2882 def tuned_profile_add_setting(self
, profile_name
: str, setting
: str, value
: str) -> str:
2883 if profile_name
not in self
.tuned_profiles
:
2884 raise OrchestratorError(
2885 f
'Tuned profile {profile_name} does not exist. Cannot add setting.')
2886 self
.tuned_profiles
.add_setting(profile_name
, setting
, value
)
2887 self
._kick
_serve
_loop
()
2888 return f
'Added setting {setting} with value {value} to tuned profile {profile_name}'
2891 def tuned_profile_rm_setting(self
, profile_name
: str, setting
: str) -> str:
2892 if profile_name
not in self
.tuned_profiles
:
2893 raise OrchestratorError(
2894 f
'Tuned profile {profile_name} does not exist. Cannot remove setting.')
2895 self
.tuned_profiles
.rm_setting(profile_name
, setting
)
2896 self
._kick
_serve
_loop
()
2897 return f
'Removed setting {setting} from tuned profile {profile_name}'
2900 def service_discovery_dump_cert(self
) -> str:
2901 root_cert
= self
.get_store(ServiceDiscovery
.KV_STORE_SD_ROOT_CERT
)
2903 raise OrchestratorError('No certificate found for service discovery')
2906 def set_health_warning(self
, name
: str, summary
: str, count
: int, detail
: List
[str]) -> None:
2907 self
.health_checks
[name
] = {
2908 'severity': 'warning',
2913 self
.set_health_checks(self
.health_checks
)
2915 def remove_health_warning(self
, name
: str) -> None:
2916 if name
in self
.health_checks
:
2917 del self
.health_checks
[name
]
2918 self
.set_health_checks(self
.health_checks
)
2920 def _plan(self
, spec
: ServiceSpec
) -> dict:
2921 if spec
.service_type
== 'osd':
2922 return {'service_name': spec
.service_name(),
2923 'service_type': spec
.service_type
,
2924 'data': self
._preview
_osdspecs
(osdspecs
=[cast(DriveGroupSpec
, spec
)])}
2926 svc
= self
.cephadm_services
[spec
.service_type
]
2927 ha
= HostAssignment(
2929 hosts
=self
.cache
.get_schedulable_hosts(),
2930 unreachable_hosts
=self
.cache
.get_unreachable_hosts(),
2931 draining_hosts
=self
.cache
.get_draining_hosts(),
2932 networks
=self
.cache
.networks
,
2933 daemons
=self
.cache
.get_daemons_by_service(spec
.service_name()),
2934 allow_colo
=svc
.allow_colo(),
2935 rank_map
=self
.spec_store
[spec
.service_name()].rank_map
if svc
.ranked() else None
2938 hosts
, to_add
, to_remove
= ha
.place()
2941 'service_name': spec
.service_name(),
2942 'service_type': spec
.service_type
,
2943 'add': [hs
.hostname
for hs
in to_add
],
2944 'remove': [d
.name() for d
in to_remove
]
2948 def plan(self
, specs
: Sequence
[GenericSpec
]) -> List
:
2949 results
= [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
2950 'to the current inventory setup. If any of these conditions change, the \n'
2951 'preview will be invalid. Please make sure to have a minimal \n'
2952 'timeframe between planning and applying the specs.'}]
2953 if any([spec
.service_type
== 'host' for spec
in specs
]):
2954 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
2956 results
.append(self
._plan
(cast(ServiceSpec
, spec
)))
2959 def _apply_service_spec(self
, spec
: ServiceSpec
) -> str:
2960 if spec
.placement
.is_empty():
2961 # fill in default placement
2963 'mon': PlacementSpec(count
=5),
2964 'mgr': PlacementSpec(count
=2),
2965 'mds': PlacementSpec(count
=2),
2966 'rgw': PlacementSpec(count
=2),
2967 'ingress': PlacementSpec(count
=2),
2968 'iscsi': PlacementSpec(count
=1),
2969 'rbd-mirror': PlacementSpec(count
=2),
2970 'cephfs-mirror': PlacementSpec(count
=1),
2971 'nfs': PlacementSpec(count
=1),
2972 'grafana': PlacementSpec(count
=1),
2973 'alertmanager': PlacementSpec(count
=1),
2974 'prometheus': PlacementSpec(count
=1),
2975 'node-exporter': PlacementSpec(host_pattern
='*'),
2976 'ceph-exporter': PlacementSpec(host_pattern
='*'),
2977 'loki': PlacementSpec(count
=1),
2978 'promtail': PlacementSpec(host_pattern
='*'),
2979 'crash': PlacementSpec(host_pattern
='*'),
2980 'container': PlacementSpec(count
=1),
2981 'snmp-gateway': PlacementSpec(count
=1),
2982 'elasticsearch': PlacementSpec(count
=1),
2983 'jaeger-agent': PlacementSpec(host_pattern
='*'),
2984 'jaeger-collector': PlacementSpec(count
=1),
2985 'jaeger-query': PlacementSpec(count
=1)
2987 spec
.placement
= defaults
[spec
.service_type
]
2988 elif spec
.service_type
in ['mon', 'mgr'] and \
2989 spec
.placement
.count
is not None and \
2990 spec
.placement
.count
< 1:
2991 raise OrchestratorError('cannot scale %s service below 1' % (
2994 host_count
= len(self
.inventory
.keys())
2995 max_count
= self
.max_count_per_host
2997 if spec
.placement
.count
is not None:
2998 if spec
.service_type
in ['mon', 'mgr']:
2999 if spec
.placement
.count
> max(5, host_count
):
3000 raise OrchestratorError(
3001 (f
'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
3002 elif spec
.service_type
!= 'osd':
3003 if spec
.placement
.count
> (max_count
* host_count
):
3004 raise OrchestratorError((f
'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
3005 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3007 if spec
.placement
.count_per_host
is not None and spec
.placement
.count_per_host
> max_count
and spec
.service_type
!= 'osd':
3008 raise OrchestratorError((f
'The maximum count_per_host allowed is {max_count}.'
3009 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3013 hosts
=self
.inventory
.all_specs(), # All hosts, even those without daemon refresh
3014 unreachable_hosts
=self
.cache
.get_unreachable_hosts(),
3015 draining_hosts
=self
.cache
.get_draining_hosts(),
3016 networks
=self
.cache
.networks
,
3017 daemons
=self
.cache
.get_daemons_by_service(spec
.service_name()),
3018 allow_colo
=self
.cephadm_services
[spec
.service_type
].allow_colo(),
3021 self
.log
.info('Saving service %s spec with placement %s' % (
3022 spec
.service_name(), spec
.placement
.pretty_str()))
3023 self
.spec_store
.save(spec
)
3024 self
._kick
_serve
_loop
()
3025 return "Scheduled %s update..." % spec
.service_name()
3028 def apply(self
, specs
: Sequence
[GenericSpec
], no_overwrite
: bool = False) -> List
[str]:
3032 if spec
.service_type
== 'host' and cast(HostSpec
, spec
).hostname
in self
.inventory
:
3033 results
.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
3034 % (cast(HostSpec
, spec
).hostname
, spec
.service_type
))
3036 elif cast(ServiceSpec
, spec
).service_name() in self
.spec_store
:
3037 results
.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
3038 % (cast(ServiceSpec
, spec
).service_name(), cast(ServiceSpec
, spec
).service_name()))
3040 results
.append(self
._apply
(spec
))
3044 def apply_mgr(self
, spec
: ServiceSpec
) -> str:
3045 return self
._apply
(spec
)
3048 def apply_mds(self
, spec
: ServiceSpec
) -> str:
3049 return self
._apply
(spec
)
3052 def apply_rgw(self
, spec
: ServiceSpec
) -> str:
3053 return self
._apply
(spec
)
3056 def apply_ingress(self
, spec
: ServiceSpec
) -> str:
3057 return self
._apply
(spec
)
3060 def apply_iscsi(self
, spec
: ServiceSpec
) -> str:
3061 return self
._apply
(spec
)
3064 def apply_rbd_mirror(self
, spec
: ServiceSpec
) -> str:
3065 return self
._apply
(spec
)
3068 def apply_nfs(self
, spec
: ServiceSpec
) -> str:
3069 return self
._apply
(spec
)
3071 def _get_dashboard_url(self
):
3073 return self
.get('mgr_map').get('services', {}).get('dashboard', '')
3076 def apply_prometheus(self
, spec
: ServiceSpec
) -> str:
3077 return self
._apply
(spec
)
3080 def apply_loki(self
, spec
: ServiceSpec
) -> str:
3081 return self
._apply
(spec
)
3084 def apply_promtail(self
, spec
: ServiceSpec
) -> str:
3085 return self
._apply
(spec
)
3088 def apply_node_exporter(self
, spec
: ServiceSpec
) -> str:
3089 return self
._apply
(spec
)
3092 def apply_ceph_exporter(self
, spec
: ServiceSpec
) -> str:
3093 return self
._apply
(spec
)
3096 def apply_crash(self
, spec
: ServiceSpec
) -> str:
3097 return self
._apply
(spec
)
3100 def apply_grafana(self
, spec
: ServiceSpec
) -> str:
3101 return self
._apply
(spec
)
3104 def apply_alertmanager(self
, spec
: ServiceSpec
) -> str:
3105 return self
._apply
(spec
)
3108 def apply_container(self
, spec
: ServiceSpec
) -> str:
3109 return self
._apply
(spec
)
3112 def apply_snmp_gateway(self
, spec
: ServiceSpec
) -> str:
3113 return self
._apply
(spec
)
3116 def set_unmanaged(self
, service_name
: str, value
: bool) -> str:
3117 return self
.spec_store
.set_unmanaged(service_name
, value
)
3120 def upgrade_check(self
, image
: str, version
: str) -> str:
3121 if self
.inventory
.get_host_with_state("maintenance"):
3122 raise OrchestratorError("check aborted - you have hosts in maintenance state")
3125 target_name
= self
.container_image_base
+ ':v' + version
3129 raise OrchestratorError('must specify either image or version')
3131 with self
.async_timeout_handler(cmd
=f
'cephadm inspect-image (image {target_name})'):
3132 image_info
= self
.wait_async(CephadmServe(self
)._get
_container
_image
_info
(target_name
))
3134 ceph_image_version
= image_info
.ceph_version
3135 if not ceph_image_version
:
3136 return f
'Unable to extract ceph version from {target_name}.'
3137 if ceph_image_version
.startswith('ceph version '):
3138 ceph_image_version
= ceph_image_version
.split(' ')[2]
3139 version_error
= self
.upgrade
._check
_target
_version
(ceph_image_version
)
3141 return f
'Incompatible upgrade: {version_error}'
3143 self
.log
.debug(f
'image info {image} -> {image_info}')
3145 'target_name': target_name
,
3146 'target_id': image_info
.image_id
,
3147 'target_version': image_info
.ceph_version
,
3148 'needs_update': dict(),
3149 'up_to_date': list(),
3150 'non_ceph_image_daemons': list()
3152 for host
, dm
in self
.cache
.daemons
.items():
3153 for name
, dd
in dm
.items():
3154 # check if the container digest for the digest we're checking upgrades for matches
3155 # the container digests for the daemon if "use_repo_digest" setting is true
3156 # or that the image name matches the daemon's image name if "use_repo_digest"
3157 # is false. The idea is to generally check if the daemon is already using
3158 # the image we're checking upgrade to.
3160 (self
.use_repo_digest
and dd
.matches_digests(image_info
.repo_digests
))
3161 or (not self
.use_repo_digest
and dd
.matches_image_name(image
))
3163 r
['up_to_date'].append(dd
.name())
3164 elif dd
.daemon_type
in CEPH_IMAGE_TYPES
:
3165 r
['needs_update'][dd
.name()] = {
3166 'current_name': dd
.container_image_name
,
3167 'current_id': dd
.container_image_id
,
3168 'current_version': dd
.version
,
3171 r
['non_ceph_image_daemons'].append(dd
.name())
3172 if self
.use_repo_digest
and image_info
.repo_digests
:
3173 # FIXME: we assume the first digest is the best one to use
3174 r
['target_digest'] = image_info
.repo_digests
[0]
3176 return json
.dumps(r
, indent
=4, sort_keys
=True)
3179 def upgrade_status(self
) -> orchestrator
.UpgradeStatusSpec
:
3180 return self
.upgrade
.upgrade_status()
3183 def upgrade_ls(self
, image
: Optional
[str], tags
: bool, show_all_versions
: Optional
[bool]) -> Dict
[Any
, Any
]:
3184 return self
.upgrade
.upgrade_ls(image
, tags
, show_all_versions
)
3187 def upgrade_start(self
, image
: str, version
: str, daemon_types
: Optional
[List
[str]] = None, host_placement
: Optional
[str] = None,
3188 services
: Optional
[List
[str]] = None, limit
: Optional
[int] = None) -> str:
3189 if self
.inventory
.get_host_with_state("maintenance"):
3190 raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
3191 if self
.offline_hosts
:
3192 raise OrchestratorError(f
"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
3193 if daemon_types
is not None and services
is not None:
3194 raise OrchestratorError('--daemon-types and --services are mutually exclusive')
3195 if daemon_types
is not None:
3196 for dtype
in daemon_types
:
3197 if dtype
not in CEPH_UPGRADE_ORDER
:
3198 raise OrchestratorError(f
'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
3199 f
'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
3200 if services
is not None:
3201 for service
in services
:
3202 if service
not in self
.spec_store
:
3203 raise OrchestratorError(f
'Upgrade aborted - Got unknown service name "{service}".\n'
3204 f
'Known services are: {self.spec_store.all_specs.keys()}')
3205 hosts
: Optional
[List
[str]] = None
3206 if host_placement
is not None:
3207 all_hosts
= list(self
.inventory
.all_specs())
3208 placement
= PlacementSpec
.from_string(host_placement
)
3209 hosts
= placement
.filter_matching_hostspecs(all_hosts
)
3211 raise OrchestratorError(
3212 f
'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
3214 if limit
is not None:
3216 raise OrchestratorError(
3217 f
'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
3219 return self
.upgrade
.upgrade_start(image
, version
, daemon_types
, hosts
, services
, limit
)
3222 def upgrade_pause(self
) -> str:
3223 return self
.upgrade
.upgrade_pause()
3226 def upgrade_resume(self
) -> str:
3227 return self
.upgrade
.upgrade_resume()
3230 def upgrade_stop(self
) -> str:
3231 return self
.upgrade
.upgrade_stop()
3234 def remove_osds(self
, osd_ids
: List
[str],
3235 replace
: bool = False,
3236 force
: bool = False,
3238 no_destroy
: bool = False) -> str:
3240 Takes a list of OSDs and schedules them for removal.
3241 The function that takes care of the actual removal is
3242 process_removal_queue().
3245 daemons
: List
[orchestrator
.DaemonDescription
] = self
.cache
.get_daemons_by_type('osd')
3246 to_remove_daemons
= list()
3247 for daemon
in daemons
:
3248 if daemon
.daemon_id
in osd_ids
:
3249 to_remove_daemons
.append(daemon
)
3250 if not to_remove_daemons
:
3251 return f
"Unable to find OSDs: {osd_ids}"
3253 for daemon
in to_remove_daemons
:
3254 assert daemon
.daemon_id
is not None
3256 self
.to_remove_osds
.enqueue(OSD(osd_id
=int(daemon
.daemon_id
),
3260 no_destroy
=no_destroy
,
3261 hostname
=daemon
.hostname
,
3262 process_started_at
=datetime_now(),
3263 remove_util
=self
.to_remove_osds
.rm_util
))
3264 except NotFoundError
:
3265 return f
"Unable to find OSDs: {osd_ids}"
3267 # trigger the serve loop to initiate the removal
3268 self
._kick
_serve
_loop
()
3269 warning_zap
= "" if zap
else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
3270 "Run the `ceph-volume lvm zap` command with `--destroy`"
3271 " against the VG/LV if you want them to be destroyed.")
3272 return f
"Scheduled OSD(s) for removal.{warning_zap}"
3275 def stop_remove_osds(self
, osd_ids
: List
[str]) -> str:
3277 Stops a `removal` process for a List of OSDs.
3278 This will revert their weight and remove it from the osds_to_remove queue
3280 for osd_id
in osd_ids
:
3282 self
.to_remove_osds
.rm(OSD(osd_id
=int(osd_id
),
3283 remove_util
=self
.to_remove_osds
.rm_util
))
3284 except (NotFoundError
, KeyError, ValueError):
3285 return f
'Unable to find OSD in the queue: {osd_id}'
3287 # trigger the serve loop to halt the removal
3288 self
._kick
_serve
_loop
()
3289 return "Stopped OSD(s) removal"
3292 def remove_osds_status(self
) -> List
[OSD
]:
3294 The CLI call to retrieve an osd removal report
3296 return self
.to_remove_osds
.all_osds()
3299 def drain_host(self
, hostname
, force
=False):
3300 # type: (str, bool) -> str
3302 Drain all daemons from a host.
3303 :param host: host name
3306 # if we drain the last admin host we could end up removing the only instance
3307 # of the config and keyring and cause issues
3309 p
= PlacementSpec(label
='_admin')
3310 admin_hosts
= p
.filter_matching_hostspecs(self
.inventory
.all_specs())
3311 if len(admin_hosts
) == 1 and admin_hosts
[0] == hostname
:
3312 raise OrchestratorValidationError(f
"Host {hostname} is the last host with the '_admin'"
3313 " label.\nDraining this host could cause the removal"
3314 " of the last cluster config/keyring managed by cephadm.\n"
3315 "It is recommended to add the _admin label to another host"
3316 " before completing this operation.\nIf you're certain this is"
3317 " what you want rerun this command with --force.")
3319 self
.add_host_label(hostname
, '_no_schedule')
3321 daemons
: List
[orchestrator
.DaemonDescription
] = self
.cache
.get_daemons_by_host(hostname
)
3323 osds_to_remove
= [d
.daemon_id
for d
in daemons
if d
.daemon_type
== 'osd']
3324 self
.remove_osds(osds_to_remove
)
3327 daemons_table
+= "{:<20} {:<15}\n".format("type", "id")
3328 daemons_table
+= "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
3330 daemons_table
+= "{:<20} {:<15}\n".format(d
.daemon_type
, d
.daemon_id
)
3332 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname
, daemons_table
)
3334 def trigger_connect_dashboard_rgw(self
) -> None:
3335 self
.need_connect_dashboard_rgw
= True