[ceph.git] / ceph / src / pybind / mgr / cephadm / module.py (ceph 15.2.13)
1 import json
2 import errno
3 import logging
4 import re
5 import shlex
6 from collections import defaultdict
7 from configparser import ConfigParser
8 from contextlib import contextmanager
9 from functools import wraps
10 from tempfile import TemporaryDirectory
11 from threading import Event
12
13 import string
14 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
15 Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
16
17 import datetime
18 import six
19 import os
20 import random
21 import tempfile
22 import multiprocessing.pool
23 import subprocess
24
25 from ceph.deployment import inventory
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.service_spec import \
28 NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
29 CustomContainerSpec, HostPlacementSpec
30 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
31 from cephadm.serve import CephadmServe
32 from cephadm.services.cephadmservice import CephadmDaemonSpec
33
34 from mgr_module import MgrModule, HandleCommandResult
35 import orchestrator
36 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
37 CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
38 from orchestrator._interface import GenericSpec
39
40 from . import remotes
41 from . import utils
42 from .migrations import Migrations
43 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
44 RbdMirrorService, CrashService, CephadmService
45 from .services.container import CustomContainerService
46 from .services.iscsi import IscsiService
47 from .services.nfs import NFSService
48 from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
49 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
50 NodeExporterService
51 from .schedule import HostAssignment
52 from .inventory import Inventory, SpecStore, HostCache, EventStore
53 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
54 from .template import TemplateMgr
55 from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
56
57 try:
58 import remoto
59 # NOTE(mattoliverau) Patch remoto until remoto PR
60 # (https://github.com/alfredodeza/remoto/pull/56) lands
61 from distutils.version import StrictVersion
62 if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
63 def remoto_has_connection(self: Any) -> bool:
64 return self.gateway.hasreceiver()
65
66 from remoto.backends import BaseConnection
67 BaseConnection.has_connection = remoto_has_connection
68 import remoto.process
69 import execnet.gateway_bootstrap
70 except ImportError as e:
71 remoto = None
72 remoto_import_error = str(e)
73
74 try:
75 from typing import List
76 except ImportError:
77 pass
78
79 logger = logging.getLogger(__name__)
80
81 T = TypeVar('T')
82
83 DEFAULT_SSH_CONFIG = """
84 Host *
85 User root
86 StrictHostKeyChecking no
87 UserKnownHostsFile /dev/null
88 ConnectTimeout=30
89 """
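# This fallback is only used when no ssh_config has been stored with
# `ceph cephadm set-ssh-config -i <ssh_config>` and the `ssh_config_file`
# module option does not point at a custom file; see _reconfig_ssh() and
# _get_ssh_config() below.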
90
91 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
92
93
94 class CephadmCompletion(orchestrator.Completion[T]):
95 def evaluate(self) -> None:
96 self.finalize(None)
97
98
99 def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
100 """
101 Decorator that makes an orchestrator method return a
102 CephadmCompletion that executes the wrapped function.
103 """
104
105 @wraps(f)
106 def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
107 return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
108
109 return wrapper
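# Usage sketch (mirrors the orchestrator methods declared further down, e.g.
# add_host() / remove_host()): decorating a method makes it return a
# CephadmCompletion that runs the wrapped function when evaluated:
#
#   @trivial_completion
#   def add_host(self, spec: HostSpec) -> str: ...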
110
111
112 class ContainerInspectInfo(NamedTuple):
113 image_id: str
114 ceph_version: Optional[str]
115 repo_digest: Optional[str]
116
117
118 @six.add_metaclass(CLICommandMeta)
119 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
120
121 _STORE_HOST_PREFIX = "host"
122
123 instance = None
124 NATIVE_OPTIONS = [] # type: List[Any]
125 MODULE_OPTIONS: List[dict] = [
126 {
127 'name': 'ssh_config_file',
128 'type': 'str',
129 'default': None,
130 'desc': 'customized SSH config file to connect to managed hosts',
131 },
132 {
133 'name': 'device_cache_timeout',
134 'type': 'secs',
135 'default': 30 * 60,
136 'desc': 'seconds to cache device inventory',
137 },
138 {
139 'name': 'daemon_cache_timeout',
140 'type': 'secs',
141 'default': 10 * 60,
142 'desc': 'seconds to cache service (daemon) inventory',
143 },
144 {
145 'name': 'facts_cache_timeout',
146 'type': 'secs',
147 'default': 1 * 60,
148 'desc': 'seconds to cache host facts data',
149 },
150 {
151 'name': 'host_check_interval',
152 'type': 'secs',
153 'default': 10 * 60,
154 'desc': 'how frequently to perform a host check',
155 },
156 {
157 'name': 'mode',
158 'type': 'str',
159 'enum_allowed': ['root', 'cephadm-package'],
160 'default': 'root',
161 'desc': 'mode for remote execution of cephadm',
162 },
163 {
164 'name': 'container_image_base',
165 'default': 'docker.io/ceph/ceph',
166 'desc': 'Container image name, without the tag',
167 'runtime': True,
168 },
169 {
170 'name': 'container_image_prometheus',
171 'default': 'docker.io/prom/prometheus:v2.18.1',
172 'desc': 'Prometheus container image',
173 },
174 {
175 'name': 'container_image_grafana',
176 'default': 'docker.io/ceph/ceph-grafana:6.7.4',
177 'desc': 'Grafana container image',
178 },
179 {
180 'name': 'container_image_alertmanager',
181 'default': 'docker.io/prom/alertmanager:v0.20.0',
182 'desc': 'Alertmanager container image',
183 },
184 {
185 'name': 'container_image_node_exporter',
186 'default': 'docker.io/prom/node-exporter:v0.18.1',
187 'desc': 'Node exporter container image',
188 },
189 {
190 'name': 'warn_on_stray_hosts',
191 'type': 'bool',
192 'default': True,
193 'desc': 'raise a health warning if daemons are detected on a host '
194 'that is not managed by cephadm',
195 },
196 {
197 'name': 'warn_on_stray_daemons',
198 'type': 'bool',
199 'default': True,
200 'desc': 'raise a health warning if daemons are detected '
201 'that are not managed by cephadm',
202 },
203 {
204 'name': 'warn_on_failed_host_check',
205 'type': 'bool',
206 'default': True,
207 'desc': 'raise a health warning if the host check fails',
208 },
209 {
210 'name': 'log_to_cluster',
211 'type': 'bool',
212 'default': True,
213 'desc': 'log to the "cephadm" cluster log channel',
214 },
215 {
216 'name': 'allow_ptrace',
217 'type': 'bool',
218 'default': False,
219 'desc': 'allow SYS_PTRACE capability on ceph containers',
220 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
221 'process with gdb or strace. Enabling this option '
222 'can allow debugging daemons that encounter problems '
223 'at runtime.',
224 },
225 {
226 'name': 'container_init',
227 'type': 'bool',
228 'default': False,
229 'desc': 'Run podman/docker with `--init`',
230 },
231 {
232 'name': 'prometheus_alerts_path',
233 'type': 'str',
234 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
235 'desc': 'location of alerts to include in prometheus deployments',
236 },
237 {
238 'name': 'migration_current',
239 'type': 'int',
240 'default': None,
241 'desc': 'internal - do not modify',
242 # used to track spec and other data migrations.
243 },
244 {
245 'name': 'config_dashboard',
246 'type': 'bool',
247 'default': True,
248 'desc': 'manage configs like API endpoints in Dashboard.'
249 },
250 {
251 'name': 'manage_etc_ceph_ceph_conf',
252 'type': 'bool',
253 'default': False,
254 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
255 },
256 {
257 'name': 'registry_url',
258 'type': 'str',
259 'default': None,
260 'desc': 'Custom registry url'
261 },
262 {
263 'name': 'registry_username',
264 'type': 'str',
265 'default': None,
266 'desc': 'Custom registry username'
267 },
268 {
269 'name': 'registry_password',
270 'type': 'str',
271 'default': None,
272 'desc': 'Custom registry password'
273 },
274 {
275 'name': 'use_repo_digest',
276 'type': 'bool',
277 'default': False,
278 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
279 }
280 ]
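# The options above are mirrored onto the module instance by config_notify()
# below. They are ordinary mgr module options, so an operator would typically
# tune one with something like:
#
#   ceph config set mgr mgr/cephadm/daemon_cache_timeout 600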
281
282 def __init__(self, *args: Any, **kwargs: Any):
283 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
284 self._cluster_fsid = self.get('mon_map')['fsid']
285 self.last_monmap: Optional[datetime.datetime] = None
286
287 # for serve()
288 self.run = True
289 self.event = Event()
290
291 if self.get_store('pause'):
292 self.paused = True
293 else:
294 self.paused = False
295
296 # for mypy which does not run the code
297 if TYPE_CHECKING:
298 self.ssh_config_file = None # type: Optional[str]
299 self.device_cache_timeout = 0
300 self.daemon_cache_timeout = 0
301 self.facts_cache_timeout = 0
302 self.host_check_interval = 0
303 self.mode = ''
304 self.container_image_base = ''
305 self.container_image_prometheus = ''
306 self.container_image_grafana = ''
307 self.container_image_alertmanager = ''
308 self.container_image_node_exporter = ''
309 self.warn_on_stray_hosts = True
310 self.warn_on_stray_daemons = True
311 self.warn_on_failed_host_check = True
312 self.allow_ptrace = False
313 self.container_init = False
314 self.prometheus_alerts_path = ''
315 self.migration_current: Optional[int] = None
316 self.config_dashboard = True
317 self.manage_etc_ceph_ceph_conf = True
318 self.registry_url: Optional[str] = None
319 self.registry_username: Optional[str] = None
320 self.registry_password: Optional[str] = None
321 self.use_repo_digest = False
322
323 self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
324 remoto.backends.LegacyModuleExecute]] = {}
325
326 self.notify('mon_map', None)
327 self.config_notify()
328
329 path = self.get_ceph_option('cephadm_path')
330 try:
331 with open(path, 'r') as f:
332 self._cephadm = f.read()
333 except (IOError, TypeError) as e:
334 raise RuntimeError("unable to read cephadm at '%s': %s" % (
335 path, str(e)))
336
337 self._worker_pool = multiprocessing.pool.ThreadPool(10)
338
339 self._reconfig_ssh()
340
341 CephadmOrchestrator.instance = self
342
343 self.upgrade = CephadmUpgrade(self)
344
345 self.health_checks: Dict[str, dict] = {}
346
347 self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
348
349 self.inventory = Inventory(self)
350
351 self.cache = HostCache(self)
352 self.cache.load()
353
354 self.to_remove_osds = OSDRemovalQueue(self)
355 self.to_remove_osds.load_from_store()
356
357 self.spec_store = SpecStore(self)
358 self.spec_store.load()
359
360 # ensure the host lists are in sync
361 for h in self.inventory.keys():
362 if h not in self.cache.daemons:
363 self.cache.prime_empty_host(h)
364 for h in self.cache.get_hosts():
365 if h not in self.inventory:
366 self.cache.rm_host(h)
367
368 # in-memory only.
369 self.events = EventStore(self)
370 self.offline_hosts: Set[str] = set()
371
372 self.migration = Migrations(self)
373
374 # services:
375 self.osd_service = OSDService(self)
376 self.nfs_service = NFSService(self)
377 self.mon_service = MonService(self)
378 self.mgr_service = MgrService(self)
379 self.mds_service = MdsService(self)
380 self.rgw_service = RgwService(self)
381 self.rbd_mirror_service = RbdMirrorService(self)
382 self.grafana_service = GrafanaService(self)
383 self.alertmanager_service = AlertmanagerService(self)
384 self.prometheus_service = PrometheusService(self)
385 self.node_exporter_service = NodeExporterService(self)
386 self.crash_service = CrashService(self)
387 self.iscsi_service = IscsiService(self)
388 self.container_service = CustomContainerService(self)
389 self.cephadm_services = {
390 'mon': self.mon_service,
391 'mgr': self.mgr_service,
392 'osd': self.osd_service,
393 'mds': self.mds_service,
394 'rgw': self.rgw_service,
395 'rbd-mirror': self.rbd_mirror_service,
396 'nfs': self.nfs_service,
397 'grafana': self.grafana_service,
398 'alertmanager': self.alertmanager_service,
399 'prometheus': self.prometheus_service,
400 'node-exporter': self.node_exporter_service,
401 'crash': self.crash_service,
402 'iscsi': self.iscsi_service,
403 'container': self.container_service,
404 }
405
406 self.template = TemplateMgr(self)
407
408 self.requires_post_actions: Set[str] = set()
409
410 def shutdown(self) -> None:
411 self.log.debug('shutdown')
412 self._worker_pool.close()
413 self._worker_pool.join()
414 self.run = False
415 self.event.set()
416
417 def _get_cephadm_service(self, service_type: str) -> CephadmService:
418 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
419 return self.cephadm_services[service_type]
420
421 def _kick_serve_loop(self) -> None:
422 self.log.debug('_kick_serve_loop')
423 self.event.set()
424
425 # function responsible for logging a single host into the custom registry
426 def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
427 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
428 # want to pass info over stdin rather than through normal list of args
429 args_str = json.dumps({
430 'url': url,
431 'username': username,
432 'password': password,
433 })
434 out, err, code = self._run_cephadm(
435 host, 'mon', 'registry-login',
436 ['--registry-json', '-'], stdin=args_str, error_ok=True)
437 if code:
438 return f"Host {host} failed to log in to {url} as {username} with given password"
439 return None
440
441 def serve(self) -> None:
442 """
443 The main loop of cephadm.
444
445 A command handler will typically change the declarative state
446 of cephadm. This loop will then attempt to apply this new state.
447 """
448 serve = CephadmServe(self)
449 serve.serve()
450
451 def set_container_image(self, entity: str, image: str) -> None:
452 self.check_mon_command({
453 'prefix': 'config set',
454 'name': 'container_image',
455 'value': image,
456 'who': entity,
457 })
458
459 def config_notify(self) -> None:
460 """
461 This method is called whenever one of our config options is changed.
462
463 TODO: this method should be moved into mgr_module.py
464 """
465 for opt in self.MODULE_OPTIONS:
466 setattr(self,
467 opt['name'], # type: ignore
468 self.get_module_option(opt['name'])) # type: ignore
469 self.log.debug(' mgr option %s = %s',
470 opt['name'], getattr(self, opt['name'])) # type: ignore
471 for opt in self.NATIVE_OPTIONS:
472 setattr(self,
473 opt, # type: ignore
474 self.get_ceph_option(opt))
475 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
476
477 self.event.set()
478
479 def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
480 if notify_type == "mon_map":
481 # get monmap mtime so we can refresh configs when mons change
482 monmap = self.get('mon_map')
483 self.last_monmap = str_to_datetime(monmap['modified'])
484 if self.last_monmap and self.last_monmap > datetime_now():
485 self.last_monmap = None # just in case clocks are skewed
486 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
487 # getattr, due to notify() being called before config_notify()
488 self._kick_serve_loop()
489 if notify_type == "pg_summary":
490 self._trigger_osd_removal()
491
492 def _trigger_osd_removal(self) -> None:
493 data = self.get("osd_stats")
494 for osd in data.get('osd_stats', []):
495 if osd.get('num_pgs') == 0:
496 # if _ANY_ osd that is currently in the removal queue appears to
497 # be empty, start the removal process
498 if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
499 self.log.debug("Found empty osd. Starting removal process")
500 # the osd that is now empty is part of the removal queue, so kick
501 # the serve loop to start the process
502 self._kick_serve_loop()
503
504 def pause(self) -> None:
505 if not self.paused:
506 self.log.info('Paused')
507 self.set_store('pause', 'true')
508 self.paused = True
509 # wake loop so we update the health status
510 self._kick_serve_loop()
511
512 def resume(self) -> None:
513 if self.paused:
514 self.log.info('Resumed')
515 self.paused = False
516 self.set_store('pause', None)
517 # unconditionally wake loop so that 'orch resume' can be used to kick
518 # cephadm
519 self._kick_serve_loop()
520
521 def get_unique_name(self, daemon_type, host, existing, prefix=None,
522 forcename=None):
523 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
524 """
525 Generate a unique random daemon name
526 """
527 suffix = daemon_type not in [
528 'mon', 'crash', 'nfs',
529 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
530 'container'
531 ]
532 if forcename:
533 if len([d for d in existing if d.daemon_id == forcename]):
534 raise orchestrator.OrchestratorValidationError(
535 f'name {daemon_type}.{forcename} already in use')
536 return forcename
537
538 if '.' in host:
539 host = host.split('.')[0]
540 while True:
541 if prefix:
542 name = prefix + '.'
543 else:
544 name = ''
545 name += host
546 if suffix:
547 name += '.' + ''.join(random.choice(string.ascii_lowercase)
548 for _ in range(6))
549 if len([d for d in existing if d.daemon_id == name]):
550 if not suffix:
551 raise orchestrator.OrchestratorValidationError(
552 f'name {daemon_type}.{name} already in use')
553 self.log.debug('name %s exists, trying again', name)
554 continue
555 return name
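# For illustration (hypothetical hosts, not from a real cluster): a mon on
# "host1" simply gets the daemon id "host1", while a suffixed type such as an
# mds with prefix "myfs" gets "myfs.host1" plus six random lowercase letters,
# e.g. "myfs.host1.abcdef".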
556
557 def _reconfig_ssh(self) -> None:
558 temp_files = [] # type: list
559 ssh_options = [] # type: List[str]
560
561 # ssh_config
562 ssh_config_fname = self.ssh_config_file
563 ssh_config = self.get_store("ssh_config")
564 if ssh_config is not None or ssh_config_fname is None:
565 if not ssh_config:
566 ssh_config = DEFAULT_SSH_CONFIG
567 f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
568 os.fchmod(f.fileno(), 0o600)
569 f.write(ssh_config.encode('utf-8'))
570 f.flush() # make visible to other processes
571 temp_files += [f]
572 ssh_config_fname = f.name
573 if ssh_config_fname:
574 self.validate_ssh_config_fname(ssh_config_fname)
575 ssh_options += ['-F', ssh_config_fname]
576 self.ssh_config = ssh_config
577
578 # identity
579 ssh_key = self.get_store("ssh_identity_key")
580 ssh_pub = self.get_store("ssh_identity_pub")
581 self.ssh_pub = ssh_pub
582 self.ssh_key = ssh_key
583 if ssh_key and ssh_pub:
584 tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
585 tkey.write(ssh_key.encode('utf-8'))
586 os.fchmod(tkey.fileno(), 0o600)
587 tkey.flush() # make visible to other processes
588 tpub = open(tkey.name + '.pub', 'w')
589 os.fchmod(tpub.fileno(), 0o600)
590 tpub.write(ssh_pub)
591 tpub.flush() # make visible to other processes
592 temp_files += [tkey, tpub]
593 ssh_options += ['-i', tkey.name]
594
595 self._temp_files = temp_files
596 if ssh_options:
597 self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
598 else:
599 self._ssh_options = None
600
601 if self.mode == 'root':
602 self.ssh_user = self.get_store('ssh_user', default='root')
603 elif self.mode == 'cephadm-package':
604 self.ssh_user = 'cephadm'
605
606 self._reset_cons()
607
608 def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
609 if ssh_config is None or len(ssh_config.strip()) == 0:
610 raise OrchestratorValidationError('ssh_config cannot be empty')
611 # StrictHostKeyChecking must be set, and must not be 'ask'
612 l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
613 if not l:
614 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
615 for s in l:
616 if 'ask' in s.lower():
617 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
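# In practice this accepts the DEFAULT_SSH_CONFIG near the top of this file
# (StrictHostKeyChecking no) and rejects configs that omit
# StrictHostKeyChecking or set it to 'ask', since the mgr connects
# non-interactively and cannot answer host-key prompts.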
618
619 def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
620 if not os.path.isfile(ssh_config_fname):
621 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
622 ssh_config_fname))
623
624 def _reset_con(self, host: str) -> None:
625 conn, r = self._cons.get(host, (None, None))
626 if conn:
627 self.log.debug('_reset_con close %s' % host)
628 conn.exit()
629 del self._cons[host]
630
631 def _reset_cons(self) -> None:
632 for host, conn_and_r in self._cons.items():
633 self.log.debug('_reset_cons close %s' % host)
634 conn, r = conn_and_r
635 conn.exit()
636 self._cons = {}
637
638 def offline_hosts_remove(self, host: str) -> None:
639 if host in self.offline_hosts:
640 self.offline_hosts.remove(host)
641
642 @staticmethod
643 def can_run() -> Tuple[bool, str]:
644 if remoto is not None:
645 return True, ""
646 else:
647 return False, "loading remoto library:{}".format(
648 remoto_import_error)
649
650 def available(self) -> Tuple[bool, str]:
651 """
652 The cephadm orchestrator is always available.
653 """
654 ok, err = self.can_run()
655 if not ok:
656 return ok, err
657 if not self.ssh_key or not self.ssh_pub:
658 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
659 return True, ''
660
661 def process(self, completions: List[CephadmCompletion]) -> None:
662 """
663 Does nothing, as completions are processed in another thread.
664 """
665 if completions:
666 self.log.debug("process: completions={0}".format(
667 orchestrator.pretty_print(completions)))
668
669 for p in completions:
670 p.evaluate()
671
672 @orchestrator._cli_write_command(
673 prefix='cephadm set-ssh-config',
674 desc='Set the ssh_config file (use -i <ssh_config>)')
675 def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
676 """
677 Set an ssh_config file provided from stdin
678 """
679 if inbuf == self.ssh_config:
680 return 0, "value unchanged", ""
681 self.validate_ssh_config_content(inbuf)
682 self.set_store("ssh_config", inbuf)
683 self.log.info('Set ssh_config')
684 self._reconfig_ssh()
685 return 0, "", ""
686
687 @orchestrator._cli_write_command(
688 prefix='cephadm clear-ssh-config',
689 desc='Clear the ssh_config file')
690 def _clear_ssh_config(self) -> Tuple[int, str, str]:
691 """
692 Clear the stored ssh_config file
693 """
694 self.set_store("ssh_config", None)
695 self.ssh_config_tmp = None
696 self.log.info('Cleared ssh_config')
697 self._reconfig_ssh()
698 return 0, "", ""
699
700 @orchestrator._cli_read_command(
701 prefix='cephadm get-ssh-config',
702 desc='Returns the ssh config as used by cephadm'
703 )
704 def _get_ssh_config(self) -> HandleCommandResult:
705 if self.ssh_config_file:
706 self.validate_ssh_config_fname(self.ssh_config_file)
707 with open(self.ssh_config_file) as f:
708 return HandleCommandResult(stdout=f.read())
709 ssh_config = self.get_store("ssh_config")
710 if ssh_config:
711 return HandleCommandResult(stdout=ssh_config)
712 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
713
714 @orchestrator._cli_write_command(
715 'cephadm generate-key',
716 desc='Generate a cluster SSH key (if not present)')
717 def _generate_key(self) -> Tuple[int, str, str]:
718 if not self.ssh_pub or not self.ssh_key:
719 self.log.info('Generating ssh key...')
720 tmp_dir = TemporaryDirectory()
721 path = tmp_dir.name + '/key'
722 try:
723 subprocess.check_call([
724 '/usr/bin/ssh-keygen',
725 '-C', 'ceph-%s' % self._cluster_fsid,
726 '-N', '',
727 '-f', path
728 ])
729 with open(path, 'r') as f:
730 secret = f.read()
731 with open(path + '.pub', 'r') as f:
732 pub = f.read()
733 finally:
734 os.unlink(path)
735 os.unlink(path + '.pub')
736 tmp_dir.cleanup()
737 self.set_store('ssh_identity_key', secret)
738 self.set_store('ssh_identity_pub', pub)
739 self._reconfig_ssh()
740 return 0, '', ''
741
742 @orchestrator._cli_write_command(
743 'cephadm set-priv-key',
744 desc='Set cluster SSH private key (use -i <private_key>)')
745 def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
746 if inbuf is None or len(inbuf) == 0:
747 return -errno.EINVAL, "", "empty private ssh key provided"
748 if inbuf == self.ssh_key:
749 return 0, "value unchanged", ""
750 self.set_store("ssh_identity_key", inbuf)
751 self.log.info('Set ssh private key')
752 self._reconfig_ssh()
753 return 0, "", ""
754
755 @orchestrator._cli_write_command(
756 'cephadm set-pub-key',
757 desc='Set cluster SSH public key (use -i <public_key>)')
758 def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
759 if inbuf is None or len(inbuf) == 0:
760 return -errno.EINVAL, "", "empty public ssh key provided"
761 if inbuf == self.ssh_pub:
762 return 0, "value unchanged", ""
763 self.set_store("ssh_identity_pub", inbuf)
764 self.log.info('Set ssh public key')
765 self._reconfig_ssh()
766 return 0, "", ""
767
768 @orchestrator._cli_write_command(
769 'cephadm clear-key',
770 desc='Clear cluster SSH key')
771 def _clear_key(self) -> Tuple[int, str, str]:
772 self.set_store('ssh_identity_key', None)
773 self.set_store('ssh_identity_pub', None)
774 self._reconfig_ssh()
775 self.log.info('Cleared cluster SSH key')
776 return 0, '', ''
777
778 @orchestrator._cli_read_command(
779 'cephadm get-pub-key',
780 desc='Show SSH public key for connecting to cluster hosts')
781 def _get_pub_key(self) -> Tuple[int, str, str]:
782 if self.ssh_pub:
783 return 0, self.ssh_pub, ''
784 else:
785 return -errno.ENOENT, '', 'No cluster SSH key defined'
786
787 @orchestrator._cli_read_command(
788 'cephadm get-user',
789 desc='Show user for SSHing to cluster hosts')
790 def _get_user(self) -> Tuple[int, str, str]:
791 return 0, self.ssh_user, ''
792
793 @orchestrator._cli_read_command(
794 'cephadm set-user',
795 'name=user,type=CephString',
796 'Set user for SSHing to cluster hosts; passwordless sudo will be needed for non-root users')
797 def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
798 current_user = self.ssh_user
799 if user == current_user:
800 return 0, "value unchanged", ""
801
802 self.set_store('ssh_user', user)
803 self._reconfig_ssh()
804
805 host = self.cache.get_hosts()[0]
806 r = CephadmServe(self)._check_host(host)
807 if r is not None:
808 # connection failed reset user
809 self.set_store('ssh_user', current_user)
810 self._reconfig_ssh()
811 return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
812
813 msg = 'ssh user set to %s' % user
814 if user != 'root':
815 msg += '; sudo will be used'
816 self.log.info(msg)
817 return 0, msg, ''
818
819 @orchestrator._cli_read_command(
820 'cephadm registry-login',
821 "name=url,type=CephString,req=false "
822 "name=username,type=CephString,req=false "
823 "name=password,type=CephString,req=false",
824 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
825 def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
826 # if url/username/password were not all given on the command line, fall back to file input
827 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
828 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
829 "or -i <login credentials json file>")
830 elif not (url and username and password):
831 assert isinstance(inbuf, str)
832 login_info = json.loads(inbuf)
833 if "url" in login_info and "username" in login_info and "password" in login_info:
834 url = login_info["url"]
835 username = login_info["username"]
836 password = login_info["password"]
837 else:
838 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
839 "Please setup json file as\n"
840 "{\n"
841 " \"url\": \"REGISTRY_URL\",\n"
842 " \"username\": \"REGISTRY_USERNAME\",\n"
843 " \"password\": \"REGISTRY_PASSWORD\"\n"
844 "}\n")
845 # verify login info works by attempting login on an arbitrary host
846 host = None
847 for host_name in self.inventory.keys():
848 host = host_name
849 break
850 if not host:
851 raise OrchestratorError('no hosts defined')
852 r = self._registry_login(host, url, username, password)
853 if r is not None:
854 return 1, '', r
855 # if logins succeeded, store info
856 self.log.debug("Host logins successful. Storing login info.")
857 self.set_module_option('registry_url', url)
858 self.set_module_option('registry_username', username)
859 self.set_module_option('registry_password', password)
860 # distribute new login info to all hosts
861 self.cache.distribute_new_registry_login_info()
862 return 0, "registry login scheduled", ''
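# Example credentials file for the -i variant (placeholder values; the
# required keys are exactly those checked above):
#
#   {
#     "url": "REGISTRY_URL",
#     "username": "REGISTRY_USERNAME",
#     "password": "REGISTRY_PASSWORD"
#   }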
863
864 @orchestrator._cli_read_command(
865 'cephadm check-host',
866 'name=host,type=CephString '
867 'name=addr,type=CephString,req=false',
868 'Check whether we can access and manage a remote host')
869 def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
870 try:
871 out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
872 ['--expect-hostname', host],
873 addr=addr,
874 error_ok=True, no_fsid=True)
875 if code:
876 return 1, '', ('check-host failed:\n' + '\n'.join(err))
877 except OrchestratorError as e:
878 self.log.exception(f"check-host failed for '{host}'")
879 return 1, '', ('check-host failed:\n' +
880 f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
881 # if we have an outstanding health alert for this host, give the
882 # serve thread a kick
883 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
884 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
885 if item.startswith('host %s ' % host):
886 self.event.set()
887 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
888
889 @orchestrator._cli_read_command(
890 'cephadm prepare-host',
891 'name=host,type=CephString '
892 'name=addr,type=CephString,req=false',
893 'Prepare a remote host for use with cephadm')
894 def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
895 out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
896 ['--expect-hostname', host],
897 addr=addr,
898 error_ok=True, no_fsid=True)
899 if code:
900 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
901 # if we have an outstanding health alert for this host, give the
902 # serve thread a kick
903 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
904 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
905 if item.startswith('host %s ' % host):
906 self.event.set()
907 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
908
909 @orchestrator._cli_write_command(
910 prefix='cephadm set-extra-ceph-conf',
911 desc="Text that is appended to all daemons' ceph.conf.\n"
912 "Mainly a workaround, till `config generate-minimal-conf` generates\n"
913 "a complete ceph.conf.\n\n"
914 "Warning: this is a dangerous operation.")
915 def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
916 if inbuf:
917 # sanity check.
918 cp = ConfigParser()
919 cp.read_string(inbuf, source='<infile>')
920
921 self.set_store("extra_ceph_conf", json.dumps({
922 'conf': inbuf,
923 'last_modified': datetime_to_str(datetime_now())
924 }))
925 self.log.info('Set extra_ceph_conf')
926 self._kick_serve_loop()
927 return HandleCommandResult()
928
929 @orchestrator._cli_read_command(
930 'cephadm get-extra-ceph-conf',
931 desc='Get extra ceph conf that is appended')
932 def _get_extra_ceph_conf(self) -> HandleCommandResult:
933 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
934
935 class ExtraCephConf(NamedTuple):
936 conf: str
937 last_modified: Optional[datetime.datetime]
938
939 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
940 data = self.get_store('extra_ceph_conf')
941 if not data:
942 return CephadmOrchestrator.ExtraCephConf('', None)
943 try:
944 j = json.loads(data)
945 except ValueError:
946 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
947 self.log.exception('%s: \'%s\'', msg, data)
948 return CephadmOrchestrator.ExtraCephConf('', None)
949 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
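# The "extra_ceph_conf" store key holds the JSON written by
# _set_extra_ceph_conf() above, i.e. roughly:
#
#   {"conf": "<raw ceph.conf snippet>", "last_modified": "<timestamp string>"}
#
# with the timestamp produced by datetime_to_str().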
950
951 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
952 conf = self.extra_ceph_conf()
953 if not conf.last_modified:
954 return False
955 return conf.last_modified > dt
956
957 def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
958 'remoto.backends.LegacyModuleExecute']:
959 """
960 Setup a connection for running commands on remote host.
961 """
962 conn, r = self._cons.get(host, (None, None))
963 if conn:
964 if conn.has_connection():
965 self.log.debug('Have connection to %s' % host)
966 return conn, r
967 else:
968 self._reset_con(host)
969 n = self.ssh_user + '@' + host
970 self.log.debug("Opening connection to {} with ssh options '{}'".format(
971 n, self._ssh_options))
972 child_logger = self.log.getChild(n)
973 child_logger.setLevel('WARNING')
974 conn = remoto.Connection(
975 n,
976 logger=child_logger,
977 ssh_options=self._ssh_options,
978 sudo=True if self.ssh_user != 'root' else False)
979
980 r = conn.import_module(remotes)
981 self._cons[host] = conn, r
982
983 return conn, r
984
985 def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
986 """
987 Remote validator that accepts a connection object to ensure that a certain
988 executable is available, returning its full path if so.
989
990 Otherwise an exception with thorough details will be raised, informing the
991 user that the executable was not found.
992 """
993 executable_path = conn.remote_module.which(executable)
994 if not executable_path:
995 raise RuntimeError("Executable '{}' not found on host '{}'".format(
996 executable, conn.hostname))
997 self.log.debug("Found executable '{}' at path '{}'".format(executable,
998 executable_path))
999 return executable_path
1000
1001 @contextmanager
1002 def _remote_connection(self,
1003 host: str,
1004 addr: Optional[str] = None,
1005 ) -> Iterator[Tuple["BaseConnection", Any]]:
1006 if not addr and host in self.inventory:
1007 addr = self.inventory.get_addr(host)
1008
1009 self.offline_hosts_remove(host)
1010
1011 try:
1012 try:
1013 if not addr:
1014 raise OrchestratorError("host address is empty")
1015 conn, connr = self._get_connection(addr)
1016 except OSError as e:
1017 self._reset_con(host)
1018 msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1019 raise execnet.gateway_bootstrap.HostNotFound(msg)
1020
1021 yield (conn, connr)
1022
1023 except execnet.gateway_bootstrap.HostNotFound as e:
1024 # this is a misleading exception as it seems to be thrown for
1025 # any sort of connection failure, even those having nothing to
1026 # do with "host not found" (e.g., ssh key permission denied).
1027 self.offline_hosts.add(host)
1028 self._reset_con(host)
1029
1030 user = self.ssh_user if self.mode == 'root' else 'cephadm'
1031 if str(e).startswith("Can't communicate"):
1032 msg = str(e)
1033 else:
1034 msg = f'''Failed to connect to {host} ({addr}).
1035 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1036
1037 To add the cephadm SSH key to the host:
1038 > ceph cephadm get-pub-key > ~/ceph.pub
1039 > ssh-copy-id -f -i ~/ceph.pub {user}@{host}
1040
1041 To check that the host is reachable:
1042 > ceph cephadm get-ssh-config > ssh_config
1043 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1044 > chmod 0600 ~/cephadm_private_key
1045 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
1046 raise OrchestratorError(msg) from e
1047 except Exception as ex:
1048 self.log.exception(ex)
1049 raise
1050
1051 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1052 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1053 if daemon_type in CEPH_TYPES or \
1054 daemon_type == 'nfs' or \
1055 daemon_type == 'iscsi':
1056 # get container image
1057 ret, image, err = self.check_mon_command({
1058 'prefix': 'config get',
1059 'who': utils.name_to_config_section(daemon_name),
1060 'key': 'container_image',
1061 })
1062 image = image.strip() # type: ignore
1063 elif daemon_type == 'prometheus':
1064 image = self.container_image_prometheus
1065 elif daemon_type == 'grafana':
1066 image = self.container_image_grafana
1067 elif daemon_type == 'alertmanager':
1068 image = self.container_image_alertmanager
1069 elif daemon_type == 'node-exporter':
1070 image = self.container_image_node_exporter
1071 elif daemon_type == CustomContainerService.TYPE:
1072 # The image can't be resolved, the necessary information
1073 # is only available when a container is deployed (given
1074 # via spec).
1075 image = None
1076 else:
1077 assert False, daemon_type
1078
1079 self.log.debug('%s container image %s' % (daemon_name, image))
1080
1081 return image
1082
1083 def _run_cephadm(self,
1084 host: str,
1085 entity: Union[CephadmNoImage, str],
1086 command: str,
1087 args: List[str],
1088 addr: Optional[str] = "",
1089 stdin: Optional[str] = "",
1090 no_fsid: Optional[bool] = False,
1091 error_ok: Optional[bool] = False,
1092 image: Optional[str] = "",
1093 env_vars: Optional[List[str]] = None,
1094 ) -> Tuple[List[str], List[str], int]:
1095 """
1096 Run cephadm on the remote host with the given command + args
1097
1098 :env_vars: in format -> [KEY=VALUE, ..]
1099 """
1100 with self._remote_connection(host, addr) as tpl:
1101 conn, connr = tpl
1102 assert image or entity
1103 if not image and entity is not cephadmNoImage:
1104 image = self._get_container_image(entity)
1105
1106 final_args = []
1107
1108 # global args
1109 if env_vars:
1110 for env_var_pair in env_vars:
1111 final_args.extend(['--env', env_var_pair])
1112
1113 if image:
1114 final_args.extend(['--image', image])
1115
1116 if not self.container_init:
1117 final_args += ['--no-container-init']
1118
1119 # subcommand
1120 final_args.append(command)
1121
1122 # subcommand args
1123 if not no_fsid:
1124 final_args += ['--fsid', self._cluster_fsid]
1125
1126 final_args += args
1127
1128 # exec
1129 self.log.debug('args: %s' % (' '.join(final_args)))
1130 if self.mode == 'root':
1131 if stdin:
1132 self.log.debug('stdin: %s' % stdin)
1133 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
1134 if stdin:
1135 script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
1136 script += self._cephadm
1137 python = connr.choose_python()
1138 if not python:
1139 raise RuntimeError(
1140 'unable to find python on %s (tried %s in %s)' % (
1141 host, remotes.PYTHONS, remotes.PATH))
1142 try:
1143 out, err, code = remoto.process.check(
1144 conn,
1145 [python, '-u'],
1146 stdin=script.encode('utf-8'))
1147 except RuntimeError as e:
1148 self._reset_con(host)
1149 if error_ok:
1150 return [], [str(e)], 1
1151 raise
1152 elif self.mode == 'cephadm-package':
1153 try:
1154 out, err, code = remoto.process.check(
1155 conn,
1156 ['sudo', '/usr/bin/cephadm'] + final_args,
1157 stdin=stdin)
1158 except RuntimeError as e:
1159 self._reset_con(host)
1160 if error_ok:
1161 return [], [str(e)], 1
1162 raise
1163 else:
1164 assert False, 'unsupported mode'
1165
1166 self.log.debug('code: %d' % code)
1167 if out:
1168 self.log.debug('out: %s' % '\n'.join(out))
1169 if err:
1170 self.log.debug('err: %s' % '\n'.join(err))
1171 if code and not error_ok:
1172 raise OrchestratorError(
1173 'cephadm exited with an error code: %d, stderr:%s' % (
1174 code, '\n'.join(err)))
1175 return out, err, code
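# A typical call, as used by check_host() and _add_host() in this module:
#
#   out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
#                                      ['--expect-hostname', host],
#                                      addr=addr, error_ok=True, no_fsid=True)
#
# i.e. a host, an entity (a daemon name or cephadmNoImage), a cephadm
# subcommand plus its args; it returns stdout lines, stderr lines and the
# exit code.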
1176
1177 def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
1178 """
1179 Returns all hosts that went through _refresh_host_daemons().
1180
1181 This mitigates a potential race, where a new host was added *after*
1182 ``_refresh_host_daemons()`` was called, but *before*
1183 ``_apply_all_specs()`` was called. Thus we could end up with hosts
1184 where daemons might be running, but which we have not yet detected.
1185 """
1186 return [
1187 h for h in self.inventory.all_specs()
1188 if self.cache.host_had_daemon_refresh(h.hostname)
1189 ]
1190
1191 def _add_host(self, spec):
1192 # type: (HostSpec) -> str
1193 """
1194 Add a host to be managed by the orchestrator.
1195
1196 :param host: host name
1197 """
1198 assert_valid_host(spec.hostname)
1199 out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
1200 ['--expect-hostname', spec.hostname],
1201 addr=spec.addr,
1202 error_ok=True, no_fsid=True)
1203 if code:
1204 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1205 spec.hostname, spec.addr, err))
1206
1207 self.inventory.add_host(spec)
1208 self.cache.prime_empty_host(spec.hostname)
1209 self.offline_hosts_remove(spec.hostname)
1210 self.event.set() # refresh stray health check
1211 self.log.info('Added host %s' % spec.hostname)
1212 return "Added host '{}'".format(spec.hostname)
1213
1214 @trivial_completion
1215 def add_host(self, spec: HostSpec) -> str:
1216 return self._add_host(spec)
1217
1218 @trivial_completion
1219 def remove_host(self, host):
1220 # type: (str) -> str
1221 """
1222 Remove a host from orchestrator management.
1223
1224 :param host: host name
1225 """
1226 self.inventory.rm_host(host)
1227 self.cache.rm_host(host)
1228 self._reset_con(host)
1229 self.event.set() # refresh stray health check
1230 self.log.info('Removed host %s' % host)
1231 return "Removed host '{}'".format(host)
1232
1233 @trivial_completion
1234 def update_host_addr(self, host: str, addr: str) -> str:
1235 self.inventory.set_addr(host, addr)
1236 self._reset_con(host)
1237 self.event.set() # refresh stray health check
1238 self.log.info('Set host %s addr to %s' % (host, addr))
1239 return "Updated host '{}' addr to '{}'".format(host, addr)
1240
1241 @trivial_completion
1242 def get_hosts(self):
1243 # type: () -> List[orchestrator.HostSpec]
1244 """
1245 Return a list of hosts managed by the orchestrator.
1246
1247 Notes:
1248 - skip async: manager reads from cache.
1249 """
1250 return list(self.inventory.all_specs())
1251
1252 @trivial_completion
1253 def add_host_label(self, host: str, label: str) -> str:
1254 self.inventory.add_label(host, label)
1255 self.log.info('Added label %s to host %s' % (label, host))
1256 return 'Added label %s to host %s' % (label, host)
1257
1258 @trivial_completion
1259 def remove_host_label(self, host: str, label: str) -> str:
1260 self.inventory.rm_label(host, label)
1261 self.log.info('Removed label %s from host %s' % (label, host))
1262 return 'Removed label %s from host %s' % (label, host)
1263
1264 @trivial_completion
1265 def host_ok_to_stop(self, hostname: str) -> str:
1266 if hostname not in self.cache.get_hosts():
1267 raise OrchestratorError(f'Cannot find host "{hostname}"')
1268
1269 daemons = self.cache.get_daemons()
1270 daemon_map = defaultdict(lambda: [])
1271 for dd in daemons:
1272 if dd.hostname == hostname:
1273 daemon_map[dd.daemon_type].append(dd.daemon_id)
1274
1275 for daemon_type, daemon_ids in daemon_map.items():
1276 r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
1277 if r.retval:
1278 self.log.error(f'It is NOT safe to stop host {hostname}')
1279 raise orchestrator.OrchestratorError(
1280 r.stderr,
1281 errno=r.retval)
1282
1283 msg = f'It is presumed safe to stop host {hostname}'
1284 self.log.info(msg)
1285 return msg
1286
1287 def get_minimal_ceph_conf(self) -> str:
1288 _, config, _ = self.check_mon_command({
1289 "prefix": "config generate-minimal-conf",
1290 })
1291 extra = self.extra_ceph_conf().conf
1292 if extra:
1293 config += '\n\n' + extra.strip() + '\n'
1294 return config
1295
1296 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
1297 if filter_host:
1298 self.cache.invalidate_host_daemons(filter_host)
1299 else:
1300 for h in self.cache.get_hosts():
1301 # Also discover daemons deployed manually
1302 self.cache.invalidate_host_daemons(h)
1303
1304 self._kick_serve_loop()
1305
1306 @trivial_completion
1307 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1308 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
1309 if refresh:
1310 self._invalidate_daemons_and_kick_serve()
1311 self.log.info('Kicked serve() loop to refresh all services')
1312
1313 # <service_map>
1314 sm: Dict[str, orchestrator.ServiceDescription] = {}
1315 osd_count = 0
1316 for h, dm in self.cache.get_daemons_with_volatile_status():
1317 for name, dd in dm.items():
1318 if service_type and service_type != dd.daemon_type:
1319 continue
1320 n: str = dd.service_name()
1321 if service_name and service_name != n:
1322 continue
1323 if dd.daemon_type == 'osd':
1324 """
1325 OSDs do not know the affinity to their spec out of the box.
1326 """
1327 n = f"osd.{dd.osdspec_affinity}"
1328 if not dd.osdspec_affinity:
1329 # If there is no osdspec_affinity, the spec should suffice for displaying
1330 continue
1331 if n in self.spec_store.specs:
1332 spec = self.spec_store.specs[n]
1333 else:
1334 spec = ServiceSpec(
1335 unmanaged=True,
1336 service_type=dd.daemon_type,
1337 service_id=dd.service_id(),
1338 placement=PlacementSpec(
1339 hosts=[dd.hostname]
1340 )
1341 )
1342 if n not in sm:
1343 sm[n] = orchestrator.ServiceDescription(
1344 last_refresh=dd.last_refresh,
1345 container_image_id=dd.container_image_id,
1346 container_image_name=dd.container_image_name,
1347 spec=spec,
1348 events=self.events.get_for_service(spec.service_name()),
1349 )
1350 if n in self.spec_store.specs:
1351 if dd.daemon_type == 'osd':
1352 """
1353 The osd count can't be determined by the Placement spec.
1354 An actual/expected representation cannot be computed here,
1355 so we're setting running = size for now.
1356 """
1357 osd_count += 1
1358 sm[n].size = osd_count
1359 else:
1360 sm[n].size = spec.placement.get_host_selection_size(
1361 self.inventory.all_specs())
1362
1363 sm[n].created = self.spec_store.spec_created[n]
1364 if service_type == 'nfs':
1365 spec = cast(NFSServiceSpec, spec)
1366 sm[n].rados_config_location = spec.rados_config_location()
1367 else:
1368 sm[n].size = 0
1369 if dd.status == 1:
1370 sm[n].running += 1
1371 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1372 sm[n].last_refresh = dd.last_refresh
1373 if sm[n].container_image_id != dd.container_image_id:
1374 sm[n].container_image_id = 'mix'
1375 if sm[n].container_image_name != dd.container_image_name:
1376 sm[n].container_image_name = 'mix'
1377 for n, spec in self.spec_store.specs.items():
1378 if n in sm:
1379 continue
1380 if service_type is not None and service_type != spec.service_type:
1381 continue
1382 if service_name is not None and service_name != n:
1383 continue
1384 sm[n] = orchestrator.ServiceDescription(
1385 spec=spec,
1386 size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
1387 running=0,
1388 events=self.events.get_for_service(spec.service_name()),
1389 )
1390 if service_type == 'nfs':
1391 spec = cast(NFSServiceSpec, spec)
1392 sm[n].rados_config_location = spec.rados_config_location()
1393 return list(sm.values())
1394
1395 @trivial_completion
1396 def list_daemons(self,
1397 service_name: Optional[str] = None,
1398 daemon_type: Optional[str] = None,
1399 daemon_id: Optional[str] = None,
1400 host: Optional[str] = None,
1401 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
1402 if refresh:
1403 self._invalidate_daemons_and_kick_serve(host)
1404 self.log.info('Kicked serve() loop to refresh all daemons')
1405
1406 result = []
1407 for h, dm in self.cache.get_daemons_with_volatile_status():
1408 if host and h != host:
1409 continue
1410 for name, dd in dm.items():
1411 if daemon_type is not None and daemon_type != dd.daemon_type:
1412 continue
1413 if daemon_id is not None and daemon_id != dd.daemon_id:
1414 continue
1415 if service_name is not None and service_name != dd.service_name():
1416 continue
1417 result.append(dd)
1418 return result
1419
1420 @trivial_completion
1421 def service_action(self, action: str, service_name: str) -> List[str]:
1422 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
1423 self.log.info('%s service %s' % (action.capitalize(), service_name))
1424 return [
1425 self._schedule_daemon_action(dd.name(), action)
1426 for dd in dds
1427 ]
1428
1429 def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
1430 daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
1431 host=host,
1432 daemon_id=daemon_id,
1433 daemon_type=daemon_type,
1434 )
1435
1436 self._daemon_action_set_image(action, image, daemon_type, daemon_id)
1437
1438 if action == 'redeploy':
1439 if self.daemon_is_self(daemon_type, daemon_id):
1440 self.mgr_service.fail_over()
1441 return '' # unreachable
1442 # stop, recreate the container+unit, then restart
1443 return self._create_daemon(daemon_spec)
1444 elif action == 'reconfig':
1445 return self._create_daemon(daemon_spec, reconfig=True)
1446
1447 actions = {
1448 'start': ['reset-failed', 'start'],
1449 'stop': ['stop'],
1450 'restart': ['reset-failed', 'restart'],
1451 }
1452 name = daemon_spec.name()
1453 for a in actions[action]:
1454 try:
1455 out, err, code = self._run_cephadm(
1456 host, name, 'unit',
1457 ['--name', name, a])
1458 except Exception:
1459 self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
1460 self.cache.invalidate_host_daemons(daemon_spec.host)
1461 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1462 self.events.for_daemon(name, 'INFO', msg)
1463 return msg
1464
1465 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
1466 if image is not None:
1467 if action != 'redeploy':
1468 raise OrchestratorError(
1469 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1470 if daemon_type not in CEPH_TYPES:
1471 raise OrchestratorError(
1472 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1473 f'types are: {", ".join(CEPH_TYPES)}')
1474
1475 self.check_mon_command({
1476 'prefix': 'config set',
1477 'name': 'container_image',
1478 'value': image,
1479 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1480 })
1481
1482 @trivial_completion
1483 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
1484 d = self.cache.get_daemon(daemon_name)
1485
1486 if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
1487 and not self.mgr_service.mgr_map_has_standby():
1488 raise OrchestratorError(
1489 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1490
1491 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
1492
1493 self.log.info(f'Schedule {action} daemon {daemon_name}')
1494 return self._schedule_daemon_action(daemon_name, action)
1495
1496 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
1497 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
1498
1499 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
1500 dd = self.cache.get_daemon(daemon_name)
1501 if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
1502 and not self.mgr_service.mgr_map_has_standby():
1503 raise OrchestratorError(
1504 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1505 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
1506 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
1507 self._kick_serve_loop()
1508 return msg
1509
1510 @trivial_completion
1511 def remove_daemons(self, names):
1512 # type: (List[str]) -> List[str]
1513 args = []
1514 for host, dm in self.cache.daemons.items():
1515 for name in names:
1516 if name in dm:
1517 args.append((name, host))
1518 if not args:
1519 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1520 self.log.info('Remove daemons %s' % [a[0] for a in args])
1521 return self._remove_daemons(args)
1522
1523 @trivial_completion
1524 def remove_service(self, service_name: str) -> str:
1525 self.log.info('Remove service %s' % service_name)
1526 self._trigger_preview_refresh(service_name=service_name)
1527 found = self.spec_store.rm(service_name)
1528 if found:
1529 self._kick_serve_loop()
1530 return 'Removed service %s' % service_name
1531 else:
1532 # must be idempotent: still a success.
1533 return f'Failed to remove service. <{service_name}> was not found.'
1534
1535 @trivial_completion
1536 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
1537 """
1538 Return the storage inventory of hosts matching the given filter.
1539
1540 :param host_filter: host filter
1541
1542 TODO:
1543 - add filtering by label
1544 """
1545 if refresh:
1546 if host_filter and host_filter.hosts:
1547 for h in host_filter.hosts:
1548 self.cache.invalidate_host_devices(h)
1549 else:
1550 for h in self.cache.get_hosts():
1551 self.cache.invalidate_host_devices(h)
1552
1553 self.event.set()
1554 self.log.info('Kicked serve() loop to refresh devices')
1555
1556 result = []
1557 for host, dls in self.cache.devices.items():
1558 if host_filter and host_filter.hosts and host not in host_filter.hosts:
1559 continue
1560 result.append(orchestrator.InventoryHost(host,
1561 inventory.Devices(dls)))
1562 return result
1563
1564 @trivial_completion
1565 def zap_device(self, host: str, path: str) -> str:
1566 self.log.info('Zap device %s:%s' % (host, path))
1567 out, err, code = self._run_cephadm(
1568 host, 'osd', 'ceph-volume',
1569 ['--', 'lvm', 'zap', '--destroy', path],
1570 error_ok=True)
1571 self.cache.invalidate_host_devices(host)
1572 if code:
1573 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
1574 return '\n'.join(out + err)
1575
1576 @trivial_completion
1577 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
1578 """
1579 Blink a device light. Calling something like::
1580
1581 lsmcli local-disk-ident-led-on --path $path
1582
1583 If you must, you can customize this via::
1584
1585 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
1586 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
1587
1588 See templates/blink_device_light_cmd.j2
1589 """
1590 @forall_hosts
1591 def blink(host: str, dev: str, path: str) -> str:
1592 cmd_line = self.template.render('blink_device_light_cmd.j2',
1593 {
1594 'on': on,
1595 'ident_fault': ident_fault,
1596 'dev': dev,
1597 'path': path
1598 },
1599 host=host)
1600 cmd_args = shlex.split(cmd_line)
1601
1602 out, err, code = self._run_cephadm(
1603 host, 'osd', 'shell', ['--'] + cmd_args,
1604 error_ok=True)
1605 if code:
1606 raise OrchestratorError(
1607 'Unable to affect %s light for %s:%s. Command: %s' % (
1608 ident_fault, host, dev, ' '.join(cmd_args)))
1609 self.log.info('Set %s light for %s:%s %s' % (
1610 ident_fault, host, dev, 'on' if on else 'off'))
1611 return "Set %s light for %s:%s %s" % (
1612 ident_fault, host, dev, 'on' if on else 'off')
1613
1614 return blink(locs)
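# The rendered command line runs on the target host via `cephadm shell --
# <cmd>` (the _run_cephadm('osd', 'shell', ...) call above), so a custom
# blink_device_light_cmd template only needs to emit the bare command, along
# the lines of the `lsmcli local-disk-ident-led-on --path $path` example in
# the docstring.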
1615
1616 def get_osd_uuid_map(self, only_up=False):
1617 # type: (bool) -> Dict[str, str]
1618 osd_map = self.get('osd_map')
1619 r = {}
1620 for o in osd_map['osds']:
1621 # only include OSDs that have ever started in this map. this way
1622 # an interrupted osd create can be repeated and succeed the second
1623 # time around.
1624 osd_id = o.get('osd')
1625 if osd_id is None:
1626 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1627 if not only_up or (o['up_from'] > 0):
1628 r[str(osd_id)] = o.get('uuid', '')
1629 return r
1630
1631 def _trigger_preview_refresh(self,
1632 specs: Optional[List[DriveGroupSpec]] = None,
1633 service_name: Optional[str] = None,
1634 ) -> None:
1635 # Only trigger a refresh when a spec has changed
1636 trigger_specs = []
1637 if specs:
1638 for spec in specs:
1639 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
1640 # if the to-be-previewed spec != the actual spec, we need to
1641 # trigger a refresh; if the spec has been removed (== None) we
1642 # need to refresh as well.
1643 if not preview_spec or spec != preview_spec:
1644 trigger_specs.append(spec)
1645 if service_name:
1646 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
1647 if not any(trigger_specs):
1648 return None
1649
1650 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
1651 for host in refresh_hosts:
1652 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
1653 self.cache.osdspec_previews_refresh_queue.append(host)
1654
1655 @trivial_completion
1656 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
1657 """
1658 Deprecated. Please use `apply()` instead.
1659
1660 Keeping this around to be compatible with mgr/dashboard
1661 """
1662 return [self._apply(spec) for spec in specs]
1663
1664 @trivial_completion
1665 def create_osds(self, drive_group: DriveGroupSpec) -> str:
1666 return self.osd_service.create_from_spec(drive_group)
1667
1668 def _preview_osdspecs(self,
1669 osdspecs: Optional[List[DriveGroupSpec]] = None
1670 ) -> dict:
1671 if not osdspecs:
1672 return {'n/a': [{'error': True,
1673 'message': 'No OSDSpec or matching hosts found.'}]}
1674 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
1675 if not matching_hosts:
1676 return {'n/a': [{'error': True,
1677 'message': 'No OSDSpec or matching hosts found.'}]}
1678 # Is any host still loading previews or still queued to be previewed?
1679 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
1680 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
1681 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1682 return {'n/a': [{'error': True,
1683 'message': 'Preview data is being generated. '
1684 'Please re-run this command in a bit.'}]}
1685 # drop all hosts that are not in matching_hosts and only keep reports that match the requested osdspecs
1686 previews_for_specs = {}
1687 for host, raw_reports in self.cache.osdspec_previews.items():
1688 if host not in matching_hosts:
1689 continue
1690 osd_reports = []
1691 for osd_report in raw_reports:
1692 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
1693 osd_reports.append(osd_report)
1694 previews_for_specs.update({host: osd_reports})
1695 return previews_for_specs
1696
1697 def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
1698 need = {
1699 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1700 'grafana': ['prometheus'],
1701 'alertmanager': ['mgr', 'alertmanager'],
1702 }
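# Illustrative example: _calc_daemon_deps('grafana', '<id>') returns the
# names of all running prometheus daemons, e.g. ['prometheus.host1'].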
1703 deps = []
1704 for dep_type in need.get(daemon_type, []):
1705 for dd in self.cache.get_daemons_by_service(dep_type):
1706 deps.append(dd.name())
1707 return sorted(deps)
1708
1709 def _create_daemon(self,
1710 daemon_spec: CephadmDaemonSpec,
1711 reconfig: bool = False,
1712 osd_uuid_map: Optional[Dict[str, Any]] = None,
1713 ) -> str:
1714
1715 with set_exception_subject('service', orchestrator.DaemonDescription(
1716 daemon_type=daemon_spec.daemon_type,
1717 daemon_id=daemon_spec.daemon_id,
1718 hostname=daemon_spec.host,
1719 ).service_id(), overwrite=True):
1720
1721 image = ''
1722 start_time = datetime_now()
1723 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1724
1725 if daemon_spec.daemon_type == 'container':
1726 spec: Optional[CustomContainerSpec] = daemon_spec.spec
1727 if spec is None:
1728 # Exit here immediately because the service spec required
1729 # to create a daemon was not provided. The spec is only
1730 # provided when a service is applied via the 'orch apply'
1731 # command.
1732 msg = "Failed to {} daemon {} on {}: Required " \
1733 "service specification not provided".format(
1734 'reconfigure' if reconfig else 'deploy',
1735 daemon_spec.name(), daemon_spec.host)
1736 self.log.info(msg)
1737 return msg
1738 image = spec.image
1739 if spec.ports:
1740 ports.extend(spec.ports)
1741
1742 cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
1743 daemon_spec)
1744
1745 # TCP ports to open in the host firewall
1746 if len(ports) > 0:
1747 daemon_spec.extra_args.extend([
1748 '--tcp-ports', ' '.join(map(str, ports))
1749 ])
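# e.g. for ports [9095, 3000] this appends (illustrative):
#   ['--tcp-ports', '9095 3000']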
1750
1751 # OSD deployments need the OSD's uuid, passed via --osd-fsid
1752 if daemon_spec.daemon_type == 'osd':
1753 if not osd_uuid_map:
1754 osd_uuid_map = self.get_osd_uuid_map()
1755 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1756 if not osd_uuid:
1757 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1758 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1759
1760 if reconfig:
1761 daemon_spec.extra_args.append('--reconfig')
1762 if self.allow_ptrace:
1763 daemon_spec.extra_args.append('--allow-ptrace')
1764
1765 if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
1766 self._registry_login(daemon_spec.host, self.registry_url,
1767 self.registry_username, self.registry_password)
1768
1769 daemon_spec.extra_args.extend(['--config-json', '-'])
1770
1771 self.log.info('%s daemon %s on %s' % (
1772 'Reconfiguring' if reconfig else 'Deploying',
1773 daemon_spec.name(), daemon_spec.host))
1774
1775 out, err, code = self._run_cephadm(
1776 daemon_spec.host, daemon_spec.name(), 'deploy',
1777 [
1778 '--name', daemon_spec.name(),
1779 ] + daemon_spec.extra_args,
1780 stdin=json.dumps(cephadm_config),
1781 image=image)
1782 if not code and daemon_spec.host in self.cache.daemons:
1783 # prime cached service state with what we (should have)
1784 # just created
1785 sd = orchestrator.DaemonDescription()
1786 sd.daemon_type = daemon_spec.daemon_type
1787 sd.daemon_id = daemon_spec.daemon_id
1788 sd.hostname = daemon_spec.host
1789 sd.status = 1
1790 sd.status_desc = 'starting'
1791 self.cache.add_daemon(daemon_spec.host, sd)
1792 if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1793 self.requires_post_actions.add(daemon_spec.daemon_type)
1794 self.cache.invalidate_host_daemons(daemon_spec.host)
1795 self.cache.update_daemon_config_deps(
1796 daemon_spec.host, daemon_spec.name(), deps, start_time)
1797 self.cache.save_host(daemon_spec.host)
1798 msg = "{} {} on host '{}'".format(
1799 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1800 if not code:
1801 self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1802 else:
1803 what = 'reconfigure' if reconfig else 'deploy'
1804 self.events.for_daemon(
1805 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1806 return msg
1807
1808 @forall_hosts
1809 def _remove_daemons(self, name: str, host: str) -> str:
1810 return self._remove_daemon(name, host)
1811
1812 def _remove_daemon(self, name: str, host: str) -> str:
1813 """
1814 Remove a daemon
1815 """
1816 (daemon_type, daemon_id) = name.split('.', 1)
1817 daemon = orchestrator.DaemonDescription(
1818 daemon_type=daemon_type,
1819 daemon_id=daemon_id,
1820 hostname=host)
1821
1822 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1823
1824 self.cephadm_services[daemon_type].pre_remove(daemon)
1825
1826 args = ['--name', name, '--force']
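# Roughly equivalent to running on the host (illustrative;
# _run_cephadm adds the remaining boilerplate arguments):
#   cephadm rm-daemon --name osd.3 --force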
1827 self.log.info('Removing daemon %s from %s' % (name, host))
1828 out, err, code = self._run_cephadm(
1829 host, name, 'rm-daemon', args)
1830 if not code:
1831 # remove item from cache
1832 self.cache.rm_daemon(host, name)
1833 self.cache.invalidate_host_daemons(host)
1834
1835 self.cephadm_services[daemon_type].post_remove(daemon)
1836
1837 return "Removed {} from host '{}'".format(name, host)
1838
1839 def _check_pool_exists(self, pool: str, service_name: str) -> None:
1840 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
1841 if not self.rados.pool_exists(pool):
1842 raise OrchestratorError(f'Cannot find pool "{pool}" for '
1843 f'service {service_name}')
1844
1845 def _add_daemon(self,
1846 daemon_type: str,
1847 spec: ServiceSpec,
1848 create_func: Callable[..., CephadmDaemonSpec],
1849 config_func: Optional[Callable] = None) -> List[str]:
1850 """
1851 Add (and place) a daemon. Require explicit host placement. Do not
1852 schedule, and do not apply the related scheduling limitations.
1853 """
1854 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
1855 if not spec.placement.hosts:
1856 raise OrchestratorError('must specify host(s) to deploy on')
1857 count = spec.placement.count or len(spec.placement.hosts)
1858 daemons = self.cache.get_daemons_by_service(spec.service_name())
1859 return self._create_daemons(daemon_type, spec, daemons,
1860 spec.placement.hosts, count,
1861 create_func, config_func)
1862
1863 def _create_daemons(self,
1864 daemon_type: str,
1865 spec: ServiceSpec,
1866 daemons: List[DaemonDescription],
1867 hosts: List[HostPlacementSpec],
1868 count: int,
1869 create_func: Callable[..., CephadmDaemonSpec],
1870 config_func: Optional[Callable] = None) -> List[str]:
1871 if count > len(hosts):
1872 raise OrchestratorError('too few hosts: want %d, have %s' % (
1873 count, hosts))
1874
1875 did_config = False
1876
1877 args = [] # type: List[CephadmDaemonSpec]
1878 for host, network, name in hosts:
1879 daemon_id = self.get_unique_name(daemon_type, host, daemons,
1880 prefix=spec.service_id,
1881 forcename=name)
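# The generated daemon_id typically looks like
# '<service_id>.<host>.<random-suffix>' (e.g. 'myfs.host1.abcdef'),
# unless an explicit name was forced via the placement spec.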
1882
1883 if not did_config and config_func:
1884 if daemon_type == 'rgw':
1885 config_func(spec, daemon_id)
1886 else:
1887 config_func(spec)
1888 did_config = True
1889
1890 daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
1891 host, daemon_id, network, spec)
1892 self.log.debug('Placing %s.%s on host %s' % (
1893 daemon_type, daemon_id, host))
1894 args.append(daemon_spec)
1895
1896 # add to daemon list so next name(s) will also be unique
1897 sd = orchestrator.DaemonDescription(
1898 hostname=host,
1899 daemon_type=daemon_type,
1900 daemon_id=daemon_id,
1901 )
1902 daemons.append(sd)
1903
1904 @forall_hosts
1905 def create_func_map(*args: Any) -> str:
1906 daemon_spec = create_func(*args)
1907 return self._create_daemon(daemon_spec)
1908
1909 return create_func_map(args)
1910
1911 @trivial_completion
1912 def apply_mon(self, spec: ServiceSpec) -> str:
1913 return self._apply(spec)
1914
1915 @trivial_completion
1916 def add_mon(self, spec):
1917 # type: (ServiceSpec) -> List[str]
1918 return self._add_daemon('mon', spec, self.mon_service.prepare_create)
1919
1920 @trivial_completion
1921 def add_mgr(self, spec):
1922 # type: (ServiceSpec) -> List[str]
1923 return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
1924
1925 def _apply(self, spec: GenericSpec) -> str:
1926 if spec.service_type == 'host':
1927 return self._add_host(cast(HostSpec, spec))
1928
1929 if spec.service_type == 'osd':
1930 # _trigger_preview_refresh needs to be smart and
1931 # should only refresh if a change has been detected
1932 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
1933
1934 return self._apply_service_spec(cast(ServiceSpec, spec))
1935
1936 def _plan(self, spec: ServiceSpec) -> dict:
1937 if spec.service_type == 'osd':
1938 return {'service_name': spec.service_name(),
1939 'service_type': spec.service_type,
1940 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
1941
1942 ha = HostAssignment(
1943 spec=spec,
1944 hosts=self._hosts_with_daemon_inventory(),
1945 get_daemons_func=self.cache.get_daemons_by_service,
1946 )
1947 ha.validate()
1948 hosts = ha.place()
1949
1950 add_daemon_hosts = ha.add_daemon_hosts(hosts)
1951 remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
1952
1953 return {
1954 'service_name': spec.service_name(),
1955 'service_type': spec.service_type,
1956 'add': [hs.hostname for hs in add_daemon_hosts],
1957 'remove': [d.hostname for d in remove_daemon_hosts]
1958 }
1959
1960 @trivial_completion
1961 def plan(self, specs: List[GenericSpec]) -> List:
1962 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
1963 'to the current inventory setup. If any of these conditions changes, the \n'
1964 'preview will be invalid. Please make sure to keep the timeframe \n'
1965 'between planning and applying the specs as short as possible.'}]
1966 if any([spec.service_type == 'host' for spec in specs]):
1967 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
1968 for spec in specs:
1969 results.append(self._plan(cast(ServiceSpec, spec)))
1970 return results
1971
1972 def _apply_service_spec(self, spec: ServiceSpec) -> str:
1973 if spec.placement.is_empty():
1974 # fill in default placement
1975 defaults = {
1976 'mon': PlacementSpec(count=5),
1977 'mgr': PlacementSpec(count=2),
1978 'mds': PlacementSpec(count=2),
1979 'rgw': PlacementSpec(count=2),
1980 'iscsi': PlacementSpec(count=1),
1981 'rbd-mirror': PlacementSpec(count=2),
1982 'nfs': PlacementSpec(count=1),
1983 'grafana': PlacementSpec(count=1),
1984 'alertmanager': PlacementSpec(count=1),
1985 'prometheus': PlacementSpec(count=1),
1986 'node-exporter': PlacementSpec(host_pattern='*'),
1987 'crash': PlacementSpec(host_pattern='*'),
1988 'container': PlacementSpec(count=1),
1989 }
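# e.g. an 'mds' spec applied without an explicit placement ends up
# with PlacementSpec(count=2) from the defaults above.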
1990 spec.placement = defaults[spec.service_type]
1991 elif spec.service_type in ['mon', 'mgr'] and \
1992 spec.placement.count is not None and \
1993 spec.placement.count < 1:
1994 raise OrchestratorError('cannot scale %s service below 1' % (
1995 spec.service_type))
1996
1997 HostAssignment(
1998 spec=spec,
1999 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
2000 get_daemons_func=self.cache.get_daemons_by_service,
2001 ).validate()
2002
2003 self.log.info('Saving service %s spec with placement %s' % (
2004 spec.service_name(), spec.placement.pretty_str()))
2005 self.spec_store.save(spec)
2006 self._kick_serve_loop()
2007 return "Scheduled %s update..." % spec.service_name()
2008
2009 @trivial_completion
2010 def apply(self, specs: List[GenericSpec]) -> List[str]:
2011 results = []
2012 for spec in specs:
2013 results.append(self._apply(spec))
2014 return results
2015
2016 @trivial_completion
2017 def apply_mgr(self, spec: ServiceSpec) -> str:
2018 return self._apply(spec)
2019
2020 @trivial_completion
2021 def add_mds(self, spec: ServiceSpec) -> List[str]:
2022 return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)
2023
2024 @trivial_completion
2025 def apply_mds(self, spec: ServiceSpec) -> str:
2026 return self._apply(spec)
2027
2028 @trivial_completion
2029 def add_rgw(self, spec: ServiceSpec) -> List[str]:
2030 return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
2031
2032 @trivial_completion
2033 def apply_rgw(self, spec: ServiceSpec) -> str:
2034 return self._apply(spec)
2035
2036 @trivial_completion
2037 def add_iscsi(self, spec):
2038 # type: (ServiceSpec) -> List[str]
2039 return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
2040
2041 @trivial_completion
2042 def apply_iscsi(self, spec: ServiceSpec) -> str:
2043 return self._apply(spec)
2044
2045 @trivial_completion
2046 def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
2047 return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
2048
2049 @trivial_completion
2050 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
2051 return self._apply(spec)
2052
2053 @trivial_completion
2054 def add_nfs(self, spec: ServiceSpec) -> List[str]:
2055 return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
2056
2057 @trivial_completion
2058 def apply_nfs(self, spec: ServiceSpec) -> str:
2059 return self._apply(spec)
2060
2061 def _get_dashboard_url(self):
2062 # type: () -> str
2063 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2064
2065 @trivial_completion
2066 def add_prometheus(self, spec: ServiceSpec) -> List[str]:
2067 return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
2068
2069 @trivial_completion
2070 def apply_prometheus(self, spec: ServiceSpec) -> str:
2071 return self._apply(spec)
2072
2073 @trivial_completion
2074 def add_node_exporter(self, spec):
2075 # type: (ServiceSpec) -> List[str]
2076 return self._add_daemon('node-exporter', spec,
2077 self.node_exporter_service.prepare_create)
2078
2079 @trivial_completion
2080 def apply_node_exporter(self, spec: ServiceSpec) -> str:
2081 return self._apply(spec)
2082
2083 @trivial_completion
2084 def add_crash(self, spec):
2085 # type: (ServiceSpec) -> List[str]
2086 return self._add_daemon('crash', spec,
2087 self.crash_service.prepare_create)
2088
2089 @trivial_completion
2090 def apply_crash(self, spec: ServiceSpec) -> str:
2091 return self._apply(spec)
2092
2093 @trivial_completion
2094 def add_grafana(self, spec):
2095 # type: (ServiceSpec) -> List[str]
2096 return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
2097
2098 @trivial_completion
2099 def apply_grafana(self, spec: ServiceSpec) -> str:
2100 return self._apply(spec)
2101
2102 @trivial_completion
2103 def add_alertmanager(self, spec):
2104 # type: (ServiceSpec) -> List[str]
2105 return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
2106
2107 @trivial_completion
2108 def apply_alertmanager(self, spec: ServiceSpec) -> str:
2109 return self._apply(spec)
2110
2111 @trivial_completion
2112 def add_container(self, spec: ServiceSpec) -> List[str]:
2113 return self._add_daemon('container', spec,
2114 self.container_service.prepare_create)
2115
2116 @trivial_completion
2117 def apply_container(self, spec: ServiceSpec) -> str:
2118 return self._apply(spec)
2119
2120 def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
2121 # pick an arbitrary host (the first one in the inventory)...
2122 host = None
2123 for host_name in self.inventory.keys():
2124 host = host_name
2125 break
2126 if not host:
2127 raise OrchestratorError('no hosts defined')
2128 if self.cache.host_needs_registry_login(host) and self.registry_url:
2129 self._registry_login(host, self.registry_url,
2130 self.registry_username, self.registry_password)
2131 out, err, code = self._run_cephadm(
2132 host, '', 'pull', [],
2133 image=image_name,
2134 no_fsid=True,
2135 error_ok=True)
2136 if code:
2137 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2138 image_name, host, '\n'.join(out)))
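# 'cephadm pull' is expected to emit a JSON blob on stdout; illustrative shape:
#   {"image_id": "...", "ceph_version": "ceph version 15.2.x (...)",
#    "repo_digest": "..."}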
2139 try:
2140 j = json.loads('\n'.join(out))
2141 r = ContainerInspectInfo(
2142 j['image_id'],
2143 j.get('ceph_version'),
2144 j.get('repo_digest')
2145 )
2146 self.log.debug(f'image {image_name} -> {r}')
2147 return r
2148 except (ValueError, KeyError) as _:
2149 msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
2150 self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
2151 raise OrchestratorError(msg)
2152
2153 @trivial_completion
2154 def upgrade_check(self, image: str, version: str) -> str:
2155 if version:
2156 target_name = self.container_image_base + ':v' + version
2157 elif image:
2158 target_name = image
2159 else:
2160 raise OrchestratorError('must specify either image or version')
2161
2162 image_info = self._get_container_image_info(target_name)
2163 self.log.debug(f'image info {image} -> {image_info}')
2164 r: dict = {
2165 'target_name': target_name,
2166 'target_id': image_info.image_id,
2167 'target_version': image_info.ceph_version,
2168 'needs_update': dict(),
2169 'up_to_date': list(),
2170 }
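# Illustrative final report (what json.dumps(r) below produces), assuming
# one outdated daemon:
#   {"target_name": "...", "target_id": "...", "target_version": "...",
#    "needs_update": {"osd.1": {"current_name": "...", ...}},
#    "up_to_date": ["mgr.a"]}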
2171 for host, dm in self.cache.daemons.items():
2172 for name, dd in dm.items():
2173 if image_info.image_id == dd.container_image_id:
2174 r['up_to_date'].append(dd.name())
2175 else:
2176 r['needs_update'][dd.name()] = {
2177 'current_name': dd.container_image_name,
2178 'current_id': dd.container_image_id,
2179 'current_version': dd.version,
2180 }
2181 if self.use_repo_digest:
2182 r['target_digest'] = image_info.repo_digest
2183
2184 return json.dumps(r, indent=4, sort_keys=True)
2185
2186 @trivial_completion
2187 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
2188 return self.upgrade.upgrade_status()
2189
2190 @trivial_completion
2191 def upgrade_start(self, image: str, version: str) -> str:
2192 return self.upgrade.upgrade_start(image, version)
2193
2194 @trivial_completion
2195 def upgrade_pause(self) -> str:
2196 return self.upgrade.upgrade_pause()
2197
2198 @trivial_completion
2199 def upgrade_resume(self) -> str:
2200 return self.upgrade.upgrade_resume()
2201
2202 @trivial_completion
2203 def upgrade_stop(self) -> str:
2204 return self.upgrade.upgrade_stop()
2205
2206 @trivial_completion
2207 def remove_osds(self, osd_ids: List[str],
2208 replace: bool = False,
2209 force: bool = False) -> str:
2210 """
2211 Takes a list of OSDs and schedules them for removal.
2212 The function that takes care of the actual removal is
2213 process_removal_queue().
2214 """
2215
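# Illustrative entry point: 'ceph orch osd rm 0 1' ends up here with
# osd_ids=['0', '1']; matching is done against the cached osd daemons.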
2216 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2217 to_remove_daemons = list()
2218 for daemon in daemons:
2219 if daemon.daemon_id in osd_ids:
2220 to_remove_daemons.append(daemon)
2221 if not to_remove_daemons:
2222 return f"Unable to find OSDs: {osd_ids}"
2223
2224 for daemon in to_remove_daemons:
2225 try:
2226 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2227 replace=replace,
2228 force=force,
2229 hostname=daemon.hostname,
2230 fullname=daemon.name(),
2231 process_started_at=datetime_now(),
2232 remove_util=self.to_remove_osds.rm_util))
2233 except NotFoundError:
2234 return f"Unable to find OSDs: {osd_ids}"
2235
2236 # trigger the serve loop to initiate the removal
2237 self._kick_serve_loop()
2238 return "Scheduled OSD(s) for removal"
2239
2240 @trivial_completion
2241 def stop_remove_osds(self, osd_ids: List[str]) -> str:
2242 """
2243 Stops a `removal` process for a list of OSDs.
2244 This will revert their weight and remove them from the osds_to_remove queue
2245 """
2246 for osd_id in osd_ids:
2247 try:
2248 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
2249 remove_util=self.to_remove_osds.rm_util))
2250 except (NotFoundError, KeyError):
2251 return f'Unable to find OSD in the queue: {osd_id}'
2252
2253 # trigger the serve loop to halt the removal
2254 self._kick_serve_loop()
2255 return "Stopped OSD(s) removal"
2256
2257 @trivial_completion
2258 def remove_osds_status(self) -> List[OSD]:
2259 """
2260 The CLI call to retrieve an osd removal report
2261 """
2262 return self.to_remove_osds.all_osds()