import asyncio
import datetime
import errno
import hashlib
import ipaddress
import json
import logging
import os
import random
import re
import string
import subprocess

from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory, NamedTemporaryFile
from threading import Event

from cephadm.service_discovery import ServiceDiscovery

from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
    Awaitable, Iterator

import multiprocessing.pool

from prettytable import PrettyTable

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    ServiceSpec, PlacementSpec, \
    HostPlacementSpec, IngressSpec, \
    TunedProfileSpec, IscsiServiceSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.http_server import CephadmHttpServer
from cephadm.agent import CephadmAgentHelpers

from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
import orchestrator
from orchestrator.module import to_format, Format
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
    service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service

from . import ssh
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
    CephExporterService
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nvmeof import NvmeofService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
    ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
    cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
from .tuned_profiles import TunedProfileUtils
try:
    import asyncssh  # type: ignore
except ImportError as e:
    asyncssh = None  # type: ignore
    asyncssh_import_error = str(e)

logger = logging.getLogger(__name__)

T = TypeVar('T')
DEFAULT_SSH_CONFIG = """
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
"""
# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore
# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'  # DO NOT ADD TAG TO THIS
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
# ------------------------------------------------------------------------------
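# These defaults only seed the corresponding MODULE_OPTIONS registered below;
# operators can override any of them at runtime. A sketch of the usual
# override path for manager module options (the mgr/cephadm/ prefix is how
# such options are addressed):
#
#     ceph config set mgr mgr/cephadm/container_image_prometheus \
#         quay.io/prometheus/prometheus:v2.43.0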
def host_exists(hostname_position: int = 1) -> Callable:
    """Check that a hostname exists in the inventory"""
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner
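# A minimal usage sketch (the decorated method is hypothetical, for
# illustration only): hostname_position indexes into *args, so position 1
# is the first argument after self. The lookup-and-suggest check above
# runs before the wrapped method body.
#
#     @host_exists(hostname_position=1)
#     def example_host_op(self, hostname: str) -> str:
#         return f"{hostname} is managed"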
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                          metaclass=CLICommandMeta):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        Option(
            'ssh_config_file',
            desc='customized SSH config file to connect to managed hosts',
        ),
        Option(
            'device_cache_timeout',
            desc='seconds to cache device inventory',
        ),
        Option(
            'device_enhanced_scan',
            desc='Use libstoragemgmt during device scans',
        ),
        Option(
            'inventory_list_all',
            desc='Whether ceph-volume inventory should report '
                 'more devices (mostly mappers (LVs / mpaths), partitions...)',
        ),
        Option(
            'daemon_cache_timeout',
            desc='seconds to cache service (daemon) inventory',
        ),
        Option(
            'facts_cache_timeout',
            desc='seconds to cache host facts data',
        ),
        Option(
            'host_check_interval',
            desc='how frequently to perform a host check',
        ),
        Option(
            'mode',
            enum_allowed=['root', 'cephadm-package'],
            desc='mode for remote execution of cephadm',
        ),
        Option(
            'container_image_base',
            default=DEFAULT_IMAGE,
            desc='Container image name, without the tag',
        ),
        Option(
            'container_image_prometheus',
            default=DEFAULT_PROMETHEUS_IMAGE,
            desc='Prometheus container image',
        ),
        Option(
            'container_image_nvmeof',
            default=DEFAULT_NVMEOF_IMAGE,
            desc='Nvme-of container image',
        ),
        Option(
            'container_image_grafana',
            default=DEFAULT_GRAFANA_IMAGE,
            desc='Grafana container image',
        ),
        Option(
            'container_image_alertmanager',
            default=DEFAULT_ALERT_MANAGER_IMAGE,
            desc='Alertmanager container image',
        ),
        Option(
            'container_image_node_exporter',
            default=DEFAULT_NODE_EXPORTER_IMAGE,
            desc='Node exporter container image',
        ),
        Option(
            'container_image_loki',
            default=DEFAULT_LOKI_IMAGE,
            desc='Loki container image',
        ),
        Option(
            'container_image_promtail',
            default=DEFAULT_PROMTAIL_IMAGE,
            desc='Promtail container image',
        ),
        Option(
            'container_image_haproxy',
            default=DEFAULT_HAPROXY_IMAGE,
            desc='HAproxy container image',
        ),
        Option(
            'container_image_keepalived',
            default=DEFAULT_KEEPALIVED_IMAGE,
            desc='Keepalived container image',
        ),
        Option(
            'container_image_snmp_gateway',
            default=DEFAULT_SNMP_GATEWAY_IMAGE,
            desc='SNMP Gateway container image',
        ),
        Option(
            'container_image_elasticsearch',
            default=DEFAULT_ELASTICSEARCH_IMAGE,
            desc='elasticsearch container image',
        ),
        Option(
            'container_image_jaeger_agent',
            default=DEFAULT_JAEGER_AGENT_IMAGE,
            desc='Jaeger agent container image',
        ),
        Option(
            'container_image_jaeger_collector',
            default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
            desc='Jaeger collector container image',
        ),
        Option(
            'container_image_jaeger_query',
            default=DEFAULT_JAEGER_QUERY_IMAGE,
            desc='Jaeger query container image',
        ),
        Option(
            'warn_on_stray_hosts',
            desc='raise a health warning if daemons are detected on a host '
                 'that is not managed by cephadm',
        ),
        Option(
            'warn_on_stray_daemons',
            desc='raise a health warning if daemons are detected '
                 'that are not managed by cephadm',
        ),
        Option(
            'warn_on_failed_host_check',
            desc='raise a health warning if the host check fails',
        ),
        Option(
            'log_to_cluster',
            desc='log to the "cephadm" cluster log channel',
        ),
        Option(
            'allow_ptrace',
            desc='allow SYS_PTRACE capability on ceph containers',
            long_desc='The SYS_PTRACE capability is needed to attach to a '
                      'process with gdb or strace. Enabling this option '
                      'can allow debugging daemons that encounter problems '
                      'at runtime',
        ),
        Option(
            'container_init',
            desc='Run podman/docker with `--init`',
        ),
        Option(
            'prometheus_alerts_path',
            default='/etc/prometheus/ceph/ceph_default_alerts.yml',
            desc='location of alerts to include in prometheus deployments',
        ),
        Option(
            'migration_current',
            desc='internal - do not modify',
            # used to track spec and other data migrations.
        ),
        Option(
            'config_dashboard',
            desc='manage configs like API endpoints in Dashboard.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf',
            desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf_hosts',
            desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
        ),
        Option(
            'registry_url',
            desc='Registry url for login purposes. This is not the default registry',
        ),
        Option(
            'registry_username',
            desc='Custom repository username. Only used for logging into a registry.',
        ),
        Option(
            'registry_password',
            desc='Custom repository password. Only used for logging into a registry.',
        ),
        Option(
            'registry_insecure',
            desc='Registry is to be considered insecure (no TLS available). Only for development purposes.',
        ),
        Option(
            'use_repo_digest',
            desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
        ),
        Option(
            'config_checks_enabled',
            desc='Enable or disable the cephadm configuration analysis',
        ),
        Option(
            'default_registry',
            desc='Search-registry to which we should normalize unqualified image names. '
                 'This is not the default registry',
        ),
        Option(
            'max_count_per_host',
            desc='max number of daemons per service per host',
        ),
        Option(
            'autotune_memory_target_ratio',
            desc='ratio of total system memory to divide amongst autotuned daemons',
        ),
        Option(
            'autotune_interval',
            desc='how frequently to autotune daemon memory',
        ),
        Option(
            'use_agent',
            desc='Use cephadm agent on each host to gather and send metadata',
        ),
        Option(
            'agent_refresh_rate',
            desc='How often agent on each host will try to gather and send metadata',
        ),
        Option(
            'agent_starting_port',
            desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)',
        ),
        Option(
            'agent_down_multiplier',
            desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down',
        ),
        Option(
            'max_osd_draining_count',
            desc='max number of osds that will be drained simultaneously when osds are removed',
        ),
        Option(
            'service_discovery_port',
            desc='cephadm service discovery port',
        ),
        Option(
            'cgroups_split',
            desc='Pass --cgroups=split when cephadm creates containers (currently podman only)',
        ),
        Option(
            'log_refresh_metadata',
            desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level',
        ),
        Option(
            'secure_monitoring_stack',
            desc='Enable TLS security for all the monitoring stack daemons',
        ),
        Option(
            'default_cephadm_command_timeout',
            desc='Default timeout applied to cephadm commands run directly on '
                 'the host (in seconds)',
        ),
    ]
    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid: str = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        self.event = Event()

        self.ssh = ssh.SSHManager(self)

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.max_count_per_host = 0
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_nvmeof = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.container_image_loki = ''
            self.container_image_promtail = ''
            self.container_image_haproxy = ''
            self.container_image_keepalived = ''
            self.container_image_snmp_gateway = ''
            self.container_image_elasticsearch = ''
            self.container_image_jaeger_agent = ''
            self.container_image_jaeger_collector = ''
            self.container_image_jaeger_query = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = True
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.manage_etc_ceph_ceph_conf_hosts = '*'
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.registry_insecure: bool = False
            self.use_repo_digest = True
            self.default_registry = ''
            self.autotune_memory_target_ratio = 0.0
            self.autotune_interval = 0
            self.ssh_user: Optional[str] = None
            self._ssh_options: Optional[str] = None
            self.tkey = NamedTemporaryFile()
            self.ssh_config_fname: Optional[str] = None
            self.ssh_config: Optional[str] = None
            self._temp_files: List = []
            self.ssh_key: Optional[str] = None
            self.ssh_pub: Optional[str] = None
            self.ssh_cert: Optional[str] = None
            self.use_agent = False
            self.agent_refresh_rate = 0
            self.agent_down_multiplier = 0.0
            self.agent_starting_port = 0
            self.service_discovery_port = 0
            self.secure_monitoring_stack = False
            self.apply_spec_fails: List[Tuple[str, str]] = []
            self.max_osd_draining_count = 10
            self.device_enhanced_scan = False
            self.inventory_list_all = False
            self.cgroups_split = True
            self.log_refresh_metadata = False
            self.default_cephadm_command_timeout = 0
        self.notify(NotifyType.mon_map, None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            assert isinstance(path, str)
            with open(path, 'rb') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, e))

        self.cephadm_binary_path = self._get_cephadm_binary_path()

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self.ssh._reconfig_ssh()
        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.inventory = Inventory(self)

        self.cache = HostCache(self)

        self.agent_cache = AgentCache(self)
        self.agent_cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        self.keys = ClientKeyringStore(self)

        self.tuned_profiles = TunedProfileStore(self)
        self.tuned_profiles.load()

        self.tuned_profile_utils = TunedProfileUtils(self)

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)
        _service_classes: Sequence[Type[CephadmService]] = [
            OSDService, NFSService, MonService, MgrService, MdsService,
            RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
            PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
            IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
            CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
            JaegerQueryService, JaegerAgentService, JaegerCollectorService
        ]
        # https://github.com/python/mypy/issues/8993
        self.cephadm_services: Dict[str, CephadmService] = {
            cls.TYPE: cls(self) for cls in _service_classes}  # type: ignore

        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
        self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
        self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])

        self.scheduled_async_actions: List[Callable] = []

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
        self.need_connect_dashboard_rgw = False

        self.config_checker = CephadmConfigChecks(self)

        self.http_server = CephadmHttpServer(self)
        self.http_server.start()
        self.agent_helpers = CephadmAgentHelpers(self)
        if self.use_agent:
            self.agent_helpers._apply_agent()

        self.offline_watcher = OfflineHostWatcher(self)
        self.offline_watcher.start()
    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.http_server.shutdown()
        self.offline_watcher.shutdown()
    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]
    def _get_cephadm_binary_path(self) -> str:
        m = hashlib.sha256()
        m.update(self._cephadm)
        return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.event_loop = ssh.EventLoopThread()

        serve = CephadmServe(self)
        serve.serve()
    def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
        if not timeout:
            timeout = self.default_cephadm_command_timeout
            # put a lower bound of 60 seconds in case users
            # accidentally set it to something unreasonable.
            # For example if they thought it was in minutes
            # rather than seconds
            if timeout < 60:
                self.log.info(
                    f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
                timeout = 60
        return self.event_loop.get_result(coro, timeout)
    @contextmanager
    def async_timeout_handler(self, host: Optional[str] = '',
                              cmd: Optional[str] = '',
                              timeout: Optional[int] = None) -> Iterator[None]:
        # this is meant to catch asyncio.TimeoutError and convert it into an
        # OrchestratorError which much of the cephadm codebase is better equipped to handle.
        # If the command being run, the host it is run on, or the timeout being used
        # are provided, that will be included in the OrchestratorError's message
        try:
            yield
        except asyncio.TimeoutError:
            err_str: str = ''
            if cmd:
                err_str = f'Command "{cmd}" timed out '
            else:
                err_str = 'Command timed out '
            if host:
                err_str += f'on host {host} '
            if timeout:
                err_str += f'(non-default {timeout} second timeout)'
            else:
                err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)')
            raise OrchestratorError(err_str)
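    # Typical call pattern (mirroring the uses further down in this module):
    # wrap a wait_async() call in the timeout-converting context manager so
    # an asyncio.TimeoutError surfaces as an OrchestratorError. The coroutine
    # below is a placeholder.
    #
    #     with self.async_timeout_handler(host, 'some cephadm command'):
    #         result = self.wait_async(some_coroutine())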
    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore
    def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
        if notify_type == NotifyType.mon_map:
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == NotifyType.pg_summary:
            self._trigger_osd_removal()
    def _trigger_osd_removal(self) -> None:
        remove_queue = self.to_remove_osds.as_osd_ids()
        if not remove_queue:
            return
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in remove_queue:
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the serve loop to remove it
                    self._kick_serve_loop()
    def pause(self) -> None:
        self.paused = True
        self.log.info('Paused')
        self.set_store('pause', 'true')

        # wake loop so we update the health status
        self._kick_serve_loop()

    def resume(self) -> None:
        self.paused = False
        self.log.info('Resumed')
        self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()
    def get_unique_name(
            self,
            daemon_type: str,
            host: str,
            existing: List[orchestrator.DaemonDescription],
            prefix: Optional[str] = None,
            forcename: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> str:
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'ceph-exporter',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
            'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            if rank is not None and rank_generation is not None:
                name += f'{rank}.{rank_generation}.'
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not res:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in res:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))
    def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
        def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
            return str_to_datetime(value) if value is not None else None

        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            daemon_type = d['name'].split('.')[0]
            if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
                logger.warning(f"Found unknown daemon type {daemon_type} on host {host}")
                continue

            container_id = d.get('container_id')
            if container_id:
                # shorten the hash
                container_id = container_id[0:12]
            rank = int(d['rank']) if d.get('rank') is not None else None
            rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            status, status_desc = None, 'unknown'
            if 'state' in d:
                status_desc = d['state']
                status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            sd = orchestrator.DaemonDescription(
                daemon_type=daemon_type,
                daemon_id='.'.join(d['name'].split('.')[1:]),
                hostname=host,
                container_id=container_id,
                container_image_id=d.get('container_image_id'),
                container_image_name=d.get('container_image_name'),
                container_image_digests=d.get('container_image_digests'),
                version=d.get('version'),
                status=status,
                status_desc=status_desc,
                created=_as_datetime(d.get('created')),
                started=_as_datetime(d.get('started')),
                last_refresh=datetime_now(),
                last_configured=_as_datetime(d.get('last_configured')),
                last_deployed=_as_datetime(d.get('last_deployed')),
                memory_usage=d.get('memory_usage'),
                memory_request=d.get('memory_request'),
                memory_limit=d.get('memory_limit'),
                cpu_percentage=d.get('cpu_percentage'),
                service_name=d.get('service_name'),
                ports=d.get('ports'),
                deployed_by=d.get('deployed_by'),
                rank=rank,
                rank_generation=rank_generation,
                extra_container_args=d.get('extra_container_args'),
                extra_entrypoint_args=d.get('extra_entrypoint_args'),
            )
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.cache.update_host_daemons(host, dm)
        self.cache.save_host(host)
    def update_watched_hosts(self) -> None:
        # currently, we are watching hosts with nfs daemons
        hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
        ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
        self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    def update_failed_daemon_health_check(self) -> None:
        failed_daemons = []
        for dd in self.cache.get_error_daemons():
            if dd.daemon_type != 'agent':  # agents tracked by CEPHADM_AGENT_DOWN
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        self.remove_health_warning('CEPHADM_FAILED_DAEMON')
        if failed_daemons:
            self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
                failed_daemons), failed_daemons)
    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if asyncssh is not None:
            return True, ""
        else:
            return False, "loading asyncssh library:{}".format(
                asyncssh_import_error)
    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err, {}
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}

        # mypy is unable to determine type for _processes since it's private
        worker_count: int = self._worker_pool._processes  # type: ignore
        ret = {
            "workers": worker_count,
            "paused": self.paused,
        }

        return True, err, ret
    def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
        self.set_store(what, new)
        self.ssh._reconfig_ssh()
        if self.cache.get_hosts():
            # Can't check anything without hosts
            host = self.cache.get_hosts()[0]
            r = CephadmServe(self)._check_host(host)
            if r is not None:
                # connection failed reset user
                self.set_store(what, old)
                self.ssh._reconfig_ssh()
                raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
        self.log.info(f'Set ssh {what}')
    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set the ssh_config file (use -i <ssh_config>)
        """
        # Set an ssh_config file provided from stdin
        old = self.ssh_config
        if inbuf == old:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self._validate_and_set_ssh_val('ssh_config', inbuf, old)
        return 0, "", ""
    @orchestrator._cli_write_command('cephadm clear-ssh-config')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file
        """
        # Clear the ssh_config file provided from stdin
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self.ssh._reconfig_ssh()
        return 0, "", ""
    @orchestrator._cli_read_command('cephadm get-ssh-config')
    def _get_ssh_config(self) -> HandleCommandResult:
        """
        Returns the ssh config as used by cephadm
        """
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
    @orchestrator._cli_write_command('cephadm generate-key')
    def _generate_key(self) -> Tuple[int, str, str]:
        """
        Generate a cluster SSH key (if not present)
        """
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self.ssh._reconfig_ssh()
        return 0, '', ''
    @orchestrator._cli_write_command(
        'cephadm set-priv-key')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH private key (use -i <private_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        old = self.ssh_key
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
        self.log.info('Set ssh private key')
        return 0, "", ""
    @orchestrator._cli_write_command(
        'cephadm set-pub-key')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH public key (use -i <public_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        old = self.ssh_pub
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
        return 0, "", ""
    @orchestrator._cli_write_command(
        'cephadm set-signed-cert')
    def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty cert file provided"
        old = self.ssh_cert
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
        return 0, "", ""
    @orchestrator._cli_write_command(
        'cephadm clear-key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Clear cluster SSH key"""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self.set_store('ssh_identity_cert', None)
        self.ssh._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''
    @orchestrator._cli_read_command(
        'cephadm get-pub-key')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        """Show SSH public key for connecting to cluster hosts"""
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        return -errno.ENOENT, '', 'No cluster SSH key defined'
    @orchestrator._cli_read_command(
        'cephadm get-signed-cert')
    def _get_signed_cert(self) -> Tuple[int, str, str]:
        """Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
        if self.ssh_cert:
            return 0, self.ssh_cert, ''
        return -errno.ENOENT, '', 'No signed cert defined'
    @orchestrator._cli_read_command(
        'cephadm get-user')
    def _get_user(self) -> Tuple[int, str, str]:
        """
        Show user for SSHing to cluster hosts
        """
        if self.ssh_user is None:
            return -errno.ENOENT, '', 'No cluster SSH user configured'
        return 0, self.ssh_user, ''
    @orchestrator._cli_read_command(
        'cephadm set-user')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        """
        Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
        """
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self._validate_and_set_ssh_val('ssh_user', user, current_user)
        current_ssh_config = self._get_ssh_config()
        new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
        self._set_ssh_config(new_ssh_config)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '. sudo will be used'
        self.log.info(msg)
        return 0, msg, ''
    @orchestrator._cli_read_command(
        'cephadm registry-login')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
        """
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif (url and username and password):
            registry_json = {'url': url, 'username': username, 'password': password}
        else:
            assert isinstance(inbuf, str)
            registry_json = json.loads(inbuf)
            if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")

        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        with self.async_timeout_handler(host, 'cephadm registry-login'):
            r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_store('registry_credentials', json.dumps(registry_json))
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''
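    # Shape of the credentials file consumed via -i (field names as checked
    # above; values are placeholders):
    #
    #     {
    #       "url": "REGISTRY_URL",
    #       "username": "REGISTRY_USERNAME",
    #       "password": "REGISTRY_PASSWORD"
    #     }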
    @orchestrator._cli_read_command('cephadm check-host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Check whether we can access and manage a remote host"""
        try:
            with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
                out, err, code = self.wait_async(
                    CephadmServe(self)._run_cephadm(
                        host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
                        addr=addr, error_ok=True, no_fsid=True))
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except ssh.HostConnectionError as e:
            self.log.exception(
                f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
            return 1, '', ('check-host failed:\n'
                           + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n'
                           + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
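    # Example invocation of the command registered above (host name is a
    # placeholder; addr is optional):
    #
    #     ceph cephadm check-host <host> [<addr>]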
    @orchestrator._cli_read_command(
        'cephadm prepare-host')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Prepare a remote host for use with cephadm"""
        with self.async_timeout_handler(host, 'cephadm prepare-host'):
            out, err, code = self.wait_async(
                CephadmServe(self)._run_cephadm(
                    host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
                    addr=addr, error_ok=True, no_fsid=True))
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf')
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Text that is appended to all daemon's ceph.conf.
        Mainly a workaround, till `config generate-minimal-conf` generates
        a complete ceph.conf.

        Warning: this is a dangerous operation.
        """
        if inbuf:
            # sanity check the provided config text
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()
    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        """
        Get extra ceph conf that is appended
        """
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
    @orchestrator._cli_read_command('cephadm config-check ls')
    def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
        """List the available configuration checks and their current state"""

        if format not in [Format.plain, Format.json, Format.json_pretty]:
            return HandleCommandResult(
                retval=1,
                stderr="Requested format is not supported when listing configuration checks")

        if format in [Format.json, Format.json_pretty]:
            return HandleCommandResult(
                stdout=to_format(self.config_checker.health_checks,
                                 format,
                                 many=True,
                                 cls=None))

        # plain formatting
        table = PrettyTable(
            ['NAME',
             'HEALTHCHECK',
             'STATUS',
             'DESCRIPTION'
             ], border=False)
        table.align['NAME'] = 'l'
        table.align['HEALTHCHECK'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESCRIPTION'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 2
        for c in self.config_checker.health_checks:
            table.add_row((
                c.name,
                c.healthcheck_name,
                c.status,
                c.description,
            ))

        return HandleCommandResult(stdout=table.get_string())
    @orchestrator._cli_read_command('cephadm config-check status')
    def _config_check_status(self) -> HandleCommandResult:
        """Show whether the configuration checker feature is enabled/disabled"""
        status = self.get_module_option('config_checks_enabled')
        return HandleCommandResult(stdout="Enabled" if status else "Disabled")
    @orchestrator._cli_write_command('cephadm config-check enable')
    def _config_check_enable(self, check_name: str) -> HandleCommandResult:
        """Enable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'enabled')
        if err:
            return HandleCommandResult(
                retval=err,
                stderr=f"Failed to enable check '{check_name}' : {msg}")

        return HandleCommandResult(stdout="ok")
    @orchestrator._cli_write_command('cephadm config-check disable')
    def _config_check_disable(self, check_name: str) -> HandleCommandResult:
        """Disable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'disabled')
        if err:
            return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
        else:
            # drop any outstanding raised healthcheck for this check
            config_check = self.config_checker.lookup_check(check_name)
            if config_check:
                if config_check.healthcheck_name in self.health_checks:
                    self.health_checks.pop(config_check.healthcheck_name, None)
                    self.set_health_checks(self.health_checks)
            else:
                self.log.error(
                    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

        return HandleCommandResult(stdout="ok")
    def _config_check_valid(self, check_name: str) -> bool:
        return check_name in [chk.name for chk in self.config_checker.health_checks]

    def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
        checks_raw = self.get_store('config_checks')
        if not checks_raw:
            return 1, "config_checks setting is not available"

        checks = json.loads(checks_raw)
        checks.update({
            check_name: status
        })
        self.log.info(f"updated config check '{check_name}' : {status}")
        self.set_store('config_checks', json.dumps(checks))
        return 0, ""
    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
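    # The "extra_ceph_conf" store value round-trips through JSON of the form
    # written by _set_extra_ceph_conf above:
    #
    #     {"conf": "<appended ceph.conf text>",
    #      "last_modified": "<timestamp from datetime_to_str>"}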
    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt
    @orchestrator._cli_write_command(
        'cephadm osd activate'
    )
    def _osd_activate(self, host: List[str]) -> HandleCommandResult:
        """
        Start OSD containers for existing OSDs
        """

        @forall_hosts
        def run(h: str) -> str:
            with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
                return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))

        return HandleCommandResult(stdout='\n'.join(run(host)))
    @orchestrator._cli_read_command('orch client-keyring ls')
    def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
        """
        List client keyrings under cephadm management
        """
        if format != Format.plain:
            output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
        else:
            table = PrettyTable(
                ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
                table.add_row((
                    ks.entity, ks.placement.pretty_str(),
                    utils.file_mode_to_str(ks.mode),
                    f'{ks.uid}:{ks.gid}',
                    ks.path,
                ))
            output = table.get_string()
        return HandleCommandResult(stdout=output)
    @orchestrator._cli_write_command('orch client-keyring set')
    def _client_keyring_set(
            self,
            entity: str,
            placement: str,
            owner: Optional[str] = None,
            mode: Optional[str] = None,
    ) -> HandleCommandResult:
        """
        Add or update client keyring under cephadm management
        """
        if not entity.startswith('client.'):
            raise OrchestratorError('entity must start with client.')
        if owner:
            try:
                uid, gid = map(int, owner.split(':'))
            except ValueError:
                raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
        else:
            uid = 0
            gid = 0
        if mode:
            try:
                imode = int(mode, 8)
            except ValueError:
                raise OrchestratorError('mode must be an octal mode, e.g. "600"')
        else:
            imode = 0o600

        pspec = PlacementSpec.from_string(placement)
        ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
        self.keys.update(ks)
        self._kick_serve_loop()
        return HandleCommandResult()
    @orchestrator._cli_write_command('orch client-keyring rm')
    def _client_keyring_rm(
            self,
            entity: str,
    ) -> HandleCommandResult:
        """
        Remove client keyring from cephadm management
        """
        self.keys.rm(entity)
        self._kick_serve_loop()
        return HandleCommandResult()
    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        image: Optional[str] = None
        if daemon_type in CEPH_IMAGE_TYPES:
            # get container image
            image = str(self.get_foreign_ceph_option(
                utils.name_to_config_section(daemon_name),
                'container_image'
            )).strip()
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'nvmeof':
            image = self.container_image_nvmeof
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == 'loki':
            image = self.container_image_loki
        elif daemon_type == 'promtail':
            image = self.container_image_promtail
        elif daemon_type == 'haproxy':
            image = self.container_image_haproxy
        elif daemon_type == 'keepalived':
            image = self.container_image_keepalived
        elif daemon_type == 'elasticsearch':
            image = self.container_image_elasticsearch
        elif daemon_type == 'jaeger-agent':
            image = self.container_image_jaeger_agent
        elif daemon_type == 'jaeger-collector':
            image = self.container_image_jaeger_collector
        elif daemon_type == 'jaeger-query':
            image = self.container_image_jaeger_query
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved here, the necessary information
            # is only available when a container is deployed (given
            # via the spec).
            image = None
        elif daemon_type == 'snmp-gateway':
            image = self.container_image_snmp_gateway
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image
    def _check_valid_addr(self, host: str, addr: str) -> str:
        # make sure hostname is resolvable before trying to make a connection
        try:
            ip_addr = utils.resolve_ip(addr)
        except OrchestratorError as e:
            msg = str(e) + f'''
You may need to supply an address for {addr}

Please make sure that the host is reachable and accepts connections using the cephadm SSH key
To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
            raise OrchestratorError(msg)

        if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
            # if this is a re-add, use old address. otherwise error
            if host not in self.inventory or self.inventory.get_addr(host) == host:
                raise OrchestratorError(
                    (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
                     + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
            self.log.info(
                f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
            ip_addr = self.inventory.get_addr(host)
        try:
            with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                    host, cephadmNoImage, 'check-host',
                    ['--expect-hostname', host],
                    addr=addr,
                    error_ok=True, no_fsid=True))
            if code:
                msg = 'check-host failed:\n' + '\n'.join(err)
                # err will contain stdout and stderr, so we filter on the message text to
                # only show the errors
                errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
                if errors:
                    msg = f'Host {host} ({addr}) failed check(s): {errors}'
                raise OrchestratorError(msg)
        except ssh.HostConnectionError as e:
            raise OrchestratorError(str(e))
        return ip_addr
    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        HostSpec.validate(spec)
        ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
        if spec.addr == spec.hostname and ip_addr:
            spec.addr = ip_addr

        if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
            self.cache.refresh_all_host_info(spec.hostname)

        if spec.location:
            self.check_mon_command({
                'prefix': 'osd crush add-bucket',
                'name': spec.hostname,
                'type': 'host',
                'args': [f'{k}={v}' for k, v in spec.location.items()],
            })

        if spec.hostname not in self.inventory:
            self.cache.prime_empty_host(spec.hostname)
        self.inventory.add_host(spec)
        self.offline_hosts_remove(spec.hostname)
        if spec.status == 'maintenance':
            self._set_maintenance_healthcheck()
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)

    @handle_orch_error
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)
    @handle_orch_error
    def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
        """
        Remove a host from orchestrator management.

        :param host: host name
        :param force: bypass running daemons check
        :param offline: remove offline host
        """

        # check if host is offline
        host_offline = host in self.offline_hosts

        if host_offline and not offline:
            raise OrchestratorValidationError(
                "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))

        if not host_offline and offline:
            raise OrchestratorValidationError(
                "{} is online, please remove host without --offline.".format(host))

        if offline and not force:
            raise OrchestratorValidationError("Removing an offline host requires --force")

        # check if there are daemons on the host
        if not force:
            daemons = self.cache.get_daemons_by_host(host)
            if daemons:
                self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")

                daemons_table = ""
                daemons_table += "{:<20} {:<15}\n".format("type", "id")
                daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
                for d in daemons:
                    daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

                raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
                                                  "The following daemons are running in the host:"
                                                  "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
                                                      host, daemons_table, host))

        # check, if we're removing the last _admin host
        if not force:
            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == host:
                raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
                                                  f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host"
                                                  " or add --force to this command")

        def run_cmd(cmd_args: dict) -> None:
            ret, out, err = self.mon_command(cmd_args)
            if ret != 0:
                self.log.debug(f"ran {cmd_args} with mon_command")
                self.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")

        if offline:
            daemons = self.cache.get_daemons_by_host(host)
            for d in daemons:
                self.log.info(f"removing: {d.name()}")

                if d.daemon_type != 'osd':
                    self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
                    self.cephadm_services[daemon_type_to_service(
                        str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
                else:
                    cmd_args = {
                        'prefix': 'osd purge-actual',
                        'id': int(str(d.daemon_id)),
                        'yes_i_really_mean_it': True
                    }
                    run_cmd(cmd_args)

            cmd_args = {
                'prefix': 'osd crush rm',
                'name': host
            }
            run_cmd(cmd_args)

        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self.ssh.reset_con(host)
        # if host was in offline host list, we should remove it now.
        self.offline_hosts_remove(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed {} host '{}'".format('offline' if offline else '', host)
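    # Per the validation error above, removal of a live host is expected to
    # follow a drain, e.g.:
    #
    #     ceph orch host drain <host>
    #     ceph orch host rm <host>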
    @handle_orch_error
    def update_host_addr(self, host: str, addr: str) -> str:
        self._check_valid_addr(host, addr)
        self.inventory.set_addr(host, addr)
        self.ssh.reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)
    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @handle_orch_error
    def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return a list of hosts metadata (gather_facts) managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        if hostname:
            return [self.cache.get_facts(hostname)]

        return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
    @handle_orch_error
    def add_host_label(self, host: str, label: str) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        self._kick_serve_loop()
        return 'Added label %s to host %s' % (label, host)

    @handle_orch_error
    def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
        # if we remove the _admin label from the only host that has it we could end up
        # removing the only instance of the config and keyring and cause issues
        if not force and label == SpecialHostLabels.ADMIN:
            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == host:
                raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
                                                  f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal"
                                                  " of the last cluster config/keyring managed by cephadm.\n"
                                                  f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
                                                  " before completing this operation.\nIf you're certain this is"
                                                  " what you want rerun this command with --force.")
        if self.inventory.has_label(host, label):
            self.inventory.rm_label(host, label)
            msg = f'Removed label {label} from host {host}'
        else:
            msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels."
        self.log.info(msg)
        self._kick_serve_loop()
        return msg
    def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
        self.log.debug("running host-ok-to-stop checks")
        daemons = self.cache.get_daemons()
        daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
        for dd in daemons:
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        notifications: List[str] = []
        error_notifications: List[str] = []
        okay: bool = True
        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type_to_service(
                daemon_type)].ok_to_stop(daemon_ids, force=force)
            if r.retval:
                okay = False
                # collect error notifications so user can see every daemon causing host
                # to not be okay to stop
                error_notifications.append(r.stderr)
            if r.stdout:
                # if extra notifications to print for user, add them to notifications list
                notifications.append(r.stdout)

        if not okay:
            # at least one daemon is not okay to stop
            return 1, '\n'.join(error_notifications)

        if notifications:
            return 0, (f'It is presumed safe to stop host {hostname}. '
                       + 'Note the following:\n\n' + '\n'.join(notifications))
        return 0, f'It is presumed safe to stop host {hostname}'
    @handle_orch_error
    def host_ok_to_stop(self, hostname: str) -> str:
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        rc, msg = self._host_ok_to_stop(hostname)
        if rc:
            raise OrchestratorError(msg, errno=rc)

        self.log.info(msg)
        return msg
    def _set_maintenance_healthcheck(self) -> None:
        """Raise/update or clear the maintenance health check as needed"""

        in_maintenance = self.inventory.get_host_with_state("maintenance")
        if not in_maintenance:
            self.remove_health_warning('HOST_IN_MAINTENANCE')
        else:
            s = "host is" if len(in_maintenance) == 1 else "hosts are"
            self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
                f"{h} is in maintenance" for h in in_maintenance])

    def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str:
        """Attempt to place a cluster host in maintenance

        Placing a host into maintenance disables the cluster's ceph target in systemd
        and stops all ceph daemons. If the host is an osd host we apply the noout flag
        for the host subtree in crush to prevent data movement during a host maintenance
        window.

        :param hostname: (str) name of the host (must match an inventory hostname)

        :raises OrchestratorError: Hostname is invalid, host is already in maintenance
        """
        if yes_i_really_mean_it and not force:
            raise OrchestratorError("--force must be passed with --yes-i-really-mean-it")

        if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it:
            raise OrchestratorError("Maintenance feature is not supported on single node clusters")

        # if upgrade is active, deny
        if self.upgrade.upgrade_state and not yes_i_really_mean_it:
            raise OrchestratorError(
                f"Unable to place {hostname} in maintenance with upgrade active/paused")

        tgt_host = self.inventory._inventory[hostname]
        if tgt_host.get("status", "").lower() == "maintenance":
            raise OrchestratorError(f"Host {hostname} is already in maintenance")

        host_daemons = self.cache.get_daemon_types(hostname)
        self.log.debug("daemons on host {}".format(','.join(host_daemons)))
        if host_daemons:
            # daemons on this host, so check the daemons can be stopped
            # and if so, place the host into maintenance by disabling the target
            rc, msg = self._host_ok_to_stop(hostname, force)
            if rc and not yes_i_really_mean_it:
                raise OrchestratorError(
                    msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)

            # call the host-maintenance function
            with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
                _out, _err, _code = self.wait_async(
                    CephadmServe(self)._run_cephadm(
                        hostname, cephadmNoImage, "host-maintenance",
                        ["enter"], error_ok=True))
            returned_msg = _err[0].split('\n')[-1]
            if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
                raise OrchestratorError(
                    f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")

            if "osd" in host_daemons:
                crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
                rc, out, err = self.mon_command({
                    'prefix': 'osd set-group',
                    'flags': 'noout',
                    'who': [crush_node],
                })
                if rc and not yes_i_really_mean_it:
                    self.log.warning(
                        f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
                    raise OrchestratorError(
                        f"Unable to set the osds on {hostname} to noout (rc={rc})")
                elif not rc:
                    self.log.info(
                        f"maintenance mode request for {hostname} has SET the noout group")

        # update the host status in the inventory
        tgt_host["status"] = "maintenance"
        self.inventory._inventory[hostname] = tgt_host
        self.inventory.save()

        self._set_maintenance_healthcheck()
        return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'

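    # Rough control flow (illustrative): `ceph orch host maintenance enter <host>`
    # lands in enter_host_maintenance() above; for a host that carries OSDs the
    # happy path is:
    #   1. _host_ok_to_stop(host)                 -> daemon safety checks
    #   2. cephadm host-maintenance enter         -> disable ceph.target, stop daemons
    #   3. ceph osd set-group noout <crush-node>  -> pin noout on the host subtree
    #   4. inventory status set to "maintenance" and a health warning is raised
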
    def exit_host_maintenance(self, hostname: str) -> str:
        """Exit maintenance mode and return a host to an operational state

        Returning from maintenance will enable the cluster's systemd target and
        start it, and remove any noout that has been added for the host if the
        host has osd daemons.

        :param hostname: (str) host name

        :raises OrchestratorError: Unable to return from maintenance, or unset the
                                   noout flag
        """
        tgt_host = self.inventory._inventory[hostname]
        if tgt_host['status'] != "maintenance":
            raise OrchestratorError(f"Host {hostname} is not in maintenance mode")

        with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
            outs, errs, _code = self.wait_async(
                CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
                                                'host-maintenance', ['exit'], error_ok=True))
        returned_msg = errs[0].split('\n')[-1]
        if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
            raise OrchestratorError(
                f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")

        if "osd" in self.cache.get_daemon_types(hostname):
            crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
            rc, _out, _err = self.mon_command({
                'prefix': 'osd unset-group',
                'flags': 'noout',
                'who': [crush_node],
            })
            if rc:
                self.log.warning(
                    f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
                raise OrchestratorError(f"Unable to unset the noout group on {hostname} (rc={rc})")
            else:
                self.log.info(
                    f"exit maintenance request has UNSET the noout group on host {hostname}")

        # update the host record status
        tgt_host['status'] = ""
        self.inventory._inventory[hostname] = tgt_host
        self.inventory.save()

        self._set_maintenance_healthcheck()

        return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"

    def rescan_host(self, hostname: str) -> str:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan.

        :param hostname: (str) host name
        """
        self.log.info(f'disk rescan request sent to host "{hostname}"')
        with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
            _out, _err, _code = self.wait_async(
                CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
                                                [], no_fsid=True, error_ok=True))
        if not _err:
            raise OrchestratorError('Unexpected response from cephadm disk-rescan call')

        msg = _err[0].split('\n')[-1]
        log_msg = f'disk rescan: {msg}'
        if msg.upper().startswith('OK'):
            self.log.info(log_msg)
        else:
            self.log.warning(log_msg)

        return msg

    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            try:
                config = self._combine_confs(config, extra)
            except Exception as e:
                self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}')
        return config

    def _combine_confs(self, conf1: str, conf2: str) -> str:
        section_to_option: Dict[str, List[str]] = {}
        final_conf: str = ''
        for conf in [conf1, conf2]:
            if not conf:
                continue
            section = ''
            for line in conf.split('\n'):
                if line.strip().startswith('#') or not line.strip():
                    continue
                if line.strip().startswith('[') and line.strip().endswith(']'):
                    section = line.strip().replace('[', '').replace(']', '')
                    if section not in section_to_option:
                        section_to_option[section] = []
                else:
                    section_to_option[section].append(line.strip())

        first_section = True
        for section, options in section_to_option.items():
            if not first_section:
                final_conf += '\n'
            final_conf += f'[{section}]\n'
            for option in options:
                final_conf += f'{option}\n'
            first_section = False

        return final_conf

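    # Illustrative input/output for _combine_confs() (not part of the module):
    # given
    #   conf1 = "[global]\nfsid = abc\n"
    #   conf2 = "# comment\n[global]\nmon_host = 10.0.0.1\n[client]\nkeyring = k\n"
    # the merged result groups options from both inputs under one section header:
    #   [global]
    #   fsid = abc
    #   mon_host = 10.0.0.1
    #
    #   [client]
    #   keyring = k
    # Comments and blank lines from the inputs are dropped.
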
    def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()

    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.debug('Kicked serve() loop to refresh all services')

        sm: Dict[str, orchestrator.ServiceDescription] = {}

        # known services
        for nm, spec in self.spec_store.all_specs.items():
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != nm:
                continue

            if spec.service_type != 'osd':
                size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
            else:
                # osd counting is special
                size = 0

            sm[nm] = orchestrator.ServiceDescription(
                spec=spec,
                size=size,
                running=0,
                events=self.events.get_for_service(spec.service_name()),
                created=self.spec_store.spec_created[nm],
                deleted=self.spec_store.spec_deleted.get(nm, None),
                virtual_ip=spec.get_virtual_ip(),
                ports=spec.get_port_start(),
            )
            if spec.service_type == 'ingress':
                # ingress has 2 daemons running per host
                # but only if it's the full ingress service, not for keepalive-only
                if not cast(IngressSpec, spec).keepalive_only:
                    sm[nm].size *= 2

        # factor daemons into status
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                assert dd.hostname is not None, f'no hostname for {dd!r}'
                assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'

                n: str = dd.service_name()

                if (
                    service_type
                    and service_type != daemon_type_to_service(dd.daemon_type)
                ):
                    continue
                if service_name and service_name != n:
                    continue

                if n not in sm:
                    # new unmanaged service
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=daemon_type_to_service(dd.daemon_type),
                        service_id=dd.service_id(),
                    )
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        size=0,
                    )

                if dd.status == DaemonDescriptionStatus.running:
                    sm[n].running += 1
                if dd.daemon_type == 'osd':
                    # The osd count can't be determined by the Placement spec.
                    # Showing an actual/expected representation cannot be determined
                    # here. So we're setting running = size for now.
                    sm[n].size = sm[n].running
                if (
                    not sm[n].last_refresh
                    or not dd.last_refresh
                    or dd.last_refresh < sm[n].last_refresh  # type: ignore
                ):
                    sm[n].last_refresh = dd.last_refresh

        return list(sm.values())

    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve(host)
            self.log.debug('Kicked serve() loop to refresh all daemons')

        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
                    dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
                        dd.name(),
                        f"{dd.daemon_type}_memory_target"
                    ))
                result.append(dd)
        return result

    def service_action(self, action: str, service_name: str) -> List[str]:
        if service_name not in self.spec_store.all_specs.keys():
            raise OrchestratorError(f'Invalid service name "{service_name}".'
                                    + ' View currently running services using "ceph orch ls"')
        dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
        if not dds:
            raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
                                    + ' View currently running services using "ceph orch ls"')
        if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
            return [f'Stopping entire {service_name} service is prohibited.']
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return [
            self._schedule_daemon_action(dd.name(), action)
            for dd in dds
        ]

    def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
        self.log.info(f'Rotating authentication key for {daemon_spec.name()}')
        rc, out, err = self.mon_command({
            'prefix': 'auth get-or-create-pending',
            'entity': daemon_spec.entity_name(),
            'format': 'json',
        })
        j = json.loads(out)
        pending_key = j[0]['pending_key']

        # deploy a new keyring file
        if daemon_spec.daemon_type != 'osd':
            daemon_spec = self.cephadm_services[daemon_type_to_service(
                daemon_spec.daemon_type)].prepare_create(daemon_spec)
        with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
            self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))

        # try to be clever, or fall back to restarting the daemon
        rc = -1
        if daemon_spec.daemon_type == 'osd':
            rc, out, err = self.tool_exec(
                args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'],
                stdin=pending_key.encode()
            )
            if not rc:
                rc, out, err = self.tool_exec(
                    args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
                    stdin=pending_key.encode()
                )
        elif daemon_spec.daemon_type == 'mds':
            rc, out, err = self.tool_exec(
                args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
                stdin=pending_key.encode()
            )
        elif (
            daemon_spec.daemon_type == 'mgr'
            and daemon_spec.daemon_id == self.get_mgr_id()
        ):
            rc, out, err = self.tool_exec(
                args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
                stdin=pending_key.encode()
            )
        if rc:
            self._daemon_action(daemon_spec, 'restart')

        return f'Rotated key for {daemon_spec.name()}'

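    # Illustrative entry point (the daemon name is hypothetical): key rotation is
    # normally reached through the daemon action path, e.g.
    #   ceph orch daemon rotate-key rgw.foo.host1.abcdef
    # which routes through daemon_action()/_daemon_action(..., 'rotate-key')
    # and ends up in _rotate_daemon_key() above.
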
    def _daemon_action(self,
                       daemon_spec: CephadmDaemonDeploySpec,
                       action: str,
                       image: Optional[str] = None) -> str:
        self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
                                      daemon_spec.daemon_id)

        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
                                                                                 daemon_spec.daemon_id):
            self.mgr_service.fail_over()
            return ''  # unreachable

        if action == 'rotate-key':
            return self._rotate_daemon_key(daemon_spec)

        if action == 'redeploy' or action == 'reconfig':
            if daemon_spec.daemon_type != 'osd':
                daemon_spec = self.cephadm_services[daemon_type_to_service(
                    daemon_spec.daemon_type)].prepare_create(daemon_spec)
            else:
                # for OSDs, we still need to update config, just not carry out the full
                # prepare_create function
                daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
                    daemon_spec)
            with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
                return self.wait_async(
                    CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
                    out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                        daemon_spec.host, name, 'unit',
                        ['--name', name, a]))
            except Exception:
                self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg

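    # The systemd fallback above maps plain actions onto `cephadm unit` calls;
    # e.g. a 'restart' of a daemon named mon.host1 (hypothetical) issues, in order:
    #   cephadm unit --name mon.host1 reset-failed
    #   cephadm unit --name mon.host1 restart
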
    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_IMAGE_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_IMAGE_TYPES)}')

            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })

    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
        d = self.cache.get_daemon(daemon_name)
        assert d.daemon_type is not None
        assert d.daemon_id is not None

        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

        if action == 'rotate-key':
            if d.daemon_type not in ['mgr', 'osd', 'mds',
                                     'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
                raise OrchestratorError(
                    f'key rotation not supported for {d.daemon_type}'
                )

        self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

        self.log.info(f'Schedule {action} daemon {daemon_name}')
        return self._schedule_daemon_action(daemon_name, action)

    def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
        return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()

    def get_active_mgr(self) -> DaemonDescription:
        return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr'))

    def get_active_mgr_digests(self) -> List[str]:
        digests = self.mgr_service.get_active_daemon(
            self.cache.get_daemons_by_type('mgr')).container_image_digests
        return digests if digests else []

    def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
        dd = self.cache.get_daemon(daemon_name)
        assert dd.daemon_type is not None
        assert dd.daemon_id is not None
        assert dd.hostname is not None
        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
        self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
        self.cache.save_host(dd.hostname)
        msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
        self._kick_serve_loop()
        return msg

    def remove_daemons(self, names):
        # type: (List[str]) -> List[str]
        args = []
        for host, dm in self.cache.daemons.items():
            for name in names:
                if name in dm:
                    args.append((name, host))
        if not args:
            raise OrchestratorError('Unable to find daemon(s) %s' % (names))
        self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
        return self._remove_daemons(args)

    def remove_service(self, service_name: str, force: bool = False) -> str:
        self.log.info('Remove service %s' % service_name)
        self._trigger_preview_refresh(service_name=service_name)
        if service_name in self.spec_store:
            if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
                return f'Unable to remove {service_name} service.\n' \
                       f'Note, you might want to mark the {service_name} service as "unmanaged"'
        else:
            return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"

        # Report list of affected OSDs?
        if not force and service_name.startswith('osd.'):
            osds_msg = {}
            for h, dm in self.cache.get_daemons_with_volatile_status():
                osds_to_remove = []
                for name, dd in dm.items():
                    if dd.daemon_type == 'osd' and dd.service_name() == service_name:
                        osds_to_remove.append(str(dd.daemon_id))
                if osds_to_remove:
                    osds_msg[h] = osds_to_remove
            if osds_msg:
                msg = ''
                for h, ls in osds_msg.items():
                    msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
                raise OrchestratorError(
                    f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')

        found = self.spec_store.rm(service_name)
        if found and service_name.startswith('osd.'):
            self.spec_store.finally_rm(service_name)
        self._kick_serve_loop()
        return f'Removed service {service_name}'

    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
          - add filtering by label
        """
        if refresh:
            if host_filter and host_filter.hosts:
                for h in host_filter.hosts:
                    self.log.debug(f'will refresh {h} devs')
                    self.cache.invalidate_host_devices(h)
                    self.cache.invalidate_host_networks(h)
            else:
                for h in self.cache.get_hosts():
                    self.log.debug(f'will refresh {h} devs')
                    self.cache.invalidate_host_devices(h)
                    self.cache.invalidate_host_networks(h)
            self._kick_serve_loop()
            self.log.debug('Kicked serve() loop to refresh devices')

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result

    def zap_device(self, host: str, path: str) -> str:
        """Zap a device on a managed host.

        Use ceph-volume zap to return a device to an unused/free state

        Args:
            host (str): hostname of the cluster host
            path (str): device path

        Raises:
            OrchestratorError: host is not a cluster host
            OrchestratorError: host is in maintenance and therefore unavailable
            OrchestratorError: device path not found on the host
            OrchestratorError: device is known to a different ceph cluster
            OrchestratorError: device holds active osd
            OrchestratorError: device cache hasn't been populated yet

        Returns:
            str: output from the zap command
        """
        self.log.info('Zap device %s:%s' % (host, path))

        if host not in self.inventory.keys():
            raise OrchestratorError(
                f"Host '{host}' is not a member of the cluster")

        host_info = self.inventory._inventory.get(host, {})
        if host_info.get('status', '').lower() == 'maintenance':
            raise OrchestratorError(
                f"Host '{host}' is in maintenance mode, which prevents any actions against it.")

        if host not in self.cache.devices:
            raise OrchestratorError(
                f"Host '{host}' hasn't been scanned yet to determine its inventory. Please try again later.")

        host_devices = self.cache.devices[host]
        path_found = False
        osd_id_list: List[str] = []

        for dev in host_devices:
            if dev.path == path:
                path_found = True
                # gather any OSD ids backed by LVs on this device; an LV from a
                # different cluster fsid means the device belongs elsewhere
                if dev.lvs:
                    for lv in cast(List[Dict[str, str]], dev.lvs):
                        if lv.get('osd_id', ''):
                            lv_fsid = lv.get('cluster_fsid')
                            if lv_fsid != self._cluster_fsid:
                                raise OrchestratorError(
                                    f"device {path} has lv's from a different Ceph cluster ({lv_fsid})")
                            osd_id_list.append(lv.get('osd_id', ''))
        if not path_found:
            raise OrchestratorError(
                f"Device path '{path}' not found on host '{host}'")

        if osd_id_list:
            dev_name = os.path.basename(path)
            active_osds: List[str] = []
            for osd_id in osd_id_list:
                metadata = self.get_metadata('osd', str(osd_id))
                if metadata:
                    if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
                        active_osds.append("osd." + osd_id)
            if active_osds:
                raise OrchestratorError(
                    f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
                    f"OSD{'s' if len(active_osds) > 1 else ''}"
                    f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")

        cv_args = ['--', 'lvm', 'zap', '--destroy', path]
        with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                host, 'osd', 'ceph-volume', cv_args, error_ok=True))

        self.cache.invalidate_host_devices(host)
        self.cache.invalidate_host_networks(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        msg = f'zap successful for {path} on {host}'
        self.log.info(msg)

        return msg

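    # Example (illustrative; host and device names are hypothetical):
    #   $ ceph orch device zap host1 /dev/sdb --force
    #   zap successful for /dev/sdb on host1
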
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """
        Blink a device light. Calling something like::

          lsmcli local-disk-ident-led-on --path $path

        If you must, you can customize this via::

          ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
          ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

        See templates/blink_device_light_cmd.j2
        """
        @forall_hosts
        def blink(host: str, dev: str, path: str) -> str:
            cmd_line = self.template.render('blink_device_light_cmd.j2',
                                            {
                                                'on': on,
                                                'ident_fault': ident_fault,
                                                'dev': dev,
                                                'path': path
                                            },
                                            host=host)
            cmd_args = shlex.split(cmd_line)

            with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                    host, 'osd', 'shell', ['--'] + cmd_args,
                    error_ok=True))
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd_args)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)

    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up:
                r[str(osd_id)] = o.get('uuid', '')
        return r

    def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
        osd = [x for x in self.get('osd_map')['osds']
               if x['osd'] == osd_id]

        if len(osd) != 1:
            return None

        return osd[0]

    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ) -> None:
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # the to-be-preview spec != the actual spec, this means we need to
                # trigger a refresh, if the spec has been removed (==None) we need to
                # refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        if service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)

    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible to mgr/dashboard
        """
        return [self._apply(spec) for spec in specs]

    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        hosts: List[HostSpec] = self.inventory.all_specs()
        filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
        if not filtered_hosts:
            return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
        return self.osd_service.create_from_spec(drive_group)

    def _preview_osdspecs(self,
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                          ) -> dict:
        if not osdspecs:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews or still in the queue to be previewed
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated. '
                                        'Please re-run this command in a bit.'}]}
        # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
        previews_for_specs = {}
        for host, raw_reports in self.cache.osdspec_previews.items():
            if host not in matching_hosts:
                continue
            osd_reports = []
            for osd_report in raw_reports:
                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                    osd_reports.append(osd_report)
            previews_for_specs.update({host: osd_reports})
        return previews_for_specs

    def _calc_daemon_deps(self,
                          spec: Optional[ServiceSpec],
                          daemon_type: str,
                          daemon_id: str) -> List[str]:

        def get_daemon_names(daemons: List[str]) -> List[str]:
            daemon_names = []
            for daemon_type in daemons:
                for dd in self.cache.get_daemons_by_type(daemon_type):
                    daemon_names.append(dd.name())
            return daemon_names

        alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
        prometheus_user, prometheus_password = self._get_prometheus_credentials()

        deps = []
        if daemon_type == 'haproxy':
            # because cephadm creates new daemon instances whenever
            # port or ip changes, identifying daemons by name is
            # sufficient to detect changes.
            if not spec:
                return []
            ingress_spec = cast(IngressSpec, spec)
            assert ingress_spec.backend_service
            daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
            deps = [d.name() for d in daemons]
        elif daemon_type == 'keepalived':
            # because cephadm creates new daemon instances whenever
            # port or ip changes, identifying daemons by name is
            # sufficient to detect changes.
            if not spec:
                return []
            daemons = self.cache.get_daemons_by_service(spec.service_name())
            deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
        elif daemon_type == 'agent':
            root_cert = ''
            server_port = ''
            try:
                server_port = str(self.http_server.agent.server_port)
                root_cert = self.http_server.agent.ssl_certs.get_root_cert()
            except Exception:
                pass
            deps = sorted([self.get_mgr_ip(), server_port, root_cert,
                           str(self.device_enhanced_scan)])
        elif daemon_type == 'iscsi':
            if spec:
                iscsi_spec = cast(IscsiServiceSpec, spec)
                deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
            else:
                deps = [self.get_mgr_ip()]
        elif daemon_type == 'prometheus':
            # for prometheus we add the active mgr as an explicit dependency,
            # this way we force a redeploy after a mgr failover
            deps.append(self.get_active_mgr().name())
            deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
            deps.append(str(self.service_discovery_port))
            # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
            # a scrape_configs section for each service type. This should be included only
            # when at least one daemon of the corresponding service is running. Therefore,
            # an explicit dependency is added for each service-type to force a reconfig
            # whenever the number of daemons for those service-type changes from 0 to greater
            # than zero and vice versa.
            deps += [s for s in ['node-exporter', 'alertmanager']
                     if self.cache.get_daemons_by_service(s)]
            if len(self.cache.get_daemons_by_type('ingress')) > 0:
                deps.append('ingress')
            # add dependency on ceph-exporter daemons
            deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
            if self.secure_monitoring_stack:
                if prometheus_user and prometheus_password:
                    deps.append(f'{hash(prometheus_user + prometheus_password)}')
                if alertmanager_user and alertmanager_password:
                    deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
        elif daemon_type == 'grafana':
            deps += get_daemon_names(['prometheus', 'loki'])
            if self.secure_monitoring_stack and prometheus_user and prometheus_password:
                deps.append(f'{hash(prometheus_user + prometheus_password)}')
        elif daemon_type == 'alertmanager':
            deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
            if self.secure_monitoring_stack and alertmanager_user and alertmanager_password:
                deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
        elif daemon_type == 'promtail':
            deps += get_daemon_names(['loki'])
        else:
            # TODO(redo): some error message!
            pass

        if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']:
            deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}')

        return sorted(deps)

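    # Illustrative result (daemon names are hypothetical): for a grafana daemon
    # with one prometheus and one loki daemon deployed and the secure monitoring
    # stack disabled, _calc_daemon_deps() would return the sorted list
    #   ['loki.host2', 'prometheus.host1', 'secure_monitoring_stack:False']
    # Any later change to this list triggers a reconfig of the dependent daemon.
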
    @forall_hosts
    def _remove_daemons(self, name: str, host: str) -> str:
        return CephadmServe(self)._remove_daemon(name, host)

    def _check_pool_exists(self, pool: str, service_name: str) -> None:
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')

    def _add_daemon(self,
                    daemon_type: str,
                    spec: ServiceSpec) -> List[str]:
        """
        Add (and place) a daemon. Require explicit host placement. Do not
        schedule, and do not apply the related scheduling limitations.
        """
        if spec.service_name() not in self.spec_store:
            raise OrchestratorError('Unable to add a Daemon without Service.\n'
                                    'Please use `ceph orch apply ...` to create a Service.\n'
                                    'Note, you might want to create the service with "unmanaged=true"')

        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count)

    def _create_daemons(self,
                        daemon_type: str,
                        spec: ServiceSpec,
                        daemons: List[DaemonDescription],
                        hosts: List[HostPlacementSpec],
                        count: int) -> List[str]:
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False
        service_type = daemon_type_to_service(daemon_type)

        args = []  # type: List[CephadmDaemonDeploySpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config:
                self.cephadm_services[service_type].config(spec)
                did_config = True

            daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
                host, daemon_id, network, spec,
                # NOTE: this does not consider port conflicts!
                ports=spec.get_port_start())
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args: Any) -> str:
            daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
            with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
                return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))

        return create_func_map(args)

    def add_daemon(self, spec: ServiceSpec) -> List[str]:
        ret: List[str] = []
        try:
            with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
                for d_type in service_to_daemon_types(spec.service_type):
                    ret.extend(self._add_daemon(d_type, spec))
                return ret
        except OrchestratorError as e:
            self.events.from_orch_error(e)
            raise

    def _get_alertmanager_credentials(self) -> Tuple[str, str]:
        user = self.get_store(AlertmanagerService.USER_CFG_KEY)
        password = self.get_store(AlertmanagerService.PASS_CFG_KEY)
        if user is None or password is None:
            # nothing stored yet: fall back to default credentials
            user = 'admin'
            password = 'admin'
            self.set_store(AlertmanagerService.USER_CFG_KEY, user)
            self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
        return (user, password)

    def _get_prometheus_credentials(self) -> Tuple[str, str]:
        user = self.get_store(PrometheusService.USER_CFG_KEY)
        password = self.get_store(PrometheusService.PASS_CFG_KEY)
        if user is None or password is None:
            # nothing stored yet: fall back to default credentials
            user = 'admin'
            password = 'admin'
            self.set_store(PrometheusService.USER_CFG_KEY, user)
            self.set_store(PrometheusService.PASS_CFG_KEY, password)
        return (user, password)

    def set_prometheus_access_info(self, user: str, password: str) -> str:
        self.set_store(PrometheusService.USER_CFG_KEY, user)
        self.set_store(PrometheusService.PASS_CFG_KEY, password)
        return 'prometheus credentials updated correctly'

    def set_alertmanager_access_info(self, user: str, password: str) -> str:
        self.set_store(AlertmanagerService.USER_CFG_KEY, user)
        self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
        return 'alertmanager credentials updated correctly'

    def get_prometheus_access_info(self) -> Dict[str, str]:
        user, password = self._get_prometheus_credentials()
        return {'user': user,
                'password': password,
                'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}

    def get_alertmanager_access_info(self) -> Dict[str, str]:
        user, password = self._get_alertmanager_credentials()
        return {'user': user,
                'password': password,
                'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}

    def apply_mon(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _apply(self, spec: GenericSpec) -> str:
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger preview refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))

    def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
        """Return a list of candidate hosts according to the placement specification."""
        all_hosts = self.cache.get_schedulable_hosts()
        candidates = []
        if placement.hosts:
            candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
        elif placement.label:
            candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
        elif placement.host_pattern:
            candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
        elif (placement.count is not None or placement.count_per_host is not None):
            candidates = [x.hostname for x in all_hosts]
        return [h for h in candidates if not self.cache.is_host_draining(h)]

    def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
        """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
        if spec.count is not None:
            raise OrchestratorError(
                "Placement 'count' field is not supported for this specification.")
        if spec.count_per_host is not None:
            raise OrchestratorError(
                "Placement 'count_per_host' field is not supported for this specification.")
        if spec.hosts:
            all_hosts = [h.hostname for h in self.inventory.all_specs()]
            invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
            if invalid_hosts:
                raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
                                        f"Please check 'ceph orch host ls' for available hosts.")
        elif not self._get_candidate_hosts(spec):
            raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
                                    "Please check 'ceph orch host ls' for available hosts.\n"
                                    "Note: draining hosts are excluded from the candidate list.")

    def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
        candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
        invalid_options: Dict[str, List[str]] = {}
        for host in candidate_hosts:
            host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {})
            invalid_options[host] = []
            for option in spec.settings:
                if option not in host_sysctl_options:
                    invalid_options[host].append(option)
        return invalid_options

    def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
        if not spec.settings:
            raise OrchestratorError("Invalid spec: settings section cannot be empty.")
        self._validate_one_shot_placement_spec(spec.placement)
        invalid_options = self._validate_tunedprofile_settings(spec)
        if any(e for e in invalid_options.values()):
            raise OrchestratorError(
                f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')

    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
        outs = []
        for spec in specs:
            self._validate_tuned_profile_spec(spec)
            if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
                outs.append(
                    f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
            else:
                # done, let's save the specs
                self.tuned_profiles.add_profile(spec)
                outs.append(f'Saved tuned profile {spec.profile_name}')
        self._kick_serve_loop()
        return '\n'.join(outs)

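    # Example profile (illustrative), as might be fed to
    # `ceph orch tuned-profile apply -i <file>`; names and values are made up:
    #
    #   profile_name: mon-host-profile
    #   placement:
    #     hosts:
    #       - host1
    #   settings:
    #     fs.file-max: "1000000"
    #     vm.swappiness: "1"
    #
    # Settings are validated against each matching host's sysctl_options facts
    # before the profile is saved.
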
    def rm_tuned_profile(self, profile_name: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Nothing to remove.')
        self.tuned_profiles.rm_profile(profile_name)
        self._kick_serve_loop()
        return f'Removed tuned profile {profile_name}'

    def tuned_profile_ls(self) -> List[TunedProfileSpec]:
        return self.tuned_profiles.list_profiles()

    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Cannot add setting.')
        self.tuned_profiles.add_setting(profile_name, setting, value)
        self._kick_serve_loop()
        return f'Added setting {setting} with value {value} to tuned profile {profile_name}'

    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
        self.tuned_profiles.rm_setting(profile_name, setting)
        self._kick_serve_loop()
        return f'Removed setting {setting} from tuned profile {profile_name}'

    def service_discovery_dump_cert(self) -> str:
        root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT)
        if not root_cert:
            raise OrchestratorError('No certificate found for service discovery')
        return root_cert

    def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
        self.health_checks[name] = {
            'severity': 'warning',
            'summary': summary,
            'count': count,
            'detail': detail,
        }
        self.set_health_checks(self.health_checks)

    def remove_health_warning(self, name: str) -> None:
        if name in self.health_checks:
            del self.health_checks[name]
            self.set_health_checks(self.health_checks)

    def _plan(self, spec: ServiceSpec) -> dict:
        if spec.service_type == 'osd':
            return {'service_name': spec.service_name(),
                    'service_type': spec.service_type,
                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

        svc = self.cephadm_services[spec.service_type]
        ha = HostAssignment(
            spec=spec,
            hosts=self.cache.get_schedulable_hosts(),
            unreachable_hosts=self.cache.get_unreachable_hosts(),
            draining_hosts=self.cache.get_draining_hosts(),
            networks=self.cache.networks,
            daemons=self.cache.get_daemons_by_service(spec.service_name()),
            allow_colo=svc.allow_colo(),
            rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
        )
        ha.validate()
        hosts, to_add, to_remove = ha.place()

        return {
            'service_name': spec.service_name(),
            'service_type': spec.service_type,
            'add': [hs.hostname for hs in to_add],
            'remove': [d.name() for d in to_remove]
        }

    def plan(self, specs: Sequence[GenericSpec]) -> List:
        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                               'to the current inventory setup. If any of these conditions change, the \n'
                               'preview will be invalid. Please make sure to have a minimal \n'
                               'timeframe between planning and applying the specs.'}]
        if any([spec.service_type == 'host' for spec in specs]):
            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
        for spec in specs:
            results.append(self._plan(cast(ServiceSpec, spec)))
        return results

    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'ingress': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'nvmeof': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'cephfs-mirror': PlacementSpec(count=1),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'ceph-exporter': PlacementSpec(host_pattern='*'),
                'loki': PlacementSpec(count=1),
                'promtail': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
                'snmp-gateway': PlacementSpec(count=1),
                'elasticsearch': PlacementSpec(count=1),
                'jaeger-agent': PlacementSpec(host_pattern='*'),
                'jaeger-collector': PlacementSpec(count=1),
                'jaeger-query': PlacementSpec(count=1)
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        host_count = len(self.inventory.keys())
        max_count = self.max_count_per_host

        if spec.placement.count is not None:
            if spec.service_type in ['mon', 'mgr']:
                if spec.placement.count > max(5, host_count):
                    raise OrchestratorError(
                        (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
            elif spec.service_type != 'osd':
                if spec.placement.count > (max_count * host_count):
                    raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
                                             + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

        if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
            raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
                                     + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            unreachable_hosts=self.cache.get_unreachable_hosts(),
            draining_hosts=self.cache.get_draining_hosts(),
            networks=self.cache.networks,
            daemons=self.cache.get_daemons_by_service(spec.service_name()),
            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()

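    # Example (illustrative): `ceph orch apply mon` with an empty placement falls
    # back to the defaults above, i.e. the spec is scheduled as if it were written:
    #   service_type: mon
    #   placement:
    #     count: 5
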
    def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
        results = []
        for spec in specs:
            if no_overwrite:
                if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
                    results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(HostSpec, spec).hostname, spec.service_type))
                    continue
                elif cast(ServiceSpec, spec).service_name() in self.spec_store:
                    results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
                    continue
            results.append(self._apply(spec))
        return results

    def apply_mgr(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_rgw(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_ingress(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_iscsi(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_nfs(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        # type: () -> str
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    def apply_prometheus(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_loki(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_promtail(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_node_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_ceph_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_crash(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def set_unmanaged(self, service_name: str, value: bool) -> str:
        return self.spec_store.set_unmanaged(service_name, value)

    def upgrade_check(self, image: str, version: str) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("check aborted - you have hosts in maintenance state")

        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
            image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))

        ceph_image_version = image_info.ceph_version
        if not ceph_image_version:
            return f'Unable to extract ceph version from {target_name}.'
        if ceph_image_version.startswith('ceph version '):
            ceph_image_version = ceph_image_version.split(' ')[2]
        version_error = self.upgrade._check_target_version(ceph_image_version)
        if version_error:
            return f'Incompatible upgrade: {version_error}'

        self.log.debug(f'image info {image} -> {image_info}')
        r: dict = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
            'non_ceph_image_daemons': list()
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                # check if the container digest for the digest we're checking upgrades for matches
                # the container digests for the daemon if "use_repo_digest" setting is true
                # or that the image name matches the daemon's image name if "use_repo_digest"
                # is false. The idea is to generally check if the daemon is already using
                # the image we're checking upgrade to.
                if (
                    (self.use_repo_digest and dd.matches_digests(image_info.repo_digests))
                    or (not self.use_repo_digest and dd.matches_image_name(image))
                ):
                    r['up_to_date'].append(dd.name())
                elif dd.daemon_type in CEPH_IMAGE_TYPES:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
                else:
                    r['non_ceph_image_daemons'].append(dd.name())
        if self.use_repo_digest and image_info.repo_digests:
            # FIXME: we assume the first digest is the best one to use
            r['target_digest'] = image_info.repo_digests[0]

        return json.dumps(r, indent=4, sort_keys=True)

    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
        return self.upgrade.upgrade_ls(image, tags, show_all_versions)

    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
                      services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
        if self.offline_hosts:
            raise OrchestratorError(
                f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
        if daemon_types is not None and services is not None:
            raise OrchestratorError('--daemon-types and --services are mutually exclusive')
        if daemon_types is not None:
            for dtype in daemon_types:
                if dtype not in CEPH_UPGRADE_ORDER:
                    raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
                                            f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
        if services is not None:
            for service in services:
                if service not in self.spec_store:
                    raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
                                            f'Known services are: {self.spec_store.all_specs.keys()}')
        hosts: Optional[List[str]] = None
        if host_placement is not None:
            all_hosts = list(self.inventory.all_specs())
            placement = PlacementSpec.from_string(host_placement)
            hosts = placement.filter_matching_hostspecs(all_hosts)
            if not hosts:
                raise OrchestratorError(
                    f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')

        if limit is not None:
            if limit < 1:
                raise OrchestratorError(
                    f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')

        return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)

    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()

    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False,
                    no_destroy: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            assert daemon.daemon_id is not None
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                zap=zap,
                                                no_destroy=no_destroy,
                                                hostname=daemon.hostname,
                                                process_started_at=datetime_now(),
                                                remove_util=self.to_remove_osds.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
                                      "Run the `ceph-volume lvm zap` command with `--destroy`"
                                      " against the VG/LV if you want them to be destroyed.")
        return f"Scheduled OSD(s) for removal.{warning_zap}"

    def stop_remove_osds(self, osd_ids: List[str]) -> str:
        """
        Stops a `removal` process for a List of OSDs.
        This will revert their weight and remove them from the osds_to_remove queue
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.to_remove_osds.rm_util))
            except (NotFoundError, KeyError, ValueError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"

    def remove_osds_status(self) -> List[OSD]:
        """
        The CLI call to retrieve an osd removal report
        """
        return self.to_remove_osds.all_osds()

    def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
        """
        Drain all daemons from a host.
        :param hostname: host name
        """
        # if we drain the last admin host we could end up removing the only instance
        # of the config and keyring and cause issues
        if not force:
            p = PlacementSpec(label=SpecialHostLabels.ADMIN)
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
                raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'"
                                                  " label.\nDraining this host could cause the removal"
                                                  " of the last cluster config/keyring managed by cephadm.\n"
                                                  f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
                                                  " before completing this operation.\nIf you're certain this is"
                                                  " what you want rerun this command with --force.")

        self.add_host_label(hostname, '_no_schedule')
        if not keep_conf_keyring:
            self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING)

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)

        osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
        self.remove_osds(osds_to_remove, zap=zap_osd_devices)

        daemons_table = ""
        daemons_table += "{:<20} {:<15}\n".format("type", "id")
        daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
        for d in daemons:
            daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

        return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)

    def trigger_connect_dashboard_rgw(self) -> None:
        self.need_connect_dashboard_rgw = True