# ceph/src/pybind/mgr/cephadm/module.py (blob b2a48801cc2d779a1faf603718b53de5fed78169)
1 import json
2 import errno
3 import ipaddress
4 import logging
5 import re
6 import shlex
7 from collections import defaultdict
8 from configparser import ConfigParser
9 from functools import wraps
10 from tempfile import TemporaryDirectory, NamedTemporaryFile
11 from threading import Event
12
13 import string
14 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
15 Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type, Awaitable
16
17 import datetime
18 import os
19 import random
20 import multiprocessing.pool
21 import subprocess
22 from prettytable import PrettyTable
23
24 from ceph.deployment import inventory
25 from ceph.deployment.drive_group import DriveGroupSpec
26 from ceph.deployment.service_spec import \
27 ServiceSpec, PlacementSpec, \
28 HostPlacementSpec, IngressSpec, \
29 TunedProfileSpec
30 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
31 from cephadm.serve import CephadmServe
32 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
33 from cephadm.agent import CherryPyThread, CephadmAgentHelpers
34
35
36 from mgr_module import MgrModule, HandleCommandResult, Option, NotifyType
37 import orchestrator
38 from orchestrator.module import to_format, Format
39
40 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
41 CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
42 service_to_daemon_types
43 from orchestrator._interface import GenericSpec
44 from orchestrator._interface import daemon_type_to_service
45
46 from . import utils
47 from . import ssh
48 from .migrations import Migrations
49 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
50 RbdMirrorService, CrashService, CephadmService, CephfsMirrorService, CephadmAgent
51 from .services.ingress import IngressService
52 from .services.container import CustomContainerService
53 from .services.iscsi import IscsiService
54 from .services.nfs import NFSService
55 from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
56 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
57 NodeExporterService, SNMPGatewayService, LokiService, PromtailService
58 from .schedule import HostAssignment
59 from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
60 ClientKeyringStore, ClientKeyringSpec, TunedProfileStore
61 from .upgrade import CephadmUpgrade
62 from .template import TemplateMgr
63 from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall_hosts, \
64 cephadmNoImage, CEPH_UPGRADE_ORDER
65 from .configchecks import CephadmConfigChecks
66 from .offline_watcher import OfflineHostWatcher
67 from .tuned_profiles import TunedProfileUtils
68
69 try:
70 import asyncssh
71 except ImportError as e:
72 asyncssh = None # type: ignore
73 asyncssh_import_error = str(e)
74
75 logger = logging.getLogger(__name__)
76
77 T = TypeVar('T')
78
79 DEFAULT_SSH_CONFIG = """
80 Host *
81 User root
82 StrictHostKeyChecking no
83 UserKnownHostsFile /dev/null
84 ConnectTimeout=30
85 """
86
87 # cherrypy likes to sys.exit on error. don't let it take us down too!
88
89
90 def os_exit_noop(status: int) -> None:
91 pass
92
93
94 os._exit = os_exit_noop # type: ignore
95
96
97 # Default container images -----------------------------------------------------
98 DEFAULT_IMAGE = 'quay.io/ceph/ceph'
99 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4'
100 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.3.1'
101 DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
102 DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
103 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
104 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
105 DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
106 DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.1.5'
107 DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
108 # ------------------------------------------------------------------------------
109
110
111 def host_exists(hostname_position: int = 1) -> Callable:
112 """Check that a hostname exists in the inventory"""
113 def inner(func: Callable) -> Callable:
114 @wraps(func)
115 def wrapper(*args: Any, **kwargs: Any) -> Any:
116 this = args[0] # self object
117 hostname = args[hostname_position]
118 if hostname not in this.cache.get_hosts():
119 candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
120 help_msg = f"Did you mean {candidates}?" if candidates else ""
121 raise OrchestratorError(
122 f"Cannot find host '{hostname}' in the inventory. {help_msg}")
123
124 return func(*args, **kwargs)
125 return wrapper
126 return inner
127
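# Usage sketch (illustrative, not part of the original module): host_exists()
# wraps an instance method whose positional argument at `hostname_position`
# holds a host name, so a mistyped host fails early with a "Did you mean ...?"
# hint. The method name below is hypothetical:
#
#   @host_exists(hostname_position=1)
#   def _some_host_action(self, hostname: str) -> str:
#       ...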
128
129 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
130 metaclass=CLICommandMeta):
131
132 _STORE_HOST_PREFIX = "host"
133
134 instance = None
135 NOTIFY_TYPES = [NotifyType.mon_map, NotifyType.pg_summary]
136 NATIVE_OPTIONS = [] # type: List[Any]
137 MODULE_OPTIONS = [
138 Option(
139 'ssh_config_file',
140 type='str',
141 default=None,
142 desc='customized SSH config file to connect to managed hosts',
143 ),
144 Option(
145 'device_cache_timeout',
146 type='secs',
147 default=30 * 60,
148 desc='seconds to cache device inventory',
149 ),
150 Option(
151 'device_enhanced_scan',
152 type='bool',
153 default=False,
154 desc='Use libstoragemgmt during device scans',
155 ),
156 Option(
157 'daemon_cache_timeout',
158 type='secs',
159 default=10 * 60,
160 desc='seconds to cache service (daemon) inventory',
161 ),
162 Option(
163 'facts_cache_timeout',
164 type='secs',
165 default=1 * 60,
166 desc='seconds to cache host facts data',
167 ),
168 Option(
169 'host_check_interval',
170 type='secs',
171 default=10 * 60,
172 desc='how frequently to perform a host check',
173 ),
174 Option(
175 'mode',
176 type='str',
177 enum_allowed=['root', 'cephadm-package'],
178 default='root',
179 desc='mode for remote execution of cephadm',
180 ),
181 Option(
182 'container_image_base',
183 default=DEFAULT_IMAGE,
184 desc='Container image name, without the tag',
185 runtime=True,
186 ),
187 Option(
188 'container_image_prometheus',
189 default=DEFAULT_PROMETHEUS_IMAGE,
190 desc='Prometheus container image',
191 ),
192 Option(
193 'container_image_grafana',
194 default=DEFAULT_GRAFANA_IMAGE,
195 desc='Grafana container image',
196 ),
197 Option(
198 'container_image_alertmanager',
199 default=DEFAULT_ALERT_MANAGER_IMAGE,
200 desc='Alertmanager container image',
201 ),
202 Option(
203 'container_image_node_exporter',
204 default=DEFAULT_NODE_EXPORTER_IMAGE,
205 desc='Node exporter container image',
206 ),
207 Option(
208 'container_image_loki',
209 default=DEFAULT_LOKI_IMAGE,
210 desc='Loki container image',
211 ),
212 Option(
213 'container_image_promtail',
214 default=DEFAULT_PROMTAIL_IMAGE,
215 desc='Promtail container image',
216 ),
217 Option(
218 'container_image_haproxy',
219 default=DEFAULT_HAPROXY_IMAGE,
220 desc='HAproxy container image',
221 ),
222 Option(
223 'container_image_keepalived',
224 default=DEFAULT_KEEPALIVED_IMAGE,
225 desc='Keepalived container image',
226 ),
227 Option(
228 'container_image_snmp_gateway',
229 default=DEFAULT_SNMP_GATEWAY_IMAGE,
230 desc='SNMP Gateway container image',
231 ),
232 Option(
233 'warn_on_stray_hosts',
234 type='bool',
235 default=True,
236 desc='raise a health warning if daemons are detected on a host '
237 'that is not managed by cephadm',
238 ),
239 Option(
240 'warn_on_stray_daemons',
241 type='bool',
242 default=True,
243 desc='raise a health warning if daemons are detected '
244 'that are not managed by cephadm',
245 ),
246 Option(
247 'warn_on_failed_host_check',
248 type='bool',
249 default=True,
250 desc='raise a health warning if the host check fails',
251 ),
252 Option(
253 'log_to_cluster',
254 type='bool',
255 default=True,
256 desc='log to the "cephadm" cluster log channel',
257 ),
258 Option(
259 'allow_ptrace',
260 type='bool',
261 default=False,
262 desc='allow SYS_PTRACE capability on ceph containers',
263 long_desc='The SYS_PTRACE capability is needed to attach to a '
264 'process with gdb or strace. Enabling this option '
265 'can allow debugging daemons that encounter problems '
266 'at runtime.',
267 ),
268 Option(
269 'container_init',
270 type='bool',
271 default=True,
272 desc='Run podman/docker with `--init`'
273 ),
274 Option(
275 'prometheus_alerts_path',
276 type='str',
277 default='/etc/prometheus/ceph/ceph_default_alerts.yml',
278 desc='location of alerts to include in prometheus deployments',
279 ),
280 Option(
281 'migration_current',
282 type='int',
283 default=None,
284 desc='internal - do not modify',
285 # used to track spec and other data migrations.
286 ),
287 Option(
288 'config_dashboard',
289 type='bool',
290 default=True,
291 desc='manage configs like API endpoints in Dashboard.'
292 ),
293 Option(
294 'manage_etc_ceph_ceph_conf',
295 type='bool',
296 default=False,
297 desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
298 ),
299 Option(
300 'manage_etc_ceph_ceph_conf_hosts',
301 type='str',
302 default='*',
303 desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
304 ),
305 # not used anymore
306 Option(
307 'registry_url',
308 type='str',
309 default=None,
310 desc='Registry url for login purposes. This is not the default registry'
311 ),
312 Option(
313 'registry_username',
314 type='str',
315 default=None,
316 desc='Custom repository username. Only used for logging into a registry.'
317 ),
318 Option(
319 'registry_password',
320 type='str',
321 default=None,
322 desc='Custom repository password. Only used for logging into a registry.'
323 ),
324 ####
325 Option(
326 'registry_insecure',
327 type='bool',
328 default=False,
329 desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
330 ),
331 Option(
332 'use_repo_digest',
333 type='bool',
334 default=True,
335 desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
336 ),
337 Option(
338 'config_checks_enabled',
339 type='bool',
340 default=False,
341 desc='Enable or disable the cephadm configuration analysis',
342 ),
343 Option(
344 'default_registry',
345 type='str',
346 default='docker.io',
347 desc='Search-registry to which we should normalize unqualified image names. '
348 'This is not the default registry',
349 ),
350 Option(
351 'max_count_per_host',
352 type='int',
353 default=10,
354 desc='max number of daemons per service per host',
355 ),
356 Option(
357 'autotune_memory_target_ratio',
358 type='float',
359 default=.7,
360 desc='ratio of total system memory to divide amongst autotuned daemons'
361 ),
362 Option(
363 'autotune_interval',
364 type='secs',
365 default=10 * 60,
366 desc='how frequently to autotune daemon memory'
367 ),
368 Option(
369 'use_agent',
370 type='bool',
371 default=False,
372 desc='Use cephadm agent on each host to gather and send metadata'
373 ),
374 Option(
375 'agent_refresh_rate',
376 type='secs',
377 default=20,
378 desc='How often agent on each host will try to gather and send metadata'
379 ),
380 Option(
381 'agent_starting_port',
382 type='int',
383 default=4721,
384 desc='First port agent will try to bind to (will also try up to next 1000 subsequent ports if blocked)'
385 ),
386 Option(
387 'agent_down_multiplier',
388 type='float',
389 default=3.0,
390 desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
391 ),
392 Option(
393 'max_osd_draining_count',
394 type='int',
395 default=10,
396 desc='max number of osds that will be drained simultaneously when osds are removed'
397 ),
398 ]
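
# Illustrative note (not in the original source): each Option above is an
# ordinary mgr module option, so it can typically be changed at runtime with
#   ceph config set mgr mgr/cephadm/container_image_prometheus <image>
# after which config_notify() below copies the new value onto this instance.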
399
400 def __init__(self, *args: Any, **kwargs: Any):
401 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
402 self._cluster_fsid: str = self.get('mon_map')['fsid']
403 self.last_monmap: Optional[datetime.datetime] = None
404
405 # for serve()
406 self.run = True
407 self.event = Event()
408
409 self.ssh = ssh.SSHManager(self)
410
411 if self.get_store('pause'):
412 self.paused = True
413 else:
414 self.paused = False
415
416 # for mypy which does not run the code
417 if TYPE_CHECKING:
418 self.ssh_config_file = None # type: Optional[str]
419 self.device_cache_timeout = 0
420 self.daemon_cache_timeout = 0
421 self.facts_cache_timeout = 0
422 self.host_check_interval = 0
423 self.max_count_per_host = 0
424 self.mode = ''
425 self.container_image_base = ''
426 self.container_image_prometheus = ''
427 self.container_image_grafana = ''
428 self.container_image_alertmanager = ''
429 self.container_image_node_exporter = ''
430 self.container_image_loki = ''
431 self.container_image_promtail = ''
432 self.container_image_haproxy = ''
433 self.container_image_keepalived = ''
434 self.container_image_snmp_gateway = ''
435 self.warn_on_stray_hosts = True
436 self.warn_on_stray_daemons = True
437 self.warn_on_failed_host_check = True
438 self.allow_ptrace = False
439 self.container_init = True
440 self.prometheus_alerts_path = ''
441 self.migration_current: Optional[int] = None
442 self.config_dashboard = True
443 self.manage_etc_ceph_ceph_conf = True
444 self.manage_etc_ceph_ceph_conf_hosts = '*'
445 self.registry_url: Optional[str] = None
446 self.registry_username: Optional[str] = None
447 self.registry_password: Optional[str] = None
448 self.registry_insecure: bool = False
449 self.use_repo_digest = True
450 self.default_registry = ''
451 self.autotune_memory_target_ratio = 0.0
452 self.autotune_interval = 0
453 self.ssh_user: Optional[str] = None
454 self._ssh_options: Optional[str] = None
455 self.tkey = NamedTemporaryFile()
456 self.ssh_config_fname: Optional[str] = None
457 self.ssh_config: Optional[str] = None
458 self._temp_files: List = []
459 self.ssh_key: Optional[str] = None
460 self.ssh_pub: Optional[str] = None
461 self.use_agent = False
462 self.agent_refresh_rate = 0
463 self.agent_down_multiplier = 0.0
464 self.agent_starting_port = 0
465 self.apply_spec_fails: List[Tuple[str, str]] = []
466 self.max_osd_draining_count = 10
467 self.device_enhanced_scan = False
468
469 self.notify(NotifyType.mon_map, None)
470 self.config_notify()
471
472 path = self.get_ceph_option('cephadm_path')
473 try:
474 assert isinstance(path, str)
475 with open(path, 'r') as f:
476 self._cephadm = f.read()
477 except (IOError, TypeError) as e:
478 raise RuntimeError("unable to read cephadm at '%s': %s" % (
479 path, str(e)))
480
481 self.cephadm_binary_path = self._get_cephadm_binary_path()
482
483 self._worker_pool = multiprocessing.pool.ThreadPool(10)
484
485 self.ssh._reconfig_ssh()
486
487 CephadmOrchestrator.instance = self
488
489 self.upgrade = CephadmUpgrade(self)
490
491 self.health_checks: Dict[str, dict] = {}
492
493 self.inventory = Inventory(self)
494
495 self.cache = HostCache(self)
496 self.cache.load()
497
498 self.agent_cache = AgentCache(self)
499 self.agent_cache.load()
500
501 self.to_remove_osds = OSDRemovalQueue(self)
502 self.to_remove_osds.load_from_store()
503
504 self.spec_store = SpecStore(self)
505 self.spec_store.load()
506
507 self.keys = ClientKeyringStore(self)
508 self.keys.load()
509
510 self.tuned_profiles = TunedProfileStore(self)
511 self.tuned_profiles.load()
512
513 self.tuned_profile_utils = TunedProfileUtils(self)
514
515 # ensure the host lists are in sync
516 for h in self.inventory.keys():
517 if h not in self.cache.daemons:
518 self.cache.prime_empty_host(h)
519 for h in self.cache.get_hosts():
520 if h not in self.inventory:
521 self.cache.rm_host(h)
522
523 # in-memory only.
524 self.events = EventStore(self)
525 self.offline_hosts: Set[str] = set()
526
527 self.migration = Migrations(self)
528
529 _service_clses: Sequence[Type[CephadmService]] = [
530 OSDService, NFSService, MonService, MgrService, MdsService,
531 RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
532 PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
533 IngressService, CustomContainerService, CephfsMirrorService,
534 CephadmAgent, SNMPGatewayService
535 ]
536
537 # https://github.com/python/mypy/issues/8993
538 self.cephadm_services: Dict[str, CephadmService] = {
539 cls.TYPE: cls(self) for cls in _service_clses} # type: ignore
540
541 self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
542 self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
543
544 self.template = TemplateMgr(self)
545
546 self.requires_post_actions: Set[str] = set()
547 self.need_connect_dashboard_rgw = False
548
549 self.config_checker = CephadmConfigChecks(self)
550
551 self.cherrypy_thread = CherryPyThread(self)
552 self.cherrypy_thread.start()
553 self.agent_helpers = CephadmAgentHelpers(self)
554 if self.use_agent:
555 self.agent_helpers._apply_agent()
556
557 self.offline_watcher = OfflineHostWatcher(self)
558 self.offline_watcher.start()
559
560 def shutdown(self) -> None:
561 self.log.debug('shutdown')
562 self._worker_pool.close()
563 self._worker_pool.join()
564 self.cherrypy_thread.shutdown()
565 self.offline_watcher.shutdown()
566 self.run = False
567 self.event.set()
568
569 def _get_cephadm_service(self, service_type: str) -> CephadmService:
570 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
571 return self.cephadm_services[service_type]
572
573 def _get_cephadm_binary_path(self) -> str:
574 import hashlib
575 m = hashlib.sha256()
576 m.update(self._cephadm.encode())
577 return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'
578
579 def _kick_serve_loop(self) -> None:
580 self.log.debug('_kick_serve_loop')
581 self.event.set()
582
583 def serve(self) -> None:
584 """
585 The main loop of cephadm.
586
587 A command handler will typically change the declarative state
588 of cephadm. This loop will then attempt to apply this new state.
589 """
590 # for ssh in serve
591 self.event_loop = ssh.EventLoopThread()
592
593 serve = CephadmServe(self)
594 serve.serve()
595
596 def wait_async(self, coro: Awaitable[T]) -> T:
597 return self.event_loop.get_result(coro)
598
599 def set_container_image(self, entity: str, image: str) -> None:
600 self.check_mon_command({
601 'prefix': 'config set',
602 'name': 'container_image',
603 'value': image,
604 'who': entity,
605 })
606
607 def config_notify(self) -> None:
608 """
609 This method is called whenever one of our config options is changed.
610
611 TODO: this method should be moved into mgr_module.py
612 """
613 for opt in self.MODULE_OPTIONS:
614 setattr(self,
615 opt['name'], # type: ignore
616 self.get_module_option(opt['name'])) # type: ignore
617 self.log.debug(' mgr option %s = %s',
618 opt['name'], getattr(self, opt['name'])) # type: ignore
619 for opt in self.NATIVE_OPTIONS:
620 setattr(self,
621 opt, # type: ignore
622 self.get_ceph_option(opt))
623 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
624
625 self.event.set()
626
627 def notify(self, notify_type: NotifyType, notify_id: Optional[str]) -> None:
628 if notify_type == NotifyType.mon_map:
629 # get monmap mtime so we can refresh configs when mons change
630 monmap = self.get('mon_map')
631 self.last_monmap = str_to_datetime(monmap['modified'])
632 if self.last_monmap and self.last_monmap > datetime_now():
633 self.last_monmap = None # just in case clocks are skewed
634 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
635 # getattr, due to notify() being called before config_notify()
636 self._kick_serve_loop()
637 if notify_type == NotifyType.pg_summary:
638 self._trigger_osd_removal()
639
640 def _trigger_osd_removal(self) -> None:
641 remove_queue = self.to_remove_osds.as_osd_ids()
642 if not remove_queue:
643 return
644 data = self.get("osd_stats")
645 for osd in data.get('osd_stats', []):
646 if osd.get('num_pgs') == 0:
647 # if _ANY_ osd that is currently in the queue appears to be empty,
648 # start the removal process
649 if int(osd.get('osd')) in remove_queue:
650 self.log.debug('Found empty osd. Starting removal process')
651 # if the osd that is now empty is also part of the removal queue
652 # start the process
653 self._kick_serve_loop()
654
655 def pause(self) -> None:
656 if not self.paused:
657 self.log.info('Paused')
658 self.set_store('pause', 'true')
659 self.paused = True
660 # wake loop so we update the health status
661 self._kick_serve_loop()
662
663 def resume(self) -> None:
664 if self.paused:
665 self.log.info('Resumed')
666 self.paused = False
667 self.set_store('pause', None)
668 # unconditionally wake loop so that 'orch resume' can be used to kick
669 # cephadm
670 self._kick_serve_loop()
671
672 def get_unique_name(
673 self,
674 daemon_type: str,
675 host: str,
676 existing: List[orchestrator.DaemonDescription],
677 prefix: Optional[str] = None,
678 forcename: Optional[str] = None,
679 rank: Optional[int] = None,
680 rank_generation: Optional[int] = None,
681 ) -> str:
682 """
683 Generate a unique random service name
684 """
685 suffix = daemon_type not in [
686 'mon', 'crash',
687 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
688 'container', 'agent', 'snmp-gateway', 'loki', 'promtail'
689 ]
690 if forcename:
691 if len([d for d in existing if d.daemon_id == forcename]):
692 raise orchestrator.OrchestratorValidationError(
693 f'name {daemon_type}.{forcename} already in use')
694 return forcename
695
696 if '.' in host:
697 host = host.split('.')[0]
698 while True:
699 if prefix:
700 name = prefix + '.'
701 else:
702 name = ''
703 if rank is not None and rank_generation is not None:
704 name += f'{rank}.{rank_generation}.'
705 name += host
706 if suffix:
707 name += '.' + ''.join(random.choice(string.ascii_lowercase)
708 for _ in range(6))
709 if len([d for d in existing if d.daemon_id == name]):
710 if not suffix:
711 raise orchestrator.OrchestratorValidationError(
712 f'name {daemon_type}.{name} already in use')
713 self.log.debug('name %s exists, trying again', name)
714 continue
715 return name
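# Examples of generated names (illustrative; host and prefix are hypothetical):
# a mon on host "ceph-a" gets daemon_id "ceph-a" (no random suffix), an mds with
# prefix "cephfs" might get "cephfs.ceph-a.xzvbqt", and with rank/rank_generation
# set it would look like "cephfs.0.1.ceph-a.xzvbqt".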
716
717 def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
718 if ssh_config is None or len(ssh_config.strip()) == 0:
719 raise OrchestratorValidationError('ssh_config cannot be empty')
720 # StrictHostKeyChecking must be set explicitly (and must not be 'ask')
721 res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
722 if not res:
723 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
724 for s in res:
725 if 'ask' in s.lower():
726 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
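# For illustration: an ssh_config containing "StrictHostKeyChecking no" (or
# "yes") passes this validation, while an empty config or one containing
# "StrictHostKeyChecking ask" is rejected.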
727
728 def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
729 if not os.path.isfile(ssh_config_fname):
730 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
731 ssh_config_fname))
732
733 def _process_ls_output(self, host: str, ls: List[Dict[str, Any]]) -> None:
734 dm = {}
735 for d in ls:
736 if not d['style'].startswith('cephadm'):
737 continue
738 if d['fsid'] != self._cluster_fsid:
739 continue
740 if '.' not in d['name']:
741 continue
742 sd = orchestrator.DaemonDescription()
743 sd.last_refresh = datetime_now()
744 for k in ['created', 'started', 'last_configured', 'last_deployed']:
745 v = d.get(k, None)
746 if v:
747 setattr(sd, k, str_to_datetime(d[k]))
748 sd.daemon_type = d['name'].split('.')[0]
749 if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
750 logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
751 continue
752
753 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
754 sd.hostname = host
755 sd.container_id = d.get('container_id')
756 if sd.container_id:
757 # shorten the hash
758 sd.container_id = sd.container_id[0:12]
759 sd.container_image_name = d.get('container_image_name')
760 sd.container_image_id = d.get('container_image_id')
761 sd.container_image_digests = d.get('container_image_digests')
762 sd.memory_usage = d.get('memory_usage')
763 sd.memory_request = d.get('memory_request')
764 sd.memory_limit = d.get('memory_limit')
765 sd.cpu_percentage = d.get('cpu_percentage')
766 sd._service_name = d.get('service_name')
767 sd.deployed_by = d.get('deployed_by')
768 sd.version = d.get('version')
769 sd.ports = d.get('ports')
770 sd.ip = d.get('ip')
771 sd.rank = int(d['rank']) if d.get('rank') is not None else None
772 sd.rank_generation = int(d['rank_generation']) if d.get(
773 'rank_generation') is not None else None
774 sd.extra_container_args = d.get('extra_container_args')
775 if 'state' in d:
776 sd.status_desc = d['state']
777 sd.status = {
778 'running': DaemonDescriptionStatus.running,
779 'stopped': DaemonDescriptionStatus.stopped,
780 'error': DaemonDescriptionStatus.error,
781 'unknown': DaemonDescriptionStatus.error,
782 }[d['state']]
783 else:
784 sd.status_desc = 'unknown'
785 sd.status = None
786 dm[sd.name()] = sd
787 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
788 self.cache.update_host_daemons(host, dm)
789 self.cache.save_host(host)
790 return None
791
792 def update_watched_hosts(self) -> None:
793 # currently, we are watching hosts with nfs daemons
794 hosts_to_watch = [d.hostname for d in self.cache.get_daemons(
795 ) if d.daemon_type in RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES]
796 self.offline_watcher.set_hosts(list(set([h for h in hosts_to_watch if h is not None])))
797
798 def offline_hosts_remove(self, host: str) -> None:
799 if host in self.offline_hosts:
800 self.offline_hosts.remove(host)
801
802 def update_failed_daemon_health_check(self) -> None:
803 failed_daemons = []
804 for dd in self.cache.get_error_daemons():
805 if dd.daemon_type != 'agent': # agents tracked by CEPHADM_AGENT_DOWN
806 failed_daemons.append('daemon %s on %s is in %s state' % (
807 dd.name(), dd.hostname, dd.status_desc
808 ))
809 self.remove_health_warning('CEPHADM_FAILED_DAEMON')
810 if failed_daemons:
811 self.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(
812 failed_daemons), failed_daemons)
813
814 @staticmethod
815 def can_run() -> Tuple[bool, str]:
816 if asyncssh is not None:
817 return True, ""
818 else:
819 return False, "loading asyncssh library:{}".format(
820 asyncssh_import_error)
821
822 def available(self) -> Tuple[bool, str, Dict[str, Any]]:
823 """
824 The cephadm orchestrator is always available.
825 """
826 ok, err = self.can_run()
827 if not ok:
828 return ok, err, {}
829 if not self.ssh_key or not self.ssh_pub:
830 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}
831
832 # mypy is unable to determine type for _processes since it's private
833 worker_count: int = self._worker_pool._processes # type: ignore
834 ret = {
835 "workers": worker_count,
836 "paused": self.paused,
837 }
838
839 return True, err, ret
840
841 def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
842 self.set_store(what, new)
843 self.ssh._reconfig_ssh()
844 if self.cache.get_hosts():
845 # Can't check anything without hosts
846 host = self.cache.get_hosts()[0]
847 r = CephadmServe(self)._check_host(host)
848 if r is not None:
849 # connection failed; restore the previous value
850 self.set_store(what, old)
851 self.ssh._reconfig_ssh()
852 raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
853 self.log.info(f'Set ssh {what}')
854
855 @orchestrator._cli_write_command(
856 prefix='cephadm set-ssh-config')
857 def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
858 """
859 Set the ssh_config file (use -i <ssh_config>)
860 """
861 # Set an ssh_config file provided from stdin
862
863 old = self.ssh_config
864 if inbuf == old:
865 return 0, "value unchanged", ""
866 self.validate_ssh_config_content(inbuf)
867 self._validate_and_set_ssh_val('ssh_config', inbuf, old)
868 return 0, "", ""
869
870 @orchestrator._cli_write_command('cephadm clear-ssh-config')
871 def _clear_ssh_config(self) -> Tuple[int, str, str]:
872 """
873 Clear the ssh_config file
874 """
875 # Clear any previously stored ssh_config
876 self.set_store("ssh_config", None)
877 self.ssh_config_tmp = None
878 self.log.info('Cleared ssh_config')
879 self.ssh._reconfig_ssh()
880 return 0, "", ""
881
882 @orchestrator._cli_read_command('cephadm get-ssh-config')
883 def _get_ssh_config(self) -> HandleCommandResult:
884 """
885 Returns the ssh config as used by cephadm
886 """
887 if self.ssh_config_file:
888 self.validate_ssh_config_fname(self.ssh_config_file)
889 with open(self.ssh_config_file) as f:
890 return HandleCommandResult(stdout=f.read())
891 ssh_config = self.get_store("ssh_config")
892 if ssh_config:
893 return HandleCommandResult(stdout=ssh_config)
894 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
895
896 @orchestrator._cli_write_command('cephadm generate-key')
897 def _generate_key(self) -> Tuple[int, str, str]:
898 """
899 Generate a cluster SSH key (if not present)
900 """
901 if not self.ssh_pub or not self.ssh_key:
902 self.log.info('Generating ssh key...')
903 tmp_dir = TemporaryDirectory()
904 path = tmp_dir.name + '/key'
905 try:
906 subprocess.check_call([
907 '/usr/bin/ssh-keygen',
908 '-C', 'ceph-%s' % self._cluster_fsid,
909 '-N', '',
910 '-f', path
911 ])
912 with open(path, 'r') as f:
913 secret = f.read()
914 with open(path + '.pub', 'r') as f:
915 pub = f.read()
916 finally:
917 os.unlink(path)
918 os.unlink(path + '.pub')
919 tmp_dir.cleanup()
920 self.set_store('ssh_identity_key', secret)
921 self.set_store('ssh_identity_pub', pub)
922 self.ssh._reconfig_ssh()
923 return 0, '', ''
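# Typical CLI flow (illustrative): `ceph cephadm generate-key` creates the key
# pair if none exists; `ceph cephadm get-pub-key` then prints the public half,
# which can be installed on hosts, e.g. with ssh-copy-id.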
924
925 @orchestrator._cli_write_command(
926 'cephadm set-priv-key')
927 def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
928 """Set cluster SSH private key (use -i <private_key>)"""
929 if inbuf is None or len(inbuf) == 0:
930 return -errno.EINVAL, "", "empty private ssh key provided"
931 old = self.ssh_key
932 if inbuf == old:
933 return 0, "value unchanged", ""
934 self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
935 self.log.info('Set ssh private key')
936 return 0, "", ""
937
938 @orchestrator._cli_write_command(
939 'cephadm set-pub-key')
940 def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
941 """Set cluster SSH public key (use -i <public_key>)"""
942 if inbuf is None or len(inbuf) == 0:
943 return -errno.EINVAL, "", "empty public ssh key provided"
944 old = self.ssh_pub
945 if inbuf == old:
946 return 0, "value unchanged", ""
947 self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
948 return 0, "", ""
949
950 @orchestrator._cli_write_command(
951 'cephadm clear-key')
952 def _clear_key(self) -> Tuple[int, str, str]:
953 """Clear cluster SSH key"""
954 self.set_store('ssh_identity_key', None)
955 self.set_store('ssh_identity_pub', None)
956 self.ssh._reconfig_ssh()
957 self.log.info('Cleared cluster SSH key')
958 return 0, '', ''
959
960 @orchestrator._cli_read_command(
961 'cephadm get-pub-key')
962 def _get_pub_key(self) -> Tuple[int, str, str]:
963 """Show SSH public key for connecting to cluster hosts"""
964 if self.ssh_pub:
965 return 0, self.ssh_pub, ''
966 else:
967 return -errno.ENOENT, '', 'No cluster SSH key defined'
968
969 @orchestrator._cli_read_command(
970 'cephadm get-user')
971 def _get_user(self) -> Tuple[int, str, str]:
972 """
973 Show user for SSHing to cluster hosts
974 """
975 if self.ssh_user is None:
976 return -errno.ENOENT, '', 'No cluster SSH user configured'
977 else:
978 return 0, self.ssh_user, ''
979
980 @orchestrator._cli_read_command(
981 'cephadm set-user')
982 def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
983 """
984 Set user for SSHing to cluster hosts; passwordless sudo will be needed for non-root users
985 """
986 current_user = self.ssh_user
987 if user == current_user:
988 return 0, "value unchanged", ""
989
990 self._validate_and_set_ssh_val('ssh_user', user, current_user)
991 current_ssh_config = self._get_ssh_config()
992 new_ssh_config = re.sub(r"(\s{2}User\s)(.*)", r"\1" + user, current_ssh_config.stdout)
993 self._set_ssh_config(new_ssh_config)
994
995 msg = 'ssh user set to %s' % user
996 if user != 'root':
997 msg += '. sudo will be used'
998 self.log.info(msg)
999 return 0, msg, ''
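# Example invocation (illustrative; the user name is hypothetical):
#   ceph cephadm set-user cephadm-deploy
# For any non-root user the managed hosts must grant passwordless sudo.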
1000
1001 @orchestrator._cli_read_command(
1002 'cephadm registry-login')
1003 def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1004 """
1005 Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
1006 """
1007 # if password not given in command line, get it through file input
1008 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
1009 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
1010 "or -i <login credentials json file>")
1011 elif (url and username and password):
1012 registry_json = {'url': url, 'username': username, 'password': password}
1013 else:
1014 assert isinstance(inbuf, str)
1015 registry_json = json.loads(inbuf)
1016 if "url" not in registry_json or "username" not in registry_json or "password" not in registry_json:
1017 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
1018 "Please setup json file as\n"
1019 "{\n"
1020 " \"url\": \"REGISTRY_URL\",\n"
1021 " \"username\": \"REGISTRY_USERNAME\",\n"
1022 " \"password\": \"REGISTRY_PASSWORD\"\n"
1023 "}\n")
1024
1025 # verify login info works by attempting login on one of the hosts
1026 host = None
1027 for host_name in self.inventory.keys():
1028 host = host_name
1029 break
1030 if not host:
1031 raise OrchestratorError('no hosts defined')
1032 r = self.wait_async(CephadmServe(self)._registry_login(host, registry_json))
1033 if r is not None:
1034 return 1, '', r
1035 # if logins succeeded, store info
1036 self.log.debug("Host logins successful. Storing login info.")
1037 self.set_store('registry_credentials', json.dumps(registry_json))
1038 # distribute new login info to all hosts
1039 self.cache.distribute_new_registry_login_info()
1040 return 0, "registry login scheduled", ''
1041
1042 @orchestrator._cli_read_command('cephadm check-host')
1043 def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1044 """Check whether we can access and manage a remote host"""
1045 try:
1046 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
1047 ['--expect-hostname', host],
1048 addr=addr,
1049 error_ok=True, no_fsid=True))
1050 if code:
1051 return 1, '', ('check-host failed:\n' + '\n'.join(err))
1052 except OrchestratorError:
1053 self.log.exception(f"check-host failed for '{host}'")
1054 return 1, '', ('check-host failed:\n'
1055 + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
1056 # if we have an outstanding health alert for this host, give the
1057 # serve thread a kick
1058 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1059 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1060 if item.startswith('host %s ' % host):
1061 self.event.set()
1062 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1063
1064 @orchestrator._cli_read_command(
1065 'cephadm prepare-host')
1066 def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
1067 """Prepare a remote host for use with cephadm"""
1068 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
1069 ['--expect-hostname', host],
1070 addr=addr,
1071 error_ok=True, no_fsid=True))
1072 if code:
1073 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
1074 # if we have an outstanding health alert for this host, give the
1075 # serve thread a kick
1076 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1077 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1078 if item.startswith('host %s ' % host):
1079 self.event.set()
1080 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
1081
1082 @orchestrator._cli_write_command(
1083 prefix='cephadm set-extra-ceph-conf')
1084 def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
1085 """
1086 Text that is appended to all daemons' ceph.conf.
1087 Mainly a workaround until `config generate-minimal-conf` generates
1088 a complete ceph.conf.
1089
1090 Warning: this is a dangerous operation.
1091 """
1092 if inbuf:
1093 # sanity check.
1094 cp = ConfigParser()
1095 cp.read_string(inbuf, source='<infile>')
1096
1097 self.set_store("extra_ceph_conf", json.dumps({
1098 'conf': inbuf,
1099 'last_modified': datetime_to_str(datetime_now())
1100 }))
1101 self.log.info('Set extra_ceph_conf')
1102 self._kick_serve_loop()
1103 return HandleCommandResult()
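# Example (illustrative): append a snippet kept in a local file, e.g.
#   ceph cephadm set-extra-ceph-conf -i extra.conf
# where extra.conf holds plain ini-style ceph.conf content such as
#   [global]
#   debug_ms = 1
# The snippet is sanity-checked with ConfigParser before it is stored.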
1104
1105 @orchestrator._cli_read_command(
1106 'cephadm get-extra-ceph-conf')
1107 def _get_extra_ceph_conf(self) -> HandleCommandResult:
1108 """
1109 Get extra ceph conf that is appended
1110 """
1111 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
1112
1113 @orchestrator._cli_read_command('cephadm config-check ls')
1114 def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
1115 """List the available configuration checks and their current state"""
1116
1117 if format not in [Format.plain, Format.json, Format.json_pretty]:
1118 return HandleCommandResult(
1119 retval=1,
1120 stderr="Requested format is not supported when listing configuration checks"
1121 )
1122
1123 if format in [Format.json, Format.json_pretty]:
1124 return HandleCommandResult(
1125 stdout=to_format(self.config_checker.health_checks,
1126 format,
1127 many=True,
1128 cls=None))
1129
1130 # plain formatting
1131 table = PrettyTable(
1132 ['NAME',
1133 'HEALTHCHECK',
1134 'STATUS',
1135 'DESCRIPTION'
1136 ], border=False)
1137 table.align['NAME'] = 'l'
1138 table.align['HEALTHCHECK'] = 'l'
1139 table.align['STATUS'] = 'l'
1140 table.align['DESCRIPTION'] = 'l'
1141 table.left_padding_width = 0
1142 table.right_padding_width = 2
1143 for c in self.config_checker.health_checks:
1144 table.add_row((
1145 c.name,
1146 c.healthcheck_name,
1147 c.status,
1148 c.description,
1149 ))
1150
1151 return HandleCommandResult(stdout=table.get_string())
1152
1153 @orchestrator._cli_read_command('cephadm config-check status')
1154 def _config_check_status(self) -> HandleCommandResult:
1155 """Show whether the configuration checker feature is enabled/disabled"""
1156 status = self.get_module_option('config_checks_enabled')
1157 return HandleCommandResult(stdout="Enabled" if status else "Disabled")
1158
1159 @orchestrator._cli_write_command('cephadm config-check enable')
1160 def _config_check_enable(self, check_name: str) -> HandleCommandResult:
1161 """Enable a specific configuration check"""
1162 if not self._config_check_valid(check_name):
1163 return HandleCommandResult(retval=1, stderr="Invalid check name")
1164
1165 err, msg = self._update_config_check(check_name, 'enabled')
1166 if err:
1167 return HandleCommandResult(
1168 retval=err,
1169 stderr=f"Failed to enable check '{check_name}' : {msg}")
1170
1171 return HandleCommandResult(stdout="ok")
1172
1173 @orchestrator._cli_write_command('cephadm config-check disable')
1174 def _config_check_disable(self, check_name: str) -> HandleCommandResult:
1175 """Disable a specific configuration check"""
1176 if not self._config_check_valid(check_name):
1177 return HandleCommandResult(retval=1, stderr="Invalid check name")
1178
1179 err, msg = self._update_config_check(check_name, 'disabled')
1180 if err:
1181 return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
1182 else:
1183 # drop any outstanding raised healthcheck for this check
1184 config_check = self.config_checker.lookup_check(check_name)
1185 if config_check:
1186 if config_check.healthcheck_name in self.health_checks:
1187 self.health_checks.pop(config_check.healthcheck_name, None)
1188 self.set_health_checks(self.health_checks)
1189 else:
1190 self.log.error(
1191 f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")
1192
1193 return HandleCommandResult(stdout="ok")
1194
1195 def _config_check_valid(self, check_name: str) -> bool:
1196 return check_name in [chk.name for chk in self.config_checker.health_checks]
1197
1198 def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
1199 checks_raw = self.get_store('config_checks')
1200 if not checks_raw:
1201 return 1, "config_checks setting is not available"
1202
1203 checks = json.loads(checks_raw)
1204 checks.update({
1205 check_name: status
1206 })
1207 self.log.info(f"updated config check '{check_name}' : {status}")
1208 self.set_store('config_checks', json.dumps(checks))
1209 return 0, ""
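# Illustrative note: the 'config_checks' KV entry is a JSON object mapping check
# names to "enabled"/"disabled", e.g. {"kernel_security": "disabled"}; the check
# name here is only an example, see CephadmConfigChecks for the real set.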
1210
1211 class ExtraCephConf(NamedTuple):
1212 conf: str
1213 last_modified: Optional[datetime.datetime]
1214
1215 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
1216 data = self.get_store('extra_ceph_conf')
1217 if not data:
1218 return CephadmOrchestrator.ExtraCephConf('', None)
1219 try:
1220 j = json.loads(data)
1221 except ValueError:
1222 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
1223 self.log.exception('%s: \'%s\'', msg, data)
1224 return CephadmOrchestrator.ExtraCephConf('', None)
1225 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
1226
1227 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
1228 conf = self.extra_ceph_conf()
1229 if not conf.last_modified:
1230 return False
1231 return conf.last_modified > dt
1232
1233 @orchestrator._cli_write_command(
1234 'cephadm osd activate'
1235 )
1236 def _osd_activate(self, host: List[str]) -> HandleCommandResult:
1237 """
1238 Start OSD containers for existing OSDs
1239 """
1240
1241 @forall_hosts
1242 def run(h: str) -> str:
1243 return self.wait_async(self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd'))
1244
1245 return HandleCommandResult(stdout='\n'.join(run(host)))
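# Example (illustrative; host names are hypothetical): after reinstalling the OS
# on a host while keeping its OSD drives, the existing OSDs can be restarted as
# containers with
#   ceph cephadm osd activate host1 host2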
1246
1247 @orchestrator._cli_read_command('orch client-keyring ls')
1248 def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
1249 """
1250 List client keyrings under cephadm management
1251 """
1252 if format != Format.plain:
1253 output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
1254 else:
1255 table = PrettyTable(
1256 ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
1257 border=False)
1258 table.align = 'l'
1259 table.left_padding_width = 0
1260 table.right_padding_width = 2
1261 for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
1262 table.add_row((
1263 ks.entity, ks.placement.pretty_str(),
1264 utils.file_mode_to_str(ks.mode),
1265 f'{ks.uid}:{ks.gid}',
1266 ks.path,
1267 ))
1268 output = table.get_string()
1269 return HandleCommandResult(stdout=output)
1270
1271 @orchestrator._cli_write_command('orch client-keyring set')
1272 def _client_keyring_set(
1273 self,
1274 entity: str,
1275 placement: str,
1276 owner: Optional[str] = None,
1277 mode: Optional[str] = None,
1278 ) -> HandleCommandResult:
1279 """
1280 Add or update client keyring under cephadm management
1281 """
1282 if not entity.startswith('client.'):
1283 raise OrchestratorError('entity must start with client.')
1284 if owner:
1285 try:
1286 uid, gid = map(int, owner.split(':'))
1287 except Exception:
1288 raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
1289 else:
1290 uid = 0
1291 gid = 0
1292 if mode:
1293 try:
1294 imode = int(mode, 8)
1295 except Exception:
1296 raise OrchestratorError('mode must be an octal mode, e.g. "600"')
1297 else:
1298 imode = 0o600
1299 pspec = PlacementSpec.from_string(placement)
1300 ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
1301 self.keys.update(ks)
1302 self._kick_serve_loop()
1303 return HandleCommandResult()
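# Example (illustrative; entity and label are hypothetical): keep client.foo's
# keyring on every host carrying the "foo" label, root-owned with mode 600:
#   ceph orch client-keyring set client.foo label:foo --owner 0:0 --mode 600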
1304
1305 @orchestrator._cli_write_command('orch client-keyring rm')
1306 def _client_keyring_rm(
1307 self,
1308 entity: str,
1309 ) -> HandleCommandResult:
1310 """
1311 Remove client keyring from cephadm management
1312 """
1313 self.keys.rm(entity)
1314 self._kick_serve_loop()
1315 return HandleCommandResult()
1316
1317 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1318 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1319 image: Optional[str] = None
1320 if daemon_type in CEPH_IMAGE_TYPES:
1321 # get container image
1322 image = str(self.get_foreign_ceph_option(
1323 utils.name_to_config_section(daemon_name),
1324 'container_image'
1325 )).strip()
1326 elif daemon_type == 'prometheus':
1327 image = self.container_image_prometheus
1328 elif daemon_type == 'grafana':
1329 image = self.container_image_grafana
1330 elif daemon_type == 'alertmanager':
1331 image = self.container_image_alertmanager
1332 elif daemon_type == 'node-exporter':
1333 image = self.container_image_node_exporter
1334 elif daemon_type == 'loki':
1335 image = self.container_image_loki
1336 elif daemon_type == 'promtail':
1337 image = self.container_image_promtail
1338 elif daemon_type == 'haproxy':
1339 image = self.container_image_haproxy
1340 elif daemon_type == 'keepalived':
1341 image = self.container_image_keepalived
1342 elif daemon_type == CustomContainerService.TYPE:
1343 # The image can't be resolved, the necessary information
1344 # is only available when a container is deployed (given
1345 # via spec).
1346 image = None
1347 elif daemon_type == 'snmp-gateway':
1348 image = self.container_image_snmp_gateway
1349 else:
1350 assert False, daemon_type
1351
1352 self.log.debug('%s container image %s' % (daemon_name, image))
1353
1354 return image
1355
1356 def _check_valid_addr(self, host: str, addr: str) -> str:
1357 # make sure hostname is resolvable before trying to make a connection
1358 try:
1359 ip_addr = utils.resolve_ip(addr)
1360 except OrchestratorError as e:
1361 msg = str(e) + f'''
1362 You may need to supply an address for {addr}
1363
1364 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1365 To add the cephadm SSH key to the host:
1366 > ceph cephadm get-pub-key > ~/ceph.pub
1367 > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}
1368
1369 To check that the host is reachable open a new shell with the --no-hosts flag:
1370 > cephadm shell --no-hosts
1371
1372 Then run the following:
1373 > ceph cephadm get-ssh-config > ssh_config
1374 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1375 > chmod 0600 ~/cephadm_private_key
1376 > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
1377 raise OrchestratorError(msg)
1378
1379 if ipaddress.ip_address(ip_addr).is_loopback and host == addr:
1380 # if this is a re-add, use old address. otherwise error
1381 if host not in self.inventory or self.inventory.get_addr(host) == host:
1382 raise OrchestratorError(
1383 (f'Cannot automatically resolve ip address of host {host}. Ip resolved to loopback address: {ip_addr}\n'
1384 + f'Please explicitly provide the address (ceph orch host add {host} --addr <ip-addr>)'))
1385 self.log.debug(
1386 f'Received loopback address resolving ip for {host}: {ip_addr}. Falling back to previous address.')
1387 ip_addr = self.inventory.get_addr(host)
1388 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
1389 host, cephadmNoImage, 'check-host',
1390 ['--expect-hostname', host],
1391 addr=addr,
1392 error_ok=True, no_fsid=True))
1393 if code:
1394 msg = 'check-host failed:\n' + '\n'.join(err)
1395 # err will contain stdout and stderr, so we filter on the message text to
1396 # only show the errors
1397 errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
1398 if errors:
1399 msg = f'Host {host} ({addr}) failed check(s): {errors}'
1400 raise OrchestratorError(msg)
1401 return ip_addr
1402
1403 def _add_host(self, spec):
1404 # type: (HostSpec) -> str
1405 """
1406 Add a host to be managed by the orchestrator.
1407
1408 :param spec: HostSpec of the host to add (hostname, addr, labels, ...)
1409 """
1410 HostSpec.validate(spec)
1411 ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
1412 if spec.addr == spec.hostname and ip_addr:
1413 spec.addr = ip_addr
1414
1415 if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
1416 self.cache.refresh_all_host_info(spec.hostname)
1417
1418 # prime crush map?
1419 if spec.location:
1420 self.check_mon_command({
1421 'prefix': 'osd crush add-bucket',
1422 'name': spec.hostname,
1423 'type': 'host',
1424 'args': [f'{k}={v}' for k, v in spec.location.items()],
1425 })
1426
1427 if spec.hostname not in self.inventory:
1428 self.cache.prime_empty_host(spec.hostname)
1429 self.inventory.add_host(spec)
1430 self.offline_hosts_remove(spec.hostname)
1431 if spec.status == 'maintenance':
1432 self._set_maintenance_healthcheck()
1433 self.event.set() # refresh stray health check
1434 self.log.info('Added host %s' % spec.hostname)
1435 return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
1436
1437 @handle_orch_error
1438 def add_host(self, spec: HostSpec) -> str:
1439 return self._add_host(spec)
1440
1441 @handle_orch_error
1442 def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
1443 """
1444 Remove a host from orchestrator management.
1445
1446 :param host: host name
1447 :param force: bypass running daemons check
1448 :param offline: remove offline host
1449 """
1450
1451 # check if host is offline
1452 host_offline = host in self.offline_hosts
1453
1454 if host_offline and not offline:
1455 raise OrchestratorValidationError(
1456 "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host))
1457
1458 if not host_offline and offline:
1459 raise OrchestratorValidationError(
1460 "{} is online, please remove host without --offline.".format(host))
1461
1462 if offline and not force:
1463 raise OrchestratorValidationError("Removing an offline host requires --force")
1464
1465 # check if there are daemons on the host
1466 if not force:
1467 daemons = self.cache.get_daemons_by_host(host)
1468 if daemons:
1469 self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
1470
1471 daemons_table = ""
1472 daemons_table += "{:<20} {:<15}\n".format("type", "id")
1473 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
1474 for d in daemons:
1475 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
1476
1477 raise OrchestratorValidationError("Not allowed to remove %s from cluster. "
1478 "The following daemons are running in the host:"
1479 "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
1480 host, daemons_table, host))
1481
1482 # check if we're removing the last _admin host
1483 if not force:
1484 p = PlacementSpec(label='_admin')
1485 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1486 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1487 raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
1488 " label. Please add the '_admin' label to a host"
1489 " or add --force to this command")
1490
1491 def run_cmd(cmd_args: dict) -> None:
1492 ret, out, err = self.mon_command(cmd_args)
1493 if ret != 0:
1494 self.log.debug(f"ran {cmd_args} with mon_command")
1495 self.log.error(
1496 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
1497 self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
1498
1499 if offline:
1500 daemons = self.cache.get_daemons_by_host(host)
1501 for d in daemons:
1502 self.log.info(f"removing: {d.name()}")
1503
1504 if d.daemon_type != 'osd':
1505 self.cephadm_services[str(d.daemon_type)].pre_remove(d)
1506 self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False)
1507 else:
1508 cmd_args = {
1509 'prefix': 'osd purge-actual',
1510 'id': int(str(d.daemon_id)),
1511 'yes_i_really_mean_it': True
1512 }
1513 run_cmd(cmd_args)
1514
1515 cmd_args = {
1516 'prefix': 'osd crush rm',
1517 'name': host
1518 }
1519 run_cmd(cmd_args)
1520
1521 self.inventory.rm_host(host)
1522 self.cache.rm_host(host)
1523 self.ssh.reset_con(host)
1524 self.event.set() # refresh stray health check
1525 self.log.info('Removed host %s' % host)
1526 return "Removed {} host '{}'".format('offline' if offline else '', host)
1527
1528 @handle_orch_error
1529 def update_host_addr(self, host: str, addr: str) -> str:
1530 self._check_valid_addr(host, addr)
1531 self.inventory.set_addr(host, addr)
1532 self.ssh.reset_con(host)
1533 self.event.set() # refresh stray health check
1534 self.log.info('Set host %s addr to %s' % (host, addr))
1535 return "Updated host '{}' addr to '{}'".format(host, addr)
1536
1537 @handle_orch_error
1538 def get_hosts(self):
1539 # type: () -> List[orchestrator.HostSpec]
1540 """
1541 Return a list of hosts managed by the orchestrator.
1542
1543 Notes:
1544 - skip async: manager reads from cache.
1545 """
1546 return list(self.inventory.all_specs())
1547
1548 @handle_orch_error
1549 def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
1550 """
1551 Return a list of host metadata (gather_facts) for hosts managed by the orchestrator.
1552
1553 Notes:
1554 - skip async: manager reads from cache.
1555 """
1556 if hostname:
1557 return [self.cache.get_facts(hostname)]
1558
1559 return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
1560
1561 @handle_orch_error
1562 def add_host_label(self, host: str, label: str) -> str:
1563 self.inventory.add_label(host, label)
1564 self.log.info('Added label %s to host %s' % (label, host))
1565 self._kick_serve_loop()
1566 return 'Added label %s to host %s' % (label, host)
1567
1568 @handle_orch_error
1569 def remove_host_label(self, host: str, label: str, force: bool = False) -> str:
1570 # if we remove the _admin label from the only host that has it we could end up
1571 # removing the only instance of the config and keyring and cause issues
1572 if not force and label == '_admin':
1573 p = PlacementSpec(label='_admin')
1574 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
1575 if len(admin_hosts) == 1 and admin_hosts[0] == host:
1576 raise OrchestratorValidationError(f"Host {host} is the last host with the '_admin'"
1577 " label.\nRemoving the _admin label from this host could cause the removal"
1578 " of the last cluster config/keyring managed by cephadm.\n"
1579 "It is recommended to add the _admin label to another host"
1580 " before completing this operation.\nIf you're certain this is"
1581 " what you want rerun this command with --force.")
1582 self.inventory.rm_label(host, label)
1583 self.log.info('Removed label %s from host %s' % (label, host))
1584 self._kick_serve_loop()
1585 return 'Removed label %s from host %s' % (label, host)
1586
1587 def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
1588 self.log.debug("running host-ok-to-stop checks")
1589 daemons = self.cache.get_daemons()
1590 daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
1591 for dd in daemons:
1592 assert dd.hostname is not None
1593 assert dd.daemon_type is not None
1594 assert dd.daemon_id is not None
1595 if dd.hostname == hostname:
1596 daemon_map[dd.daemon_type].append(dd.daemon_id)
1597
1598 notifications: List[str] = []
1599 error_notifications: List[str] = []
1600 okay: bool = True
1601 for daemon_type, daemon_ids in daemon_map.items():
1602 r = self.cephadm_services[daemon_type_to_service(
1603 daemon_type)].ok_to_stop(daemon_ids, force=force)
1604 if r.retval:
1605 okay = False
1606 # collect error notifications so user can see every daemon causing host
1607 # to not be okay to stop
1608 error_notifications.append(r.stderr)
1609 if r.stdout:
1610 # if extra notifications to print for user, add them to notifications list
1611 notifications.append(r.stdout)
1612
1613 if not okay:
1614 # at least one daemon is not okay to stop
1615 return 1, '\n'.join(error_notifications)
1616
1617 if notifications:
1618 return 0, (f'It is presumed safe to stop host {hostname}. '
1619 + 'Note the following:\n\n' + '\n'.join(notifications))
1620 return 0, f'It is presumed safe to stop host {hostname}'
1621
1622 @handle_orch_error
1623 def host_ok_to_stop(self, hostname: str) -> str:
1624 if hostname not in self.cache.get_hosts():
1625 raise OrchestratorError(f'Cannot find host "{hostname}"')
1626
1627 rc, msg = self._host_ok_to_stop(hostname)
1628 if rc:
1629 raise OrchestratorError(msg, errno=rc)
1630
1631 self.log.info(msg)
1632 return msg
1633
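# Illustrative CLI usage for host_ok_to_stop() ("host1" is a placeholder). The
# per-daemon-type ok_to_stop() results collected by _host_ok_to_stop() decide
# whether this returns a message or raises:
#
#   ceph orch host ok-to-stop host1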
1634 def _set_maintenance_healthcheck(self) -> None:
1635 """Raise/update or clear the maintenance health check as needed"""
1636
1637 in_maintenance = self.inventory.get_host_with_state("maintenance")
1638 if not in_maintenance:
1639 self.remove_health_warning('HOST_IN_MAINTENANCE')
1640 else:
1641 s = "host is" if len(in_maintenance) == 1 else "hosts are"
1642 self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
1643 f"{h} is in maintenance" for h in in_maintenance])
1644
1645 @handle_orch_error
1646 @host_exists()
1647 def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
1648 """ Attempt to place a cluster host in maintenance
1649
1650 Placing a host into maintenance disables the cluster's ceph target in systemd
1651 and stops all ceph daemons. If the host is an osd host we apply the noout flag
1652 for the host subtree in crush to prevent data movement during a host maintenance
1653 window.
1654
1655 :param hostname: (str) name of the host (must match an inventory hostname)
1656
1657 :raises OrchestratorError: Hostname is invalid, host is already in maintenance
1658 """
1659 if len(self.cache.get_hosts()) == 1:
1660 raise OrchestratorError("Maintenance feature is not supported on single node clusters")
1661
1662 # if upgrade is active, deny
1663 if self.upgrade.upgrade_state:
1664 raise OrchestratorError(
1665 f"Unable to place {hostname} in maintenance with upgrade active/paused")
1666
1667 tgt_host = self.inventory._inventory[hostname]
1668 if tgt_host.get("status", "").lower() == "maintenance":
1669 raise OrchestratorError(f"Host {hostname} is already in maintenance")
1670
1671 host_daemons = self.cache.get_daemon_types(hostname)
1672 self.log.debug("daemons on host {}".format(','.join(host_daemons)))
1673 if host_daemons:
1674 # daemons on this host, so check the daemons can be stopped
1675 # and if so, place the host into maintenance by disabling the target
1676 rc, msg = self._host_ok_to_stop(hostname, force)
1677 if rc:
1678 raise OrchestratorError(
1679 msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
1680
1681 # call the host-maintenance function
1682 _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
1683 ["enter"],
1684 error_ok=True))
1685 returned_msg = _err[0].split('\n')[-1]
1686 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
1687 raise OrchestratorError(
1688 f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
1689
1690 if "osd" in host_daemons:
1691 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1692 rc, out, err = self.mon_command({
1693 'prefix': 'osd set-group',
1694 'flags': 'noout',
1695 'who': [crush_node],
1696 'format': 'json'
1697 })
1698 if rc:
1699 self.log.warning(
1700 f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
1701 raise OrchestratorError(
1702 f"Unable to set the osds on {hostname} to noout (rc={rc})")
1703 else:
1704 self.log.info(
1705 f"maintenance mode request for {hostname} has SET the noout group")
1706
1707 # update the host status in the inventory
1708 tgt_host["status"] = "maintenance"
1709 self.inventory._inventory[hostname] = tgt_host
1710 self.inventory.save()
1711
1712 self._set_maintenance_healthcheck()
1713 return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
1714
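# Illustrative CLI usage for enter_host_maintenance() ("host1" is a placeholder);
# --force bypasses the ok-to-stop warnings checked above:
#
#   ceph orch host maintenance enter host1
#   ceph orch host maintenance enter host1 --force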
1715 @handle_orch_error
1716 @host_exists()
1717 def exit_host_maintenance(self, hostname: str) -> str:
1718 """Exit maintenance mode and return a host to an operational state
1719
1720 Returning from maintenance will enable and start the cluster's systemd
1721 target on the host, and remove any noout flag that has been added for the
1722 host if the host has osd daemons.
1723
1724 :param hostname: (str) host name
1725
1726 :raises OrchestratorError: Unable to return from maintenance, or unset the
1727 noout flag
1728 """
1729 tgt_host = self.inventory._inventory[hostname]
1730 if tgt_host['status'] != "maintenance":
1731 raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
1732
1733 outs, errs, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
1734 ['exit'],
1735 error_ok=True))
1736 returned_msg = errs[0].split('\n')[-1]
1737 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
1738 raise OrchestratorError(
1739 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
1740
1741 if "osd" in self.cache.get_daemon_types(hostname):
1742 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1743 rc, _out, _err = self.mon_command({
1744 'prefix': 'osd unset-group',
1745 'flags': 'noout',
1746 'who': [crush_node],
1747 'format': 'json'
1748 })
1749 if rc:
1750 self.log.warning(
1751 f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
1752 raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
1753 else:
1754 self.log.info(
1755 f"exit maintenance request has UNSET for the noout group on host {hostname}")
1756
1757 # update the host record status
1758 tgt_host['status'] = ""
1759 self.inventory._inventory[hostname] = tgt_host
1760 self.inventory.save()
1761
1762 self._set_maintenance_healthcheck()
1763
1764 return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
1765
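# Illustrative CLI usage for exit_host_maintenance() ("host1" is a placeholder):
#
#   ceph orch host maintenance exit host1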
1766 @handle_orch_error
1767 @host_exists()
1768 def rescan_host(self, hostname: str) -> str:
1769 """Use cephadm to issue a disk rescan on each HBA
1770
1771 Some HBAs and external enclosures don't automatically register
1772 device insertion with the kernel, so for these scenarios we need
1773 to manually rescan
1774
1775 :param hostname: (str) host name
1776 """
1777 self.log.info(f'disk rescan request sent to host "{hostname}"')
1778 _out, _err, _code = self.wait_async(CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "disk-rescan",
1779 [],
1780 no_fsid=True,
1781 error_ok=True))
1782 if not _err:
1783 raise OrchestratorError('Unexpected response from cephadm disk-rescan call')
1784
1785 msg = _err[0].split('\n')[-1]
1786 log_msg = f'disk rescan: {msg}'
1787 if msg.upper().startswith('OK'):
1788 self.log.info(log_msg)
1789 else:
1790 self.log.warning(log_msg)
1791
1792 return f'{msg}'
1793
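# Illustrative CLI usage for rescan_host() ("host1" is a placeholder):
#
#   ceph orch host rescan host1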
1794 def get_minimal_ceph_conf(self) -> str:
1795 _, config, _ = self.check_mon_command({
1796 "prefix": "config generate-minimal-conf",
1797 })
1798 extra = self.extra_ceph_conf().conf
1799 if extra:
1800 config += '\n\n' + extra.strip() + '\n'
1801 return config
1802
1803 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
1804 if filter_host:
1805 self.cache.invalidate_host_daemons(filter_host)
1806 else:
1807 for h in self.cache.get_hosts():
1808 # Also discover daemons deployed manually
1809 self.cache.invalidate_host_daemons(h)
1810
1811 self._kick_serve_loop()
1812
1813 @handle_orch_error
1814 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1815 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
1816 if refresh:
1817 self._invalidate_daemons_and_kick_serve()
1818 self.log.debug('Kicked serve() loop to refresh all services')
1819
1820 sm: Dict[str, orchestrator.ServiceDescription] = {}
1821
1822 # known services
1823 for nm, spec in self.spec_store.all_specs.items():
1824 if service_type is not None and service_type != spec.service_type:
1825 continue
1826 if service_name is not None and service_name != nm:
1827 continue
1828
1829 if spec.service_type != 'osd':
1830 size = spec.placement.get_target_count(self.cache.get_schedulable_hosts())
1831 else:
1832 # osd counting is special
1833 size = 0
1834
1835 sm[nm] = orchestrator.ServiceDescription(
1836 spec=spec,
1837 size=size,
1838 running=0,
1839 events=self.events.get_for_service(spec.service_name()),
1840 created=self.spec_store.spec_created[nm],
1841 deleted=self.spec_store.spec_deleted.get(nm, None),
1842 virtual_ip=spec.get_virtual_ip(),
1843 ports=spec.get_port_start(),
1844 )
1845 if spec.service_type == 'ingress':
1846 # ingress has 2 daemons running per host
1847 sm[nm].size *= 2
1848
1849 # factor daemons into status
1850 for h, dm in self.cache.get_daemons_with_volatile_status():
1851 for name, dd in dm.items():
1852 assert dd.hostname is not None, f'no hostname for {dd!r}'
1853 assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
1854
1855 n: str = dd.service_name()
1856
1857 if (
1858 service_type
1859 and service_type != daemon_type_to_service(dd.daemon_type)
1860 ):
1861 continue
1862 if service_name and service_name != n:
1863 continue
1864
1865 if n not in sm:
1866 # new unmanaged service
1867 spec = ServiceSpec(
1868 unmanaged=True,
1869 service_type=daemon_type_to_service(dd.daemon_type),
1870 service_id=dd.service_id(),
1871 )
1872 sm[n] = orchestrator.ServiceDescription(
1873 last_refresh=dd.last_refresh,
1874 container_image_id=dd.container_image_id,
1875 container_image_name=dd.container_image_name,
1876 spec=spec,
1877 size=0,
1878 )
1879
1880 if dd.status == DaemonDescriptionStatus.running:
1881 sm[n].running += 1
1882 if dd.daemon_type == 'osd':
1883 # The osd count can't be determined by the Placement spec.
1884 # Showing an actual/expected representation cannot be determined
1885 # here. So we're setting running = size for now.
1886 sm[n].size += 1
1887 if (
1888 not sm[n].last_refresh
1889 or not dd.last_refresh
1890 or dd.last_refresh < sm[n].last_refresh # type: ignore
1891 ):
1892 sm[n].last_refresh = dd.last_refresh
1893
1894 return list(sm.values())
1895
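# Illustrative CLI usage for describe_service() (the service type is a
# placeholder); --refresh maps to the refresh argument and invalidates the
# cached daemon state first:
#
#   ceph orch ls
#   ceph orch ls rgw --refresh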
1896 @handle_orch_error
1897 def list_daemons(self,
1898 service_name: Optional[str] = None,
1899 daemon_type: Optional[str] = None,
1900 daemon_id: Optional[str] = None,
1901 host: Optional[str] = None,
1902 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
1903 if refresh:
1904 self._invalidate_daemons_and_kick_serve(host)
1905 self.log.debug('Kicked serve() loop to refresh all daemons')
1906
1907 result = []
1908 for h, dm in self.cache.get_daemons_with_volatile_status():
1909 if host and h != host:
1910 continue
1911 for name, dd in dm.items():
1912 if daemon_type is not None and daemon_type != dd.daemon_type:
1913 continue
1914 if daemon_id is not None and daemon_id != dd.daemon_id:
1915 continue
1916 if service_name is not None and service_name != dd.service_name():
1917 continue
1918 if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
1919 dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
1920 dd.name(),
1921 f"{dd.daemon_type}_memory_target"
1922 ))
1923 result.append(dd)
1924 return result
1925
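# Illustrative CLI usage for list_daemons() ("host1" is a placeholder):
#
#   ceph orch ps
#   ceph orch ps host1 --refresh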
1926 @handle_orch_error
1927 def service_action(self, action: str, service_name: str) -> List[str]:
1928 if service_name not in self.spec_store.all_specs.keys():
1929 raise OrchestratorError(f'Invalid service name "{service_name}".'
1930 + ' View currently running services using "ceph orch ls"')
1931 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
1932 if not dds:
1933 raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
1934 + ' View currently running services using "ceph orch ls"')
1935 if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
1936 return [f'Stopping entire {service_name} service is prohibited.']
1937 self.log.info('%s service %s' % (action.capitalize(), service_name))
1938 return [
1939 self._schedule_daemon_action(dd.name(), action)
1940 for dd in dds
1941 ]
1942
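# Illustrative CLI usage for service_action() (service names are placeholders);
# stopping an entire mon/mgr/osd service is refused above:
#
#   ceph orch restart rgw.myrealm
#   ceph orch stop nfs.foo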
1943 def _daemon_action(self,
1944 daemon_spec: CephadmDaemonDeploySpec,
1945 action: str,
1946 image: Optional[str] = None) -> str:
1947 self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
1948 daemon_spec.daemon_id)
1949
1950 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
1951 daemon_spec.daemon_id):
1952 self.mgr_service.fail_over()
1953 return '' # unreachable
1954
1955 if action == 'redeploy' or action == 'reconfig':
1956 if daemon_spec.daemon_type != 'osd':
1957 daemon_spec = self.cephadm_services[daemon_type_to_service(
1958 daemon_spec.daemon_type)].prepare_create(daemon_spec)
1959 else:
1960 # for OSDs, we still need to update config, just not carry out the full
1961 # prepare_create function
1962 daemon_spec.final_config, daemon_spec.deps = self.osd_service.generate_config(daemon_spec)
1963 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')))
1964
1965 actions = {
1966 'start': ['reset-failed', 'start'],
1967 'stop': ['stop'],
1968 'restart': ['reset-failed', 'restart'],
1969 }
1970 name = daemon_spec.name()
1971 for a in actions[action]:
1972 try:
1973 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
1974 daemon_spec.host, name, 'unit',
1975 ['--name', name, a]))
1976 except Exception:
1977 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
1978 self.cache.invalidate_host_daemons(daemon_spec.host)
1979 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1980 self.events.for_daemon(name, 'INFO', msg)
1981 return msg
1982
1983 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
1984 if image is not None:
1985 if action != 'redeploy':
1986 raise OrchestratorError(
1987 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1988 if daemon_type not in CEPH_IMAGE_TYPES:
1989 raise OrchestratorError(
1990 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1991 f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
1992
1993 self.check_mon_command({
1994 'prefix': 'config set',
1995 'name': 'container_image',
1996 'value': image,
1997 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1998 })
1999
2000 @handle_orch_error
2001 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
2002 d = self.cache.get_daemon(daemon_name)
2003 assert d.daemon_type is not None
2004 assert d.daemon_id is not None
2005
2006 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
2007 and not self.mgr_service.mgr_map_has_standby():
2008 raise OrchestratorError(
2009 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2010
2011 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
2012
2013 self.log.info(f'Schedule {action} daemon {daemon_name}')
2014 return self._schedule_daemon_action(daemon_name, action)
2015
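# Illustrative CLI usage for daemon_action() (the daemon name and image are
# placeholders; the optional trailing image argument is an assumption about the
# CLI form). A new image is only accepted together with 'redeploy', per
# _daemon_action_set_image():
#
#   ceph orch daemon restart mgr.host1.abcdef
#   ceph orch daemon redeploy osd.3 quay.io/ceph/ceph:v17.2.6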
2016 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
2017 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
2018
2019 def get_active_mgr_digests(self) -> List[str]:
2020 digests = self.mgr_service.get_active_daemon(
2021 self.cache.get_daemons_by_type('mgr')).container_image_digests
2022 return digests if digests else []
2023
2024 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
2025 dd = self.cache.get_daemon(daemon_name)
2026 assert dd.daemon_type is not None
2027 assert dd.daemon_id is not None
2028 assert dd.hostname is not None
2029 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
2030 and not self.mgr_service.mgr_map_has_standby():
2031 raise OrchestratorError(
2032 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2033 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
2034 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
2035 self._kick_serve_loop()
2036 return msg
2037
2038 @handle_orch_error
2039 def remove_daemons(self, names):
2040 # type: (List[str]) -> List[str]
2041 args = []
2042 for host, dm in self.cache.daemons.items():
2043 for name in names:
2044 if name in dm:
2045 args.append((name, host))
2046 if not args:
2047 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
2048 self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
2049 return self._remove_daemons(args)
2050
2051 @handle_orch_error
2052 def remove_service(self, service_name: str, force: bool = False) -> str:
2053 self.log.info('Remove service %s' % service_name)
2054 self._trigger_preview_refresh(service_name=service_name)
2055 if service_name in self.spec_store:
2056 if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
2057 return f'Unable to remove {service_name} service.\n' \
2058 f'Note, you might want to mark the {service_name} service as "unmanaged"'
2059 else:
2060 return f"Invalid service '{service_name}'. Use 'ceph orch ls' to list available services.\n"
2061
2062 # Report list of affected OSDs?
2063 if not force and service_name.startswith('osd.'):
2064 osds_msg = {}
2065 for h, dm in self.cache.get_daemons_with_volatile_status():
2066 osds_to_remove = []
2067 for name, dd in dm.items():
2068 if dd.daemon_type == 'osd' and dd.service_name() == service_name:
2069 osds_to_remove.append(str(dd.daemon_id))
2070 if osds_to_remove:
2071 osds_msg[h] = osds_to_remove
2072 if osds_msg:
2073 msg = ''
2074 for h, ls in osds_msg.items():
2075 msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}\n'
2076 raise OrchestratorError(
2077 f'If {service_name} is removed then the following OSDs will remain; use --force to proceed anyway\n{msg}')
2078
2079 found = self.spec_store.rm(service_name)
2080 if found and service_name.startswith('osd.'):
2081 self.spec_store.finally_rm(service_name)
2082 self._kick_serve_loop()
2083 return f'Removed service {service_name}'
2084
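# Illustrative CLI usage for remove_service() (service names are placeholders);
# mon/mgr services are refused above, and removing an osd service needs --force
# when matching OSDs would be left behind:
#
#   ceph orch rm rgw.myrealm
#   ceph orch rm osd.my_drivegroup --force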
2085 @handle_orch_error
2086 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
2087 """
2088 Return the storage inventory of hosts matching the given filter.
2089
2090 :param host_filter: host filter
2091
2092 TODO:
2093 - add filtering by label
2094 """
2095 if refresh:
2096 if host_filter and host_filter.hosts:
2097 for h in host_filter.hosts:
2098 self.log.debug(f'will refresh {h} devs')
2099 self.cache.invalidate_host_devices(h)
2100 self.cache.invalidate_host_networks(h)
2101 else:
2102 for h in self.cache.get_hosts():
2103 self.log.debug(f'will refresh {h} devs')
2104 self.cache.invalidate_host_devices(h)
2105 self.cache.invalidate_host_networks(h)
2106
2107 self.event.set()
2108 self.log.debug('Kicked serve() loop to refresh devices')
2109
2110 result = []
2111 for host, dls in self.cache.devices.items():
2112 if host_filter and host_filter.hosts and host not in host_filter.hosts:
2113 continue
2114 result.append(orchestrator.InventoryHost(host,
2115 inventory.Devices(dls)))
2116 return result
2117
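# Illustrative CLI usage for get_inventory() ("host1" is a placeholder):
#
#   ceph orch device ls
#   ceph orch device ls host1 --refresh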
2118 @handle_orch_error
2119 def zap_device(self, host: str, path: str) -> str:
2120 """Zap a device on a managed host.
2121
2122 Use ceph-volume zap to return a device to an unused/free state
2123
2124 Args:
2125 host (str): hostname of the cluster host
2126 path (str): device path
2127
2128 Raises:
2129 OrchestratorError: host is not a cluster host
2130 OrchestratorError: host is in maintenance and therefore unavailable
2131 OrchestratorError: device path not found on the host
2132 OrchestratorError: device is known to a different ceph cluster
2133 OrchestratorError: device holds active osd
2134 OrchestratorError: device cache hasn't been populated yet
2135
2136 Returns:
2137 str: output from the zap command
2138 """
2139
2140 self.log.info('Zap device %s:%s' % (host, path))
2141
2142 if host not in self.inventory.keys():
2143 raise OrchestratorError(
2144 f"Host '{host}' is not a member of the cluster")
2145
2146 host_info = self.inventory._inventory.get(host, {})
2147 if host_info.get('status', '').lower() == 'maintenance':
2148 raise OrchestratorError(
2149 f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
2150
2151 if host not in self.cache.devices:
2152 raise OrchestratorError(
2153 f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
2154
2155 host_devices = self.cache.devices[host]
2156 path_found = False
2157 osd_id_list: List[str] = []
2158
2159 for dev in host_devices:
2160 if dev.path == path:
2161 path_found = True
2162 break
2163 if not path_found:
2164 raise OrchestratorError(
2165 f"Device path '{path}' not found on host '{host}'")
2166
2167 if osd_id_list:
2168 dev_name = os.path.basename(path)
2169 active_osds: List[str] = []
2170 for osd_id in osd_id_list:
2171 metadata = self.get_metadata('osd', str(osd_id))
2172 if metadata:
2173 if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
2174 active_osds.append("osd." + osd_id)
2175 if active_osds:
2176 raise OrchestratorError(
2177 f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
2178 f"OSD{'s' if len(active_osds) > 1 else ''}"
2179 f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
2180
2181 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2182 host, 'osd', 'ceph-volume',
2183 ['--', 'lvm', 'zap', '--destroy', path],
2184 error_ok=True))
2185
2186 self.cache.invalidate_host_devices(host)
2187 self.cache.invalidate_host_networks(host)
2188 if code:
2189 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
2190 msg = f'zap successful for {path} on {host}'
2191 self.log.info(msg)
2192
2193 return msg + '\n'
2194
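# Illustrative CLI usage for zap_device() (host and device path are placeholders):
#
#   ceph orch device zap host1 /dev/sdb --force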
2195 @handle_orch_error
2196 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
2197 """
2198 Blink a device light. Calling something like::
2199
2200 lsmcli local-disk-ident-led-on --path $path
2201
2202 If you must, you can customize this via::
2203
2204 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
2205 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
2206
2207 See templates/blink_device_light_cmd.j2
2208 """
2209 @forall_hosts
2210 def blink(host: str, dev: str, path: str) -> str:
2211 cmd_line = self.template.render('blink_device_light_cmd.j2',
2212 {
2213 'on': on,
2214 'ident_fault': ident_fault,
2215 'dev': dev,
2216 'path': path
2217 },
2218 host=host)
2219 cmd_args = shlex.split(cmd_line)
2220
2221 out, err, code = self.wait_async(CephadmServe(self)._run_cephadm(
2222 host, 'osd', 'shell', ['--'] + cmd_args,
2223 error_ok=True))
2224 if code:
2225 raise OrchestratorError(
2226 'Unable to affect %s light for %s:%s. Command: %s' % (
2227 ident_fault, host, dev, ' '.join(cmd_args)))
2228 self.log.info('Set %s light for %s:%s %s' % (
2229 ident_fault, host, dev, 'on' if on else 'off'))
2230 return "Set %s light for %s:%s %s" % (
2231 ident_fault, host, dev, 'on' if on else 'off')
2232
2233 return blink(locs)
2234
2235 def get_osd_uuid_map(self, only_up=False):
2236 # type: (bool) -> Dict[str, str]
2237 osd_map = self.get('osd_map')
2238 r = {}
2239 for o in osd_map['osds']:
2240 # only include OSDs that have ever started in this map. this way
2241 # an interrupted osd create can be repeated and succeed the second
2242 # time around.
2243 osd_id = o.get('osd')
2244 if osd_id is None:
2245 raise OrchestratorError("Could not retrieve osd_id from osd_map")
2246 if not only_up:
2247 r[str(osd_id)] = o.get('uuid', '')
2248 return r
2249
2250 def get_osd_by_id(self, osd_id: int) -> Optional[Dict[str, Any]]:
2251 osd = [x for x in self.get('osd_map')['osds']
2252 if x['osd'] == osd_id]
2253
2254 if len(osd) != 1:
2255 return None
2256
2257 return osd[0]
2258
2259 def _trigger_preview_refresh(self,
2260 specs: Optional[List[DriveGroupSpec]] = None,
2261 service_name: Optional[str] = None,
2262 ) -> None:
2263 # Only trigger a refresh when a spec has changed
2264 trigger_specs = []
2265 if specs:
2266 for spec in specs:
2267 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
2268 # if the given spec differs from the stored preview spec we need to
2269 # trigger a refresh; if the preview spec has been removed (== None) we
2270 # need to refresh as well.
2271 if not preview_spec or spec != preview_spec:
2272 trigger_specs.append(spec)
2273 if service_name:
2274 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
2275 if not any(trigger_specs):
2276 return None
2277
2278 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
2279 for host in refresh_hosts:
2280 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
2281 self.cache.osdspec_previews_refresh_queue.append(host)
2282
2283 @handle_orch_error
2284 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
2285 """
2286 Deprecated. Please use `apply()` instead.
2287
2288 Keeping this around to stay compatible with mgr/dashboard
2289 """
2290 return [self._apply(spec) for spec in specs]
2291
2292 @handle_orch_error
2293 def create_osds(self, drive_group: DriveGroupSpec) -> str:
2294 hosts: List[HostSpec] = self.inventory.all_specs()
2295 filtered_hosts: List[str] = drive_group.placement.filter_matching_hostspecs(hosts)
2296 if not filtered_hosts:
2297 return "Invalid 'host:device' spec: host not found in cluster. Please check 'ceph orch host ls' for available hosts"
2298 return self.osd_service.create_from_spec(drive_group)
2299
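# Illustrative CLI usage for create_osds() (host and device are placeholders);
# the same path is reached when a DriveGroupSpec is applied from a YAML spec:
#
#   ceph orch daemon add osd host1:/dev/sdb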
2300 def _preview_osdspecs(self,
2301 osdspecs: Optional[List[DriveGroupSpec]] = None
2302 ) -> dict:
2303 if not osdspecs:
2304 return {'n/a': [{'error': True,
2305 'message': 'No OSDSpec or matching hosts found.'}]}
2306 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
2307 if not matching_hosts:
2308 return {'n/a': [{'error': True,
2309 'message': 'No OSDSpec or matching hosts found.'}]}
2310 # Is any host still loading previews or still in the queue to be previewed
2311 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
2312 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
2313 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
2314 return {'n/a': [{'error': True,
2315 'message': 'Preview data is being generated. '
2316 'Please re-run this command in a bit.'}]}
2317 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
2318 previews_for_specs = {}
2319 for host, raw_reports in self.cache.osdspec_previews.items():
2320 if host not in matching_hosts:
2321 continue
2322 osd_reports = []
2323 for osd_report in raw_reports:
2324 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
2325 osd_reports.append(osd_report)
2326 previews_for_specs.update({host: osd_reports})
2327 return previews_for_specs
2328
2329 def _calc_daemon_deps(self,
2330 spec: Optional[ServiceSpec],
2331 daemon_type: str,
2332 daemon_id: str) -> List[str]:
2333 deps = []
2334 if daemon_type == 'haproxy':
2335 # because cephadm creates new daemon instances whenever
2336 # port or ip changes, identifying daemons by name is
2337 # sufficient to detect changes.
2338 if not spec:
2339 return []
2340 ingress_spec = cast(IngressSpec, spec)
2341 assert ingress_spec.backend_service
2342 daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
2343 deps = [d.name() for d in daemons]
2344 elif daemon_type == 'keepalived':
2345 # because cephadm creates new daemon instances whenever
2346 # port or ip changes, identifying daemons by name is
2347 # sufficient to detect changes.
2348 if not spec:
2349 return []
2350 daemons = self.cache.get_daemons_by_service(spec.service_name())
2351 deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
2352 elif daemon_type == 'agent':
2353 root_cert = ''
2354 server_port = ''
2355 try:
2356 server_port = str(self.cherrypy_thread.server_port)
2357 root_cert = self.cherrypy_thread.ssl_certs.get_root_cert()
2358 except Exception:
2359 pass
2360 deps = sorted([self.get_mgr_ip(), server_port, root_cert,
2361 str(self.device_enhanced_scan)])
2362 elif daemon_type == 'iscsi':
2363 deps = [self.get_mgr_ip()]
2364 else:
2365 need = {
2366 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
2367 'grafana': ['prometheus', 'loki'],
2368 'alertmanager': ['mgr', 'alertmanager', 'snmp-gateway'],
2369 'promtail': ['loki'],
2370 }
2371 for dep_type in need.get(daemon_type, []):
2372 for dd in self.cache.get_daemons_by_type(dep_type):
2373 deps.append(dd.name())
2374 if daemon_type == 'prometheus':
2375 deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
2376 return sorted(deps)
2377
2378 @forall_hosts
2379 def _remove_daemons(self, name: str, host: str) -> str:
2380 return CephadmServe(self)._remove_daemon(name, host)
2381
2382 def _check_pool_exists(self, pool: str, service_name: str) -> None:
2383 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2384 if not self.rados.pool_exists(pool):
2385 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2386 f'service {service_name}')
2387
2388 def _add_daemon(self,
2389 daemon_type: str,
2390 spec: ServiceSpec) -> List[str]:
2391 """
2392 Add (and place) a daemon. Require explicit host placement. Do not
2393 schedule, and do not apply the related scheduling limitations.
2394 """
2395 if spec.service_name() not in self.spec_store:
2396 raise OrchestratorError('Unable to add a Daemon without Service.\n'
2397 'Please use `ceph orch apply ...` to create a Service.\n'
2398 'Note, you might want to create the service with "unmanaged=true"')
2399
2400 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2401 if not spec.placement.hosts:
2402 raise OrchestratorError('must specify host(s) to deploy on')
2403 count = spec.placement.count or len(spec.placement.hosts)
2404 daemons = self.cache.get_daemons_by_service(spec.service_name())
2405 return self._create_daemons(daemon_type, spec, daemons,
2406 spec.placement.hosts, count)
2407
2408 def _create_daemons(self,
2409 daemon_type: str,
2410 spec: ServiceSpec,
2411 daemons: List[DaemonDescription],
2412 hosts: List[HostPlacementSpec],
2413 count: int) -> List[str]:
2414 if count > len(hosts):
2415 raise OrchestratorError('too few hosts: want %d, have %s' % (
2416 count, hosts))
2417
2418 did_config = False
2419 service_type = daemon_type_to_service(daemon_type)
2420
2421 args = [] # type: List[CephadmDaemonDeploySpec]
2422 for host, network, name in hosts:
2423 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2424 prefix=spec.service_id,
2425 forcename=name)
2426
2427 if not did_config:
2428 self.cephadm_services[service_type].config(spec)
2429 did_config = True
2430
2431 daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
2432 host, daemon_id, network, spec,
2433 # NOTE: this does not consider port conflicts!
2434 ports=spec.get_port_start())
2435 self.log.debug('Placing %s.%s on host %s' % (
2436 daemon_type, daemon_id, host))
2437 args.append(daemon_spec)
2438
2439 # add to daemon list so next name(s) will also be unique
2440 sd = orchestrator.DaemonDescription(
2441 hostname=host,
2442 daemon_type=daemon_type,
2443 daemon_id=daemon_id,
2444 )
2445 daemons.append(sd)
2446
2447 @forall_hosts
2448 def create_func_map(*args: Any) -> str:
2449 daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
2450 return self.wait_async(CephadmServe(self)._create_daemon(daemon_spec))
2451
2452 return create_func_map(args)
2453
2454 @handle_orch_error
2455 def add_daemon(self, spec: ServiceSpec) -> List[str]:
2456 ret: List[str] = []
2457 try:
2458 with orchestrator.set_exception_subject('service', spec.service_name(), overwrite=True):
2459 for d_type in service_to_daemon_types(spec.service_type):
2460 ret.extend(self._add_daemon(d_type, spec))
2461 return ret
2462 except OrchestratorError as e:
2463 self.events.from_orch_error(e)
2464 raise
2465
2466 @handle_orch_error
2467 def apply_mon(self, spec: ServiceSpec) -> str:
2468 return self._apply(spec)
2469
2470 def _apply(self, spec: GenericSpec) -> str:
2471 if spec.service_type == 'host':
2472 return self._add_host(cast(HostSpec, spec))
2473
2474 if spec.service_type == 'osd':
2475 # _trigger preview refresh needs to be smart and
2476 # should only refresh if a change has been detected
2477 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
2478
2479 return self._apply_service_spec(cast(ServiceSpec, spec))
2480
2481 @handle_orch_error
2482 def apply_tuned_profiles(self, specs: List[TunedProfileSpec], no_overwrite: bool = False) -> str:
2483 outs = []
2484 for spec in specs:
2485 if no_overwrite and self.tuned_profiles.exists(spec.profile_name):
2486 outs.append(f"Tuned profile '{spec.profile_name}' already exists (--no-overwrite was passed)")
2487 else:
2488 self.tuned_profiles.add_profile(spec)
2489 outs.append(f'Saved tuned profile {spec.profile_name}')
2490 self._kick_serve_loop()
2491 return '\n'.join(outs)
2492
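# Illustrative tuned profile spec and CLI usage (not part of this module; the
# profile name, host and sysctl setting are placeholders, and the field names
# are assumed to follow TunedProfileSpec):
#
#   # profile.yaml
#   # profile_name: low-latency
#   # placement:
#   #   hosts: [host1]
#   # settings:
#   #   vm.swappiness: "10"
#
#   ceph orch tuned-profile apply -i profile.yaml
#   ceph orch tuned-profile rm low-latency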
2493 @handle_orch_error
2494 def rm_tuned_profile(self, profile_name: str) -> str:
2495 if profile_name not in self.tuned_profiles:
2496 raise OrchestratorError(
2497 f'Tuned profile {profile_name} does not exist. Nothing to remove.')
2498 self.tuned_profiles.rm_profile(profile_name)
2499 self._kick_serve_loop()
2500 return f'Removed tuned profile {profile_name}'
2501
2502 @handle_orch_error
2503 def tuned_profile_ls(self) -> List[TunedProfileSpec]:
2504 return self.tuned_profiles.list_profiles()
2505
2506 @handle_orch_error
2507 def tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> str:
2508 if profile_name not in self.tuned_profiles:
2509 raise OrchestratorError(
2510 f'Tuned profile {profile_name} does not exist. Cannot add setting.')
2511 self.tuned_profiles.add_setting(profile_name, setting, value)
2512 self._kick_serve_loop()
2513 return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
2514
2515 @handle_orch_error
2516 def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
2517 if profile_name not in self.tuned_profiles:
2518 raise OrchestratorError(
2519 f'Tuned profile {profile_name} does not exist. Cannot remove setting.')
2520 self.tuned_profiles.rm_setting(profile_name, setting)
2521 self._kick_serve_loop()
2522 return f'Removed setting {setting} from tuned profile {profile_name}'
2523
2524 def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
2525 self.health_checks[name] = {
2526 'severity': 'warning',
2527 'summary': summary,
2528 'count': count,
2529 'detail': detail,
2530 }
2531 self.set_health_checks(self.health_checks)
2532
2533 def remove_health_warning(self, name: str) -> None:
2534 if name in self.health_checks:
2535 del self.health_checks[name]
2536 self.set_health_checks(self.health_checks)
2537
2538 def _plan(self, spec: ServiceSpec) -> dict:
2539 if spec.service_type == 'osd':
2540 return {'service_name': spec.service_name(),
2541 'service_type': spec.service_type,
2542 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
2543
2544 svc = self.cephadm_services[spec.service_type]
2545 ha = HostAssignment(
2546 spec=spec,
2547 hosts=self.cache.get_schedulable_hosts(),
2548 unreachable_hosts=self.cache.get_unreachable_hosts(),
2549 draining_hosts=self.cache.get_draining_hosts(),
2550 networks=self.cache.networks,
2551 daemons=self.cache.get_daemons_by_service(spec.service_name()),
2552 allow_colo=svc.allow_colo(),
2553 rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
2554 )
2555 ha.validate()
2556 hosts, to_add, to_remove = ha.place()
2557
2558 return {
2559 'service_name': spec.service_name(),
2560 'service_type': spec.service_type,
2561 'add': [hs.hostname for hs in to_add],
2562 'remove': [d.name() for d in to_remove]
2563 }
2564
2565 @handle_orch_error
2566 def plan(self, specs: Sequence[GenericSpec]) -> List:
2567 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
2568 'to the current inventory setup. If any of these conditions change, the \n'
2569 'preview will be invalid. Please make sure to have a minimal \n'
2570 'timeframe between planning and applying the specs.'}]
2571 if any([spec.service_type == 'host' for spec in specs]):
2572 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
2573 for spec in specs:
2574 results.append(self._plan(cast(ServiceSpec, spec)))
2575 return results
2576
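# Illustrative CLI usage for plan() (the spec file is a placeholder); passing
# --dry-run to 'ceph orch apply' previews the specs through plan() instead of
# applying them:
#
#   ceph orch apply -i cluster_spec.yaml --dry-run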
2577 def _apply_service_spec(self, spec: ServiceSpec) -> str:
2578 if spec.placement.is_empty():
2579 # fill in default placement
2580 defaults = {
2581 'mon': PlacementSpec(count=5),
2582 'mgr': PlacementSpec(count=2),
2583 'mds': PlacementSpec(count=2),
2584 'rgw': PlacementSpec(count=2),
2585 'ingress': PlacementSpec(count=2),
2586 'iscsi': PlacementSpec(count=1),
2587 'rbd-mirror': PlacementSpec(count=2),
2588 'cephfs-mirror': PlacementSpec(count=1),
2589 'nfs': PlacementSpec(count=1),
2590 'grafana': PlacementSpec(count=1),
2591 'alertmanager': PlacementSpec(count=1),
2592 'prometheus': PlacementSpec(count=1),
2593 'node-exporter': PlacementSpec(host_pattern='*'),
2594 'loki': PlacementSpec(count=1),
2595 'promtail': PlacementSpec(host_pattern='*'),
2596 'crash': PlacementSpec(host_pattern='*'),
2597 'container': PlacementSpec(count=1),
2598 'snmp-gateway': PlacementSpec(count=1),
2599 }
2600 spec.placement = defaults[spec.service_type]
2601 elif spec.service_type in ['mon', 'mgr'] and \
2602 spec.placement.count is not None and \
2603 spec.placement.count < 1:
2604 raise OrchestratorError('cannot scale %s service below 1' % (
2605 spec.service_type))
2606
2607 host_count = len(self.inventory.keys())
2608 max_count = self.max_count_per_host
2609
2610 if spec.placement.count is not None:
2611 if spec.service_type in ['mon', 'mgr']:
2612 if spec.placement.count > max(5, host_count):
2613 raise OrchestratorError(
2614 (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
2615 elif spec.service_type != 'osd':
2616 if spec.placement.count > (max_count * host_count):
2617 raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
2618 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
2619
2620 if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
2621 raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
2622 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
2623
2624 HostAssignment(
2625 spec=spec,
2626 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
2627 unreachable_hosts=self.cache.get_unreachable_hosts(),
2628 draining_hosts=self.cache.get_draining_hosts(),
2629 networks=self.cache.networks,
2630 daemons=self.cache.get_daemons_by_service(spec.service_name()),
2631 allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
2632 ).validate()
2633
2634 self.log.info('Saving service %s spec with placement %s' % (
2635 spec.service_name(), spec.placement.pretty_str()))
2636 self.spec_store.save(spec)
2637 self._kick_serve_loop()
2638 return "Scheduled %s update..." % spec.service_name()
2639
2640 @handle_orch_error
2641 def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
2642 results = []
2643 for spec in specs:
2644 if no_overwrite:
2645 if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
2646 results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
2647 % (cast(HostSpec, spec).hostname, spec.service_type))
2648 continue
2649 elif cast(ServiceSpec, spec).service_name() in self.spec_store:
2650 results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
2651 % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
2652 continue
2653 results.append(self._apply(spec))
2654 return results
2655
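# Illustrative CLI usage for apply() (the spec file is a placeholder);
# --no-overwrite maps to the no_overwrite argument and skips specs that already
# exist in the spec store or inventory:
#
#   ceph orch apply -i cluster_spec.yaml
#   ceph orch apply -i cluster_spec.yaml --no-overwrite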
2656 @handle_orch_error
2657 def apply_mgr(self, spec: ServiceSpec) -> str:
2658 return self._apply(spec)
2659
2660 @handle_orch_error
2661 def apply_mds(self, spec: ServiceSpec) -> str:
2662 return self._apply(spec)
2663
2664 @handle_orch_error
2665 def apply_rgw(self, spec: ServiceSpec) -> str:
2666 return self._apply(spec)
2667
2668 @handle_orch_error
2669 def apply_ingress(self, spec: ServiceSpec) -> str:
2670 return self._apply(spec)
2671
2672 @handle_orch_error
2673 def apply_iscsi(self, spec: ServiceSpec) -> str:
2674 return self._apply(spec)
2675
2676 @handle_orch_error
2677 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
2678 return self._apply(spec)
2679
2680 @handle_orch_error
2681 def apply_nfs(self, spec: ServiceSpec) -> str:
2682 return self._apply(spec)
2683
2684 def _get_dashboard_url(self):
2685 # type: () -> str
2686 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2687
2688 @handle_orch_error
2689 def apply_prometheus(self, spec: ServiceSpec) -> str:
2690 return self._apply(spec)
2691
2692 @handle_orch_error
2693 def apply_loki(self, spec: ServiceSpec) -> str:
2694 return self._apply(spec)
2695
2696 @handle_orch_error
2697 def apply_promtail(self, spec: ServiceSpec) -> str:
2698 return self._apply(spec)
2699
2700 @handle_orch_error
2701 def apply_node_exporter(self, spec: ServiceSpec) -> str:
2702 return self._apply(spec)
2703
2704 @handle_orch_error
2705 def apply_crash(self, spec: ServiceSpec) -> str:
2706 return self._apply(spec)
2707
2708 @handle_orch_error
2709 def apply_grafana(self, spec: ServiceSpec) -> str:
2710 return self._apply(spec)
2711
2712 @handle_orch_error
2713 def apply_alertmanager(self, spec: ServiceSpec) -> str:
2714 return self._apply(spec)
2715
2716 @handle_orch_error
2717 def apply_container(self, spec: ServiceSpec) -> str:
2718 return self._apply(spec)
2719
2720 @handle_orch_error
2721 def apply_snmp_gateway(self, spec: ServiceSpec) -> str:
2722 return self._apply(spec)
2723
2724 @handle_orch_error
2725 def upgrade_check(self, image: str, version: str) -> str:
2726 if self.inventory.get_host_with_state("maintenance"):
2727 raise OrchestratorError("check aborted - you have hosts in maintenance state")
2728
2729 if version:
2730 target_name = self.container_image_base + ':v' + version
2731 elif image:
2732 target_name = image
2733 else:
2734 raise OrchestratorError('must specify either image or version')
2735
2736 image_info = self.wait_async(CephadmServe(self)._get_container_image_info(target_name))
2737
2738 ceph_image_version = image_info.ceph_version
2739 if not ceph_image_version:
2740 return f'Unable to extract ceph version from {target_name}.'
2741 if ceph_image_version.startswith('ceph version '):
2742 ceph_image_version = ceph_image_version.split(' ')[2]
2743 version_error = self.upgrade._check_target_version(ceph_image_version)
2744 if version_error:
2745 return f'Incompatible upgrade: {version_error}'
2746
2747 self.log.debug(f'image info {image} -> {image_info}')
2748 r: dict = {
2749 'target_name': target_name,
2750 'target_id': image_info.image_id,
2751 'target_version': image_info.ceph_version,
2752 'needs_update': dict(),
2753 'up_to_date': list(),
2754 'non_ceph_image_daemons': list()
2755 }
2756 for host, dm in self.cache.daemons.items():
2757 for name, dd in dm.items():
2758 if image_info.image_id == dd.container_image_id:
2759 r['up_to_date'].append(dd.name())
2760 elif dd.daemon_type in CEPH_IMAGE_TYPES:
2761 r['needs_update'][dd.name()] = {
2762 'current_name': dd.container_image_name,
2763 'current_id': dd.container_image_id,
2764 'current_version': dd.version,
2765 }
2766 else:
2767 r['non_ceph_image_daemons'].append(dd.name())
2768 if self.use_repo_digest and image_info.repo_digests:
2769 # FIXME: we assume the first digest is the best one to use
2770 r['target_digest'] = image_info.repo_digests[0]
2771
2772 return json.dumps(r, indent=4, sort_keys=True)
2773
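# Illustrative CLI usage for upgrade_check() (the version and image are
# placeholders):
#
#   ceph orch upgrade check --ceph-version 17.2.6
#   ceph orch upgrade check --image quay.io/ceph/ceph:v17.2.6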
2774 @handle_orch_error
2775 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
2776 return self.upgrade.upgrade_status()
2777
2778 @handle_orch_error
2779 def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
2780 return self.upgrade.upgrade_ls(image, tags, show_all_versions)
2781
2782 @handle_orch_error
2783 def upgrade_start(self, image: str, version: str, daemon_types: Optional[List[str]] = None, host_placement: Optional[str] = None,
2784 services: Optional[List[str]] = None, limit: Optional[int] = None) -> str:
2785 if self.inventory.get_host_with_state("maintenance"):
2786 raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
2787 if daemon_types is not None and services is not None:
2788 raise OrchestratorError('--daemon-types and --services are mutually exclusive')
2789 if daemon_types is not None:
2790 for dtype in daemon_types:
2791 if dtype not in CEPH_UPGRADE_ORDER:
2792 raise OrchestratorError(f'Upgrade aborted - Got unexpected daemon type "{dtype}".\n'
2793 f'Viable daemon types for this command are: {utils.CEPH_TYPES + utils.GATEWAY_TYPES}')
2794 if services is not None:
2795 for service in services:
2796 if service not in self.spec_store:
2797 raise OrchestratorError(f'Upgrade aborted - Got unknown service name "{service}".\n'
2798 f'Known services are: {self.spec_store.all_specs.keys()}')
2799 hosts: Optional[List[str]] = None
2800 if host_placement is not None:
2801 all_hosts = list(self.inventory.all_specs())
2802 placement = PlacementSpec.from_string(host_placement)
2803 hosts = placement.filter_matching_hostspecs(all_hosts)
2804 if not hosts:
2805 raise OrchestratorError(
2806 f'Upgrade aborted - hosts parameter "{host_placement}" provided did not match any hosts')
2807
2808 if limit is not None:
2809 if limit < 1:
2810 raise OrchestratorError(
2811 f'Upgrade aborted - --limit arg must be a positive integer, not {limit}')
2812
2813 return self.upgrade.upgrade_start(image, version, daemon_types, hosts, services, limit)
2814
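# Illustrative CLI usage for upgrade_start() (arguments are placeholders);
# --daemon-types/--services/--limit map to the staggered-upgrade parameters
# validated above:
#
#   ceph orch upgrade start --ceph-version 17.2.6
#   ceph orch upgrade start --image quay.io/ceph/ceph:v17.2.6 --daemon-types mgr,mon --limit 2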
2815 @handle_orch_error
2816 def upgrade_pause(self) -> str:
2817 return self.upgrade.upgrade_pause()
2818
2819 @handle_orch_error
2820 def upgrade_resume(self) -> str:
2821 return self.upgrade.upgrade_resume()
2822
2823 @handle_orch_error
2824 def upgrade_stop(self) -> str:
2825 return self.upgrade.upgrade_stop()
2826
2827 @handle_orch_error
2828 def remove_osds(self, osd_ids: List[str],
2829 replace: bool = False,
2830 force: bool = False,
2831 zap: bool = False) -> str:
2832 """
2833 Takes a list of OSDs and schedules them for removal.
2834 The function that takes care of the actual removal is
2835 process_removal_queue().
2836 """
2837
2838 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2839 to_remove_daemons = list()
2840 for daemon in daemons:
2841 if daemon.daemon_id in osd_ids:
2842 to_remove_daemons.append(daemon)
2843 if not to_remove_daemons:
2844 return f"Unable to find OSDs: {osd_ids}"
2845
2846 for daemon in to_remove_daemons:
2847 assert daemon.daemon_id is not None
2848 try:
2849 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2850 replace=replace,
2851 force=force,
2852 zap=zap,
2853 hostname=daemon.hostname,
2854 process_started_at=datetime_now(),
2855 remove_util=self.to_remove_osds.rm_util))
2856 except NotFoundError:
2857 return f"Unable to find OSDs: {osd_ids}"
2858
2859 # trigger the serve loop to initiate the removal
2860 self._kick_serve_loop()
2861 warning_zap = "" if zap else ("\nVG/LV for the OSDs won't be zapped (--zap wasn't passed).\n"
2862 "Run the `ceph-volume lvm zap` command with `--destroy`"
2863 " against the VG/LV if you want them to be destroyed.")
2864 return f"Scheduled OSD(s) for removal.{warning_zap}"
2865
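# Illustrative CLI usage for the OSD removal queue (OSD ids are placeholders);
# see also stop_remove_osds() and remove_osds_status() below:
#
#   ceph orch osd rm 3 4 --zap
#   ceph orch osd rm status
#   ceph orch osd rm stop 3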
2866 @handle_orch_error
2867 def stop_remove_osds(self, osd_ids: List[str]) -> str:
2868 """
2869 Stops a `removal` process for a list of OSDs.
2870 This will revert their weight and remove them from the osds_to_remove queue
2871 """
2872 for osd_id in osd_ids:
2873 try:
2874 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
2875 remove_util=self.to_remove_osds.rm_util))
2876 except (NotFoundError, KeyError, ValueError):
2877 return f'Unable to find OSD in the queue: {osd_id}'
2878
2879 # trigger the serve loop to halt the removal
2880 self._kick_serve_loop()
2881 return "Stopped OSD(s) removal"
2882
2883 @handle_orch_error
2884 def remove_osds_status(self) -> List[OSD]:
2885 """
2886 The CLI call to retrieve an osd removal report
2887 """
2888 return self.to_remove_osds.all_osds()
2889
2890 @handle_orch_error
2891 def drain_host(self, hostname, force=False):
2892 # type: (str, bool) -> str
2893 """
2894 Drain all daemons from a host.
2895 :param hostname: host name
2896 """
2897
2898 # if we drain the last admin host we could end up removing the only instance
2899 # of the config and keyring and cause issues
2900 if not force:
2901 p = PlacementSpec(label='_admin')
2902 admin_hosts = p.filter_matching_hostspecs(self.inventory.all_specs())
2903 if len(admin_hosts) == 1 and admin_hosts[0] == hostname:
2904 raise OrchestratorValidationError(f"Host {hostname} is the last host with the '_admin'"
2905 " label.\nDraining this host could cause the removal"
2906 " of the last cluster config/keyring managed by cephadm.\n"
2907 "It is recommended to add the _admin label to another host"
2908 " before completing this operation.\nIf you're certain this is"
2909 " what you want rerun this command with --force.")
2910
2911 self.add_host_label(hostname, '_no_schedule')
2912
2913 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
2914
2915 osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
2916 self.remove_osds(osds_to_remove)
2917
2918 daemons_table = ""
2919 daemons_table += "{:<20} {:<15}\n".format("type", "id")
2920 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
2921 for d in daemons:
2922 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
2923
2924 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
2925
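# Illustrative CLI usage for drain_host() ("host1" is a placeholder); draining
# applies the '_no_schedule' label and queues the host's OSDs for removal:
#
#   ceph orch host drain host1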
2926 def trigger_connect_dashboard_rgw(self) -> None:
2927 self.need_connect_dashboard_rgw = True
2928 self.event.set()