import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type

import datetime
import os
import random
import tempfile
import multiprocessing.pool
import subprocess
from prettytable import PrettyTable

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    ServiceSpec, PlacementSpec, assert_valid_host, \
    HostPlacementSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from mgr_module import MgrModule, HandleCommandResult, Option
from mgr_util import create_self_signed_cert
import secrets
import orchestrator
from orchestrator.module import to_format, Format

from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
    service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .services.exporter import CephadmExporter, CephadmExporterConfig
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
from .configchecks import CephadmConfigChecks

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self: Any) -> bool:
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.18.1'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v0.18.1'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.20.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:6.7.4'
DEFAULT_HAPROXY_IMAGE = 'docker.io/library/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'docker.io/arcts/keepalived'
# ------------------------------------------------------------------------------


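# Decorator for CLI handlers: reject the command while a spec for the named
# service is still stored, i.e. while the service is deployed/active.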
def service_inactive(spec_name: str) -> Callable:
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            obj = args[0]
            if obj.get_store(f"spec.{spec_name}") is not None:
                return 1, "", f"Unable to change configuration of an active service {spec_name}"
            return func(*args, **kwargs)
        return wrapper
    return inner


def host_exists(hostname_position: int = 1) -> Callable:
    """Check that a hostname exists in the inventory"""
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner


class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                          metaclass=CLICommandMeta):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        Option(
            'ssh_config_file',
            type='str',
            default=None,
            desc='customized SSH config file to connect to managed hosts',
        ),
        Option(
            'device_cache_timeout',
            type='secs',
            default=30 * 60,
            desc='seconds to cache device inventory',
        ),
        Option(
            'device_enhanced_scan',
            type='bool',
            default=False,
            desc='Use libstoragemgmt during device scans',
        ),
        Option(
            'daemon_cache_timeout',
            type='secs',
            default=10 * 60,
            desc='seconds to cache service (daemon) inventory',
        ),
        Option(
            'facts_cache_timeout',
            type='secs',
            default=1 * 60,
            desc='seconds to cache host facts data',
        ),
        Option(
            'host_check_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to perform a host check',
        ),
        Option(
            'mode',
            type='str',
            enum_allowed=['root', 'cephadm-package'],
            default='root',
            desc='mode for remote execution of cephadm',
        ),
        Option(
            'container_image_base',
            default=DEFAULT_IMAGE,
            desc='Container image name, without the tag',
            runtime=True,
        ),
        Option(
            'container_image_prometheus',
            default=DEFAULT_PROMETHEUS_IMAGE,
            desc='Prometheus container image',
        ),
        Option(
            'container_image_grafana',
            default=DEFAULT_GRAFANA_IMAGE,
            desc='Grafana container image',
        ),
        Option(
            'container_image_alertmanager',
            default=DEFAULT_ALERT_MANAGER_IMAGE,
            desc='Alertmanager container image',
        ),
        Option(
            'container_image_node_exporter',
            default=DEFAULT_NODE_EXPORTER_IMAGE,
            desc='Node exporter container image',
        ),
        Option(
            'container_image_haproxy',
            default=DEFAULT_HAPROXY_IMAGE,
            desc='HAproxy container image',
        ),
        Option(
            'container_image_keepalived',
            default=DEFAULT_KEEPALIVED_IMAGE,
            desc='Keepalived container image',
        ),
        Option(
            'warn_on_stray_hosts',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected on a host '
                 'that is not managed by cephadm',
        ),
        Option(
            'warn_on_stray_daemons',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected '
                 'that are not managed by cephadm',
        ),
        Option(
            'warn_on_failed_host_check',
            type='bool',
            default=True,
            desc='raise a health warning if the host check fails',
        ),
        Option(
            'log_to_cluster',
            type='bool',
            default=True,
            desc='log to the "cephadm" cluster log channel',
        ),
        Option(
            'allow_ptrace',
            type='bool',
            default=False,
            desc='allow SYS_PTRACE capability on ceph containers',
            long_desc='The SYS_PTRACE capability is needed to attach to a '
                      'process with gdb or strace. Enabling this option '
                      'can allow debugging daemons that encounter problems '
                      'at runtime.',
        ),
        Option(
            'container_init',
            type='bool',
            default=True,
            desc='Run podman/docker with `--init`'
        ),
        Option(
            'prometheus_alerts_path',
            type='str',
            default='/etc/prometheus/ceph/ceph_default_alerts.yml',
            desc='location of alerts to include in prometheus deployments',
        ),
        Option(
            'migration_current',
            type='int',
            default=None,
            desc='internal - do not modify',
            # used to track spec and other data migrations.
        ),
        Option(
            'config_dashboard',
            type='bool',
            default=True,
            desc='manage configs like API endpoints in Dashboard.'
        ),
        Option(
            'manage_etc_ceph_ceph_conf',
            type='bool',
            default=False,
            desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf_hosts',
            type='str',
            default='*',
            desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
        ),
        Option(
            'registry_url',
            type='str',
            default=None,
            desc='Custom repository url'
        ),
        Option(
            'registry_username',
            type='str',
            default=None,
            desc='Custom repository username'
        ),
        Option(
            'registry_password',
            type='str',
            default=None,
            desc='Custom repository password'
        ),
        Option(
            'registry_insecure',
            type='bool',
            default=False,
            desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
        ),
        Option(
            'use_repo_digest',
            type='bool',
            default=True,
            desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
        ),
        Option(
            'config_checks_enabled',
            type='bool',
            default=False,
            desc='Enable or disable the cephadm configuration analysis',
        ),
        Option(
            'default_registry',
            type='str',
            default='docker.io',
            desc='Registry to which we should normalize unqualified image names',
        ),
        Option(
            'max_count_per_host',
            type='int',
            default=10,
            desc='max number of daemons per service per host',
        ),
        Option(
            'autotune_memory_target_ratio',
            type='float',
            default=.7,
            desc='ratio of total system memory to divide amongst autotuned daemons'
        ),
        Option(
            'autotune_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to autotune daemon memory'
        ),
        Option(
            'max_osd_draining_count',
            type='int',
            default=10,
            desc='max number of osds that will be drained simultaneously when osds are removed'
        ),
    ]

    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid: str = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.max_count_per_host = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.container_image_haproxy = ''
            self.container_image_keepalived = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = True
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.manage_etc_ceph_ceph_conf_hosts = '*'
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.registry_insecure: bool = False
            self.use_repo_digest = True
            self.default_registry = ''
            self.autotune_memory_target_ratio = 0.0
            self.autotune_interval = 0
            self.apply_spec_fails: List[Tuple[str, str]] = []
            self.max_osd_draining_count = 10

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            assert isinstance(path, str)
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self.cephadm_binary_path = self._get_cephadm_binary_path()

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        self.keys = ClientKeyringStore(self)
        self.keys.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        _service_clses: Sequence[Type[CephadmService]] = [
            OSDService, NFSService, MonService, MgrService, MdsService,
            RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
            PrometheusService, NodeExporterService, CrashService, IscsiService,
            IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService
        ]

        # https://github.com/python/mypy/issues/8993
        self.cephadm_services: Dict[str, CephadmService] = {
            cls.TYPE: cls(self) for cls in _service_clses}  # type: ignore

        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
        self.need_connect_dashboard_rgw = False

        self.config_checker = CephadmConfigChecks(self)

    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

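    # The cephadm binary is staged on remote hosts under a path that embeds a
    # sha256 of its content, so a new mgr build never reuses a stale copy.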
    def _get_cephadm_binary_path(self) -> str:
        import hashlib
        m = hashlib.sha256()
        m.update(self._cephadm.encode())
        return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self) -> None:
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()

    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(
            self,
            daemon_type: str,
            host: str,
            existing: List[orchestrator.DaemonDescription],
            prefix: Optional[str] = None,
            forcename: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> str:
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container', 'cephadm-exporter',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            if rank is not None and rank_generation is not None:
                name += f'{rank}.{rank_generation}.'
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

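    # Rebuild the temporary ssh_config/identity files and SSH options from the
    # mon store, then drop cached connections so they reopen with the new settings.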
    def _reconfig_ssh(self) -> None:
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not res:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in res:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host: str) -> None:
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self) -> None:
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err, {}
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}

        # mypy is unable to determine type for _processes since it's private
        worker_count: int = self._worker_pool._processes  # type: ignore
        ret = {
            "workers": worker_count,
            "paused": self.paused,
        }

        return True, err, ret

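    # Store a new SSH-related setting, then verify it by probing the first known
    # host; on failure the previous value is restored and an error is raised.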
    def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
        self.set_store(what, new)
        self._reconfig_ssh()
        if self.cache.get_hosts():
            # Can't check anything without hosts
            host = self.cache.get_hosts()[0]
            r = CephadmServe(self)._check_host(host)
            if r is not None:
                # connection failed; roll back to the old value
                self.set_store(what, old)
                self._reconfig_ssh()
                raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
        self.log.info(f'Set ssh {what}')

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set the ssh_config file (use -i <ssh_config>)
        """
        # Set an ssh_config file provided from stdin

        old = self.ssh_config
        if inbuf == old:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self._validate_and_set_ssh_val('ssh_config', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command('cephadm clear-ssh-config')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file
        """
        # Clear the ssh_config file provided from stdin
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command('cephadm get-ssh-config')
    def _get_ssh_config(self) -> HandleCommandResult:
        """
        Returns the ssh config as used by cephadm
        """
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command('cephadm generate-key')
    def _generate_key(self) -> Tuple[int, str, str]:
        """
        Generate a cluster SSH key (if not present)
        """
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH private key (use -i <private_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        old = self.ssh_key
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
        self.log.info('Set ssh private key')
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH public key (use -i <public_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        old = self.ssh_pub
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Clear cluster SSH key"""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        """Show SSH public key for connecting to cluster hosts"""
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user')
    def _get_user(self) -> Tuple[int, str, str]:
        """
        Show user for SSHing to cluster hosts
        """
        if self.ssh_user is None:
            return -errno.ENOENT, '', 'No cluster SSH user configured'
        else:
            return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        """
        Set user for SSHing to cluster hosts; passwordless sudo will be needed for non-root users
        """
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self._validate_and_set_ssh_val('ssh_user', user, current_user)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '. sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom registry login info by providing url, username and password or a json file with login info (-i <file>)
        """
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            assert isinstance(inbuf, str)
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please set up the json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = CephadmServe(self)._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command('cephadm check-host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Check whether we can access and manage a remote host"""
        try:
            out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
                                                             ['--expect-hostname', host],
                                                             addr=addr,
                                                             error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n'
                           + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Prepare a remote host for use with cephadm"""
        out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                                         ['--expect-hostname', host],
                                                         addr=addr,
                                                         error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf')
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Text that is appended to all daemons' ceph.conf.
        Mainly a workaround, until `config generate-minimal-conf` generates
        a complete ceph.conf.

        Warning: this is a dangerous operation.
        """
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        """
        Get extra ceph conf that is appended
        """
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

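    # Helpers for the cephadm-exporter TLS/token configuration, persisted in the
    # mgr key/value store under 'exporter_*' keys.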
    def _set_exporter_config(self, config: Dict[str, str]) -> None:
        self.set_store('exporter_config', json.dumps(config))

    def _get_exporter_config(self) -> Dict[str, str]:
        cfg_str = self.get_store('exporter_config')
        return json.loads(cfg_str) if cfg_str else {}

    def _set_exporter_option(self, option: str, value: Optional[str] = None) -> None:
        kv_option = f'exporter_{option}'
        self.set_store(kv_option, value)

    def _get_exporter_option(self, option: str) -> Optional[str]:
        kv_option = f'exporter_{option}'
        return self.get_store(kv_option)

    @orchestrator._cli_write_command(
        prefix='cephadm generate-exporter-config')
    @service_inactive('cephadm-exporter')
    def _generate_exporter_config(self) -> Tuple[int, str, str]:
        """
        Generate default SSL crt/key and token for cephadm exporter daemons
        """
        self._set_exporter_defaults()
        self.log.info('Default settings created for cephadm exporter(s)')
        return 0, "", ""

    def _set_exporter_defaults(self) -> None:
        crt, key = self._generate_exporter_ssl()
        token = self._generate_exporter_token()
        self._set_exporter_config({
            "crt": crt,
            "key": key,
            "token": token,
            "port": CephadmExporterConfig.DEFAULT_PORT
        })
        self._set_exporter_option('enabled', 'true')

    def _generate_exporter_ssl(self) -> Tuple[str, str]:
        return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})

    def _generate_exporter_token(self) -> str:
        return secrets.token_hex(32)

    @orchestrator._cli_write_command(
        prefix='cephadm clear-exporter-config')
    @service_inactive('cephadm-exporter')
    def _clear_exporter_config(self) -> Tuple[int, str, str]:
        """
        Clear the SSL configuration used by cephadm exporter daemons
        """
        self._clear_exporter_config_settings()
        self.log.info('Cleared cephadm exporter configuration')
        return 0, "", ""

    def _clear_exporter_config_settings(self) -> None:
        self.set_store('exporter_config', None)
        self._set_exporter_option('enabled', None)

    @orchestrator._cli_write_command(
        prefix='cephadm set-exporter-config')
    @service_inactive('cephadm-exporter')
    def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port
        """
        if not inbuf:
            return 1, "", "JSON configuration has not been provided (-i <filename>)"

        cfg = CephadmExporterConfig(self)
        rc, reason = cfg.load_from_json(inbuf)
        if rc:
            return 1, "", reason

        rc, reason = cfg.validate_config()
        if rc:
            return 1, "", reason

        self._set_exporter_config({
            "crt": cfg.crt,
            "key": cfg.key,
            "token": cfg.token,
            "port": cfg.port
        })
        self.log.info("Loaded and verified the TLS configuration")
        return 0, "", ""

    @orchestrator._cli_read_command(
        'cephadm get-exporter-config')
    def _show_exporter_config(self) -> Tuple[int, str, str]:
        """
        Show the current cephadm-exporter configuration (JSON)
        """
        cfg = self._get_exporter_config()
        return 0, json.dumps(cfg, indent=2), ""

    @orchestrator._cli_read_command('cephadm config-check ls')
    def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
        """List the available configuration checks and their current state"""

        if format not in [Format.plain, Format.json, Format.json_pretty]:
            return HandleCommandResult(
                retval=1,
                stderr="Requested format is not supported when listing configuration checks"
            )

        if format in [Format.json, Format.json_pretty]:
            return HandleCommandResult(
                stdout=to_format(self.config_checker.health_checks,
                                 format,
                                 many=True,
                                 cls=None))

        # plain formatting
        table = PrettyTable(
            ['NAME',
             'HEALTHCHECK',
             'STATUS',
             'DESCRIPTION'
             ], border=False)
        table.align['NAME'] = 'l'
        table.align['HEALTHCHECK'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESCRIPTION'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 2
        for c in self.config_checker.health_checks:
            table.add_row((
                c.name,
                c.healthcheck_name,
                c.status,
                c.description,
            ))

        return HandleCommandResult(stdout=table.get_string())

    @orchestrator._cli_read_command('cephadm config-check status')
    def _config_check_status(self) -> HandleCommandResult:
        """Show whether the configuration checker feature is enabled/disabled"""
        status = self.get_module_option('config_checks_enabled')
        return HandleCommandResult(stdout="Enabled" if status else "Disabled")

    @orchestrator._cli_write_command('cephadm config-check enable')
    def _config_check_enable(self, check_name: str) -> HandleCommandResult:
        """Enable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'enabled')
        if err:
            return HandleCommandResult(
                retval=err,
                stderr=f"Failed to enable check '{check_name}' : {msg}")

        return HandleCommandResult(stdout="ok")

    @orchestrator._cli_write_command('cephadm config-check disable')
    def _config_check_disable(self, check_name: str) -> HandleCommandResult:
        """Disable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'disabled')
        if err:
            return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
        else:
            # drop any outstanding raised healthcheck for this check
            config_check = self.config_checker.lookup_check(check_name)
            if config_check:
                if config_check.healthcheck_name in self.health_checks:
                    self.health_checks.pop(config_check.healthcheck_name, None)
                    self.set_health_checks(self.health_checks)
            else:
                self.log.error(
                    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

        return HandleCommandResult(stdout="ok")

    def _config_check_valid(self, check_name: str) -> bool:
        return check_name in [chk.name for chk in self.config_checker.health_checks]

    def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
        checks_raw = self.get_store('config_checks')
        if not checks_raw:
            return 1, "config_checks setting is not available"

        checks = json.loads(checks_raw)
        checks.update({
            check_name: status
        })
        self.log.info(f"updated config check '{check_name}' : {status}")
        self.set_store('config_checks', json.dumps(checks))
        return 0, ""

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

    @orchestrator._cli_write_command(
        'cephadm osd activate'
    )
    def _osd_activate(self, host: List[str]) -> HandleCommandResult:
        """
        Start OSD containers for existing OSDs
        """

        @forall_hosts
        def run(h: str) -> str:
            return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')

        return HandleCommandResult(stdout='\n'.join(run(host)))

    @orchestrator._cli_read_command('orch client-keyring ls')
    def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
        """
        List client keyrings under cephadm management
        """
        if format != Format.plain:
            output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
        else:
            table = PrettyTable(
                ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
                table.add_row((
                    ks.entity, ks.placement.pretty_str(),
                    utils.file_mode_to_str(ks.mode),
                    f'{ks.uid}:{ks.gid}',
                    ks.path,
                ))
            output = table.get_string()
        return HandleCommandResult(stdout=output)

    @orchestrator._cli_write_command('orch client-keyring set')
    def _client_keyring_set(
            self,
            entity: str,
            placement: str,
            owner: Optional[str] = None,
            mode: Optional[str] = None,
    ) -> HandleCommandResult:
        """
        Add or update client keyring under cephadm management
        """
        if not entity.startswith('client.'):
            raise OrchestratorError('entity must start with client.')
        if owner:
            try:
                uid, gid = map(int, owner.split(':'))
            except Exception:
                raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"')
        else:
            uid = 0
            gid = 0
        if mode:
            try:
                imode = int(mode, 8)
            except Exception:
                raise OrchestratorError('mode must be an octal mode, e.g. "600"')
        else:
            imode = 0o600
        pspec = PlacementSpec.from_string(placement)
        ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid)
        self.keys.update(ks)
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_write_command('orch client-keyring rm')
    def _client_keyring_rm(
            self,
            entity: str,
    ) -> HandleCommandResult:
        """
        Remove client keyring from cephadm management
        """
        self.keys.rm(entity)
        self._kick_serve_loop()
        return HandleCommandResult()

    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
                                                  'remoto.backends.LegacyModuleExecute']:
        """
        Setup a connection for running commands on remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        assert self.ssh_user
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
        """
        Remote validator that accepts a connection object to ensure that a certain
        executable is available returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

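    # Resolve the container image for a daemon: ceph daemons use the per-section
    # 'container_image' config option, monitoring/ingress daemons use the module
    # options above, and custom containers are resolved from their spec at deploy
    # time.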
    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        image: Optional[str] = None
        if daemon_type in CEPH_IMAGE_TYPES:
            # get container image
            image = str(self.get_foreign_ceph_option(
                utils.name_to_config_section(daemon_name),
                'container_image'
            )).strip()
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == 'haproxy':
            image = self.container_image_haproxy
        elif daemon_type == 'keepalived':
            image = self.container_image_keepalived
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved here; the necessary information
            # is only available when a container is deployed (given
            # via spec).
            image = None
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _schedulable_hosts(self) -> List[HostSpec]:
        """
        Returns all usable hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we could end up with hosts
        where daemons might be running, but which we have not yet detected.
        """
        return [
            h for h in self.inventory.all_specs()
            if (
                self.cache.host_had_daemon_refresh(h.hostname)
                and '_no_schedule' not in h.labels
            )
        ]

    def _unreachable_hosts(self) -> List[HostSpec]:
        """
        Return all hosts that are offline or in maintenance mode.

        The idea is we should not touch the daemons on these hosts (since
        in theory the hosts are inaccessible, so we CAN'T touch them), but
        we still want to count daemons that exist on these hosts toward the
        placement, so daemons on these hosts aren't just moved elsewhere.
        """
        return [
            h for h in self.inventory.all_specs()
            if (
                h.status.lower() in ['maintenance', 'offline']
                or h.hostname in self.offline_hosts
            )
        ]

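    # Validate that an address resolves and passes 'cephadm check-host' before a
    # host is added or its address updated; returns the resolved IP.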
    def _check_valid_addr(self, host: str, addr: str) -> str:
        # make sure mgr is not resolving own ip
        if addr in self.get_mgr_id():
            raise OrchestratorError(
                "Cannot automatically resolve ip address of host where active mgr is running. Please explicitly provide the address.")

        # make sure hostname is resolvable before trying to make a connection
        try:
            ip_addr = utils.resolve_ip(addr)
        except OrchestratorError as e:
            msg = str(e) + f'''
You may need to supply an address for {addr}

Please make sure that the host is reachable and accepts connections using the cephadm SSH key
To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}'''
            raise OrchestratorError(msg)

        out, err, code = CephadmServe(self)._run_cephadm(
            host, cephadmNoImage, 'check-host',
            ['--expect-hostname', host],
            addr=addr,
            error_ok=True, no_fsid=True)
        if code:
            msg = 'check-host failed:\n' + '\n'.join(err)
            # err will contain stdout and stderr, so we filter on the message text to
            # only show the errors
            errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')]
            if errors:
                msg = f'Host {host} ({addr}) failed check(s): {errors}'
            raise OrchestratorError(msg)
        return ip_addr

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        assert_valid_host(spec.hostname)
        ip_addr = self._check_valid_addr(spec.hostname, spec.addr)
        if spec.addr == spec.hostname and ip_addr:
            spec.addr = ip_addr

        if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr:
            self.cache.refresh_all_host_info(spec.hostname)

        # prime crush map?
        if spec.location:
            self.check_mon_command({
                'prefix': 'osd crush add-bucket',
                'name': spec.hostname,
                'type': 'host',
                'args': [f'{k}={v}' for k, v in spec.location.items()],
            })

        if spec.hostname not in self.inventory:
            self.cache.prime_empty_host(spec.hostname)
        self.inventory.add_host(spec)
        self.offline_hosts_remove(spec.hostname)
        if spec.status == 'maintenance':
            self._set_maintenance_healthcheck()
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)

f67539c2 1507 @handle_orch_error
e306af50
TL
1508 def add_host(self, spec: HostSpec) -> str:
1509 return self._add_host(spec)
1510
f67539c2 1511 @handle_orch_error
522d829b 1512 def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str:
9f95a23c
TL
1513 """
1514 Remove a host from orchestrator management.
1515
1516 :param host: host name
522d829b
TL
1517 :param force: bypass running daemons check
1518 :param offline: remove offline host
9f95a23c 1519 """
522d829b
TL
1520
1521 # check if host is offline
1522 host_offline = host in self.offline_hosts
1523
1524 if host_offline and not offline:
1525 return "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host)
1526
1527 if not host_offline and offline:
1528 return "{} is online, please remove host without --offline.".format(host)
1529
1530 if offline and not force:
1531 return "Removing an offline host requires --force"
1532
1533 # check if there are daemons on the host
1534 if not force:
1535 daemons = self.cache.get_daemons_by_host(host)
1536 if daemons:
1537 self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}")
1538
1539 daemons_table = ""
1540 daemons_table += "{:<20} {:<15}\n".format("type", "id")
1541 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
1542 for d in daemons:
1543 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
1544
1545 return "Not allowed to remove %s from cluster. " \
1546 "The following daemons are running in the host:" \
1547 "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % (
1548 host, daemons_table, host)
1549
1550 def run_cmd(cmd_args: dict) -> None:
1551 ret, out, err = self.mon_command(cmd_args)
1552 if ret != 0:
1553 self.log.debug(f"ran {cmd_args} with mon_command")
1554 self.log.error(
1555 f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
1556 self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
1557
1558 if offline:
1559 daemons = self.cache.get_daemons_by_host(host)
1560 for d in daemons:
1561 self.log.info(f"removing: {d.name()}")
1562
1563 if d.daemon_type != 'osd':
1564 self.cephadm_services[str(d.daemon_type)].pre_remove(d)
a4b75251 1565 self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False)
1566 else:
1567 cmd_args = {
1568 'prefix': 'osd purge-actual',
1569 'id': int(str(d.daemon_id)),
1570 'yes_i_really_mean_it': True
1571 }
1572 run_cmd(cmd_args)
1573
1574 cmd_args = {
1575 'prefix': 'osd crush rm',
1576 'name': host
1577 }
1578 run_cmd(cmd_args)
1579
e306af50 1580 self.inventory.rm_host(host)
1581 self.cache.rm_host(host)
1582 self._reset_con(host)
1583 self.event.set() # refresh stray health check
1584 self.log.info('Removed host %s' % host)
522d829b 1585 return "Removed {} host '{}'".format('offline' if offline else '', host)
9f95a23c 1586
f67539c2 1587 @handle_orch_error
adb31ebb 1588 def update_host_addr(self, host: str, addr: str) -> str:
b3b6e05e 1589 self._check_valid_addr(host, addr)
e306af50 1590 self.inventory.set_addr(host, addr)
1591 self._reset_con(host)
1592 self.event.set() # refresh stray health check
1593 self.log.info('Set host %s addr to %s' % (host, addr))
1594 return "Updated host '{}' addr to '{}'".format(host, addr)
1595
f67539c2 1596 @handle_orch_error
1597 def get_hosts(self):
1598 # type: () -> List[orchestrator.HostSpec]
1599 """
1600 Return a list of hosts managed by the orchestrator.
1601
1602 Notes:
1603 - skip async: manager reads from cache.
1604 """
e306af50 1605 return list(self.inventory.all_specs())
9f95a23c 1606
1607 @handle_orch_error
1608 def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]:
1609 """
1610 Return a list of hosts metadata(gather_facts) managed by the orchestrator.
1611
1612 Notes:
1613 - skip async: manager reads from cache.
1614 """
1615 if hostname:
1616 return [self.cache.get_facts(hostname)]
1617
1618 return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()]
1619
f67539c2 1620 @handle_orch_error
adb31ebb 1621 def add_host_label(self, host: str, label: str) -> str:
e306af50 1622 self.inventory.add_label(host, label)
9f95a23c 1623 self.log.info('Added label %s to host %s' % (label, host))
b3b6e05e 1624 self._kick_serve_loop()
1625 return 'Added label %s to host %s' % (label, host)
1626
f67539c2 1627 @handle_orch_error
adb31ebb 1628 def remove_host_label(self, host: str, label: str) -> str:
e306af50 1629 self.inventory.rm_label(host, label)
9f95a23c 1630 self.log.info('Removed label %s from host %s' % (label, host))
b3b6e05e 1631 self._kick_serve_loop()
1632 return 'Removed label %s from host %s' % (label, host)
1633
1634 def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]:
1635 self.log.debug("running host-ok-to-stop checks")
f6b5b4d7 1636 daemons = self.cache.get_daemons()
f67539c2 1637 daemon_map: Dict[str, List[str]] = defaultdict(lambda: [])
f6b5b4d7 1638 for dd in daemons:
1639 assert dd.hostname is not None
1640 assert dd.daemon_type is not None
1641 assert dd.daemon_id is not None
1642 if dd.hostname == hostname:
1643 daemon_map[dd.daemon_type].append(dd.daemon_id)
1644
1645 notifications: List[str] = []
1646 error_notifications: List[str] = []
1647 okay: bool = True
f91f0fd5 1648 for daemon_type, daemon_ids in daemon_map.items():
1649 r = self.cephadm_services[daemon_type_to_service(
1650 daemon_type)].ok_to_stop(daemon_ids, force=force)
f6b5b4d7 1651 if r.retval:
1652 okay = False
1653 # collect error notifications so user can see every daemon causing host
1654 # to not be okay to stop
1655 error_notifications.append(r.stderr)
1656 if r.stdout:
1657 # if extra notifications to print for user, add them to notifications list
1658 notifications.append(r.stdout)
1659
1660 if not okay:
1661 # at least one daemon is not okay to stop
1662 return 1, '\n'.join(error_notifications)
1663
1664 if notifications:
1665 return 0, (f'It is presumed safe to stop host {hostname}. '
1666 + 'Note the following:\n\n' + '\n'.join(notifications))
1667 return 0, f'It is presumed safe to stop host {hostname}'
1668
1669 @handle_orch_error
1670 def host_ok_to_stop(self, hostname: str) -> str:
1671 if hostname not in self.cache.get_hosts():
1672 raise OrchestratorError(f'Cannot find host "{hostname}"')
1673
1674 rc, msg = self._host_ok_to_stop(hostname)
1675 if rc:
1676 raise OrchestratorError(msg, errno=rc)
f6b5b4d7 1677
1678 self.log.info(msg)
1679 return msg
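# The aggregate check above backs the orchestrator's host safety query; a
# typical invocation (hostname is illustrative) would be:
#
#     ceph orch host ok-to-stop host1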
1680
1681 def _set_maintenance_healthcheck(self) -> None:
1682 """Raise/update or clear the maintenance health check as needed"""
1683
1684 in_maintenance = self.inventory.get_host_with_state("maintenance")
1685 if not in_maintenance:
a4b75251 1686 self.remove_health_warning('HOST_IN_MAINTENANCE')
1687 else:
1688 s = "host is" if len(in_maintenance) == 1 else "hosts are"
1689 self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [
1690 f"{h} is in maintenance" for h in in_maintenance])
1691
1692 @handle_orch_error
1693 @host_exists()
1694 def enter_host_maintenance(self, hostname: str, force: bool = False) -> str:
1695 """ Attempt to place a cluster host in maintenance
1696
1697 Placing a host into maintenance disables the cluster's ceph target in systemd
1698 and stops all ceph daemons. If the host is an osd host we apply the noout flag
1699 for the host subtree in crush to prevent data movement during a host maintenance
1700 window.
1701
1702 :param hostname: (str) name of the host (must match an inventory hostname)
1703
1704 :raises OrchestratorError: Hostname is invalid, host is already in maintenance
1705 """
1706 if len(self.cache.get_hosts()) == 1:
1707 raise OrchestratorError("Maintenance feature is not supported on single node clusters")
1708
1709 # if upgrade is active, deny
1710 if self.upgrade.upgrade_state:
1711 raise OrchestratorError(
1712 f"Unable to place {hostname} in maintenance with upgrade active/paused")
1713
1714 tgt_host = self.inventory._inventory[hostname]
1715 if tgt_host.get("status", "").lower() == "maintenance":
1716 raise OrchestratorError(f"Host {hostname} is already in maintenance")
1717
1718 host_daemons = self.cache.get_daemon_types(hostname)
1719 self.log.debug("daemons on host {}".format(','.join(host_daemons)))
1720 if host_daemons:
1721 # daemons on this host, so check the daemons can be stopped
1722 # and if so, place the host into maintenance by disabling the target
1723 rc, msg = self._host_ok_to_stop(hostname, force)
1724 if rc:
1725 raise OrchestratorError(
1726 msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc)
1727
1728 # call the host-maintenance function
1729 _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance",
1730 ["enter"],
1731 error_ok=True)
a4b75251
TL
1732 returned_msg = _err[0].split('\n')[-1]
1733 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
f67539c2
TL
1734 raise OrchestratorError(
1735 f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}")
1736
1737 if "osd" in host_daemons:
1738 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1739 rc, out, err = self.mon_command({
1740 'prefix': 'osd set-group',
1741 'flags': 'noout',
1742 'who': [crush_node],
1743 'format': 'json'
1744 })
1745 if rc:
1746 self.log.warning(
1747 f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})")
1748 raise OrchestratorError(
1749 f"Unable to set the osds on {hostname} to noout (rc={rc})")
1750 else:
1751 self.log.info(
1752 f"maintenance mode request for {hostname} has SET the noout group")
1753
1754 # update the host status in the inventory
1755 tgt_host["status"] = "maintenance"
1756 self.inventory._inventory[hostname] = tgt_host
1757 self.inventory.save()
1758
1759 self._set_maintenance_healthcheck()
522d829b 1760 return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
f67539c2
TL
1761
1762 @handle_orch_error
1763 @host_exists()
1764 def exit_host_maintenance(self, hostname: str) -> str:
1765 """Exit maintenance mode and return a host to an operational state
1766
1767 Returning from maintenance will enable the cluster's systemd target and
1768 start it, and remove any noout that has been added for the host if the
1769 host has osd daemons
1770
1771 :param hostname: (str) host name
1772
1773 :raises OrchestratorError: Unable to return from maintenance, or unset the
1774 noout flag
1775 """
1776 tgt_host = self.inventory._inventory[hostname]
1777 if tgt_host['status'] != "maintenance":
1778 raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
1779
1780 outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance',
1781 ['exit'],
1782 error_ok=True)
a4b75251
TL
1783 returned_msg = errs[0].split('\n')[-1]
1784 if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
f67539c2
TL
1785 raise OrchestratorError(
1786 f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
1787
1788 if "osd" in self.cache.get_daemon_types(hostname):
1789 crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
1790 rc, _out, _err = self.mon_command({
1791 'prefix': 'osd unset-group',
1792 'flags': 'noout',
1793 'who': [crush_node],
1794 'format': 'json'
1795 })
1796 if rc:
1797 self.log.warning(
1798 f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
1799 raise OrchestratorError(f"Unable to unset noout for the osds on {hostname} (rc={rc})")
1800 else:
1801 self.log.info(
1802 f"exit maintenance request has UNSET for the noout group on host {hostname}")
1803
1804 # update the host record status
1805 tgt_host['status'] = ""
1806 self.inventory._inventory[hostname] = tgt_host
1807 self.inventory.save()
1808
1809 self._set_maintenance_healthcheck()
1810
1811 return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
1812
f91f0fd5
TL
1813 def get_minimal_ceph_conf(self) -> str:
1814 _, config, _ = self.check_mon_command({
f6b5b4d7
TL
1815 "prefix": "config generate-minimal-conf",
1816 })
f91f0fd5
TL
1817 extra = self.extra_ceph_conf().conf
1818 if extra:
1819 config += '\n\n' + extra.strip() + '\n'
1820 return config
f6b5b4d7 1821
adb31ebb 1822 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
f6b5b4d7
TL
1823 if filter_host:
1824 self.cache.invalidate_host_daemons(filter_host)
1825 else:
1826 for h in self.cache.get_hosts():
1827 # Also discover daemons deployed manually
1828 self.cache.invalidate_host_daemons(h)
1829
1830 self._kick_serve_loop()
1831
f67539c2 1832 @handle_orch_error
f6b5b4d7
TL
1833 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1834 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
9f95a23c 1835 if refresh:
f6b5b4d7 1836 self._invalidate_daemons_and_kick_serve()
b3b6e05e 1837 self.log.debug('Kicked serve() loop to refresh all services')
f6b5b4d7 1838
f6b5b4d7 1839 sm: Dict[str, orchestrator.ServiceDescription] = {}
f67539c2
TL
1840
1841 # known services
1842 for nm, spec in self.spec_store.all_specs.items():
1843 if service_type is not None and service_type != spec.service_type:
1844 continue
1845 if service_name is not None and service_name != nm:
1846 continue
1847 sm[nm] = orchestrator.ServiceDescription(
1848 spec=spec,
b3b6e05e 1849 size=spec.placement.get_target_count(self._schedulable_hosts()),
f67539c2
TL
1850 running=0,
1851 events=self.events.get_for_service(spec.service_name()),
1852 created=self.spec_store.spec_created[nm],
1853 deleted=self.spec_store.spec_deleted.get(nm, None),
1854 virtual_ip=spec.get_virtual_ip(),
1855 ports=spec.get_port_start(),
1856 )
f67539c2
TL
1857 if spec.service_type == 'ingress':
1858 # ingress has 2 daemons running per host
1859 sm[nm].size *= 2
1860
1861 # factor daemons into status
1911f103 1862 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c 1863 for name, dd in dm.items():
f67539c2
TL
1864 assert dd.hostname is not None, f'no hostname for {dd!r}'
1865 assert dd.daemon_type is not None, f'no daemon_type for {dd!r}'
1866
9f95a23c 1867 n: str = dd.service_name()
f67539c2
TL
1868
1869 if (
1870 service_type
b3b6e05e 1871 and service_type != daemon_type_to_service(dd.daemon_type)
f67539c2
TL
1872 ):
1873 continue
9f95a23c
TL
1874 if service_name and service_name != n:
1875 continue
f67539c2
TL
1876
1877 if n not in sm:
1878 # new unmanaged service
1911f103
TL
1879 spec = ServiceSpec(
1880 unmanaged=True,
f67539c2 1881 service_type=daemon_type_to_service(dd.daemon_type),
1911f103 1882 service_id=dd.service_id(),
1911f103 1883 )
9f95a23c 1884 sm[n] = orchestrator.ServiceDescription(
9f95a23c
TL
1885 last_refresh=dd.last_refresh,
1886 container_image_id=dd.container_image_id,
1887 container_image_name=dd.container_image_name,
1888 spec=spec,
f67539c2 1889 size=0,
9f95a23c 1890 )
f67539c2
TL
1891
1892 if dd.status == DaemonDescriptionStatus.running:
9f95a23c 1893 sm[n].running += 1
f67539c2
TL
1894 if dd.daemon_type == 'osd':
1895 # The osd count can't be determined by the Placement spec.
1896 # An accurate actual/expected representation cannot be determined
1897 # here, so we're setting running = size for now.
1898 sm[n].size += 1
1899 if (
1900 not sm[n].last_refresh
1901 or not dd.last_refresh
1902 or dd.last_refresh < sm[n].last_refresh # type: ignore
1903 ):
9f95a23c 1904 sm[n].last_refresh = dd.last_refresh
f67539c2 1905
1911f103 1906 return list(sm.values())
9f95a23c 1907
f67539c2 1908 @handle_orch_error
f6b5b4d7
TL
1909 def list_daemons(self,
1910 service_name: Optional[str] = None,
1911 daemon_type: Optional[str] = None,
1912 daemon_id: Optional[str] = None,
1913 host: Optional[str] = None,
1914 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
9f95a23c 1915 if refresh:
f6b5b4d7 1916 self._invalidate_daemons_and_kick_serve(host)
b3b6e05e 1917 self.log.debug('Kicked serve() loop to refresh all daemons')
f6b5b4d7 1918
9f95a23c 1919 result = []
1911f103 1920 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c
TL
1921 if host and h != host:
1922 continue
1923 for name, dd in dm.items():
801d1391
TL
1924 if daemon_type is not None and daemon_type != dd.daemon_type:
1925 continue
1926 if daemon_id is not None and daemon_id != dd.daemon_id:
9f95a23c 1927 continue
801d1391 1928 if service_name is not None and service_name != dd.service_name():
9f95a23c 1929 continue
b3b6e05e
TL
1930 if not dd.memory_request and dd.daemon_type in ['osd', 'mon']:
1931 dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option(
1932 dd.name(),
1933 f"{dd.daemon_type}_memory_target"
1934 ))
9f95a23c
TL
1935 result.append(dd)
1936 return result
1937
f67539c2 1938 @handle_orch_error
adb31ebb
TL
1939 def service_action(self, action: str, service_name: str) -> List[str]:
1940 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
f67539c2
TL
1941 if not dds:
1942 raise OrchestratorError(f'No daemons exist under service name "{service_name}".'
1943 + ' View currently running services using "ceph orch ls"')
522d829b
TL
1944 if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']:
1945 return [f'Stopping entire {service_name} service is prohibited.']
9f95a23c 1946 self.log.info('%s service %s' % (action.capitalize(), service_name))
adb31ebb
TL
1947 return [
1948 self._schedule_daemon_action(dd.name(), action)
1949 for dd in dds
1950 ]
f6b5b4d7 1951
f67539c2
TL
1952 def _daemon_action(self,
1953 daemon_spec: CephadmDaemonDeploySpec,
1954 action: str,
1955 image: Optional[str] = None) -> str:
1956 self._daemon_action_set_image(action, image, daemon_spec.daemon_type,
1957 daemon_spec.daemon_id)
f6b5b4d7 1958
b3b6e05e
TL
1959 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type,
1960 daemon_spec.daemon_id):
f67539c2
TL
1961 self.mgr_service.fail_over()
1962 return '' # unreachable
9f95a23c 1963
f67539c2
TL
1964 if action == 'redeploy' or action == 'reconfig':
1965 if daemon_spec.daemon_type != 'osd':
1966 daemon_spec = self.cephadm_services[daemon_type_to_service(
1967 daemon_spec.daemon_type)].prepare_create(daemon_spec)
1968 return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig'))
9f95a23c
TL
1969
1970 actions = {
1971 'start': ['reset-failed', 'start'],
1972 'stop': ['stop'],
1973 'restart': ['reset-failed', 'restart'],
1974 }
f6b5b4d7 1975 name = daemon_spec.name()
9f95a23c 1976 for a in actions[action]:
f6b5b4d7 1977 try:
f67539c2
TL
1978 out, err, code = CephadmServe(self)._run_cephadm(
1979 daemon_spec.host, name, 'unit',
f6b5b4d7
TL
1980 ['--name', name, a])
1981 except Exception:
f67539c2 1982 self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed')
f6b5b4d7
TL
1983 self.cache.invalidate_host_daemons(daemon_spec.host)
1984 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1985 self.events.for_daemon(name, 'INFO', msg)
1986 return msg
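# Illustrative effect of the action table above: scheduling 'restart' for a
# hypothetical daemon osd.3 runs the cephadm 'unit' command twice on its host,
# roughly (fsid/image arguments added by _run_cephadm are omitted here):
#
#     cephadm unit --name osd.3 reset-failed
#     cephadm unit --name osd.3 restart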
9f95a23c 1987
adb31ebb 1988 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
f91f0fd5
TL
1989 if image is not None:
1990 if action != 'redeploy':
1991 raise OrchestratorError(
1992 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
a4b75251 1993 if daemon_type not in CEPH_IMAGE_TYPES:
f91f0fd5
TL
1994 raise OrchestratorError(
1995 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
a4b75251 1996 f'types are: {", ".join(CEPH_IMAGE_TYPES)}')
f91f0fd5
TL
1997
1998 self.check_mon_command({
1999 'prefix': 'config set',
2000 'name': 'container_image',
2001 'value': image,
2002 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
2003 })
2004
f67539c2 2005 @handle_orch_error
f91f0fd5 2006 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
f6b5b4d7 2007 d = self.cache.get_daemon(daemon_name)
f67539c2
TL
2008 assert d.daemon_type is not None
2009 assert d.daemon_id is not None
f6b5b4d7 2010
b3b6e05e 2011 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \
f91f0fd5
TL
2012 and not self.mgr_service.mgr_map_has_standby():
2013 raise OrchestratorError(
2014 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2015
2016 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
2017
2018 self.log.info(f'Schedule {action} daemon {daemon_name}')
2019 return self._schedule_daemon_action(daemon_name, action)
2020
2021 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
2022 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
2023
f67539c2
TL
2024 def get_active_mgr_digests(self) -> List[str]:
2025 digests = self.mgr_service.get_active_daemon(
2026 self.cache.get_daemons_by_type('mgr')).container_image_digests
2027 return digests if digests else []
2028
adb31ebb 2029 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
f91f0fd5 2030 dd = self.cache.get_daemon(daemon_name)
f67539c2
TL
2031 assert dd.daemon_type is not None
2032 assert dd.daemon_id is not None
2033 assert dd.hostname is not None
b3b6e05e 2034 if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
f91f0fd5
TL
2035 and not self.mgr_service.mgr_map_has_standby():
2036 raise OrchestratorError(
2037 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
2038 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
2039 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
2040 self._kick_serve_loop()
2041 return msg
9f95a23c 2042
f67539c2 2043 @handle_orch_error
9f95a23c 2044 def remove_daemons(self, names):
e306af50 2045 # type: (List[str]) -> List[str]
9f95a23c
TL
2046 args = []
2047 for host, dm in self.cache.daemons.items():
2048 for name in names:
2049 if name in dm:
2050 args.append((name, host))
2051 if not args:
2052 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
f67539c2 2053 self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args]))
9f95a23c
TL
2054 return self._remove_daemons(args)
2055
f67539c2 2056 @handle_orch_error
a4b75251 2057 def remove_service(self, service_name: str, force: bool = False) -> str:
9f95a23c 2058 self.log.info('Remove service %s' % service_name)
e306af50 2059 self._trigger_preview_refresh(service_name=service_name)
f67539c2
TL
2060 if service_name in self.spec_store:
2061 if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'):
2062 return f'Unable to remove {service_name} service.\n' \
2063 f'Note, you might want to mark the {service_name} service as "unmanaged"'
b3b6e05e 2064
a4b75251
TL
2065 # Report list of affected OSDs?
2066 if not force and service_name.startswith('osd.'):
2067 osds_msg = {}
b3b6e05e
TL
2068 for h, dm in self.cache.get_daemons_with_volatile_status():
2069 osds_to_remove = []
2070 for name, dd in dm.items():
2071 if dd.daemon_type == 'osd' and (dd.service_name() == service_name or not dd.osdspec_affinity):
2072 osds_to_remove.append(str(dd.daemon_id))
2073 if osds_to_remove:
2074 osds_msg[h] = osds_to_remove
b3b6e05e 2075 if osds_msg:
a4b75251
TL
2076 msg = ''
2077 for h, ls in osds_msg.items():
2078 msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}\n'
2079 raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain, --force to proceed anyway\n{msg}')
2080
2081 found = self.spec_store.rm(service_name)
2082 if found and service_name.startswith('osd.'):
2083 self.spec_store.finally_rm(service_name)
2084 self._kick_serve_loop()
2085 return f'Removed service {service_name}'
9f95a23c 2086
f67539c2 2087 @handle_orch_error
adb31ebb 2088 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
9f95a23c
TL
2089 """
2090 Return the storage inventory of hosts matching the given filter.
2091
2092 :param host_filter: host filter
2093
2094 TODO:
2095 - add filtering by label
2096 """
2097 if refresh:
f6b5b4d7
TL
2098 if host_filter and host_filter.hosts:
2099 for h in host_filter.hosts:
b3b6e05e 2100 self.log.debug(f'will refresh {h} devs')
f6b5b4d7 2101 self.cache.invalidate_host_devices(h)
9f95a23c 2102 else:
f6b5b4d7 2103 for h in self.cache.get_hosts():
b3b6e05e 2104 self.log.debug(f'will refresh {h} devs')
f6b5b4d7
TL
2105 self.cache.invalidate_host_devices(h)
2106
2107 self.event.set()
b3b6e05e 2108 self.log.debug('Kicked serve() loop to refresh devices')
9f95a23c
TL
2109
2110 result = []
2111 for host, dls in self.cache.devices.items():
f6b5b4d7 2112 if host_filter and host_filter.hosts and host not in host_filter.hosts:
9f95a23c
TL
2113 continue
2114 result.append(orchestrator.InventoryHost(host,
2115 inventory.Devices(dls)))
2116 return result
2117
f67539c2 2118 @handle_orch_error
adb31ebb 2119 def zap_device(self, host: str, path: str) -> str:
a4b75251
TL
2120 """Zap a device on a managed host.
2121
2122 Use ceph-volume zap to return a device to an unused/free state
2123
2124 Args:
2125 host (str): hostname of the cluster host
2126 path (str): device path
2127
2128 Raises:
2129 OrchestratorError: host is not a cluster host
2130 OrchestratorError: host is in maintenance and therefore unavailable
2131 OrchestratorError: device path not found on the host
2132 OrchestratorError: device is known to a different ceph cluster
2133 OrchestratorError: device holds active osd
2134 OrchestratorError: device cache hasn't been populated yet..
2135
2136 Returns:
2137 str: output from the zap command
2138 """
2139
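# Typically reached via the orchestrator CLI, e.g. (host and device path are
# illustrative):
#
#     ceph orch device zap host1 /dev/sdb --force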
9f95a23c 2140 self.log.info('Zap device %s:%s' % (host, path))
a4b75251
TL
2141
2142 if host not in self.inventory.keys():
2143 raise OrchestratorError(
2144 f"Host '{host}' is not a member of the cluster")
2145
2146 host_info = self.inventory._inventory.get(host, {})
2147 if host_info.get('status', '').lower() == 'maintenance':
2148 raise OrchestratorError(
2149 f"Host '{host}' is in maintenance mode, which prevents any actions against it.")
2150
2151 if host not in self.cache.devices:
2152 raise OrchestratorError(
2153 f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.")
2154
2155 host_devices = self.cache.devices[host]
2156 path_found = False
2157 osd_id_list: List[str] = []
2158
2159 for dev in host_devices:
2160 if dev.path == path:
2161 # match, so look a little deeper
2162 if dev.lvs:
2163 for lv in cast(List[Dict[str, str]], dev.lvs):
2164 if lv.get('osd_id', ''):
2165 lv_fsid = lv.get('cluster_fsid')
2166 if lv_fsid != self._cluster_fsid:
2167 raise OrchestratorError(
2168 f"device {path} has lv's from a different Ceph cluster ({lv_fsid})")
2169 osd_id_list.append(lv.get('osd_id', ''))
2170 path_found = True
2171 break
2172 if not path_found:
2173 raise OrchestratorError(
2174 f"Device path '{path}' not found on host '{host}'")
2175
2176 if osd_id_list:
2177 dev_name = os.path.basename(path)
2178 active_osds: List[str] = []
2179 for osd_id in osd_id_list:
2180 metadata = self.get_metadata('osd', str(osd_id))
2181 if metadata:
2182 if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','):
2183 active_osds.append("osd." + osd_id)
2184 if active_osds:
2185 raise OrchestratorError(
2186 f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active "
2187 f"OSD{'s' if len(active_osds) > 1 else ''}"
2188 f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.")
2189
f67539c2 2190 out, err, code = CephadmServe(self)._run_cephadm(
9f95a23c
TL
2191 host, 'osd', 'ceph-volume',
2192 ['--', 'lvm', 'zap', '--destroy', path],
2193 error_ok=True)
a4b75251 2194
9f95a23c
TL
2195 self.cache.invalidate_host_devices(host)
2196 if code:
2197 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
a4b75251
TL
2198 msg = f'zap successful for {path} on {host}'
2199 self.log.info(msg)
2200
2201 return msg + '\n'
9f95a23c 2202
f67539c2 2203 @handle_orch_error
f91f0fd5
TL
2204 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
2205 """
2206 Blink a device light. Calling something like::
2207
2208 lsmcli local-disk-ident-led-on --path $path
2209
2210 If you must, you can customize this via::
2211
2212 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
2213 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
2214
2215 See templates/blink_device_light_cmd.j2
2216 """
e306af50 2217 @forall_hosts
adb31ebb 2218 def blink(host: str, dev: str, path: str) -> str:
f91f0fd5
TL
2219 cmd_line = self.template.render('blink_device_light_cmd.j2',
2220 {
2221 'on': on,
2222 'ident_fault': ident_fault,
2223 'dev': dev,
2224 'path': path
2225 },
2226 host=host)
2227 cmd_args = shlex.split(cmd_line)
2228
f67539c2 2229 out, err, code = CephadmServe(self)._run_cephadm(
f91f0fd5 2230 host, 'osd', 'shell', ['--'] + cmd_args,
9f95a23c
TL
2231 error_ok=True)
2232 if code:
f6b5b4d7 2233 raise OrchestratorError(
9f95a23c 2234 'Unable to affect %s light for %s:%s. Command: %s' % (
f91f0fd5 2235 ident_fault, host, dev, ' '.join(cmd_args)))
9f95a23c
TL
2236 self.log.info('Set %s light for %s:%s %s' % (
2237 ident_fault, host, dev, 'on' if on else 'off'))
2238 return "Set %s light for %s:%s %s" % (
2239 ident_fault, host, dev, 'on' if on else 'off')
2240
2241 return blink(locs)
2242
2243 def get_osd_uuid_map(self, only_up=False):
1911f103 2244 # type: (bool) -> Dict[str, str]
9f95a23c
TL
2245 osd_map = self.get('osd_map')
2246 r = {}
2247 for o in osd_map['osds']:
2248 # only include OSDs that have ever started in this map. this way
2249 # an interrupted osd create can be repeated and succeed the second
2250 # time around.
1911f103
TL
2251 osd_id = o.get('osd')
2252 if osd_id is None:
2253 raise OrchestratorError("Could not retrieve osd_id from osd_map")
2254 if not only_up or (o['up_from'] > 0):
2255 r[str(osd_id)] = o.get('uuid', '')
9f95a23c
TL
2256 return r
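# Illustrative shape of the returned mapping (values are made up):
#
#     {'0': '7a6e6f1c-0c1b-4e5a-9f00-000000000000', '2': 'd3b2c4a1-...'}
#
# With only_up=True, OSDs that have never started (up_from == 0) are skipped.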
2257
e306af50
TL
2258 def _trigger_preview_refresh(self,
2259 specs: Optional[List[DriveGroupSpec]] = None,
f6b5b4d7
TL
2260 service_name: Optional[str] = None,
2261 ) -> None:
2262 # Only trigger a refresh when a spec has changed
2263 trigger_specs = []
2264 if specs:
2265 for spec in specs:
2266 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
2267 # If the to-be-previewed spec differs from the actual spec we need to
2268 # trigger a refresh; if the spec has been removed (== None) we need to
2269 # refresh as well.
2270 if not preview_spec or spec != preview_spec:
2271 trigger_specs.append(spec)
2272 if service_name:
2273 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
2274 if not any(trigger_specs):
2275 return None
2276
2277 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
e306af50
TL
2278 for host in refresh_hosts:
2279 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
2280 self.cache.osdspec_previews_refresh_queue.append(host)
2281
f67539c2 2282 @handle_orch_error
f6b5b4d7
TL
2283 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
2284 """
2285 Deprecated. Please use `apply()` instead.
2286
2287 Keeping this around to be compatible with mgr/dashboard
2288 """
9f95a23c
TL
2289 return [self._apply(spec) for spec in specs]
2290
f67539c2 2291 @handle_orch_error
f6b5b4d7
TL
2292 def create_osds(self, drive_group: DriveGroupSpec) -> str:
2293 return self.osd_service.create_from_spec(drive_group)
9f95a23c 2294
f6b5b4d7
TL
2295 def _preview_osdspecs(self,
2296 osdspecs: Optional[List[DriveGroupSpec]] = None
adb31ebb 2297 ) -> dict:
f6b5b4d7
TL
2298 if not osdspecs:
2299 return {'n/a': [{'error': True,
2300 'message': 'No OSDSpec or matching hosts found.'}]}
2301 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
e306af50
TL
2302 if not matching_hosts:
2303 return {'n/a': [{'error': True,
2304 'message': 'No OSDSpec or matching hosts found.'}]}
adb31ebb 2305 # Is any host still loading previews or still in the queue to be previewed
e306af50 2306 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
adb31ebb 2307 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
e306af50
TL
2308 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
2309 return {'n/a': [{'error': True,
2310 'message': 'Preview data is being generated.. '
f6b5b4d7
TL
2311 'Please re-run this command in a bit.'}]}
2312 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
2313 previews_for_specs = {}
2314 for host, raw_reports in self.cache.osdspec_previews.items():
2315 if host not in matching_hosts:
2316 continue
2317 osd_reports = []
2318 for osd_report in raw_reports:
2319 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
2320 osd_reports.append(osd_report)
2321 previews_for_specs.update({host: osd_reports})
2322 return previews_for_specs
9f95a23c 2323
f67539c2
TL
2324 def _calc_daemon_deps(self,
2325 spec: Optional[ServiceSpec],
2326 daemon_type: str,
2327 daemon_id: str) -> List[str]:
9f95a23c 2328 deps = []
f67539c2
TL
2329 if daemon_type == 'haproxy':
2330 # because cephadm creates new daemon instances whenever
2331 # port or ip changes, identifying daemons by name is
2332 # sufficient to detect changes.
2333 if not spec:
2334 return []
2335 ingress_spec = cast(IngressSpec, spec)
2336 assert ingress_spec.backend_service
2337 daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service)
2338 deps = [d.name() for d in daemons]
2339 elif daemon_type == 'keepalived':
2340 # because cephadm creates new daemon instances whenever
2341 # port or ip changes, identifying daemons by name is
2342 # sufficient to detect changes.
2343 if not spec:
2344 return []
2345 daemons = self.cache.get_daemons_by_service(spec.service_name())
2346 deps = [d.name() for d in daemons if d.daemon_type == 'haproxy']
a4b75251
TL
2347 elif daemon_type == 'iscsi':
2348 deps = [self.get_mgr_ip()]
f67539c2
TL
2349 else:
2350 need = {
2351 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'],
2352 'grafana': ['prometheus'],
2353 'alertmanager': ['mgr', 'alertmanager'],
2354 }
2355 for dep_type in need.get(daemon_type, []):
2356 for dd in self.cache.get_daemons_by_type(dep_type):
2357 deps.append(dd.name())
9f95a23c
TL
2358 return sorted(deps)
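# Worked example of the dependency calculation above (daemon names are
# hypothetical): for a grafana daemon, 'need' resolves to ['prometheus'], so
# with two prometheus daemons deployed the result would be
#
#     ['prometheus.host1', 'prometheus.host2']
#
# and a later change to that sorted list is how a needed reconfig can be
# detected.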
2359
e306af50 2360 @forall_hosts
adb31ebb 2361 def _remove_daemons(self, name: str, host: str) -> str:
f67539c2 2362 return CephadmServe(self)._remove_daemon(name, host)
9f95a23c 2363
adb31ebb 2364 def _check_pool_exists(self, pool: str, service_name: str) -> None:
e306af50
TL
2365 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2366 if not self.rados.pool_exists(pool):
2367 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2368 f'service {service_name}')
2369
adb31ebb
TL
2370 def _add_daemon(self,
2371 daemon_type: str,
f67539c2 2372 spec: ServiceSpec) -> List[str]:
9f95a23c
TL
2373 """
2374 Add (and place) a daemon. Require explicit host placement. Do not
2375 schedule, and do not apply the related scheduling limitations.
2376 """
f67539c2
TL
2377 if spec.service_name() not in self.spec_store:
2378 raise OrchestratorError('Unable to add a Daemon without Service.\n'
2379 'Please use `ceph orch apply ...` to create a Service.\n'
2380 'Note, you might want to create the service with "unmanaged=true"')
2381
9f95a23c
TL
2382 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2383 if not spec.placement.hosts:
2384 raise OrchestratorError('must specify host(s) to deploy on')
2385 count = spec.placement.count or len(spec.placement.hosts)
2386 daemons = self.cache.get_daemons_by_service(spec.service_name())
2387 return self._create_daemons(daemon_type, spec, daemons,
f67539c2 2388 spec.placement.hosts, count)
9f95a23c 2389
adb31ebb
TL
2390 def _create_daemons(self,
2391 daemon_type: str,
2392 spec: ServiceSpec,
2393 daemons: List[DaemonDescription],
2394 hosts: List[HostPlacementSpec],
f67539c2 2395 count: int) -> List[str]:
9f95a23c
TL
2396 if count > len(hosts):
2397 raise OrchestratorError('too few hosts: want %d, have %s' % (
2398 count, hosts))
2399
f6b5b4d7 2400 did_config = False
f67539c2 2401 service_type = daemon_type_to_service(daemon_type)
9f95a23c 2402
f67539c2 2403 args = [] # type: List[CephadmDaemonDeploySpec]
9f95a23c
TL
2404 for host, network, name in hosts:
2405 daemon_id = self.get_unique_name(daemon_type, host, daemons,
e306af50
TL
2406 prefix=spec.service_id,
2407 forcename=name)
f6b5b4d7 2408
f67539c2 2409 if not did_config:
522d829b 2410 self.cephadm_services[service_type].config(spec)
f6b5b4d7
TL
2411 did_config = True
2412
f67539c2
TL
2413 daemon_spec = self.cephadm_services[service_type].make_daemon_spec(
2414 host, daemon_id, network, spec,
2415 # NOTE: this does not consider port conflicts!
2416 ports=spec.get_port_start())
9f95a23c
TL
2417 self.log.debug('Placing %s.%s on host %s' % (
2418 daemon_type, daemon_id, host))
f6b5b4d7 2419 args.append(daemon_spec)
9f95a23c
TL
2420
2421 # add to daemon list so next name(s) will also be unique
2422 sd = orchestrator.DaemonDescription(
2423 hostname=host,
2424 daemon_type=daemon_type,
2425 daemon_id=daemon_id,
2426 )
2427 daemons.append(sd)
2428
f67539c2 2429 @ forall_hosts
adb31ebb 2430 def create_func_map(*args: Any) -> str:
f67539c2
TL
2431 daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args)
2432 return CephadmServe(self)._create_daemon(daemon_spec)
9f95a23c
TL
2433
2434 return create_func_map(args)
2435
f67539c2
TL
2436 @handle_orch_error
2437 def add_daemon(self, spec: ServiceSpec) -> List[str]:
2438 ret: List[str] = []
2439 for d_type in service_to_daemon_types(spec.service_type):
2440 ret.extend(self._add_daemon(d_type, spec))
2441 return ret
2442
2443 @handle_orch_error
adb31ebb 2444 def apply_mon(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2445 return self._apply(spec)
2446
e306af50
TL
2447 def _apply(self, spec: GenericSpec) -> str:
2448 if spec.service_type == 'host':
2449 return self._add_host(cast(HostSpec, spec))
9f95a23c 2450
f6b5b4d7
TL
2451 if spec.service_type == 'osd':
2452 # _trigger preview refresh needs to be smart and
2453 # should only refresh if a change has been detected
2454 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
2455
e306af50 2456 return self._apply_service_spec(cast(ServiceSpec, spec))
9f95a23c 2457
a4b75251
TL
2458 def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None:
2459 self.health_checks[name] = {
2460 'severity': 'warning',
2461 'summary': summary,
2462 'count': count,
2463 'detail': detail,
2464 }
2465 self.set_health_checks(self.health_checks)
2466
2467 def remove_health_warning(self, name: str) -> None:
2468 if name in self.health_checks:
2469 del self.health_checks[name]
2470 self.set_health_checks(self.health_checks)
2471
adb31ebb 2472 def _plan(self, spec: ServiceSpec) -> dict:
f6b5b4d7
TL
2473 if spec.service_type == 'osd':
2474 return {'service_name': spec.service_name(),
2475 'service_type': spec.service_type,
2476 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
2477
b3b6e05e 2478 svc = self.cephadm_services[spec.service_type]
f6b5b4d7
TL
2479 ha = HostAssignment(
2480 spec=spec,
b3b6e05e 2481 hosts=self._schedulable_hosts(),
522d829b 2482 unreachable_hosts=self._unreachable_hosts(),
f67539c2
TL
2483 networks=self.cache.networks,
2484 daemons=self.cache.get_daemons_by_service(spec.service_name()),
b3b6e05e
TL
2485 allow_colo=svc.allow_colo(),
2486 rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None
f6b5b4d7
TL
2487 )
2488 ha.validate()
f67539c2 2489 hosts, to_add, to_remove = ha.place()
f6b5b4d7
TL
2490
2491 return {
2492 'service_name': spec.service_name(),
2493 'service_type': spec.service_type,
f67539c2 2494 'add': [hs.hostname for hs in to_add],
b3b6e05e 2495 'remove': [d.name() for d in to_remove]
f6b5b4d7
TL
2496 }
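# Illustrative shape of a single plan entry returned above (names made up):
#
#     {'service_name': 'mds.cephfs', 'service_type': 'mds',
#      'add': ['host2'], 'remove': ['mds.cephfs.host1.abcdef']}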
2497
f67539c2
TL
2498 @handle_orch_error
2499 def plan(self, specs: Sequence[GenericSpec]) -> List:
f6b5b4d7
TL
2500 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
2501 'to the current inventory setup. If any of these conditions changes, the \n'
2502 'preview will be invalid. Please make sure to have a minimal \n'
2503 'timeframe between planning and applying the specs.'}]
2504 if any([spec.service_type == 'host' for spec in specs]):
2505 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
2506 for spec in specs:
2507 results.append(self._plan(cast(ServiceSpec, spec)))
2508 return results
2509
e306af50 2510 def _apply_service_spec(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2511 if spec.placement.is_empty():
2512 # fill in default placement
2513 defaults = {
2514 'mon': PlacementSpec(count=5),
2515 'mgr': PlacementSpec(count=2),
2516 'mds': PlacementSpec(count=2),
2517 'rgw': PlacementSpec(count=2),
f67539c2 2518 'ingress': PlacementSpec(count=2),
1911f103 2519 'iscsi': PlacementSpec(count=1),
9f95a23c 2520 'rbd-mirror': PlacementSpec(count=2),
f67539c2 2521 'cephfs-mirror': PlacementSpec(count=1),
801d1391 2522 'nfs': PlacementSpec(count=1),
9f95a23c
TL
2523 'grafana': PlacementSpec(count=1),
2524 'alertmanager': PlacementSpec(count=1),
2525 'prometheus': PlacementSpec(count=1),
2526 'node-exporter': PlacementSpec(host_pattern='*'),
2527 'crash': PlacementSpec(host_pattern='*'),
f91f0fd5 2528 'container': PlacementSpec(count=1),
f67539c2 2529 'cephadm-exporter': PlacementSpec(host_pattern='*'),
9f95a23c
TL
2530 }
2531 spec.placement = defaults[spec.service_type]
2532 elif spec.service_type in ['mon', 'mgr'] and \
f91f0fd5
TL
2533 spec.placement.count is not None and \
2534 spec.placement.count < 1:
9f95a23c
TL
2535 raise OrchestratorError('cannot scale %s service below 1' % (
2536 spec.service_type))
2537
f67539c2
TL
2538 host_count = len(self.inventory.keys())
2539 max_count = self.max_count_per_host
2540
2541 if spec.placement.count is not None:
2542 if spec.service_type in ['mon', 'mgr']:
2543 if spec.placement.count > max(5, host_count):
2544 raise OrchestratorError(
2545 (f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.'))
2546 elif spec.service_type != 'osd':
2547 if spec.placement.count > (max_count * host_count):
2548 raise OrchestratorError((f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count*max_count} ({host_count}x{max_count}).'
2549 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
2550
2551 if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
2552 raise OrchestratorError((f'The maximum count_per_host allowed is {max_count}.'
2553 + ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option'))
2554
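# Worked example of the limits above (numbers are illustrative): with
# host_count = 3 and max_count_per_host = 10, a non-osd service may request at
# most 3 * 10 = 30 daemons, while mon/mgr are capped at max(5, 3) = 5.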
9f95a23c
TL
2555 HostAssignment(
2556 spec=spec,
f91f0fd5 2557 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
522d829b 2558 unreachable_hosts=self._unreachable_hosts(),
f67539c2
TL
2559 networks=self.cache.networks,
2560 daemons=self.cache.get_daemons_by_service(spec.service_name()),
2561 allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
9f95a23c
TL
2562 ).validate()
2563
2564 self.log.info('Saving service %s spec with placement %s' % (
2565 spec.service_name(), spec.placement.pretty_str()))
2566 self.spec_store.save(spec)
2567 self._kick_serve_loop()
1911f103 2568 return "Scheduled %s update..." % spec.service_name()
9f95a23c 2569
f67539c2
TL
2570 @handle_orch_error
2571 def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
e306af50
TL
2572 results = []
2573 for spec in specs:
f67539c2
TL
2574 if no_overwrite:
2575 if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
2576 results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
2577 % (cast(HostSpec, spec).hostname, spec.service_type))
2578 continue
2579 elif cast(ServiceSpec, spec).service_name() in self.spec_store:
2580 results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
2581 % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
2582 continue
e306af50
TL
2583 results.append(self._apply(spec))
2584 return results
9f95a23c 2585
f67539c2 2586 @handle_orch_error
adb31ebb 2587 def apply_mgr(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2588 return self._apply(spec)
2589
f67539c2 2590 @handle_orch_error
f6b5b4d7 2591 def apply_mds(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2592 return self._apply(spec)
2593
f67539c2 2594 @handle_orch_error
adb31ebb 2595 def apply_rgw(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2596 return self._apply(spec)
2597
f67539c2
TL
2598 @handle_orch_error
2599 def apply_ingress(self, spec: ServiceSpec) -> str:
2600 return self._apply(spec)
1911f103 2601
f67539c2 2602 @handle_orch_error
adb31ebb 2603 def apply_iscsi(self, spec: ServiceSpec) -> str:
1911f103
TL
2604 return self._apply(spec)
2605
f67539c2 2606 @handle_orch_error
adb31ebb 2607 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2608 return self._apply(spec)
2609
f67539c2 2610 @handle_orch_error
adb31ebb 2611 def apply_nfs(self, spec: ServiceSpec) -> str:
801d1391
TL
2612 return self._apply(spec)
2613
9f95a23c
TL
2614 def _get_dashboard_url(self):
2615 # type: () -> str
2616 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2617
f67539c2 2618 @handle_orch_error
adb31ebb 2619 def apply_prometheus(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2620 return self._apply(spec)
2621
f67539c2 2622 @handle_orch_error
adb31ebb 2623 def apply_node_exporter(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2624 return self._apply(spec)
2625
f67539c2 2626 @handle_orch_error
adb31ebb 2627 def apply_crash(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2628 return self._apply(spec)
2629
f67539c2 2630 @handle_orch_error
f6b5b4d7 2631 def apply_grafana(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2632 return self._apply(spec)
2633
f67539c2 2634 @handle_orch_error
f6b5b4d7 2635 def apply_alertmanager(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2636 return self._apply(spec)
2637
f67539c2 2638 @handle_orch_error
f91f0fd5
TL
2639 def apply_container(self, spec: ServiceSpec) -> str:
2640 return self._apply(spec)
2641
f67539c2
TL
2642 @handle_orch_error
2643 def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
2644 return self._apply(spec)
2645
2646 @handle_orch_error
adb31ebb 2647 def upgrade_check(self, image: str, version: str) -> str:
f67539c2
TL
2648 if self.inventory.get_host_with_state("maintenance"):
2649 raise OrchestratorError("check aborted - you have hosts in maintenance state")
2650
9f95a23c
TL
2651 if version:
2652 target_name = self.container_image_base + ':v' + version
2653 elif image:
2654 target_name = image
2655 else:
2656 raise OrchestratorError('must specify either image or version')
2657
f67539c2 2658 image_info = CephadmServe(self)._get_container_image_info(target_name)
b3b6e05e
TL
2659
2660 ceph_image_version = image_info.ceph_version
2661 if not ceph_image_version:
2662 return f'Unable to extract ceph version from {target_name}.'
2663 if ceph_image_version.startswith('ceph version '):
2664 ceph_image_version = ceph_image_version.split(' ')[2]
2665 version_error = self.upgrade._check_target_version(ceph_image_version)
2666 if version_error:
2667 return f'Incompatible upgrade: {version_error}'
2668
f91f0fd5 2669 self.log.debug(f'image info {image} -> {image_info}')
adb31ebb 2670 r: dict = {
9f95a23c 2671 'target_name': target_name,
f91f0fd5
TL
2672 'target_id': image_info.image_id,
2673 'target_version': image_info.ceph_version,
9f95a23c
TL
2674 'needs_update': dict(),
2675 'up_to_date': list(),
f67539c2 2676 'non_ceph_image_daemons': list()
9f95a23c
TL
2677 }
2678 for host, dm in self.cache.daemons.items():
2679 for name, dd in dm.items():
f91f0fd5 2680 if image_info.image_id == dd.container_image_id:
9f95a23c 2681 r['up_to_date'].append(dd.name())
a4b75251 2682 elif dd.daemon_type in CEPH_IMAGE_TYPES:
9f95a23c
TL
2683 r['needs_update'][dd.name()] = {
2684 'current_name': dd.container_image_name,
2685 'current_id': dd.container_image_id,
2686 'current_version': dd.version,
2687 }
f67539c2
TL
2688 else:
2689 r['non_ceph_image_daemons'].append(dd.name())
2690 if self.use_repo_digest and image_info.repo_digests:
2691 # FIXME: we assume the first digest is the best one to use
2692 r['target_digest'] = image_info.repo_digests[0]
f91f0fd5 2693
9f95a23c
TL
2694 return json.dumps(r, indent=4, sort_keys=True)
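# Illustrative shape of the JSON report (all values are made up):
#
#     {
#         "needs_update": {"osd.3": {"current_id": "...", "current_name": "...",
#                                    "current_version": "16.2.6"}},
#         "non_ceph_image_daemons": ["node-exporter.host1"],
#         "target_digest": "quay.io/ceph/ceph@sha256:...",
#         "target_id": "...",
#         "target_name": "quay.io/ceph/ceph:v16.2.7",
#         "target_version": "16.2.7",
#         "up_to_date": ["mgr.host1.abcdef"]
#     }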
2695
f67539c2 2696 @handle_orch_error
f6b5b4d7 2697 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
e306af50 2698 return self.upgrade.upgrade_status()
9f95a23c 2699
a4b75251
TL
2700 @handle_orch_error
2701 def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict[Any, Any]:
2702 return self.upgrade.upgrade_ls(image, tags)
2703
f67539c2 2704 @handle_orch_error
adb31ebb 2705 def upgrade_start(self, image: str, version: str) -> str:
f67539c2
TL
2706 if self.inventory.get_host_with_state("maintenance"):
2707 raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
e306af50 2708 return self.upgrade.upgrade_start(image, version)
9f95a23c 2709
f67539c2 2710 @handle_orch_error
f6b5b4d7 2711 def upgrade_pause(self) -> str:
e306af50 2712 return self.upgrade.upgrade_pause()
9f95a23c 2713
f67539c2 2714 @handle_orch_error
f6b5b4d7 2715 def upgrade_resume(self) -> str:
e306af50 2716 return self.upgrade.upgrade_resume()
9f95a23c 2717
f67539c2 2718 @handle_orch_error
f6b5b4d7 2719 def upgrade_stop(self) -> str:
e306af50 2720 return self.upgrade.upgrade_stop()
9f95a23c 2721
f67539c2 2722 @handle_orch_error
9f95a23c
TL
2723 def remove_osds(self, osd_ids: List[str],
2724 replace: bool = False,
a4b75251
TL
2725 force: bool = False,
2726 zap: bool = False) -> str:
9f95a23c
TL
2727 """
2728 Takes a list of OSDs and schedules them for removal.
2729 The function that takes care of the actual removal is
f6b5b4d7 2730 process_removal_queue().
9f95a23c
TL
2731 """
2732
f6b5b4d7
TL
2733 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2734 to_remove_daemons = list()
9f95a23c 2735 for daemon in daemons:
f6b5b4d7
TL
2736 if daemon.daemon_id in osd_ids:
2737 to_remove_daemons.append(daemon)
2738 if not to_remove_daemons:
2739 return f"Unable to find OSDs: {osd_ids}"
9f95a23c 2740
f6b5b4d7 2741 for daemon in to_remove_daemons:
f67539c2 2742 assert daemon.daemon_id is not None
f6b5b4d7
TL
2743 try:
2744 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2745 replace=replace,
2746 force=force,
a4b75251 2747 zap=zap,
f6b5b4d7 2748 hostname=daemon.hostname,
adb31ebb
TL
2749 process_started_at=datetime_now(),
2750 remove_util=self.to_remove_osds.rm_util))
f6b5b4d7
TL
2751 except NotFoundError:
2752 return f"Unable to find OSDs: {osd_ids}"
9f95a23c
TL
2753
2754 # trigger the serve loop to initiate the removal
2755 self._kick_serve_loop()
2756 return "Scheduled OSD(s) for removal"
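# Typically driven from the CLI with flags mirroring the parameters above,
# e.g. (OSD ids are illustrative):
#
#     ceph orch osd rm 0 1 --replace
#
# after which the serve loop picks the queued OSDs up via
# process_removal_queue().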
2757
f67539c2 2758 @handle_orch_error
adb31ebb 2759 def stop_remove_osds(self, osd_ids: List[str]) -> str:
f6b5b4d7
TL
2760 """
2761 Stops a `removal` process for a List of OSDs.
2762 This will revert their weight and remove it from the osds_to_remove queue
2763 """
2764 for osd_id in osd_ids:
2765 try:
2766 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
adb31ebb 2767 remove_util=self.to_remove_osds.rm_util))
f67539c2 2768 except (NotFoundError, KeyError, ValueError):
f6b5b4d7
TL
2769 return f'Unable to find OSD in the queue: {osd_id}'
2770
2771 # trigger the serve loop to halt the removal
2772 self._kick_serve_loop()
2773 return "Stopped OSD(s) removal"
2774
f67539c2 2775 @handle_orch_error
adb31ebb 2776 def remove_osds_status(self) -> List[OSD]:
9f95a23c
TL
2777 """
2778 The CLI call to retrieve an osd removal report
2779 """
f6b5b4d7 2780 return self.to_remove_osds.all_osds()
522d829b
TL
2781
2782 @handle_orch_error
2783 def drain_host(self, hostname):
2784 # type: (str) -> str
2785 """
2786 Drain all daemons from a host.
2787 :param host: host name
2788 """
2789 self.add_host_label(hostname, '_no_schedule')
2790
2791 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)
2792
2793 osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
2794 self.remove_osds(osds_to_remove)
2795
2796 daemons_table = ""
2797 daemons_table += "{:<20} {:<15}\n".format("type", "id")
2798 daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
2799 for d in daemons:
2800 daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)
2801
2802 return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)
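# Typically invoked as (hostname is illustrative):
#
#     ceph orch host drain host1
#
# which is also the command the safeguard in remove_host() points users at.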
2803
2804 def trigger_connect_dashboard_rgw(self) -> None:
2805 self.need_connect_dashboard_rgw = True
2806 self.event.set()