[ceph.git] / ceph / src / pybind / mgr / cephadm / module.py (commit 096d0e89336de59ec78ec71fb377a6ee4ebbb0cf)
1 import json
2 import errno
3 import logging
4 import re
5 import shlex
6 from collections import defaultdict
7 from configparser import ConfigParser
8 from contextlib import contextmanager
9 from functools import wraps
10 from tempfile import TemporaryDirectory
11 from threading import Event
12
13 import string
14 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
15 Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
16
17 import datetime
18 import six
19 import os
20 import random
21 import tempfile
22 import multiprocessing.pool
23 import subprocess
24
25 from ceph.deployment import inventory
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.service_spec import \
28 NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
29 CustomContainerSpec, HostPlacementSpec
30 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
31 from cephadm.serve import CephadmServe
32 from cephadm.services.cephadmservice import CephadmDaemonSpec
33
34 from mgr_module import MgrModule, HandleCommandResult
35 import orchestrator
36 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
37 CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
38 from orchestrator._interface import GenericSpec
39
40 from . import remotes
41 from . import utils
42 from .migrations import Migrations
43 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
44 RbdMirrorService, CrashService, CephadmService
45 from .services.container import CustomContainerService
46 from .services.iscsi import IscsiService
47 from .services.nfs import NFSService
48 from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
49 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
50 NodeExporterService
51 from .schedule import HostAssignment
52 from .inventory import Inventory, SpecStore, HostCache, EventStore
53 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
54 from .template import TemplateMgr
55 from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
56
57 try:
58 import remoto
59 # NOTE(mattoliverau) Patch remoto until remoto PR
60 # (https://github.com/alfredodeza/remoto/pull/56) lands
61 from distutils.version import StrictVersion
62 if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
63 def remoto_has_connection(self: Any) -> bool:
64 return self.gateway.hasreceiver()
65
66 from remoto.backends import BaseConnection
67 BaseConnection.has_connection = remoto_has_connection
68 import remoto.process
69 import execnet.gateway_bootstrap
70 except ImportError as e:
71 remoto = None
72 remoto_import_error = str(e)
73
74 try:
75 from typing import List
76 except ImportError:
77 pass
78
79 logger = logging.getLogger(__name__)
80
81 T = TypeVar('T')
82
83 DEFAULT_SSH_CONFIG = """
84 Host *
85 User root
86 StrictHostKeyChecking no
87 UserKnownHostsFile /dev/null
88 ConnectTimeout=30
89 """
90
91 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
92
93
94 class CephadmCompletion(orchestrator.Completion[T]):
95 def evaluate(self) -> None:
96 self.finalize(None)
97
98
99 def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
100 """
101 Decorator to make CephadmOrchestrator methods return
102 a CephadmCompletion object that executes the wrapped method when evaluated.
103 """
104
105 @wraps(f)
106 def wrapper(*args: Any, **kwargs: Any) -> CephadmCompletion:
107 return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
108
109 return wrapper
110
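# Illustrative usage sketch (editorial addition, not upstream code): a method
# decorated with @trivial_completion returns a CephadmCompletion right away;
# the wrapped body only runs once the completion is evaluated, e.g. by
# process() further below:
#
#     @trivial_completion
#     def add_host_label(self, host: str, label: str) -> str:
#         ...
#
#     c = mgr.add_host_label('node1', 'mon')   # 'mgr' assumed to be the module instance
#     c.evaluate()                             # now the wrapped method actually runs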
111
112 class ContainerInspectInfo(NamedTuple):
113 image_id: str
114 ceph_version: Optional[str]
115 repo_digest: Optional[str]
116
117
118 @six.add_metaclass(CLICommandMeta)
119 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
120
121 _STORE_HOST_PREFIX = "host"
122
123 instance = None
124 NATIVE_OPTIONS = [] # type: List[Any]
125 MODULE_OPTIONS: List[dict] = [
126 {
127 'name': 'ssh_config_file',
128 'type': 'str',
129 'default': None,
130 'desc': 'customized SSH config file to connect to managed hosts',
131 },
132 {
133 'name': 'device_cache_timeout',
134 'type': 'secs',
135 'default': 30 * 60,
136 'desc': 'seconds to cache device inventory',
137 },
138 {
139 'name': 'daemon_cache_timeout',
140 'type': 'secs',
141 'default': 10 * 60,
142 'desc': 'seconds to cache service (daemon) inventory',
143 },
144 {
145 'name': 'facts_cache_timeout',
146 'type': 'secs',
147 'default': 1 * 60,
148 'desc': 'seconds to cache host facts data',
149 },
150 {
151 'name': 'host_check_interval',
152 'type': 'secs',
153 'default': 10 * 60,
154 'desc': 'how frequently to perform a host check',
155 },
156 {
157 'name': 'mode',
158 'type': 'str',
159 'enum_allowed': ['root', 'cephadm-package'],
160 'default': 'root',
161 'desc': 'mode for remote execution of cephadm',
162 },
163 {
164 'name': 'container_image_base',
165 'default': 'docker.io/ceph/ceph',
166 'desc': 'Container image name, without the tag',
167 'runtime': True,
168 },
169 {
170 'name': 'container_image_prometheus',
171 'default': 'docker.io/prom/prometheus:v2.18.1',
172 'desc': 'Prometheus container image',
173 },
174 {
175 'name': 'container_image_grafana',
176 'default': 'docker.io/ceph/ceph-grafana:6.6.2',
177 'desc': 'Grafana container image',
178 },
179 {
180 'name': 'container_image_alertmanager',
181 'default': 'docker.io/prom/alertmanager:v0.20.0',
182 'desc': 'Alertmanager container image',
183 },
184 {
185 'name': 'container_image_node_exporter',
186 'default': 'docker.io/prom/node-exporter:v0.18.1',
187 'desc': 'Node exporter container image',
188 },
189 {
190 'name': 'warn_on_stray_hosts',
191 'type': 'bool',
192 'default': True,
193 'desc': 'raise a health warning if daemons are detected on a host '
194 'that is not managed by cephadm',
195 },
196 {
197 'name': 'warn_on_stray_daemons',
198 'type': 'bool',
199 'default': True,
200 'desc': 'raise a health warning if daemons are detected '
201 'that are not managed by cephadm',
202 },
203 {
204 'name': 'warn_on_failed_host_check',
205 'type': 'bool',
206 'default': True,
207 'desc': 'raise a health warning if the host check fails',
208 },
209 {
210 'name': 'log_to_cluster',
211 'type': 'bool',
212 'default': True,
213 'desc': 'log to the "cephadm" cluster log channel',
214 },
215 {
216 'name': 'allow_ptrace',
217 'type': 'bool',
218 'default': False,
219 'desc': 'allow SYS_PTRACE capability on ceph containers',
220 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
221 'process with gdb or strace. Enabling this option '
222 'can allow debugging daemons that encounter problems '
223 'at runtime.',
224 },
225 {
226 'name': 'container_init',
227 'type': 'bool',
228 'default': False,
229 'desc': 'Run podman/docker with `--init`',
230 },
231 {
232 'name': 'prometheus_alerts_path',
233 'type': 'str',
234 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
235 'desc': 'location of alerts to include in prometheus deployments',
236 },
237 {
238 'name': 'migration_current',
239 'type': 'int',
240 'default': None,
241 'desc': 'internal - do not modify',
242 # used to track spec and other data migrations.
243 },
244 {
245 'name': 'config_dashboard',
246 'type': 'bool',
247 'default': True,
248 'desc': 'manage configs like API endpoints in Dashboard.'
249 },
250 {
251 'name': 'manage_etc_ceph_ceph_conf',
252 'type': 'bool',
253 'default': False,
254 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
255 },
256 {
257 'name': 'registry_url',
258 'type': 'str',
259 'default': None,
260 'desc': 'Custom registry url'
261 },
262 {
263 'name': 'registry_username',
264 'type': 'str',
265 'default': None,
266 'desc': 'Custom registry username'
267 },
268 {
269 'name': 'registry_password',
270 'type': 'str',
271 'default': None,
272 'desc': 'Custom registry password'
273 },
274 {
275 'name': 'use_repo_digest',
276 'type': 'bool',
277 'default': False,
278 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
279 }
280 ]
281
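# Note (editorial addition): each MODULE_OPTIONS entry above is a regular mgr
# module option, so it can be inspected and changed at runtime with the usual
# config commands, for example (values are illustrative):
#
#     ceph config get mgr mgr/cephadm/container_image_base
#     ceph config set mgr mgr/cephadm/daemon_cache_timeout 600
#
# config_notify() below re-reads these attributes whenever one of them changes.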
282 def __init__(self, *args: Any, **kwargs: Any):
283 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
284 self._cluster_fsid = self.get('mon_map')['fsid']
285 self.last_monmap: Optional[datetime.datetime] = None
286
287 # for serve()
288 self.run = True
289 self.event = Event()
290
291 if self.get_store('pause'):
292 self.paused = True
293 else:
294 self.paused = False
295
296 # for mypy which does not run the code
297 if TYPE_CHECKING:
298 self.ssh_config_file = None # type: Optional[str]
299 self.device_cache_timeout = 0
300 self.daemon_cache_timeout = 0
301 self.facts_cache_timeout = 0
302 self.host_check_interval = 0
303 self.mode = ''
304 self.container_image_base = ''
305 self.container_image_prometheus = ''
306 self.container_image_grafana = ''
307 self.container_image_alertmanager = ''
308 self.container_image_node_exporter = ''
309 self.warn_on_stray_hosts = True
310 self.warn_on_stray_daemons = True
311 self.warn_on_failed_host_check = True
312 self.allow_ptrace = False
313 self.container_init = False
314 self.prometheus_alerts_path = ''
315 self.migration_current: Optional[int] = None
316 self.config_dashboard = True
317 self.manage_etc_ceph_ceph_conf = False
318 self.registry_url: Optional[str] = None
319 self.registry_username: Optional[str] = None
320 self.registry_password: Optional[str] = None
321 self.use_repo_digest = False
322
323 self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
324 remoto.backends.LegacyModuleExecute]] = {}
325
326 self.notify('mon_map', None)
327 self.config_notify()
328
329 path = self.get_ceph_option('cephadm_path')
330 try:
331 with open(path, 'r') as f:
332 self._cephadm = f.read()
333 except (IOError, TypeError) as e:
334 raise RuntimeError("unable to read cephadm at '%s': %s" % (
335 path, str(e)))
336
337 self._worker_pool = multiprocessing.pool.ThreadPool(10)
338
339 self._reconfig_ssh()
340
341 CephadmOrchestrator.instance = self
342
343 self.upgrade = CephadmUpgrade(self)
344
345 self.health_checks: Dict[str, dict] = {}
346
347 self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
348
349 self.inventory = Inventory(self)
350
351 self.cache = HostCache(self)
352 self.cache.load()
353
354 self.to_remove_osds = OSDRemovalQueue(self)
355 self.to_remove_osds.load_from_store()
356
357 self.spec_store = SpecStore(self)
358 self.spec_store.load()
359
360 # ensure the host lists are in sync
361 for h in self.inventory.keys():
362 if h not in self.cache.daemons:
363 self.cache.prime_empty_host(h)
364 for h in self.cache.get_hosts():
365 if h not in self.inventory:
366 self.cache.rm_host(h)
367
368 # in-memory only.
369 self.events = EventStore(self)
370 self.offline_hosts: Set[str] = set()
371
372 self.migration = Migrations(self)
373
374 # services:
375 self.osd_service = OSDService(self)
376 self.nfs_service = NFSService(self)
377 self.mon_service = MonService(self)
378 self.mgr_service = MgrService(self)
379 self.mds_service = MdsService(self)
380 self.rgw_service = RgwService(self)
381 self.rbd_mirror_service = RbdMirrorService(self)
382 self.grafana_service = GrafanaService(self)
383 self.alertmanager_service = AlertmanagerService(self)
384 self.prometheus_service = PrometheusService(self)
385 self.node_exporter_service = NodeExporterService(self)
386 self.crash_service = CrashService(self)
387 self.iscsi_service = IscsiService(self)
388 self.container_service = CustomContainerService(self)
389 self.cephadm_services = {
390 'mon': self.mon_service,
391 'mgr': self.mgr_service,
392 'osd': self.osd_service,
393 'mds': self.mds_service,
394 'rgw': self.rgw_service,
395 'rbd-mirror': self.rbd_mirror_service,
396 'nfs': self.nfs_service,
397 'grafana': self.grafana_service,
398 'alertmanager': self.alertmanager_service,
399 'prometheus': self.prometheus_service,
400 'node-exporter': self.node_exporter_service,
401 'crash': self.crash_service,
402 'iscsi': self.iscsi_service,
403 'container': self.container_service,
404 }
405
406 self.template = TemplateMgr(self)
407
408 self.requires_post_actions: Set[str] = set()
409
410 def shutdown(self) -> None:
411 self.log.debug('shutdown')
412 self._worker_pool.close()
413 self._worker_pool.join()
414 self.run = False
415 self.event.set()
416
417 def _get_cephadm_service(self, service_type: str) -> CephadmService:
418 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
419 return self.cephadm_services[service_type]
420
421 def _kick_serve_loop(self) -> None:
422 self.log.debug('_kick_serve_loop')
423 self.event.set()
424
425 # function responsible for logging a single host into a custom registry
426 def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
427 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
428 # want to pass info over stdin rather than through normal list of args
429 args_str = json.dumps({
430 'url': url,
431 'username': username,
432 'password': password,
433 })
434 out, err, code = self._run_cephadm(
435 host, 'mon', 'registry-login',
436 ['--registry-json', '-'], stdin=args_str, error_ok=True)
437 if code:
438 return f"Host {host} failed to login to {url} as {username} with given password"
439 return None
440
441 def serve(self) -> None:
442 """
443 The main loop of cephadm.
444
445 A command handler will typically change the declarative state
446 of cephadm. This loop will then attempt to apply this new state.
447 """
448 serve = CephadmServe(self)
449 serve.serve()
450
451 def set_container_image(self, entity: str, image: str) -> None:
452 self.check_mon_command({
453 'prefix': 'config set',
454 'name': 'container_image',
455 'value': image,
456 'who': entity,
457 })
458
459 def config_notify(self) -> None:
460 """
461 This method is called whenever one of our config options is changed.
462
463 TODO: this method should be moved into mgr_module.py
464 """
465 for opt in self.MODULE_OPTIONS:
466 setattr(self,
467 opt['name'], # type: ignore
468 self.get_module_option(opt['name'])) # type: ignore
469 self.log.debug(' mgr option %s = %s',
470 opt['name'], getattr(self, opt['name'])) # type: ignore
471 for opt in self.NATIVE_OPTIONS:
472 setattr(self,
473 opt, # type: ignore
474 self.get_ceph_option(opt))
475 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
476
477 self.event.set()
478
479 def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
480 if notify_type == "mon_map":
481 # get monmap mtime so we can refresh configs when mons change
482 monmap = self.get('mon_map')
483 self.last_monmap = str_to_datetime(monmap['modified'])
484 if self.last_monmap and self.last_monmap > datetime_now():
485 self.last_monmap = None # just in case clocks are skewed
486 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
487 # getattr, due to notify() being called before config_notify()
488 self._kick_serve_loop()
489 if notify_type == "pg_summary":
490 self._trigger_osd_removal()
491
492 def _trigger_osd_removal(self) -> None:
493 data = self.get("osd_stats")
494 for osd in data.get('osd_stats', []):
495 if osd.get('num_pgs') == 0:
496 # if _ANY_ osd that is currently in the queue appears to be empty,
497 # start the removal process
498 if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
499 self.log.debug("Found empty osd. Starting removal process")
500 # the now-empty osd is part of the removal queue, so kick the
501 # serve loop to start the removal process
502 self._kick_serve_loop()
503
504 def pause(self) -> None:
505 if not self.paused:
506 self.log.info('Paused')
507 self.set_store('pause', 'true')
508 self.paused = True
509 # wake loop so we update the health status
510 self._kick_serve_loop()
511
512 def resume(self) -> None:
513 if self.paused:
514 self.log.info('Resumed')
515 self.paused = False
516 self.set_store('pause', None)
517 # unconditionally wake loop so that 'orch resume' can be used to kick
518 # cephadm
519 self._kick_serve_loop()
520
521 def get_unique_name(self, daemon_type, host, existing, prefix=None,
522 forcename=None):
523 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
524 """
525 Generate a unique random service name
526 """
527 suffix = daemon_type not in [
528 'mon', 'crash', 'nfs',
529 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
530 'container'
531 ]
532 if forcename:
533 if len([d for d in existing if d.daemon_id == forcename]):
534 raise orchestrator.OrchestratorValidationError(
535 f'name {daemon_type}.{forcename} already in use')
536 return forcename
537
538 if '.' in host:
539 host = host.split('.')[0]
540 while True:
541 if prefix:
542 name = prefix + '.'
543 else:
544 name = ''
545 name += host
546 if suffix:
547 name += '.' + ''.join(random.choice(string.ascii_lowercase)
548 for _ in range(6))
549 if len([d for d in existing if d.daemon_id == name]):
550 if not suffix:
551 raise orchestrator.OrchestratorValidationError(
552 f'name {daemon_type}.{name} already in use')
553 self.log.debug('name %s exists, trying again', name)
554 continue
555 return name
556
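# For illustration only (editorial addition; hypothetical hosts/daemons): with
# the rules above, get_unique_name('mon', 'node1.example.com', []) returns just
# 'node1' (no random suffix for mon/crash/nfs/monitoring daemons), while
# get_unique_name('mgr', 'node1.example.com', []) returns something like
# 'node1.xkzjvq', and a prefix (e.g. an mds service id) is prepended as
# '<prefix>.node1.xkzjvq'.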
557 def _reconfig_ssh(self) -> None:
558 temp_files = [] # type: list
559 ssh_options = [] # type: List[str]
560
561 # ssh_config
562 ssh_config_fname = self.ssh_config_file
563 ssh_config = self.get_store("ssh_config")
564 if ssh_config is not None or ssh_config_fname is None:
565 if not ssh_config:
566 ssh_config = DEFAULT_SSH_CONFIG
567 f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
568 os.fchmod(f.fileno(), 0o600)
569 f.write(ssh_config.encode('utf-8'))
570 f.flush() # make visible to other processes
571 temp_files += [f]
572 ssh_config_fname = f.name
573 if ssh_config_fname:
574 self.validate_ssh_config_fname(ssh_config_fname)
575 ssh_options += ['-F', ssh_config_fname]
576 self.ssh_config = ssh_config
577
578 # identity
579 ssh_key = self.get_store("ssh_identity_key")
580 ssh_pub = self.get_store("ssh_identity_pub")
581 self.ssh_pub = ssh_pub
582 self.ssh_key = ssh_key
583 if ssh_key and ssh_pub:
584 tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
585 tkey.write(ssh_key.encode('utf-8'))
586 os.fchmod(tkey.fileno(), 0o600)
587 tkey.flush() # make visible to other processes
588 tpub = open(tkey.name + '.pub', 'w')
589 os.fchmod(tpub.fileno(), 0o600)
590 tpub.write(ssh_pub)
591 tpub.flush() # make visible to other processes
592 temp_files += [tkey, tpub]
593 ssh_options += ['-i', tkey.name]
594
595 self._temp_files = temp_files
596 if ssh_options:
597 self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
598 else:
599 self._ssh_options = None
600
601 if self.mode == 'root':
602 self.ssh_user = self.get_store('ssh_user', default='root')
603 elif self.mode == 'cephadm-package':
604 self.ssh_user = 'cephadm'
605
606 self._reset_cons()
607
608 def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
609 if ssh_config is None or len(ssh_config.strip()) == 0:
610 raise OrchestratorValidationError('ssh_config cannot be empty')
611 # a StrictHostKeyChecking option must be present, and it must not be 'ask'
612 l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
613 if not l:
614 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
615 for s in l:
616 if 'ask' in s.lower():
617 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
618
619 def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
620 if not os.path.isfile(ssh_config_fname):
621 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
622 ssh_config_fname))
623
624 def _reset_con(self, host: str) -> None:
625 conn, r = self._cons.get(host, (None, None))
626 if conn:
627 self.log.debug('_reset_con close %s' % host)
628 conn.exit()
629 del self._cons[host]
630
631 def _reset_cons(self) -> None:
632 for host, conn_and_r in self._cons.items():
633 self.log.debug('_reset_cons close %s' % host)
634 conn, r = conn_and_r
635 conn.exit()
636 self._cons = {}
637
638 def offline_hosts_remove(self, host: str) -> None:
639 if host in self.offline_hosts:
640 self.offline_hosts.remove(host)
641
642 @staticmethod
643 def can_run() -> Tuple[bool, str]:
644 if remoto is not None:
645 return True, ""
646 else:
647 return False, "loading remoto library:{}".format(
648 remoto_import_error)
649
650 def available(self) -> Tuple[bool, str]:
651 """
652 The cephadm orchestrator is always available.
653 """
654 ok, err = self.can_run()
655 if not ok:
656 return ok, err
657 if not self.ssh_key or not self.ssh_pub:
658 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
659 return True, ''
660
661 def process(self, completions: List[CephadmCompletion]) -> None:
662 """
663 Does nothing, as completions are processed in another thread.
664 """
665 if completions:
666 self.log.debug("process: completions={0}".format(
667 orchestrator.pretty_print(completions)))
668
669 for p in completions:
670 p.evaluate()
671
672 @orchestrator._cli_write_command(
673 prefix='cephadm set-ssh-config',
674 desc='Set the ssh_config file (use -i <ssh_config>)')
675 def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
676 """
677 Set an ssh_config file provided from stdin
678 """
679 if inbuf == self.ssh_config:
680 return 0, "value unchanged", ""
681 self.validate_ssh_config_content(inbuf)
682 self.set_store("ssh_config", inbuf)
683 self.log.info('Set ssh_config')
684 self._reconfig_ssh()
685 return 0, "", ""
686
687 @orchestrator._cli_write_command(
688 prefix='cephadm clear-ssh-config',
689 desc='Clear the ssh_config file')
690 def _clear_ssh_config(self) -> Tuple[int, str, str]:
691 """
692 Clear the stored ssh_config
693 """
694 self.set_store("ssh_config", None)
695 self.ssh_config_tmp = None
696 self.log.info('Cleared ssh_config')
697 self._reconfig_ssh()
698 return 0, "", ""
699
700 @orchestrator._cli_read_command(
701 prefix='cephadm get-ssh-config',
702 desc='Returns the ssh config as used by cephadm'
703 )
704 def _get_ssh_config(self) -> HandleCommandResult:
705 if self.ssh_config_file:
706 self.validate_ssh_config_fname(self.ssh_config_file)
707 with open(self.ssh_config_file) as f:
708 return HandleCommandResult(stdout=f.read())
709 ssh_config = self.get_store("ssh_config")
710 if ssh_config:
711 return HandleCommandResult(stdout=ssh_config)
712 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
713
714 @orchestrator._cli_write_command(
715 'cephadm generate-key',
716 desc='Generate a cluster SSH key (if not present)')
717 def _generate_key(self) -> Tuple[int, str, str]:
718 if not self.ssh_pub or not self.ssh_key:
719 self.log.info('Generating ssh key...')
720 tmp_dir = TemporaryDirectory()
721 path = tmp_dir.name + '/key'
722 try:
723 subprocess.check_call([
724 '/usr/bin/ssh-keygen',
725 '-C', 'ceph-%s' % self._cluster_fsid,
726 '-N', '',
727 '-f', path
728 ])
729 with open(path, 'r') as f:
730 secret = f.read()
731 with open(path + '.pub', 'r') as f:
732 pub = f.read()
733 finally:
734 os.unlink(path)
735 os.unlink(path + '.pub')
736 tmp_dir.cleanup()
737 self.set_store('ssh_identity_key', secret)
738 self.set_store('ssh_identity_pub', pub)
739 self._reconfig_ssh()
740 return 0, '', ''
741
742 @orchestrator._cli_write_command(
743 'cephadm set-priv-key',
744 desc='Set cluster SSH private key (use -i <private_key>)')
745 def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
746 if inbuf is None or len(inbuf) == 0:
747 return -errno.EINVAL, "", "empty private ssh key provided"
748 if inbuf == self.ssh_key:
749 return 0, "value unchanged", ""
750 self.set_store("ssh_identity_key", inbuf)
751 self.log.info('Set ssh private key')
752 self._reconfig_ssh()
753 return 0, "", ""
754
755 @orchestrator._cli_write_command(
756 'cephadm set-pub-key',
757 desc='Set cluster SSH public key (use -i <public_key>)')
758 def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
759 if inbuf is None or len(inbuf) == 0:
760 return -errno.EINVAL, "", "empty public ssh key provided"
761 if inbuf == self.ssh_pub:
762 return 0, "value unchanged", ""
763 self.set_store("ssh_identity_pub", inbuf)
764 self.log.info('Set ssh public key')
765 self._reconfig_ssh()
766 return 0, "", ""
767
768 @orchestrator._cli_write_command(
769 'cephadm clear-key',
770 desc='Clear cluster SSH key')
771 def _clear_key(self) -> Tuple[int, str, str]:
772 self.set_store('ssh_identity_key', None)
773 self.set_store('ssh_identity_pub', None)
774 self._reconfig_ssh()
775 self.log.info('Cleared cluster SSH key')
776 return 0, '', ''
777
778 @orchestrator._cli_read_command(
779 'cephadm get-pub-key',
780 desc='Show SSH public key for connecting to cluster hosts')
781 def _get_pub_key(self) -> Tuple[int, str, str]:
782 if self.ssh_pub:
783 return 0, self.ssh_pub, ''
784 else:
785 return -errno.ENOENT, '', 'No cluster SSH key defined'
786
787 @orchestrator._cli_read_command(
788 'cephadm get-user',
789 desc='Show user for SSHing to cluster hosts')
790 def _get_user(self) -> Tuple[int, str, str]:
791 return 0, self.ssh_user, ''
792
793 @orchestrator._cli_read_command(
794 'cephadm set-user',
795 'name=user,type=CephString',
796 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
797 def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
798 current_user = self.ssh_user
799 if user == current_user:
800 return 0, "value unchanged", ""
801
802 self.set_store('ssh_user', user)
803 self._reconfig_ssh()
804
805 host = self.cache.get_hosts()[0]
806 r = CephadmServe(self)._check_host(host)
807 if r is not None:
808 # connection failed; reset the user
809 self.set_store('ssh_user', current_user)
810 self._reconfig_ssh()
811 return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
812
813 msg = 'ssh user set to %s' % user
814 if user != 'root':
815 msg += '; sudo will be used'
816 self.log.info(msg)
817 return 0, msg, ''
818
819 @orchestrator._cli_read_command(
820 'cephadm registry-login',
821 "name=url,type=CephString,req=false "
822 "name=username,type=CephString,req=false "
823 "name=password,type=CephString,req=false",
824 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
825 def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
826 # if the login info was not fully given on the command line, get it through file input
827 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
828 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
829 "or -i <login credentials json file>")
830 elif not (url and username and password):
831 assert isinstance(inbuf, str)
832 login_info = json.loads(inbuf)
833 if "url" in login_info and "username" in login_info and "password" in login_info:
834 url = login_info["url"]
835 username = login_info["username"]
836 password = login_info["password"]
837 else:
838 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
839 "Please setup json file as\n"
840 "{\n"
841 " \"url\": \"REGISTRY_URL\",\n"
842 " \"username\": \"REGISTRY_USERNAME\",\n"
843 " \"password\": \"REGISTRY_PASSWORD\"\n"
844 "}\n")
845 # verify the login info works by attempting a login on an arbitrary host
846 host = None
847 for host_name in self.inventory.keys():
848 host = host_name
849 break
850 if not host:
851 raise OrchestratorError('no hosts defined')
852 r = self._registry_login(host, url, username, password)
853 if r is not None:
854 return 1, '', r
855 # if logins succeeded, store info
856 self.log.debug("Host logins successful. Storing login info.")
857 self.set_module_option('registry_url', url)
858 self.set_module_option('registry_username', username)
859 self.set_module_option('registry_password', password)
860 # distribute new login info to all hosts
861 self.cache.distribute_new_registry_login_info()
862 return 0, "registry login scheduled", ''
863
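# Illustrative usage (editorial addition; file name is hypothetical): the login
# info can be given either as arguments or as a JSON file, matching the error
# messages above:
#
#     ceph cephadm registry-login <url> <username> <password>
#     ceph cephadm registry-login -i registry.json
#
# where registry.json holds {"url": ..., "username": ..., "password": ...}.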
864 @orchestrator._cli_read_command(
865 'cephadm check-host',
866 'name=host,type=CephString '
867 'name=addr,type=CephString,req=false',
868 'Check whether we can access and manage a remote host')
869 def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
870 try:
871 out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
872 ['--expect-hostname', host],
873 addr=addr,
874 error_ok=True, no_fsid=True)
875 if code:
876 return 1, '', ('check-host failed:\n' + '\n'.join(err))
877 except OrchestratorError as e:
878 self.log.exception(f"check-host failed for '{host}'")
879 return 1, '', ('check-host failed:\n' +
880 f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
881 # if we have an outstanding health alert for this host, give the
882 # serve thread a kick
883 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
884 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
885 if item.startswith('host %s ' % host):
886 self.event.set()
887 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
888
889 @orchestrator._cli_read_command(
890 'cephadm prepare-host',
891 'name=host,type=CephString '
892 'name=addr,type=CephString,req=false',
893 'Prepare a remote host for use with cephadm')
894 def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
895 out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
896 ['--expect-hostname', host],
897 addr=addr,
898 error_ok=True, no_fsid=True)
899 if code:
900 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
901 # if we have an outstanding health alert for this host, give the
902 # serve thread a kick
903 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
904 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
905 if item.startswith('host %s ' % host):
906 self.event.set()
907 return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
908
909 @orchestrator._cli_write_command(
910 prefix='cephadm set-extra-ceph-conf',
911 desc="Text that is appended to all daemon's ceph.conf.\n"
912 "Mainly a workaround, till `config generate-minimal-conf` generates\n"
913 "a complete ceph.conf.\n\n"
914 "Warning: this is a dangerous operation.")
915 def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
916 if inbuf:
917 # sanity check.
918 cp = ConfigParser()
919 cp.read_string(inbuf, source='<infile>')
920
921 self.set_store("extra_ceph_conf", json.dumps({
922 'conf': inbuf,
923 'last_modified': datetime_to_str(datetime_now())
924 }))
925 self.log.info('Set extra_ceph_conf')
926 self._kick_serve_loop()
927 return HandleCommandResult()
928
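# Illustrative usage (editorial addition; file name and option value are hypothetical):
#
#     cat > extra.conf <<EOF
#     [global]
#     debug_ms = 1
#     EOF
#     ceph cephadm set-extra-ceph-conf -i extra.conf
#
# The text is sanity-checked by ConfigParser above and then appended to the
# minimal ceph.conf built by get_minimal_ceph_conf() further below.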
929 @orchestrator._cli_read_command(
930 'cephadm get-extra-ceph-conf',
931 desc='Get extra ceph conf that is appended')
932 def _get_extra_ceph_conf(self) -> HandleCommandResult:
933 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
934
935 class ExtraCephConf(NamedTuple):
936 conf: str
937 last_modified: Optional[datetime.datetime]
938
939 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
940 data = self.get_store('extra_ceph_conf')
941 if not data:
942 return CephadmOrchestrator.ExtraCephConf('', None)
943 try:
944 j = json.loads(data)
945 except ValueError:
946 msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
947 self.log.exception('%s: \'%s\'', msg, data)
948 return CephadmOrchestrator.ExtraCephConf('', None)
949 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
950
951 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
952 conf = self.extra_ceph_conf()
953 if not conf.last_modified:
954 return False
955 return conf.last_modified > dt
956
957 def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
958 'remoto.backends.LegacyModuleExecute']:
959 """
960 Setup a connection for running commands on remote host.
961 """
962 conn, r = self._cons.get(host, (None, None))
963 if conn:
964 if conn.has_connection():
965 self.log.debug('Have connection to %s' % host)
966 return conn, r
967 else:
968 self._reset_con(host)
969 n = self.ssh_user + '@' + host
970 self.log.debug("Opening connection to {} with ssh options '{}'".format(
971 n, self._ssh_options))
972 child_logger = self.log.getChild(n)
973 child_logger.setLevel('WARNING')
974 conn = remoto.Connection(
975 n,
976 logger=child_logger,
977 ssh_options=self._ssh_options,
978 sudo=True if self.ssh_user != 'root' else False)
979
980 r = conn.import_module(remotes)
981 self._cons[host] = conn, r
982
983 return conn, r
984
985 def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str:
986 """
987 Remote validator that accepts a connection object and checks that a given
988 executable is available, returning its full path if so.
989
990 Otherwise an exception with thorough details will be raised, informing the
991 user that the executable was not found.
992 """
993 executable_path = conn.remote_module.which(executable)
994 if not executable_path:
995 raise RuntimeError("Executable '{}' not found on host '{}'".format(
996 executable, conn.hostname))
997 self.log.debug("Found executable '{}' at path '{}'".format(executable,
998 executable_path))
999 return executable_path
1000
1001 @contextmanager
1002 def _remote_connection(self,
1003 host: str,
1004 addr: Optional[str] = None,
1005 ) -> Iterator[Tuple["BaseConnection", Any]]:
1006 if not addr and host in self.inventory:
1007 addr = self.inventory.get_addr(host)
1008
1009 self.offline_hosts_remove(host)
1010
1011 try:
1012 try:
1013 if not addr:
1014 raise OrchestratorError("host address is empty")
1015 conn, connr = self._get_connection(addr)
1016 except OSError as e:
1017 self._reset_con(host)
1018 msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1019 raise execnet.gateway_bootstrap.HostNotFound(msg)
1020
1021 yield (conn, connr)
1022
1023 except execnet.gateway_bootstrap.HostNotFound as e:
1024 # this is a misleading exception as it seems to be thrown for
1025 # any sort of connection failure, even those having nothing to
1026 # do with "host not found" (e.g., ssh key permission denied).
1027 self.offline_hosts.add(host)
1028 self._reset_con(host)
1029
1030 user = self.ssh_user if self.mode == 'root' else 'cephadm'
1031 if str(e).startswith("Can't communicate"):
1032 msg = str(e)
1033 else:
1034 msg = f'''Failed to connect to {host} ({addr}).
1035 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1036
1037 To add the cephadm SSH key to the host:
1038 > ceph cephadm get-pub-key > ~/ceph.pub
1039 > ssh-copy-id -f -i ~/ceph.pub {user}@{host}
1040
1041 To check that the host is reachable:
1042 > ceph cephadm get-ssh-config > ssh_config
1043 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1044 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
1045 raise OrchestratorError(msg) from e
1046 except Exception as ex:
1047 self.log.exception(ex)
1048 raise
1049
1050 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1051 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1052 if daemon_type in CEPH_TYPES or \
1053 daemon_type == 'nfs' or \
1054 daemon_type == 'iscsi':
1055 # get container image
1056 ret, image, err = self.check_mon_command({
1057 'prefix': 'config get',
1058 'who': utils.name_to_config_section(daemon_name),
1059 'key': 'container_image',
1060 })
1061 image = image.strip() # type: ignore
1062 elif daemon_type == 'prometheus':
1063 image = self.container_image_prometheus
1064 elif daemon_type == 'grafana':
1065 image = self.container_image_grafana
1066 elif daemon_type == 'alertmanager':
1067 image = self.container_image_alertmanager
1068 elif daemon_type == 'node-exporter':
1069 image = self.container_image_node_exporter
1070 elif daemon_type == CustomContainerService.TYPE:
1071 # The image can't be resolved, the necessary information
1072 # is only available when a container is deployed (given
1073 # via spec).
1074 image = None
1075 else:
1076 assert False, daemon_type
1077
1078 self.log.debug('%s container image %s' % (daemon_name, image))
1079
1080 return image
1081
1082 def _run_cephadm(self,
1083 host: str,
1084 entity: Union[CephadmNoImage, str],
1085 command: str,
1086 args: List[str],
1087 addr: Optional[str] = "",
1088 stdin: Optional[str] = "",
1089 no_fsid: Optional[bool] = False,
1090 error_ok: Optional[bool] = False,
1091 image: Optional[str] = "",
1092 env_vars: Optional[List[str]] = None,
1093 ) -> Tuple[List[str], List[str], int]:
1094 """
1095 Run cephadm on the remote host with the given command + args
1096
1097 :env_vars: in format -> [KEY=VALUE, ..]
1098 """
1099 with self._remote_connection(host, addr) as tpl:
1100 conn, connr = tpl
1101 assert image or entity
1102 if not image and entity is not cephadmNoImage:
1103 image = self._get_container_image(entity)
1104
1105 final_args = []
1106
1107 if env_vars:
1108 for env_var_pair in env_vars:
1109 final_args.extend(['--env', env_var_pair])
1110
1111 if image:
1112 final_args.extend(['--image', image])
1113 final_args.append(command)
1114
1115 if not no_fsid:
1116 final_args += ['--fsid', self._cluster_fsid]
1117
1118 if self.container_init:
1119 final_args += ['--container-init']
1120
1121 final_args += args
1122
1123 self.log.debug('args: %s' % (' '.join(final_args)))
1124 if self.mode == 'root':
1125 if stdin:
1126 self.log.debug('stdin: %s' % stdin)
1127 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
1128 if stdin:
1129 script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
1130 script += self._cephadm
1131 python = connr.choose_python()
1132 if not python:
1133 raise RuntimeError(
1134 'unable to find python on %s (tried %s in %s)' % (
1135 host, remotes.PYTHONS, remotes.PATH))
1136 try:
1137 out, err, code = remoto.process.check(
1138 conn,
1139 [python, '-u'],
1140 stdin=script.encode('utf-8'))
1141 except RuntimeError as e:
1142 self._reset_con(host)
1143 if error_ok:
1144 return [], [str(e)], 1
1145 raise
1146 elif self.mode == 'cephadm-package':
1147 try:
1148 out, err, code = remoto.process.check(
1149 conn,
1150 ['sudo', '/usr/bin/cephadm'] + final_args,
1151 stdin=stdin)
1152 except RuntimeError as e:
1153 self._reset_con(host)
1154 if error_ok:
1155 return [], [str(e)], 1
1156 raise
1157 else:
1158 assert False, 'unsupported mode'
1159
1160 self.log.debug('code: %d' % code)
1161 if out:
1162 self.log.debug('out: %s' % '\n'.join(out))
1163 if err:
1164 self.log.debug('err: %s' % '\n'.join(err))
1165 if code and not error_ok:
1166 raise OrchestratorError(
1167 'cephadm exited with an error code: %d, stderr:%s' % (
1168 code, '\n'.join(err)))
1169 return out, err, code
1170
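# Illustrative call (editorial addition, mirroring check_host() above):
#
#     out, err, code = self._run_cephadm(
#         host, cephadmNoImage, 'check-host',
#         ['--expect-hostname', host],
#         no_fsid=True, error_ok=True)
#
# In 'root' mode the bundled cephadm script is piped over SSH to the remote
# python interpreter; in 'cephadm-package' mode 'sudo /usr/bin/cephadm' is
# executed on the remote host instead.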
1171 def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
1172 """
1173 Returns all hosts that went through _refresh_host_daemons().
1174
1175 This mitigates a potential race, where a new host was added *after*
1176 ``_refresh_host_daemons()`` was called, but *before*
1177 ``_apply_all_specs()`` was called. Thus we would end up with hosts
1178 where daemons might be running, but we have not yet detected them.
1179 """
1180 return [
1181 h for h in self.inventory.all_specs()
1182 if self.cache.host_had_daemon_refresh(h.hostname)
1183 ]
1184
1185 def _add_host(self, spec):
1186 # type: (HostSpec) -> str
1187 """
1188 Add a host to be managed by the orchestrator.
1189
1190 :param host: host name
1191 """
1192 assert_valid_host(spec.hostname)
1193 out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
1194 ['--expect-hostname', spec.hostname],
1195 addr=spec.addr,
1196 error_ok=True, no_fsid=True)
1197 if code:
1198 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1199 spec.hostname, spec.addr, err))
1200
1201 self.inventory.add_host(spec)
1202 self.cache.prime_empty_host(spec.hostname)
1203 self.offline_hosts_remove(spec.hostname)
1204 self.event.set() # refresh stray health check
1205 self.log.info('Added host %s' % spec.hostname)
1206 return "Added host '{}'".format(spec.hostname)
1207
1208 @trivial_completion
1209 def add_host(self, spec: HostSpec) -> str:
1210 return self._add_host(spec)
1211
1212 @trivial_completion
1213 def remove_host(self, host):
1214 # type: (str) -> str
1215 """
1216 Remove a host from orchestrator management.
1217
1218 :param host: host name
1219 """
1220 self.inventory.rm_host(host)
1221 self.cache.rm_host(host)
1222 self._reset_con(host)
1223 self.event.set() # refresh stray health check
1224 self.log.info('Removed host %s' % host)
1225 return "Removed host '{}'".format(host)
1226
1227 @trivial_completion
1228 def update_host_addr(self, host: str, addr: str) -> str:
1229 self.inventory.set_addr(host, addr)
1230 self._reset_con(host)
1231 self.event.set() # refresh stray health check
1232 self.log.info('Set host %s addr to %s' % (host, addr))
1233 return "Updated host '{}' addr to '{}'".format(host, addr)
1234
1235 @trivial_completion
1236 def get_hosts(self):
1237 # type: () -> List[orchestrator.HostSpec]
1238 """
1239 Return a list of hosts managed by the orchestrator.
1240
1241 Notes:
1242 - skip async: manager reads from cache.
1243 """
1244 return list(self.inventory.all_specs())
1245
1246 @trivial_completion
1247 def add_host_label(self, host: str, label: str) -> str:
1248 self.inventory.add_label(host, label)
1249 self.log.info('Added label %s to host %s' % (label, host))
1250 return 'Added label %s to host %s' % (label, host)
1251
1252 @trivial_completion
1253 def remove_host_label(self, host: str, label: str) -> str:
1254 self.inventory.rm_label(host, label)
1255 self.log.info('Removed label %s from host %s' % (label, host))
1256 return 'Removed label %s from host %s' % (label, host)
1257
1258 @trivial_completion
1259 def host_ok_to_stop(self, hostname: str) -> str:
1260 if hostname not in self.cache.get_hosts():
1261 raise OrchestratorError(f'Cannot find host "{hostname}"')
1262
1263 daemons = self.cache.get_daemons()
1264 daemon_map = defaultdict(lambda: [])
1265 for dd in daemons:
1266 if dd.hostname == hostname:
1267 daemon_map[dd.daemon_type].append(dd.daemon_id)
1268
1269 for daemon_type, daemon_ids in daemon_map.items():
1270 r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
1271 if r.retval:
1272 self.log.error(f'It is NOT safe to stop host {hostname}')
1273 raise orchestrator.OrchestratorError(
1274 r.stderr,
1275 errno=r.retval)
1276
1277 msg = f'It is presumed safe to stop host {hostname}'
1278 self.log.info(msg)
1279 return msg
1280
1281 def get_minimal_ceph_conf(self) -> str:
1282 _, config, _ = self.check_mon_command({
1283 "prefix": "config generate-minimal-conf",
1284 })
1285 extra = self.extra_ceph_conf().conf
1286 if extra:
1287 config += '\n\n' + extra.strip() + '\n'
1288 return config
1289
1290 def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None:
1291 if filter_host:
1292 self.cache.invalidate_host_daemons(filter_host)
1293 else:
1294 for h in self.cache.get_hosts():
1295 # Also discover daemons deployed manually
1296 self.cache.invalidate_host_daemons(h)
1297
1298 self._kick_serve_loop()
1299
1300 @trivial_completion
1301 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1302 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
1303 if refresh:
1304 self._invalidate_daemons_and_kick_serve()
1305 self.log.info('Kicked serve() loop to refresh all services')
1306
1307 # <service_map>
1308 sm: Dict[str, orchestrator.ServiceDescription] = {}
1309 osd_count = 0
1310 for h, dm in self.cache.get_daemons_with_volatile_status():
1311 for name, dd in dm.items():
1312 if service_type and service_type != dd.daemon_type:
1313 continue
1314 n: str = dd.service_name()
1315 if service_name and service_name != n:
1316 continue
1317 if dd.daemon_type == 'osd':
1318 """
1319 OSDs do not know the affinity to their spec out of the box.
1320 """
1321 n = f"osd.{dd.osdspec_affinity}"
1322 if not dd.osdspec_affinity:
1323 # If there is no osdspec_affinity, the spec should suffice for displaying
1324 continue
1325 if n in self.spec_store.specs:
1326 spec = self.spec_store.specs[n]
1327 else:
1328 spec = ServiceSpec(
1329 unmanaged=True,
1330 service_type=dd.daemon_type,
1331 service_id=dd.service_id(),
1332 placement=PlacementSpec(
1333 hosts=[dd.hostname]
1334 )
1335 )
1336 if n not in sm:
1337 sm[n] = orchestrator.ServiceDescription(
1338 last_refresh=dd.last_refresh,
1339 container_image_id=dd.container_image_id,
1340 container_image_name=dd.container_image_name,
1341 spec=spec,
1342 events=self.events.get_for_service(spec.service_name()),
1343 )
1344 if n in self.spec_store.specs:
1345 if dd.daemon_type == 'osd':
1346 """
1347 The osd count can't be determined from the placement spec alone,
1348 so an actual/expected representation cannot be computed here.
1349 We're setting running = size for now.
1350 """
1351 osd_count += 1
1352 sm[n].size = osd_count
1353 else:
1354 sm[n].size = spec.placement.get_host_selection_size(
1355 self.inventory.all_specs())
1356
1357 sm[n].created = self.spec_store.spec_created[n]
1358 if service_type == 'nfs':
1359 spec = cast(NFSServiceSpec, spec)
1360 sm[n].rados_config_location = spec.rados_config_location()
1361 else:
1362 sm[n].size = 0
1363 if dd.status == 1:
1364 sm[n].running += 1
1365 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1366 sm[n].last_refresh = dd.last_refresh
1367 if sm[n].container_image_id != dd.container_image_id:
1368 sm[n].container_image_id = 'mix'
1369 if sm[n].container_image_name != dd.container_image_name:
1370 sm[n].container_image_name = 'mix'
1371 for n, spec in self.spec_store.specs.items():
1372 if n in sm:
1373 continue
1374 if service_type is not None and service_type != spec.service_type:
1375 continue
1376 if service_name is not None and service_name != n:
1377 continue
1378 sm[n] = orchestrator.ServiceDescription(
1379 spec=spec,
1380 size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
1381 running=0,
1382 events=self.events.get_for_service(spec.service_name()),
1383 )
1384 if service_type == 'nfs':
1385 spec = cast(NFSServiceSpec, spec)
1386 sm[n].rados_config_location = spec.rados_config_location()
1387 return list(sm.values())
1388
1389 @trivial_completion
1390 def list_daemons(self,
1391 service_name: Optional[str] = None,
1392 daemon_type: Optional[str] = None,
1393 daemon_id: Optional[str] = None,
1394 host: Optional[str] = None,
1395 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
1396 if refresh:
1397 self._invalidate_daemons_and_kick_serve(host)
1398 self.log.info('Kicked serve() loop to refresh all daemons')
1399
1400 result = []
1401 for h, dm in self.cache.get_daemons_with_volatile_status():
1402 if host and h != host:
1403 continue
1404 for name, dd in dm.items():
1405 if daemon_type is not None and daemon_type != dd.daemon_type:
1406 continue
1407 if daemon_id is not None and daemon_id != dd.daemon_id:
1408 continue
1409 if service_name is not None and service_name != dd.service_name():
1410 continue
1411 result.append(dd)
1412 return result
1413
1414 @trivial_completion
1415 def service_action(self, action: str, service_name: str) -> List[str]:
1416 dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
1417 self.log.info('%s service %s' % (action.capitalize(), service_name))
1418 return [
1419 self._schedule_daemon_action(dd.name(), action)
1420 for dd in dds
1421 ]
1422
1423 def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
1424 daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
1425 host=host,
1426 daemon_id=daemon_id,
1427 daemon_type=daemon_type,
1428 )
1429
1430 self._daemon_action_set_image(action, image, daemon_type, daemon_id)
1431
1432 if action == 'redeploy':
1433 if self.daemon_is_self(daemon_type, daemon_id):
1434 self.mgr_service.fail_over()
1435 return '' # unreachable
1436 # stop, recreate the container+unit, then restart
1437 return self._create_daemon(daemon_spec)
1438 elif action == 'reconfig':
1439 return self._create_daemon(daemon_spec, reconfig=True)
1440
1441 actions = {
1442 'start': ['reset-failed', 'start'],
1443 'stop': ['stop'],
1444 'restart': ['reset-failed', 'restart'],
1445 }
1446 name = daemon_spec.name()
1447 for a in actions[action]:
1448 try:
1449 out, err, code = self._run_cephadm(
1450 host, name, 'unit',
1451 ['--name', name, a])
1452 except Exception:
1453 self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
1454 self.cache.invalidate_host_daemons(daemon_spec.host)
1455 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1456 self.events.for_daemon(name, 'INFO', msg)
1457 return msg
1458
1459 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
1460 if image is not None:
1461 if action != 'redeploy':
1462 raise OrchestratorError(
1463 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1464 if daemon_type not in CEPH_TYPES:
1465 raise OrchestratorError(
1466 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1467 f'types are: {", ".join(CEPH_TYPES)}')
1468
1469 self.check_mon_command({
1470 'prefix': 'config set',
1471 'name': 'container_image',
1472 'value': image,
1473 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1474 })
1475
1476 @trivial_completion
1477 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
1478 d = self.cache.get_daemon(daemon_name)
1479
1480 if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
1481 and not self.mgr_service.mgr_map_has_standby():
1482 raise OrchestratorError(
1483 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1484
1485 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
1486
1487 self.log.info(f'Schedule {action} daemon {daemon_name}')
1488 return self._schedule_daemon_action(daemon_name, action)
1489
1490 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
1491 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
1492
1493 def _schedule_daemon_action(self, daemon_name: str, action: str) -> str:
1494 dd = self.cache.get_daemon(daemon_name)
1495 if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
1496 and not self.mgr_service.mgr_map_has_standby():
1497 raise OrchestratorError(
1498 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1499 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
1500 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
1501 self._kick_serve_loop()
1502 return msg
1503
1504 @trivial_completion
1505 def remove_daemons(self, names):
1506 # type: (List[str]) -> List[str]
1507 args = []
1508 for host, dm in self.cache.daemons.items():
1509 for name in names:
1510 if name in dm:
1511 args.append((name, host))
1512 if not args:
1513 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1514 self.log.info('Remove daemons %s' % [a[0] for a in args])
1515 return self._remove_daemons(args)
1516
1517 @trivial_completion
1518 def remove_service(self, service_name: str) -> str:
1519 self.log.info('Remove service %s' % service_name)
1520 self._trigger_preview_refresh(service_name=service_name)
1521 found = self.spec_store.rm(service_name)
1522 if found:
1523 self._kick_serve_loop()
1524 return 'Removed service %s' % service_name
1525 else:
1526 # must be idempotent: still a success.
1527 return f'Failed to remove service. <{service_name}> was not found.'
1528
1529 @trivial_completion
1530 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
1531 """
1532 Return the storage inventory of hosts matching the given filter.
1533
1534 :param host_filter: host filter
1535
1536 TODO:
1537 - add filtering by label
1538 """
1539 if refresh:
1540 if host_filter and host_filter.hosts:
1541 for h in host_filter.hosts:
1542 self.cache.invalidate_host_devices(h)
1543 else:
1544 for h in self.cache.get_hosts():
1545 self.cache.invalidate_host_devices(h)
1546
1547 self.event.set()
1548 self.log.info('Kicked serve() loop to refresh devices')
1549
1550 result = []
1551 for host, dls in self.cache.devices.items():
1552 if host_filter and host_filter.hosts and host not in host_filter.hosts:
1553 continue
1554 result.append(orchestrator.InventoryHost(host,
1555 inventory.Devices(dls)))
1556 return result
1557
1558 @trivial_completion
1559 def zap_device(self, host: str, path: str) -> str:
1560 self.log.info('Zap device %s:%s' % (host, path))
1561 out, err, code = self._run_cephadm(
1562 host, 'osd', 'ceph-volume',
1563 ['--', 'lvm', 'zap', '--destroy', path],
1564 error_ok=True)
1565 self.cache.invalidate_host_devices(host)
1566 if code:
1567 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
1568 return '\n'.join(out + err)
1569
1570 @trivial_completion
1571 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
1572 """
1573 Blink a device light. Calling something like::
1574
1575 lsmcli local-disk-ident-led-on --path $path
1576
1577 If you must, you can customize this via::
1578
1579 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
1580 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
1581
1582 See templates/blink_device_light_cmd.j2
1583 """
1584 @forall_hosts
1585 def blink(host: str, dev: str, path: str) -> str:
1586 cmd_line = self.template.render('blink_device_light_cmd.j2',
1587 {
1588 'on': on,
1589 'ident_fault': ident_fault,
1590 'dev': dev,
1591 'path': path
1592 },
1593 host=host)
1594 cmd_args = shlex.split(cmd_line)
1595
1596 out, err, code = self._run_cephadm(
1597 host, 'osd', 'shell', ['--'] + cmd_args,
1598 error_ok=True)
1599 if code:
1600 raise OrchestratorError(
1601 'Unable to affect %s light for %s:%s. Command: %s' % (
1602 ident_fault, host, dev, ' '.join(cmd_args)))
1603 self.log.info('Set %s light for %s:%s %s' % (
1604 ident_fault, host, dev, 'on' if on else 'off'))
1605 return "Set %s light for %s:%s %s" % (
1606 ident_fault, host, dev, 'on' if on else 'off')
1607
1608 return blink(locs)
1609
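# Illustrative override (editorial addition; the exact lsmcli syntax is an
# assumption). The template receives the variables passed to
# self.template.render() above ('on', 'ident_fault', 'dev', 'path'):
#
#     ceph config-key set mgr/cephadm/blink_device_light_cmd \
#         "lsmcli local-disk-{{ ident_fault }}-led-{{ 'on' if on else 'off' }} --path '{{ path }}'"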
1610 def get_osd_uuid_map(self, only_up=False):
1611 # type: (bool) -> Dict[str, str]
1612 osd_map = self.get('osd_map')
1613 r = {}
1614 for o in osd_map['osds']:
1615 # only include OSDs that have ever started in this map. this way
1616 # an interrupted osd create can be repeated and succeed the second
1617 # time around.
1618 osd_id = o.get('osd')
1619 if osd_id is None:
1620 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1621 if not only_up or (o['up_from'] > 0):
1622 r[str(osd_id)] = o.get('uuid', '')
1623 return r
1624
1625 def _trigger_preview_refresh(self,
1626 specs: Optional[List[DriveGroupSpec]] = None,
1627 service_name: Optional[str] = None,
1628 ) -> None:
1629 # Only trigger a refresh when a spec has changed
1630 trigger_specs = []
1631 if specs:
1632 for spec in specs:
1633 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
1634 # if the to-be-previewed spec differs from the actual spec, we need to
1635 # trigger a refresh; if the spec has been removed (== None) we need to
1636 # refresh as well.
1637 if not preview_spec or spec != preview_spec:
1638 trigger_specs.append(spec)
1639 if service_name:
1640 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
1641 if not any(trigger_specs):
1642 return None
1643
1644 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
1645 for host in refresh_hosts:
1646 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
1647 self.cache.osdspec_previews_refresh_queue.append(host)
1648
1649 @trivial_completion
1650 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
1651 """
1652 Deprecated. Please use `apply()` instead.
1653
1654 Keeping this around to stay compatible with mgr/dashboard
1655 """
1656 return [self._apply(spec) for spec in specs]
1657
1658 @trivial_completion
1659 def create_osds(self, drive_group: DriveGroupSpec) -> str:
1660 return self.osd_service.create_from_spec(drive_group)
1661
1662 def _preview_osdspecs(self,
1663 osdspecs: Optional[List[DriveGroupSpec]] = None
1664 ) -> dict:
1665 if not osdspecs:
1666 return {'n/a': [{'error': True,
1667 'message': 'No OSDSpec or matching hosts found.'}]}
1668 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
1669 if not matching_hosts:
1670 return {'n/a': [{'error': True,
1671 'message': 'No OSDSpec or matching hosts found.'}]}
1672 # Is any host still loading previews or still in the queue to be previewed
1673 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
1674 if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts):
1675 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1676 return {'n/a': [{'error': True,
1677 'message': 'Preview data is being generated... '
1678 'Please re-run this command shortly.'}]}
1679 # drop all hosts that are not in matching_hosts and only select reports that match the requested osdspecs
1680 previews_for_specs = {}
1681 for host, raw_reports in self.cache.osdspec_previews.items():
1682 if host not in matching_hosts:
1683 continue
1684 osd_reports = []
1685 for osd_report in raw_reports:
1686 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
1687 osd_reports.append(osd_report)
1688 previews_for_specs.update({host: osd_reports})
1689 return previews_for_specs
1690
1691 def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]:
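# A daemon's 'deps' are the names of other daemons that its generated config
# refers to; they are recorded together with the daemon config (see
# update_daemon_config_deps) so that a changed dependency list can later
# trigger a reconfig.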
1692 need = {
1693 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1694 'grafana': ['prometheus'],
1695 'alertmanager': ['mgr', 'alertmanager'],
1696 }
1697 deps = []
1698 for dep_type in need.get(daemon_type, []):
1699 for dd in self.cache.get_daemons_by_service(dep_type):
1700 deps.append(dd.name())
1701 return sorted(deps)
1702
1703 def _create_daemon(self,
1704 daemon_spec: CephadmDaemonSpec,
1705 reconfig: bool = False,
1706 osd_uuid_map: Optional[Dict[str, Any]] = None,
1707 ) -> str:
1708
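# Overall flow: generate the daemon's config via its service class, pass it
# to 'cephadm ... deploy' on the target host via stdin ('--config-json -'),
# then prime and invalidate the host's daemon cache.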
1709 with set_exception_subject('service', orchestrator.DaemonDescription(
1710 daemon_type=daemon_spec.daemon_type,
1711 daemon_id=daemon_spec.daemon_id,
1712 hostname=daemon_spec.host,
1713 ).service_id(), overwrite=True):
1714
1715 image = ''
1716 start_time = datetime_now()
1717 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1718
1719 if daemon_spec.daemon_type == 'container':
1720 spec: Optional[CustomContainerSpec] = daemon_spec.spec
1721 if spec is None:
1722 # Exit here immediately because the required service
1723 # spec to create a daemon is not provided. This is only
1724 # provided when a service is applied via the 'orch apply'
1725 # command.
1726 msg = "Failed to {} daemon {} on {}: Required " \
1727 "service specification not provided".format(
1728 'reconfigure' if reconfig else 'deploy',
1729 daemon_spec.name(), daemon_spec.host)
1730 self.log.info(msg)
1731 return msg
1732 image = spec.image
1733 if spec.ports:
1734 ports.extend(spec.ports)
1735
1736 cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
1737 daemon_spec)
1738
1739 # TCP port(s) to open in the host firewall
1740 if ports:
1741 daemon_spec.extra_args.extend([
1742 '--tcp-ports', ' '.join(map(str, ports))
1743 ])
1744
1745 # osd deployments need an --osd-fsid arg (the osd's uuid from the osdmap)
1746 if daemon_spec.daemon_type == 'osd':
1747 if not osd_uuid_map:
1748 osd_uuid_map = self.get_osd_uuid_map()
1749 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1750 if not osd_uuid:
1751 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1752 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1753
1754 if reconfig:
1755 daemon_spec.extra_args.append('--reconfig')
1756 if self.allow_ptrace:
1757 daemon_spec.extra_args.append('--allow-ptrace')
1758
1759 if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
1760 self._registry_login(daemon_spec.host, self.registry_url,
1761 self.registry_username, self.registry_password)
1762
1763 daemon_spec.extra_args.extend(['--config-json', '-'])
1764
1765 self.log.info('%s daemon %s on %s' % (
1766 'Reconfiguring' if reconfig else 'Deploying',
1767 daemon_spec.name(), daemon_spec.host))
1768
1769 out, err, code = self._run_cephadm(
1770 daemon_spec.host, daemon_spec.name(), 'deploy',
1771 [
1772 '--name', daemon_spec.name(),
1773 ] + daemon_spec.extra_args,
1774 stdin=json.dumps(cephadm_config),
1775 image=image)
1776 if not code and daemon_spec.host in self.cache.daemons:
1777 # prime cached service state with what we (should have)
1778 # just created
1779 sd = orchestrator.DaemonDescription()
1780 sd.daemon_type = daemon_spec.daemon_type
1781 sd.daemon_id = daemon_spec.daemon_id
1782 sd.hostname = daemon_spec.host
1783 sd.status = 1
1784 sd.status_desc = 'starting'
1785 self.cache.add_daemon(daemon_spec.host, sd)
1786 if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1787 self.requires_post_actions.add(daemon_spec.daemon_type)
1788 self.cache.invalidate_host_daemons(daemon_spec.host)
1789 self.cache.update_daemon_config_deps(
1790 daemon_spec.host, daemon_spec.name(), deps, start_time)
1791 self.cache.save_host(daemon_spec.host)
1792 msg = "{} {} on host '{}'".format(
1793 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1794 if not code:
1795 self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1796 else:
1797 what = 'reconfigure' if reconfig else 'deploy'
1798 self.events.for_daemon(
1799 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1800 return msg
1801
1802 @forall_hosts
1803 def _remove_daemons(self, name: str, host: str) -> str:
1804 return self._remove_daemon(name, host)
1805
1806 def _remove_daemon(self, name: str, host: str) -> str:
1807 """
1808 Remove a daemon
1809 """
1810 (daemon_type, daemon_id) = name.split('.', 1)
1811 daemon = orchestrator.DaemonDescription(
1812 daemon_type=daemon_type,
1813 daemon_id=daemon_id,
1814 hostname=host)
1815
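# The service class gets pre_remove/post_remove hooks around the actual
# 'cephadm rm-daemon' call below.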
1816 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1817
1818 self.cephadm_services[daemon_type].pre_remove(daemon)
1819
1820 args = ['--name', name, '--force']
1821 self.log.info('Removing daemon %s from %s' % (name, host))
1822 out, err, code = self._run_cephadm(
1823 host, name, 'rm-daemon', args)
1824 if not code:
1825 # remove item from cache
1826 self.cache.rm_daemon(host, name)
1827 self.cache.invalidate_host_daemons(host)
1828
1829 self.cephadm_services[daemon_type].post_remove(daemon)
1830
1831 return "Removed {} from host '{}'".format(name, host)
1832
1833 def _check_pool_exists(self, pool: str, service_name: str) -> None:
1834 logger.info(f'Checking that pool "{pool}" exists for service {service_name}')
1835 if not self.rados.pool_exists(pool):
1836 raise OrchestratorError(f'Cannot find pool "{pool}" for '
1837 f'service {service_name}')
1838
1839 def _add_daemon(self,
1840 daemon_type: str,
1841 spec: ServiceSpec,
1842 create_func: Callable[..., CephadmDaemonSpec],
1843 config_func: Optional[Callable] = None) -> List[str]:
1844 """
1845 Add (and place) a daemon. Require explicit host placement. Do not
1846 schedule, and do not apply the related scheduling limitations.
1847 """
1848 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
1849 if not spec.placement.hosts:
1850 raise OrchestratorError('must specify host(s) to deploy on')
1851 count = spec.placement.count or len(spec.placement.hosts)
1852 daemons = self.cache.get_daemons_by_service(spec.service_name())
1853 return self._create_daemons(daemon_type, spec, daemons,
1854 spec.placement.hosts, count,
1855 create_func, config_func)
1856
1857 def _create_daemons(self,
1858 daemon_type: str,
1859 spec: ServiceSpec,
1860 daemons: List[DaemonDescription],
1861 hosts: List[HostPlacementSpec],
1862 count: int,
1863 create_func: Callable[..., CephadmDaemonSpec],
1864 config_func: Optional[Callable] = None) -> List[str]:
1865 if count > len(hosts):
1866 raise OrchestratorError('too few hosts: want %d, have %s' % (
1867 count, hosts))
1868
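# The service-level config_func is only run once per batch of daemons;
# rgw is special-cased because its config additionally needs the daemon id.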
1869 did_config = False
1870
1871 args = [] # type: List[CephadmDaemonSpec]
1872 for host, network, name in hosts:
1873 daemon_id = self.get_unique_name(daemon_type, host, daemons,
1874 prefix=spec.service_id,
1875 forcename=name)
1876
1877 if not did_config and config_func:
1878 if daemon_type == 'rgw':
1879 config_func(spec, daemon_id)
1880 else:
1881 config_func(spec)
1882 did_config = True
1883
1884 daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
1885 host, daemon_id, network, spec)
1886 self.log.debug('Placing %s.%s on host %s' % (
1887 daemon_type, daemon_id, host))
1888 args.append(daemon_spec)
1889
1890 # add to daemon list so next name(s) will also be unique
1891 sd = orchestrator.DaemonDescription(
1892 hostname=host,
1893 daemon_type=daemon_type,
1894 daemon_id=daemon_id,
1895 )
1896 daemons.append(sd)
1897
1898 @forall_hosts
1899 def create_func_map(*args: Any) -> str:
1900 daemon_spec = create_func(*args)
1901 return self._create_daemon(daemon_spec)
1902
1903 return create_func_map(args)
1904
1905 @trivial_completion
1906 def apply_mon(self, spec: ServiceSpec) -> str:
1907 return self._apply(spec)
1908
1909 @trivial_completion
1910 def add_mon(self, spec):
1911 # type: (ServiceSpec) -> List[str]
1912 return self._add_daemon('mon', spec, self.mon_service.prepare_create)
1913
1914 @trivial_completion
1915 def add_mgr(self, spec):
1916 # type: (ServiceSpec) -> List[str]
1917 return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
1918
1919 def _apply(self, spec: GenericSpec) -> str:
1920 if spec.service_type == 'host':
1921 return self._add_host(cast(HostSpec, spec))
1922
1923 if spec.service_type == 'osd':
1924 # _trigger_preview_refresh needs to be smart and
1925 # should only refresh if a change has been detected
1926 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
1927
1928 return self._apply_service_spec(cast(ServiceSpec, spec))
1929
1930 def _plan(self, spec: ServiceSpec) -> dict:
1931 if spec.service_type == 'osd':
1932 return {'service_name': spec.service_name(),
1933 'service_type': spec.service_type,
1934 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
1935
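# Dry-run the scheduler: compute where daemons for this spec would be
# placed without deploying anything.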
1936 ha = HostAssignment(
1937 spec=spec,
1938 hosts=self._hosts_with_daemon_inventory(),
1939 get_daemons_func=self.cache.get_daemons_by_service,
1940 )
1941 ha.validate()
1942 hosts = ha.place()
1943
1944 add_daemon_hosts = ha.add_daemon_hosts(hosts)
1945 remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
1946
1947 return {
1948 'service_name': spec.service_name(),
1949 'service_type': spec.service_type,
1950 'add': [hs.hostname for hs in add_daemon_hosts],
1951 'remove': [d.hostname for d in remove_daemon_hosts]
1952 }
1953
1954 @trivial_completion
1955 def plan(self, specs: List[GenericSpec]) -> List:
1956 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
1957 'to the current inventory setup. If any of these conditions changes, the \n'
1958 'preview will be invalid. Please keep the timeframe between \n'
1959 'planning and applying the specs minimal.'}]
1960 if any([spec.service_type == 'host' for spec in specs]):
1961 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not yet supported.'}]
1962 for spec in specs:
1963 results.append(self._plan(cast(ServiceSpec, spec)))
1964 return results
1965
1966 def _apply_service_spec(self, spec: ServiceSpec) -> str:
1967 if spec.placement.is_empty():
1968 # fill in default placement
1969 defaults = {
1970 'mon': PlacementSpec(count=5),
1971 'mgr': PlacementSpec(count=2),
1972 'mds': PlacementSpec(count=2),
1973 'rgw': PlacementSpec(count=2),
1974 'iscsi': PlacementSpec(count=1),
1975 'rbd-mirror': PlacementSpec(count=2),
1976 'nfs': PlacementSpec(count=1),
1977 'grafana': PlacementSpec(count=1),
1978 'alertmanager': PlacementSpec(count=1),
1979 'prometheus': PlacementSpec(count=1),
1980 'node-exporter': PlacementSpec(host_pattern='*'),
1981 'crash': PlacementSpec(host_pattern='*'),
1982 'container': PlacementSpec(count=1),
1983 }
1984 spec.placement = defaults[spec.service_type]
1985 elif spec.service_type in ['mon', 'mgr'] and \
1986 spec.placement.count is not None and \
1987 spec.placement.count < 1:
1988 raise OrchestratorError('cannot scale %s service below 1' % (
1989 spec.service_type))
1990
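# Validate the (possibly defaulted) placement against all known hosts
# before persisting the spec and kicking the serve loop.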
1991 HostAssignment(
1992 spec=spec,
1993 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
1994 get_daemons_func=self.cache.get_daemons_by_service,
1995 ).validate()
1996
1997 self.log.info('Saving service %s spec with placement %s' % (
1998 spec.service_name(), spec.placement.pretty_str()))
1999 self.spec_store.save(spec)
2000 self._kick_serve_loop()
2001 return "Scheduled %s update..." % spec.service_name()
2002
2003 @trivial_completion
2004 def apply(self, specs: List[GenericSpec]) -> List[str]:
2005 results = []
2006 for spec in specs:
2007 results.append(self._apply(spec))
2008 return results
2009
2010 @trivial_completion
2011 def apply_mgr(self, spec: ServiceSpec) -> str:
2012 return self._apply(spec)
2013
2014 @trivial_completion
2015 def add_mds(self, spec: ServiceSpec) -> List[str]:
2016 return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)
2017
2018 @trivial_completion
2019 def apply_mds(self, spec: ServiceSpec) -> str:
2020 return self._apply(spec)
2021
2022 @trivial_completion
2023 def add_rgw(self, spec: ServiceSpec) -> List[str]:
2024 return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
2025
2026 @trivial_completion
2027 def apply_rgw(self, spec: ServiceSpec) -> str:
2028 return self._apply(spec)
2029
2030 @trivial_completion
2031 def add_iscsi(self, spec):
2032 # type: (ServiceSpec) -> List[str]
2033 return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
2034
2035 @trivial_completion
2036 def apply_iscsi(self, spec: ServiceSpec) -> str:
2037 return self._apply(spec)
2038
2039 @trivial_completion
2040 def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]:
2041 return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
2042
2043 @trivial_completion
2044 def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
2045 return self._apply(spec)
2046
2047 @trivial_completion
2048 def add_nfs(self, spec: ServiceSpec) -> List[str]:
2049 return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
2050
2051 @trivial_completion
2052 def apply_nfs(self, spec: ServiceSpec) -> str:
2053 return self._apply(spec)
2054
2055 def _get_dashboard_url(self):
2056 # type: () -> str
2057 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2058
2059 @trivial_completion
2060 def add_prometheus(self, spec: ServiceSpec) -> List[str]:
2061 return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
2062
2063 @trivial_completion
2064 def apply_prometheus(self, spec: ServiceSpec) -> str:
2065 return self._apply(spec)
2066
2067 @trivial_completion
2068 def add_node_exporter(self, spec):
2069 # type: (ServiceSpec) -> List[str]
2070 return self._add_daemon('node-exporter', spec,
2071 self.node_exporter_service.prepare_create)
2072
2073 @trivial_completion
2074 def apply_node_exporter(self, spec: ServiceSpec) -> str:
2075 return self._apply(spec)
2076
2077 @trivial_completion
2078 def add_crash(self, spec):
2079 # type: (ServiceSpec) -> List[str]
2080 return self._add_daemon('crash', spec,
2081 self.crash_service.prepare_create)
2082
2083 @trivial_completion
2084 def apply_crash(self, spec: ServiceSpec) -> str:
2085 return self._apply(spec)
2086
2087 @trivial_completion
2088 def add_grafana(self, spec):
2089 # type: (ServiceSpec) -> List[str]
2090 return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
2091
2092 @trivial_completion
2093 def apply_grafana(self, spec: ServiceSpec) -> str:
2094 return self._apply(spec)
2095
2096 @trivial_completion
2097 def add_alertmanager(self, spec):
2098 # type: (ServiceSpec) -> List[str]
2099 return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
2100
2101 @trivial_completion
2102 def apply_alertmanager(self, spec: ServiceSpec) -> str:
2103 return self._apply(spec)
2104
2105 @trivial_completion
2106 def add_container(self, spec: ServiceSpec) -> List[str]:
2107 return self._add_daemon('container', spec,
2108 self.container_service.prepare_create)
2109
2110 @trivial_completion
2111 def apply_container(self, spec: ServiceSpec) -> str:
2112 return self._apply(spec)
2113
2114 def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
2115 # pick an arbitrary host (the first one in the inventory) to run the pull on...
2116 host = None
2117 for host_name in self.inventory.keys():
2118 host = host_name
2119 break
2120 if not host:
2121 raise OrchestratorError('no hosts defined')
2122 if self.cache.host_needs_registry_login(host) and self.registry_url:
2123 self._registry_login(host, self.registry_url,
2124 self.registry_username, self.registry_password)
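# The output of 'cephadm pull' is parsed as JSON below; image_id,
# ceph_version and repo_digest are extracted from it.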
2125 out, err, code = self._run_cephadm(
2126 host, '', 'pull', [],
2127 image=image_name,
2128 no_fsid=True,
2129 error_ok=True)
2130 if code:
2131 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2132 image_name, host, '\n'.join(out)))
2133 try:
2134 j = json.loads('\n'.join(out))
2135 r = ContainerInspectInfo(
2136 j['image_id'],
2137 j.get('ceph_version'),
2138 j.get('repo_digest')
2139 )
2140 self.log.debug(f'image {image_name} -> {r}')
2141 return r
2142 except (ValueError, KeyError):
2143 msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host)
2144 self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out)))
2145 raise OrchestratorError(msg)
2146
2147 @trivial_completion
2148 def upgrade_check(self, image: str, version: str) -> str:
2149 if version:
2150 target_name = self.container_image_base + ':v' + version
2151 elif image:
2152 target_name = image
2153 else:
2154 raise OrchestratorError('must specify either image or version')
2155
2156 image_info = self._get_container_image_info(target_name)
2157 self.log.debug(f'image info {target_name} -> {image_info}')
2158 r: dict = {
2159 'target_name': target_name,
2160 'target_id': image_info.image_id,
2161 'target_version': image_info.ceph_version,
2162 'needs_update': dict(),
2163 'up_to_date': list(),
2164 }
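# Compare each cached daemon's running container image id against the
# target image to classify it as up to date or needing an update.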
2165 for host, dm in self.cache.daemons.items():
2166 for name, dd in dm.items():
2167 if image_info.image_id == dd.container_image_id:
2168 r['up_to_date'].append(dd.name())
2169 else:
2170 r['needs_update'][dd.name()] = {
2171 'current_name': dd.container_image_name,
2172 'current_id': dd.container_image_id,
2173 'current_version': dd.version,
2174 }
2175 if self.use_repo_digest:
2176 r['target_digest'] = image_info.repo_digest
2177
2178 return json.dumps(r, indent=4, sort_keys=True)
2179
2180 @trivial_completion
2181 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
2182 return self.upgrade.upgrade_status()
2183
2184 @trivial_completion
2185 def upgrade_start(self, image: str, version: str) -> str:
2186 return self.upgrade.upgrade_start(image, version)
2187
2188 @trivial_completion
2189 def upgrade_pause(self) -> str:
2190 return self.upgrade.upgrade_pause()
2191
2192 @trivial_completion
2193 def upgrade_resume(self) -> str:
2194 return self.upgrade.upgrade_resume()
2195
2196 @trivial_completion
2197 def upgrade_stop(self) -> str:
2198 return self.upgrade.upgrade_stop()
2199
2200 @trivial_completion
2201 def remove_osds(self, osd_ids: List[str],
2202 replace: bool = False,
2203 force: bool = False) -> str:
2204 """
2205 Takes a list of OSDs and schedules them for removal.
2206 The function that takes care of the actual removal is
2207 process_removal_queue().
2208 """
2209
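# Map the requested OSD ids onto known 'osd' daemons so that hostname and
# daemon name are available for the removal queue entries.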
2210 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2211 to_remove_daemons = list()
2212 for daemon in daemons:
2213 if daemon.daemon_id in osd_ids:
2214 to_remove_daemons.append(daemon)
2215 if not to_remove_daemons:
2216 return f"Unable to find OSDs: {osd_ids}"
2217
2218 for daemon in to_remove_daemons:
2219 try:
2220 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2221 replace=replace,
2222 force=force,
2223 hostname=daemon.hostname,
2224 fullname=daemon.name(),
2225 process_started_at=datetime_now(),
2226 remove_util=self.to_remove_osds.rm_util))
2227 except NotFoundError:
2228 return f"Unable to find OSDs: {osd_ids}"
2229
2230 # trigger the serve loop to initiate the removal
2231 self._kick_serve_loop()
2232 return "Scheduled OSD(s) for removal"
2233
2234 @trivial_completion
2235 def stop_remove_osds(self, osd_ids: List[str]) -> str:
2236 """
2237 Stops the removal process for a list of OSDs.
2238 This will revert their weight and remove them from the osds_to_remove queue
2239 """
2240 for osd_id in osd_ids:
2241 try:
2242 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
2243 remove_util=self.to_remove_osds.rm_util))
2244 except (NotFoundError, KeyError):
2245 return f'Unable to find OSD in the queue: {osd_id}'
2246
2247 # trigger the serve loop to halt the removal
2248 self._kick_serve_loop()
2249 return "Stopped OSD(s) removal"
2250
2251 @trivial_completion
2252 def remove_osds_status(self) -> List[OSD]:
2253 """
2254 The CLI call to retrieve an osd removal report
2255 """
2256 return self.to_remove_osds.all_osds()