1 import asyncio
2 import json
3 import errno
4 import ipaddress
5 import logging
6 import re
7 import shlex
8 from collections import defaultdict
9 from configparser import ConfigParser
10 from contextlib import contextmanager
11 from functools import wraps
12 from tempfile import TemporaryDirectory, NamedTemporaryFile
13 from threading import Event
14
15 from cephadm.service_discovery import ServiceDiscovery
16
17 import string
18 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
19 Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, \
20 Awaitable, Iterator
21
22 import datetime
23 import os
24 import random
25 import multiprocessing.pool
26 import subprocess
27 from prettytable import PrettyTable
28
29 from ceph.deployment import inventory
30 from ceph.deployment.drive_group import DriveGroupSpec
31 from ceph.deployment.service_spec import \
32 ServiceSpec, PlacementSpec, \
33 HostPlacementSpec, IngressSpec, \
34 TunedProfileSpec, IscsiServiceSpec
35 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
36 from cephadm.serve import CephadmServe
37 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
38 from cephadm.http_server import CephadmHttpServer
39 from cephadm.agent import CephadmAgentHelpers
40
41
42 from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
43 import orchestrator
44 from orchestrator.module import to_format, Format
45
46 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
47 CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
48 service_to_daemon_types
49 from orchestrator._interface import GenericSpec
50 from orchestrator._interface import daemon_type_to_service
51
52 from . import utils
53 from . import ssh
54 from .migrations import Migrations
55 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
56 RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent, \
57 CephExporterService
58 from .services.ingress import IngressService
59 from .services.container import CustomContainerService
60 from .services.iscsi import IscsiService
61 from .services.nfs import NFSService
62 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
63 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
64 NodeExporterService, SNMPGatewayService, LokiService, PromtailService
65 from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
66 from .schedule import HostAssignment
67 from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
68 ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
69 from .upgrade import CephadmUpgrade
70 from .template import TemplateMgr
71 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
72 cephadmNoImage, CEPH_UPGRADE_ORDER
73 from .configchecks import CephadmConfigChecks
74 from .offline_watcher import OfflineHostWatcher
75 from .tuned_profiles import TunedProfileUtils
76
77 try:
78 import asyncssh
79 except ImportError as e:
80 asyncssh = None # type: ignore
81 asyncssh_import_error = str(e)
82
83 logger = logging.getLogger(__name__)
84
85 T = TypeVar('T')
86
87 DEFAULT_SSH_CONFIG = """
88 Host *
89 User root
90 StrictHostKeyChecking no
91 UserKnownHostsFile /dev/null
92 ConnectTimeout=30
93 """
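# Note: this fallback is what `ceph cephadm get-ssh-config` reports when neither
# the ssh_config_file option nor a stored ssh_config is set (see _get_ssh_config
# below).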
94
95 # cherrypy likes to sys.exit on error. don't let it take us down too!
96
97
98 def os_exit_noop(status: int) -> None:
99 pass
100
101
102 os._exit = os_exit_noop # type: ignore
103
104
105 # Default container images -----------------------------------------------------
106 DEFAULT_IMAGE = 'quay.io/ceph/ceph:v18'
107 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
108 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
109 DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
110 DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
111 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
112 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
113 DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
114 DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
115 DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
116 DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
117 DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
118 DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
119 DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
120 # ------------------------------------------------------------------------------
121
122
123 def host_exists(hostname_position: int = 1) -> Callable:
124 """Check that a hostname exists in the inventory"""
125 def inner(func: Callable) -> Callable:
126 @wraps(func)
127 def wrapper(*args: Any, **kwargs: Any) -> Any:
128 this = args[0] # self object
129 hostname = args[hostname_position]
130 if hostname not in this.cache.get_hosts():
131 candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
132 help_msg = f"Did you mean {candidates}?" if candidates else ""
133 raise OrchestratorError(
134 f"Cannot find host '{hostname}' in the inventory. {help_msg}")
135
136 return func(*args, **kwargs)
137 return wrapper
138 return inner
139
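# Illustrative use of host_exists() (hypothetical method shown): the wrapped
# method's positional argument at `hostname_position` (default 1, i.e. the first
# argument after `self`) must be the hostname to validate, e.g.
#
#     @host_exists()
#     def _do_host_thing(self, hostname: str) -> str:
#         ...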
140
141 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
142 metaclass=CLICommandMeta):
143
144 _STORE_HOST_PREFIX = "host"
145
146 instance = None
147 NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
148 NATIVE_OPTIONS = [] # type: List[Any]
149 MODULE_OPTIONS = [
150 Option(
151 'ssh_config_file',
152 type='str',
153 default=None,
154 desc='customized SSH config file to connect to managed hosts',
155 ),
156 Option(
157 'device_cache_timeout',
158 type='secs',
159 default=30 * 60,
160 desc='seconds to cache device inventory',
161 ),
162 Option(
163 'device_enhanced_scan',
164 type='bool',
165 default=False,
166 desc='Use libstoragemgmt during device scans',
167 ),
168 Option(
169 'daemon_cache_timeout',
170 type='secs',
171 default=10 * 60,
172 desc='seconds to cache service (daemon) inventory',
173 ),
174 Option(
175 'facts_cache_timeout',
176 type='secs',
177 default=1 * 60,
178 desc='seconds to cache host facts data',
179 ),
180 Option(
181 'host_check_interval',
182 type='secs',
183 default=10 * 60,
184 desc='how frequently to perform a host check',
185 ),
186 Option(
187 'mode',
188 type='str',
189 enum_allowed=['root', 'cephadm-package'],
190 default='root',
191 desc='mode for remote execution of cephadm',
192 ),
193 Option(
194 'container_image_base',
195 default=DEFAULT_IMAGE,
196 desc='Container image name, without the tag',
197 runtime=True,
198 ),
199 Option(
200 'container_image_prometheus',
201 default=DEFAULT_PROMETHEUS_IMAGE,
202 desc='Prometheus container image',
203 ),
204 Option(
205 'container_image_grafana',
206 default=DEFAULT_GRAFANA_IMAGE,
207             desc='Grafana container image',
208 ),
209 Option(
210 'container_image_alertmanager',
211 default=DEFAULT_ALERT_MANAGER_IMAGE,
212             desc='Alertmanager container image',
213 ),
214 Option(
215 'container_image_node_exporter',
216 default=DEFAULT_NODE_EXPORTER_IMAGE,
217             desc='Node exporter container image',
218 ),
219 Option(
220 'container_image_loki',
221 default=DEFAULT_LOKI_IMAGE,
222 desc='Loki container image',
223 ),
224 Option(
225 'container_image_promtail',
226 default=DEFAULT_PROMTAIL_IMAGE,
227 desc='Promtail container image',
228 ),
229 Option(
230 'container_image_haproxy',
231 default=DEFAULT_HAPROXY_IMAGE,
232 desc='HAproxy container image',
233 ),
234 Option(
235 'container_image_keepalived',
236 default=DEFAULT_KEEPALIVED_IMAGE,
237 desc='Keepalived container image',
238 ),
239 Option(
240 'container_image_snmp_gateway',
241 default=DEFAULT_SNMP_GATEWAY_IMAGE,
242 desc='SNMP Gateway container image',
243 ),
244 Option(
245 'container_image_elasticsearch',
246 default=DEFAULT_ELASTICSEARCH_IMAGE,
247 desc='elasticsearch container image',
248 ),
249 Option(
250 'container_image_jaeger_agent',
251 default=DEFAULT_JAEGER_AGENT_IMAGE,
252 desc='Jaeger agent container image',
253 ),
254 Option(
255 'container_image_jaeger_collector',
256 default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
257 desc='Jaeger collector container image',
258 ),
259 Option(
260 'container_image_jaeger_query',
261 default=DEFAULT_JAEGER_QUERY_IMAGE,
262 desc='Jaeger query container image',
263 ),
264 Option(
265 'warn_on_stray_hosts',
266 type='bool',
267 default=True,
268 desc='raise a health warning if daemons are detected on a host '
269 'that is not managed by cephadm',
270 ),
271 Option(
272 'warn_on_stray_daemons',
273 type='bool',
274 default=True,
275 desc='raise a health warning if daemons are detected '
276 'that are not managed by cephadm',
277 ),
278 Option(
279 'warn_on_failed_host_check',
280 type='bool',
281 default=True,
282 desc='raise a health warning if the host check fails',
283 ),
284 Option(
285 'log_to_cluster',
286 type='bool',
287 default=True,
288             desc='log to the "cephadm" cluster log channel',
289 ),
290 Option(
291 'allow_ptrace',
292 type='bool',
293 default=False,
294 desc='allow SYS_PTRACE capability on ceph containers',
295 long_desc='The SYS_PTRACE capability is needed to attach to a '
296                       'process with gdb or strace. Enabling this option '
297 'can allow debugging daemons that encounter problems '
298 'at runtime.',
299 ),
300 Option(
301 'container_init',
302 type='bool',
303 default=True,
304 desc='Run podman/docker with `--init`'
305 ),
306 Option(
307 'prometheus_alerts_path',
308 type='str',
309 default='/etc/prometheus/ceph/ceph_default_alerts.yml',
310 desc='location of alerts to include in prometheus deployments',
311 ),
312 Option(
313 'migration_current',
314 type='int',
315 default=None,
316 desc='internal - do not modify',
317 # used to track spec and other data migrations.
318 ),
319 Option(
320 'config_dashboard',
321 type='bool',
322 default=True,
323 desc='manage configs like API endpoints in Dashboard.'
324 ),
325 Option(
326 'manage_etc_ceph_ceph_conf',
327 type='bool',
328 default=False,
329 desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
330 ),
331 Option(
332 'manage_etc_ceph_ceph_conf_hosts',
333 type='str',
334 default='*',
335 desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
336 ),
337 # not used anymore
338 Option(
339 'registry_url',
340 type='str',
341 default=None,
342 desc='Registry url for login purposes. This is not the default registry'
343 ),
344 Option(
345 'registry_username',
346 type='str',
347 default=None,
348 desc='Custom repository username. Only used for logging into a registry.'
349 ),
350 Option(
351 'registry_password',
352 type='str',
353 default=None,
354 desc='Custom repository password. Only used for logging into a registry.'
355 ),
356 ####
357 Option(
358 'registry_insecure',
359 type='bool',
360 default=False,
361 desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
362 ),
363 Option(
364 'use_repo_digest',
365 type='bool',
366 default=True,
367 desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
368 ),
369 Option(
370 'config_checks_enabled',
371 type='bool',
372 default=False,
373 desc='Enable or disable the cephadm configuration analysis',
374 ),
375 Option(
376 'default_registry',
377 type='str',
378 default='docker.io',
379 desc='Search-registry to which we should normalize unqualified image names. '
380 'This is not the default registry',
381 ),
382 Option(
383 'max_count_per_host',
384 type='int',
385 default=10,
386 desc='max number of daemons per service per host',
387 ),
388 Option(
389 'autotune_memory_target_ratio',
390 type='float',
391 default=.7,
392 desc='ratio of total system memory to divide amongst autotuned daemons'
393 ),
394 Option(
395 'autotune_interval',
396 type='secs',
397 default=10 * 60,
398 desc='how frequently to autotune daemon memory'
399 ),
400 Option(
401 'use_agent',
402 type='bool',
403 default=False,
404 desc='Use cephadm agent on each host to gather and send metadata'
405 ),
406 Option(
407 'agent_refresh_rate',
408 type='secs',
409 default=20,
410 desc='How often agent on each host will try to gather and send metadata'
411 ),
412 Option(
413 'agent_starting_port',
414 type='int',
415 default=4721,
416 desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
417 ),
418 Option(
419 'agent_down_multiplier',
420 type='float',
421 default=3.0,
422 desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
423 ),
424 Option(
425 'max_osd_draining_count',
426 type='int',
427 default=10,
428 desc='max number of osds that will be drained simultaneously when osds are removed'
429 ),
430 Option(
431 'service_discovery_port',
432 type='int',
433 default=8765,
434 desc='cephadm service discovery port'
435 ),
436 Option(
437 'cgroups_split',
438 type='bool',
439 default=True,
440 desc='Pass --cgroups=split when cephadm creates containers (currently podman only)'
441 ),
442 Option(
443 'log_refresh_metadata',
444 type='bool',
445 default=False,
446 desc='Log all refresh metadata. Includes daemon, device, and host info collected regularly. Only has effect if logging at debug level'
447 ),
448 Option(
449 'prometheus_web_user',
450 type='str',
451 default='admin',
452 desc='Prometheus web user'
453 ),
454 Option(
455 'prometheus_web_password',
456 type='str',
457 default='admin',
458 desc='Prometheus web password'
459 ),
460 Option(
461 'alertmanager_web_user',
462 type='str',
463 default='admin',
464 desc='Alertmanager web user'
465 ),
466 Option(
467 'alertmanager_web_password',
468 type='str',
469 default='admin',
470 desc='Alertmanager web password'
471 ),
472 Option(
473 'secure_monitoring_stack',
474 type='bool',
475 default=False,
476 desc='Enable TLS security for all the monitoring stack daemons'
477 ),
478 Option(
479 'default_cephadm_command_timeout',
480 type='secs',
481 default=15 * 60,
482 desc='Default timeout applied to cephadm commands run directly on '
483 'the host (in seconds)'
484 ),
485 ]
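    # Each entry above is exposed as an ordinary mgr module option, settable with
    # e.g. (illustrative) `ceph config set mgr mgr/cephadm/use_agent true`.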
486
487 def __init__(self, *args: Any, **kwargs: Any):
488 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
489 self._cluster_fsid: str = self.get('mon_map')['fsid']
490 self.last_monmap: Optional[datetime.datetime] = None
491
492 # for serve()
493 self.run = True
494 self.event = Event()
495
496 self.ssh = ssh.SSHManager(self)
497
498 if self.get_store('pause'):
499 self.paused = True
500 else:
501 self.paused = False
502
503 # for mypy which does not run the code
504 if TYPE_CHECKING:
505 self.ssh_config_file = None # type: Optional[str]
506 self.device_cache_timeout = 0
507 self.daemon_cache_timeout = 0
508 self.facts_cache_timeout = 0
509 self.host_check_interval = 0
510 self.max_count_per_host = 0
511 self.mode = ''
512 self.container_image_base = ''
513 self.container_image_prometheus = ''
514 self.container_image_grafana = ''
515 self.container_image_alertmanager = ''
516 self.container_image_node_exporter = ''
517 self.container_image_loki = ''
518 self.container_image_promtail = ''
519 self.container_image_haproxy = ''
520 self.container_image_keepalived = ''
521 self.container_image_snmp_gateway = ''
522 self.container_image_elasticsearch = ''
523 self.container_image_jaeger_agent = ''
524 self.container_image_jaeger_collector = ''
525 self.container_image_jaeger_query = ''
526 self.warn_on_stray_hosts = True
527 self.warn_on_stray_daemons = True
528 self.warn_on_failed_host_check = True
529 self.allow_ptrace = False
530 self.container_init = True
531 self.prometheus_alerts_path = ''
532 self.migration_current: Optional[int] = None
533 self.config_dashboard = True
534 self.manage_etc_ceph_ceph_conf = True
535 self.manage_etc_ceph_ceph_conf_hosts = '*'
536 self.registry_url: Optional[str] = None
537 self.registry_username: Optional[str] = None
538 self.registry_password: Optional[str] = None
539 self.registry_insecure: bool = False
540 self.use_repo_digest = True
541 self.default_registry = ''
542 self.autotune_memory_target_ratio = 0.0
543 self.autotune_interval = 0
544 self.ssh_user: Optional[str] = None
545 self._ssh_options: Optional[str] = None
546 self.tkey = NamedTemporaryFile()
547 self.ssh_config_fname: Optional[str] = None
548 self.ssh_config: Optional[str] = None
549 self._temp_files: List = []
550 self.ssh_key: Optional[str] = None
551 self.ssh_pub: Optional[str] = None
552 self.use_agent = False
553 self.agent_refresh_rate = 0
554 self.agent_down_multiplier = 0.0
555 self.agent_starting_port = 0
556 self.service_discovery_port = 0
557 self.secure_monitoring_stack = False
558 self.prometheus_web_password: Optional[str] = None
559 self.prometheus_web_user: Optional[str] = None
560 self.alertmanager_web_password: Optional[str] = None
561 self.alertmanager_web_user: Optional[str] = None
562 self.apply_spec_fails: List[Tuple[str, str]] = []
563 self.max_osd_draining_count = 10
564 self.device_enhanced_scan = False
565 self.cgroups_split = True
566 self.log_refresh_metadata = False
567 self.default_cephadm_command_timeout = 0
568
569 self.notify(NotifyType.mon_map, None)
570 self.config_notify()
571
572 path = self.get_ceph_option('cephadm_path')
573 try:
574 assert isinstance(path, str)
575 with open(path, 'rb') as f:
576 self._cephadm = f.read()
577 except (IOError, TypeError) as e:
578 raise RuntimeError("unable to read cephadm at '%s': %s" % (
579 path, str(e)))
580
581 self.cephadm_binary_path = self._get_cephadm_binary_path()
582
583 self._worker_pool = multiprocessing.pool.ThreadPool(10)
584
585 self.ssh._reconfig_ssh()
586
587 CephadmOrchestrator.instance = self
588
589 self.upgrade = CephadmUpgrade(self)
590
591 self.health_checks: Dict[str, dict] = {}
592
593 self.inventory = Inventory(self)
594
595 self.cache = HostCache(self)
596 self.cache.load()
597
598 self.agent_cache = AgentCache(self)
599 self.agent_cache.load()
600
601 self.to_remove_osds = OSDRemovalQueue(self)
602 self.to_remove_osds.load_from_store()
603
604 self.spec_store = SpecStore(self)
605 self.spec_store.load()
606
607 self.keys = ClientKeyringStore(self)
608 self.keys.load()
609
610 self.tuned_profiles = TunedProfileStore(self)
611 self.tuned_profiles.load()
612
613 self.tuned_profile_utils = TunedProfileUtils(self)
614
615 # ensure the host lists are in sync
616 for h in self.inventory.keys():
617 if h not in self.cache.daemons:
618 self.cache.prime_empty_host(h)
619 for h in self.cache.get_hosts():
620 if h not in self.inventory:
621 self.cache.rm_host(h)
622
623 # in-memory only.
624 self.events = EventStore(self)
625 self.offline_hosts: Set[str] = set()
626
627 self.migration = Migrations(self)
628
629 _service_classes: Sequence[Type[CephadmService]] = [
630 OSDService, NFSService, MonService, MgrService, MdsService,
631 RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
632 PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
633 IngressService, CustomContainerService, CephfsMirrorService,
634 CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
635 JaegerQueryService, JaegerAgentService, JaegerCollectorService
636 ]
637
638 # https://github.com/python/mypy/issues/8993
639 self.cephadm_services: Dict[str, CephadmService] = {
640 cls.TYPE: cls(self) for cls in _service_classes} # type: ignore
641
642 self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
643 self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
644 self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
645
646 self.scheduled_async_actions: List[Callable] = []
647
648 self.template = TemplateMgr(self)
649
650 self.requires_post_actions: Set[str] = set()
651 self.need_connect_dashboard_rgw = False
652
653 self.config_checker = CephadmConfigChecks(self)
654
655 self.http_server = CephadmHttpServer(self)
656 self.http_server.start()
657 self.agent_helpers = CephadmAgentHelpers(self)
658 if self.use_agent:
659 self.agent_helpers._apply_agent()
660
661 self.offline_watcher = OfflineHostWatcher(self)
662 self.offline_watcher.start()
663
664 def shutdown(self) -> None:
665 self.log.debug('shutdown')
666 self._worker_pool.close()
667 self._worker_pool.join()
668 self.http_server.shutdown()
669 self.offline_watcher.shutdown()
670 self.run = False
671 self.event.set()
672
673 def _get_cephadm_service(self, service_type: str) -> CephadmService:
674 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
675 return self.cephadm_services[service_type]
676
677 def _get_cephadm_binary_path(self) -> str:
678 import hashlib
679 m = hashlib.sha256()
680 m.update(self._cephadm)
681 return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
682
683 def _kick_serve_loop(self) -> None:
684 self.log.debug('_kick_serve_loop')
685 self.event.set()
686
687 def serve(self) -> None:
688 """
689 The main loop of cephadm.
690
691 A command handler will typically change the declarative state
692 of cephadm. This loop will then attempt to apply this new state.
693 """
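        # Illustrative flow: a handler such as `ceph orch apply ...` updates the
        # stored (declarative) specs and kicks self.event; the CephadmServe loop
        # below then wakes up and converges the cluster toward that state.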
694 # for ssh in serve
695 self.event_loop = ssh.EventLoopThread()
696
697 serve = CephadmServe(self)
698 serve.serve()
699
700 def wait_async(self, coro: Awaitable[T], timeout: Optional[int] = None) -> T:
701 if not timeout:
702 timeout = self.default_cephadm_command_timeout
703 # put a lower bound of 60 seconds in case users
704 # accidentally set it to something unreasonable.
705         # For example if they thought it was in minutes
706 # rather than seconds
707 if timeout < 60:
708             self.log.info(f'Found default timeout set to {timeout}. Using minimum of 60 instead.')
709 timeout = 60
710 return self.event_loop.get_result(coro, timeout)
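    # Illustrative behaviour: with default_cephadm_command_timeout at its default
    # of 900s, wait_async(coro) waits up to 900s, while an explicit
    # wait_async(coro, timeout=5) is clamped up to the 60 second floor above.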
711
712 @contextmanager
713 def async_timeout_handler(self, host: Optional[str] = '',
714 cmd: Optional[str] = '',
715 timeout: Optional[int] = None) -> Iterator[None]:
716 # this is meant to catch asyncio.TimeoutError and convert it into an
717 # OrchestratorError which much of the cephadm codebase is better equipped to handle.
718 # If the command being run, the host it is run on, or the timeout being used
719 # are provided, that will be included in the OrchestratorError's message
720 try:
721 yield
722 except asyncio.TimeoutError:
723 err_str: str = ''
724 if cmd:
725 err_str = f'Command "{cmd}" timed out '
726 else:
727 err_str = 'Command timed out '
728 if host:
729 err_str += f'on host {host} '
730 if timeout:
731 err_str += f'(non-default {timeout} second timeout)'
732 else:
733 err_str += (f'(default {self.default_cephadm_command_timeout} second timeout)')
734 raise OrchestratorError(err_str)
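    # Typical usage pattern (as in check_host()/_prepare_host() below):
    #
    #     with self.async_timeout_handler(host, 'cephadm check-host'):
    #         out, err, code = self.wait_async(
    #             CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host', ...))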
735
736 def set_container_image(self, entity: str, image: str) -> None:
737 self.check_mon_command({
738 'prefix': 'config set',
739 'name': 'container_image',
740 'value': image,
741 'who': entity,
742 })
743
744 def config_notify(self) -> None:
745 """
746 This method is called whenever one of our config options is changed.
747
748 TODO: this method should be moved into mgr_module.py
749 """
750 for opt in self.MODULE_OPTIONS:
751 setattr(self,
752 opt['name'], # type: ignore
753 self.get_module_option(opt['name'])) # type: ignore
754 self.log.debug(' mgr option %s = %s',
755 opt['name'], getattr(self, opt['name'])) # type: ignore
756 for opt in self.NATIVE_OPTIONS:
757 setattr(self,
758 opt, # type: ignore
759 self.get_ceph_option(opt))
760 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
761
762 self.event.set()
763
764 def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
765 if notify_type == NotifyType.mon_map:
766 # get monmap mtime so we can refresh configs when mons change
767 monmap = self.get('mon_map')
768 self.last_monmap = str_to_datetime(monmap['modified'])
769 if self.last_monmap and self.last_monmap > datetime_now():
770 self.last_monmap = None # just in case clocks are skewed
771 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
772 # getattr, due to notify() being called before config_notify()
773 self._kick_serve_loop()
774 if notify_type == NotifyType.pg_summary:
775 self._trigger_osd_removal()
776
777 def _trigger_osd_removal(self) -> None:
778 remove_queue = self.to_remove_osds.as_osd_ids()
779 if not remove_queue:
780 return
781 data = self.get("osd_stats")
782 for osd in data.get('osd_stats', []):
783 if osd.get('num_pgs') == 0:
784 # if _ANY_ osd that is currently in the queue appears to be empty,
785 # start the removal process
786 if int(osd.get('osd')) in remove_queue:
787 self.log.debug('Found empty osd. Starting removal process')
788 # if the osd that is now empty is also part of the removal queue
789 # start the process
790 self._kick_serve_loop()
791
792 def pause(self) -> None:
793 if not self.paused:
794 self.log.info('Paused')
795 self.set_store('pause', 'true')
796 self.paused = True
797 # wake loop so we update the health status
798 self._kick_serve_loop()
799
800 def resume(self) -> None:
801 if self.paused:
802 self.log.info('Resumed')
803 self.paused = False
804 self.set_store('pause', None)
805 # unconditionally wake loop so that 'orch resume' can be used to kick
806 # cephadm
807 self._kick_serve_loop()
808
809 def get_unique_name(
810 self,
811 daemon_type: str,
812 host: str,
813 existing: List[orchestrator.DaemonDescription],
814 prefix: Optional[str] = None,
815 forcename: Optional[str] = None,
816 rank: Optional[int] = None,
817 rank_generation: Optional[int] = None,
818 ) -> str:
819 """
820 Generate a unique random service name
821 """
822 suffix = daemon_type not in [
823 'mon', 'crash', 'ceph-exporter',
824 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
825 'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
826 'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
827 ]
828 if forcename:
829 if len([d for d in existing if d.daemon_id == forcename]):
830 raise orchestrator.OrchestratorValidationError(
831 f'name {daemon_type}.{forcename} already in use')
832 return forcename
833
834 if '.' in host:
835 host = host.split('.')[0]
836 while True:
837 if prefix:
838 name = prefix + '.'
839 else:
840 name = ''
841 if rank is not None and rank_generation is not None:
842 name += f'{rank}.{rank_generation}.'
843 name += host
844 if suffix:
845 name += '.' + ''.join(random.choice(string.ascii_lowercase)
846 for _ in range(6))
847 if len([d for d in existing if d.daemon_id == name]):
848 if not suffix:
849 raise orchestrator.OrchestratorValidationError(
850 f'name {daemon_type}.{name} already in use')
851 self.log.debug('name %s exists, trying again', name)
852 continue
853 return name
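    # Illustrative results (random suffix varies): an mds daemon on
    # "host1.example.com" with prefix "myfs" yields e.g. "myfs.host1.abcdef";
    # suffix-less types such as mon or crash simply get "host1"; with rank=0 and
    # rank_generation=1 the name becomes "myfs.0.1.host1.abcdef".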
854
855 def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
856 if ssh_config is None or len(ssh_config.strip()) == 0:
857 raise OrchestratorValidationError('ssh_config cannot be empty')
858 # StrictHostKeyChecking is [yes|no] ?
859 res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
860 if not res:
861 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
862 for s in res:
863 if 'ask' in s.lower():
864 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
865
866 def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
867 if not os.path.isfile(ssh_config_fname):
868 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
869 ssh_config_fname))
870
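    # _process_ls_output(): turn one host's `cephadm ls` listing (a list of
    # per-daemon dicts) into DaemonDescription objects and store them in the
    # host cache.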
871 def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
872 dm = {}
873 for d in ls:
874 if not d['style'].startswith('cephadm'):
875 continue
876 if d['fsid'] != self._cluster_fsid:
877 continue
878 if '.' not in d['name']:
879 continue
880 sd = orchestrator.DaemonDescription()
881 sd.last_refresh = datetime_now()
882 for k in ['created', 'started', 'last_configured', 'last_deployed']:
883 v = d.get(k, None)
884 if v:
885 setattr(sd, k, str_to_datetime(d[k]))
886 sd.daemon_type = d['name'].split('.')[0]
887 if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
888 logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
889 continue
890
891 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
892 sd.hostname = host
893 sd.container_id = d.get('container_id')
894 if sd.container_id:
895 # shorten the hash
896 sd.container_id = sd.container_id[0:12]
897 sd.container_image_name = d.get('container_image_name')
898 sd.container_image_id = d.get('container_image_id')
899 sd.container_image_digests = d.get('container_image_digests')
900 sd.memory_usage = d.get('memory_usage')
901 sd.memory_request = d.get('memory_request')
902 sd.memory_limit = d.get('memory_limit')
903 sd.cpu_percentage = d.get('cpu_percentage')
904 sd._service_name = d.get('service_name')
905 sd.deployed_by = d.get('deployed_by')
906 sd.version = d.get('version')
907 sd.ports = d.get('ports')
908 sd.ip = d.get('ip')
909 sd.rank = int(d['rank']) if d.get('rank') is not None else None
910 sd.rank_generation = int(d['rank_generation']) if d.get(
911 'rank_generation') is not None else None
912 sd.extra_container_args = d.get('extra_container_args')
913 sd.extra_entrypoint_args = d.get('extra_entrypoint_args')
914 if 'state' in d:
915 sd.status_desc = d['state']
916 sd.status = {
917 'running': DaemonDescriptionStatus.running,
918 'stopped': DaemonDescriptionStatus.stopped,
919 'error': DaemonDescriptionStatus.error,
920 'unknown': DaemonDescriptionStatus.error,
921 }[d['state']]
922 else:
923 sd.status_desc = 'unknown'
924 sd.status = None
925 dm[sd.name()] = sd
926 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
927 self.cache.update_host_daemons(host, dm)
928 self.cache.save_host(host)
929 return None
930
931 def update_watched_hosts(self) -> None:
932 # currently, we are watching hosts with nfs daemons
933 hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
934 ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
935 self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
936
937 def offline_hosts_remove(self, host: str) -> None:
938 if host in self.offline_hosts:
939 self.offline_hosts.remove(host)
940
941 def update_failed_daemon_health_check(self) -> None:
942 failed_daemons = []
943 for dd in self.cache.get_error_daemons():
944 if dd.daemon_type != 'agent': # agents tracked by CEPHADM_AGENT_DOWN
945 failed_daemons.append('daemon %s on %s is in %s state' % (
946 dd.name(), dd.hostname, dd.status_desc
947 ))
948 self.remove_health_warning('CEPHADM_FAILED_DAEMON')
949 if failed_daemons:
950 self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
951 failed_daemons), failed_daemons)
952
953 @staticmethod
954 def can_run() -> Tuple[bool, str]:
955 if asyncssh is not None:
956 return True, ""
957 else:
958 return False, "loading asyncssh library:{}".format(
959 asyncssh_import_error)
960
961 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
962 """
963 The cephadm orchestrator is always available.
964 """
965 ok, err = self.can_run()
966 if not ok:
967 return ok, err, {}
968 if not self.ssh_key or not self.ssh_pub:
969 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}
970
971 # mypy is unable to determine type for _processes since it's private
972 worker_count: int = self._worker_pool._processes # type: ignore
973 ret = {
974 "workers": worker_count,
975 "paused": self.paused,
976 }
977
978 return True, err, ret
979
980 def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
981 self.set_store(what, new)
982 self.ssh._reconfig_ssh()
983 if self.cache.get_hosts():
984 # Can't check anything without hosts
985 host = self.cache.get_hosts()[0]
986 r = CephadmServe(self)._check_host(host)
987 if r is not None:
988                 # connection failed; restore the old value
989 self.set_store(what, old)
990 self.ssh._reconfig_ssh()
991 raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
992 self.log.info(f'Set ssh {what}')
993
994 @orchestrator._cli_write_command(
995 prefix='cephadm set-ssh-config')
996 def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
997 """
998 Set the ssh_config file (use -i <ssh_config>)
999 """
1000 # Set an ssh_config file provided from stdin
1001
1002 old = self.ssh_config
1003 if inbuf == old:
1004 return 0, "value unchanged", ""
1005 self.validate_ssh_config_content(inbuf)
1006 self._validate_and_set_ssh_val('ssh_config', inbuf, old)
1007 return 0, "", ""
1008
1009 @orchestrator._cli_write_command('cephadm clear-ssh-config')
1010 def _clear_ssh_config(self) -> Tuple[int, str, str]:
1011 """
1012 Clear the ssh_config file
1013 """
1014         # Clear any ssh_config previously provided via stdin
1015 self.set_store("ssh_config", None)
1016 self.ssh_config_tmp = None
1017 self.log.info('Cleared ssh_config')
1018 self.ssh._reconfig_ssh()
1019 return 0, "", ""
1020
1021 @orchestrator._cli_read_command('cephadm get-ssh-config')
1022 def _get_ssh_config(self) -> HandleCommandResult:
1023 """
1024 Returns the ssh config as used by cephadm
1025 """
1026 if self.ssh_config_file:
1027 self.validate_ssh_config_fname(self.ssh_config_file)
1028 with open(self.ssh_config_file) as f:
1029 return HandleCommandResult(stdout=f.read())
1030 ssh_config = self.get_store("ssh_config")
1031 if ssh_config:
1032 return HandleCommandResult(stdout=ssh_config)
1033 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
1034
1035 @orchestrator._cli_write_command('cephadm generate-key')
1036 def _generate_key(self) -> Tuple[int, str, str]:
1037 """
1038 Generate a cluster SSH key (if not present)
1039 """
1040 if not self.ssh_pub or not self.ssh_key:
1041 self.log.info('Generating ssh key...')
1042 tmp_dir = TemporaryDirectory()
1043 path = tmp_dir.name + '/key'
1044 try:
1045 subprocess.check_call([
1046 '/usr/bin/ssh-keygen',
1047 '-C', 'ceph-%s' % self._cluster_fsid,
1048 '-N', '',
1049 '-f', path
1050 ])
1051 with open(path, 'r') as f:
1052 secret = f.read()
1053 with open(path + '.pub', 'r') as f:
1054 pub = f.read()
1055 finally:
1056 os.unlink(path)
1057 os.unlink(path + '.pub')
1058 tmp_dir.cleanup()
1059 self.set_store('ssh_identity_key', secret)
1060 self.set_store('ssh_identity_pub', pub)
1061 self.ssh._reconfig_ssh()
1062 return 0, '', ''
1063
1064 @orchestrator._cli_write_command(
1065 'cephadm set-priv-key')
1066 def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1067 """Set cluster SSH private key (use -i <private_key>)"""
1068 if inbuf is None or len(inbuf) == 0:
1069 return -errno.EINVAL, "", "empty private ssh key provided"
1070 old = self.ssh_key
1071 if inbuf == old:
1072 return 0, "value unchanged", ""
1073 self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
1074 self.log.info('Set ssh private key')
1075 return 0, "", ""
1076
1077 @orchestrator._cli_write_command(
1078 'cephadm set-pub-key')
1079 def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1080 """Set cluster SSH public key (use -i <public_key>)"""
1081 if inbuf is None or len(inbuf) == 0:
1082 return -errno.EINVAL, "", "empty public ssh key provided"
1083 old = self.ssh_pub
1084 if inbuf == old:
1085 return 0, "value unchanged", ""
1086 self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
1087 return 0, "", ""
1088
1089 @orchestrator._cli_write_command(
1090 'cephadm clear-key')
1091 def _clear_key(self) -> Tuple[int, str, str]:
1092 """Clear cluster SSH key"""
1093 self.set_store('ssh_identity_key', None)
1094 self.set_store('ssh_identity_pub', None)
1095 self.ssh._reconfig_ssh()
1096 self.log.info('Cleared cluster SSH key')
1097 return 0, '', ''
1098
1099 @orchestrator._cli_read_command(
1100 'cephadm get-pub-key')
1101 def _get_pub_key(self) -> Tuple[int, str, str]:
1102 """Show SSH public key for connecting to cluster hosts"""
1103 if self.ssh_pub:
1104 return 0, self.ssh_pub, ''
1105 else:
1106 return -errno.ENOENT, '', 'No cluster SSH key defined'
1107
1108 @orchestrator._cli_read_command(
1109 'cephadm get-user')
1110 def _get_user(self) -> Tuple[int, str, str]:
1111 """
1112 Show user for SSHing to cluster hosts
1113 """
1114 if self.ssh_user is None:
1115 return -errno.ENOENT, '', 'No cluster SSH user configured'
1116 else:
1117 return 0, self.ssh_user, ''
1118
1119 @orchestrator._cli_read_command(
1120 'cephadm set-user')
1121 def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
1122 """
1123         Set user for SSHing to cluster hosts. Passwordless sudo will be needed for non-root users
1124 """
1125 current_user = self.ssh_user
1126 if user == current_user:
1127 return 0, "value unchanged", ""
1128
1129 self._validate_and_set_ssh_val('ssh_user', user, current_user)
1130 current_ssh_config = self._get_ssh_config()
1131 new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
1132 self._set_ssh_config(new_ssh_config)
1133
1134 msg = 'ssh user set to %s' % user
1135 if user != 'root':
1136 msg += '. sudo will be used'
1137 self.log.info(msg)
1138 return 0, msg, ''
1139
1140 @orchestrator._cli_read_command(
1141 'cephadm registry-login')
1142 def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1143 """
1144 Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
1145 """
1146 # if password not given in command line, get it through file input
1147 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
1148 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
1149 "or -i <login credentials json file>")
1150 elif (url and username and password):
1151 registry_json = {'url': url, 'username': username, 'password': password}
1152 else:
1153 assert isinstance(inbuf, str)
1154 registry_json = json.loads(inbuf)
1155 if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
1156 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
1157 "Please setup json file as\n"
1158 "{\n"
1159 " \"url\": \"REGISTRY_URL\",\n"
1160 " \"username\": \"REGISTRY_USERNAME\",\n"
1161 " \"password\": \"REGISTRY_PASSWORD\"\n"
1162 "}\n")
1163
1164 # verify login info works by attempting login on random host
1165 host = None
1166 for host_name in self.inventory.keys():
1167 host = host_name
1168 break
1169 if not host:
1170 raise OrchestratorError('no hosts defined')
1171 with self.async_timeout_handler(host, 'cephadm registry-login'):
1172 r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
1173 if r is not None:
1174 return 1, '', r
1175 # if logins succeeded, store info
1176 self.log.debug("Host logins successful. Storing login info.")
1177 self.set_store('registry_credentials', json.dumps(registry_json))
1178 # distribute new login info to all hosts
1179 self.cache.distribute_new_registry_login_info()
1180 return 0, "registry login scheduled", ''
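    # Illustrative invocations:
    #   ceph cephadm registry-login my.registry.example.com myuser mypass
    #   ceph cephadm registry-login -i login.json   # JSON file as described above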
1181
1182 @orchestrator._cli_read_command('cephadm check-host')
1183 def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1184 """Check whether we can access and manage a remote host"""
1185 try:
1186 with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
1187 out, err, code = self.wait_async(
1188 CephadmServe(self)._run_cephadm(
1189 host, cephadmNoImage, 'check-host', ['--expect-hostname', host],
1190 addr=addr, error_ok=True, no_fsid=True))
1191 if code:
1192 return 1, '', ('check-host failed:\n' + '\n'.join(err))
1193 except ssh.HostConnectionError as e:
1194 self.log.exception(f"check-host failed for '{host}' at addr ({e.addr}) due to connection failure: {str(e)}")
1195 return 1, '', ('check-host failed:\n'
1196 + f"Failed to connect to {host} at address ({e.addr}): {str(e)}")
1197 except OrchestratorError:
1198 self.log.exception(f"check-host failed for '{host}'")
1199 return 1, '', ('check-host failed:\n'
1200 + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
1201 # if we have an outstanding health alert for this host, give the
1202 # serve thread a kick
1203 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1204 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1205 if item.startswith('host %s ' % host):
1206 self.event.set()
1207 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1208
1209 @orchestrator._cli_read_command(
1210 'cephadm prepare-host')
1211 def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1212 """Prepare a remote host for use with cephadm"""
1213 with self.async_timeout_handler(host, 'cephadm prepare-host'):
1214 out, err, code = self.wait_async(
1215 CephadmServe(self)._run_cephadm(
1216 host, cephadmNoImage, 'prepare-host', ['--expect-hostname', host],
1217 addr=addr, error_ok=True, no_fsid=True))
1218 if code:
1219 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
1220 # if we have an outstanding health alert for this host, give the
1221 # serve thread a kick
1222 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1223 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1224 if item.startswith('host %s ' % host):
1225 self.event.set()
1226 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1227
1228 @orchestrator._cli_write_command(
1229 prefix='cephadm set-extra-ceph-conf')
1230 def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
1231 """
1232         Text that is appended to all daemons' ceph.conf.
1233         Mainly a workaround until `config generate-minimal-conf` generates
1234 a complete ceph.conf.
1235
1236 Warning: this is a dangerous operation.
1237 """
1238 if inbuf:
1239 # sanity check.
1240 cp = ConfigParser()
1241 cp.read_string(inbuf, source='<infile>')
1242
1243 self.set_store("extra_ceph_conf", json.dumps({
1244 'conf': inbuf,
1245 'last_modified': datetime_to_str(datetime_now())
1246 }))
1247 self.log.info('Set extra_ceph_conf')
1248 self._kick_serve_loop()
1249 return HandleCommandResult()
1250
1251 @orchestrator._cli_read_command(
1252 'cephadm get-extra-ceph-conf')
1253 def _get_extra_ceph_conf(self) -> HandleCommandResult:
1254 """
1255 Get extra ceph conf that is appended
1256 """
1257 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
1258
1259 @orchestrator._cli_read_command('cephadm config-check ls')
1260 def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
1261 """List the available configuration checks and their current state"""
1262
1263 if format not in [Format.plain, Format.json, Format.json_pretty]:
1264 return HandleCommandResult(
1265 retval=1,
1266 stderr="Requested format is not supported when listing configuration checks"
1267 )
1268
1269 if format in [Format.json, Format.json_pretty]:
1270 return HandleCommandResult(
1271 stdout=to_format(self.config_checker.health_checks,
1272 format,
1273 many=True,
1274 cls=None))
1275
1276 # plain formatting
1277 table = PrettyTable(
1278 ['NAME',
1279 'HEALTHCHECK',
1280 'STATUS',
1281 'DESCRIPTION'
1282 ], border=False)
1283 table.align['NAME'] = 'l'
1284 table.align['HEALTHCHECK'] = 'l'
1285 table.align['STATUS'] = 'l'
1286 table.align['DESCRIPTION'] = 'l'
1287 table.left_padding_width = 0
1288 table.right_padding_width = 2
1289 for c in self.config_checker.health_checks:
1290 table.add_row((
1291 c.name,
1292 c.healthcheck_name,
1293 c.status,
1294 c.description,
1295 ))
1296
1297 return HandleCommandResult(stdout=table.get_string())
1298
1299 @orchestrator._cli_read_command('cephadm config-check status')
1300 def _config_check_status(self) -> HandleCommandResult:
1301 """Show whether the configuration checker feature is enabled/disabled"""
1302 status = self.get_module_option('config_checks_enabled')
1303 return HandleCommandResult(stdout="Enabled" if status else "Disabled")
1304
1305 @orchestrator._cli_write_command('cephadm config-check enable')
1306 def _config_check_enable(self, check_name: str) -> HandleCommandResult:
1307 """Enable a specific configuration check"""
1308 if not self._config_check_valid(check_name):
1309 return HandleCommandResult(retval=1, stderr="Invalid check name")
1310
1311 err, msg = self._update_config_check(check_name, 'enabled')
1312 if err:
1313 return HandleCommandResult(
1314 retval=err,
1315 stderr=f"Failed to enable check '{check_name}' : {msg}")
1316
1317 return HandleCommandResult(stdout="ok")
1318
1319 @orchestrator._cli_write_command('cephadm config-check disable')
1320 def _config_check_disable(self, check_name: str) -> HandleCommandResult:
1321 """Disable a specific configuration check"""
1322 if not self._config_check_valid(check_name):
1323 return HandleCommandResult(retval=1, stderr="Invalid check name")
1324
1325 err, msg = self._update_config_check(check_name, 'disabled')
1326 if err:
1327 return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
1328 else:
1329 # drop any outstanding raised healthcheck for this check
1330 config_check = self.config_checker.lookup_check(check_name)
1331 if config_check:
1332 if config_check.healthcheck_name in self.health_checks:
1333 self.health_checks.pop(config_check.healthcheck_name, None)
1334 self.set_health_checks(self.health_checks)
1335 else:
1336 self.log.error(
1337 f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")
1338
1339 return HandleCommandResult(stdout="ok")
1340
1341 def _config_check_valid(self, check_name: str) -> bool:
1342 return check_name in [chk.name for chk in self.config_checker.health_checks]
1343
1344 def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
1345 checks_raw = self.get_store('config_checks')
1346 if not checks_raw:
1347 return 1, "config_checks setting is not available"
1348
1349 checks = json.loads(checks_raw)
1350 checks.update({
1351 check_name: status
1352 })
1353 self.log.info(f"updated config check '{check_name}' : {status}")
1354 self.set_store('config_checks', json.dumps(checks))
1355 return 0, ""
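    # The persisted 'config_checks' key is a plain name -> status map, e.g.
    # (illustrative): {"kernel_security": "enabled", "public_network": "disabled"}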
1356
1357 class ExtraCephConf(NamedTuple):
1358 conf: str
1359 last_modified: Optional[datetime.datetime]
1360
1361 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
1362 data = self.get_store('extra_ceph_conf')
1363 if not data:
1364 return CephadmOrchestrator.ExtraCephConf('', None)
1365 try:
1366 j = json.loads(data)
1367 except ValueError:
1368 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
1369 self.log.exception('%s: \'%s\'', msg, data)
1370 return CephadmOrchestrator.ExtraCephConf('', None)
1371 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
1372
1373 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
1374 conf = self.extra_ceph_conf()
1375 if not conf.last_modified:
1376 return False
1377 return conf.last_modified > dt
1378
1379 @orchestrator._cli_write_command(
1380 'cephadm osd activate'
1381 )
1382 def _osd_activate(self, host: List[str]) -> HandleCommandResult:
1383 """
1384 Start OSD containers for existing OSDs
1385 """
1386
1387 @forall_hosts
1388 def run(h: str) -> str:
1389 with self.async_timeout_handler(h, 'cephadm deploy (osd daemon)'):
1390 return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
1391
1392 return HandleCommandResult(stdout='\n'.join(run(host)))
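    # Illustrative CLI usage: `ceph cephadm osd activate host1 host2` redeploys
    # OSD containers for OSDs whose data already exists on those hosts.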
1393
1394 @orchestrator._cli_read_command('orch client-keyring ls')
1395 def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
1396 """
1397 List client keyrings under cephadm management
1398 """
1399 if format != Format.plain:
1400 output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
1401 else:
1402 table = PrettyTable(
1403 ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
1404 border=False)
1405 table.align = 'l'
1406 table.left_padding_width = 0
1407 table.right_padding_width = 2
1408 for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
1409 table.add_row((
1410 ks.entity, ks.placement.pretty_str(),
1411 utils.file_mode_to_str(ks.mode),
1412 f'{ks.uid}:{ks.gid}',
1413 ks.path,
1414 ))
1415 output = table.get_string()
1416 return HandleCommandResult(stdout=output)
1417
1418 @orchestrator._cli_write_command('orch client-keyring set')
1419 def _client_keyring_set(
1420 self,
1421 entity: str,
1422 placement: str,
1423 owner: Optional[str] = None,
1424 mode: Optional[str] = None,
1425 ) -> HandleCommandResult:
1426 """
1427 Add or update client keyring under cephadm management
1428 """
1429 if not entity.startswith('client.'):
1430 raise OrchestratorError('entity must start with client.')
1431 if owner:
1432 try:
1433 uid, gid = map(int, owner.split(':'))
1434 except Exception:
1435 raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
1436 else:
1437 uid = 0
1438 gid = 0
1439 if mode:
1440 try:
1441 imode = int(mode, 8)
1442 except Exception:
1443 raise OrchestratorError('mode must be an octal mode, e.g. "600"')
1444 else:
1445 imode = 0o600
1446 pspec = PlacementSpec.from_string(placement)
1447 ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
1448 self.keys.update(ks)
1449 self._kick_serve_loop()
1450 return HandleCommandResult()
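    # Illustrative CLI usage:
    #   ceph orch client-keyring set client.foo label:foo --mode 640 --owner 0:0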
1451
1452 @orchestrator._cli_write_command('orch client-keyring rm')
1453 def _client_keyring_rm(
1454 self,
1455 entity: str,
1456 ) -> HandleCommandResult:
1457 """
1458 Remove client keyring from cephadm management
1459 """
1460 self.keys.rm(entity)
1461 self._kick_serve_loop()
1462 return HandleCommandResult()
1463
1464 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1465 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1466 image: Optional[str] = None
1467 if daemon_type in CEPH_IMAGE_TYPES:
1468 # get container image
1469 image = str(self.get_foreign_ceph_option(
1470 utils.name_to_config_section(daemon_name),
1471 'container_image'
1472 )).strip()
1473 elif daemon_type == 'prometheus':
1474 image = self.container_image_prometheus
1475 elif daemon_type == 'grafana':
1476 image = self.container_image_grafana
1477 elif daemon_type == 'alertmanager':
1478 image = self.container_image_alertmanager
1479 elif daemon_type == 'node-exporter':
1480 image = self.container_image_node_exporter
1481 elif daemon_type == 'loki':
1482 image = self.container_image_loki
1483 elif daemon_type == 'promtail':
1484 image = self.container_image_promtail
1485 elif daemon_type == 'haproxy':
1486 image = self.container_image_haproxy
1487 elif daemon_type == 'keepalived':
1488 image = self.container_image_keepalived
1489 elif daemon_type == 'elasticsearch':
1490 image = self.container_image_elasticsearch
1491 elif daemon_type == 'jaeger-agent':
1492 image = self.container_image_jaeger_agent
1493 elif daemon_type == 'jaeger-collector':
1494 image = self.container_image_jaeger_collector
1495 elif daemon_type == 'jaeger-query':
1496 image = self.container_image_jaeger_query
1497 elif daemon_type == CustomContainerService.TYPE:
1498 # The image can't be resolved, the necessary information
1499 # is only available when a container is deployed (given
1500 # via spec).
1501 image = None
1502 elif daemon_type == 'snmp-gateway':
1503 image = self.container_image_snmp_gateway
1504 else:
1505 assert False, daemon_type
1506
1507 self.log.debug('%s container image %s' % (daemon_name, image))
1508
1509 return image
1510
1511 def _check_valid_addr(self, host: str, addr: str) -> str:
1512 # make sure hostname is resolvable before trying to make a connection
1513 try:
1514 ip_addr = utils.resolve_ip(addr)
1515 except OrchestratorError as e:
1516 msg = str(e) + f'''
1517 You may need to supply an address for {addr}
1518
1519 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1520 To add the cephadm SSH key to the host:
1521 > ceph cephadm get-pub-key > ~/ceph.pub
1522 > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}
1523
1524 To check that the host is reachable open a new shell with the --no-hosts flag:
1525 > cephadm shell --no-hosts
1526
1527 Then run the following:
1528 > ceph cephadm get-ssh-config > ssh_config
1529 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1530 > chmod 0600 ~/cephadm_private_key
1531 > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
1532 raise OrchestratorError(msg)
1533
1534 if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
1535 # if this is a re-add, use old address. otherwise error
1536 if host not in self.inventory or self.inventory.get_addr(host) == host:
1537 raise OrchestratorError(
1538 (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
1539 + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
1540 self.log.debug(
1541 f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
1542 ip_addr = self.inventory.get_addr(host)
1543 try:
1544 with self.async_timeout_handler(host, f'cephadm check-host --expect-hostname {host}'):
1545 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
1546 host, cephadmNoImage, 'check-host',
1547 ['--expect-hostname', host],
1548 addr=addr,
1549 error_ok=True, no_fsid=True))
1550 if code:
1551 msg = 'check-host failed:\n' + '\n'.join(err)
1552 # err will contain stdout and stderr, so we filter on the message text to
1553 # only show the errors
1554 errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
1555 if errors:
1556 msg = f'Host {host} ({addr}) failed check(s): {errors}'
1557 raise OrchestratorError(msg)
1558 except ssh.HostConnectionError as e:
1559 raise OrchestratorError(str(e))
1560 return ip_addr
1561
1562 def _add_host(self, spec):
1563 # type: (HostSpec) -> str
1564 """
1565 Add a host to be managed by the orchestrator.
1566
1567         :param spec: HostSpec of the host to add
1568 """
1569 HostSpec.validate(spec)
1570 ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
1571 if spec.addr == spec.hostname and ip_addr:
1572 spec.addr = ip_addr
1573
1574 if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
1575 self.cache.refresh_all_host_info(spec.hostname)
1576
1577 # prime crush map?
1578 if spec.location:
1579 self.check_mon_command({
1580 'prefix': 'osd crush add-bucket',
1581 'name': spec.hostname,
1582 'type': 'host',
1583 'args': [f'{k}={v}' for k, v in spec.location.items()],
1584 })
1585
1586 if spec.hostname not in self.inventory:
1587 self.cache.prime_empty_host(spec.hostname)
1588 self.inventory.add_host(spec)
1589 self.offline_hosts_remove(spec.hostname)
1590 if spec.status == 'maintenance':
1591 self._set_maintenance_healthcheck()
1592 self.event.set() # refresh stray health check
1593 self.log.info('Added host %s' % spec.hostname)
1594 return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
1595
1596 @handle_orch_error
1597 def add_host(self, spec: HostSpec) -> str:
1598 return self._add_host(spec)
1599
1600 @handle_orch_error
1601 def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
1602 """
1603 Remove a host from orchestrator management.
1604
1605 :param host: host name
1606 :param force: bypass running daemons check
1607 :param offline: remove offline host
1608 """
1609
1610 # check if host is offline
1611 host_offline = host in self.offline_hosts
1612
1613 if host_offline and not offline:
1614 raise OrchestratorValidationError(
1615 "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))
1616
1617 if not host_offline and offline:
1618 raise OrchestratorValidationError(
1619 "{} is online, please remove host without --offline.".format(host))
1620
1621 if offline and not force:
1622 raise OrchestratorValidationError("Removing an offline host requires --force")
1623
1624 # check if there are daemons on the host
1625 if not force:
1626 daemons = self.cache.get_daemons_by_host(host)
1627 if daemons:
1628 self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
1629
1630 daemons_table = ""
1631 daemons_table += "{:<20} {:<15}\n".format("type", "id")
1632 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
1633 for d in daemons:
1634 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
1635
1636 raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
1637 "The following daemons are running on the host:"
1638 "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from the host" % (
1639 host, daemons_table, host))
1640
1641 # check if we're removing the last _admin host
1642 if not force:
1643 p = PlacementSpec(label='_admin')
1644 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1645 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1646 raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
1647 " label. Please add the '_admin' label to a host"
1648 " or add --force to this command")
1649
1650 def run_cmd(cmd_args: dict) -> None:
1651 ret, out, err = self.mon_command(cmd_args)
1652 if ret != 0:
1653 self.log.debug(f"ran {cmd_args} with mon_command")
1654 self.log.error(
1655 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
1656 self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
1657
1658 if offline:
1659 daemons = self.cache.get_daemons_by_host(host)
1660 for d in daemons:
1661 self.log.info(f"removing: {d.name()}")
1662
1663 if d.daemon_type != 'osd':
1664 self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].pre_remove(d)
1665 self.cephadm_services[daemon_type_to_service(str(d.daemon_type))].post_remove(d, is_failed_deploy=False)
1666 else:
1667 cmd_args = {
1668 'prefix': 'osd purge-actual',
1669 'id': int(str(d.daemon_id)),
1670 'yes_i_really_mean_it': True
1671 }
1672 run_cmd(cmd_args)
1673
1674 cmd_args = {
1675 'prefix': 'osd crush rm',
1676 'name': host
1677 }
1678 run_cmd(cmd_args)
1679
1680 self.inventory.rm_host(host)
1681 self.cache.rm_host(host)
1682 self.ssh.reset_con(host)
1683 self.offline_hosts_remove(host) # if host was in offline host list, we should remove it now.
1684 self.event.set() # refresh stray health check
1685 self.log.info('Removed host %s' % host)
1686 return "Removed {} host '{}'".format('offline' if offline else '', host)
1687
1688 @handle_orch_error
1689 def update_host_addr(self, host: str, addr: str) -> str:
1690 self._check_valid_addr(host, addr)
1691 self.inventory.set_addr(host, addr)
1692 self.ssh.reset_con(host)
1693 self.event.set() # refresh stray health check
1694 self.log.info('Set host %s addr to %s' % (host, addr))
1695 return "Updated host '{}' addr to '{}'".format(host, addr)
1696
1697 @handle_orch_error
1698 def get_hosts(self):
1699 # type: () -> List[orchestrator.HostSpec]
1700 """
1701 Return a list of hosts managed by the orchestrator.
1702
1703 Notes:
1704 - skip async: manager reads from cache.
1705 """
1706 return list(self.inventory.all_specs())
1707
1708 @handle_orch_error
1709 def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
1710 """
1711 Return a list of host metadata (gather_facts) for the hosts managed by the orchestrator.
1712
1713 Notes:
1714 - skip async: manager reads from cache.
1715 """
1716 if hostname:
1717 return [self.cache.get_facts(hostname)]
1718
1719 return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
1720
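# Illustrative only: the label helpers below back the host label CLI. The
# hostname and label are assumptions:
#
#   ceph orch host label add host2 _admin
#   ceph orch host label rm host2 _admin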
1721 @handle_orch_error
1722 def add_host_label(self, host: str, label: str) -> str:
1723 self.inventory.add_label(host, label)
1724 self.log.info('Added label %s to host %s' % (label, host))
1725 self._kick_serve_loop()
1726 return 'Added label %s to host %s' % (label, host)
1727
1728 @handle_orch_error
1729 def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
1730 # if we remove the _admin label from the only host that has it we could end up
1731 # removing the only instance of the config and keyring and cause issues
1732 if not force and label == '_admin':
1733 p = PlacementSpec(label='_admin')
1734 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1735 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1736 raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
1737 " label.\nRemoving the _admin label from this host could cause the removal"
1738 " of the last cluster config/keyring managed by cephadm.\n"
1739 "It is recommended to add the _admin label to another host"
1740 " before completing this operation.\nIf you're certain this is"
1741 " what you want rerun this command with --force.")
1742 self.inventory.rm_label(host, label)
1743 self.log.info('Removed label %s from host %s' % (label, host))
1744 self._kick_serve_loop()
1745 return 'Removed label %s from host %s' % (label, host)
1746
1747 def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
1748 self.log.debug("running host-ok-to-stop checks")
1749 daemons = self.cache.get_daemons()
1750 daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
1751 for dd in daemons:
1752 assert dd.hostname is not None
1753 assert dd.daemon_type is not None
1754 assert dd.daemon_id is not None
1755 if dd.hostname == hostname:
1756 daemon_map[dd.daemon_type].append(dd.daemon_id)
1757
1758 notifications: List[str] = []
1759 error_notifications: List[str] = []
1760 okay: bool = True
1761 for daemon_type, daemon_ids in daemon_map.items():
1762 r = self.cephadm_services[daemon_type_to_service(
1763 daemon_type)].ok_to_stop(daemon_ids, force=force)
1764 if r.retval:
1765 okay = False
1766 # collect error notifications so user can see every daemon causing host
1767 # to not be okay to stop
1768 error_notifications.append(r.stderr)
1769 if r.stdout:
1770 # if extra notifications to print for user, add them to notifications list
1771 notifications.append(r.stdout)
1772
1773 if not okay:
1774 # at least one daemon is not okay to stop
1775 return 1, '\n'.join(error_notifications)
1776
1777 if notifications:
1778 return 0, (f'It is presumed safe to stop host {hostname}. '
1779 + 'Note the following:\n\n' + '\n'.join(notifications))
1780 return 0, f'It is presumed safe to stop host {hostname}'
1781
1782 @handle_orch_error
1783 def host_ok_to_stop(self, hostname: str) -> str:
1784 if hostname not in self.cache.get_hosts():
1785 raise OrchestratorError(f'Cannot find host "{hostname}"')
1786
1787 rc, msg = self._host_ok_to_stop(hostname)
1788 if rc:
1789 raise OrchestratorError(msg, errno=rc)
1790
1791 self.log.info(msg)
1792 return msg
1793
1794 def _set_maintenance_healthcheck(self) -> None:
1795 """Raise/update or clear the maintenance health check as needed"""
1796
1797 in_maintenance = self.inventory.get_host_with_state("maintenance")
1798 if not in_maintenance:
1799 self.remove_health_warning('HOST_IN_MAINTENANCE')
1800 else:
1801 s = "host is" if len(in_maintenance) == 1 else "hosts are"
1802 self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
1803 f"{h} is in maintenance" for h in in_maintenance])
1804
1805 @handle_orch_error
1806 @host_exists()
1807 def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> str:
1808 """ Attempt to place a cluster host in maintenance
1809
1810 Placing a host into maintenance disables the cluster's ceph target in systemd
1811 and stops all ceph daemons. If the host is an osd host we apply the noout flag
1812 for the host subtree in crush to prevent data movement during a host maintenance
1813 window.
1814
1815 :param hostname: (str) name of the host (must match an inventory hostname)
1816
1817 :raises OrchestratorError: Hostname is invalid, host is already in maintenance
1818 """
1819 if yes_i_really_mean_it and not force:
1820 raise OrchestratorError("--force must be passed with --yes-i-really-mean-it")
1821
1822 if len(self.cache.get_hosts()) == 1 and not yes_i_really_mean_it:
1823 raise OrchestratorError("Maintenance feature is not supported on single node clusters")
1824
1825 # if upgrade is active, deny
1826 if self.upgrade.upgrade_state and not yes_i_really_mean_it:
1827 raise OrchestratorError(
1828 f"Unable to place {hostname} in maintenance with upgrade active/paused")
1829
1830 tgt_host = self.inventory._inventory[hostname]
1831 if tgt_host.get("status", "").lower() == "maintenance":
1832 raise OrchestratorError(f"Host {hostname} is already in maintenance")
1833
1834 host_daemons = self.cache.get_daemon_types(hostname)
1835 self.log.debug("daemons on host {}".format(','.join(host_daemons)))
1836 if host_daemons:
1837 # daemons on this host, so check the daemons can be stopped
1838 # and if so, place the host into maintenance by disabling the target
1839 rc, msg = self._host_ok_to_stop(hostname, force)
1840 if rc and not yes_i_really_mean_it:
1841 raise OrchestratorError(
1842 msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
1843
1844 # call the host-maintenance function
1845 with self.async_timeout_handler(hostname, 'cephadm host-maintenance enter'):
1846 _out, _err, _code = self.wait_async(
1847 CephadmServe(self)._run_cephadm(
1848 hostname, cephadmNoImage, "host-maintenance",
1849 ["enter"],
1850 error_ok=True))
1851 returned_msg = _err[0].split('\n')[-1]
1852 if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')) and not yes_i_really_mean_it:
1853 raise OrchestratorError(
1854 f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
1855
1856 if "osd" in host_daemons:
1857 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1858 rc, out, err = self.mon_command({
1859 'prefix': 'osd set-group',
1860 'flags': 'noout',
1861 'who': [crush_node],
1862 'format': 'json'
1863 })
1864 if rc and not yes_i_really_mean_it:
1865 self.log.warning(
1866 f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
1867 raise OrchestratorError(
1868 f"Unable to set the osds on {hostname} to noout (rc={rc})")
1869 elif not rc:
1870 self.log.info(
1871 f"maintenance mode request for {hostname} has SET the noout group")
1872
1873 # update the host status in the inventory
1874 tgt_host["status"] = "maintenance"
1875 self.inventory._inventory[hostname] = tgt_host
1876 self.inventory.save()
1877
1878 self._set_maintenance_healthcheck()
1879 return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
1880
1881 @handle_orch_error
1882 @host_exists()
1883 def exit_host_maintenance(self, hostname: str) -> str:
1884 """Exit maintenance mode and return a host to an operational state
1885
1886 Returning from maintenance will enable the cluster's systemd target and
1887 start it, and remove any noout flag that has been added for the host if the
1888 host has osd daemons.
1889
1890 :param hostname: (str) host name
1891
1892 :raises OrchestratorError: Unable to return from maintenance, or unset the
1893 noout flag
1894 """
1895 tgt_host = self.inventory._inventory[hostname]
1896 if tgt_host['status'] != "maintenance":
1897 raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
1898
1899 with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
1900 outs, errs, _code = self.wait_async(
1901 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
1902 'host-maintenance', ['exit'], error_ok=True))
1903 returned_msg = errs[0].split('\n')[-1]
1904 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
1905 raise OrchestratorError(
1906 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
1907
1908 if "osd" in self.cache.get_daemon_types(hostname):
1909 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1910 rc, _out, _err = self.mon_command({
1911 'prefix': 'osd unset-group',
1912 'flags': 'noout',
1913 'who': [crush_node],
1914 'format': 'json'
1915 })
1916 if rc:
1917 self.log.warning(
1918 f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
1919 raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
1920 else:
1921 self.log.info(
1922 f"exit maintenance request has UNSET for the noout group on host {hostname}")
1923
1924 # update the host record status
1925 tgt_host['status'] = ""
1926 self.inventory._inventory[hostname] = tgt_host
1927 self.inventory.save()
1928
1929 self._set_maintenance_healthcheck()
1930
1931 return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
1932
1933 @handle_orch_error
1934 @host_exists()
1935 def rescan_host(self, hostname: str) -> str:
1936 """Use cephadm to issue a disk rescan on each HBA
1937
1938 Some HBAs and external enclosures don't automatically register
1939 device insertion with the kernel, so for these scenarios we need
1940 to manually rescan
1941
1942 :param hostname: (str) host name
1943 """
1944 self.log.info(f'disk rescan request sent to host "{hostname}"')
1945 with self.async_timeout_handler(hostname, 'cephadm disk-rescan'):
1946 _out, _err, _code = self.wait_async(
1947 CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
1948 [], no_fsid=True, error_ok=True))
1949 if not _err:
1950 raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
1951
1952 msg = _err[0].split('\n')[-1]
1953 log_msg = f'disk rescan: {msg}'
1954 if msg.upper().startswith('OK'):
1955 self.log.info(log_msg)
1956 else:
1957 self.log.warning(log_msg)
1958
1959 return f'{msg}'
1960
1961 def get_minimal_ceph_conf(self) -> str:
1962 _, config, _ = self.check_mon_command({
1963 "prefix": "config generate-minimal-conf",
1964 })
1965 extra = self.extra_ceph_conf().conf
1966 if extra:
1967 try:
1968 config = self._combine_confs(config, extra)
1969 except Exception as e:
1970 self.log.error(f'Failed to add extra ceph conf settings to minimal ceph conf: {e}')
1971 return config
1972
1973 def _combine_confs(self, conf1: str, conf2: str) -> str:
1974 section_to_option: Dict[str, List[str]] = {}
1975 final_conf: str = ''
1976 for conf in [conf1, conf2]:
1977 if not conf:
1978 continue
1979 section = ''
1980 for line in conf.split('\n'):
1981 if line.strip().startswith('#') or not line.strip():
1982 continue
1983 if line.strip().startswith('[') and line.strip().endswith(']'):
1984 section = line.strip().replace('[', '').replace(']', '')
1985 if section not in section_to_option:
1986 section_to_option[section] = []
1987 else:
1988 section_to_option[section].append(line.strip())
1989
1990 first_section = True
1991 for section, options in section_to_option.items():
1992 if not first_section:
1993 final_conf += '\n'
1994 final_conf += f'[{section}]\n'
1995 for option in options:
1996 final_conf += f'{option}\n'
1997 first_section = False
1998
1999 return final_conf
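# Illustrative example of the merge behaviour above (option values are
# assumptions). Given
#   conf1: "[global]\nfsid = <fsid>\nmon_host = <mon-addrs>"
#   conf2: "[global]\nlog_to_stderr = true\n[client]\nrbd_cache = false"
# the result is a single [global] section containing fsid, mon_host and
# log_to_stderr, followed by a [client] section containing rbd_cache.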
2000
2001 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
2002 if filter_host:
2003 self.cache.invalidate_host_daemons(filter_host)
2004 else:
2005 for h in self.cache.get_hosts():
2006 # Also discover daemons deployed manually
2007 self.cache.invalidate_host_daemons(h)
2008
2009 self._kick_serve_loop()
2010
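# Illustrative only: describe_service() and list_daemons() below back the
# `ceph orch ls` and `ceph orch ps` CLI commands respectively, e.g.:
#
#   ceph orch ls rgw --refresh
#   ceph orch ps --refresh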
2011 @handle_orch_error
2012 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
2013 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
2014 if refresh:
2015 self._invalidate_daemons_and_kick_serve()
2016 self.log.debug('Kicked serve() loop to refresh all services')
2017
2018 sm: Dict[str, orchestrator.ServiceDescription] = {}
2019
2020 # known services
2021 for nm, spec in self.spec_store.all_specs.items():
2022 if service_type is not None and service_type != spec.service_type:
2023 continue
2024 if service_name is not None and service_name != nm:
2025 continue
2026
2027 if spec.service_type != 'osd':
2028 size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
2029 else:
2030 # osd counting is special
2031 size = 0
2032
2033 sm[nm] = orchestrator.ServiceDescription(
2034 spec=spec,
2035 size=size,
2036 running=0,
2037 events=self.events.get_for_service(spec.service_name()),
2038 created=self.spec_store.spec_created[nm],
2039 deleted=self.spec_store.spec_deleted.get(nm, None),
2040 virtual_ip=spec.get_virtual_ip(),
2041 ports=spec.get_port_start(),
2042 )
2043 if spec.service_type == 'ingress':
2044 # ingress has 2 daemons running per host
2045 # but only if it's the full ingress service, not for keepalive-only
2046 if not cast(IngressSpec, spec).keepalive_only:
2047 sm[nm].size *= 2
2048
2049 # factor daemons into status
2050 for h, dm in self.cache.get_daemons_with_volatile_status():
2051 for name, dd in dm.items():
2052 assert dd.hostname is not None, f'no hostname for {dd!r}'
2053 assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
2054
2055 n: str = dd.service_name()
2056
2057 if (
2058 service_type
2059 and service_type != daemon_type_to_service(dd.daemon_type)
2060 ):
2061 continue
2062 if service_name and service_name != n:
2063 continue
2064
2065 if n not in sm:
2066 # new unmanaged service
2067 spec = ServiceSpec(
2068 unmanaged=True,
2069 service_type=daemon_type_to_service(dd.daemon_type),
2070 service_id=dd.service_id(),
2071 )
2072 sm[n] = orchestrator.ServiceDescription(
2073 last_refresh=dd.last_refresh,
2074 container_image_id=dd.container_image_id,
2075 container_image_name=dd.container_image_name,
2076 spec=spec,
2077 size=0,
2078 )
2079
2080 if dd.status == DaemonDescriptionStatus.running:
2081 sm[n].running += 1
2082 if dd.daemon_type == 'osd':
2083 # The osd count can't be determined from the placement spec, so an
2084 # actual/expected representation can't be shown here. For now we
2085 # simply set running = size.
2086 sm[n].size += 1
2087 if (
2088 not sm[n].last_refresh
2089 or not dd.last_refresh
2090 or dd.last_refresh < sm[n].last_refresh # type: ignore
2091 ):
2092 sm[n].last_refresh = dd.last_refresh
2093
2094 return list(sm.values())
2095
2096 @handle_orch_error
2097 def list_daemons(self,
2098 service_name: Optional[str] = None,
2099 daemon_type: Optional[str] = None,
2100 daemon_id: Optional[str] = None,
2101 host: Optional[str] = None,
2102 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
2103 if refresh:
2104 self._invalidate_daemons_and_kick_serve(host)
2105 self.log.debug('Kicked serve() loop to refresh all daemons')
2106
2107 result = []
2108 for h, dm in self.cache.get_daemons_with_volatile_status():
2109 if host and h != host:
2110 continue
2111 for name, dd in dm.items():
2112 if daemon_type is not None and daemon_type != dd.daemon_type:
2113 continue
2114 if daemon_id is not None and daemon_id != dd.daemon_id:
2115 continue
2116 if service_name is not None and service_name != dd.service_name():
2117 continue
2118 if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
2119 dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
2120 dd.name(),
2121 f"{dd.daemon_type}_memory_target"
2122 ))
2123 result.append(dd)
2124 return result
2125
2126 @handle_orch_error
2127 def service_action(self, action: str, service_name: str) -> List[str]:
2128 if service_name not in self.spec_store.all_specs.keys():
2129 raise OrchestratorError(f'Invalid service name "{service_name}".'
2130 + ' View currently running services using "ceph orch ls"')
2131 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
2132 if not dds:
2133 raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
2134 + ' View currently running services using "ceph orch ls"')
2135 if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
2136 return [f'Stopping entire {service_name} service is prohibited.']
2137 self.log.info('%s service %s' % (action.capitalize(), service_name))
2138 return [
2139 self._schedule_daemon_action(dd.name(), action)
2140 for dd in dds
2141 ]
2142
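# Illustrative only: key rotation is normally requested per daemon via the CLI
# (daemon name is an assumption) and routed here through daemon_action():
#
#   ceph orch daemon rotate-key rgw.myrealm.host2.abcdef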
2143 def _rotate_daemon_key(self, daemon_spec: CephadmDaemonDeploySpec) -> str:
2144 self.log.info(f'Rotating authentication key for {daemon_spec.name()}')
2145 rc, out, err = self.mon_command({
2146 'prefix': 'auth get-or-create-pending',
2147 'entity': daemon_spec.entity_name(),
2148 'format': 'json',
2149 })
2150 j = json.loads(out)
2151 pending_key = j[0]['pending_key']
2152
2153 # deploy a new keyring file
2154 if daemon_spec.daemon_type != 'osd':
2155 daemon_spec = self.cephadm_services[daemon_type_to_service(
2156 daemon_spec.daemon_type)].prepare_create(daemon_spec)
2157 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2158 self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=True))
2159
2160 # try to be clever, or fall back to restarting the daemon
2161 rc = -1
2162 if daemon_spec.daemon_type == 'osd':
2163 rc, out, err = self.tool_exec(
2164 args=['ceph', 'tell', daemon_spec.name(), 'rotate-stored-key', '-i', '-'],
2165 stdin=pending_key.encode()
2166 )
2167 if not rc:
2168 rc, out, err = self.tool_exec(
2169 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2170 stdin=pending_key.encode()
2171 )
2172 elif daemon_spec.daemon_type == 'mds':
2173 rc, out, err = self.tool_exec(
2174 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2175 stdin=pending_key.encode()
2176 )
2177 elif (
2178 daemon_spec.daemon_type == 'mgr'
2179 and daemon_spec.daemon_id == self.get_mgr_id()
2180 ):
2181 rc, out, err = self.tool_exec(
2182 args=['ceph', 'tell', daemon_spec.name(), 'rotate-key', '-i', '-'],
2183 stdin=pending_key.encode()
2184 )
2185 if rc:
2186 self._daemon_action(daemon_spec, 'restart')
2187
2188 return f'Rotated key for {daemon_spec.name()}'
2189
2190 def _daemon_action(self,
2191 daemon_spec: CephadmDaemonDeploySpec,
2192 action: str,
2193 image: Optional[str] = None) -> str:
2194 self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
2195 daemon_spec.daemon_id)
2196
2197 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
2198 daemon_spec.daemon_id):
2199 self.mgr_service.fail_over()
2200 return '' # unreachable
2201
2202 if action == 'rotate-key':
2203 return self._rotate_daemon_key(daemon_spec)
2204
2205 if action == 'redeploy' or action == 'reconfig':
2206 if daemon_spec.daemon_type != 'osd':
2207 daemon_spec = self.cephadm_services[daemon_type_to_service(
2208 daemon_spec.daemon_type)].prepare_create(daemon_spec)
2209 else:
2210 # for OSDs, we still need to update config, just not carry out the full
2211 # prepare_create function
2212 daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(
2213 daemon_spec)
2214 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2215 return self.wait_async(
2216 CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
2217
2218 actions = {
2219 'start': ['reset-failed', 'start'],
2220 'stop': ['stop'],
2221 'restart': ['reset-failed', 'restart'],
2222 }
2223 name = daemon_spec.name()
2224 for a in actions[action]:
2225 try:
2226 with self.async_timeout_handler(daemon_spec.host, f'cephadm unit --name {name}'):
2227 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2228 daemon_spec.host, name, 'unit',
2229 ['--name', name, a]))
2230 except Exception:
2231 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
2232 self.cache.invalidate_host_daemons(daemon_spec.host)
2233 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
2234 self.events.for_daemon(name, 'INFO', msg)
2235 return msg
2236
2237 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
2238 if image is not None:
2239 if action != 'redeploy':
2240 raise OrchestratorError(
2241 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
2242 if daemon_type not in CEPH_IMAGE_TYPES:
2243 raise OrchestratorError(
2244 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
2245 f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
2246
2247 self.check_mon_command({
2248 'prefix': 'config set',
2249 'name': 'container_image',
2250 'value': image,
2251 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
2252 })
2253
2254 @handle_orch_error
2255 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
2256 d = self.cache.get_daemon(daemon_name)
2257 assert d.daemon_type is not None
2258 assert d.daemon_id is not None
2259
2260 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
2261 and not self.mgr_service.mgr_map_has_standby():
2262 raise OrchestratorError(
2263 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2264
2265 if action == 'rotate-key':
2266 if d.daemon_type not in ['mgr', 'osd', 'mds',
2267 'rgw', 'crash', 'nfs', 'rbd-mirror', 'iscsi']:
2268 raise OrchestratorError(
2269 f'key rotation not supported for {d.daemon_type}'
2270 )
2271
2272 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
2273
2274 self.log.info(f'Schedule {action} daemon {daemon_name}')
2275 return self._schedule_daemon_action(daemon_name, action)
2276
2277 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
2278 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
2279
2280 def get_active_mgr(self) -> DaemonDescription:
2281 return self.mgr_service.get_active_daemon(self.cache.get_daemons_by_type('mgr'))
2282
2283 def get_active_mgr_digests(self) -> List[str]:
2284 digests = self.mgr_service.get_active_daemon(
2285 self.cache.get_daemons_by_type('mgr')).container_image_digests
2286 return digests if digests else []
2287
2288 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
2289 dd = self.cache.get_daemon(daemon_name)
2290 assert dd.daemon_type is not None
2291 assert dd.daemon_id is not None
2292 assert dd.hostname is not None
2293 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
2294 and not self.mgr_service.mgr_map_has_standby():
2295 raise OrchestratorError(
2296 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2297 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
2298 self.cache.save_host(dd.hostname)
2299 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
2300 self._kick_serve_loop()
2301 return msg
2302
2303 @handle_orch_error
2304 def remove_daemons(self, names):
2305 # type: (List[str]) -> List[str]
2306 args = []
2307 for host, dm in self.cache.daemons.items():
2308 for name in names:
2309 if name in dm:
2310 args.append((name, host))
2311 if not args:
2312 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
2313 self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
2314 return self._remove_daemons(args)
2315
2316 @handle_orch_error
2317 def remove_service(self, service_name: str, force: bool = False) -> str:
2318 self.log.info('Remove service %s' % service_name)
2319 self._trigger_preview_refresh(service_name=service_name)
2320 if service_name in self.spec_store:
2321 if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
2322 return f'Unable to remove {service_name} service.\n' \
2323 f'Note, you might want to mark the {service_name} service as "unmanaged"'
2324 else:
2325 return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"
2326
2327 # Report list of affected OSDs?
2328 if not force and service_name.startswith('osd.'):
2329 osds_msg = {}
2330 for h, dm in self.cache.get_daemons_with_volatile_status():
2331 osds_to_remove = []
2332 for name, dd in dm.items():
2333 if dd.daemon_type == 'osd' and dd.service_name() == service_name:
2334 osds_to_remove.append(str(dd.daemon_id))
2335 if osds_to_remove:
2336 osds_msg[h] = osds_to_remove
2337 if osds_msg:
2338 msg = ''
2339 for h, ls in osds_msg.items():
2340 msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}'
2341 raise OrchestratorError(
2342 f'If {service_name} is removed then the following OSDs will remain; use --force to proceed anyway\n{msg}')
2343
2344 found = self.spec_store.rm(service_name)
2345 if found and service_name.startswith('osd.'):
2346 self.spec_store.finally_rm(service_name)
2347 self._kick_serve_loop()
2348 return f'Removed service {service_name}'
2349
2350 @handle_orch_error
2351 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
2352 """
2353 Return the storage inventory of hosts matching the given filter.
2354
2355 :param host_filter: host filter
2356
2357 TODO:
2358 - add filtering by label
2359 """
2360 if refresh:
2361 if host_filter and host_filter.hosts:
2362 for h in host_filter.hosts:
2363 self.log.debug(f'will refresh {h} devs')
2364 self.cache.invalidate_host_devices(h)
2365 self.cache.invalidate_host_networks(h)
2366 else:
2367 for h in self.cache.get_hosts():
2368 self.log.debug(f'will refresh {h} devs')
2369 self.cache.invalidate_host_devices(h)
2370 self.cache.invalidate_host_networks(h)
2371
2372 self.event.set()
2373 self.log.debug('Kicked serve() loop to refresh devices')
2374
2375 result = []
2376 for host, dls in self.cache.devices.items():
2377 if host_filter and host_filter.hosts and host not in host_filter.hosts:
2378 continue
2379 result.append(orchestrator.InventoryHost(host,
2380 inventory.Devices(dls)))
2381 return result
2382
2383 @handle_orch_error
2384 def zap_device(self, host: str, path: str) -> str:
2385 """Zap a device on a managed host.
2386
2387 Use ceph-volume zap to return a device to an unused/free state
2388
2389 Args:
2390 host (str): hostname of the cluster host
2391 path (str): device path
2392
2393 Raises:
2394 OrchestratorError: host is not a cluster host
2395 OrchestratorError: host is in maintenance and therefore unavailable
2396 OrchestratorError: device path not found on the host
2397 OrchestratorError: device is known to a different ceph cluster
2398 OrchestratorError: device holds active osd
2399 OrchestratorError: device cache hasn't been populated yet
2400
2401 Returns:
2402 str: output from the zap command
2403 """
2404
2405 self.log.info('Zap device %s:%s' % (host, path))
2406
2407 if host not in self.inventory.keys():
2408 raise OrchestratorError(
2409 f"Host '{host}' is not a member of the cluster")
2410
2411 host_info = self.inventory._inventory.get(host, {})
2412 if host_info.get('status', '').lower() == 'maintenance':
2413 raise OrchestratorError(
2414 f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
2415
2416 if host not in self.cache.devices:
2417 raise OrchestratorError(
2418 f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
2419
2420 host_devices = self.cache.devices[host]
2421 path_found = False
2422 osd_id_list: List[str] = []
2423
2424 for dev in host_devices:
2425 if dev.path == path:
2426 path_found = True
2427 break
2428 if not path_found:
2429 raise OrchestratorError(
2430 f"Device path '{path}' not found on host '{host}'")
2431
2432 if osd_id_list:
2433 dev_name = os.path.basename(path)
2434 active_osds: List[str] = []
2435 for osd_id in osd_id_list:
2436 metadata = self.get_metadata('osd', str(osd_id))
2437 if metadata:
2438 if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
2439 active_osds.append("osd." + osd_id)
2440 if active_osds:
2441 raise OrchestratorError(
2442 f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
2443 f"OSD{'s' if len(active_osds) > 1 else ''}"
2444 f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
2445
2446 cv_args = ['--', 'lvm', 'zap', '--destroy', path]
2447 with self.async_timeout_handler(host, f'cephadm ceph-volume {" ".join(cv_args)}'):
2448 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2449 host, 'osd', 'ceph-volume', cv_args, error_ok=True))
2450
2451 self.cache.invalidate_host_devices(host)
2452 self.cache.invalidate_host_networks(host)
2453 if code:
2454 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
2455 msg = f'zap successful for {path} on {host}'
2456 self.log.info(msg)
2457
2458 return msg + '\n'
2459
2460 @handle_orch_error
2461 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
2462 """
2463 Blink a device light. Calling something like::
2464
2465 lsmcli local-disk-ident-led-on --path $path
2466
2467 If you must, you can customize this via::
2468
2469 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
2470 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
2471
2472 See templates/blink_device_light_cmd.j2
2473 """
2474 @forall_hosts
2475 def blink(host: str, dev: str, path: str) -> str:
2476 cmd_line = self.template.render('blink_device_light_cmd.j2',
2477 {
2478 'on': on,
2479 'ident_fault': ident_fault,
2480 'dev': dev,
2481 'path': path
2482 },
2483 host=host)
2484 cmd_args = shlex.split(cmd_line)
2485
2486 with self.async_timeout_handler(host, f'cephadm shell -- {" ".join(cmd_args)}'):
2487 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2488 host, 'osd', 'shell', ['--'] + cmd_args,
2489 error_ok=True))
2490 if code:
2491 raise OrchestratorError(
2492 'Unable to affect %s light for %s:%s. Command: %s' % (
2493 ident_fault, host, dev, ' '.join(cmd_args)))
2494 self.log.info('Set %s light for %s:%s %s' % (
2495 ident_fault, host, dev, 'on' if on else 'off'))
2496 return "Set %s light for %s:%s %s" % (
2497 ident_fault, host, dev, 'on' if on else 'off')
2498
2499 return blink(locs)
2500
2501 def get_osd_uuid_map(self, only_up=False):
2502 # type: (bool) -> Dict[str, str]
2503 osd_map = self.get('osd_map')
2504 r = {}
2505 for o in osd_map['osds']:
2506 # only include OSDs that have ever started in this map. this way
2507 # an interrupted osd create can be repeated and succeed the second
2508 # time around.
2509 osd_id = o.get('osd')
2510 if osd_id is None:
2511 raise OrchestratorError("Could not retrieve osd_id from osd_map")
2512 if not only_up:
2513 r[str(osd_id)] = o.get('uuid', '')
2514 return r
2515
2516 def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
2517 osd = [x for x in self.get('osd_map')['osds']
2518 if x['osd'] == osd_id]
2519
2520 if len(osd) != 1:
2521 return None
2522
2523 return osd[0]
2524
2525 def _trigger_preview_refresh(self,
2526 specs: Optional[List[DriveGroupSpec]] = None,
2527 service_name: Optional[str] = None,
2528 ) -> None:
2529 # Only trigger a refresh when a spec has changed
2530 trigger_specs = []
2531 if specs:
2532 for spec in specs:
2533 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
2534 # if the to-be-previewed spec differs from the actual spec we need to
2535 # trigger a refresh; if the spec has been removed (== None) we need to
2536 # refresh as well.
2537 if not preview_spec or spec != preview_spec:
2538 trigger_specs.append(spec)
2539 if service_name:
2540 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
2541 if not any(trigger_specs):
2542 return None
2543
2544 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
2545 for host in refresh_hosts:
2546 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
2547 self.cache.osdspec_previews_refresh_queue.append(host)
2548
2549 @handle_orch_error
2550 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
2551 """
2552 Deprecated. Please use `apply()` instead.
2553
2554 Keeping this around to be compatible to mgr/dashboard
2555 """
2556 return [self._apply(spec) for spec in specs]
2557
2558 @handle_orch_error
2559 def create_osds(self, drive_group: DriveGroupSpec) -> str:
2560 hosts: List[HostSpec] = self.inventory.all_specs()
2561 filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
2562 if not filtered_hosts:
2563 return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
2564 return self.osd_service.create_from_spec(drive_group)
2565
2566 def _preview_osdspecs(self,
2567 osdspecs: Optional[List[DriveGroupSpec]] = None
2568 ) -> dict:
2569 if not osdspecs:
2570 return {'n/a': [{'error': True,
2571 'message': 'No OSDSpec or matching hosts found.'}]}
2572 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
2573 if not matching_hosts:
2574 return {'n/a': [{'error': True,
2575 'message': 'No OSDSpec or matching hosts found.'}]}
2576 # Is any host still loading previews or still in the queue to be previewed
2577 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
2578 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
2579 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
2580 return {'n/a': [{'error': True,
2581 'message': 'Preview data is being generated. '
2582 'Please re-run this command in a bit.'}]}
2583 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
2584 previews_for_specs = {}
2585 for host, raw_reports in self.cache.osdspec_previews.items():
2586 if host not in matching_hosts:
2587 continue
2588 osd_reports = []
2589 for osd_report in raw_reports:
2590 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
2591 osd_reports.append(osd_report)
2592 previews_for_specs.update({host: osd_reports})
2593 return previews_for_specs
2594
2595 def _calc_daemon_deps(self,
2596 spec: Optional[ServiceSpec],
2597 daemon_type: str,
2598 daemon_id: str) -> List[str]:
2599
2600 def get_daemon_names(daemons: List[str]) -> List[str]:
2601 daemon_names = []
2602 for daemon_type in daemons:
2603 for dd in self.cache.get_daemons_by_type(daemon_type):
2604 daemon_names.append(dd.name())
2605 return daemon_names
2606
2607 deps = []
2608 if daemon_type == 'haproxy':
2609 # because cephadm creates new daemon instances whenever
2610 # port or ip changes, identifying daemons by name is
2611 # sufficient to detect changes.
2612 if not spec:
2613 return []
2614 ingress_spec = cast(IngressSpec, spec)
2615 assert ingress_spec.backend_service
2616 daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
2617 deps = [d.name() for d in daemons]
2618 elif daemon_type == 'keepalived':
2619 # because cephadm creates new daemon instances whenever
2620 # port or ip changes, identifying daemons by name is
2621 # sufficient to detect changes.
2622 if not spec:
2623 return []
2624 daemons = self.cache.get_daemons_by_service(spec.service_name())
2625 deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
2626 elif daemon_type == 'agent':
2627 root_cert = ''
2628 server_port = ''
2629 try:
2630 server_port = str(self.http_server.agent.server_port)
2631 root_cert = self.http_server.agent.ssl_certs.get_root_cert()
2632 except Exception:
2633 pass
2634 deps = sorted([self.get_mgr_ip(), server_port, root_cert,
2635 str(self.device_enhanced_scan)])
2636 elif daemon_type == 'iscsi':
2637 if spec:
2638 iscsi_spec = cast(IscsiServiceSpec, spec)
2639 deps = [self.iscsi_service.get_trusted_ips(iscsi_spec)]
2640 else:
2641 deps = [self.get_mgr_ip()]
2642 elif daemon_type == 'prometheus':
2643 # for prometheus we add the active mgr as an explicit dependency,
2644 # this way we force a redeploy after a mgr failover
2645 deps.append(self.get_active_mgr().name())
2646 deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
2647 deps.append(str(self.service_discovery_port))
2648 # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
2649 # a scrape_configs section for each service type. This should be included only
2650 # when at least one daemon of the corresponding service is running. Therefore,
2651 # an explicit dependency is added for each service-type to force a reconfig
2652 # whenever the number of daemons for those service-type changes from 0 to greater
2653 # than zero and vice versa.
2654 deps += [s for s in ['node-exporter', 'alertmanager'] if self.cache.get_daemons_by_service(s)]
2655 if len(self.cache.get_daemons_by_type('ingress')) > 0:
2656 deps.append('ingress')
2657 # add dependency on ceph-exporter daemons
2658 deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
2659 if self.secure_monitoring_stack:
2660 if self.prometheus_web_user and self.prometheus_web_password:
2661 deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
2662 if self.alertmanager_web_user and self.alertmanager_web_password:
2663 deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
2664 elif daemon_type == 'grafana':
2665 deps += get_daemon_names(['prometheus', 'loki'])
2666 if self.secure_monitoring_stack and self.prometheus_web_user and self.prometheus_web_password:
2667 deps.append(f'{hash(self.prometheus_web_user + self.prometheus_web_password)}')
2668 elif daemon_type == 'alertmanager':
2669 deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway'])
2670 if self.secure_monitoring_stack and self.alertmanager_web_user and self.alertmanager_web_password:
2671 deps.append(f'{hash(self.alertmanager_web_user + self.alertmanager_web_password)}')
2672 elif daemon_type == 'promtail':
2673 deps += get_daemon_names(['loki'])
2674 else:
2675 # TODO(redo): some error message!
2676 pass
2677
2678 if daemon_type in ['prometheus', 'node-exporter', 'alertmanager', 'grafana']:
2679 deps.append(f'secure_monitoring_stack:{self.secure_monitoring_stack}')
2680
2681 return sorted(deps)
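# Illustrative only: for a grafana daemon the computed deps might look like
# ['loki.host1.abcdef', 'prometheus.host1.ghijkl', 'secure_monitoring_stack:False']
# (daemon names are assumptions); a change in this sorted list is what
# triggers a reconfig of the dependent daemon.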
2682
2683 @forall_hosts
2684 def _remove_daemons(self, name: str, host: str) -> str:
2685 return CephadmServe(self)._remove_daemon(name, host)
2686
2687 def _check_pool_exists(self, pool: str, service_name: str) -> None:
2688 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2689 if not self.rados.pool_exists(pool):
2690 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2691 f'service {service_name}')
2692
2693 def _add_daemon(self,
2694 daemon_type: str,
2695 spec: ServiceSpec) -> List[str]:
2696 """
2697 Add (and place) a daemon. Require explicit host placement. Do not
2698 schedule, and do not apply the related scheduling limitations.
2699 """
2700 if spec.service_name() not in self.spec_store:
2701 raise OrchestratorError('Unable to add a Daemon without Service.\n'
2702 'Please use `ceph orch apply ...` to create a Service.\n'
2703 'Note, you might want to create the service with "unmanaged=true"')
2704
2705 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2706 if not spec.placement.hosts:
2707 raise OrchestratorError('must specify host(s) to deploy on')
2708 count = spec.placement.count or len(spec.placement.hosts)
2709 daemons = self.cache.get_daemons_by_service(spec.service_name())
2710 return self._create_daemons(daemon_type, spec, daemons,
2711 spec.placement.hosts, count)
2712
2713 def _create_daemons(self,
2714 daemon_type: str,
2715 spec: ServiceSpec,
2716 daemons: List[DaemonDescription],
2717 hosts: List[HostPlacementSpec],
2718 count: int) -> List[str]:
2719 if count > len(hosts):
2720 raise OrchestratorError('too few hosts: want %d, have %s' % (
2721 count, hosts))
2722
2723 did_config = False
2724 service_type = daemon_type_to_service(daemon_type)
2725
2726 args = [] # type: List[CephadmDaemonDeploySpec]
2727 for host, network, name in hosts:
2728 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2729 prefix=spec.service_id,
2730 forcename=name)
2731
2732 if not did_config:
2733 self.cephadm_services[service_type].config(spec)
2734 did_config = True
2735
2736 daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
2737 host, daemon_id, network, spec,
2738 # NOTE: this does not consider port conflicts!
2739 ports=spec.get_port_start())
2740 self.log.debug('Placing %s.%s on host %s' % (
2741 daemon_type, daemon_id, host))
2742 args.append(daemon_spec)
2743
2744 # add to daemon list so next name(s) will also be unique
2745 sd = orchestrator.DaemonDescription(
2746 hostname=host,
2747 daemon_type=daemon_type,
2748 daemon_id=daemon_id,
2749 )
2750 daemons.append(sd)
2751
2752 @ forall_hosts
2753 def create_func_map(*args: Any) -> str:
2754 daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
2755 with self.async_timeout_handler(daemon_spec.host, f'cephadm deploy ({daemon_spec.daemon_type} daemon)'):
2756 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
2757
2758 return create_func_map(args)
2759
2760 @handle_orch_error
2761 def add_daemon(self, spec: ServiceSpec) -> List[str]:
2762 ret: List[str] = []
2763 try:
2764 with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
2765 for d_type in service_to_daemon_types(spec.service_type):
2766 ret.extend(self._add_daemon(d_type, spec))
2767 return ret
2768 except OrchestratorError as e:
2769 self.events.from_orch_error(e)
2770 raise
2771
2772 @handle_orch_error
2773 def get_prometheus_access_info(self) -> Dict[str, str]:
2774 return {'user': self.prometheus_web_user or '',
2775 'password': self.prometheus_web_password or '',
2776 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
2777
2778 @handle_orch_error
2779 def get_alertmanager_access_info(self) -> Dict[str, str]:
2780 return {'user': self.alertmanager_web_user or '',
2781 'password': self.alertmanager_web_password or '',
2782 'certificate': self.http_server.service_discovery.ssl_certs.get_root_cert()}
2783
2784 @handle_orch_error
2785 def apply_mon(self, spec: ServiceSpec) -> str:
2786 return self._apply(spec)
2787
2788 def _apply(self, spec: GenericSpec) -> str:
2789 if spec.service_type == 'host':
2790 return self._add_host(cast(HostSpec, spec))
2791
2792 if spec.service_type == 'osd':
2793 # _trigger preview refresh needs to be smart and
2794 # should only refresh if a change has been detected
2795 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
2796
2797 return self._apply_service_spec(cast(ServiceSpec, spec))
2798
2799 def _get_candidate_hosts(self, placement: PlacementSpec) -> List[str]:
2800 """Return a list of candidate hosts according to the placement specification."""
2801 all_hosts = self.cache.get_schedulable_hosts()
2802 draining_hosts = [dh.hostname for dh in self.cache.get_draining_hosts()]
2803 candidates = []
2804 if placement.hosts:
2805 candidates = [h.hostname for h in placement.hosts if h.hostname in placement.hosts]
2806 elif placement.label:
2807 candidates = [x.hostname for x in [h for h in all_hosts if placement.label in h.labels]]
2808 elif placement.host_pattern:
2809 candidates = [x for x in placement.filter_matching_hostspecs(all_hosts)]
2810 elif (placement.count is not None or placement.count_per_host is not None):
2811 candidates = [x.hostname for x in all_hosts]
2812 return [h for h in candidates if h not in draining_hosts]
2813
2814 def _validate_one_shot_placement_spec(self, spec: PlacementSpec) -> None:
2815 """Validate placement specification for TunedProfileSpec and ClientKeyringSpec."""
2816 if spec.count is not None:
2817 raise OrchestratorError(
2818 "Placement 'count' field is no supported for this specification.")
2819 if spec.count_per_host is not None:
2820 raise OrchestratorError(
2821 "Placement 'count_per_host' field is no supported for this specification.")
2822 if spec.hosts:
2823 all_hosts = [h.hostname for h in self.inventory.all_specs()]
2824 invalid_hosts = [h.hostname for h in spec.hosts if h.hostname not in all_hosts]
2825 if invalid_hosts:
2826 raise OrchestratorError(f"Found invalid host(s) in placement section: {invalid_hosts}. "
2827 f"Please check 'ceph orch host ls' for available hosts.")
2828 elif not self._get_candidate_hosts(spec):
2829 raise OrchestratorError("Invalid placement specification. No host(s) matched placement spec.\n"
2830 "Please check 'ceph orch host ls' for available hosts.\n"
2831 "Note: draining hosts are excluded from the candidate list.")
2832
2833 def _validate_tunedprofile_settings(self, spec: TunedProfileSpec) -> Dict[str, List[str]]:
2834 candidate_hosts = spec.placement.filter_matching_hostspecs(self.inventory.all_specs())
2835 invalid_options: Dict[str, List[str]] = {}
2836 for host in candidate_hosts:
2837 host_sysctl_options = self.cache.get_facts(host).get('sysctl_options', {})
2838 invalid_options[host] = []
2839 for option in spec.settings:
2840 if option not in host_sysctl_options:
2841 invalid_options[host].append(option)
2842 return invalid_options
2843
2844 def _validate_tuned_profile_spec(self, spec: TunedProfileSpec) -> None:
2845 if not spec.settings:
2846 raise OrchestratorError("Invalid spec: settings section cannot be empty.")
2847 self._validate_one_shot_placement_spec(spec.placement)
2848 invalid_options = self._validate_tunedprofile_settings(spec)
2849 if any(e for e in invalid_options.values()):
2850 raise OrchestratorError(
2851 f'Failed to apply tuned profile. Invalid sysctl option(s) for host(s) detected: {invalid_options}')
2852
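# Illustrative only: apply_tuned_profiles() consumes TunedProfileSpec objects,
# typically provided as YAML via `ceph orch tuned-profile apply -i <file>`.
# Profile name, hosts and settings below are assumptions:
#
#   profile_name: latency-profile
#   placement:
#     hosts: [host1, host2]
#   settings:
#     vm.swappiness: "10"
#     fs.file-max: "1000000"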
2853 @handle_orch_error
2854 def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
2855 outs = []
2856 for spec in specs:
2857 self._validate_tuned_profile_spec(spec)
2858 if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
2859 outs.append(
2860 f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
2861 else:
2862 # done, let's save the specs
2863 self.tuned_profiles.add_profile(spec)
2864 outs.append(f'Saved tuned profile {spec.profile_name}')
2865 self._kick_serve_loop()
2866 return '\n'.join(outs)
2867
2868 @handle_orch_error
2869 def rm_tuned_profile(self, profile_name: str) -> str:
2870 if profile_name not in self.tuned_profiles:
2871 raise OrchestratorError(
2872 f'Tuned profile {profile_name} does not exist. Nothing to remove.')
2873 self.tuned_profiles.rm_profile(profile_name)
2874 self._kick_serve_loop()
2875 return f'Removed tuned profile {profile_name}'
2876
2877 @handle_orch_error
2878 def tuned_profile_ls(self) -> List[TunedProfileSpec]:
2879 return self.tuned_profiles.list_profiles()
2880
2881 @handle_orch_error
2882 def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
2883 if profile_name not in self.tuned_profiles:
2884 raise OrchestratorError(
2885 f'Tuned profile {profile_name} does not exist. Cannot add setting.')
2886 self.tuned_profiles.add_setting(profile_name, setting, value)
2887 self._kick_serve_loop()
2888 return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
2889
2890 @handle_orch_error
2891 def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
2892 if profile_name not in self.tuned_profiles:
2893 raise OrchestratorError(
2894 f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
2895 self.tuned_profiles.rm_setting(profile_name, setting)
2896 self._kick_serve_loop()
2897 return f'Removed setting {setting} from tuned profile {profile_name}'
2898
2899 @handle_orch_error
2900 def service_discovery_dump_cert(self) -> str:
2901 root_cert = self.get_store(ServiceDiscovery.KV_STORE_SD_ROOT_CERT)
2902 if not root_cert:
2903 raise OrchestratorError('No certificate found for service discovery')
2904 return root_cert
2905
2906 def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
2907 self.health_checks[name] = {
2908 'severity': 'warning',
2909 'summary': summary,
2910 'count': count,
2911 'detail': detail,
2912 }
2913 self.set_health_checks(self.health_checks)
2914
2915 def remove_health_warning(self, name: str) -> None:
2916 if name in self.health_checks:
2917 del self.health_checks[name]
2918 self.set_health_checks(self.health_checks)
2919
2920 def _plan(self, spec: ServiceSpec) -> dict:
2921 if spec.service_type == 'osd':
2922 return {'service_name': spec.service_name(),
2923 'service_type': spec.service_type,
2924 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
2925
2926 svc = self.cephadm_services[spec.service_type]
2927 ha = HostAssignment(
2928 spec=spec,
2929 hosts=self.cache.get_schedulable_hosts(),
2930 unreachable_hosts=self.cache.get_unreachable_hosts(),
2931 draining_hosts=self.cache.get_draining_hosts(),
2932 networks=self.cache.networks,
2933 daemons=self.cache.get_daemons_by_service(spec.service_name()),
2934 allow_colo=svc.allow_colo(),
2935 rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
2936 )
2937 ha.validate()
2938 hosts, to_add, to_remove = ha.place()
2939
2940 return {
2941 'service_name': spec.service_name(),
2942 'service_type': spec.service_type,
2943 'add': [hs.hostname for hs in to_add],
2944 'remove': [d.name() for d in to_remove]
2945 }
2946
2947 @handle_orch_error
2948 def plan(self, specs: Sequence[GenericSpec]) -> List:
2949 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
2950 'to the current inventory setup. If any of these conditions change, the \n'
2951 'preview will be invalid. Please make sure to have a minimal \n'
2952 'timeframe between planning and applying the specs.'}]
2953 if any([spec.service_type == 'host' for spec in specs]):
2954 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
2955 for spec in specs:
2956 results.append(self._plan(cast(ServiceSpec, spec)))
2957 return results
2958
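# Illustrative only: _apply_service_spec() is reached via `ceph orch apply ...`
# or `ceph orch apply -i <file>`; a minimal spec might look like this (service
# id, count and label are assumptions):
#
#   service_type: rgw
#   service_id: myrealm.myzone
#   placement:
#     count: 2
#     label: rgw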
2959 def _apply_service_spec(self, spec: ServiceSpec) -> str:
2960 if spec.placement.is_empty():
2961 # fill in default placement
2962 defaults = {
2963 'mon': PlacementSpec(count=5),
2964 'mgr': PlacementSpec(count=2),
2965 'mds': PlacementSpec(count=2),
2966 'rgw': PlacementSpec(count=2),
2967 'ingress': PlacementSpec(count=2),
2968 'iscsi': PlacementSpec(count=1),
2969 'rbd-mirror': PlacementSpec(count=2),
2970 'cephfs-mirror': PlacementSpec(count=1),
2971 'nfs': PlacementSpec(count=1),
2972 'grafana': PlacementSpec(count=1),
2973 'alertmanager': PlacementSpec(count=1),
2974 'prometheus': PlacementSpec(count=1),
2975 'node-exporter': PlacementSpec(host_pattern='*'),
2976 'ceph-exporter': PlacementSpec(host_pattern='*'),
2977 'loki': PlacementSpec(count=1),
2978 'promtail': PlacementSpec(host_pattern='*'),
2979 'crash': PlacementSpec(host_pattern='*'),
2980 'container': PlacementSpec(count=1),
2981 'snmp-gateway': PlacementSpec(count=1),
2982 'elasticsearch': PlacementSpec(count=1),
2983 'jaeger-agent': PlacementSpec(host_pattern='*'),
2984 'jaeger-collector': PlacementSpec(count=1),
2985 'jaeger-query': PlacementSpec(count=1)
2986 }
2987 spec.placement = defaults[spec.service_type]
2988 elif spec.service_type in ['mon', 'mgr'] and \
2989 spec.placement.count is not None and \
2990 spec.placement.count < 1:
2991 raise OrchestratorError('cannot scale %s service below 1' % (
2992 spec.service_type))
2993
2994 host_count = len(self.inventory.keys())
2995 max_count = self.max_count_per_host
2996
2997 if spec.placement.count is not None:
2998 if spec.service_type in ['mon', 'mgr']:
2999 if spec.placement.count > max(5, host_count):
3000 raise OrchestratorError(
3001 (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
3002 elif spec.service_type != 'osd':
3003 if spec.placement.count > (max_count * host_count):
3004 raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
3005 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3006
3007 if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
3008 raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
3009 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
3010
3011 HostAssignment(
3012 spec=spec,
3013 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
3014 unreachable_hosts=self.cache.get_unreachable_hosts(),
3015 draining_hosts=self.cache.get_draining_hosts(),
3016 networks=self.cache.networks,
3017 daemons=self.cache.get_daemons_by_service(spec.service_name()),
3018 allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
3019 ).validate()
3020
3021 self.log.info('Saving service %s spec with placement %s' % (
3022 spec.service_name(), spec.placement.pretty_str()))
3023 self.spec_store.save(spec)
3024 self._kick_serve_loop()
3025 return "Scheduled %s update..." % spec.service_name()
3026
3027 @handle_orch_error
3028 def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
3029 results = []
3030 for spec in specs:
3031 if no_overwrite:
3032 if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
3033 results.append('Skipped %s host spec. To change the %s spec, omit the --no-overwrite flag'
3034 % (cast(HostSpec, spec).hostname, spec.service_type))
3035 continue
3036 elif cast(ServiceSpec, spec).service_name() in self.spec_store:
3037 results.append('Skipped %s service spec. To change the %s spec, omit the --no-overwrite flag'
3038 % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
3039 continue
3040 results.append(self._apply(spec))
3041 return results
3042
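# The apply_<service>() wrappers below all delegate to _apply(); each one backs
# the corresponding `ceph orch apply <service-type>` call from the orchestrator
# CLI (illustrative mapping).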
3043 @handle_orch_error
3044 def apply_mgr(self, spec: ServiceSpec) -> str:
3045 return self._apply(spec)
3046
3047 @handle_orch_error
3048 def apply_mds(self, spec: ServiceSpec) -> str:
3049 return self._apply(spec)
3050
3051 @handle_orch_error
3052 def apply_rgw(self, spec: ServiceSpec) -> str:
3053 return self._apply(spec)
3054
3055 @handle_orch_error
3056 def apply_ingress(self, spec: ServiceSpec) -> str:
3057 return self._apply(spec)
3058
3059 @handle_orch_error
3060 def apply_iscsi(self, spec: ServiceSpec) -> str:
3061 return self._apply(spec)
3062
3063 @handle_orch_error
3064 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
3065 return self._apply(spec)
3066
3067 @handle_orch_error
3068 def apply_nfs(self, spec: ServiceSpec) -> str:
3069 return self._apply(spec)
3070
3071 def _get_dashboard_url(self):
3072 # type: () -> str
3073 return self.get('mgr_map').get('services', {}).get('dashboard', '')
3074
3075 @handle_orch_error
3076 def apply_prometheus(self, spec: ServiceSpec) -> str:
3077 return self._apply(spec)
3078
3079 @handle_orch_error
3080 def apply_loki(self, spec: ServiceSpec) -> str:
3081 return self._apply(spec)
3082
3083 @handle_orch_error
3084 def apply_promtail(self, spec: ServiceSpec) -> str:
3085 return self._apply(spec)
3086
3087 @handle_orch_error
3088 def apply_node_exporter(self, spec: ServiceSpec) -> str:
3089 return self._apply(spec)
3090
3091 @handle_orch_error
3092 def apply_ceph_exporter(self, spec: ServiceSpec) -> str:
3093 return self._apply(spec)
3094
3095 @handle_orch_error
3096 def apply_crash(self, spec: ServiceSpec) -> str:
3097 return self._apply(spec)
3098
3099 @handle_orch_error
3100 def apply_grafana(self, spec: ServiceSpec) -> str:
3101 return self._apply(spec)
3102
3103 @handle_orch_error
3104 def apply_alertmanager(self, spec: ServiceSpec) -> str:
3105 return self._apply(spec)
3106
3107 @handle_orch_error
3108 def apply_container(self, spec: ServiceSpec) -> str:
3109 return self._apply(spec)
3110
3111 @handle_orch_error
3112 def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
3113 return self._apply(spec)
3114
3115 @handle_orch_error
3116 def set_unmanaged(self, service_name: str, value: bool) -> str:
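"""
Flag a service spec as unmanaged (True) or managed (False); while a spec is
unmanaged, cephadm will not create or remove daemons for that service.
"""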
3117 return self.spec_store.set_unmanaged(service_name, value)
3118
3119 @handle_orch_error
3120 def upgrade_check(self, image: str, version: str) -> str:
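"""
Dry-run an upgrade check (typically `ceph orch upgrade check`): resolve the
target image from the given image name or version, inspect it, and return a
JSON report of which daemons need an update, which are already up to date,
and which do not run a Ceph image at all.
"""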
3121 if self.inventory.get_host_with_state("maintenance"):
3122 raise OrchestratorError("check aborted - you have hosts in maintenance state")
3123
3124 if version:
3125 target_name = self.container_image_base + ':v' + version
3126 elif image:
3127 target_name = image
3128 else:
3129 raise OrchestratorError('must specify either image or version')
3130
3131 with self.async_timeout_handler(cmd=f'cephadm inspect-image (image {target_name})'):
3132 image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
3133
3134 ceph_image_version = image_info.ceph_version
3135 if not ceph_image_version:
3136 return f'Unable to extract ceph version from {target_name}.'
3137 if ceph_image_version.startswith('ceph version '):
3138 ceph_image_version = ceph_image_version.split(' ')[2]
3139 version_error = self.upgrade._check_target_version(ceph_image_version)
3140 if version_error:
3141 return f'Incompatible upgrade: {version_error}'
3142
3143 self.log.debug(f'image info {target_name} -> {image_info}')
3144 r: dict = {
3145 'target_name': target_name,
3146 'target_id': image_info.image_id,
3147 'target_version': image_info.ceph_version,
3148 'needs_update': dict(),
3149 'up_to_date': list(),
3150 'non_ceph_image_daemons': list()
3151 }
3152 for host, dm in self.cache.daemons.items():
3153 for name, dd in dm.items():
3154 # If the "use_repo_digest" setting is true, check whether the daemon's container
3155 # digests match the digests of the image we're checking upgrades for; if it is
3156 # false, compare the image name against the daemon's image name instead. Either
3157 # way, the goal is to determine whether the daemon is already using the image
3158 # we're checking an upgrade to.
3159 if (
3160 (self.use_repo_digest and dd.matches_digests(image_info.repo_digests))
3161 or (not self.use_repo_digest and dd.matches_image_name(image))
3162 ):
3163 r['up_to_date'].append(dd.name())
3164 elif dd.daemon_type in CEPH_IMAGE_TYPES:
3165 r['needs_update'][dd.name()] = {
3166 'current_name': dd.container_image_name,
3167 'current_id': dd.container_image_id,
3168 'current_version': dd.version,
3169 }
3170 else:
3171 r['non_ceph_image_daemons'].append(dd.name())
3172 if self.use_repo_digest and image_info.repo_digests:
3173 # FIXME: we assume the first digest is the best one to use
3174 r['target_digest'] = image_info.repo_digests[0]
3175
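# Illustrative (not verbatim) shape of the report returned below:
#   {"target_name": "...", "target_id": "...", "target_version": "...",
#    "target_digest": "...", "needs_update": {"osd.3": {...}},
#    "up_to_date": ["mgr.a", ...], "non_ceph_image_daemons": ["grafana.a", ...]}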
3176 return json.dumps(r, indent=4, sort_keys=True)
3177
3178 @handle_orch_error
3179 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
3180 return self.upgrade.upgrade_status()
3181
3182 @handle_orch_error
3183 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
3184 return self.upgrade.upgrade_ls(image, tags, show_all_versions)
3185
3186 @handle_orch_error
3187 def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
3188 services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
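"""
Begin a staggered upgrade to the given image or version. Refuses to start if
any host is in maintenance or offline; --daemon-types and --services are
mutually exclusive, an optional host placement string restricts the target
hosts, and --limit caps how many daemons are upgraded (must be positive).
"""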
3189 if self.inventory.get_host_with_state("maintenance"):
3190 raise OrchestratorError("Upgrade aborted - you have host(s) in maintenance state")
3191 if self.offline_hosts:
3192 raise OrchestratorError(f"Upgrade aborted - Some host(s) are currently offline: {self.offline_hosts}")
3193 if daemon_types is not None and services is not None:
3194 raise OrchestratorError('--daemon-types and --services are mutually exclusive')
3195 if daemon_types is not None:
3196 for dtype in daemon_types:
3197 if dtype not in CEPH_UPGRADE_ORDER:
3198 raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
3199 f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
3200 if services is not None:
3201 for service in services:
3202 if service not in self.spec_store:
3203 raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
3204 f'Known services are: {self.spec_store.all_specs.keys()}')
3205 hosts: Optional[List[str]] = None
3206 if host_placement is not None:
3207 all_hosts = list(self.inventory.all_specs())
3208 placement = PlacementSpec.from_string(host_placement)
3209 hosts = placement.filter_matching_hostspecs(all_hosts)
3210 if not hosts:
3211 raise OrchestratorError(
3212 f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
3213
3214 if limit is not None:
3215 if limit < 1:
3216 raise OrchestratorError(
3217 f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
3218
3219 return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
3220
3221 @handle_orch_error
3222 def upgrade_pause(self) -> str:
3223 return self.upgrade.upgrade_pause()
3224
3225 @handle_orch_error
3226 def upgrade_resume(self) -> str:
3227 return self.upgrade.upgrade_resume()
3228
3229 @handle_orch_error
3230 def upgrade_stop(self) -> str:
3231 return self.upgrade.upgrade_stop()
3232
3233 @handle_orch_error
3234 def remove_osds(self, osd_ids: List[str],
3235 replace: bool = False,
3236 force: bool = False,
3237 zap: bool = False,
3238 no_destroy: bool = False) -> str:
3239 """
3240 Takes a list of OSDs and schedules them for removal.
3241 The function that takes care of the actual removal is
3242 process_removal_queue().
3243 """
3244
3245 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
3246 to_remove_daemons = list()
3247 for daemon in daemons:
3248 if daemon.daemon_id in osd_ids:
3249 to_remove_daemons.append(daemon)
3250 if not to_remove_daemons:
3251 return f"Unable to find OSDs: {osd_ids}"
3252
3253 for daemon in to_remove_daemons:
3254 assert daemon.daemon_id is not None
3255 try:
3256 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
3257 replace=replace,
3258 force=force,
3259 zap=zap,
3260 no_destroy=no_destroy,
3261 hostname=daemon.hostname,
3262 process_started_at=datetime_now(),
3263 remove_util=self.to_remove_osds.rm_util))
3264 except NotFoundError:
3265 return f"Unable to find OSDs: {osd_ids}"
3266
3267 # trigger the serve loop to initiate the removal
3268 self._kick_serve_loop()
3269 warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
3270 "Run the `ceph-volume lvm zap` command with `--destroy`"
3271 " against the VG/LV if you want them to be destroyed.")
3272 return f"Scheduled OSD(s) for removal.{warning_zap}"
3273
3274 @handle_orch_error
3275 def stop_remove_osds(self, osd_ids: List[str]) -> str:
3276 """
3277 Stop the removal process for a list of OSDs.
3278 This reverts their weight and removes them from the osds_to_remove queue.
3279 """
3280 for osd_id in osd_ids:
3281 try:
3282 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
3283 remove_util=self.to_remove_osds.rm_util))
3284 except (NotFoundError, KeyError, ValueError):
3285 return f'Unable to find OSD in the queue: {osd_id}'
3286
3287 # trigger the serve loop to halt the removal
3288 self._kick_serve_loop()
3289 return "Stopped OSD(s) removal"
3290
3291 @handle_orch_error
3292 def remove_osds_status(self) -> List[OSD]:
3293 """
3294 The CLI call to retrieve an OSD removal report.
3295 """
3296 return self.to_remove_osds.all_osds()
3297
3298 @handle_orch_error
3299 def drain_host(self, hostname, force=False):
3300 # type: (str, bool) -> str
3301 """
3302 Drain all daemons from a host.
3303 :param hostname: host name
3304 """
3305
3306 # if we drain the last admin host we could end up removing the only instance
3307 # of the config and keyring and cause issues
3308 if not force:
3309 p = PlacementSpec(label='_admin')
3310 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
3311 if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
3312 raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
3313 " label.\nDraining this host could cause the removal"
3314 " of the last cluster config/keyring managed by cephadm.\n"
3315 "It is recommended to add the _admin label to another host"
3316 " before completing this operation.\nIf you're certain this is"
3317 " what you want rerun this command with --force.")
3318
3319 self.add_host_label(hostname, '_no_schedule')
3320
3321 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
3322
3323 osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
3324 self.remove_osds(osds_to_remove)
3325
3326 daemons_table = ""
3327 daemons_table += "{:<20} {:<15}\n".format("type", "id")
3328 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
3329 for d in daemons:
3330 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
3331
3332 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
3333
3334 def trigger_connect_dashboard_rgw(self) -> None:
3335 self.need_connect_dashboard_rgw = True
3336 self.event.set()