import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple

import datetime
import six
import os
import random
import tempfile
import multiprocessing.pool
import subprocess

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
    CustomContainerSpec, HostPlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec

from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
from .utils import forall_hosts, CephadmNoImage, cephadmNoImage

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self: Any) -> bool:
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

try:
    from typing import List
except ImportError:
    pass

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

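# A minimal usage note (illustrative, not executed): the default above is only
# used when no custom config has been stored; it can be replaced or restored
# with the CLI handlers defined later in this module, e.g.
#
#   ceph cephadm set-ssh-config -i <ssh_config>   # store a custom ssh_config
#   ceph cephadm clear-ssh-config                 # fall back to this default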

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)


class CephadmCompletion(orchestrator.Completion[T]):
    def evaluate(self) -> None:
        self.finalize(None)


def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """
    Decorator to make CephadmOrchestrator methods return
    a completion object that executes the method itself.
    """

    @wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

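# A minimal sketch of how the decorator is used below (names are from this
# module; it assumes process() is driven by the orchestrator CLI layer):
#
#   @trivial_completion
#   def add_host(self, spec: HostSpec) -> str: ...
#
#   c = mgr.add_host(HostSpec('node1'))  # returns a CephadmCompletion
#   mgr.process([c])                     # evaluate() runs the wrapped method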


class ContainerInspectInfo(NamedTuple):
    image_id: str
    ceph_version: Optional[str]
    repo_digest: Optional[str]


@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'facts_cache_timeout',
            'type': 'secs',
            'default': 1 * 60,
            'desc': 'seconds to cache host facts data',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'container_image_prometheus',
            'default': 'docker.io/prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'docker.io/ceph/ceph-grafana:6.7.4',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'docker.io/prom/alertmanager:v0.20.0',
            'desc': 'Alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'docker.io/prom/node-exporter:v0.18.1',
            'desc': 'Node exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace. Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'container_init',
            'type': 'bool',
            'default': False,
            'desc': 'Run podman/docker with `--init`',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'type': 'int',
            'default': None,
            'desc': 'internal - do not modify',
            # used to track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'type': 'bool',
            'default': True,
            'desc': 'manage configs like API endpoints in Dashboard.'
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'type': 'bool',
            'default': False,
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry url'
        },
        {
            'name': 'registry_username',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry username'
        },
        {
            'name': 'registry_password',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry password'
        },
        {
            'name': 'use_repo_digest',
            'type': 'bool',
            'default': False,
            'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
        }
    ]

    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        # services:
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()

    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()

    # function responsible for logging a single host into a custom registry
    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through the normal list of args
        args_str = json.dumps({
            'url': url,
            'username': username,
            'password': password,
        })
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return None

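    # A sketch (placeholder values, not real credentials) of the JSON document
    # the method above hands to `cephadm registry-login --registry-json -` on
    # stdin:
    #
    #   {"url": "registry.example.com", "username": "myuser", "password": "mypass"}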

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self) -> None:
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()

    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

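    # A sketch of the naming scheme above with hypothetical values: with no
    # prefix, a suffixed type (e.g. mds) on host "node1" yields something like
    # "node1.qwfxzt" (host plus six random lowercase letters), while a
    # suffix-less type such as mon or crash yields just "node1"; a prefix, when
    # given, is prepended as "prefix.node1...".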

    def _reconfig_ssh(self) -> None:
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not l:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in l:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host: str) -> None:
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self) -> None:
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self) -> Tuple[bool, str]:
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''

    def process(self, completions: List[CephadmCompletion]) -> None:
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set an ssh_config file provided from stdin
        """
        if inbuf == self.ssh_config:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the stored ssh_config file
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self) -> HandleCommandResult:
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self) -> Tuple[int, str, str]:
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self) -> Tuple[int, str, str]:
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self) -> Tuple[int, str, str]:
        return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        host = self.cache.get_hosts()[0]
        r = CephadmServe(self)._check_host(host)
        if r is not None:
            # connection failed; reset user
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '; sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            assert isinstance(inbuf, str)
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

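    # A sketch of the two invocation styles the command above accepts
    # (hypothetical registry and credentials):
    #
    #   ceph cephadm registry-login registry.example.com myuser mypass
    #   ceph cephadm registry-login -i login.json   # json shape as documented above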

    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemons' ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

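    # A sketch (timestamp value is a placeholder) of the document the methods
    # above round-trip through the "extra_ceph_conf" config-key entry:
    #
    #   {"conf": "[global]\n...", "last_modified": "2020-01-01T00:00:00.000000Z"}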

    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
                                                  'remoto.backends.LegacyModuleExecute']:
        """
        Set up a connection for running commands on a remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
        """
        Remote validator that accepts a connection object to ensure that a certain
        executable is available, returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise

    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        if daemon_type in CEPH_TYPES or \
                daemon_type == 'nfs' or \
                daemon_type == 'iscsi':
            # get container image
            ret, image, err = self.check_mon_command({
                'prefix': 'config get',
                'who': utils.name_to_config_section(daemon_name),
                'key': 'container_image',
            })
            image = image.strip()  # type: ignore
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved here; the necessary information
            # is only available when a container is deployed (given
            # via spec).
            image = None
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            if self.container_init:
                final_args += ['--container-init']

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

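    # A sketch of a typical call to the helper above, as made by
    # _registry_login() earlier in this module:
    #
    #   out, err, code = self._run_cephadm(
    #       host, 'mon', 'registry-login',
    #       ['--registry-json', '-'], stdin=args_str, error_ok=True)
    #
    # In 'root' mode this streams the cephadm script to the remote python
    # interpreter with injected_argv/injected_stdin prepended, rather than
    # invoking a package-installed /usr/bin/cephadm.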
f91f0fd5
TL
1171 def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
1172 """
1173 Returns all hosts that went through _refresh_host_daemons().
9f95a23c 1174
f91f0fd5
TL
1175 This mitigates a potential race, where new host was added *after*
1176 ``_refresh_host_daemons()`` was called, but *before*
1177 ``_apply_all_specs()`` was called. thus we end up with a hosts
1178 where daemons might be running, but we have not yet detected them.
1179 """
1180 return [
1181 h for h in self.inventory.all_specs()
1182 if self.cache.host_had_daemon_refresh(h.hostname)
1183 ]
9f95a23c 1184
e306af50 1185 def _add_host(self, spec):
9f95a23c
TL
1186 # type: (HostSpec) -> str
1187 """
1188 Add a host to be managed by the orchestrator.
1189
1190 :param host: host name
1191 """
1192 assert_valid_host(spec.hostname)
f6b5b4d7 1193 out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
9f95a23c
TL
1194 ['--expect-hostname', spec.hostname],
1195 addr=spec.addr,
1196 error_ok=True, no_fsid=True)
1197 if code:
1198 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1199 spec.hostname, spec.addr, err))
1200
e306af50 1201 self.inventory.add_host(spec)
9f95a23c 1202 self.cache.prime_empty_host(spec.hostname)
1911f103 1203 self.offline_hosts_remove(spec.hostname)
9f95a23c
TL
1204 self.event.set() # refresh stray health check
1205 self.log.info('Added host %s' % spec.hostname)
1206 return "Added host '{}'".format(spec.hostname)
1207
e306af50
TL
1208 @trivial_completion
1209 def add_host(self, spec: HostSpec) -> str:
1210 return self._add_host(spec)
1211
1212 @trivial_completion
9f95a23c
TL
1213 def remove_host(self, host):
1214 # type: (str) -> str
1215 """
1216 Remove a host from orchestrator management.
1217
1218 :param host: host name
1219 """
e306af50 1220 self.inventory.rm_host(host)
9f95a23c
TL
1221 self.cache.rm_host(host)
1222 self._reset_con(host)
1223 self.event.set() # refresh stray health check
1224 self.log.info('Removed host %s' % host)
1225 return "Removed host '{}'".format(host)
1226
e306af50 1227 @trivial_completion
adb31ebb 1228 def update_host_addr(self, host: str, addr: str) -> str:
e306af50 1229 self.inventory.set_addr(host, addr)
9f95a23c
TL
1230 self._reset_con(host)
1231 self.event.set() # refresh stray health check
1232 self.log.info('Set host %s addr to %s' % (host, addr))
1233 return "Updated host '{}' addr to '{}'".format(host, addr)
1234
1235 @trivial_completion
1236 def get_hosts(self):
1237 # type: () -> List[orchestrator.HostSpec]
1238 """
1239 Return a list of hosts managed by the orchestrator.
1240
1241 Notes:
1242 - skip async: manager reads from cache.
1243 """
e306af50 1244 return list(self.inventory.all_specs())
9f95a23c 1245
e306af50 1246 @trivial_completion
adb31ebb 1247 def add_host_label(self, host: str, label: str) -> str:
e306af50 1248 self.inventory.add_label(host, label)
9f95a23c
TL
1249 self.log.info('Added label %s to host %s' % (label, host))
1250 return 'Added label %s to host %s' % (label, host)
1251
e306af50 1252 @trivial_completion
adb31ebb 1253 def remove_host_label(self, host: str, label: str) -> str:
e306af50 1254 self.inventory.rm_label(host, label)
9f95a23c
TL
1255 self.log.info('Removed label %s to host %s' % (label, host))
1256 return 'Removed label %s from host %s' % (label, host)
1257
f6b5b4d7 1258 @trivial_completion
adb31ebb 1259 def host_ok_to_stop(self, hostname: str) -> str:
f6b5b4d7
TL
1260 if hostname not in self.cache.get_hosts():
1261 raise OrchestratorError(f'Cannot find host "{hostname}"')
1262
1263 daemons = self.cache.get_daemons()
1264 daemon_map = defaultdict(lambda: [])
1265 for dd in daemons:
1266 if dd.hostname == hostname:
1267 daemon_map[dd.daemon_type].append(dd.daemon_id)
1268
f91f0fd5 1269 for daemon_type, daemon_ids in daemon_map.items():
f6b5b4d7
TL
1270 r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
1271 if r.retval:
1272 self.log.error(f'It is NOT safe to stop host {hostname}')
1273 raise orchestrator.OrchestratorError(
f91f0fd5
TL
1274 r.stderr,
1275 errno=r.retval)
f6b5b4d7
TL
1276
1277 msg = f'It is presumed safe to stop host {hostname}'
1278 self.log.info(msg)
1279 return msg
1280
f91f0fd5
TL
1281 def get_minimal_ceph_conf(self) -> str:
1282 _, config, _ = self.check_mon_command({
f6b5b4d7
TL
1283 "prefix": "config generate-minimal-conf",
1284 })
f91f0fd5
TL
1285 extra = self.extra_ceph_conf().conf
1286 if extra:
1287 config += '\n\n' + extra.strip() + '\n'
1288 return config
f6b5b4d7 1289
adb31ebb 1290 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
f6b5b4d7
TL
1291 if filter_host:
1292 self.cache.invalidate_host_daemons(filter_host)
1293 else:
1294 for h in self.cache.get_hosts():
1295 # Also discover daemons deployed manually
1296 self.cache.invalidate_host_daemons(h)
1297
1298 self._kick_serve_loop()
1299
9f95a23c 1300 @trivial_completion
f6b5b4d7
TL
1301 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1302 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
9f95a23c 1303 if refresh:
f6b5b4d7
TL
1304 self._invalidate_daemons_and_kick_serve()
1305 self.log.info('Kicked serve() loop to refresh all services')
1306
9f95a23c 1307 # <service_map>
f6b5b4d7 1308 sm: Dict[str, orchestrator.ServiceDescription] = {}
e306af50 1309 osd_count = 0
1911f103 1310 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c
TL
1311 for name, dd in dm.items():
1312 if service_type and service_type != dd.daemon_type:
1313 continue
1314 n: str = dd.service_name()
1315 if service_name and service_name != n:
1316 continue
1317 if dd.daemon_type == 'osd':
e306af50
TL
1318 """
1319 OSDs do not know the affinity to their spec out of the box.
1320 """
1321 n = f"osd.{dd.osdspec_affinity}"
f6b5b4d7
TL
1322 if not dd.osdspec_affinity:
1323 # If there is no osdspec_affinity, the spec should suffice for displaying
1324 continue
e306af50
TL
1325 if n in self.spec_store.specs:
1326 spec = self.spec_store.specs[n]
1911f103
TL
1327 else:
1328 spec = ServiceSpec(
1329 unmanaged=True,
1330 service_type=dd.daemon_type,
1331 service_id=dd.service_id(),
1332 placement=PlacementSpec(
1333 hosts=[dd.hostname]
1334 )
1335 )
9f95a23c
TL
1336 if n not in sm:
1337 sm[n] = orchestrator.ServiceDescription(
9f95a23c
TL
1338 last_refresh=dd.last_refresh,
1339 container_image_id=dd.container_image_id,
1340 container_image_name=dd.container_image_name,
1341 spec=spec,
f6b5b4d7 1342 events=self.events.get_for_service(spec.service_name()),
9f95a23c 1343 )
e306af50
TL
1344 if n in self.spec_store.specs:
1345 if dd.daemon_type == 'osd':
1346 """
1347 The osd count can't be determined by the Placement spec.
f6b5b4d7 1348 Showing an actual/expected representation cannot be determined
e306af50
TL
1349 here. So we're setting running = size for now.
1350 """
1351 osd_count += 1
1352 sm[n].size = osd_count
1353 else:
f6b5b4d7
TL
1354 sm[n].size = spec.placement.get_host_selection_size(
1355 self.inventory.all_specs())
e306af50
TL
1356
1357 sm[n].created = self.spec_store.spec_created[n]
1911f103
TL
1358 if service_type == 'nfs':
1359 spec = cast(NFSServiceSpec, spec)
1360 sm[n].rados_config_location = spec.rados_config_location()
9f95a23c
TL
1361 else:
1362 sm[n].size = 0
1363 if dd.status == 1:
1364 sm[n].running += 1
1365 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1366 sm[n].last_refresh = dd.last_refresh
1367 if sm[n].container_image_id != dd.container_image_id:
1368 sm[n].container_image_id = 'mix'
1369 if sm[n].container_image_name != dd.container_image_name:
1370 sm[n].container_image_name = 'mix'
1371 for n, spec in self.spec_store.specs.items():
1372 if n in sm:
1373 continue
1374 if service_type is not None and service_type != spec.service_type:
1375 continue
1376 if service_name is not None and service_name != n:
1377 continue
1378 sm[n] = orchestrator.ServiceDescription(
9f95a23c 1379 spec=spec,
f6b5b4d7 1380 size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
9f95a23c 1381 running=0,
f6b5b4d7 1382 events=self.events.get_for_service(spec.service_name()),
9f95a23c 1383 )
1911f103
TL
1384 if service_type == 'nfs':
1385 spec = cast(NFSServiceSpec, spec)
1386 sm[n].rados_config_location = spec.rados_config_location()
1387 return list(sm.values())
9f95a23c
TL
1388
1389 @trivial_completion
f6b5b4d7
TL
1390 def list_daemons(self,
1391 service_name: Optional[str] = None,
1392 daemon_type: Optional[str] = None,
1393 daemon_id: Optional[str] = None,
1394 host: Optional[str] = None,
1395 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
9f95a23c 1396 if refresh:
f6b5b4d7
TL
1397 self._invalidate_daemons_and_kick_serve(host)
1398 self.log.info('Kicked serve() loop to refresh all daemons')
1399
9f95a23c 1400 result = []
1911f103 1401 for h, dm in self.cache.get_daemons_with_volatile_status():
9f95a23c
TL
1402 if host and h != host:
1403 continue
1404 for name, dd in dm.items():
801d1391
TL
1405 if daemon_type is not None and daemon_type != dd.daemon_type:
1406 continue
1407 if daemon_id is not None and daemon_id != dd.daemon_id:
9f95a23c 1408 continue
801d1391 1409 if service_name is not None and service_name != dd.service_name():
9f95a23c
TL
1410 continue
1411 result.append(dd)
1412 return result
1413
e306af50 1414 @trivial_completion
adb31ebb
TL
1415 def service_action(self, action: str, service_name: str) -> List[str]:
1416 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
9f95a23c 1417 self.log.info('%s service %s' % (action.capitalize(), service_name))
adb31ebb
TL
1418 return [
1419 self._schedule_daemon_action(dd.name(), action)
1420 for dd in dds
1421 ]
f6b5b4d7 1422
adb31ebb 1423 def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
f6b5b4d7
TL
1424 daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
1425 host=host,
1426 daemon_id=daemon_id,
1427 daemon_type=daemon_type,
1428 )
1429
f91f0fd5 1430 self._daemon_action_set_image(action, image, daemon_type, daemon_id)
9f95a23c 1431
9f95a23c 1432 if action == 'redeploy':
f91f0fd5
TL
1433 if self.daemon_is_self(daemon_type, daemon_id):
1434 self.mgr_service.fail_over()
1435 return '' # unreachable
9f95a23c 1436 # stop, recreate the container+unit, then restart
f6b5b4d7 1437 return self._create_daemon(daemon_spec)
9f95a23c 1438 elif action == 'reconfig':
f6b5b4d7 1439 return self._create_daemon(daemon_spec, reconfig=True)
9f95a23c
TL
1440
1441 actions = {
1442 'start': ['reset-failed', 'start'],
1443 'stop': ['stop'],
1444 'restart': ['reset-failed', 'restart'],
1445 }
f6b5b4d7 1446 name = daemon_spec.name()
9f95a23c 1447 for a in actions[action]:
f6b5b4d7
TL
1448 try:
1449 out, err, code = self._run_cephadm(
1450 host, name, 'unit',
1451 ['--name', name, a])
1452 except Exception:
1453 self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
1454 self.cache.invalidate_host_daemons(daemon_spec.host)
1455 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1456 self.events.for_daemon(name, 'INFO', msg)
1457 return msg
9f95a23c 1458
adb31ebb 1459 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
f91f0fd5
TL
1460 if image is not None:
1461 if action != 'redeploy':
1462 raise OrchestratorError(
1463 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1464 if daemon_type not in CEPH_TYPES:
1465 raise OrchestratorError(
1466 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1467 f'types are: {", ".join(CEPH_TYPES)}')
1468
1469 self.check_mon_command({
1470 'prefix': 'config set',
1471 'name': 'container_image',
1472 'value': image,
1473 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1474 })
1475
e306af50 1476 @trivial_completion
f91f0fd5 1477 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
f6b5b4d7
TL
1478 d = self.cache.get_daemon(daemon_name)
1479
f91f0fd5
TL
1480 if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
1481 and not self.mgr_service.mgr_map_has_standby():
1482 raise OrchestratorError(
1483 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1484
1485 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
1486
1487 self.log.info(f'Schedule {action} daemon {daemon_name}')
1488 return self._schedule_daemon_action(daemon_name, action)
1489
1490 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
1491 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
1492
adb31ebb 1493 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
f91f0fd5
TL
1494 dd = self.cache.get_daemon(daemon_name)
1495 if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
1496 and not self.mgr_service.mgr_map_has_standby():
1497 raise OrchestratorError(
1498 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1499 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
1500 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
1501 self._kick_serve_loop()
1502 return msg
9f95a23c 1503
e306af50 1504 @trivial_completion
9f95a23c 1505 def remove_daemons(self, names):
e306af50 1506 # type: (List[str]) -> List[str]
9f95a23c
TL
1507 args = []
1508 for host, dm in self.cache.daemons.items():
1509 for name in names:
1510 if name in dm:
1511 args.append((name, host))
1512 if not args:
1513 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1514 self.log.info('Remove daemons %s' % [a[0] for a in args])
1515 return self._remove_daemons(args)
1516
1517 @trivial_completion
adb31ebb 1518 def remove_service(self, service_name: str) -> str:
9f95a23c 1519 self.log.info('Remove service %s' % service_name)
e306af50 1520 self._trigger_preview_refresh(service_name=service_name)
1911f103
TL
1521 found = self.spec_store.rm(service_name)
1522 if found:
1523 self._kick_serve_loop()
f6b5b4d7 1524 return 'Removed service %s' % service_name
1911f103
TL
1525 else:
1526 # must be idempotent: still a success.
f6b5b4d7 1527 return f'Failed to remove service. <{service_name}> was not found.'
9f95a23c
TL
1528
    @trivial_completion
    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
        - add filtering by label
        """
        if refresh:
            if host_filter and host_filter.hosts:
                for h in host_filter.hosts:
                    self.cache.invalidate_host_devices(h)
            else:
                for h in self.cache.get_hosts():
                    self.cache.invalidate_host_devices(h)

            self.event.set()
            self.log.info('Kicked serve() loop to refresh devices')

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result

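    # Illustrative CLI mapping: `ceph orch device ls --refresh` calls
    # get_inventory(refresh=True). Note that this only invalidates the cached
    # device lists and kicks serve(); the refreshed inventory is returned by a
    # later call once the cache has been repopulated.
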
    @trivial_completion
    def zap_device(self, host: str, path: str) -> str:
        self.log.info('Zap device %s:%s' % (host, path))
        out, err, code = self._run_cephadm(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'zap', '--destroy', path],
            error_ok=True)
        self.cache.invalidate_host_devices(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        return '\n'.join(out + err)

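    # Illustrative example (host and device path are made up):
    # zap_device('node1', '/dev/sdb') runs, via cephadm on the target host,
    # roughly:
    #
    #   ceph-volume lvm zap --destroy /dev/sdb
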
    @trivial_completion
    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
        """
        Blink a device light. This calls something like::

            lsmcli local-disk-ident-led-on --path $path

        If you must, you can customize this via::

            ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
            ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'

        See templates/blink_device_light_cmd.j2
        """
        @forall_hosts
        def blink(host: str, dev: str, path: str) -> str:
            cmd_line = self.template.render('blink_device_light_cmd.j2',
                                            {
                                                'on': on,
                                                'ident_fault': ident_fault,
                                                'dev': dev,
                                                'path': path
                                            },
                                            host=host)
            cmd_args = shlex.split(cmd_line)

            out, err, code = self._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd_args,
                error_ok=True)
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd_args)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)

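    # A minimal custom template, assuming an `lsmcli` binary on the host; the
    # exact command is site-specific and this is only a sketch. The variables
    # on/ident_fault/dev/path are the ones passed to render() above:
    #
    #   ceph config-key set mgr/cephadm/blink_device_light_cmd \
    #     'lsmcli local-disk-{{ ident_fault }}-led-{{ "on" if on else "off" }} --path {{ path or dev }}'
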
    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up or (o['up_from'] > 0):
                r[str(osd_id)] = o.get('uuid', '')
        return r

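    # Example return value (OSD ids and uuids are illustrative):
    #
    #   >>> mgr.get_osd_uuid_map()
    #   {'0': '1da92366-...', '1': 'f3b29a6d-...'}
    #
    # i.e. a map from OSD id (as a string) to that OSD's uuid in the osdmap.
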
    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ) -> None:
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # If the to-be-previewed spec differs from the stored spec, we
                # need to trigger a refresh; if the spec has been removed
                # (== None) we need to refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        if service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)

    @trivial_completion
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible with mgr/dashboard.
        """
        return [self._apply(spec) for spec in specs]

    @trivial_completion
    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        return self.osd_service.create_from_spec(drive_group)

    def _preview_osdspecs(self,
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                          ) -> dict:
        if not osdspecs:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews or still in the queue to be previewed?
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
            # Report 'pending' when any of the matching hosts is still loading previews
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated. '
                                        'Please re-run this command in a bit.'}]}
        # Drop all hosts that are not in matching_hosts and only select reports
        # that match the requested osdspecs.
        previews_for_specs = {}
        for host, raw_reports in self.cache.osdspec_previews.items():
            if host not in matching_hosts:
                continue
            osd_reports = []
            for osd_report in raw_reports:
                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
                    osd_reports.append(osd_report)
            previews_for_specs.update({host: osd_reports})
        return previews_for_specs

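    # Example shape of a successful preview (hostname and report fields are
    # illustrative; the reports themselves come from ceph-volume on each
    # matching host):
    #
    #   {'node1': [{'osdspec': 'my_drive_group', ...}]}
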
    def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
        need = {
            'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
            'grafana': ['prometheus'],
            'alertmanager': ['mgr', 'alertmanager'],
        }
        deps = []
        for dep_type in need.get(daemon_type, []):
            for dd in self.cache.get_daemons_by_service(dep_type):
                deps.append(dd.name())
        return sorted(deps)

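    # Example (illustrative daemon names): a grafana daemon depends on the
    # prometheus daemons, so the computed deps might look like:
    #
    #   >>> mgr._calc_daemon_deps('grafana', 'a')
    #   ['prometheus.node1']
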
    def _create_daemon(self,
                       daemon_spec: CephadmDaemonSpec,
                       reconfig: bool = False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            image = ''
            start_time = datetime_now()
            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

            if daemon_spec.daemon_type == 'container':
                spec: Optional[CustomContainerSpec] = daemon_spec.spec
                if spec is None:
                    # Exit here immediately because the required service
                    # spec to create a daemon is not provided. This is only
                    # provided when a service is applied via the 'orch apply'
                    # command.
                    msg = "Failed to {} daemon {} on {}: Required " \
                          "service specification not provided".format(
                              'reconfigure' if reconfig else 'deploy',
                              daemon_spec.name(), daemon_spec.host)
                    self.log.info(msg)
                    return msg
                image = spec.image
                if spec.ports:
                    ports.extend(spec.ports)

            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
                daemon_spec)

            # TCP ports to open in the host firewall
            if len(ports) > 0:
                daemon_spec.extra_args.extend([
                    '--tcp-ports', ' '.join(map(str, ports))
                ])

            # OSD deployments need an --osd-fsid arg
            if daemon_spec.daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

            if reconfig:
                daemon_spec.extra_args.append('--reconfig')
            if self.allow_ptrace:
                daemon_spec.extra_args.append('--allow-ptrace')

            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
                self._registry_login(daemon_spec.host, self.registry_url,
                                     self.registry_username, self.registry_password)

            daemon_spec.extra_args.extend(['--config-json', '-'])

            self.log.info('%s daemon %s on %s' % (
                'Reconfiguring' if reconfig else 'Deploying',
                daemon_spec.name(), daemon_spec.host))

            out, err, code = self._run_cephadm(
                daemon_spec.host, daemon_spec.name(), 'deploy',
                [
                    '--name', daemon_spec.name(),
                ] + daemon_spec.extra_args,
                stdin=json.dumps(cephadm_config),
                image=image)
            if not code and daemon_spec.host in self.cache.daemons:
                # prime cached service state with what we (should have)
                # just created
                sd = orchestrator.DaemonDescription()
                sd.daemon_type = daemon_spec.daemon_type
                sd.daemon_id = daemon_spec.daemon_id
                sd.hostname = daemon_spec.host
                sd.status = 1
                sd.status_desc = 'starting'
                self.cache.add_daemon(daemon_spec.host, sd)
                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
                    self.requires_post_actions.add(daemon_spec.daemon_type)
            self.cache.invalidate_host_daemons(daemon_spec.host)
            self.cache.update_daemon_config_deps(
                daemon_spec.host, daemon_spec.name(), deps, start_time)
            self.cache.save_host(daemon_spec.host)
            msg = "{} {} on host '{}'".format(
                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
            if not code:
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
            else:
                what = 'reconfigure' if reconfig else 'deploy'
                self.events.for_daemon(
                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
            return msg

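    # Illustrative sketch of the resulting remote invocation (daemon name and
    # port are made up); the flags mirror the extra_args assembled above, with
    # OSDs additionally getting --osd-fsid:
    #
    #   cephadm deploy --name prometheus.node1 --tcp-ports '9095' --config-json -
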
    @forall_hosts
    def _remove_daemons(self, name: str, host: str) -> str:
        return self._remove_daemon(name, host)

    def _remove_daemon(self, name: str, host: str) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.cephadm_services[daemon_type].pre_remove(daemon)

            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.cache.rm_daemon(host, name)
            self.cache.invalidate_host_daemons(host)

            self.cephadm_services[daemon_type].post_remove(daemon)

            return "Removed {} from host '{}'".format(name, host)

    def _check_pool_exists(self, pool: str, service_name: str) -> None:
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')

    def _add_daemon(self,
                    daemon_type: str,
                    spec: ServiceSpec,
                    create_func: Callable[..., CephadmDaemonSpec],
                    config_func: Optional[Callable] = None) -> List[str]:
        """
        Add (and place) a daemon. Requires explicit host placement; does not
        schedule, and does not apply the related scheduling limitations.
        """
        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count,
                                    create_func, config_func)

    def _create_daemons(self,
                        daemon_type: str,
                        spec: ServiceSpec,
                        daemons: List[DaemonDescription],
                        hosts: List[HostPlacementSpec],
                        count: int,
                        create_func: Callable[..., CephadmDaemonSpec],
                        config_func: Optional[Callable] = None) -> List[str]:
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False

        args = []  # type: List[CephadmDaemonSpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config and config_func:
                if daemon_type == 'rgw':
                    config_func(spec, daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args: Any) -> str:
            daemon_spec = create_func(*args)
            return self._create_daemon(daemon_spec)

        return create_func_map(args)

    @trivial_completion
    def apply_mon(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mon(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mon', spec, self.mon_service.prepare_create)

    @trivial_completion
    def add_mgr(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)

    def _apply(self, spec: GenericSpec) -> str:
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger_preview_refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))

    def _plan(self, spec: ServiceSpec) -> dict:
        if spec.service_type == 'osd':
            return {'service_name': spec.service_name(),
                    'service_type': spec.service_type,
                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}

        ha = HostAssignment(
            spec=spec,
            hosts=self._hosts_with_daemon_inventory(),
            get_daemons_func=self.cache.get_daemons_by_service,
        )
        ha.validate()
        hosts = ha.place()

        add_daemon_hosts = ha.add_daemon_hosts(hosts)
        remove_daemon_hosts = ha.remove_daemon_hosts(hosts)

        return {
            'service_name': spec.service_name(),
            'service_type': spec.service_type,
            'add': [hs.hostname for hs in add_daemon_hosts],
            'remove': [d.hostname for d in remove_daemon_hosts]
        }

    @trivial_completion
    def plan(self, specs: List[GenericSpec]) -> List:
        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
                               'to the current inventory setup. If any of these conditions change, the \n'
                               'preview will be invalid. Please make sure to have a minimal \n'
                               'timeframe between planning and applying the specs.'}]
        if any([spec.service_type == 'host' for spec in specs]):
            return [{'error': 'Found <HostSpec>. Previews that include host specifications are not supported, yet.'}]
        for spec in specs:
            results.append(self._plan(cast(ServiceSpec, spec)))
        return results

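    # Illustrative CLI mapping: `ceph orch apply -i specs.yml --dry-run` ends
    # up here. The result is the warning above plus one _plan() dict per spec,
    # e.g. (names made up):
    #
    #   {'service_name': 'mds.myfs', 'service_type': 'mds',
    #    'add': ['node2'], 'remove': []}
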
    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            get_daemons_func=self.cache.get_daemons_by_service,
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()

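    # Example of a spec that takes the default-placement branch above (service
    # id is made up): applying
    #
    #   service_type: mds
    #   service_id: myfs
    #
    # without a placement section gets PlacementSpec(count=2) filled in before
    # the spec is saved and the serve() loop schedules the deployment.
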
    @trivial_completion
    def apply(self, specs: List[GenericSpec]) -> List[str]:
        results = []
        for spec in specs:
            results.append(self._apply(spec))
        return results

    @trivial_completion
    def apply_mgr(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_mds(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)

    @trivial_completion
    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rgw(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)

    @trivial_completion
    def apply_rgw(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_iscsi(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)

    @trivial_completion
    def apply_iscsi(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)

    @trivial_completion
    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_nfs(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)

    @trivial_completion
    def apply_nfs(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        # type: () -> str
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    @trivial_completion
    def add_prometheus(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)

    @trivial_completion
    def apply_prometheus(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_node_exporter(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('node-exporter', spec,
                                self.node_exporter_service.prepare_create)

    @trivial_completion
    def apply_node_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_crash(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('crash', spec,
                                self.crash_service.prepare_create)

    @trivial_completion
    def apply_crash(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_grafana(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)

    @trivial_completion
    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_alertmanager(self, spec):
        # type: (ServiceSpec) -> List[str]
        return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)

    @trivial_completion
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @trivial_completion
    def add_container(self, spec: ServiceSpec) -> List[str]:
        return self._add_daemon('container', spec,
                                self.container_service.prepare_create)

    @trivial_completion
    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick an arbitrary host (the first in the inventory) to pull on
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.cache.host_needs_registry_login(host) and self.registry_url:
            self._registry_login(host, self.registry_url,
                                 self.registry_username, self.registry_password)
        out, err, code = self._run_cephadm(
            host, '', 'pull', [],
            image=image_name,
            no_fsid=True,
            error_ok=True)
        if code:
            raise OrchestratorError('Failed to pull %s on %s: %s' % (
                image_name, host, '\n'.join(out)))
        try:
            j = json.loads('\n'.join(out))
            r = ContainerInspectInfo(
                j['image_id'],
                j.get('ceph_version'),
                j.get('repo_digest')
            )
            self.log.debug(f'image {image_name} -> {r}')
            return r
        except (ValueError, KeyError):
            msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
            self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
            raise OrchestratorError(msg)

    @trivial_completion
    def upgrade_check(self, image: str, version: str) -> str:
        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        image_info = self._get_container_image_info(target_name)
        self.log.debug(f'image info {target_name} -> {image_info}')
        r: dict = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                if image_info.image_id == dd.container_image_id:
                    r['up_to_date'].append(dd.name())
                else:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
        if self.use_repo_digest:
            r['target_digest'] = image_info.repo_digest

        return json.dumps(r, indent=4, sort_keys=True)

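    # Illustrative CLI mapping (version is an example): `ceph orch upgrade
    # check --ceph-version 15.2.10` resolves the target image and returns JSON
    # of the form:
    #
    #   {"target_name": ..., "target_id": ..., "target_version": ...,
    #    "needs_update": {...}, "up_to_date": [...]}
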
    @trivial_completion
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    @trivial_completion
    def upgrade_start(self, image: str, version: str) -> str:
        return self.upgrade.upgrade_start(image, version)

    @trivial_completion
    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    @trivial_completion
    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    @trivial_completion
    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()

    @trivial_completion
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                hostname=daemon.hostname,
                                                fullname=daemon.name(),
                                                process_started_at=datetime_now(),
                                                remove_util=self.to_remove_osds.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        return "Scheduled OSD(s) for removal"

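    # Illustrative CLI mapping (OSD ids are made up): `ceph orch osd rm 0 1
    # --replace` calls remove_osds(['0', '1'], replace=True); progress is then
    # reported by remove_osds_status() (`ceph orch osd rm status`).
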
    @trivial_completion
    def stop_remove_osds(self, osd_ids: List[str]) -> str:
        """
        Stop the removal process for a list of OSDs.
        This will revert their weight and remove them from the osds_to_remove
        queue.
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.to_remove_osds.rm_util))
            except (NotFoundError, KeyError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"

    @trivial_completion
    def remove_osds_status(self) -> List[OSD]:
        """
        Return the OSD removal report consumed by the CLI.
        """
        return self.to_remove_osds.all_osds()