import datetime
import errno
import hashlib
import ipaddress
import json
import logging
import os
import random
import re
import string
import subprocess
from collections import defaultdict
from configparser import ConfigParser
from functools import wraps
from tempfile import TemporaryDirectory, NamedTemporaryFile
from threading import Event
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
import multiprocessing.pool
from prettytable import PrettyTable
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    ServiceSpec, PlacementSpec, \
    HostPlacementSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.agent import CherryPyThread, CephadmAgentHelpers
from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType

import orchestrator
from orchestrator.module import to_format, Format
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
    service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service
from . import ssh
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
    ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
    cephadmNoImage, CEPH_UPGRADE_ORDER
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
from .tuned_profiles import TunedProfileUtils
try:
    import asyncssh  # type: ignore
except ImportError as e:
    asyncssh = None  # type: ignore
    asyncssh_import_error = str(e)

logger = logging.getLogger(__name__)

T = TypeVar('T')
DEFAULT_SSH_CONFIG = """
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
"""
# cherrypy likes to sys.exit on error. don't let it take us down too!
def os_exit_noop(status: int) -> None:
    pass


os._exit = os_exit_noop  # type: ignore
# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.3.1'
DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.1.5'
DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
# ------------------------------------------------------------------------------
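
# Note: these defaults are only fallbacks. Each image can be overridden at
# runtime via the matching module option, e.g. (illustrative):
#   ceph config set mgr mgr/cephadm/container_image_prometheus <image>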

def host_exists(hostname_position: int = 1) -> Callable:
    """Check that a hostname exists in the inventory"""
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner
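
# Illustrative usage: decorate any orchestrator method whose positional
# argument at `hostname_position` must name a known host, e.g.
#
#     @host_exists()
#     def rescan_host(self, hostname: str) -> str: ...
#
# A miss raises OrchestratorError, suggesting hosts with a matching prefix.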

class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                          metaclass=CLICommandMeta):

    _STORE_HOST_PREFIX = "host"

    NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
    NATIVE_OPTIONS = []  # type: List[Any]

    MODULE_OPTIONS = [
        Option(
            'ssh_config_file',
            desc='customized SSH config file to connect to managed hosts',
        ),
        Option(
            'device_cache_timeout',
            desc='seconds to cache device inventory',
        ),
        Option(
            'device_enhanced_scan',
            desc='Use libstoragemgmt during device scans',
        ),
        Option(
            'daemon_cache_timeout',
            desc='seconds to cache service (daemon) inventory',
        ),
        Option(
            'facts_cache_timeout',
            desc='seconds to cache host facts data',
        ),
        Option(
            'host_check_interval',
            desc='how frequently to perform a host check',
        ),
        Option(
            'mode',
            enum_allowed=['root', 'cephadm-package'],
            desc='mode for remote execution of cephadm',
        ),
        Option(
            'container_image_base',
            default=DEFAULT_IMAGE,
            desc='Container image name, without the tag',
        ),
        Option(
            'container_image_prometheus',
            default=DEFAULT_PROMETHEUS_IMAGE,
            desc='Prometheus container image',
        ),
        Option(
            'container_image_grafana',
            default=DEFAULT_GRAFANA_IMAGE,
            desc='Grafana container image',
        ),
        Option(
            'container_image_alertmanager',
            default=DEFAULT_ALERT_MANAGER_IMAGE,
            desc='Alertmanager container image',
        ),
        Option(
            'container_image_node_exporter',
            default=DEFAULT_NODE_EXPORTER_IMAGE,
            desc='Node exporter container image',
        ),
        Option(
            'container_image_loki',
            default=DEFAULT_LOKI_IMAGE,
            desc='Loki container image',
        ),
        Option(
            'container_image_promtail',
            default=DEFAULT_PROMTAIL_IMAGE,
            desc='Promtail container image',
        ),
        Option(
            'container_image_haproxy',
            default=DEFAULT_HAPROXY_IMAGE,
            desc='HAProxy container image',
        ),
        Option(
            'container_image_keepalived',
            default=DEFAULT_KEEPALIVED_IMAGE,
            desc='Keepalived container image',
        ),
        Option(
            'container_image_snmp_gateway',
            default=DEFAULT_SNMP_GATEWAY_IMAGE,
            desc='SNMP Gateway container image',
        ),
        Option(
            'warn_on_stray_hosts',
            desc='raise a health warning if daemons are detected on a host '
                 'that is not managed by cephadm',
        ),
        Option(
            'warn_on_stray_daemons',
            desc='raise a health warning if daemons are detected '
                 'that are not managed by cephadm',
        ),
        Option(
            'warn_on_failed_host_check',
            desc='raise a health warning if the host check fails',
        ),
        Option(
            'log_to_cluster',
            desc='log to the "cephadm" cluster log channel',
        ),
        Option(
            'allow_ptrace',
            desc='allow SYS_PTRACE capability on ceph containers',
            long_desc='The SYS_PTRACE capability is needed to attach to a '
                      'process with gdb or strace. Enabling this option '
                      'can allow debugging daemons that encounter problems',
        ),
        Option(
            'container_init',
            desc='Run podman/docker with `--init`',
        ),
        Option(
            'prometheus_alerts_path',
            default='/etc/prometheus/ceph/ceph_default_alerts.yml',
            desc='location of alerts to include in prometheus deployments',
        ),
        Option(
            'migration_current',
            desc='internal - do not modify',
            # used to track spec and other data migrations.
        ),
        Option(
            'config_dashboard',
            desc='manage configs like API endpoints in Dashboard.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf',
            desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf_hosts',
            desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
        ),
        Option(
            'registry_url',
            desc='Registry url for login purposes. This is not the default registry',
        ),
        Option(
            'registry_username',
            desc='Custom repository username. Only used for logging into a registry.',
        ),
        Option(
            'registry_password',
            desc='Custom repository password. Only used for logging into a registry.',
        ),
        Option(
            'registry_insecure',
            desc='Registry is to be considered insecure (no TLS available). Only for development purposes.',
        ),
        Option(
            'use_repo_digest',
            desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
        ),
        Option(
            'config_checks_enabled',
            desc='Enable or disable the cephadm configuration analysis',
        ),
        Option(
            'default_registry',
            desc='Search-registry to which we should normalize unqualified image names. '
                 'This is not the default registry',
        ),
        Option(
            'max_count_per_host',
            desc='max number of daemons per service per host',
        ),
        Option(
            'autotune_memory_target_ratio',
            desc='ratio of total system memory to divide amongst autotuned daemons',
        ),
        Option(
            'autotune_interval',
            desc='how frequently to autotune daemon memory',
        ),
        Option(
            'use_agent',
            desc='Use cephadm agent on each host to gather and send metadata',
        ),
        Option(
            'agent_refresh_rate',
            desc='How often agent on each host will try to gather and send metadata',
        ),
        Option(
            'agent_starting_port',
            desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)',
        ),
        Option(
            'agent_down_multiplier',
            desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down',
        ),
        Option(
            'max_osd_draining_count',
            desc='max number of osds that will be drained simultaneously when osds are removed',
        ),
    ]

    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid: str = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        self.event = Event()

        self.ssh = ssh.SSHManager(self)

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.max_count_per_host = 0
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.container_image_loki = ''
            self.container_image_promtail = ''
            self.container_image_haproxy = ''
            self.container_image_keepalived = ''
            self.container_image_snmp_gateway = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = True
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.manage_etc_ceph_ceph_conf_hosts = '*'
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.registry_insecure: bool = False
            self.use_repo_digest = True
            self.default_registry = ''
            self.autotune_memory_target_ratio = 0.0
            self.autotune_interval = 0
            self.ssh_user: Optional[str] = None
            self._ssh_options: Optional[str] = None
            self.tkey = NamedTemporaryFile()
            self.ssh_config_fname: Optional[str] = None
            self.ssh_config: Optional[str] = None
            self._temp_files: List = []
            self.ssh_key: Optional[str] = None
            self.ssh_pub: Optional[str] = None
            self.use_agent = False
            self.agent_refresh_rate = 0
            self.agent_down_multiplier = 0.0
            self.agent_starting_port = 0
            self.apply_spec_fails: List[Tuple[str, str]] = []
            self.max_osd_draining_count = 10
            self.device_enhanced_scan = False

        self.notify(NotifyType.mon_map, None)

        path = self.get_ceph_option('cephadm_path')
        try:
            assert isinstance(path, str)
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, e))

        self.cephadm_binary_path = self._get_cephadm_binary_path()

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self.ssh._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.agent_cache = AgentCache(self)
        self.agent_cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        self.keys = ClientKeyringStore(self)
        self.keys.load()

        self.tuned_profiles = TunedProfileStore(self)
        self.tuned_profiles.load()

        self.tuned_profile_utils = TunedProfileUtils(self)

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        _service_clses: Sequence[Type[CephadmService]] = [
            OSDService, NFSService, MonService, MgrService, MdsService,
            RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
            PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
            IngressService, CustomContainerService, CephfsMirrorService,
            CephadmAgent, SNMPGatewayService
        ]

        # https://github.com/python/mypy/issues/8993
        self.cephadm_services: Dict[str, CephadmService] = {
            cls.TYPE: cls(self) for cls in _service_clses}  # type: ignore

        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
        self.need_connect_dashboard_rgw = False

        self.config_checker = CephadmConfigChecks(self)

        self.cherrypy_thread = CherryPyThread(self)
        self.cherrypy_thread.start()
        self.agent_helpers = CephadmAgentHelpers(self)
        self.agent_helpers._apply_agent()

        self.offline_watcher = OfflineHostWatcher(self)
        self.offline_watcher.start()

    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.cherrypy_thread.shutdown()
        self.offline_watcher.shutdown()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _get_cephadm_binary_path(self) -> str:
        m = hashlib.sha256()  # digest of the bundled cephadm script contents
        m.update(self._cephadm.encode())
        return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
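
    # The returned path embeds a digest of the bundled cephadm script, so a
    # changed script maps to a fresh path on the remote hosts and a stale
    # cached copy is never re-executed.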

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.event_loop = ssh.EventLoopThread()

        serve = CephadmServe(self)
        serve.serve()

    def wait_async(self, coro: Awaitable[T]) -> T:
        return self.event_loop.get_result(coro)
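
    # wait_async() is the bridge between the synchronous mgr command handlers
    # and the asyncssh event loop: it blocks the calling thread until the
    # EventLoopThread has run the coroutine to completion.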

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

    def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
        if notify_type == NotifyType.mon_map:
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == NotifyType.pg_summary:
            self._trigger_osd_removal()

    def _trigger_osd_removal(self) -> None:
        remove_queue = self.to_remove_osds.as_osd_ids()
        if not remove_queue:
            return
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in remove_queue:
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()

    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()
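
    # pause()/resume() back the `ceph orch pause` and `ceph orch resume` CLI
    # commands; the persisted 'pause' key is what the serve() loop consults.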

    def get_unique_name(
            self, daemon_type: str,
            host: str,
            existing: List[orchestrator.DaemonDescription],
            prefix: Optional[str] = None,
            forcename: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> str:
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container', 'agent', 'snmp-gateway', 'loki', 'promtail'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]

        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            if rank is not None and rank_generation is not None:
                name += f'{rank}.{rank_generation}.'
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
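
    # Generated names take the form [<prefix>.][<rank>.<rank_generation>.]<short-host>,
    # with a random lowercase suffix appended for daemon types that can run more
    # than once per host (illustrative result: 'myhost.qwzrab').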

    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not res:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in res:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
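
    # Illustrative: a config containing 'StrictHostKeyChecking ask' is rejected,
    # since cephadm connections are non-interactive and would stall on the prompt.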

    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                if d.get(k):
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
                continue
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.container_image_digests = d.get('container_image_digests')
            sd.memory_usage = d.get('memory_usage')
            sd.memory_request = d.get('memory_request')
            sd.memory_limit = d.get('memory_limit')
            sd.cpu_percentage = d.get('cpu_percentage')
            sd._service_name = d.get('service_name')
            sd.deployed_by = d.get('deployed_by')
            sd.version = d.get('version')
            sd.ports = d.get('ports')
            sd.rank = int(d['rank']) if d.get('rank') is not None else None
            sd.rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            sd.extra_container_args = d.get('extra_container_args')
            if 'state' in d:
                sd.status_desc = d['state']
                sd.status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.cache.update_host_daemons(host, dm)
        self.cache.save_host(host)

    def update_watched_hosts(self) -> None:
        # currently, we are watching hosts with nfs daemons
        hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
        ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
        self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    def update_failed_daemon_health_check(self) -> None:
        failed_daemons = []
        for dd in self.cache.get_error_daemons():
            if dd.daemon_type != 'agent':  # agents tracked by CEPHADM_AGENT_DOWN
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        self.remove_health_warning('CEPHADM_FAILED_DAEMON')
        if failed_daemons:
            self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
                failed_daemons), failed_daemons)

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if asyncssh is not None:
            return True, ""
        else:
            return False, "loading asyncssh library:{}".format(
                asyncssh_import_error)

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err, {}
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}

        # mypy is unable to determine type for _processes since it's private
        worker_count: int = self._worker_pool._processes  # type: ignore
        ret = {
            "workers": worker_count,
            "paused": self.paused,
        }

        return True, err, ret

    def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
        self.set_store(what, new)
        self.ssh._reconfig_ssh()
        if self.cache.get_hosts():
            # Can't check anything without hosts
            host = self.cache.get_hosts()[0]
            r = CephadmServe(self)._check_host(host)
            if r is not None:
                # connection failed reset user
                self.set_store(what, old)
                self.ssh._reconfig_ssh()
                raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
        self.log.info(f'Set ssh {what}')

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set the ssh_config file (use -i <ssh_config>)
        """
        # Set an ssh_config file provided from stdin
        old = self.ssh_config
        if inbuf == old:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self._validate_and_set_ssh_val('ssh_config', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command('cephadm clear-ssh-config')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file
        """
        # Clear the ssh_config file provided from stdin
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self.ssh._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command('cephadm get-ssh-config')
    def _get_ssh_config(self) -> HandleCommandResult:
        """
        Returns the ssh config as used by cephadm
        """
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command('cephadm generate-key')
    def _generate_key(self) -> Tuple[int, str, str]:
        """
        Generate a cluster SSH key (if not present)
        """
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            subprocess.check_call([
                '/usr/bin/ssh-keygen',
                '-C', 'ceph-%s' % self._cluster_fsid,
                '-N', '',
                '-f', path
            ])
            with open(path, 'r') as f:
                secret = f.read()
            with open(path + '.pub', 'r') as f:
                pub = f.read()
            os.unlink(path)
            os.unlink(path + '.pub')
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self.ssh._reconfig_ssh()
        return 0, '', ''
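
    # Typical key bootstrap flow from the CLI (illustrative):
    #   ceph cephadm generate-key
    #   ceph cephadm get-pub-key > ~/ceph.pub
    #   ssh-copy-id -f -i ~/ceph.pub root@<host>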

    @orchestrator._cli_write_command(
        'cephadm set-priv-key')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH private key (use -i <private_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        old = self.ssh_key
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
        self.log.info('Set ssh private key')
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH public key (use -i <public_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        old = self.ssh_pub
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Clear cluster SSH key"""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self.ssh._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        """Show SSH public key for connecting to cluster hosts"""
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user')
    def _get_user(self) -> Tuple[int, str, str]:
        """
        Show user for SSHing to cluster hosts
        """
        if self.ssh_user is None:
            return -errno.ENOENT, '', 'No cluster SSH user configured'
        else:
            return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        """
        Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
        """
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self._validate_and_set_ssh_val('ssh_user', user, current_user)
        current_ssh_config = self._get_ssh_config()
        new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
        self._set_ssh_config(new_ssh_config)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '. sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
        """
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif (url and username and password):
            registry_json = {'url': url, 'username': username, 'password': password}
        else:
            assert isinstance(inbuf, str)
            registry_json = json.loads(inbuf)
            if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")

        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_store('registry_credentials', json.dumps(registry_json))
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''
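
    # Illustrative credentials file for `ceph cephadm registry-login -i <file>`:
    #   {"url": "REGISTRY_URL", "username": "REGISTRY_USERNAME",
    #    "password": "REGISTRY_PASSWORD"}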

    @orchestrator._cli_read_command('cephadm check-host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Check whether we can access and manage a remote host"""
        try:
            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
                                                                             ['--expect-hostname', host],
                                                                             addr=addr,
                                                                             error_ok=True, no_fsid=True))
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n'
                           + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Prepare a remote host for use with cephadm"""
        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                                                         ['--expect-hostname', host],
                                                                         addr=addr,
                                                                         error_ok=True, no_fsid=True))
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf')
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Text that is appended to all daemon's ceph.conf.
        Mainly a workaround, till `config generate-minimal-conf` generates
        a complete ceph.conf.

        Warning: this is a dangerous operation.
        """
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        """
        Get extra ceph conf that is appended
        """
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    @orchestrator._cli_read_command('cephadm config-check ls')
    def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
        """List the available configuration checks and their current state"""

        if format not in [Format.plain, Format.json, Format.json_pretty]:
            return HandleCommandResult(
                retval=1,
                stderr="Requested format is not supported when listing configuration checks"
            )

        if format in [Format.json, Format.json_pretty]:
            return HandleCommandResult(
                stdout=to_format(self.config_checker.health_checks,
                                 format,
                                 many=True,
                                 cls=None))

        # plain formatting
        table = PrettyTable(
            ['NAME', 'HEALTHCHECK', 'STATUS', 'DESCRIPTION'],
            border=False)
        table.align['NAME'] = 'l'
        table.align['HEALTHCHECK'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESCRIPTION'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 2
        for c in self.config_checker.health_checks:
            table.add_row((
                c.name,
                c.healthcheck_name,
                c.status,
                c.description,
            ))

        return HandleCommandResult(stdout=table.get_string())

    @orchestrator._cli_read_command('cephadm config-check status')
    def _config_check_status(self) -> HandleCommandResult:
        """Show whether the configuration checker feature is enabled/disabled"""
        status = self.get_module_option('config_checks_enabled')
        return HandleCommandResult(stdout="Enabled" if status else "Disabled")

    @orchestrator._cli_write_command('cephadm config-check enable')
    def _config_check_enable(self, check_name: str) -> HandleCommandResult:
        """Enable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'enabled')
        if err:
            return HandleCommandResult(
                retval=err,
                stderr=f"Failed to enable check '{check_name}' : {msg}")

        return HandleCommandResult(stdout="ok")

    @orchestrator._cli_write_command('cephadm config-check disable')
    def _config_check_disable(self, check_name: str) -> HandleCommandResult:
        """Disable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'disabled')
        if err:
            return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
        else:
            # drop any outstanding raised healthcheck for this check
            config_check = self.config_checker.lookup_check(check_name)
            if config_check:
                if config_check.healthcheck_name in self.health_checks:
                    self.health_checks.pop(config_check.healthcheck_name, None)
                    self.set_health_checks(self.health_checks)
            else:
                self.log.error(
                    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

        return HandleCommandResult(stdout="ok")

    def _config_check_valid(self, check_name: str) -> bool:
        return check_name in [chk.name for chk in self.config_checker.health_checks]

    def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
        checks_raw = self.get_store('config_checks')
        if not checks_raw:
            return 1, "config_checks setting is not available"

        checks = json.loads(checks_raw)
        checks.update({
            check_name: status
        })
        self.log.info(f"updated config check '{check_name}' : {status}")
        self.set_store('config_checks', json.dumps(checks))
        return 0, ""

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except json.JSONDecodeError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
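
    # The stored value is a JSON document of the shape
    #   {"conf": "<raw conf text>", "last_modified": "<datetime string>"}
    # as written by _set_extra_ceph_conf() above.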

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

    @orchestrator._cli_write_command(
        'cephadm osd activate'
    )
    def _osd_activate(self, host: List[str]) -> HandleCommandResult:
        """
        Start OSD containers for existing OSDs
        """

        @forall_hosts
        def run(h: str) -> str:
            return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))

        return HandleCommandResult(stdout='\n'.join(run(host)))

    @orchestrator._cli_read_command('orch client-keyring ls')
    def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
        """
        List client keyrings under cephadm management
        """
        if format != Format.plain:
            output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
        else:
            table = PrettyTable(
                ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
                border=False)
            table.left_padding_width = 0
            table.right_padding_width = 2
            for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
                table.add_row((
                    ks.entity, ks.placement.pretty_str(),
                    utils.file_mode_to_str(ks.mode),
                    f'{ks.uid}:{ks.gid}',
                    ks.path,
                ))
            output = table.get_string()
        return HandleCommandResult(stdout=output)

    @orchestrator._cli_write_command('orch client-keyring set')
    def _client_keyring_set(
            self,
            entity: str,
            placement: str,
            owner: Optional[str] = None,
            mode: Optional[str] = None,
    ) -> HandleCommandResult:
        """
        Add or update client keyring under cephadm management
        """
        if not entity.startswith('client.'):
            raise OrchestratorError('entity must start with client.')
        if owner:
            try:
                uid, gid = map(int, owner.split(':'))
            except ValueError:
                raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
        else:
            uid = 0
            gid = 0
        if mode:
            try:
                imode = int(mode, 8)
            except ValueError:
                raise OrchestratorError('mode must be an octal mode, e.g. "600"')
        else:
            imode = 0o600

        pspec = PlacementSpec.from_string(placement)
        ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
        self.keys.update(ks)
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_write_command('orch client-keyring rm')
    def _client_keyring_rm(
            self,
            entity: str,
    ) -> HandleCommandResult:
        """
        Remove client keyring from cephadm management
        """
        self.keys.rm(entity)
        self._kick_serve_loop()
        return HandleCommandResult()

    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        image: Optional[str] = None
        if daemon_type in CEPH_IMAGE_TYPES:
            # get container image
            image = str(self.get_foreign_ceph_option(
                utils.name_to_config_section(daemon_name),
                'container_image'
            )).strip()
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == 'loki':
            image = self.container_image_loki
        elif daemon_type == 'promtail':
            image = self.container_image_promtail
        elif daemon_type == 'haproxy':
            image = self.container_image_haproxy
        elif daemon_type == 'keepalived':
            image = self.container_image_keepalived
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved, the necessary information
            # is only available when a container is deployed (given
            # via the spec).
            image = None
        elif daemon_type == 'snmp-gateway':
            image = self.container_image_snmp_gateway
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _check_valid_addr(self, host: str, addr: str) -> str:
        # make sure hostname is resolvable before trying to make a connection
        try:
            ip_addr = utils.resolve_ip(addr)
        except OrchestratorError as e:
            msg = str(e) + f'''
You may need to supply an address for {addr}

Please make sure that the host is reachable and accepts connections using the cephadm SSH key
To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
            raise OrchestratorError(msg)

        if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
            # if this is a re-add, use old address. otherwise error
            if host not in self.inventory or self.inventory.get_addr(host) == host:
                raise OrchestratorError(
                    (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
                     + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
            self.log.debug(
                f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
            ip_addr = self.inventory.get_addr(host)
        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
            host, cephadmNoImage, 'check-host',
            ['--expect-hostname', host],
            addr=addr,
            error_ok=True, no_fsid=True))
        if code:
            msg = 'check-host failed:\n' + '\n'.join(err)
            # err will contain stdout and stderr, so we filter on the message text to
            # only show the errors
            errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
            if errors:
                msg = f'Host {host} ({addr}) failed check(s): {errors}'
            raise OrchestratorError(msg)
        return ip_addr

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        HostSpec.validate(spec)
        ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
        if spec.addr == spec.hostname and ip_addr:
            spec.addr = ip_addr

        if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
            self.cache.refresh_all_host_info(spec.hostname)

        if spec.location:
            self.check_mon_command({
                'prefix': 'osd crush add-bucket',
                'name': spec.hostname,
                'type': 'host',
                'args': [f'{k}={v}' for k, v in spec.location.items()],
            })

        if spec.hostname not in self.inventory:
            self.cache.prime_empty_host(spec.hostname)
        self.inventory.add_host(spec)
        self.offline_hosts_remove(spec.hostname)
        if spec.status == 'maintenance':
            self._set_maintenance_healthcheck()
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)

    @handle_orch_error
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)

    @handle_orch_error
    def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
        """
        Remove a host from orchestrator management.

        :param host: host name
        :param force: bypass running daemons check
        :param offline: remove offline host
        """

        # check if host is offline
        host_offline = host in self.offline_hosts

        if host_offline and not offline:
            raise OrchestratorValidationError(
                "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))

        if not host_offline and offline:
            raise OrchestratorValidationError(
                "{} is online, please remove host without --offline.".format(host))

        if offline and not force:
            raise OrchestratorValidationError("Removing an offline host requires --force")

        # check if there are daemons on the host
        if not force:
            daemons = self.cache.get_daemons_by_host(host)
            if daemons:
                self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")

                daemons_table = ""
                daemons_table += "{:<20} {:<15}\n".format("type", "id")
                daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
                for d in daemons:
                    daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

                raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
                                                  "The following daemons are running in the host:"
                                                  "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
                                                      host, daemons_table, host))

        # check if we're removing the last _admin host
        if not force:
            p = PlacementSpec(label='_admin')
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == host:
                raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
                                                  " label. Please add the '_admin' label to a host"
                                                  " or add --force to this command")

        def run_cmd(cmd_args: dict) -> None:
            ret, out, err = self.mon_command(cmd_args)
            if ret != 0:
                self.log.debug(f"ran {cmd_args} with mon_command")
                self.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")

        if offline:
            daemons = self.cache.get_daemons_by_host(host)
            for d in daemons:
                self.log.info(f"removing: {d.name()}")

                if d.daemon_type != 'osd':
                    self.cephadm_services[str(d.daemon_type)].pre_remove(d)
                    self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False)
                else:
                    cmd_args = {
                        'prefix': 'osd purge-actual',
                        'id': int(str(d.daemon_id)),
                        'yes_i_really_mean_it': True
                    }
                    run_cmd(cmd_args)

            cmd_args = {
                'prefix': 'osd crush rm',
                'name': host,
            }
            run_cmd(cmd_args)

        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self.ssh.reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed {} host '{}'".format('offline' if offline else '', host)

    @handle_orch_error
    def update_host_addr(self, host: str, addr: str) -> str:
        self._check_valid_addr(host, addr)
        self.inventory.set_addr(host, addr)
        self.ssh.reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @handle_orch_error
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @handle_orch_error
    def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Return a list of host metadata (gather_facts) managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        if hostname:
            return [self.cache.get_facts(hostname)]

        return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]

    @handle_orch_error
    def add_host_label(self, host: str, label: str) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        self._kick_serve_loop()
        return 'Added label %s to host %s' % (label, host)

    @handle_orch_error
    def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
        # if we remove the _admin label from the only host that has it we could end up
        # removing the only instance of the config and keyring and cause issues
        if not force and label == '_admin':
            p = PlacementSpec(label='_admin')
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == host:
                raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
                                                  " label.\nRemoving the _admin label from this host could cause the removal"
                                                  " of the last cluster config/keyring managed by cephadm.\n"
                                                  "It is recommended to add the _admin label to another host"
                                                  " before completing this operation.\nIf you're certain this is"
                                                  " what you want rerun this command with --force.")
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        self._kick_serve_loop()
        return 'Removed label %s from host %s' % (label, host)

    def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
        self.log.debug("running host-ok-to-stop checks")
        daemons = self.cache.get_daemons()
        daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
        for dd in daemons:
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        notifications: List[str] = []
        error_notifications: List[str] = []
        okay: bool = True
        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type_to_service(
                daemon_type)].ok_to_stop(daemon_ids, force=force)
            if r.retval:
                okay = False
                # collect error notifications so user can see every daemon causing host
                # to not be okay to stop
                error_notifications.append(r.stderr)
            if r.stdout:
                # if extra notifications to print for user, add them to notifications list
                notifications.append(r.stdout)

        if not okay:
            # at least one daemon is not okay to stop
            return 1, '\n'.join(error_notifications)

        if notifications:
            return 0, (f'It is presumed safe to stop host {hostname}. '
                       + 'Note the following:\n\n' + '\n'.join(notifications))
        return 0, f'It is presumed safe to stop host {hostname}'

    @handle_orch_error
    def host_ok_to_stop(self, hostname: str) -> str:
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        rc, msg = self._host_ok_to_stop(hostname)
        if rc:
            raise OrchestratorError(msg, errno=rc)

        self.log.info(msg)
        return msg

    def _set_maintenance_healthcheck(self) -> None:
        """Raise/update or clear the maintenance health check as needed"""

        in_maintenance = self.inventory.get_host_with_state("maintenance")
        if not in_maintenance:
            self.remove_health_warning('HOST_IN_MAINTENANCE')
        else:
            s = "host is" if len(in_maintenance) == 1 else "hosts are"
            self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
                f"{h} is in maintenance" for h in in_maintenance])

    @handle_orch_error
    @host_exists()
    def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
        """Attempt to place a cluster host in maintenance

        Placing a host into maintenance disables the cluster's ceph target in systemd
        and stops all ceph daemons. If the host is an osd host we apply the noout flag
        for the host subtree in crush to prevent data movement during a host maintenance
        window.

        :param hostname: (str) name of the host (must match an inventory hostname)

        :raises OrchestratorError: Hostname is invalid, host is already in maintenance
        """
        if len(self.cache.get_hosts()) == 1:
            raise OrchestratorError("Maintenance feature is not supported on single node clusters")

        # if upgrade is active, deny
        if self.upgrade.upgrade_state:
            raise OrchestratorError(
                f"Unable to place {hostname} in maintenance with upgrade active/paused")

        tgt_host = self.inventory._inventory[hostname]
        if tgt_host.get("status", "").lower() == "maintenance":
            raise OrchestratorError(f"Host {hostname} is already in maintenance")

        host_daemons = self.cache.get_daemon_types(hostname)
        self.log.debug("daemons on host {}".format(','.join(host_daemons)))
        if host_daemons:
            # daemons on this host, so check the daemons can be stopped
            # and if so, place the host into maintenance by disabling the target
            rc, msg = self._host_ok_to_stop(hostname, force)
            if rc:
                raise OrchestratorError(
                    msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)

            # call the host-maintenance function
            _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
                                                                                ["enter"],
                                                                                error_ok=True))
            returned_msg = _err[0].split('\n')[-1]
            if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
                raise OrchestratorError(
                    f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")

            if "osd" in host_daemons:
                crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
                rc, out, err = self.mon_command({
                    'prefix': 'osd set-group',
                    'flags': 'noout',
                    'who': [crush_node],
                    'format': 'json'
                })
                if rc:
                    self.log.warning(
                        f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
                    raise OrchestratorError(
                        f"Unable to set the osds on {hostname} to noout (rc={rc})")
                else:
                    self.log.info(
                        f"maintenance mode request for {hostname} has SET the noout group")

        # update the host status in the inventory
        tgt_host["status"] = "maintenance"
        self.inventory._inventory[hostname] = tgt_host
        self.inventory.save()

        self._set_maintenance_healthcheck()
        return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
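
    # The noout group set above is the same mechanism as the CLI's
    # `ceph osd set-group noout <crush-node>`; exit_host_maintenance() below
    # undoes it with 'osd unset-group'.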

    @handle_orch_error
    @host_exists()
    def exit_host_maintenance(self, hostname: str) -> str:
        """Exit maintenance mode and return a host to an operational state

        Returning from maintenance will enable the cluster's systemd target and
        start it, and remove any noout that has been added for the host if the
        host has osd daemons

        :param hostname: (str) host name

        :raises OrchestratorError: Unable to return from maintenance, or unset the
                                   noout flag
        """
        tgt_host = self.inventory._inventory[hostname]
        if tgt_host['status'] != "maintenance":
            raise OrchestratorError(f"Host {hostname} is not in maintenance mode")

        outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
                                                                            ['exit'],
                                                                            error_ok=True))
        returned_msg = errs[0].split('\n')[-1]
        if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
            raise OrchestratorError(
                f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")

        if "osd" in self.cache.get_daemon_types(hostname):
            crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
            rc, _out, _err = self.mon_command({
                'prefix': 'osd unset-group',
                'flags': 'noout',
                'who': [crush_node],
                'format': 'json'
            })
            if rc:
                self.log.warning(
                    f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
                raise OrchestratorError(f"Unable to unset the osds on {hostname} from noout (rc={rc})")
            else:
                self.log.info(
                    f"exit maintenance request has UNSET the noout group on host {hostname}")

        # update the host record status
        tgt_host['status'] = ""
        self.inventory._inventory[hostname] = tgt_host
        self.inventory.save()

        self._set_maintenance_healthcheck()

        return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"

    @handle_orch_error
    @host_exists()
    def rescan_host(self, hostname: str) -> str:
        """Use cephadm to issue a disk rescan on each HBA

        Some HBAs and external enclosures don't automatically register
        device insertion with the kernel, so for these scenarios we need
        to manually rescan

        :param hostname: (str) host name
        """
        self.log.info(f'disk rescan request sent to host "{hostname}"')
        _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
                                                                            [],
                                                                            no_fsid=True,
                                                                            error_ok=True))
        if not _err:
            raise OrchestratorError('Unexpected response from cephadm disk-rescan call')

        msg = _err[0].split('\n')[-1]
        log_msg = f'disk rescan: {msg}'
        if msg.upper().startswith('OK'):
            self.log.info(log_msg)
        else:
            self.log.warning(log_msg)

        return f'{msg}'

    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            config += '\n\n' + extra.strip() + '\n'
        return config

    def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()

    @handle_orch_error
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.debug('Kicked serve() loop to refresh all services')

        sm: Dict[str, orchestrator.ServiceDescription] = {}

        # known services
        for nm, spec in self.spec_store.all_specs.items():
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != nm:
                continue

            if spec.service_type != 'osd':
                size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
            else:
                # osd counting is special
                size = 0

            sm[nm] = orchestrator.ServiceDescription(
                spec=spec,
                size=size,
                running=0,
                events=self.events.get_for_service(spec.service_name()),
                created=self.spec_store.spec_created[nm],
                deleted=self.spec_store.spec_deleted.get(nm, None),
                virtual_ip=spec.get_virtual_ip(),
                ports=spec.get_port_start(),
            )
            if spec.service_type == 'ingress':
                # ingress has 2 daemons running per host
                sm[nm].size *= 2

        # factor daemons into status
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                assert dd.hostname is not None, f'no hostname for {dd!r}'
                assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'

                n: str = dd.service_name()

                if (
                    service_type
                    and service_type != daemon_type_to_service(dd.daemon_type)
                ):
                    continue
                if service_name and service_name != n:
                    continue

                if n not in sm:
                    # new unmanaged service
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=daemon_type_to_service(dd.daemon_type),
                        service_id=dd.service_id(),
                    )
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        size=0,
                    )

                if dd.status == DaemonDescriptionStatus.running:
                    sm[n].running += 1
                if dd.daemon_type == 'osd':
                    # The osd count can't be determined by the Placement spec.
                    # Showing an actual/expected representation cannot be determined
                    # here. So we're setting running = size for now.
                    sm[n].size += 1
                if (
                    not sm[n].last_refresh
                    or not dd.last_refresh
                    or dd.last_refresh < sm[n].last_refresh  # type: ignore
                ):
                    sm[n].last_refresh = dd.last_refresh

        return list(sm.values())
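
    # Example (illustrative): this is the backend for `ceph orch ls`, e.g.
    #
    #   ceph orch ls mon --refresh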

    @handle_orch_error
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve(host)
            self.log.debug('Kicked serve() loop to refresh all daemons')

        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
                    dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
                        dd.name(),
                        f"{dd.daemon_type}_memory_target"
                    ))
                result.append(dd)
        return result

    @handle_orch_error
    def service_action(self, action: str, service_name: str) -> List[str]:
        if service_name not in self.spec_store.all_specs.keys():
            raise OrchestratorError(f'Invalid service name "{service_name}".'
                                    + ' View currently running services using "ceph orch ls"')
        dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
        if not dds:
            raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
                                    + ' View currently running services using "ceph orch ls"')
        if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
            return [f'Stopping entire {service_name} service is prohibited.']
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return [
            self._schedule_daemon_action(dd.name(), action)
            for dd in dds
        ]
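
    # Example (illustrative): backs `ceph orch <start|stop|restart> <service>`, e.g.
    #
    #   ceph orch restart node-exporter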

    def _daemon_action(self,
                       daemon_spec: CephadmDaemonDeploySpec,
                       action: str,
                       image: Optional[str] = None) -> str:
        self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
                                      daemon_spec.daemon_id)

        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
                                                                                 daemon_spec.daemon_id):
            self.mgr_service.fail_over()
            return ''  # unreachable

        if action == 'redeploy' or action == 'reconfig':
            if daemon_spec.daemon_type != 'osd':
                daemon_spec = self.cephadm_services[daemon_type_to_service(
                    daemon_spec.daemon_type)].prepare_create(daemon_spec)
            else:
                # for OSDs, we still need to update config, just not carry out the full
                # prepare_create function
                daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(daemon_spec)
            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                    daemon_spec.host, name, 'unit',
                    ['--name', name, a]))
            except Exception:
                self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg

    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_IMAGE_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_IMAGE_TYPES)}')

            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })

    @handle_orch_error
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
        d = self.cache.get_daemon(daemon_name)
        assert d.daemon_type is not None
        assert d.daemon_id is not None

        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

        self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

        self.log.info(f'Schedule {action} daemon {daemon_name}')
        return self._schedule_daemon_action(daemon_name, action)
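
    # Example (illustrative; "osd.3" is a placeholder daemon name):
    #
    #   ceph orch daemon restart osd.3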

    def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
        return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()

    def get_active_mgr_digests(self) -> List[str]:
        digests = self.mgr_service.get_active_daemon(
            self.cache.get_daemons_by_type('mgr')).container_image_digests
        return digests if digests else []

    def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
        dd = self.cache.get_daemon(daemon_name)
        assert dd.daemon_type is not None
        assert dd.daemon_id is not None
        assert dd.hostname is not None
        if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
        self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
        msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
        self._kick_serve_loop()
        return msg

    @handle_orch_error
    def remove_daemons(self, names):
        # type: (List[str]) -> List[str]
        args = []
        for host, dm in self.cache.daemons.items():
            for name in names:
                if name in dm:
                    args.append((name, host))
        if not args:
            raise OrchestratorError('Unable to find daemon(s) %s' % (names))
        self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
        return self._remove_daemons(args)

    @handle_orch_error
    def remove_service(self, service_name: str, force: bool = False) -> str:
        self.log.info('Remove service %s' % service_name)
        self._trigger_preview_refresh(service_name=service_name)
        if service_name in self.spec_store:
            if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
                return f'Unable to remove {service_name} service.\n' \
                       f'Note, you might want to mark the {service_name} service as "unmanaged"'
        else:
            return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"

        # Report list of affected OSDs?
        if not force and service_name.startswith('osd.'):
            osds_msg = {}
            for h, dm in self.cache.get_daemons_with_volatile_status():
                osds_to_remove = []
                for name, dd in dm.items():
                    if dd.daemon_type == 'osd' and dd.service_name() == service_name:
                        osds_to_remove.append(str(dd.daemon_id))
                if osds_to_remove:
                    osds_msg[h] = osds_to_remove
            if osds_msg:
                msg = ''
                for h, ls in osds_msg.items():
                    msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
                raise OrchestratorError(
                    f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')

        found = self.spec_store.rm(service_name)
        if found and service_name.startswith('osd.'):
            self.spec_store.finally_rm(service_name)
        self._kick_serve_loop()
        return f'Removed service {service_name}'
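
    # Example (illustrative): backs `ceph orch rm <service>`; mon and mgr
    # services are intentionally refused above.
    #
    #   ceph orch rm node-exporter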

    @handle_orch_error
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
        - add filtering by label
        """
        if refresh:
            if host_filter and host_filter.hosts:
                for h in host_filter.hosts:
                    self.log.debug(f'will refresh {h} devs')
                    self.cache.invalidate_host_devices(h)
                    self.cache.invalidate_host_networks(h)
            else:
                for h in self.cache.get_hosts():
                    self.log.debug(f'will refresh {h} devs')
                    self.cache.invalidate_host_devices(h)
                    self.cache.invalidate_host_networks(h)

            self.event.set()
            self.log.debug('Kicked serve() loop to refresh devices')

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result

    @handle_orch_error
    def zap_device(self, host: str, path: str) -> str:
        """Zap a device on a managed host.

        Use ceph-volume zap to return a device to an unused/free state

        Args:
            host (str): hostname of the cluster host
            path (str): device path

        Raises:
            OrchestratorError: host is not a cluster host
            OrchestratorError: host is in maintenance and therefore unavailable
            OrchestratorError: device path not found on the host
            OrchestratorError: device is known to a different ceph cluster
            OrchestratorError: device holds active osd
            OrchestratorError: device cache hasn't been populated yet

        Returns:
            str: output from the zap command
        """

        self.log.info('Zap device %s:%s' % (host, path))

        if host not in self.inventory.keys():
            raise OrchestratorError(
                f"Host '{host}' is not a member of the cluster")

        host_info = self.inventory._inventory.get(host, {})
        if host_info.get('status', '').lower() == 'maintenance':
            raise OrchestratorError(
                f"Host '{host}' is in maintenance mode, which prevents any actions against it.")

        if host not in self.cache.devices:
            raise OrchestratorError(
                f"Host '{host}' hasn't been scanned yet to determine its inventory. Please try again later.")

        host_devices = self.cache.devices[host]
        path_found = False
        osd_id_list: List[str] = []

        for dev in host_devices:
            if dev.path == path:
                # match, so look a little deeper
                if dev.lvs:
                    for lv in cast(List[Dict[str, str]], dev.lvs):
                        if lv.get('osd_id', ''):
                            lv_fsid = lv.get('cluster_fsid')
                            if lv_fsid != self._cluster_fsid:
                                raise OrchestratorError(
                                    f"device {path} has lv's from a different Ceph cluster ({lv_fsid})")
                            osd_id_list.append(lv.get('osd_id', ''))
                path_found = True
                break
        if not path_found:
            raise OrchestratorError(
                f"Device path '{path}' not found on host '{host}'")

        if osd_id_list:
            dev_name = os.path.basename(path)
            active_osds: List[str] = []
            for osd_id in osd_id_list:
                metadata = self.get_metadata('osd', str(osd_id))
                if metadata:
                    if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
                        active_osds.append("osd." + osd_id)
            if active_osds:
                raise OrchestratorError(
                    f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
                    f"OSD{'s' if len(active_osds) > 1 else ''}"
                    f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")

        out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'zap', '--destroy', path],
            error_ok=True))

        self.cache.invalidate_host_devices(host)
        self.cache.invalidate_host_networks(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        msg = f'zap successful for {path} on {host}'
        self.log.info(msg)

        return msg + '\n'
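
    # Example (illustrative; host/device values are placeholders):
    #
    #   ceph orch device zap host01 /dev/sdb --force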

    @handle_orch_error
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """
        Blink a device light. Calling something like::

          lsmcli local-disk-ident-led-on --path $path

        If you must, you can customize this via::

          ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
          ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

        See templates/blink_device_light_cmd.j2
        """
        @forall_hosts
        def blink(host: str, dev: str, path: str) -> str:
            cmd_line = self.template.render('blink_device_light_cmd.j2',
                                            {
                                                'on': on,
                                                'ident_fault': ident_fault,
                                                'dev': dev,
                                                'path': path
                                            },
                                            host=host)
            cmd_args = shlex.split(cmd_line)

            out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd_args,
                error_ok=True))
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd_args)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)

    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up:
                r[str(osd_id)] = o.get('uuid', '')
        return r

    def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
        osd = [x for x in self.get('osd_map')['osds']
               if x['osd'] == osd_id]

        if len(osd) != 1:
            return None

        return osd[0]

    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ) -> None:
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # the to-be-preview spec != the actual spec, this means we need to
                # trigger a refresh, if the spec has been removed (==None) we need to
                # refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        if service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)

    @handle_orch_error
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible with mgr/dashboard
        """
        return [self._apply(spec) for spec in specs]

    @handle_orch_error
    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        hosts: List[HostSpec] = self.inventory.all_specs()
        filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
        if not filtered_hosts:
            return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
        return self.osd_service.create_from_spec(drive_group)
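
    # Example (illustrative; host:device values are placeholders): create_osds()
    # serves `ceph orch daemon add osd`, e.g.
    #
    #   ceph orch daemon add osd host01:/dev/sdb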

    def _preview_osdspecs(self,
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                          ) -> dict:
        if not osdspecs:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews or still in the queue to be previewed
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated. '
                                        'Please re-run this command in a bit.'}]}
        # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
        previews_for_specs = {}
        for host, raw_reports in self.cache.osdspec_previews.items():
            if host not in matching_hosts:
                continue
            osd_reports = []
            for osd_report in raw_reports:
                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                    osd_reports.append(osd_report)
            previews_for_specs.update({host: osd_reports})
        return previews_for_specs

    def _calc_daemon_deps(self,
                          spec: Optional[ServiceSpec],
                          daemon_type: str,
                          daemon_id: str) -> List[str]:
        deps = []
        if daemon_type == 'haproxy':
            # because cephadm creates new daemon instances whenever
            # port or ip changes, identifying daemons by name is
            # sufficient to detect changes.
            if not spec:
                return []
            ingress_spec = cast(IngressSpec, spec)
            assert ingress_spec.backend_service
            daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
            deps = [d.name() for d in daemons]
        elif daemon_type == 'keepalived':
            # because cephadm creates new daemon instances whenever
            # port or ip changes, identifying daemons by name is
            # sufficient to detect changes.
            if not spec:
                return []
            daemons = self.cache.get_daemons_by_service(spec.service_name())
            deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
        elif daemon_type == 'agent':
            root_cert = ''
            server_port = ''
            try:
                server_port = str(self.cherrypy_thread.server_port)
                root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
            except Exception:
                pass
            deps = sorted([self.get_mgr_ip(), server_port, root_cert,
                           str(self.device_enhanced_scan)])
        elif daemon_type == 'iscsi':
            deps = [self.get_mgr_ip()]
        else:
            need = {
                'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
                'grafana': ['prometheus', 'loki'],
                'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
                'promtail': ['loki'],
            }
            for dep_type in need.get(daemon_type, []):
                for dd in self.cache.get_daemons_by_type(dep_type):
                    deps.append(dd.name())
            if daemon_type == 'prometheus':
                deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
        return sorted(deps)
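
    # Note (illustrative summary, not new logic): the sorted deps list above is
    # compared by the serve() machinery against the deps recorded at a daemon's
    # last (re)configuration; a mismatch is what triggers an automatic reconfig
    # (see cephadm/serve.py).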

    @forall_hosts
    def _remove_daemons(self, name: str, host: str) -> str:
        return CephadmServe(self)._remove_daemon(name, host)

    def _check_pool_exists(self, pool: str, service_name: str) -> None:
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')

    def _add_daemon(self,
                    daemon_type: str,
                    spec: ServiceSpec) -> List[str]:
        """
        Add (and place) a daemon. Require explicit host placement. Do not
        schedule, and do not apply the related scheduling limitations.
        """
        if spec.service_name() not in self.spec_store:
            raise OrchestratorError('Unable to add a Daemon without Service.\n'
                                    'Please use `ceph orch apply ...` to create a Service.\n'
                                    'Note, you might want to create the service with "unmanaged=true"')

        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count)

    def _create_daemons(self,
                        daemon_type: str,
                        spec: ServiceSpec,
                        daemons: List[DaemonDescription],
                        hosts: List[HostPlacementSpec],
                        count: int) -> List[str]:
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False
        service_type = daemon_type_to_service(daemon_type)

        args = []  # type: List[CephadmDaemonDeploySpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config:
                self.cephadm_services[service_type].config(spec)
                did_config = True

            daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
                host, daemon_id, network, spec,
                # NOTE: this does not consider port conflicts!
                ports=spec.get_port_start())
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args: Any) -> str:
            daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
            return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))

        return create_func_map(args)

    @handle_orch_error
    def add_daemon(self, spec: ServiceSpec) -> List[str]:
        ret: List[str] = []
        try:
            with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
                for d_type in service_to_daemon_types(spec.service_type):
                    ret.extend(self._add_daemon(d_type, spec))
                return ret
        except OrchestratorError as e:
            self.events.from_orch_error(e)
            raise

    @handle_orch_error
    def apply_mon(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _apply(self, spec: GenericSpec) -> str:
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger_preview_refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))

    @handle_orch_error
    def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
        outs = []
        for spec in specs:
            if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
                outs.append(f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
            else:
                self.tuned_profiles.add_profile(spec)
                outs.append(f'Saved tuned profile {spec.profile_name}')
        self._kick_serve_loop()
        return '\n'.join(outs)

    @handle_orch_error
    def rm_tuned_profile(self, profile_name: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Nothing to remove.')
        self.tuned_profiles.rm_profile(profile_name)
        self._kick_serve_loop()
        return f'Removed tuned profile {profile_name}'

    @handle_orch_error
    def tuned_profile_ls(self) -> List[TunedProfileSpec]:
        return self.tuned_profiles.list_profiles()

    @handle_orch_error
    def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Cannot add setting.')
        self.tuned_profiles.add_setting(profile_name, setting, value)
        self._kick_serve_loop()
        return f'Added setting {setting} with value {value} to tuned profile {profile_name}'

    @handle_orch_error
    def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
        if profile_name not in self.tuned_profiles:
            raise OrchestratorError(
                f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
        self.tuned_profiles.rm_setting(profile_name, setting)
        self._kick_serve_loop()
        return f'Removed setting {setting} from tuned profile {profile_name}'

    def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
        self.health_checks[name] = {
            'severity': 'warning',
            'summary': summary,
            'count': count,
            'detail': detail,
        }
        self.set_health_checks(self.health_checks)

    def remove_health_warning(self, name: str) -> None:
        if name in self.health_checks:
            del self.health_checks[name]
            self.set_health_checks(self.health_checks)

    def _plan(self, spec: ServiceSpec) -> dict:
        if spec.service_type == 'osd':
            return {'service_name': spec.service_name(),
                    'service_type': spec.service_type,
                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

        svc = self.cephadm_services[spec.service_type]
        ha = HostAssignment(
            spec=spec,
            hosts=self.cache.get_schedulable_hosts(),
            unreachable_hosts=self.cache.get_unreachable_hosts(),
            draining_hosts=self.cache.get_draining_hosts(),
            networks=self.cache.networks,
            daemons=self.cache.get_daemons_by_service(spec.service_name()),
            allow_colo=svc.allow_colo(),
            rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
        )
        ha.validate()
        hosts, to_add, to_remove = ha.place()

        return {
            'service_name': spec.service_name(),
            'service_type': spec.service_type,
            'add': [hs.hostname for hs in to_add],
            'remove': [d.name() for d in to_remove]
        }

    @handle_orch_error
    def plan(self, specs: Sequence[GenericSpec]) -> List:
        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                               'to the current inventory setup. If any of these conditions change, the \n'
                               'preview will be invalid. Please make sure to have a minimal \n'
                               'timeframe between planning and applying the specs.'}]
        if any([spec.service_type == 'host' for spec in specs]):
            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
        for spec in specs:
            results.append(self._plan(cast(ServiceSpec, spec)))
        return results

    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'ingress': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'cephfs-mirror': PlacementSpec(count=1),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'loki': PlacementSpec(count=1),
                'promtail': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
                'snmp-gateway': PlacementSpec(count=1),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        host_count = len(self.inventory.keys())
        max_count = self.max_count_per_host

        if spec.placement.count is not None:
            if spec.service_type in ['mon', 'mgr']:
                if spec.placement.count > max(5, host_count):
                    raise OrchestratorError(
                        (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
            elif spec.service_type != 'osd':
                if spec.placement.count > (max_count * host_count):
                    raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
                                             + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

        if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
            raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
                                     + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))

        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            unreachable_hosts=self.cache.get_unreachable_hosts(),
            draining_hosts=self.cache.get_draining_hosts(),
            networks=self.cache.networks,
            daemons=self.cache.get_daemons_by_service(spec.service_name()),
            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()
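
    # Example (illustrative; service id/label values are placeholders): a spec
    # routed through _apply_service_spec() via `ceph orch apply -i <file>`:
    #
    #   service_type: rgw
    #   service_id: myrgw
    #   placement:
    #     count: 2
    #     label: rgw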

    @handle_orch_error
    def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
        results = []
        for spec in specs:
            if no_overwrite:
                if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
                    results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(HostSpec, spec).hostname, spec.service_type))
                    continue
                elif cast(ServiceSpec, spec).service_name() in self.spec_store:
                    results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
                    continue
            results.append(self._apply(spec))
        return results

    @handle_orch_error
    def apply_mgr(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_rgw(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_ingress(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_iscsi(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_nfs(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        # type: () -> str
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    @handle_orch_error
    def apply_prometheus(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_loki(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_promtail(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_node_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_crash(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def upgrade_check(self, image: str, version: str) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("check aborted - you have hosts in maintenance state")

        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))

        ceph_image_version = image_info.ceph_version
        if not ceph_image_version:
            return f'Unable to extract ceph version from {target_name}.'
        if ceph_image_version.startswith('ceph version '):
            ceph_image_version = ceph_image_version.split(' ')[2]
        version_error = self.upgrade._check_target_version(ceph_image_version)
        if version_error:
            return f'Incompatible upgrade: {version_error}'

        self.log.debug(f'image info {image} -> {image_info}')
        r: dict = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
            'non_ceph_image_daemons': list()
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                if image_info.image_id == dd.container_image_id:
                    r['up_to_date'].append(dd.name())
                elif dd.daemon_type in CEPH_IMAGE_TYPES:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
                else:
                    r['non_ceph_image_daemons'].append(dd.name())
        if self.use_repo_digest and image_info.repo_digests:
            # FIXME: we assume the first digest is the best one to use
            r['target_digest'] = image_info.repo_digests[0]

        return json.dumps(r, indent=4, sort_keys=True)
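
    # Example (illustrative; the target values are placeholders):
    #
    #   ceph orch upgrade check --ceph-version <version>
    #   ceph orch upgrade check --image <registry>/<image>:<tag>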

    @handle_orch_error
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    @handle_orch_error
    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
        return self.upgrade.upgrade_ls(image, tags, show_all_versions)

    @handle_orch_error
    def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
                      services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
        if daemon_types is not None and services is not None:
            raise OrchestratorError('--daemon-types and --services are mutually exclusive')
        if daemon_types is not None:
            for dtype in daemon_types:
                if dtype not in CEPH_UPGRADE_ORDER:
                    raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
                                            f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
        if services is not None:
            for service in services:
                if service not in self.spec_store:
                    raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
                                            f'Known services are: {self.spec_store.all_specs.keys()}')
        hosts: Optional[List[str]] = None
        if host_placement is not None:
            all_hosts = list(self.inventory.all_specs())
            placement = PlacementSpec.from_string(host_placement)
            hosts = placement.filter_matching_hostspecs(all_hosts)
            if not hosts:
                raise OrchestratorError(
                    f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')

        if limit is not None:
            if limit < 1:
                raise OrchestratorError(
                    f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')

        return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)

    @handle_orch_error
    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    @handle_orch_error
    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    @handle_orch_error
    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()

    @handle_orch_error
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            assert daemon.daemon_id is not None
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                zap=zap,
                                                hostname=daemon.hostname,
                                                process_started_at=datetime_now(),
                                                remove_util=self.to_remove_osds.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
                                      "Run the `ceph-volume lvm zap` command with `--destroy`"
                                      " against the VG/LV if you want them to be destroyed.")
        return f"Scheduled OSD(s) for removal.{warning_zap}"

    @handle_orch_error
    def stop_remove_osds(self, osd_ids: List[str]) -> str:
        """
        Stops a `removal` process for a List of OSDs.
        This will revert their weight and remove them from the osds_to_remove queue
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.to_remove_osds.rm_util))
            except (NotFoundError, KeyError, ValueError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"

    @handle_orch_error
    def remove_osds_status(self) -> List[OSD]:
        """
        The CLI call to retrieve an osd removal report
        """
        return self.to_remove_osds.all_osds()

    @handle_orch_error
    def drain_host(self, hostname, force=False):
        # type: (str, bool) -> str
        """
        Drain all daemons from a host.
        :param hostname: host name
        """

        # if we drain the last admin host we could end up removing the only instance
        # of the config and keyring and cause issues
        if not force:
            p = PlacementSpec(label='_admin')
            admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
            if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
                raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
                                                  " label.\nDraining this host could cause the removal"
                                                  " of the last cluster config/keyring managed by cephadm.\n"
                                                  "It is recommended to add the _admin label to another host"
                                                  " before completing this operation.\nIf you're certain this is"
                                                  " what you want rerun this command with --force.")

        self.add_host_label(hostname, '_no_schedule')

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)

        osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
        self.remove_osds(osds_to_remove)

        daemons_table = ""
        daemons_table += "{:<20} {:<15}\n".format("type", "id")
        daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
        for d in daemons:
            daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

        return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
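
    # Example (illustrative; "host01" is a placeholder hostname): backs
    # `ceph orch host drain`; --force is needed if the host is the last one
    # carrying the _admin label.
    #
    #   ceph orch host drain host01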

    def trigger_connect_dashboard_rgw(self) -> None:
        self.need_connect_dashboard_rgw = True