1 import asyncio
2 import json
3 import errno
4 import ipaddress
5 import logging
6 import re
7 import shlex
8 from collections import defaultdict
9 from configparser import ConfigParser
10 from contextlib import contextmanager
11 from functools import wraps
12 from tempfile import TemporaryDirectory, NamedTemporaryFile
13 from threading import Event
14
15 from cephadm.service_discovery import ServiceDiscovery
16
17 import string
18 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
19 Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
20 Awaitable, Iterator
21
22 import datetime
23 import os
24 import random
25 import multiprocessing.pool
26 import subprocess
27 from prettytable import PrettyTable
28
29 from ceph.deployment import inventory
30 from ceph.deployment.drive_group import DriveGroupSpec
31 from ceph.deployment.service_spec import \
32 ServiceSpec, PlacementSpec, \
33 HostPlacementSpec, IngressSpec, \
34 TunedProfileSpec, IscsiServiceSpec
35 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
36 from cephadm.serve import CephadmServe
37 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
38 from cephadm.http_server import CephadmHttpServer
39 from cephadm.agent import CephadmAgentHelpers
40
41
42 from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
43 import orchestrator
44 from orchestrator.module import to_format, Format
45
46 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
47 CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
48 service_to_daemon_types
49 from orchestrator._interface import GenericSpec
50 from orchestrator._interface import daemon_type_to_service
51
52 from . import utils
53 from . import ssh
54 from .migrations import Migrations
55 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
56 RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
57 CephExporterService
58 from .services.ingress import IngressService
59 from .services.container import CustomContainerService
60 from .services.iscsi import IscsiService
61 from .services.nvmeof import NvmeofService
62 from .services.nfs import NFSService
63 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
64 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
65 NodeExporterService, SNMPGatewayService, LokiService, PromtailService
66 from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
67 from .schedule import HostAssignment
68 from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
69 ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
70 from .upgrade import CephadmUpgrade
71 from .template import TemplateMgr
72 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
73 cephadmNoImage, CEPH_UPGRADE_ORDER, SpecialHostLabels
74 from .configchecks import CephadmConfigChecks
75 from .offline_watcher import OfflineHostWatcher
76 from .tuned_profiles import TunedProfileUtils
77
78 try:
79 import asyncssh
80 except ImportError as e:
81 asyncssh = None # type: ignore
82 asyncssh_import_error = str(e)
83
84 logger = logging.getLogger(__name__)
85
86 T = TypeVar('T')
87
88 DEFAULT_SSH_CONFIG = """
89 Host *
90 User root
91 StrictHostKeyChecking no
92 UserKnownHostsFile /dev/null
93 ConnectTimeout=30
94 """
95
96 # cherrypy likes to sys.exit on error. don't let it take us down too!
97
98
99 def os_exit_noop(status: int) -> None:
100 pass
101
102
103 os._exit = os_exit_noop # type: ignore
104
105
106 # Default container images -----------------------------------------------------
107 DEFAULT_IMAGE = 'quay.io/ceph/ceph' # DO NOT ADD TAG TO THIS
108 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
109 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
110 DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.2'
111 DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
112 DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
113 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
114 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
115 DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
116 DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
117 DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
118 DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
119 DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
120 DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
121 DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
122 # ------------------------------------------------------------------------------
123
124
125 def host_exists(hostname_position: int = 1) -> Callable:
126 """Check that a hostname exists in the inventory"""
127 def inner(func: Callable) -> Callable:
128 @wraps(func)
129 def wrapper(*args: Any, **kwargs: Any) -> Any:
130 this = args[0] # self object
131 hostname = args[hostname_position]
132 if hostname not in this.cache.get_hosts():
133 candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
134 help_msg = f"Did you mean {candidates}?" if candidates else ""
135 raise OrchestratorError(
136 f"Cannot find host '{hostname}' in the inventory. {help_msg}")
137
138 return func(*args, **kwargs)
139 return wrapper
140 return inner
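# Usage sketch (the method name here is hypothetical, for illustration only):
# the decorator wraps instance methods whose positional argument at
# `hostname_position` is a host name, e.g.
#
#   @host_exists()
#   def some_host_command(self, hostname: str) -> str:
#       ...
#
# If the host is not in the cache, an OrchestratorError is raised that also
# suggests similarly named hosts.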
141
142
143 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
144 metaclass=CLICommandMeta):
145
146 _STORE_HOST_PREFIX = "host"
147
148 instance = None
149 NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
150 NATIVE_OPTIONS = [] # type: List[Any]
151 MODULE_OPTIONS = [
152 Option(
153 'ssh_config_file',
154 type='str',
155 default=None,
156 desc='customized SSH config file to connect to managed hosts',
157 ),
158 Option(
159 'device_cache_timeout',
160 type='secs',
161 default=30 * 60,
162 desc='seconds to cache device inventory',
163 ),
164 Option(
165 'device_enhanced_scan',
166 type='bool',
167 default=False,
168 desc='Use libstoragemgmt during device scans',
169 ),
170 Option(
171 'inventory_list_all',
172 type='bool',
173 default=False,
174 desc='Whether ceph-volume inventory should report '
175 'more devices (mostly mappers (LVs / mpaths), partitions...)',
176 ),
177 Option(
178 'daemon_cache_timeout',
179 type='secs',
180 default=10 * 60,
181 desc='seconds to cache service (daemon) inventory',
182 ),
183 Option(
184 'facts_cache_timeout',
185 type='secs',
186 default=1 * 60,
187 desc='seconds to cache host facts data',
188 ),
189 Option(
190 'host_check_interval',
191 type='secs',
192 default=10 * 60,
193 desc='how frequently to perform a host check',
194 ),
195 Option(
196 'mode',
197 type='str',
198 enum_allowed=['root', 'cephadm-package'],
199 default='root',
200 desc='mode for remote execution of cephadm',
201 ),
202 Option(
203 'container_image_base',
204 default=DEFAULT_IMAGE,
205 desc='Container image name, without the tag',
206 runtime=True,
207 ),
208 Option(
209 'container_image_prometheus',
210 default=DEFAULT_PROMETHEUS_IMAGE,
211 desc='Prometheus container image',
212 ),
213 Option(
214 'container_image_nvmeof',
215 default=DEFAULT_NVMEOF_IMAGE,
216 desc='Nvme-of container image',
217 ),
218 Option(
219 'container_image_grafana',
220 default=DEFAULT_GRAFANA_IMAGE,
221 desc='Grafana container image',
222 ),
223 Option(
224 'container_image_alertmanager',
225 default=DEFAULT_ALERT_MANAGER_IMAGE,
226 desc='Alertmanager container image',
227 ),
228 Option(
229 'container_image_node_exporter',
230 default=DEFAULT_NODE_EXPORTER_IMAGE,
231 desc='Node exporter container image',
232 ),
233 Option(
234 'container_image_loki',
235 default=DEFAULT_LOKI_IMAGE,
236 desc='Loki container image',
237 ),
238 Option(
239 'container_image_promtail',
240 default=DEFAULT_PROMTAIL_IMAGE,
241 desc='Promtail container image',
242 ),
243 Option(
244 'container_image_haproxy',
245 default=DEFAULT_HAPROXY_IMAGE,
246 desc='HAproxy container image',
247 ),
248 Option(
249 'container_image_keepalived',
250 default=DEFAULT_KEEPALIVED_IMAGE,
251 desc='Keepalived container image',
252 ),
253 Option(
254 'container_image_snmp_gateway',
255 default=DEFAULT_SNMP_GATEWAY_IMAGE,
256 desc='SNMP Gateway container image',
257 ),
258 Option(
259 'container_image_elasticsearch',
260 default=DEFAULT_ELASTICSEARCH_IMAGE,
261 desc='elasticsearch container image',
262 ),
263 Option(
264 'container_image_jaeger_agent',
265 default=DEFAULT_JAEGER_AGENT_IMAGE,
266 desc='Jaeger agent container image',
267 ),
268 Option(
269 'container_image_jaeger_collector',
270 default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
271 desc='Jaeger collector container image',
272 ),
273 Option(
274 'container_image_jaeger_query',
275 default=DEFAULT_JAEGER_QUERY_IMAGE,
276 desc='Jaeger query container image',
277 ),
278 Option(
279 'warn_on_stray_hosts',
280 type='bool',
281 default=True,
282 desc='raise a health warning if daemons are detected on a host '
283 'that is not managed by cephadm',
284 ),
285 Option(
286 'warn_on_stray_daemons',
287 type='bool',
288 default=True,
289 desc='raise a health warning if daemons are detected '
290 'that are not managed by cephadm',
291 ),
292 Option(
293 'warn_on_failed_host_check',
294 type='bool',
295 default=True,
296 desc='raise a health warning if the host check fails',
297 ),
298 Option(
299 'log_to_cluster',
300 type='bool',
301 default=True,
302 desc='log to the "cephadm" cluster log channel',
303 ),
304 Option(
305 'allow_ptrace',
306 type='bool',
307 default=False,
308 desc='allow SYS_PTRACE capability on ceph containers',
309 long_desc='The SYS_PTRACE capability is needed to attach to a '
310 'process with gdb or strace. Enabling this option '
311 'can allow debugging daemons that encounter problems '
312 'at runtime.',
313 ),
314 Option(
315 'container_init',
316 type='bool',
317 default=True,
318 desc='Run podman/docker with `--init`'
319 ),
320 Option(
321 'prometheus_alerts_path',
322 type='str',
323 default='/etc/prometheus/ceph/ceph_default_alerts.yml',
324 desc='location of alerts to include in prometheus deployments',
325 ),
326 Option(
327 'migration_current',
328 type='int',
329 default=None,
330 desc='internal - do not modify',
331 # used to track spec and other data migrations.
332 ),
333 Option(
334 'config_dashboard',
335 type='bool',
336 default=True,
337 desc='manage configs like API endpoints in Dashboard.'
338 ),
339 Option(
340 'manage_etc_ceph_ceph_conf',
341 type='bool',
342 default=False,
343 desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
344 ),
345 Option(
346 'manage_etc_ceph_ceph_conf_hosts',
347 type='str',
348 default='*',
349 desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
350 ),
351 # not used anymore
352 Option(
353 'registry_url',
354 type='str',
355 default=None,
356 desc='Registry url for login purposes. This is not the default registry'
357 ),
358 Option(
359 'registry_username',
360 type='str',
361 default=None,
362 desc='Custom repository username. Only used for logging into a registry.'
363 ),
364 Option(
365 'registry_password',
366 type='str',
367 default=None,
368 desc='Custom repository password. Only used for logging into a registry.'
369 ),
370 ####
371 Option(
372 'registry_insecure',
373 type='bool',
374 default=False,
375 desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
376 ),
377 Option(
378 'use_repo_digest',
379 type='bool',
380 default=True,
381 desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
382 ),
383 Option(
384 'config_checks_enabled',
385 type='bool',
386 default=False,
387 desc='Enable or disable the cephadm configuration analysis',
388 ),
389 Option(
390 'default_registry',
391 type='str',
392 default='docker.io',
393 desc='Search-registry to which we should normalize unqualified image names. '
394 'This is not the default registry',
395 ),
396 Option(
397 'max_count_per_host',
398 type='int',
399 default=10,
400 desc='max number of daemons per service per host',
401 ),
402 Option(
403 'autotune_memory_target_ratio',
404 type='float',
405 default=.7,
406 desc='ratio of total system memory to divide amongst autotuned daemons'
407 ),
408 Option(
409 'autotune_interval',
410 type='secs',
411 default=10 * 60,
412 desc='how frequently to autotune daemon memory'
413 ),
414 Option(
415 'use_agent',
416 type='bool',
417 default=False,
418 desc='Use cephadm agent on each host to gather and send metadata'
419 ),
420 Option(
421 'agent_refresh_rate',
422 type='secs',
423 default=20,
424 desc='How often agent on each host will try to gather and send metadata'
425 ),
426 Option(
427 'agent_starting_port',
428 type='int',
429 default=4721,
430 desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
431 ),
432 Option(
433 'agent_down_multiplier',
434 type='float',
435 default=3.0,
436 desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
437 ),
438 Option(
439 'max_osd_draining_count',
440 type='int',
441 default=10,
442 desc='max number of osds that will be drained simultaneously when osds are removed'
443 ),
444 Option(
445 'service_discovery_port',
446 type='int',
447 default=8765,
448 desc='cephadm service discovery port'
449 ),
450 Option(
451 'cgroups_split',
452 type='bool',
453 default=True,
454 desc='Pass --cgroups=split when cephadm creates containers (currently podman only)'
455 ),
456 Option(
457 'log_refresh_metadata',
458 type='bool',
459 default=False,
460 desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
461 ),
462 Option(
463 'secure_monitoring_stack',
464 type='bool',
465 default=False,
466 desc='Enable TLS security for all the monitoring stack daemons'
467 ),
468 Option(
469 'default_cephadm_command_timeout',
470 type='secs',
471 default=15 * 60,
472 desc='Default timeout applied to cephadm commands run directly on '
473 'the host (in seconds)'
474 ),
475 ]
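# Each Option above surfaces as a standard mgr module option. A hedged example
# of reading and changing one from the CLI (option names are taken from the
# list above; mgr/cephadm/ is the usual mgr module config namespace):
#
#   ceph config get mgr mgr/cephadm/daemon_cache_timeout
#   ceph config set mgr mgr/cephadm/container_image_grafana <image>
#   ceph config set mgr mgr/cephadm/use_agent true
#
# config_notify() below copies the updated values onto the module instance.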
476
477 def __init__(self, *args: Any, **kwargs: Any):
478 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
479 self._cluster_fsid: str = self.get('mon_map')['fsid']
480 self.last_monmap: Optional[datetime.datetime] = None
481
482 # for serve()
483 self.run = True
484 self.event = Event()
485
486 self.ssh = ssh.SSHManager(self)
487
488 if self.get_store('pause'):
489 self.paused = True
490 else:
491 self.paused = False
492
493 # for mypy which does not run the code
494 if TYPE_CHECKING:
495 self.ssh_config_file = None # type: Optional[str]
496 self.device_cache_timeout = 0
497 self.daemon_cache_timeout = 0
498 self.facts_cache_timeout = 0
499 self.host_check_interval = 0
500 self.max_count_per_host = 0
501 self.mode = ''
502 self.container_image_base = ''
503 self.container_image_prometheus = ''
504 self.container_image_nvmeof = ''
505 self.container_image_grafana = ''
506 self.container_image_alertmanager = ''
507 self.container_image_node_exporter = ''
508 self.container_image_loki = ''
509 self.container_image_promtail = ''
510 self.container_image_haproxy = ''
511 self.container_image_keepalived = ''
512 self.container_image_snmp_gateway = ''
513 self.container_image_elasticsearch = ''
514 self.container_image_jaeger_agent = ''
515 self.container_image_jaeger_collector = ''
516 self.container_image_jaeger_query = ''
517 self.warn_on_stray_hosts = True
518 self.warn_on_stray_daemons = True
519 self.warn_on_failed_host_check = True
520 self.allow_ptrace = False
521 self.container_init = True
522 self.prometheus_alerts_path = ''
523 self.migration_current: Optional[int] = None
524 self.config_dashboard = True
525 self.manage_etc_ceph_ceph_conf = True
526 self.manage_etc_ceph_ceph_conf_hosts = '*'
527 self.registry_url: Optional[str] = None
528 self.registry_username: Optional[str] = None
529 self.registry_password: Optional[str] = None
530 self.registry_insecure: bool = False
531 self.use_repo_digest = True
532 self.default_registry = ''
533 self.autotune_memory_target_ratio = 0.0
534 self.autotune_interval = 0
535 self.ssh_user: Optional[str] = None
536 self._ssh_options: Optional[str] = None
537 self.tkey = NamedTemporaryFile()
538 self.ssh_config_fname: Optional[str] = None
539 self.ssh_config: Optional[str] = None
540 self._temp_files: List = []
541 self.ssh_key: Optional[str] = None
542 self.ssh_pub: Optional[str] = None
543 self.ssh_cert: Optional[str] = None
544 self.use_agent = False
545 self.agent_refresh_rate = 0
546 self.agent_down_multiplier = 0.0
547 self.agent_starting_port = 0
548 self.service_discovery_port = 0
549 self.secure_monitoring_stack = False
550 self.apply_spec_fails: List[Tuple[str, str]] = []
551 self.max_osd_draining_count = 10
552 self.device_enhanced_scan = False
553 self.inventory_list_all = False
554 self.cgroups_split = True
555 self.log_refresh_metadata = False
556 self.default_cephadm_command_timeout = 0
557
558 self.notify(NotifyType.mon_map, None)
559 self.config_notify()
560
561 path = self.get_ceph_option('cephadm_path')
562 try:
563 assert isinstance(path, str)
564 with open(path, 'rb') as f:
565 self._cephadm = f.read()
566 except (IOError, TypeError) as e:
567 raise RuntimeError("unable to read cephadm at '%s': %s" % (
568 path, str(e)))
569
570 self.cephadm_binary_path = self._get_cephadm_binary_path()
571
572 self._worker_pool = multiprocessing.pool.ThreadPool(10)
573
574 self.ssh._reconfig_ssh()
575
576 CephadmOrchestrator.instance = self
577
578 self.upgrade = CephadmUpgrade(self)
579
580 self.health_checks: Dict[str, dict] = {}
581
582 self.inventory = Inventory(self)
583
584 self.cache = HostCache(self)
585 self.cache.load()
586
587 self.agent_cache = AgentCache(self)
588 self.agent_cache.load()
589
590 self.to_remove_osds = OSDRemovalQueue(self)
591 self.to_remove_osds.load_from_store()
592
593 self.spec_store = SpecStore(self)
594 self.spec_store.load()
595
596 self.keys = ClientKeyringStore(self)
597 self.keys.load()
598
599 self.tuned_profiles = TunedProfileStore(self)
600 self.tuned_profiles.load()
601
602 self.tuned_profile_utils = TunedProfileUtils(self)
603
604 # ensure the host lists are in sync
605 for h in self.inventory.keys():
606 if h not in self.cache.daemons:
607 self.cache.prime_empty_host(h)
608 for h in self.cache.get_hosts():
609 if h not in self.inventory:
610 self.cache.rm_host(h)
611
612 # in-memory only.
613 self.events = EventStore(self)
614 self.offline_hosts: Set[str] = set()
615
616 self.migration = Migrations(self)
617
618 _service_classes: Sequence[Type[CephadmService]] = [
619 OSDService, NFSService, MonService, MgrService, MdsService,
620 RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
621 PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
622 IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
623 CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
624 JaegerQueryService, JaegerAgentService, JaegerCollectorService
625 ]
626
627 # https://github.com/python/mypy/issues/8993
628 self.cephadm_services: Dict[str, CephadmService] = {
629 cls.TYPE: cls(self) for cls in _service_classes} # type: ignore
630
631 self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
632 self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
633 self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
634 self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
635
636 self.scheduled_async_actions: List[Callable] = []
637
638 self.template = TemplateMgr(self)
639
640 self.requires_post_actions: Set[str] = set()
641 self.need_connect_dashboard_rgw = False
642
643 self.config_checker = CephadmConfigChecks(self)
644
645 self.http_server = CephadmHttpServer(self)
646 self.http_server.start()
647 self.agent_helpers = CephadmAgentHelpers(self)
648 if self.use_agent:
649 self.agent_helpers._apply_agent()
650
651 self.offline_watcher = OfflineHostWatcher(self)
652 self.offline_watcher.start()
653
654 def shutdown(self) -> None:
655 self.log.debug('shutdown')
656 self._worker_pool.close()
657 self._worker_pool.join()
658 self.http_server.shutdown()
659 self.offline_watcher.shutdown()
660 self.run = False
661 self.event.set()
662
663 def _get_cephadm_service(self, service_type: str) -> CephadmService:
664 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
665 return self.cephadm_services[service_type]
666
667 def _get_cephadm_binary_path(self) -> str:
668 import hashlib
669 m = hashlib.sha256()
670 m.update(self._cephadm)
671 return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
672
673 def _kick_serve_loop(self) -> None:
674 self.log.debug('_kick_serve_loop')
675 self.event.set()
676
677 def serve(self) -> None:
678 """
679 The main loop of cephadm.
680
681 A command handler will typically change the declarative state
682 of cephadm. This loop will then attempt to apply this new state.
683 """
684 # for ssh in serve
685 self.event_loop = ssh.EventLoopThread()
686
687 serve = CephadmServe(self)
688 serve.serve()
689
690 def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
691 if not timeout:
692 timeout = self.default_cephadm_command_timeout
693 # put a lower bound of 60 seconds in case users
694 # accidentally set it to something unreasonable.
695 # For example if they thought it was in minutes
696 # rather than seconds
697 if timeout < 60:
698 self.log.info(f'Found default timeout set to {timeout}. Instead trying minimum of 60.')
699 timeout = 60
700 return self.event_loop.get_result(coro, timeout)
701
702 @contextmanager
703 def async_timeout_handler(self, host: Optional[str] = '',
704 cmd: Optional[str] = '',
705 timeout: Optional[int] = None) -> Iterator[None]:
706 # this is meant to catch asyncio.TimeoutError and convert it into an
707 # OrchestratorError which much of the cephadm codebase is better equipped to handle.
708 # If the command being run, the host it is run on, or the timeout being used
709 # are provided, that will be included in the OrchestratorError's message
710 try:
711 yield
712 except asyncio.TimeoutError:
713 err_str: str = ''
714 if cmd:
715 err_str = f'Command "{cmd}" timed out '
716 else:
717 err_str = 'Command timed out '
718 if host:
719 err_str += f'on host {host} '
720 if timeout:
721 err_str += f'(non-default {timeout} second timeout)'
722 else:
723 err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)')
724 raise OrchestratorError(err_str)
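# Typical pairing of the two helpers above, as used throughout this module
# (sketch only; the command string and arguments are simplified):
#
#   with self.async_timeout_handler(host, 'cephadm check-host'):
#       out, err, code = self.wait_async(
#           CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', []))
#
# so an asyncio.TimeoutError surfaces as an OrchestratorError that names the
# host, the command and the timeout that applied.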
725
726 def set_container_image(self, entity: str, image: str) -> None:
727 self.check_mon_command({
728 'prefix': 'config set',
729 'name': 'container_image',
730 'value': image,
731 'who': entity,
732 })
733
734 def config_notify(self) -> None:
735 """
736 This method is called whenever one of our config options is changed.
737
738 TODO: this method should be moved into mgr_module.py
739 """
740 for opt in self.MODULE_OPTIONS:
741 setattr(self,
742 opt['name'], # type: ignore
743 self.get_module_option(opt['name'])) # type: ignore
744 self.log.debug(' mgr option %s = %s',
745 opt['name'], getattr(self, opt['name'])) # type: ignore
746 for opt in self.NATIVE_OPTIONS:
747 setattr(self,
748 opt, # type: ignore
749 self.get_ceph_option(opt))
750 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
751
752 self.event.set()
753
754 def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
755 if notify_type == NotifyType.mon_map:
756 # get monmap mtime so we can refresh configs when mons change
757 monmap = self.get('mon_map')
758 self.last_monmap = str_to_datetime(monmap['modified'])
759 if self.last_monmap and self.last_monmap > datetime_now():
760 self.last_monmap = None # just in case clocks are skewed
761 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
762 # getattr, due to notify() being called before config_notify()
763 self._kick_serve_loop()
764 if notify_type == NotifyType.pg_summary:
765 self._trigger_osd_removal()
766
767 def _trigger_osd_removal(self) -> None:
768 remove_queue = self.to_remove_osds.as_osd_ids()
769 if not remove_queue:
770 return
771 data = self.get("osd_stats")
772 for osd in data.get('osd_stats', []):
773 if osd.get('num_pgs') == 0:
774 # if _ANY_ osd that is currently in the queue appears to be empty,
775 # start the removal process
776 if int(osd.get('osd')) in remove_queue:
777 self.log.debug('Found empty osd. Starting removal process')
778 # if the osd that is now empty is also part of the removal queue
779 # start the process
780 self._kick_serve_loop()
781
782 def pause(self) -> None:
783 if not self.paused:
784 self.log.info('Paused')
785 self.set_store('pause', 'true')
786 self.paused = True
787 # wake loop so we update the health status
788 self._kick_serve_loop()
789
790 def resume(self) -> None:
791 if self.paused:
792 self.log.info('Resumed')
793 self.paused = False
794 self.set_store('pause', None)
795 # unconditionally wake loop so that 'orch resume' can be used to kick
796 # cephadm
797 self._kick_serve_loop()
798
799 def get_unique_name(
800 self,
801 daemon_type: str,
802 host: str,
803 existing: List[orchestrator.DaemonDescription],
804 prefix: Optional[str] = None,
805 forcename: Optional[str] = None,
806 rank: Optional[int] = None,
807 rank_generation: Optional[int] = None,
808 ) -> str:
809 """
810 Generate a unique random service name
811 """
812 suffix = daemon_type not in [
813 'mon', 'crash', 'ceph-exporter',
814 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
815 'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
816 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
817 ]
818 if forcename:
819 if len([d for d in existing if d.daemon_id == forcename]):
820 raise orchestrator.OrchestratorValidationError(
821 f'name {daemon_type}.{forcename} already in use')
822 return forcename
823
824 if '.' in host:
825 host = host.split('.')[0]
826 while True:
827 if prefix:
828 name = prefix + '.'
829 else:
830 name = ''
831 if rank is not None and rank_generation is not None:
832 name += f'{rank}.{rank_generation}.'
833 name += host
834 if suffix:
835 name += '.' + ''.join(random.choice(string.ascii_lowercase)
836 for _ in range(6))
837 if len([d for d in existing if d.daemon_id == name]):
838 if not suffix:
839 raise orchestrator.OrchestratorValidationError(
840 f'name {daemon_type}.{name} already in use')
841 self.log.debug('name %s exists, trying again', name)
842 continue
843 return name
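# Examples of the names this produces (sketch; the random letters are
# illustrative): daemon types in the list above get no random suffix, so 'mon'
# on host1 yields 'host1'; other types get a six-letter suffix, e.g. 'mgr'
# yields 'host1.kqzyfh'; and when rank/rank_generation are supplied (as ranked
# services such as nfs typically do) the name becomes
# '<rank>.<rank_generation>.host1.<suffix>'.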
844
845 def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
846 if ssh_config is None or len(ssh_config.strip()) == 0:
847 raise OrchestratorValidationError('ssh_config cannot be empty')
848 # StrictHostKeyChecking is [yes|no] ?
849 res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
850 if not res:
851 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
852 for s in res:
853 if 'ask' in s.lower():
854 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
855
856 def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
857 if not os.path.isfile(ssh_config_fname):
858 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
859 ssh_config_fname))
860
861 def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
862 def _as_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
863 return str_to_datetime(value) if value is not None else None
864
865 dm = {}
866 for d in ls:
867 if not d['style'].startswith('cephadm'):
868 continue
869 if d['fsid'] != self._cluster_fsid:
870 continue
871 if '.' not in d['name']:
872 continue
873 daemon_type = d['name'].split('.')[0]
874 if daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
875 logger.warning(f"Found unknown daemon type {daemon_type} on host {host}")
876 continue
877
878 container_id = d.get('container_id')
879 if container_id:
880 # shorten the hash
881 container_id = container_id[0:12]
882 rank = int(d['rank']) if d.get('rank') is not None else None
883 rank_generation = int(d['rank_generation']) if d.get(
884 'rank_generation') is not None else None
885 status, status_desc = None, 'unknown'
886 if 'state' in d:
887 status_desc = d['state']
888 status = {
889 'running': DaemonDescriptionStatus.running,
890 'stopped': DaemonDescriptionStatus.stopped,
891 'error': DaemonDescriptionStatus.error,
892 'unknown': DaemonDescriptionStatus.error,
893 }[d['state']]
894 sd = orchestrator.DaemonDescription(
895 daemon_type=daemon_type,
896 daemon_id='.'.join(d['name'].split('.')[1:]),
897 hostname=host,
898 container_id=container_id,
899 container_image_id=d.get('container_image_id'),
900 container_image_name=d.get('container_image_name'),
901 container_image_digests=d.get('container_image_digests'),
902 version=d.get('version'),
903 status=status,
904 status_desc=status_desc,
905 created=_as_datetime(d.get('created')),
906 started=_as_datetime(d.get('started')),
907 last_refresh=datetime_now(),
908 last_configured=_as_datetime(d.get('last_configured')),
909 last_deployed=_as_datetime(d.get('last_deployed')),
910 memory_usage=d.get('memory_usage'),
911 memory_request=d.get('memory_request'),
912 memory_limit=d.get('memory_limit'),
913 cpu_percentage=d.get('cpu_percentage'),
914 service_name=d.get('service_name'),
915 ports=d.get('ports'),
916 ip=d.get('ip'),
917 deployed_by=d.get('deployed_by'),
918 rank=rank,
919 rank_generation=rank_generation,
920 extra_container_args=d.get('extra_container_args'),
921 extra_entrypoint_args=d.get('extra_entrypoint_args'),
922 )
923 dm[sd.name()] = sd
924 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
925 self.cache.update_host_daemons(host, dm)
926 self.cache.save_host(host)
927 return None
928
929 def update_watched_hosts(self) -> None:
930 # currently, we are watching hosts with nfs daemons
931 hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
932 ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
933 self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
934
935 def offline_hosts_remove(self, host: str) -> None:
936 if host in self.offline_hosts:
937 self.offline_hosts.remove(host)
938
939 def update_failed_daemon_health_check(self) -> None:
940 failed_daemons = []
941 for dd in self.cache.get_error_daemons():
942 if dd.daemon_type != 'agent': # agents tracked by CEPHADM_AGENT_DOWN
943 failed_daemons.append('daemon %s on %s is in %s state' % (
944 dd.name(), dd.hostname, dd.status_desc
945 ))
946 self.remove_health_warning('CEPHADM_FAILED_DAEMON')
947 if failed_daemons:
948 self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
949 failed_daemons), failed_daemons)
950
951 @staticmethod
952 def can_run() -> Tuple[bool, str]:
953 if asyncssh is not None:
954 return True, ""
955 else:
956 return False, "loading asyncssh library:{}".format(
957 asyncssh_import_error)
958
959 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
960 """
961 The cephadm orchestrator is always available.
962 """
963 ok, err = self.can_run()
964 if not ok:
965 return ok, err, {}
966 if not self.ssh_key or not self.ssh_pub:
967 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}
968
969 # mypy is unable to determine type for _processes since it's private
970 worker_count: int = self._worker_pool._processes # type: ignore
971 ret = {
972 "workers": worker_count,
973 "paused": self.paused,
974 }
975
976 return True, err, ret
977
978 def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
979 self.set_store(what, new)
980 self.ssh._reconfig_ssh()
981 if self.cache.get_hosts():
982 # Can't check anything without hosts
983 host = self.cache.get_hosts()[0]
984 r = CephadmServe(self)._check_host(host)
985 if r is not None:
986 # connection failed reset user
987 self.set_store(what, old)
988 self.ssh._reconfig_ssh()
989 raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
990 self.log.info(f'Set ssh {what}')
991
992 @orchestrator._cli_write_command(
993 prefix='cephadm set-ssh-config')
994 def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
995 """
996 Set the ssh_config file (use -i <ssh_config>)
997 """
998 # Set an ssh_config file provided from stdin
999
1000 old = self.ssh_config
1001 if inbuf == old:
1002 return 0, "value unchanged", ""
1003 self.validate_ssh_config_content(inbuf)
1004 self._validate_and_set_ssh_val('ssh_config', inbuf, old)
1005 return 0, "", ""
1006
1007 @orchestrator._cli_write_command('cephadm clear-ssh-config')
1008 def _clear_ssh_config(self) -> Tuple[int, str, str]:
1009 """
1010 Clear the ssh_config file
1011 """
1012 # Clear the stored ssh_config
1013 self.set_store("ssh_config", None)
1014 self.ssh_config_tmp = None
1015 self.log.info('Cleared ssh_config')
1016 self.ssh._reconfig_ssh()
1017 return 0, "", ""
1018
1019 @orchestrator._cli_read_command('cephadm get-ssh-config')
1020 def _get_ssh_config(self) -> HandleCommandResult:
1021 """
1022 Returns the ssh config as used by cephadm
1023 """
1024 if self.ssh_config_file:
1025 self.validate_ssh_config_fname(self.ssh_config_file)
1026 with open(self.ssh_config_file) as f:
1027 return HandleCommandResult(stdout=f.read())
1028 ssh_config = self.get_store("ssh_config")
1029 if ssh_config:
1030 return HandleCommandResult(stdout=ssh_config)
1031 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
1032
1033 @orchestrator._cli_write_command('cephadm generate-key')
1034 def _generate_key(self) -> Tuple[int, str, str]:
1035 """
1036 Generate a cluster SSH key (if not present)
1037 """
1038 if not self.ssh_pub or not self.ssh_key:
1039 self.log.info('Generating ssh key...')
1040 tmp_dir = TemporaryDirectory()
1041 path = tmp_dir.name + '/key'
1042 try:
1043 subprocess.check_call([
1044 '/usr/bin/ssh-keygen',
1045 '-C', 'ceph-%s' % self._cluster_fsid,
1046 '-N', '',
1047 '-f', path
1048 ])
1049 with open(path, 'r') as f:
1050 secret = f.read()
1051 with open(path + '.pub', 'r') as f:
1052 pub = f.read()
1053 finally:
1054 os.unlink(path)
1055 os.unlink(path + '.pub')
1056 tmp_dir.cleanup()
1057 self.set_store('ssh_identity_key', secret)
1058 self.set_store('ssh_identity_pub', pub)
1059 self.ssh._reconfig_ssh()
1060 return 0, '', ''
1061
1062 @orchestrator._cli_write_command(
1063 'cephadm set-priv-key')
1064 def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1065 """Set cluster SSH private key (use -i <private_key>)"""
1066 if inbuf is None or len(inbuf) == 0:
1067 return -errno.EINVAL, "", "empty private ssh key provided"
1068 old = self.ssh_key
1069 if inbuf == old:
1070 return 0, "value unchanged", ""
1071 self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
1072 self.log.info('Set ssh private key')
1073 return 0, "", ""
1074
1075 @orchestrator._cli_write_command(
1076 'cephadm set-pub-key')
1077 def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1078 """Set cluster SSH public key (use -i <public_key>)"""
1079 if inbuf is None or len(inbuf) == 0:
1080 return -errno.EINVAL, "", "empty public ssh key provided"
1081 old = self.ssh_pub
1082 if inbuf == old:
1083 return 0, "value unchanged", ""
1084 self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
1085 return 0, "", ""
1086
1087 @orchestrator._cli_write_command(
1088 'cephadm set-signed-cert')
1089 def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1090 """Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
1091 if inbuf is None or len(inbuf) == 0:
1092 return -errno.EINVAL, "", "empty cert file provided"
1093 old = self.ssh_cert
1094 if inbuf == old:
1095 return 0, "value unchanged", ""
1096 self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
1097 return 0, "", ""
1098
1099 @orchestrator._cli_write_command(
1100 'cephadm clear-key')
1101 def _clear_key(self) -> Tuple[int, str, str]:
1102 """Clear cluster SSH key"""
1103 self.set_store('ssh_identity_key', None)
1104 self.set_store('ssh_identity_pub', None)
1105 self.set_store('ssh_identity_cert', None)
1106 self.ssh._reconfig_ssh()
1107 self.log.info('Cleared cluster SSH key')
1108 return 0, '', ''
1109
1110 @orchestrator._cli_read_command(
1111 'cephadm get-pub-key')
1112 def _get_pub_key(self) -> Tuple[int, str, str]:
1113 """Show SSH public key for connecting to cluster hosts"""
1114 if self.ssh_pub:
1115 return 0, self.ssh_pub, ''
1116 else:
1117 return -errno.ENOENT, '', 'No cluster SSH key defined'
1118
1119 @orchestrator._cli_read_command(
1120 'cephadm get-signed-cert')
1121 def _get_signed_cert(self) -> Tuple[int, str, str]:
1122 """Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
1123 if self.ssh_cert:
1124 return 0, self.ssh_cert, ''
1125 else:
1126 return -errno.ENOENT, '', 'No signed cert defined'
1127
1128 @orchestrator._cli_read_command(
1129 'cephadm get-user')
1130 def _get_user(self) -> Tuple[int, str, str]:
1131 """
1132 Show user for SSHing to cluster hosts
1133 """
1134 if self.ssh_user is None:
1135 return -errno.ENOENT, '', 'No cluster SSH user configured'
1136 else:
1137 return 0, self.ssh_user, ''
1138
1139 @orchestrator._cli_read_command(
1140 'cephadm set-user')
1141 def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
1142 """
1143 Set user for SSHing to cluster hosts. Passwordless sudo will be needed for non-root users
1144 """
1145 current_user = self.ssh_user
1146 if user == current_user:
1147 return 0, "value unchanged", ""
1148
1149 self._validate_and_set_ssh_val('ssh_user', user, current_user)
1150 current_ssh_config = self._get_ssh_config()
1151 new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
1152 self._set_ssh_config(new_ssh_config)
1153
1154 msg = 'ssh user set to %s' % user
1155 if user != 'root':
1156 msg += '. sudo will be used'
1157 self.log.info(msg)
1158 return 0, msg, ''
1159
1160 @orchestrator._cli_read_command(
1161 'cephadm registry-login')
1162 def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1163 """
1164 Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
1165 """
1166 # if password not given in command line, get it through file input
1167 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
1168 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
1169 "or -i <login credentials json file>")
1170 elif (url and username and password):
1171 registry_json = {'url': url, 'username': username, 'password': password}
1172 else:
1173 assert isinstance(inbuf, str)
1174 registry_json = json.loads(inbuf)
1175 if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
1176 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
1177 "Please setup json file as\n"
1178 "{\n"
1179 " \"url\": \"REGISTRY_URL\",\n"
1180 " \"username\": \"REGISTRY_USERNAME\",\n"
1181 " \"password\": \"REGISTRY_PASSWORD\"\n"
1182 "}\n")
1183
1184 # verify login info works by attempting login on random host
1185 host = None
1186 for host_name in self.inventory.keys():
1187 host = host_name
1188 break
1189 if not host:
1190 raise OrchestratorError('no hosts defined')
1191 with self.async_timeout_handler(host, 'cephadm registry-login'):
1192 r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
1193 if r is not None:
1194 return 1, '', r
1195 # if logins succeeded, store info
1196 self.log.debug("Host logins successful. Storing login info.")
1197 self.set_store('registry_credentials', json.dumps(registry_json))
1198 # distribute new login info to all hosts
1199 self.cache.distribute_new_registry_login_info()
1200 return 0, "registry login scheduled", ''
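# Invocation sketch (values are placeholders): either pass the three
# positional arguments,
#
#   ceph cephadm registry-login my.registry.example.com myuser mypassword
#
# or supply the JSON document described in the error message above via
#
#   ceph cephadm registry-login -i login.json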
1201
1202 @orchestrator._cli_read_command('cephadm check-host')
1203 def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1204 """Check whether we can access and manage a remote host"""
1205 try:
1206 with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
1207 out, err, code = self.wait_async(
1208 CephadmServe(self)._run_cephadm(
1209 host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
1210 addr=addr, error_ok=True, no_fsid=True))
1211 if code:
1212 return 1, '', ('check-host failed:\n' + '\n'.join(err))
1213 except ssh.HostConnectionError as e:
1214 self.log.exception(
1215 f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
1216 return 1, '', ('check-host failed:\n'
1217 + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
1218 except OrchestratorError:
1219 self.log.exception(f"check-host failed for '{host}'")
1220 return 1, '', ('check-host failed:\n'
1221 + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
1222 # if we have an outstanding health alert for this host, give the
1223 # serve thread a kick
1224 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1225 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1226 if item.startswith('host %s ' % host):
1227 self.event.set()
1228 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1229
1230 @orchestrator._cli_read_command(
1231 'cephadm prepare-host')
1232 def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1233 """Prepare a remote host for use with cephadm"""
1234 with self.async_timeout_handler(host, 'cephadm prepare-host'):
1235 out, err, code = self.wait_async(
1236 CephadmServe(self)._run_cephadm(
1237 host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
1238 addr=addr, error_ok=True, no_fsid=True))
1239 if code:
1240 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
1241 # if we have an outstanding health alert for this host, give the
1242 # serve thread a kick
1243 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1244 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1245 if item.startswith('host %s ' % host):
1246 self.event.set()
1247 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1248
1249 @orchestrator._cli_write_command(
1250 prefix='cephadm set-extra-ceph-conf')
1251 def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
1252 """
1253 Text that is appended to all daemons' ceph.conf.
1254 Mainly a workaround until `config generate-minimal-conf` generates
1255 a complete ceph.conf.
1256
1257 Warning: this is a dangerous operation.
1258 """
1259 if inbuf:
1260 # sanity check.
1261 cp = ConfigParser()
1262 cp.read_string(inbuf, source='<infile>')
1263
1264 self.set_store("extra_ceph_conf", json.dumps({
1265 'conf': inbuf,
1266 'last_modified': datetime_to_str(datetime_now())
1267 }))
1268 self.log.info('Set extra_ceph_conf')
1269 self._kick_serve_loop()
1270 return HandleCommandResult()
1271
1272 @orchestrator._cli_read_command(
1273 'cephadm get-extra-ceph-conf')
1274 def _get_extra_ceph_conf(self) -> HandleCommandResult:
1275 """
1276 Get extra ceph conf that is appended
1277 """
1278 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
1279
1280 @orchestrator._cli_read_command('cephadm config-check ls')
1281 def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
1282 """List the available configuration checks and their current state"""
1283
1284 if format not in [Format.plain, Format.json, Format.json_pretty]:
1285 return HandleCommandResult(
1286 retval=1,
1287 stderr="Requested format is not supported when listing configuration checks"
1288 )
1289
1290 if format in [Format.json, Format.json_pretty]:
1291 return HandleCommandResult(
1292 stdout=to_format(self.config_checker.health_checks,
1293 format,
1294 many=True,
1295 cls=None))
1296
1297 # plain formatting
1298 table = PrettyTable(
1299 ['NAME',
1300 'HEALTHCHECK',
1301 'STATUS',
1302 'DESCRIPTION'
1303 ], border=False)
1304 table.align['NAME'] = 'l'
1305 table.align['HEALTHCHECK'] = 'l'
1306 table.align['STATUS'] = 'l'
1307 table.align['DESCRIPTION'] = 'l'
1308 table.left_padding_width = 0
1309 table.right_padding_width = 2
1310 for c in self.config_checker.health_checks:
1311 table.add_row((
1312 c.name,
1313 c.healthcheck_name,
1314 c.status,
1315 c.description,
1316 ))
1317
1318 return HandleCommandResult(stdout=table.get_string())
1319
1320 @orchestrator._cli_read_command('cephadm config-check status')
1321 def _config_check_status(self) -> HandleCommandResult:
1322 """Show whether the configuration checker feature is enabled/disabled"""
1323 status = self.get_module_option('config_checks_enabled')
1324 return HandleCommandResult(stdout="Enabled" if status else "Disabled")
1325
1326 @orchestrator._cli_write_command('cephadm config-check enable')
1327 def _config_check_enable(self, check_name: str) -> HandleCommandResult:
1328 """Enable a specific configuration check"""
1329 if not self._config_check_valid(check_name):
1330 return HandleCommandResult(retval=1, stderr="Invalid check name")
1331
1332 err, msg = self._update_config_check(check_name, 'enabled')
1333 if err:
1334 return HandleCommandResult(
1335 retval=err,
1336 stderr=f"Failed to enable check '{check_name}' : {msg}")
1337
1338 return HandleCommandResult(stdout="ok")
1339
1340 @orchestrator._cli_write_command('cephadm config-check disable')
1341 def _config_check_disable(self, check_name: str) -> HandleCommandResult:
1342 """Disable a specific configuration check"""
1343 if not self._config_check_valid(check_name):
1344 return HandleCommandResult(retval=1, stderr="Invalid check name")
1345
1346 err, msg = self._update_config_check(check_name, 'disabled')
1347 if err:
1348 return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
1349 else:
1350 # drop any outstanding raised healthcheck for this check
1351 config_check = self.config_checker.lookup_check(check_name)
1352 if config_check:
1353 if config_check.healthcheck_name in self.health_checks:
1354 self.health_checks.pop(config_check.healthcheck_name, None)
1355 self.set_health_checks(self.health_checks)
1356 else:
1357 self.log.error(
1358 f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")
1359
1360 return HandleCommandResult(stdout="ok")
1361
1362 def _config_check_valid(self, check_name: str) -> bool:
1363 return check_name in [chk.name for chk in self.config_checker.health_checks]
1364
1365 def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
1366 checks_raw = self.get_store('config_checks')
1367 if not checks_raw:
1368 return 1, "config_checks setting is not available"
1369
1370 checks = json.loads(checks_raw)
1371 checks.update({
1372 check_name: status
1373 })
1374 self.log.info(f"updated config check '{check_name}' : {status}")
1375 self.set_store('config_checks', json.dumps(checks))
1376 return 0, ""
1377
1378 class ExtraCephConf(NamedTuple):
1379 conf: str
1380 last_modified: Optional[datetime.datetime]
1381
1382 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
1383 data = self.get_store('extra_ceph_conf')
1384 if not data:
1385 return CephadmOrchestrator.ExtraCephConf('', None)
1386 try:
1387 j = json.loads(data)
1388 except ValueError:
1389 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
1390 self.log.exception('%s: \'%s\'', msg, data)
1391 return CephadmOrchestrator.ExtraCephConf('', None)
1392 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
1393
1394 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
1395 conf = self.extra_ceph_conf()
1396 if not conf.last_modified:
1397 return False
1398 return conf.last_modified > dt
1399
1400 @orchestrator._cli_write_command(
1401 'cephadm osd activate'
1402 )
1403 def _osd_activate(self, host: List[str]) -> HandleCommandResult:
1404 """
1405 Start OSD containers for existing OSDs
1406 """
1407
1408 @forall_hosts
1409 def run(h: str) -> str:
1410 with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
1411 return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
1412
1413 return HandleCommandResult(stdout='\n'.join(run(host)))
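# Sketch of use (host names are placeholders): after reinstalling a host's OS
# while keeping its OSD drives intact, the existing OSDs can be brought back
# with
#
#   ceph cephadm osd activate <host> [<host> ...]
#
# which redeploys the osd daemon containers for OSDs already present on those
# hosts.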
1414
1415 @orchestrator._cli_read_command('orch client-keyring ls')
1416 def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
1417 """
1418 List client keyrings under cephadm management
1419 """
1420 if format != Format.plain:
1421 output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
1422 else:
1423 table = PrettyTable(
1424 ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
1425 border=False)
1426 table.align = 'l'
1427 table.left_padding_width = 0
1428 table.right_padding_width = 2
1429 for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
1430 table.add_row((
1431 ks.entity, ks.placement.pretty_str(),
1432 utils.file_mode_to_str(ks.mode),
1433 f'{ks.uid}:{ks.gid}',
1434 ks.path,
1435 ))
1436 output = table.get_string()
1437 return HandleCommandResult(stdout=output)
1438
1439 @orchestrator._cli_write_command('orch client-keyring set')
1440 def _client_keyring_set(
1441 self,
1442 entity: str,
1443 placement: str,
1444 owner: Optional[str] = None,
1445 mode: Optional[str] = None,
1446 ) -> HandleCommandResult:
1447 """
1448 Add or update client keyring under cephadm management
1449 """
1450 if not entity.startswith('client.'):
1451 raise OrchestratorError('entity must start with client.')
1452 if owner:
1453 try:
1454 uid, gid = map(int, owner.split(':'))
1455 except Exception:
1456 raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
1457 else:
1458 uid = 0
1459 gid = 0
1460 if mode:
1461 try:
1462 imode = int(mode, 8)
1463 except Exception:
1464 raise OrchestratorError('mode must be an octal mode, e.g. "600"')
1465 else:
1466 imode = 0o600
1467 pspec = PlacementSpec.from_string(placement)
1468 ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
1469 self.keys.update(ks)
1470 self._kick_serve_loop()
1471 return HandleCommandResult()
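# Example invocation (sketch; entity, label, mode and owner are illustrative):
#
#   ceph orch client-keyring set client.foo label:foo --mode 640 --owner 0:0
#
# which asks cephadm to keep that client keyring (typically
# /etc/ceph/ceph.client.foo.keyring) present, with the given ownership and
# mode, on every host matching the 'foo' label.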
1472
1473 @orchestrator._cli_write_command('orch client-keyring rm')
1474 def _client_keyring_rm(
1475 self,
1476 entity: str,
1477 ) -> HandleCommandResult:
1478 """
1479 Remove client keyring from cephadm management
1480 """
1481 self.keys.rm(entity)
1482 self._kick_serve_loop()
1483 return HandleCommandResult()
1484
1485 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1486 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1487 image: Optional[str] = None
1488 if daemon_type in CEPH_IMAGE_TYPES:
1489 # get container image
1490 image = str(self.get_foreign_ceph_option(
1491 utils.name_to_config_section(daemon_name),
1492 'container_image'
1493 )).strip()
1494 elif daemon_type == 'prometheus':
1495 image = self.container_image_prometheus
1496 elif daemon_type == 'nvmeof':
1497 image = self.container_image_nvmeof
1498 elif daemon_type == 'grafana':
1499 image = self.container_image_grafana
1500 elif daemon_type == 'alertmanager':
1501 image = self.container_image_alertmanager
1502 elif daemon_type == 'node-exporter':
1503 image = self.container_image_node_exporter
1504 elif daemon_type == 'loki':
1505 image = self.container_image_loki
1506 elif daemon_type == 'promtail':
1507 image = self.container_image_promtail
1508 elif daemon_type == 'haproxy':
1509 image = self.container_image_haproxy
1510 elif daemon_type == 'keepalived':
1511 image = self.container_image_keepalived
1512 elif daemon_type == 'elasticsearch':
1513 image = self.container_image_elasticsearch
1514 elif daemon_type == 'jaeger-agent':
1515 image = self.container_image_jaeger_agent
1516 elif daemon_type == 'jaeger-collector':
1517 image = self.container_image_jaeger_collector
1518 elif daemon_type == 'jaeger-query':
1519 image = self.container_image_jaeger_query
1520 elif daemon_type == CustomContainerService.TYPE:
1521 # The image can't be resolved, the necessary information
1522 # is only available when a container is deployed (given
1523 # via spec).
1524 image = None
1525 elif daemon_type == 'snmp-gateway':
1526 image = self.container_image_snmp_gateway
1527 else:
1528 assert False, daemon_type
1529
1530 self.log.debug('%s container image %s' % (daemon_name, image))
1531
1532 return image
1533
1534 def _check_valid_addr(self, host: str, addr: str) -> str:
1535 # make sure hostname is resolvable before trying to make a connection
1536 try:
1537 ip_addr = utils.resolve_ip(addr)
1538 except OrchestratorError as e:
1539 msg = str(e) + f'''
1540 You may need to supply an address for {addr}
1541
1542 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1543 To add the cephadm SSH key to the host:
1544 > ceph cephadm get-pub-key > ~/ceph.pub
1545 > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}
1546
1547 To check that the host is reachable open a new shell with the --no-hosts flag:
1548 > cephadm shell --no-hosts
1549
1550 Then run the following:
1551 > ceph cephadm get-ssh-config > ssh_config
1552 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1553 > chmod 0600 ~/cephadm_private_key
1554 > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
1555 raise OrchestratorError(msg)
1556
1557 if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
1558 # if this is a re-add, use old address. otherwise error
1559 if host not in self.inventory or self.inventory.get_addr(host) == host:
1560 raise OrchestratorError(
1561 (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
1562 + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
1563 self.log.debug(
1564 f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
1565 ip_addr = self.inventory.get_addr(host)
1566 try:
1567 with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
1568 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
1569 host, cephadmNoImage, 'check-host',
1570 ['--expect-hostname', host],
1571 addr=addr,
1572 error_ok=True, no_fsid=True))
1573 if code:
1574 msg = 'check-host failed:\n' + '\n'.join(err)
1575 # err will contain stdout and stderr, so we filter on the message text to
1576 # only show the errors
1577 errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
1578 if errors:
1579 msg = f'Host {host} ({addr}) failed check(s): {errors}'
1580 raise OrchestratorError(msg)
1581 except ssh.HostConnectionError as e:
1582 raise OrchestratorError(str(e))
1583 return ip_addr
1584
1585 def _add_host(self, spec):
1586 # type: (HostSpec) -> str
1587 """
1588 Add a host to be managed by the orchestrator.
1589
1590 :param host: host name
1591 """
1592 HostSpec.validate(spec)
1593 ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
1594 if spec.addr == spec.hostname and ip_addr:
1595 spec.addr = ip_addr
1596
1597 if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
1598 self.cache.refresh_all_host_info(spec.hostname)
1599
1600 # prime crush map?
1601 if spec.location:
1602 self.check_mon_command({
1603 'prefix': 'osd crush add-bucket',
1604 'name': spec.hostname,
1605 'type': 'host',
1606 'args': [f'{k}={v}' for k, v in spec.location.items()],
1607 })
1608
1609 if spec.hostname not in self.inventory:
1610 self.cache.prime_empty_host(spec.hostname)
1611 self.inventory.add_host(spec)
1612 self.offline_hosts_remove(spec.hostname)
1613 if spec.status == 'maintenance':
1614 self._set_maintenance_healthcheck()
1615 self.event.set() # refresh stray health check
1616 self.log.info('Added host %s' % spec.hostname)
1617 return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
1618
1619 @handle_orch_error
1620 def add_host(self, spec: HostSpec) -> str:
1621 return self._add_host(spec)
1622
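# Example (illustrative; hostname, address and label are placeholders): the
# add_host() path above is normally driven from the CLI, e.g.
#
#   ceph cephadm get-pub-key > ~/ceph.pub
#   ssh-copy-id -f -i ~/ceph.pub root@host2
#   ceph orch host add host2 10.0.0.2 --labels _admin
#
# The address may be omitted when the hostname resolves to a non-loopback IP
# (see _check_valid_addr above).
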
1623 @handle_orch_error
1624 def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
1625 """
1626 Remove a host from orchestrator management.
1627
1628 :param host: host name
1629 :param force: bypass running daemons check
1630 :param offline: remove offline host
1631 """
1632
1633 # check if host is offline
1634 host_offline = host in self.offline_hosts
1635
1636 if host_offline and not offline:
1637 raise OrchestratorValidationError(
1638 "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))
1639
1640 if not host_offline and offline:
1641 raise OrchestratorValidationError(
1642 "{} is online, please remove host without --offline.".format(host))
1643
1644 if offline and not force:
1645 raise OrchestratorValidationError("Removing an offline host requires --force")
1646
1647 # check if there are daemons on the host
1648 if not force:
1649 daemons = self.cache.get_daemons_by_host(host)
1650 if daemons:
1651 self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
1652
1653 daemons_table = ""
1654 daemons_table += "{:<20} {:<15}\n".format("type", "id")
1655 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
1656 for d in daemons:
1657 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
1658
1659 raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
1660 "The following daemons are running in the host:"
1661 "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
1662 host, daemons_table, host))
1663
1664 # check if we're removing the last _admin host
1665 if not force:
1666 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
1667 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1668 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1669 raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
1670 f" label. Please add the '{SpecialHostLabels.ADMIN}' label to a host"
1671 " or add --force to this command")
1672
1673 def run_cmd(cmd_args: dict) -> None:
1674 ret, out, err = self.mon_command(cmd_args)
1675 if ret != 0:
1676 self.log.debug(f"ran {cmd_args} with mon_command")
1677 self.log.error(
1678 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
1679 self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
1680
1681 if offline:
1682 daemons = self.cache.get_daemons_by_host(host)
1683 for d in daemons:
1684 self.log.info(f"removing: {d.name()}")
1685
1686 if d.daemon_type != 'osd':
1687 self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
1688 self.cephadm_services[daemon_type_to_service(
1689 str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
1690 else:
1691 cmd_args = {
1692 'prefix': 'osd purge-actual',
1693 'id': int(str(d.daemon_id)),
1694 'yes_i_really_mean_it': True
1695 }
1696 run_cmd(cmd_args)
1697
1698 cmd_args = {
1699 'prefix': 'osd crush rm',
1700 'name': host
1701 }
1702 run_cmd(cmd_args)
1703
1704 self.inventory.rm_host(host)
1705 self.cache.rm_host(host)
1706 self.ssh.reset_con(host)
1707 # if host was in offline host list, we should remove it now.
1708 self.offline_hosts_remove(host)
1709 self.event.set() # refresh stray health check
1710 self.log.info('Removed host %s' % host)
1711 return "Removed {} host '{}'".format('offline' if offline else '', host)
1712
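# Example (illustrative; "host2" is a placeholder): the usual sequence is to
# drain daemons first and then remove the host, matching the checks in
# remove_host() above:
#
#   ceph orch host drain host2
#   ceph orch host rm host2
#
# For a host that is permanently unreachable:
#
#   ceph orch host rm host2 --offline --force
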
1713 @handle_orch_error
1714 def update_host_addr(self, host: str, addr: str) -> str:
1715 self._check_valid_addr(host, addr)
1716 self.inventory.set_addr(host, addr)
1717 self.ssh.reset_con(host)
1718 self.event.set() # refresh stray health check
1719 self.log.info('Set host %s addr to %s' % (host, addr))
1720 return "Updated host '{}' addr to '{}'".format(host, addr)
1721
1722 @handle_orch_error
1723 def get_hosts(self):
1724 # type: () -> List[orchestrator.HostSpec]
1725 """
1726 Return a list of hosts managed by the orchestrator.
1727
1728 Notes:
1729 - skip async: manager reads from cache.
1730 """
1731 return list(self.inventory.all_specs())
1732
1733 @handle_orch_error
1734 def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
1735 """
1736 Return a list of host metadata (gather_facts) for hosts managed by the orchestrator.
1737
1738 Notes:
1739 - skip async: manager reads from cache.
1740 """
1741 if hostname:
1742 return [self.cache.get_facts(hostname)]
1743
1744 return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
1745
1746 @handle_orch_error
1747 def add_host_label(self, host: str, label: str) -> str:
1748 self.inventory.add_label(host, label)
1749 self.log.info('Added label %s to host %s' % (label, host))
1750 self._kick_serve_loop()
1751 return 'Added label %s to host %s' % (label, host)
1752
1753 @handle_orch_error
1754 def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
1755 # if we remove the _admin label from the only host that has it we could end up
1756 # removing the only instance of the config and keyring and cause issues
1757 if not force and label == SpecialHostLabels.ADMIN:
1758 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
1759 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1760 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1761 raise OrchestratorValidationError(f"Host {host} is the last host with the '{SpecialHostLabels.ADMIN}'"
1762 f" label.\nRemoving the {SpecialHostLabels.ADMIN} label from this host could cause the removal"
1763 " of the last cluster config/keyring managed by cephadm.\n"
1764 f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
1765 " before completing this operation.\nIf you're certain this is"
1766 " what you want rerun this command with --force.")
1767 if self.inventory.has_label(host, label):
1768 self.inventory.rm_label(host, label)
1769 msg = f'Removed label {label} from host {host}'
1770 else:
1771 msg = f"Host {host} does not have label '{label}'. Please use 'ceph orch host ls' to list all the labels."
1772 self.log.info(msg)
1773 self._kick_serve_loop()
1774 return msg
1775
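# Example (illustrative): host labels, including the special _admin label
# guarded above, are managed from the CLI:
#
#   ceph orch host label add host2 _admin
#   ceph orch host label rm host2 _admin --force
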
1776 def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
1777 self.log.debug("running host-ok-to-stop checks")
1778 daemons = self.cache.get_daemons()
1779 daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
1780 for dd in daemons:
1781 assert dd.hostname is not None
1782 assert dd.daemon_type is not None
1783 assert dd.daemon_id is not None
1784 if dd.hostname == hostname:
1785 daemon_map[dd.daemon_type].append(dd.daemon_id)
1786
1787 notifications: List[str] = []
1788 error_notifications: List[str] = []
1789 okay: bool = True
1790 for daemon_type, daemon_ids in daemon_map.items():
1791 r = self.cephadm_services[daemon_type_to_service(
1792 daemon_type)].ok_to_stop(daemon_ids, force=force)
1793 if r.retval:
1794 okay = False
1795 # collect error notifications so user can see every daemon causing host
1796 # to not be okay to stop
1797 error_notifications.append(r.stderr)
1798 if r.stdout:
1799 # if extra notifications to print for user, add them to notifications list
1800 notifications.append(r.stdout)
1801
1802 if not okay:
1803 # at least one daemon is not okay to stop
1804 return 1, '\n'.join(error_notifications)
1805
1806 if notifications:
1807 return 0, (f'It is presumed safe to stop host {hostname}. '
1808 + 'Note the following:\n\n' + '\n'.join(notifications))
1809 return 0, f'It is presumed safe to stop host {hostname}'
1810
1811 @handle_orch_error
1812 def host_ok_to_stop(self, hostname: str) -> str:
1813 if hostname not in self.cache.get_hosts():
1814 raise OrchestratorError(f'Cannot find host "{hostname}"')
1815
1816 rc, msg = self._host_ok_to_stop(hostname)
1817 if rc:
1818 raise OrchestratorError(msg, errno=rc)
1819
1820 self.log.info(msg)
1821 return msg
1822
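# Example (illustrative): the aggregated per-host check above backs
#
#   ceph orch host ok-to-stop host2
#
# which reports every daemon that would block stopping the host.
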
1823 def _set_maintenance_healthcheck(self) -> None:
1824 """Raise/update or clear the maintenance health check as needed"""
1825
1826 in_maintenance = self.inventory.get_host_with_state("maintenance")
1827 if not in_maintenance:
1828 self.remove_health_warning('HOST_IN_MAINTENANCE')
1829 else:
1830 s = "host is" if len(in_maintenance) == 1 else "hosts are"
1831 self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
1832 f"{h} is in maintenance" for h in in_maintenance])
1833
1834 @handle_orch_error
1835 @host_exists()
1836 def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str:
1837 """ Attempt to place a cluster host in maintenance
1838
1839 Placing a host into maintenance disables the cluster's ceph target in systemd
1840 and stops all ceph daemons. If the host is an osd host we apply the noout flag
1841 for the host subtree in crush to prevent data movement during a host maintenance
1842 window.
1843
1844 :param hostname: (str) name of the host (must match an inventory hostname)
1845
1846 :raises OrchestratorError: Hostname is invalid, host is already in maintenance
1847 """
1848 if yes_i_really_mean_it and not force:
1849 raise OrchestratorError("--force must be passed with --yes-i-really-mean-it")
1850
1851 if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it:
1852 raise OrchestratorError("Maintenance feature is not supported on single node clusters")
1853
1854 # if upgrade is active, deny
1855 if self.upgrade.upgrade_state and not yes_i_really_mean_it:
1856 raise OrchestratorError(
1857 f"Unable to place {hostname} in maintenance with upgrade active/paused")
1858
1859 tgt_host = self.inventory._inventory[hostname]
1860 if tgt_host.get("status", "").lower() == "maintenance":
1861 raise OrchestratorError(f"Host {hostname} is already in maintenance")
1862
1863 host_daemons = self.cache.get_daemon_types(hostname)
1864 self.log.debug("daemons on host {}".format(','.join(host_daemons)))
1865 if host_daemons:
1866 # daemons on this host, so check the daemons can be stopped
1867 # and if so, place the host into maintenance by disabling the target
1868 rc, msg = self._host_ok_to_stop(hostname, force)
1869 if rc and not yes_i_really_mean_it:
1870 raise OrchestratorError(
1871 msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
1872
1873 # call the host-maintenance function
1874 with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
1875 _out, _err, _code = self.wait_async(
1876 CephadmServe(self)._run_cephadm(
1877 hostname, cephadmNoImage, "host-maintenance",
1878 ["enter"],
1879 error_ok=True))
1880 returned_msg = _err[0].split('\n')[-1]
1881 if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
1882 raise OrchestratorError(
1883 f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
1884
1885 if "osd" in host_daemons:
1886 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1887 rc, out, err = self.mon_command({
1888 'prefix': 'osd set-group',
1889 'flags': 'noout',
1890 'who': [crush_node],
1891 'format': 'json'
1892 })
1893 if rc and not yes_i_really_mean_it:
1894 self.log.warning(
1895 f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
1896 raise OrchestratorError(
1897 f"Unable to set the osds on {hostname} to noout (rc={rc})")
1898 elif not rc:
1899 self.log.info(
1900 f"maintenance mode request for {hostname} has SET the noout group")
1901
1902 # update the host status in the inventory
1903 tgt_host["status"] = "maintenance"
1904 self.inventory._inventory[hostname] = tgt_host
1905 self.inventory.save()
1906
1907 self._set_maintenance_healthcheck()
1908 return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
1909
1910 @handle_orch_error
1911 @host_exists()
1912 def exit_host_maintenance(self, hostname: str) -> str:
1913 """Exit maintenance mode and return a host to an operational state
1914
1915 Returning from maintenance will enable the cluster's systemd target and
1916 start it, and remove any noout flag that was added for the host if the
1917 host has osd daemons
1918
1919 :param hostname: (str) host name
1920
1921 :raises OrchestratorError: Unable to return from maintenance, or unset the
1922 noout flag
1923 """
1924 tgt_host = self.inventory._inventory[hostname]
1925 if tgt_host['status'] != "maintenance":
1926 raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
1927
1928 with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
1929 outs, errs, _code = self.wait_async(
1930 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
1931 'host-maintenance', ['exit'], error_ok=True))
1932 returned_msg = errs[0].split('\n')[-1]
1933 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
1934 raise OrchestratorError(
1935 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
1936
1937 if "osd" in self.cache.get_daemon_types(hostname):
1938 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1939 rc, _out, _err = self.mon_command({
1940 'prefix': 'osd unset-group',
1941 'flags': 'noout',
1942 'who': [crush_node],
1943 'format': 'json'
1944 })
1945 if rc:
1946 self.log.warning(
1947 f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
1948 raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
1949 else:
1950 self.log.info(
1951 f"exit maintenance request has UNSET for the noout group on host {hostname}")
1952
1953 # update the host record status
1954 tgt_host['status'] = ""
1955 self.inventory._inventory[hostname] = tgt_host
1956 self.inventory.save()
1957
1958 self._set_maintenance_healthcheck()
1959
1960 return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
1961
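# Example (illustrative): entering and leaving maintenance from the CLI,
# which drives enter_host_maintenance()/exit_host_maintenance() above:
#
#   ceph orch host maintenance enter host2 --force
#   ceph orch host maintenance exit host2
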
1962 @handle_orch_error
1963 @host_exists()
1964 def rescan_host(self, hostname: str) -> str:
1965 """Use cephadm to issue a disk rescan on each HBA
1966
1967 Some HBAs and external enclosures don't automatically register
1968 device insertion with the kernel, so for these scenarios we need
1969 to manually rescan
1970
1971 :param hostname: (str) host name
1972 """
1973 self.log.info(f'disk rescan request sent to host "{hostname}"')
1974 with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
1975 _out, _err, _code = self.wait_async(
1976 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
1977 [], no_fsid=True, error_ok=True))
1978 if not _err:
1979 raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
1980
1981 msg = _err[0].split('\n')[-1]
1982 log_msg = f'disk rescan: {msg}'
1983 if msg.upper().startswith('OK'):
1984 self.log.info(log_msg)
1985 else:
1986 self.log.warning(log_msg)
1987
1988 return f'{msg}'
1989
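# Example (illustrative):
#
#   ceph orch host rescan host2
#
# asks cephadm on host2 to rescan its HBAs for newly inserted devices.
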
1990 def get_minimal_ceph_conf(self) -> str:
1991 _, config, _ = self.check_mon_command({
1992 "prefix": "config generate-minimal-conf",
1993 })
1994 extra = self.extra_ceph_conf().conf
1995 if extra:
1996 try:
1997 config = self._combine_confs(config, extra)
1998 except Exception as e:
1999 self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}')
2000 return config
2001
2002 def _combine_confs(self, conf1: str, conf2: str) -> str:
2003 section_to_option: Dict[str, List[str]] = {}
2004 final_conf: str = ''
2005 for conf in [conf1, conf2]:
2006 if not conf:
2007 continue
2008 section = ''
2009 for line in conf.split('\n'):
2010 if line.strip().startswith('#') or not line.strip():
2011 continue
2012 if line.strip().startswith('[') and line.strip().endswith(']'):
2013 section = line.strip().replace('[', '').replace(']', '')
2014 if section not in section_to_option:
2015 section_to_option[section] = []
2016 else:
2017 section_to_option[section].append(line.strip())
2018
2019 first_section = True
2020 for section, options in section_to_option.items():
2021 if not first_section:
2022 final_conf += '\n'
2023 final_conf += f'[{section}]\n'
2024 for option in options:
2025 final_conf += f'{option}\n'
2026 first_section = False
2027
2028 return final_conf
2029
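# Sketch of how _combine_confs() behaves (values are illustrative only):
#
#   conf1 = "[global]\nfsid = <fsid>\nmon_host = <addrs>\n"
#   conf2 = "# extra\n[global]\nlog_to_file = true\n\n[osd]\nosd_max_backfills = 2\n"
#   self._combine_confs(conf1, conf2)
#   # -> "[global]\nfsid = <fsid>\nmon_host = <addrs>\nlog_to_file = true\n"
#   #    "\n[osd]\nosd_max_backfills = 2\n"
#
# Options from both inputs are grouped under their section headers; comments
# and blank lines are dropped.
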
2030 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
2031 if filter_host:
2032 self.cache.invalidate_host_daemons(filter_host)
2033 else:
2034 for h in self.cache.get_hosts():
2035 # Also discover daemons deployed manually
2036 self.cache.invalidate_host_daemons(h)
2037
2038 self._kick_serve_loop()
2039
2040 @handle_orch_error
2041 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
2042 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
2043 if refresh:
2044 self._invalidate_daemons_and_kick_serve()
2045 self.log.debug('Kicked serve() loop to refresh all services')
2046
2047 sm: Dict[str, orchestrator.ServiceDescription] = {}
2048
2049 # known services
2050 for nm, spec in self.spec_store.all_specs.items():
2051 if service_type is not None and service_type != spec.service_type:
2052 continue
2053 if service_name is not None and service_name != nm:
2054 continue
2055
2056 if spec.service_type != 'osd':
2057 size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
2058 else:
2059 # osd counting is special
2060 size = 0
2061
2062 sm[nm] = orchestrator.ServiceDescription(
2063 spec=spec,
2064 size=size,
2065 running=0,
2066 events=self.events.get_for_service(spec.service_name()),
2067 created=self.spec_store.spec_created[nm],
2068 deleted=self.spec_store.spec_deleted.get(nm, None),
2069 virtual_ip=spec.get_virtual_ip(),
2070 ports=spec.get_port_start(),
2071 )
2072 if spec.service_type == 'ingress':
2073 # ingress has 2 daemons running per host
2074 # but only if it's the full ingress service, not for keepalive-only
2075 if not cast(IngressSpec, spec).keepalive_only:
2076 sm[nm].size *= 2
2077
2078 # factor daemons into status
2079 for h, dm in self.cache.get_daemons_with_volatile_status():
2080 for name, dd in dm.items():
2081 assert dd.hostname is not None, f'no hostname for {dd!r}'
2082 assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
2083
2084 n: str = dd.service_name()
2085
2086 if (
2087 service_type
2088 and service_type != daemon_type_to_service(dd.daemon_type)
2089 ):
2090 continue
2091 if service_name and service_name != n:
2092 continue
2093
2094 if n not in sm:
2095 # new unmanaged service
2096 spec = ServiceSpec(
2097 unmanaged=True,
2098 service_type=daemon_type_to_service(dd.daemon_type),
2099 service_id=dd.service_id(),
2100 )
2101 sm[n] = orchestrator.ServiceDescription(
2102 last_refresh=dd.last_refresh,
2103 container_image_id=dd.container_image_id,
2104 container_image_name=dd.container_image_name,
2105 spec=spec,
2106 size=0,
2107 )
2108
2109 if dd.status == DaemonDescriptionStatus.running:
2110 sm[n].running += 1
2111 if dd.daemon_type == 'osd':
2112 # The osd count can't be determined from the placement spec, so an
2113 # actual/expected representation can't be computed here. We simply
2114 # set size = running for now.
2115 sm[n].size += 1
2116 if (
2117 not sm[n].last_refresh
2118 or not dd.last_refresh
2119 or dd.last_refresh < sm[n].last_refresh # type: ignore
2120 ):
2121 sm[n].last_refresh = dd.last_refresh
2122
2123 return list(sm.values())
2124
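# Example (illustrative): the service summary assembled above backs
#
#   ceph orch ls               # all services
#   ceph orch ls mgr           # restrict to a service type
#   ceph orch ls --refresh     # invalidate the daemon cache first
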
2125 @handle_orch_error
2126 def list_daemons(self,
2127 service_name: Optional[str] = None,
2128 daemon_type: Optional[str] = None,
2129 daemon_id: Optional[str] = None,
2130 host: Optional[str] = None,
2131 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
2132 if refresh:
2133 self._invalidate_daemons_and_kick_serve(host)
2134 self.log.debug('Kicked serve() loop to refresh all daemons')
2135
2136 result = []
2137 for h, dm in self.cache.get_daemons_with_volatile_status():
2138 if host and h != host:
2139 continue
2140 for name, dd in dm.items():
2141 if daemon_type is not None and daemon_type != dd.daemon_type:
2142 continue
2143 if daemon_id is not None and daemon_id != dd.daemon_id:
2144 continue
2145 if service_name is not None and service_name != dd.service_name():
2146 continue
2147 if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
2148 dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
2149 dd.name(),
2150 f"{dd.daemon_type}_memory_target"
2151 ))
2152 result.append(dd)
2153 return result
2154
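# Example (illustrative): the daemon listing above backs
#
#   ceph orch ps                      # all daemons
#   ceph orch ps host2                # daemons on one host
#   ceph orch ps --daemon-type osd    # filter by daemon type
#   ceph orch ps --refresh            # invalidate the cache first
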
2155 @handle_orch_error
2156 def service_action(self, action: str, service_name: str) -> List[str]:
2157 if service_name not in self.spec_store.all_specs.keys():
2158 raise OrchestratorError(f'Invalid service name "{service_name}".'
2159 + ' View currently running services using "ceph orch ls"')
2160 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
2161 if not dds:
2162 raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
2163 + ' View currently running services using "ceph orch ls"')
2164 if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
2165 return [f'Stopping entire {service_name} service is prohibited.']
2166 self.log.info('%s service %s' % (action.capitalize(), service_name))
2167 return [
2168 self._schedule_daemon_action(dd.name(), action)
2169 for dd in dds
2170 ]
2171
2172 def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
2173 self.log.info(f'Rotating authentication key for {daemon_spec.name()}')
2174 rc, out, err = self.mon_command({
2175 'prefix': 'auth get-or-create-pending',
2176 'entity': daemon_spec.entity_name(),
2177 'format': 'json',
2178 })
2179 j = json.loads(out)
2180 pending_key = j[0]['pending_key']
2181
2182 # deploy a new keyring file
2183 if daemon_spec.daemon_type != 'osd':
2184 daemon_spec = self.cephadm_services[daemon_type_to_service(
2185 daemon_spec.daemon_type)].prepare_create(daemon_spec)
2186 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2187 self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
2188
2189 # try to be clever, or fall back to restarting the daemon
2190 rc = -1
2191 if daemon_spec.daemon_type == 'osd':
2192 rc, out, err = self.tool_exec(
2193 args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'],
2194 stdin=pending_key.encode()
2195 )
2196 if not rc:
2197 rc, out, err = self.tool_exec(
2198 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2199 stdin=pending_key.encode()
2200 )
2201 elif daemon_spec.daemon_type == 'mds':
2202 rc, out, err = self.tool_exec(
2203 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2204 stdin=pending_key.encode()
2205 )
2206 elif (
2207 daemon_spec.daemon_type == 'mgr'
2208 and daemon_spec.daemon_id == self.get_mgr_id()
2209 ):
2210 rc, out, err = self.tool_exec(
2211 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2212 stdin=pending_key.encode()
2213 )
2214 if rc:
2215 self._daemon_action(daemon_spec, 'restart')
2216
2217 return f'Rotated key for {daemon_spec.name()}'
2218
2219 def _daemon_action(self,
2220 daemon_spec: CephadmDaemonDeploySpec,
2221 action: str,
2222 image: Optional[str] = None) -> str:
2223 self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
2224 daemon_spec.daemon_id)
2225
2226 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
2227 daemon_spec.daemon_id):
2228 self.mgr_service.fail_over()
2229 return '' # unreachable
2230
2231 if action == 'rotate-key':
2232 return self._rotate_daemon_key(daemon_spec)
2233
2234 if action == 'redeploy' or action == 'reconfig':
2235 if daemon_spec.daemon_type != 'osd':
2236 daemon_spec = self.cephadm_services[daemon_type_to_service(
2237 daemon_spec.daemon_type)].prepare_create(daemon_spec)
2238 else:
2239 # for OSDs, we still need to update config, just not carry out the full
2240 # prepare_create function
2241 daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
2242 daemon_spec)
2243 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2244 return self.wait_async(
2245 CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
2246
2247 actions = {
2248 'start': ['reset-failed', 'start'],
2249 'stop': ['stop'],
2250 'restart': ['reset-failed', 'restart'],
2251 }
2252 name = daemon_spec.name()
2253 for a in actions[action]:
2254 try:
2255 with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
2256 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2257 daemon_spec.host, name, 'unit',
2258 ['--name', name, a]))
2259 except Exception:
2260 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
2261 self.cache.invalidate_host_daemons(daemon_spec.host)
2262 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
2263 self.events.for_daemon(name, 'INFO', msg)
2264 return msg
2265
2266 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
2267 if image is not None:
2268 if action != 'redeploy':
2269 raise OrchestratorError(
2270 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
2271 if daemon_type not in CEPH_IMAGE_TYPES:
2272 raise OrchestratorError(
2273 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
2274 f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
2275
2276 self.check_mon_command({
2277 'prefix': 'config set',
2278 'name': 'container_image',
2279 'value': image,
2280 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
2281 })
2282
2283 @handle_orch_error
2284 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
2285 d = self.cache.get_daemon(daemon_name)
2286 assert d.daemon_type is not None
2287 assert d.daemon_id is not None
2288
2289 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
2290 and not self.mgr_service.mgr_map_has_standby():
2291 raise OrchestratorError(
2292 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2293
2294 if action == 'rotate-key':
2295 if d.daemon_type not in ['mgr', 'osd', 'mds',
2296 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
2297 raise OrchestratorError(
2298 f'key rotation not supported for {d.daemon_type}'
2299 )
2300
2301 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
2302
2303 self.log.info(f'Schedule {action} daemon {daemon_name}')
2304 return self._schedule_daemon_action(daemon_name, action)
2305
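# Example (illustrative; daemon names are placeholders). daemon_action()
# above is reached through commands such as:
#
#   ceph orch daemon restart mgr.host2.abcdef
#   ceph orch daemon redeploy rgw.foo.host2.abcdef    # optionally with an image
#   ceph orch daemon rotate-key osd.3
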
2306 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
2307 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
2308
2309 def get_active_mgr(self) -> DaemonDescription:
2310 return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr'))
2311
2312 def get_active_mgr_digests(self) -> List[str]:
2313 digests = self.mgr_service.get_active_daemon(
2314 self.cache.get_daemons_by_type('mgr')).container_image_digests
2315 return digests if digests else []
2316
2317 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
2318 dd = self.cache.get_daemon(daemon_name)
2319 assert dd.daemon_type is not None
2320 assert dd.daemon_id is not None
2321 assert dd.hostname is not None
2322 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
2323 and not self.mgr_service.mgr_map_has_standby():
2324 raise OrchestratorError(
2325 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2326 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
2327 self.cache.save_host(dd.hostname)
2328 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
2329 self._kick_serve_loop()
2330 return msg
2331
2332 @handle_orch_error
2333 def remove_daemons(self, names):
2334 # type: (List[str]) -> List[str]
2335 args = []
2336 for host, dm in self.cache.daemons.items():
2337 for name in names:
2338 if name in dm:
2339 args.append((name, host))
2340 if not args:
2341 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
2342 self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
2343 return self._remove_daemons(args)
2344
2345 @handle_orch_error
2346 def remove_service(self, service_name: str, force: bool = False) -> str:
2347 self.log.info('Remove service %s' % service_name)
2348 self._trigger_preview_refresh(service_name=service_name)
2349 if service_name in self.spec_store:
2350 if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
2351 return f'Unable to remove {service_name} service.\n' \
2352 f'Note, you might want to mark the {service_name} service as "unmanaged"'
2353 else:
2354 return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"
2355
2356 # Report list of affected OSDs?
2357 if not force and service_name.startswith('osd.'):
2358 osds_msg = {}
2359 for h, dm in self.cache.get_daemons_with_volatile_status():
2360 osds_to_remove = []
2361 for name, dd in dm.items():
2362 if dd.daemon_type == 'osd' and dd.service_name() == service_name:
2363 osds_to_remove.append(str(dd.daemon_id))
2364 if osds_to_remove:
2365 osds_msg[h] = osds_to_remove
2366 if osds_msg:
2367 msg = ''
2368 for h, ls in osds_msg.items():
2369 msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
2370 raise OrchestratorError(
2371 f'If {service_name} is removed then the following OSDs will remain; use --force to proceed anyway\n{msg}')
2372
2373 found = self.spec_store.rm(service_name)
2374 if found and service_name.startswith('osd.'):
2375 self.spec_store.finally_rm(service_name)
2376 self._kick_serve_loop()
2377 return f'Removed service {service_name}'
2378
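# Example (illustrative): removing a managed service and its daemons:
#
#   ceph orch rm rgw.foo
#   ceph orch rm osd.my_drivegroup --force    # see the OSD warning above
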
2379 @handle_orch_error
2380 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
2381 """
2382 Return the storage inventory of hosts matching the given filter.
2383
2384 :param host_filter: host filter
2385
2386 TODO:
2387 - add filtering by label
2388 """
2389 if refresh:
2390 if host_filter and host_filter.hosts:
2391 for h in host_filter.hosts:
2392 self.log.debug(f'will refresh {h} devs')
2393 self.cache.invalidate_host_devices(h)
2394 self.cache.invalidate_host_networks(h)
2395 else:
2396 for h in self.cache.get_hosts():
2397 self.log.debug(f'will refresh {h} devs')
2398 self.cache.invalidate_host_devices(h)
2399 self.cache.invalidate_host_networks(h)
2400
2401 self.event.set()
2402 self.log.debug('Kicked serve() loop to refresh devices')
2403
2404 result = []
2405 for host, dls in self.cache.devices.items():
2406 if host_filter and host_filter.hosts and host not in host_filter.hosts:
2407 continue
2408 result.append(orchestrator.InventoryHost(host,
2409 inventory.Devices(dls)))
2410 return result
2411
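# Example (illustrative): the inventory assembled above backs
#
#   ceph orch device ls             # devices on all hosts
#   ceph orch device ls host2 --refresh
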
2412 @handle_orch_error
2413 def zap_device(self, host: str, path: str) -> str:
2414 """Zap a device on a managed host.
2415
2416 Use ceph-volume zap to return a device to an unused/free state
2417
2418 Args:
2419 host (str): hostname of the cluster host
2420 path (str): device path
2421
2422 Raises:
2423 OrchestratorError: host is not a cluster host
2424 OrchestratorError: host is in maintenance and therefore unavailable
2425 OrchestratorError: device path not found on the host
2426 OrchestratorError: device is known to a different ceph cluster
2427 OrchestratorError: device holds active osd
2428 OrchestratorError: device cache hasn't been populated yet.
2429
2430 Returns:
2431 str: output from the zap command
2432 """
2433
2434 self.log.info('Zap device %s:%s' % (host, path))
2435
2436 if host not in self.inventory.keys():
2437 raise OrchestratorError(
2438 f"Host '{host}' is not a member of the cluster")
2439
2440 host_info = self.inventory._inventory.get(host, {})
2441 if host_info.get('status', '').lower() == 'maintenance':
2442 raise OrchestratorError(
2443 f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
2444
2445 if host not in self.cache.devices:
2446 raise OrchestratorError(
2447 f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
2448
2449 host_devices = self.cache.devices[host]
2450 path_found = False
2451 osd_id_list: List[str] = []
2452
2453 for dev in host_devices:
2454 if dev.path == path:
2455 path_found = True
2456 break
2457 if not path_found:
2458 raise OrchestratorError(
2459 f"Device path '{path}' not found on host '{host}'")
2460
2461 if osd_id_list:
2462 dev_name = os.path.basename(path)
2463 active_osds: List[str] = []
2464 for osd_id in osd_id_list:
2465 metadata = self.get_metadata('osd', str(osd_id))
2466 if metadata:
2467 if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
2468 active_osds.append("osd." + osd_id)
2469 if active_osds:
2470 raise OrchestratorError(
2471 f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
2472 f"OSD{'s' if len(active_osds) > 1 else ''}"
2473 f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
2474
2475 cv_args = ['--', 'lvm', 'zap', '--destroy', path]
2476 with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
2477 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2478 host, 'osd', 'ceph-volume', cv_args, error_ok=True))
2479
2480 self.cache.invalidate_host_devices(host)
2481 self.cache.invalidate_host_networks(host)
2482 if code:
2483 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
2484 msg = f'zap successful for {path} on {host}'
2485 self.log.info(msg)
2486
2487 return msg + '\n'
2488
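# Example (illustrative; host and device path are placeholders):
#
#   ceph orch device zap host2 /dev/sdb --force
#
# which runs 'ceph-volume lvm zap --destroy /dev/sdb' on host2 as shown above.
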
2489 @handle_orch_error
2490 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
2491 """
2492 Blink a device light. Calling something like::
2493
2494 lsmcli local-disk-ident-led-on --path $path
2495
2496 If you must, you can customize this via::
2497
2498 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
2499 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
2500
2501 See templates/blink_device_light_cmd.j2
2502 """
2503 @forall_hosts
2504 def blink(host: str, dev: str, path: str) -> str:
2505 cmd_line = self.template.render('blink_device_light_cmd.j2',
2506 {
2507 'on': on,
2508 'ident_fault': ident_fault,
2509 'dev': dev,
2510 'path': path
2511 },
2512 host=host)
2513 cmd_args = shlex.split(cmd_line)
2514
2515 with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
2516 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2517 host, 'osd', 'shell', ['--'] + cmd_args,
2518 error_ok=True))
2519 if code:
2520 raise OrchestratorError(
2521 'Unable to affect %s light for %s:%s. Command: %s' % (
2522 ident_fault, host, dev, ' '.join(cmd_args)))
2523 self.log.info('Set %s light for %s:%s %s' % (
2524 ident_fault, host, dev, 'on' if on else 'off'))
2525 return "Set %s light for %s:%s %s" % (
2526 ident_fault, host, dev, 'on' if on else 'off')
2527
2528 return blink(locs)
2529
2530 def get_osd_uuid_map(self, only_up=False):
2531 # type: (bool) -> Dict[str, str]
2532 osd_map = self.get('osd_map')
2533 r = {}
2534 for o in osd_map['osds']:
2535 # only include OSDs that have ever started in this map. this way
2536 # an interrupted osd create can be repeated and succeed the second
2537 # time around.
2538 osd_id = o.get('osd')
2539 if osd_id is None:
2540 raise OrchestratorError("Could not retrieve osd_id from osd_map")
2541 if not only_up:
2542 r[str(osd_id)] = o.get('uuid', '')
2543 return r
2544
2545 def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
2546 osd = [x for x in self.get('osd_map')['osds']
2547 if x['osd'] == osd_id]
2548
2549 if len(osd) != 1:
2550 return None
2551
2552 return osd[0]
2553
2554 def _trigger_preview_refresh(self,
2555 specs: Optional[List[DriveGroupSpec]] = None,
2556 service_name: Optional[str] = None,
2557 ) -> None:
2558 # Only trigger a refresh when a spec has changed
2559 trigger_specs = []
2560 if specs:
2561 for spec in specs:
2562 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
2563 # If the to-be-previewed spec != the actual spec, we need to trigger
2564 # a refresh; if the spec has been removed (== None) we need to
2565 # refresh as well.
2566 if not preview_spec or spec != preview_spec:
2567 trigger_specs.append(spec)
2568 if service_name:
2569 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
2570 if not any(trigger_specs):
2571 return None
2572
2573 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
2574 for host in refresh_hosts:
2575 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
2576 self.cache.osdspec_previews_refresh_queue.append(host)
2577
2578 @handle_orch_error
2579 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
2580 """
2581 Deprecated. Please use `apply()` instead.
2582
2583 Keeping this around to be compatible to mgr/dashboard
2584 """
2585 return [self._apply(spec) for spec in specs]
2586
2587 @handle_orch_error
2588 def create_osds(self, drive_group: DriveGroupSpec) -> str:
2589 hosts: List[HostSpec] = self.inventory.all_specs()
2590 filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
2591 if not filtered_hosts:
2592 return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
2593 return self.osd_service.create_from_spec(drive_group)
2594
2595 def _preview_osdspecs(self,
2596 osdspecs: Optional[List[DriveGroupSpec]] = None
2597 ) -> dict:
2598 if not osdspecs:
2599 return {'n/a': [{'error': True,
2600 'message': 'No OSDSpec or matching hosts found.'}]}
2601 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
2602 if not matching_hosts:
2603 return {'n/a': [{'error': True,
2604 'message': 'No OSDSpec or matching hosts found.'}]}
2605 # Is any host still loading previews or still in the queue to be previewed
2606 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
2607 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
2608 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
2609 return {'n/a': [{'error': True,
2610 'message': 'Preview data is being generated. '
2611 'Please re-run this command in a bit.'}]}
2612 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
2613 previews_for_specs = {}
2614 for host, raw_reports in self.cache.osdspec_previews.items():
2615 if host not in matching_hosts:
2616 continue
2617 osd_reports = []
2618 for osd_report in raw_reports:
2619 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
2620 osd_reports.append(osd_report)
2621 previews_for_specs.update({host: osd_reports})
2622 return previews_for_specs
2623
2624 def _calc_daemon_deps(self,
2625 spec: Optional[ServiceSpec],
2626 daemon_type: str,
2627 daemon_id: str) -> List[str]:
2628
2629 def get_daemon_names(daemons: List[str]) -> List[str]:
2630 daemon_names = []
2631 for daemon_type in daemons:
2632 for dd in self.cache.get_daemons_by_type(daemon_type):
2633 daemon_names.append(dd.name())
2634 return daemon_names
2635
2636 alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
2637 prometheus_user, prometheus_password = self._get_prometheus_credentials()
2638
2639 deps = []
2640 if daemon_type == 'haproxy':
2641 # because cephadm creates new daemon instances whenever
2642 # port or ip changes, identifying daemons by name is
2643 # sufficient to detect changes.
2644 if not spec:
2645 return []
2646 ingress_spec = cast(IngressSpec, spec)
2647 assert ingress_spec.backend_service
2648 daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
2649 deps = [d.name() for d in daemons]
2650 elif daemon_type == 'keepalived':
2651 # because cephadm creates new daemon instances whenever
2652 # port or ip changes, identifying daemons by name is
2653 # sufficient to detect changes.
2654 if not spec:
2655 return []
2656 daemons = self.cache.get_daemons_by_service(spec.service_name())
2657 deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
2658 elif daemon_type == 'agent':
2659 root_cert = ''
2660 server_port = ''
2661 try:
2662 server_port = str(self.http_server.agent.server_port)
2663 root_cert = self.http_server.agent.ssl_certs.get_root_cert()
2664 except Exception:
2665 pass
2666 deps = sorted([self.get_mgr_ip(), server_port, root_cert,
2667 str(self.device_enhanced_scan)])
2668 elif daemon_type == 'iscsi':
2669 if spec:
2670 iscsi_spec = cast(IscsiServiceSpec, spec)
2671 deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
2672 else:
2673 deps = [self.get_mgr_ip()]
2674 elif daemon_type == 'prometheus':
2675 # for prometheus we add the active mgr as an explicit dependency,
2676 # this way we force a redeploy after a mgr failover
2677 deps.append(self.get_active_mgr().name())
2678 deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
2679 deps.append(str(self.service_discovery_port))
2680 # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
2681 # a scrape_configs section for each service type. This should be included only
2682 # when at least one daemon of the corresponding service is running. Therefore,
2683 # an explicit dependency is added for each service-type to force a reconfig
2684 # whenever the number of daemons for those service-type changes from 0 to greater
2685 # than zero and vice versa.
2686 deps += [s for s in ['node-exporter', 'alertmanager']
2687 if self.cache.get_daemons_by_service(s)]
2688 if len(self.cache.get_daemons_by_type('ingress')) > 0:
2689 deps.append('ingress')
2690 # add dependency on ceph-exporter daemons
2691 deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
2692 if self.secure_monitoring_stack:
2693 if prometheus_user and prometheus_password:
2694 deps.append(f'{hash(prometheus_user + prometheus_password)}')
2695 if alertmanager_user and alertmanager_password:
2696 deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
2697 elif daemon_type == 'grafana':
2698 deps += get_daemon_names(['prometheus', 'loki'])
2699 if self.secure_monitoring_stack and prometheus_user and prometheus_password:
2700 deps.append(f'{hash(prometheus_user + prometheus_password)}')
2701 elif daemon_type == 'alertmanager':
2702 deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
2703 if self.secure_monitoring_stack and alertmanager_user and alertmanager_password:
2704 deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
2705 elif daemon_type == 'promtail':
2706 deps += get_daemon_names(['loki'])
2707 else:
2708 # TODO(redo): some error message!
2709 pass
2710
2711 if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']:
2712 deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}')
2713
2714 return sorted(deps)
2715
2716 @forall_hosts
2717 def _remove_daemons(self, name: str, host: str) -> str:
2718 return CephadmServe(self)._remove_daemon(name, host)
2719
2720 def _check_pool_exists(self, pool: str, service_name: str) -> None:
2721 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2722 if not self.rados.pool_exists(pool):
2723 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2724 f'service {service_name}')
2725
2726 def _add_daemon(self,
2727 daemon_type: str,
2728 spec: ServiceSpec) -> List[str]:
2729 """
2730 Add (and place) a daemon. Require explicit host placement. Do not
2731 schedule, and do not apply the related scheduling limitations.
2732 """
2733 if spec.service_name() not in self.spec_store:
2734 raise OrchestratorError('Unable to add a Daemon without Service.\n'
2735 'Please use `ceph orch apply ...` to create a Service.\n'
2736 'Note, you might want to create the service with "unmanaged=true"')
2737
2738 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2739 if not spec.placement.hosts:
2740 raise OrchestratorError('must specify host(s) to deploy on')
2741 count = spec.placement.count or len(spec.placement.hosts)
2742 daemons = self.cache.get_daemons_by_service(spec.service_name())
2743 return self._create_daemons(daemon_type, spec, daemons,
2744 spec.placement.hosts, count)
2745
2746 def _create_daemons(self,
2747 daemon_type: str,
2748 spec: ServiceSpec,
2749 daemons: List[DaemonDescription],
2750 hosts: List[HostPlacementSpec],
2751 count: int) -> List[str]:
2752 if count > len(hosts):
2753 raise OrchestratorError('too few hosts: want %d, have %s' % (
2754 count, hosts))
2755
2756 did_config = False
2757 service_type = daemon_type_to_service(daemon_type)
2758
2759 args = [] # type: List[CephadmDaemonDeploySpec]
2760 for host, network, name in hosts:
2761 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2762 prefix=spec.service_id,
2763 forcename=name)
2764
2765 if not did_config:
2766 self.cephadm_services[service_type].config(spec)
2767 did_config = True
2768
2769 daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
2770 host, daemon_id, network, spec,
2771 # NOTE: this does not consider port conflicts!
2772 ports=spec.get_port_start())
2773 self.log.debug('Placing %s.%s on host %s' % (
2774 daemon_type, daemon_id, host))
2775 args.append(daemon_spec)
2776
2777 # add to daemon list so next name(s) will also be unique
2778 sd = orchestrator.DaemonDescription(
2779 hostname=host,
2780 daemon_type=daemon_type,
2781 daemon_id=daemon_id,
2782 )
2783 daemons.append(sd)
2784
2785 @ forall_hosts
2786 def create_func_map(*args: Any) -> str:
2787 daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
2788 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2789 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
2790
2791 return create_func_map(args)
2792
2793 @handle_orch_error
2794 def add_daemon(self, spec: ServiceSpec) -> List[str]:
2795 ret: List[str] = []
2796 try:
2797 with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
2798 for d_type in service_to_daemon_types(spec.service_type):
2799 ret.extend(self._add_daemon(d_type, spec))
2800 return ret
2801 except OrchestratorError as e:
2802 self.events.from_orch_error(e)
2803 raise
2804
2805 def _get_alertmanager_credentials(self) -> Tuple[str, str]:
2806 user = self.get_store(AlertmanagerService.USER_CFG_KEY)
2807 password = self.get_store(AlertmanagerService.PASS_CFG_KEY)
2808 if user is None or password is None:
2809 user = 'admin'
2810 password = 'admin'
2811 self.set_store(AlertmanagerService.USER_CFG_KEY, user)
2812 self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
2813 return (user, password)
2814
2815 def _get_prometheus_credentials(self) -> Tuple[str, str]:
2816 user = self.get_store(PrometheusService.USER_CFG_KEY)
2817 password = self.get_store(PrometheusService.PASS_CFG_KEY)
2818 if user is None or password is None:
2819 user = 'admin'
2820 password = 'admin'
2821 self.set_store(PrometheusService.USER_CFG_KEY, user)
2822 self.set_store(PrometheusService.PASS_CFG_KEY, password)
2823 return (user, password)
2824
2825 @handle_orch_error
2826 def set_prometheus_access_info(self, user: str, password: str) -> str:
2827 self.set_store(PrometheusService.USER_CFG_KEY, user)
2828 self.set_store(PrometheusService.PASS_CFG_KEY, password)
2829 return 'prometheus credentials updated correctly'
2830
2831 @handle_orch_error
2832 def set_alertmanager_access_info(self, user: str, password: str) -> str:
2833 self.set_store(AlertmanagerService.USER_CFG_KEY, user)
2834 self.set_store(AlertmanagerService.PASS_CFG_KEY, password)
2835 return 'alertmanager credentials updated correctly'
2836
2837 @handle_orch_error
2838 def get_prometheus_access_info(self) -> Dict[str, str]:
2839 user, password = self._get_prometheus_credentials()
2840 return {'user': user,
2841 'password': password,
2842 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
2843
2844 @handle_orch_error
2845 def get_alertmanager_access_info(self) -> Dict[str, str]:
2846 user, password = self._get_alertmanager_credentials()
2847 return {'user': user,
2848 'password': password,
2849 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
2850
2851 @handle_orch_error
2852 def apply_mon(self, spec: ServiceSpec) -> str:
2853 return self._apply(spec)
2854
2855 def _apply(self, spec: GenericSpec) -> str:
2856 if spec.service_type == 'host':
2857 return self._add_host(cast(HostSpec, spec))
2858
2859 if spec.service_type == 'osd':
2860 # _trigger preview refresh needs to be smart and
2861 # should only refresh if a change has been detected
2862 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
2863
2864 return self._apply_service_spec(cast(ServiceSpec, spec))
2865
2866 def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
2867 """Return a list of candidate hosts according to the placement specification."""
2868 all_hosts = self.cache.get_schedulable_hosts()
2869 candidates = []
2870 if placement.hosts:
2871 candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
2872 elif placement.label:
2873 candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
2874 elif placement.host_pattern:
2875 candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
2876 elif (placement.count is not None or placement.count_per_host is not None):
2877 candidates = [x.hostname for x in all_hosts]
2878 return [h for h in candidates if not self.cache.is_host_draining(h)]
2879
2880 def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
2881 """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
2882 if spec.count is not None:
2883 raise OrchestratorError(
2884 "Placement 'count' field is no supported for this specification.")
2885 if spec.count_per_host is not None:
2886 raise OrchestratorError(
2887 "Placement 'count_per_host' field is no supported for this specification.")
2888 if spec.hosts:
2889 all_hosts = [h.hostname for h in self.inventory.all_specs()]
2890 invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
2891 if invalid_hosts:
2892 raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
2893 f"Please check 'ceph orch host ls' for available hosts.")
2894 elif not self._get_candidate_hosts(spec):
2895 raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
2896 "Please check 'ceph orch host ls' for available hosts.\n"
2897 "Note: draining hosts are excluded from the candidate list.")
2898
2899 def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
2900 candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
2901 invalid_options: Dict[str, List[str]] = {}
2902 for host in candidate_hosts:
2903 host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {})
2904 invalid_options[host] = []
2905 for option in spec.settings:
2906 if option not in host_sysctl_options:
2907 invalid_options[host].append(option)
2908 return invalid_options
2909
2910 def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
2911 if not spec.settings:
2912 raise OrchestratorError("Invalid spec: settings section cannot be empty.")
2913 self._validate_one_shot_placement_spec(spec.placement)
2914 invalid_options = self._validate_tunedprofile_settings(spec)
2915 if any(e for e in invalid_options.values()):
2916 raise OrchestratorError(
2917 f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')
2918
2919 @handle_orch_error
2920 def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
2921 outs = []
2922 for spec in specs:
2923 self._validate_tuned_profile_spec(spec)
2924 if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
2925 outs.append(
2926 f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
2927 else:
2928 # done, let's save the specs
2929 self.tuned_profiles.add_profile(spec)
2930 outs.append(f'Saved tuned profile {spec.profile_name}')
2931 self._kick_serve_loop()
2932 return '\n'.join(outs)
2933
2934 @handle_orch_error
2935 def rm_tuned_profile(self, profile_name: str) -> str:
2936 if profile_name not in self.tuned_profiles:
2937 raise OrchestratorError(
2938 f'Tuned profile {profile_name} does not exist. Nothing to remove.')
2939 self.tuned_profiles.rm_profile(profile_name)
2940 self._kick_serve_loop()
2941 return f'Removed tuned profile {profile_name}'
2942
2943 @handle_orch_error
2944 def tuned_profile_ls(self) -> List[TunedProfileSpec]:
2945 return self.tuned_profiles.list_profiles()
2946
2947 @handle_orch_error
2948 def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
2949 if profile_name not in self.tuned_profiles:
2950 raise OrchestratorError(
2951 f'Tuned profile {profile_name} does not exist. Cannot add setting.')
2952 self.tuned_profiles.add_setting(profile_name, setting, value)
2953 self._kick_serve_loop()
2954 return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
2955
2956 @handle_orch_error
2957 def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
2958 if profile_name not in self.tuned_profiles:
2959 raise OrchestratorError(
2960 f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
2961 self.tuned_profiles.rm_setting(profile_name, setting)
2962 self._kick_serve_loop()
2963 return f'Removed setting {setting} from tuned profile {profile_name}'
2964
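# Example (illustrative; profile name and sysctl settings are placeholders):
#
#   ceph orch tuned-profile apply -i my-profile.yaml
#   ceph orch tuned-profile ls
#   ceph orch tuned-profile add-setting my-profile vm.swappiness 10
#   ceph orch tuned-profile rm-setting my-profile vm.swappiness
#   ceph orch tuned-profile rm my-profile
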
2965 @handle_orch_error
2966 def service_discovery_dump_cert(self) -> str:
2967 root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT)
2968 if not root_cert:
2969 raise OrchestratorError('No certificate found for service discovery')
2970 return root_cert
2971
2972 def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
2973 self.health_checks[name] = {
2974 'severity': 'warning',
2975 'summary': summary,
2976 'count': count,
2977 'detail': detail,
2978 }
2979 self.set_health_checks(self.health_checks)
2980
2981 def remove_health_warning(self, name: str) -> None:
2982 if name in self.health_checks:
2983 del self.health_checks[name]
2984 self.set_health_checks(self.health_checks)
2985
2986 def _plan(self, spec: ServiceSpec) -> dict:
2987 if spec.service_type == 'osd':
2988 return {'service_name': spec.service_name(),
2989 'service_type': spec.service_type,
2990 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
2991
2992 svc = self.cephadm_services[spec.service_type]
2993 ha = HostAssignment(
2994 spec=spec,
2995 hosts=self.cache.get_schedulable_hosts(),
2996 unreachable_hosts=self.cache.get_unreachable_hosts(),
2997 draining_hosts=self.cache.get_draining_hosts(),
2998 networks=self.cache.networks,
2999 daemons=self.cache.get_daemons_by_service(spec.service_name()),
3000 allow_colo=svc.allow_colo(),
3001 rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
3002 )
3003 ha.validate()
3004 hosts, to_add, to_remove = ha.place()
3005
3006 return {
3007 'service_name': spec.service_name(),
3008 'service_type': spec.service_type,
3009 'add': [hs.hostname for hs in to_add],
3010 'remove': [d.name() for d in to_remove]
3011 }
3012
3013 @handle_orch_error
3014 def plan(self, specs: Sequence[GenericSpec]) -> List:
3015 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
3016 'to the current inventory setup. If any of these conditions change, the \n'
3017 'preview will be invalid. Please make sure to have a minimal \n'
3018 'timeframe between planning and applying the specs.'}]
3019 if any([spec.service_type == 'host' for spec in specs]):
3020 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}]
3021 for spec in specs:
3022 results.append(self._plan(cast(ServiceSpec, spec)))
3023 return results
3024
3025 def _apply_service_spec(self, spec: ServiceSpec) -> str:
3026 if spec.placement.is_empty():
3027 # fill in default placement
3028 defaults = {
3029 'mon': PlacementSpec(count=5),
3030 'mgr': PlacementSpec(count=2),
3031 'mds': PlacementSpec(count=2),
3032 'rgw': PlacementSpec(count=2),
3033 'ingress': PlacementSpec(count=2),
3034 'iscsi': PlacementSpec(count=1),
3035 'nvmeof': PlacementSpec(count=1),
3036 'rbd-mirror': PlacementSpec(count=2),
3037 'cephfs-mirror': PlacementSpec(count=1),
3038 'nfs': PlacementSpec(count=1),
3039 'grafana': PlacementSpec(count=1),
3040 'alertmanager': PlacementSpec(count=1),
3041 'prometheus': PlacementSpec(count=1),
3042 'node-exporter': PlacementSpec(host_pattern='*'),
3043 'ceph-exporter': PlacementSpec(host_pattern='*'),
3044 'loki': PlacementSpec(count=1),
3045 'promtail': PlacementSpec(host_pattern='*'),
3046 'crash': PlacementSpec(host_pattern='*'),
3047 'container': PlacementSpec(count=1),
3048 'snmp-gateway': PlacementSpec(count=1),
3049 'elasticsearch': PlacementSpec(count=1),
3050 'jaeger-agent': PlacementSpec(host_pattern='*'),
3051 'jaeger-collector': PlacementSpec(count=1),
3052 'jaeger-query': PlacementSpec(count=1)
3053 }
3054 spec.placement = defaults[spec.service_type]
3055 elif spec.service_type in ['mon', 'mgr'] and \
3056 spec.placement.count is not None and \
3057 spec.placement.count < 1:
3058 raise OrchestratorError('cannot scale %s service below 1' % (
3059 spec.service_type))
3060
3061 host_count = len(self.inventory.keys())
3062 max_count = self.max_count_per_host
3063
3064 if spec.placement.count is not None:
3065 if spec.service_type in ['mon', 'mgr']:
3066 if spec.placement.count > max(5, host_count):
3067 raise OrchestratorError(
3068 (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
3069 elif spec.service_type != 'osd':
3070 if spec.placement.count > (max_count * host_count):
3071 raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
3072 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3073
3074 if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
3075 raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
3076 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3077
3078 HostAssignment(
3079 spec=spec,
3080 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
3081 unreachable_hosts=self.cache.get_unreachable_hosts(),
3082 draining_hosts=self.cache.get_draining_hosts(),
3083 networks=self.cache.networks,
3084 daemons=self.cache.get_daemons_by_service(spec.service_name()),
3085 allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
3086 ).validate()
3087
3088 self.log.info('Saving service %s spec with placement %s' % (
3089 spec.service_name(), spec.placement.pretty_str()))
3090 self.spec_store.save(spec)
3091 self._kick_serve_loop()
3092 return "Scheduled %s update..." % spec.service_name()
3093
3094 @handle_orch_error
3095 def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
3096 results = []
3097 for spec in specs:
3098 if no_overwrite:
3099 if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
3100 results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
3101 % (cast(HostSpec, spec).hostname, spec.service_type))
3102 continue
3103 elif cast(ServiceSpec, spec).service_name() in self.spec_store:
3104 results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
3105 % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
3106 continue
3107 results.append(self._apply(spec))
3108 return results
3109
3110 @handle_orch_error
3111 def apply_mgr(self, spec: ServiceSpec) -> str:
3112 return self._apply(spec)
3113
3114 @handle_orch_error
3115 def apply_mds(self, spec: ServiceSpec) -> str:
3116 return self._apply(spec)
3117
3118 @handle_orch_error
3119 def apply_rgw(self, spec: ServiceSpec) -> str:
3120 return self._apply(spec)
3121
3122 @handle_orch_error
3123 def apply_ingress(self, spec: ServiceSpec) -> str:
3124 return self._apply(spec)
3125
3126 @handle_orch_error
3127 def apply_iscsi(self, spec: ServiceSpec) -> str:
3128 return self._apply(spec)
3129
3130 @handle_orch_error
3131 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
3132 return self._apply(spec)
3133
3134 @handle_orch_error
3135 def apply_nfs(self, spec: ServiceSpec) -> str:
3136 return self._apply(spec)
3137
3138 def _get_dashboard_url(self):
3139 # type: () -> str
3140 return self.get('mgr_map').get('services', {}).get('dashboard', '')
3141
3142 @handle_orch_error
3143 def apply_prometheus(self, spec: ServiceSpec) -> str:
3144 return self._apply(spec)
3145
3146 @handle_orch_error
3147 def apply_loki(self, spec: ServiceSpec) -> str:
3148 return self._apply(spec)
3149
3150 @handle_orch_error
3151 def apply_promtail(self, spec: ServiceSpec) -> str:
3152 return self._apply(spec)
3153
3154 @handle_orch_error
3155 def apply_node_exporter(self, spec: ServiceSpec) -> str:
3156 return self._apply(spec)
3157
3158 @handle_orch_error
3159 def apply_ceph_exporter(self, spec: ServiceSpec) -> str:
3160 return self._apply(spec)
3161
3162 @handle_orch_error
3163 def apply_crash(self, spec: ServiceSpec) -> str:
3164 return self._apply(spec)
3165
3166 @handle_orch_error
3167 def apply_grafana(self, spec: ServiceSpec) -> str:
3168 return self._apply(spec)
3169
3170 @handle_orch_error
3171 def apply_alertmanager(self, spec: ServiceSpec) -> str:
3172 return self._apply(spec)
3173
3174 @handle_orch_error
3175 def apply_container(self, spec: ServiceSpec) -> str:
3176 return self._apply(spec)
3177
3178 @handle_orch_error
3179 def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
3180 return self._apply(spec)
3181
3182 @handle_orch_error
3183 def set_unmanaged(self, service_name: str, value: bool) -> str:
3184 return self.spec_store.set_unmanaged(service_name, value)
3185
3186 @handle_orch_error
3187 def upgrade_check(self, image: str, version: str) -> str:
3188 if self.inventory.get_host_with_state("maintenance"):
3189 raise OrchestratorError("check aborted - you have hosts in maintenance state")
3190
3191 if version:
3192 target_name = self.container_image_base + ':v' + version
3193 elif image:
3194 target_name = image
3195 else:
3196 raise OrchestratorError('must specify either image or version')
3197
3198 with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
3199 image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
3200
3201 ceph_image_version = image_info.ceph_version
3202 if not ceph_image_version:
3203 return f'Unable to extract ceph version from {target_name}.'
3204 if ceph_image_version.startswith('ceph version '):
3205 ceph_image_version = ceph_image_version.split(' ')[2]
3206 version_error = self.upgrade._check_target_version(ceph_image_version)
3207 if version_error:
3208 return f'Incompatible upgrade: {version_error}'
3209
3210 self.log.debug(f'image info {image} -> {image_info}')
3211 r: dict = {
3212 'target_name': target_name,
3213 'target_id': image_info.image_id,
3214 'target_version': image_info.ceph_version,
3215 'needs_update': dict(),
3216 'up_to_date': list(),
3217 'non_ceph_image_daemons': list()
3218 }
3219 for host, dm in self.cache.daemons.items():
3220 for name, dd in dm.items():
3221 # Decide whether this daemon is already running the image we are checking
3222 # an upgrade against: if the "use_repo_digest" setting is true, compare the
3223 # daemon's container digests against the target image's repo digests;
3224 # otherwise compare the daemon's image name against the requested image
3225 # name.
3226 if (
3227 (self.use_repo_digest and dd.matches_digests(image_info.repo_digests))
3228 or (not self.use_repo_digest and dd.matches_image_name(image))
3229 ):
3230 r['up_to_date'].append(dd.name())
3231 elif dd.daemon_type in CEPH_IMAGE_TYPES:
3232 r['needs_update'][dd.name()] = {
3233 'current_name': dd.container_image_name,
3234 'current_id': dd.container_image_id,
3235 'current_version': dd.version,
3236 }
3237 else:
3238 r['non_ceph_image_daemons'].append(dd.name())
3239 if self.use_repo_digest and image_info.repo_digests:
3240 # FIXME: we assume the first digest is the best one to use
3241 r['target_digest'] = image_info.repo_digests[0]
3242
3243 return json.dumps(r, indent=4, sort_keys=True)
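# Illustrative entry point (image and version below are placeholders): this
# handler backs `ceph orch upgrade check`, e.g.
#
#   ceph orch upgrade check --ceph-version 18.2.1
#   ceph orch upgrade check --image quay.io/ceph/ceph:v18.2.1
#
# The JSON it returns groups daemons into 'up_to_date', 'needs_update' and
# 'non_ceph_image_daemons', as built above.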
3244
3245 @handle_orch_error
3246 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
3247 return self.upgrade.upgrade_status()
3248
3249 @handle_orch_error
3250 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
3251 return self.upgrade.upgrade_ls(image, tags, show_all_versions)
3252
3253 @handle_orch_error
3254 def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
3255 services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
3256 if self.inventory.get_host_with_state("maintenance"):
3257 raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
3258 if self.offline_hosts:
3259 raise OrchestratorError(
3260 f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
3261 if daemon_types is not None and services is not None:
3262 raise OrchestratorError('--daemon-types and --services are mutually exclusive')
3263 if daemon_types is not None:
3264 for dtype in daemon_types:
3265 if dtype not in CEPH_UPGRADE_ORDER:
3266 raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
3267 f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
3268 if services is not None:
3269 for service in services:
3270 if service not in self.spec_store:
3271 raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
3272 f'Known services are: {self.spec_store.all_specs.keys()}')
3273 hosts: Optional[List[str]] = None
3274 if host_placement is not None:
3275 all_hosts = list(self.inventory.all_specs())
3276 placement = PlacementSpec.from_string(host_placement)
3277 hosts = placement.filter_matching_hostspecs(all_hosts)
3278 if not hosts:
3279 raise OrchestratorError(
3280 f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
3281
3282 if limit is not None:
3283 if limit < 1:
3284 raise OrchestratorError(
3285 f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
3286
3287 return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
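# Illustrative usage of the staggered-upgrade parameters validated above
# (version, image and host names are placeholders):
#
#   ceph orch upgrade start --ceph-version 18.2.1
#   ceph orch upgrade start --image quay.io/ceph/ceph:v18.2.1 \
#       --daemon-types mgr,mon --hosts host1,host2 --limit 2
#
# --daemon-types and --services cannot be combined, and --limit must be a
# positive integer, matching the checks performed above.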
3288
3289 @handle_orch_error
3290 def upgrade_pause(self) -> str:
3291 return self.upgrade.upgrade_pause()
3292
3293 @handle_orch_error
3294 def upgrade_resume(self) -> str:
3295 return self.upgrade.upgrade_resume()
3296
3297 @handle_orch_error
3298 def upgrade_stop(self) -> str:
3299 return self.upgrade.upgrade_stop()
3300
3301 @handle_orch_error
3302 def remove_osds(self, osd_ids: List[str],
3303 replace: bool = False,
3304 force: bool = False,
3305 zap: bool = False,
3306 no_destroy: bool = False) -> str:
3307 """
3308 Takes a list of OSDs and schedules them for removal.
3309 The function that takes care of the actual removal is
3310 process_removal_queue().
3311 """
3312
3313 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
3314 to_remove_daemons = list()
3315 for daemon in daemons:
3316 if daemon.daemon_id in osd_ids:
3317 to_remove_daemons.append(daemon)
3318 if not to_remove_daemons:
3319 return f"Unable to find OSDs: {osd_ids}"
3320
3321 for daemon in to_remove_daemons:
3322 assert daemon.daemon_id is not None
3323 try:
3324 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
3325 replace=replace,
3326 force=force,
3327 zap=zap,
3328 no_destroy=no_destroy,
3329 hostname=daemon.hostname,
3330 process_started_at=datetime_now(),
3331 remove_util=self.to_remove_osds.rm_util))
3332 except NotFoundError:
3333 return f"Unable to find OSDs: {osd_ids}"
3334
3335 # trigger the serve loop to initiate the removal
3336 self._kick_serve_loop()
3337 warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
3338 "Run the `ceph-volume lvm zap` command with `--destroy`"
3339 " against the VG/LV if you want them to be destroyed.")
3340 return f"Scheduled OSD(s) for removal.{warning_zap}"
3341
3342 @handle_orch_error
3343 def stop_remove_osds(self, osd_ids: List[str]) -> str:
3344 """
3345 Stops the removal process for a list of OSDs.
3346 This will revert their weight and remove them from the osds_to_remove queue.
3347 """
3348 for osd_id in osd_ids:
3349 try:
3350 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
3351 remove_util=self.to_remove_osds.rm_util))
3352 except (NotFoundError, KeyError, ValueError):
3353 return f'Unable to find OSD in the queue: {osd_id}'
3354
3355 # trigger the serve loop to halt the removal
3356 self._kick_serve_loop()
3357 return "Stopped OSD(s) removal"
3358
3359 @handle_orch_error
3360 def remove_osds_status(self) -> List[OSD]:
3361 """
3362 The CLI call to retrieve an osd removal report
3363 """
3364 return self.to_remove_osds.all_osds()
3365
3366 @handle_orch_error
3367 def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
3368 """
3369 Drain all daemons from a host.
3370 :param hostname: host name
3371 """
3372
3373 # if we drain the last admin host we could end up removing the only instance
3374 # of the config and keyring and cause issues
3375 if not force:
3376 p = PlacementSpec(label=SpecialHostLabels.ADMIN)
3377 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
3378 if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
3379 raise OrchestratorValidationError(f"Host {hostname} is the last host with the '{SpecialHostLabels.ADMIN}'"
3380 " label.\nDraining this host could cause the removal"
3381 " of the last cluster config/keyring managed by cephadm.\n"
3382 f"It is recommended to add the {SpecialHostLabels.ADMIN} label to another host"
3383 " before completing this operation.\nIf you're certain this is"
3384 " what you want rerun this command with --force.")
3385
3386 self.add_host_label(hostname, '_no_schedule')
3387 if not keep_conf_keyring:
3388 self.add_host_label(hostname, SpecialHostLabels.DRAIN_CONF_KEYRING)
3389
3390 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
3391
3392 osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
3393 self.remove_osds(osds_to_remove, zap=zap_osd_devices)
3394
3395 daemons_table = ""
3396 daemons_table += "{:<20} {:<15}\n".format("type", "id")
3397 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
3398 for d in daemons:
3399 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
3400
3401 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
3402
3403 def trigger_connect_dashboard_rgw(self) -> None:
3404 self.need_connect_dashboard_rgw = True
3405 self.event.set()