ceph.git: ceph/src/pybind/mgr/cephadm/module.py (Import ceph 15.2.8)
1 import json
2 import errno
3 import logging
4 import re
5 import shlex
6 from collections import defaultdict
7 from configparser import ConfigParser
8 from contextlib import contextmanager
9 from functools import wraps
10 from tempfile import TemporaryDirectory
11 from threading import Event
12
13 import string
14 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
15 Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple
16
17 import datetime
18 import six
19 import os
20 import random
21 import tempfile
22 import multiprocessing.pool
23 import subprocess
24
25 from ceph.deployment import inventory
26 from ceph.deployment.drive_group import DriveGroupSpec
27 from ceph.deployment.service_spec import \
28 NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
29 CustomContainerSpec
30 from cephadm.serve import CephadmServe
31 from cephadm.services.cephadmservice import CephadmDaemonSpec
32
33 from mgr_module import MgrModule, HandleCommandResult
34 import orchestrator
35 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
36 CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
37 from orchestrator._interface import GenericSpec
38
39 from . import remotes
40 from . import utils
41 from .migrations import Migrations
42 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
43 RbdMirrorService, CrashService, CephadmService
44 from .services.container import CustomContainerService
45 from .services.iscsi import IscsiService
46 from .services.nfs import NFSService
47 from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
48 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
49 NodeExporterService
50 from .schedule import HostAssignment
51 from .inventory import Inventory, SpecStore, HostCache, EventStore
52 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
53 from .template import TemplateMgr
54 from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, \
55 str_to_datetime, datetime_to_str
56
57 try:
58 import remoto
59 # NOTE(mattoliverau) Patch remoto until remoto PR
60 # (https://github.com/alfredodeza/remoto/pull/56) lands
61 from distutils.version import StrictVersion
62 if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
63 def remoto_has_connection(self):
64 return self.gateway.hasreceiver()
65
66 from remoto.backends import BaseConnection
67 BaseConnection.has_connection = remoto_has_connection
68 import remoto.process
69 import execnet.gateway_bootstrap
70 except ImportError as e:
71 remoto = None
72 remoto_import_error = str(e)
73
74 try:
75 from typing import List
76 except ImportError:
77 pass
78
79 logger = logging.getLogger(__name__)
80
81 T = TypeVar('T')
82
83 DEFAULT_SSH_CONFIG = """
84 Host *
85 User root
86 StrictHostKeyChecking no
87 UserKnownHostsFile /dev/null
88 ConnectTimeout=30
89 """
90
91 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
92
93 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
94
95
96 class CephadmCompletion(orchestrator.Completion[T]):
97 def evaluate(self):
98 self.finalize(None)
99
100
101 def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
102 """
103 Decorator that makes a method return a CephadmCompletion
104 object which executes the method when evaluated.
105 """
106
107 @wraps(f)
108 def wrapper(*args, **kwargs):
109 return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
110
111 return wrapper
112
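# Illustrative note (not part of the upstream module): a method decorated with
# @trivial_completion is not executed immediately; it returns a CephadmCompletion
# that runs the wrapped function when the orchestrator evaluates it, e.g.:
#
#   @trivial_completion
#   def add_host(self, spec: HostSpec) -> str:
#       return self._add_host(spec)
#
#   completion = mgr.add_host(spec)   # nothing has run yet
#   completion.evaluate()             # now _add_host() is invoked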
113
114 class ContainerInspectInfo(NamedTuple):
115 image_id: str
116 ceph_version: Optional[str]
117 repo_digest: Optional[str]
118
119
120 @six.add_metaclass(CLICommandMeta)
121 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
122
123 _STORE_HOST_PREFIX = "host"
124
125 instance = None
126 NATIVE_OPTIONS = [] # type: List[Any]
127 MODULE_OPTIONS: List[dict] = [
128 {
129 'name': 'ssh_config_file',
130 'type': 'str',
131 'default': None,
132 'desc': 'customized SSH config file to connect to managed hosts',
133 },
134 {
135 'name': 'device_cache_timeout',
136 'type': 'secs',
137 'default': 30 * 60,
138 'desc': 'seconds to cache device inventory',
139 },
140 {
141 'name': 'daemon_cache_timeout',
142 'type': 'secs',
143 'default': 10 * 60,
144 'desc': 'seconds to cache service (daemon) inventory',
145 },
146 {
147 'name': 'host_check_interval',
148 'type': 'secs',
149 'default': 10 * 60,
150 'desc': 'how frequently to perform a host check',
151 },
152 {
153 'name': 'mode',
154 'type': 'str',
155 'enum_allowed': ['root', 'cephadm-package'],
156 'default': 'root',
157 'desc': 'mode for remote execution of cephadm',
158 },
159 {
160 'name': 'container_image_base',
161 'default': 'docker.io/ceph/ceph',
162 'desc': 'Container image name, without the tag',
163 'runtime': True,
164 },
165 {
166 'name': 'container_image_prometheus',
167 'default': 'docker.io/prom/prometheus:v2.18.1',
168 'desc': 'Prometheus container image',
169 },
170 {
171 'name': 'container_image_grafana',
172 'default': 'docker.io/ceph/ceph-grafana:6.6.2',
173 'desc': 'Grafana container image',
174 },
175 {
176 'name': 'container_image_alertmanager',
177 'default': 'docker.io/prom/alertmanager:v0.20.0',
178 'desc': 'Prometheus alertmanager container image',
179 },
180 {
181 'name': 'container_image_node_exporter',
182 'default': 'docker.io/prom/node-exporter:v0.18.1',
183 'desc': 'Prometheus node-exporter container image',
184 },
185 {
186 'name': 'warn_on_stray_hosts',
187 'type': 'bool',
188 'default': True,
189 'desc': 'raise a health warning if daemons are detected on a host '
190 'that is not managed by cephadm',
191 },
192 {
193 'name': 'warn_on_stray_daemons',
194 'type': 'bool',
195 'default': True,
196 'desc': 'raise a health warning if daemons are detected '
197 'that are not managed by cephadm',
198 },
199 {
200 'name': 'warn_on_failed_host_check',
201 'type': 'bool',
202 'default': True,
203 'desc': 'raise a health warning if the host check fails',
204 },
205 {
206 'name': 'log_to_cluster',
207 'type': 'bool',
208 'default': True,
209 'desc': 'log to the "cephadm" cluster log channel',
210 },
211 {
212 'name': 'allow_ptrace',
213 'type': 'bool',
214 'default': False,
215 'desc': 'allow SYS_PTRACE capability on ceph containers',
216 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
217 'process with gdb or strace. Enabling this option '
218 'can allow debugging daemons that encounter problems '
219 'at runtime.',
220 },
221 {
222 'name': 'container_init',
223 'type': 'bool',
224 'default': False,
225 'desc': 'Run podman/docker with `--init`',
226 },
227 {
228 'name': 'prometheus_alerts_path',
229 'type': 'str',
230 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
231 'desc': 'location of alerts to include in prometheus deployments',
232 },
233 {
234 'name': 'migration_current',
235 'type': 'int',
236 'default': None,
237 'desc': 'internal - do not modify',
238 # used to track spec and other data migrations.
239 },
240 {
241 'name': 'config_dashboard',
242 'type': 'bool',
243 'default': True,
244 'desc': 'manage configs like API endpoints in Dashboard.'
245 },
246 {
247 'name': 'manage_etc_ceph_ceph_conf',
248 'type': 'bool',
249 'default': False,
250 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
251 },
252 {
253 'name': 'registry_url',
254 'type': 'str',
255 'default': None,
256 'desc': 'Custom registry url'
257 },
258 {
259 'name': 'registry_username',
260 'type': 'str',
261 'default': None,
262 'desc': 'Custom registry username'
263 },
264 {
265 'name': 'registry_password',
266 'type': 'str',
267 'default': None,
268 'desc': 'Custom registry password'
269 },
270 {
271 'name': 'use_repo_digest',
272 'type': 'bool',
273 'default': False,
274 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
275 }
276 ]
277
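# These MODULE_OPTIONS are surfaced as regular mgr module options; assuming the
# standard mgr option plumbing, they can be inspected and changed at runtime with,
# for example:
#
#   ceph config get mgr mgr/cephadm/device_cache_timeout
#   ceph config set mgr mgr/cephadm/container_image_base docker.io/ceph/ceph
#
# config_notify() below re-reads every option via get_module_option() whenever
# one of them changes.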
278 def __init__(self, *args, **kwargs):
279 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
280 self._cluster_fsid = self.get('mon_map')['fsid']
281 self.last_monmap: Optional[datetime.datetime] = None
282
283 # for serve()
284 self.run = True
285 self.event = Event()
286
287 if self.get_store('pause'):
288 self.paused = True
289 else:
290 self.paused = False
291
292 # for mypy which does not run the code
293 if TYPE_CHECKING:
294 self.ssh_config_file = None # type: Optional[str]
295 self.device_cache_timeout = 0
296 self.daemon_cache_timeout = 0
297 self.host_check_interval = 0
298 self.mode = ''
299 self.container_image_base = ''
300 self.container_image_prometheus = ''
301 self.container_image_grafana = ''
302 self.container_image_alertmanager = ''
303 self.container_image_node_exporter = ''
304 self.warn_on_stray_hosts = True
305 self.warn_on_stray_daemons = True
306 self.warn_on_failed_host_check = True
307 self.allow_ptrace = False
308 self.container_init = False
309 self.prometheus_alerts_path = ''
310 self.migration_current = None
311 self.config_dashboard = True
312 self.manage_etc_ceph_ceph_conf = True
313 self.registry_url: Optional[str] = None
314 self.registry_username: Optional[str] = None
315 self.registry_password: Optional[str] = None
316 self.use_repo_digest = False
317
318 self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
319 remoto.backends.LegacyModuleExecute]] = {}
320
321 self.notify('mon_map', None)
322 self.config_notify()
323
324 path = self.get_ceph_option('cephadm_path')
325 try:
326 with open(path, 'r') as f:
327 self._cephadm = f.read()
328 except (IOError, TypeError) as e:
329 raise RuntimeError("unable to read cephadm at '%s': %s" % (
330 path, str(e)))
331
332 self._worker_pool = multiprocessing.pool.ThreadPool(10)
333
334 self._reconfig_ssh()
335
336 CephadmOrchestrator.instance = self
337
338 self.upgrade = CephadmUpgrade(self)
339
340 self.health_checks = {}
341
342 self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
343
344 self.inventory = Inventory(self)
345
346 self.cache = HostCache(self)
347 self.cache.load()
348
349 self.rm_util = RemoveUtil(self)
350 self.to_remove_osds = OSDQueue()
351 self.rm_util.load_from_store()
352
353 self.spec_store = SpecStore(self)
354 self.spec_store.load()
355
356 # ensure the host lists are in sync
357 for h in self.inventory.keys():
358 if h not in self.cache.daemons:
359 self.cache.prime_empty_host(h)
360 for h in self.cache.get_hosts():
361 if h not in self.inventory:
362 self.cache.rm_host(h)
363
364 # in-memory only.
365 self.events = EventStore(self)
366 self.offline_hosts: Set[str] = set()
367
368 self.migration = Migrations(self)
369
370 # services:
371 self.osd_service = OSDService(self)
372 self.nfs_service = NFSService(self)
373 self.mon_service = MonService(self)
374 self.mgr_service = MgrService(self)
375 self.mds_service = MdsService(self)
376 self.rgw_service = RgwService(self)
377 self.rbd_mirror_service = RbdMirrorService(self)
378 self.grafana_service = GrafanaService(self)
379 self.alertmanager_service = AlertmanagerService(self)
380 self.prometheus_service = PrometheusService(self)
381 self.node_exporter_service = NodeExporterService(self)
382 self.crash_service = CrashService(self)
383 self.iscsi_service = IscsiService(self)
384 self.container_service = CustomContainerService(self)
385 self.cephadm_services = {
386 'mon': self.mon_service,
387 'mgr': self.mgr_service,
388 'osd': self.osd_service,
389 'mds': self.mds_service,
390 'rgw': self.rgw_service,
391 'rbd-mirror': self.rbd_mirror_service,
392 'nfs': self.nfs_service,
393 'grafana': self.grafana_service,
394 'alertmanager': self.alertmanager_service,
395 'prometheus': self.prometheus_service,
396 'node-exporter': self.node_exporter_service,
397 'crash': self.crash_service,
398 'iscsi': self.iscsi_service,
399 'container': self.container_service,
400 }
401
402 self.template = TemplateMgr(self)
403
404 self.requires_post_actions = set()
405
406 def shutdown(self):
407 self.log.debug('shutdown')
408 self._worker_pool.close()
409 self._worker_pool.join()
410 self.run = False
411 self.event.set()
412
413 def _get_cephadm_service(self, service_type: str) -> CephadmService:
414 assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
415 return self.cephadm_services[service_type]
416
417 def _kick_serve_loop(self):
418 self.log.debug('_kick_serve_loop')
419 self.event.set()
420
421 # log a single host into the custom registry
422 def _registry_login(self, host, url, username, password):
423 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
424 # want to pass info over stdin rather than through normal list of args
425 args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
426 " \"password\": \"" + password + "\"}")
427 out, err, code = self._run_cephadm(
428 host, 'mon', 'registry-login',
429 ['--registry-json', '-'], stdin=args_str, error_ok=True)
430 if code:
431 return f"Host {host} failed to log in to {url} as {username} with the given password"
432 return
433
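# Sketch of the JSON document that _registry_login() above passes to
# `cephadm registry-login` on stdin (field names taken from the code;
# values are placeholders):
#
#   {"url": "REGISTRY_URL", "username": "REGISTRY_USERNAME", "password": "REGISTRY_PASSWORD"}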
434 def serve(self) -> None:
435 """
436 The main loop of cephadm.
437
438 A command handler will typically change the declarative state
439 of cephadm. This loop will then attempt to apply this new state.
440 """
441 serve = CephadmServe(self)
442 serve.serve()
443
444 def set_container_image(self, entity: str, image):
445 self.check_mon_command({
446 'prefix': 'config set',
447 'name': 'container_image',
448 'value': image,
449 'who': entity,
450 })
451
452 def config_notify(self):
453 """
454 This method is called whenever one of our config options is changed.
455
456 TODO: this method should be moved into mgr_module.py
457 """
458 for opt in self.MODULE_OPTIONS:
459 setattr(self,
460 opt['name'], # type: ignore
461 self.get_module_option(opt['name'])) # type: ignore
462 self.log.debug(' mgr option %s = %s',
463 opt['name'], getattr(self, opt['name'])) # type: ignore
464 for opt in self.NATIVE_OPTIONS:
465 setattr(self,
466 opt, # type: ignore
467 self.get_ceph_option(opt))
468 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
469
470 self.event.set()
471
472 def notify(self, notify_type, notify_id):
473 if notify_type == "mon_map":
474 # get monmap mtime so we can refresh configs when mons change
475 monmap = self.get('mon_map')
476 self.last_monmap = datetime.datetime.strptime(
477 monmap['modified'], CEPH_DATEFMT)
478 if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
479 self.last_monmap = None # just in case clocks are skewed
480 if getattr(self, 'manage_etc_ceph_ceph_conf', False):
481 # getattr, due to notify() being called before config_notify()
482 self._kick_serve_loop()
483 if notify_type == "pg_summary":
484 self._trigger_osd_removal()
485
486 def _trigger_osd_removal(self):
487 data = self.get("osd_stats")
488 for osd in data.get('osd_stats', []):
489 if osd.get('num_pgs') == 0:
490 # if _ANY_ osd that is currently in the removal queue appears to be
491 # empty, start the removal process
492 if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
493 self.log.debug("Found empty osd. Starting removal process")
494 # the now-empty osd is part of the removal queue, so kick the
495 # queue processing
496 self.rm_util.process_removal_queue()
497
498 def pause(self):
499 if not self.paused:
500 self.log.info('Paused')
501 self.set_store('pause', 'true')
502 self.paused = True
503 # wake loop so we update the health status
504 self._kick_serve_loop()
505
506 def resume(self):
507 if self.paused:
508 self.log.info('Resumed')
509 self.paused = False
510 self.set_store('pause', None)
511 # unconditionally wake loop so that 'orch resume' can be used to kick
512 # cephadm
513 self._kick_serve_loop()
514
515 def get_unique_name(self, daemon_type, host, existing, prefix=None,
516 forcename=None):
517 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
518 """
519 Generate a unique random daemon name
520 """
521 suffix = daemon_type not in [
522 'mon', 'crash', 'nfs',
523 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
524 'container'
525 ]
526 if forcename:
527 if len([d for d in existing if d.daemon_id == forcename]):
528 raise orchestrator.OrchestratorValidationError(
529 f'name {daemon_type}.{forcename} already in use')
530 return forcename
531
532 if '.' in host:
533 host = host.split('.')[0]
534 while True:
535 if prefix:
536 name = prefix + '.'
537 else:
538 name = ''
539 name += host
540 if suffix:
541 name += '.' + ''.join(random.choice(string.ascii_lowercase)
542 for _ in range(6))
543 if len([d for d in existing if d.daemon_id == name]):
544 if not suffix:
545 raise orchestrator.OrchestratorValidationError(
546 f'name {daemon_type}.{name} already in use')
547 self.log.debug('name %s exists, trying again', name)
548 continue
549 return name
550
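# For example (hypothetical values): get_unique_name('mgr', 'host1.example.com', [])
# would yield something like 'host1.abcxyz' (short hostname plus a random 6-letter
# suffix), while get_unique_name('mon', 'host1.example.com', []) yields just 'host1',
# since mon/crash/nfs/monitoring/container daemons are named after the host.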
551 def _reconfig_ssh(self):
552 temp_files = [] # type: list
553 ssh_options = [] # type: List[str]
554
555 # ssh_config
556 ssh_config_fname = self.ssh_config_file
557 ssh_config = self.get_store("ssh_config")
558 if ssh_config is not None or ssh_config_fname is None:
559 if not ssh_config:
560 ssh_config = DEFAULT_SSH_CONFIG
561 f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
562 os.fchmod(f.fileno(), 0o600)
563 f.write(ssh_config.encode('utf-8'))
564 f.flush() # make visible to other processes
565 temp_files += [f]
566 ssh_config_fname = f.name
567 if ssh_config_fname:
568 self.validate_ssh_config_fname(ssh_config_fname)
569 ssh_options += ['-F', ssh_config_fname]
570 self.ssh_config = ssh_config
571
572 # identity
573 ssh_key = self.get_store("ssh_identity_key")
574 ssh_pub = self.get_store("ssh_identity_pub")
575 self.ssh_pub = ssh_pub
576 self.ssh_key = ssh_key
577 if ssh_key and ssh_pub:
578 tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
579 tkey.write(ssh_key.encode('utf-8'))
580 os.fchmod(tkey.fileno(), 0o600)
581 tkey.flush() # make visible to other processes
582 tpub = open(tkey.name + '.pub', 'w')
583 os.fchmod(tpub.fileno(), 0o600)
584 tpub.write(ssh_pub)
585 tpub.flush() # make visible to other processes
586 temp_files += [tkey, tpub]
587 ssh_options += ['-i', tkey.name]
588
589 self._temp_files = temp_files
590 if ssh_options:
591 self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
592 else:
593 self._ssh_options = None
594
595 if self.mode == 'root':
596 self.ssh_user = self.get_store('ssh_user', default='root')
597 elif self.mode == 'cephadm-package':
598 self.ssh_user = 'cephadm'
599
600 self._reset_cons()
601
602 def validate_ssh_config_content(self, ssh_config):
603 if ssh_config is None or len(ssh_config.strip()) == 0:
604 raise OrchestratorValidationError('ssh_config cannot be empty')
605 # require an explicit StrictHostKeyChecking setting (and reject 'ask')
606 l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
607 if not l:
608 raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
609 for s in l:
610 if 'ask' in s.lower():
611 raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')
612
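# For illustration: the DEFAULT_SSH_CONFIG above passes this validation (it sets
# "StrictHostKeyChecking no"), whereas an ssh_config containing
# "StrictHostKeyChecking ask" is rejected, presumably because cephadm cannot
# answer interactive host-key prompts.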
613 def validate_ssh_config_fname(self, ssh_config_fname):
614 if not os.path.isfile(ssh_config_fname):
615 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
616 ssh_config_fname))
617
618 def _reset_con(self, host):
619 conn, r = self._cons.get(host, (None, None))
620 if conn:
621 self.log.debug('_reset_con close %s' % host)
622 conn.exit()
623 del self._cons[host]
624
625 def _reset_cons(self):
626 for host, conn_and_r in self._cons.items():
627 self.log.debug('_reset_cons close %s' % host)
628 conn, r = conn_and_r
629 conn.exit()
630 self._cons = {}
631
632 def offline_hosts_remove(self, host):
633 if host in self.offline_hosts:
634 self.offline_hosts.remove(host)
635
636 @staticmethod
637 def can_run():
638 if remoto is not None:
639 return True, ""
640 else:
641 return False, "loading remoto library:{}".format(
642 remoto_import_error)
643
644 def available(self):
645 """
646 The cephadm orchestrator is always available.
647 """
648 ok, err = self.can_run()
649 if not ok:
650 return ok, err
651 if not self.ssh_key or not self.ssh_pub:
652 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
653 return True, ''
654
655 def process(self, completions):
656 """
657 Does nothing, as completions are processed in another thread.
658 """
659 if completions:
660 self.log.debug("process: completions={0}".format(
661 orchestrator.pretty_print(completions)))
662
663 for p in completions:
664 p.evaluate()
665
666 @orchestrator._cli_write_command(
667 prefix='cephadm set-ssh-config',
668 desc='Set the ssh_config file (use -i <ssh_config>)')
669 def _set_ssh_config(self, inbuf=None):
670 """
671 Set an ssh_config file provided from stdin
672 """
673 if inbuf == self.ssh_config:
674 return 0, "value unchanged", ""
675 self.validate_ssh_config_content(inbuf)
676 self.set_store("ssh_config", inbuf)
677 self.log.info('Set ssh_config')
678 self._reconfig_ssh()
679 return 0, "", ""
680
681 @orchestrator._cli_write_command(
682 prefix='cephadm clear-ssh-config',
683 desc='Clear the ssh_config file')
684 def _clear_ssh_config(self):
685 """
686 Clear the stored ssh_config file
687 """
688 self.set_store("ssh_config", None)
689 self.ssh_config_tmp = None
690 self.log.info('Cleared ssh_config')
691 self._reconfig_ssh()
692 return 0, "", ""
693
694 @orchestrator._cli_read_command(
695 prefix='cephadm get-ssh-config',
696 desc='Returns the ssh config as used by cephadm'
697 )
698 def _get_ssh_config(self):
699 if self.ssh_config_file:
700 self.validate_ssh_config_fname(self.ssh_config_file)
701 with open(self.ssh_config_file) as f:
702 return HandleCommandResult(stdout=f.read())
703 ssh_config = self.get_store("ssh_config")
704 if ssh_config:
705 return HandleCommandResult(stdout=ssh_config)
706 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
707
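# Illustrative CLI usage of the three ssh_config commands defined above:
#
#   ceph cephadm set-ssh-config -i <ssh_config>
#   ceph cephadm get-ssh-config
#   ceph cephadm clear-ssh-config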
708 @orchestrator._cli_write_command(
709 'cephadm generate-key',
710 desc='Generate a cluster SSH key (if not present)')
711 def _generate_key(self):
712 if not self.ssh_pub or not self.ssh_key:
713 self.log.info('Generating ssh key...')
714 tmp_dir = TemporaryDirectory()
715 path = tmp_dir.name + '/key'
716 try:
717 subprocess.check_call([
718 '/usr/bin/ssh-keygen',
719 '-C', 'ceph-%s' % self._cluster_fsid,
720 '-N', '',
721 '-f', path
722 ])
723 with open(path, 'r') as f:
724 secret = f.read()
725 with open(path + '.pub', 'r') as f:
726 pub = f.read()
727 finally:
728 os.unlink(path)
729 os.unlink(path + '.pub')
730 tmp_dir.cleanup()
731 self.set_store('ssh_identity_key', secret)
732 self.set_store('ssh_identity_pub', pub)
733 self._reconfig_ssh()
734 return 0, '', ''
735
736 @orchestrator._cli_write_command(
737 'cephadm set-priv-key',
738 desc='Set cluster SSH private key (use -i <private_key>)')
739 def _set_priv_key(self, inbuf=None):
740 if inbuf is None or len(inbuf) == 0:
741 return -errno.EINVAL, "", "empty private ssh key provided"
742 if inbuf == self.ssh_key:
743 return 0, "value unchanged", ""
744 self.set_store("ssh_identity_key", inbuf)
745 self.log.info('Set ssh private key')
746 self._reconfig_ssh()
747 return 0, "", ""
748
749 @orchestrator._cli_write_command(
750 'cephadm set-pub-key',
751 desc='Set cluster SSH public key (use -i <public_key>)')
752 def _set_pub_key(self, inbuf=None):
753 if inbuf is None or len(inbuf) == 0:
754 return -errno.EINVAL, "", "empty public ssh key provided"
755 if inbuf == self.ssh_pub:
756 return 0, "value unchanged", ""
757 self.set_store("ssh_identity_pub", inbuf)
758 self.log.info('Set ssh public key')
759 self._reconfig_ssh()
760 return 0, "", ""
761
762 @orchestrator._cli_write_command(
763 'cephadm clear-key',
764 desc='Clear cluster SSH key')
765 def _clear_key(self):
766 self.set_store('ssh_identity_key', None)
767 self.set_store('ssh_identity_pub', None)
768 self._reconfig_ssh()
769 self.log.info('Cleared cluster SSH key')
770 return 0, '', ''
771
772 @orchestrator._cli_read_command(
773 'cephadm get-pub-key',
774 desc='Show SSH public key for connecting to cluster hosts')
775 def _get_pub_key(self):
776 if self.ssh_pub:
777 return 0, self.ssh_pub, ''
778 else:
779 return -errno.ENOENT, '', 'No cluster SSH key defined'
780
781 @orchestrator._cli_read_command(
782 'cephadm get-user',
783 desc='Show user for SSHing to cluster hosts')
784 def _get_user(self):
785 return 0, self.ssh_user, ''
786
787 @orchestrator._cli_read_command(
788 'cephadm set-user',
789 'name=user,type=CephString',
790 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
791 def set_ssh_user(self, user):
792 current_user = self.ssh_user
793 if user == current_user:
794 return 0, "value unchanged", ""
795
796 self.set_store('ssh_user', user)
797 self._reconfig_ssh()
798
799 host = self.cache.get_hosts()[0]
800 r = CephadmServe(self)._check_host(host)
801 if r is not None:
802 # connection failed; reset the ssh user
803 self.set_store('ssh_user', current_user)
804 self._reconfig_ssh()
805 return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
806
807 msg = 'ssh user set to %s' % user
808 if user != 'root':
809 msg += '; sudo will be used'
810 self.log.info(msg)
811 return 0, msg, ''
812
813 @orchestrator._cli_read_command(
814 'cephadm registry-login',
815 "name=url,type=CephString,req=false "
816 "name=username,type=CephString,req=false "
817 "name=password,type=CephString,req=false",
818 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
819 def registry_login(self, url=None, username=None, password=None, inbuf=None):
820 # if url/username/password were not all given on the command line, get them through file input (-i)
821 if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
822 return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
823 "or -i <login credentials json file>")
824 elif not (url and username and password):
825 login_info = json.loads(inbuf)
826 if "url" in login_info and "username" in login_info and "password" in login_info:
827 url = login_info["url"]
828 username = login_info["username"]
829 password = login_info["password"]
830 else:
831 return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
832 "Please setup json file as\n"
833 "{\n"
834 " \"url\": \"REGISTRY_URL\",\n"
835 " \"username\": \"REGISTRY_USERNAME\",\n"
836 " \"password\": \"REGISTRY_PASSWORD\"\n"
837 "}\n")
838 # verify the login info works by attempting a login on a random host
839 host = None
840 for host_name in self.inventory.keys():
841 host = host_name
842 break
843 if not host:
844 raise OrchestratorError('no hosts defined')
845 r = self._registry_login(host, url, username, password)
846 if r is not None:
847 return 1, '', r
848 # if logins succeeded, store info
849 self.log.debug("Host logins successful. Storing login info.")
850 self.set_module_option('registry_url', url)
851 self.set_module_option('registry_username', username)
852 self.set_module_option('registry_password', password)
853 # distribute new login info to all hosts
854 self.cache.distribute_new_registry_login_info()
855 return 0, "registry login scheduled", ''
856
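# Illustrative usage of the command defined above (either form works):
#
#   ceph cephadm registry-login <url> <username> <password>
#   ceph cephadm registry-login -i <login credentials json file>
#
# where the json file has the shape documented in the error message above
# ("url", "username", "password").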
857 @orchestrator._cli_read_command(
858 'cephadm check-host',
859 'name=host,type=CephString '
860 'name=addr,type=CephString,req=false',
861 'Check whether we can access and manage a remote host')
862 def check_host(self, host, addr=None):
863 try:
864 out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
865 ['--expect-hostname', host],
866 addr=addr,
867 error_ok=True, no_fsid=True)
868 if code:
869 return 1, '', ('check-host failed:\n' + '\n'.join(err))
870 except OrchestratorError as e:
871 self.log.exception(f"check-host failed for '{host}'")
872 return 1, '', ('check-host failed:\n' +
873 f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
874 # if we have an outstanding health alert for this host, give the
875 # serve thread a kick
876 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
877 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
878 if item.startswith('host %s ' % host):
879 self.event.set()
880 return 0, '%s (%s) ok' % (host, addr), err
881
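# Example invocation of the command above (addr is optional):
#
#   ceph cephadm check-host <host> [<addr>]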
882 @orchestrator._cli_read_command(
883 'cephadm prepare-host',
884 'name=host,type=CephString '
885 'name=addr,type=CephString,req=false',
886 'Prepare a remote host for use with cephadm')
887 def _prepare_host(self, host, addr=None):
888 out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
889 ['--expect-hostname', host],
890 addr=addr,
891 error_ok=True, no_fsid=True)
892 if code:
893 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
894 # if we have an outstanding health alert for this host, give the
895 # serve thread a kick
896 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
897 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
898 if item.startswith('host %s ' % host):
899 self.event.set()
900 return 0, '%s (%s) ok' % (host, addr), err
901
902 @orchestrator._cli_write_command(
903 prefix='cephadm set-extra-ceph-conf',
904 desc="Text that is appended to all daemon's ceph.conf.\n"
905 "Mainly a workaround, till `config generate-minimal-conf` generates\n"
906 "a complete ceph.conf.\n\n"
907 "Warning: this is a dangerous operation.")
908 def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
909 if inbuf:
910 # sanity check.
911 cp = ConfigParser()
912 cp.read_string(inbuf, source='<infile>')
913
914 self.set_store("extra_ceph_conf", json.dumps({
915 'conf': inbuf,
916 'last_modified': datetime_to_str(datetime.datetime.utcnow())
917 }))
918 self.log.info('Set extra_ceph_conf')
919 self._kick_serve_loop()
920 return HandleCommandResult()
921
922 @orchestrator._cli_read_command(
923 'cephadm get-extra-ceph-conf',
924 desc='Get extra ceph conf that is appended')
925 def _get_extra_ceph_conf(self) -> HandleCommandResult:
926 return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
927
928 class ExtraCephConf(NamedTuple):
929 conf: str
930 last_modified: Optional[datetime.datetime]
931
932 def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
933 data = self.get_store('extra_ceph_conf')
934 if not data:
935 return CephadmOrchestrator.ExtraCephConf('', None)
936 try:
937 j = json.loads(data)
938 except ValueError:
939 self.log.exception('unable to load extra_ceph_conf')
940 return CephadmOrchestrator.ExtraCephConf('', None)
941 return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
942
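# The "extra_ceph_conf" config-key written by _set_extra_ceph_conf() above is a
# small JSON document; a sketch of its shape (values are examples only):
#
#   {"conf": "[global]\n...extra settings...", "last_modified": "2020-01-01T00:00:00.000000Z"}
#
# extra_ceph_conf() parses it back into an ExtraCephConf named tuple.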
943 def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
944 conf = self.extra_ceph_conf()
945 if not conf.last_modified:
946 return False
947 return conf.last_modified > dt
948
949 def _get_connection(self, host: str):
950 """
951 Setup a connection for running commands on remote host.
952 """
953 conn, r = self._cons.get(host, (None, None))
954 if conn:
955 if conn.has_connection():
956 self.log.debug('Have connection to %s' % host)
957 return conn, r
958 else:
959 self._reset_con(host)
960 n = self.ssh_user + '@' + host
961 self.log.debug("Opening connection to {} with ssh options '{}'".format(
962 n, self._ssh_options))
963 child_logger = self.log.getChild(n)
964 child_logger.setLevel('WARNING')
965 conn = remoto.Connection(
966 n,
967 logger=child_logger,
968 ssh_options=self._ssh_options,
969 sudo=True if self.ssh_user != 'root' else False)
970
971 r = conn.import_module(remotes)
972 self._cons[host] = conn, r
973
974 return conn, r
975
976 def _executable_path(self, conn, executable):
977 """
978 Remote validator that accepts a connection object to ensure that a certain
979 executable is available, returning its full path if so.
980
981 Otherwise an exception with thorough details will be raised, informing the
982 user that the executable was not found.
983 """
984 executable_path = conn.remote_module.which(executable)
985 if not executable_path:
986 raise RuntimeError("Executable '{}' not found on host '{}'".format(
987 executable, conn.hostname))
988 self.log.debug("Found executable '{}' at path '{}'".format(executable,
989 executable_path))
990 return executable_path
991
992 @contextmanager
993 def _remote_connection(self,
994 host: str,
995 addr: Optional[str] = None,
996 ) -> Iterator[Tuple["BaseConnection", Any]]:
997 if not addr and host in self.inventory:
998 addr = self.inventory.get_addr(host)
999
1000 self.offline_hosts_remove(host)
1001
1002 try:
1003 try:
1004 if not addr:
1005 raise OrchestratorError("host address is empty")
1006 conn, connr = self._get_connection(addr)
1007 except OSError as e:
1008 self._reset_con(host)
1009 msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1010 raise execnet.gateway_bootstrap.HostNotFound(msg)
1011
1012 yield (conn, connr)
1013
1014 except execnet.gateway_bootstrap.HostNotFound as e:
1015 # this is a misleading exception as it seems to be thrown for
1016 # any sort of connection failure, even those having nothing to
1017 # do with "host not found" (e.g., ssh key permission denied).
1018 self.offline_hosts.add(host)
1019 self._reset_con(host)
1020
1021 user = self.ssh_user if self.mode == 'root' else 'cephadm'
1022 msg = f'''Failed to connect to {host} ({addr}).
1023 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1024
1025 To add the cephadm SSH key to the host:
1026 > ceph cephadm get-pub-key > ~/ceph.pub
1027 > ssh-copy-id -f -i ~/ceph.pub {user}@{host}
1028
1029 To check that the host is reachable:
1030 > ceph cephadm get-ssh-config > ssh_config
1031 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1032 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
1033 raise OrchestratorError(msg) from e
1034 except Exception as ex:
1035 self.log.exception(ex)
1036 raise
1037
1038 def _get_container_image(self, daemon_name: str) -> Optional[str]:
1039 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1040 if daemon_type in CEPH_TYPES or \
1041 daemon_type == 'nfs' or \
1042 daemon_type == 'iscsi':
1043 # get container image
1044 ret, image, err = self.check_mon_command({
1045 'prefix': 'config get',
1046 'who': utils.name_to_config_section(daemon_name),
1047 'key': 'container_image',
1048 })
1049 image = image.strip() # type: ignore
1050 elif daemon_type == 'prometheus':
1051 image = self.container_image_prometheus
1052 elif daemon_type == 'grafana':
1053 image = self.container_image_grafana
1054 elif daemon_type == 'alertmanager':
1055 image = self.container_image_alertmanager
1056 elif daemon_type == 'node-exporter':
1057 image = self.container_image_node_exporter
1058 elif daemon_type == CustomContainerService.TYPE:
1059 # The image can't be resolved, the necessary information
1060 # is only available when a container is deployed (given
1061 # via spec).
1062 image = None
1063 else:
1064 assert False, daemon_type
1065
1066 self.log.debug('%s container image %s' % (daemon_name, image))
1067
1068 return image
1069
1070 def _run_cephadm(self,
1071 host: str,
1072 entity: Union[CephadmNoImage, str],
1073 command: str,
1074 args: List[str],
1075 addr: Optional[str] = "",
1076 stdin: Optional[str] = "",
1077 no_fsid: Optional[bool] = False,
1078 error_ok: Optional[bool] = False,
1079 image: Optional[str] = "",
1080 env_vars: Optional[List[str]] = None,
1081 ) -> Tuple[List[str], List[str], int]:
1082 """
1083 Run cephadm on the remote host with the given command + args
1084
1085 :param env_vars: in format -> [KEY=VALUE, ..]
1086 """
1087 with self._remote_connection(host, addr) as tpl:
1088 conn, connr = tpl
1089 assert image or entity
1090 if not image and entity is not cephadmNoImage:
1091 image = self._get_container_image(entity)
1092
1093 final_args = []
1094
1095 if env_vars:
1096 for env_var_pair in env_vars:
1097 final_args.extend(['--env', env_var_pair])
1098
1099 if image:
1100 final_args.extend(['--image', image])
1101 final_args.append(command)
1102
1103 if not no_fsid:
1104 final_args += ['--fsid', self._cluster_fsid]
1105
1106 if self.container_init:
1107 final_args += ['--container-init']
1108
1109 final_args += args
1110
1111 self.log.debug('args: %s' % (' '.join(final_args)))
1112 if self.mode == 'root':
1113 if stdin:
1114 self.log.debug('stdin: %s' % stdin)
1115 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
1116 if stdin:
1117 script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
1118 script += self._cephadm
1119 python = connr.choose_python()
1120 if not python:
1121 raise RuntimeError(
1122 'unable to find python on %s (tried %s in %s)' % (
1123 host, remotes.PYTHONS, remotes.PATH))
1124 try:
1125 out, err, code = remoto.process.check(
1126 conn,
1127 [python, '-u'],
1128 stdin=script.encode('utf-8'))
1129 except RuntimeError as e:
1130 self._reset_con(host)
1131 if error_ok:
1132 return [], [str(e)], 1
1133 raise
1134 elif self.mode == 'cephadm-package':
1135 try:
1136 out, err, code = remoto.process.check(
1137 conn,
1138 ['sudo', '/usr/bin/cephadm'] + final_args,
1139 stdin=stdin)
1140 except RuntimeError as e:
1141 self._reset_con(host)
1142 if error_ok:
1143 return [], [str(e)], 1
1144 raise
1145 else:
1146 assert False, 'unsupported mode'
1147
1148 self.log.debug('code: %d' % code)
1149 if out:
1150 self.log.debug('out: %s' % '\n'.join(out))
1151 if err:
1152 self.log.debug('err: %s' % '\n'.join(err))
1153 if code and not error_ok:
1154 raise OrchestratorError(
1155 'cephadm exited with an error code: %d, stderr:%s' % (
1156 code, '\n'.join(err)))
1157 return out, err, code
1158
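# A typical call, as seen elsewhere in this module (sketch):
#
#   out, err, code = self._run_cephadm(
#       host, cephadmNoImage, 'check-host',
#       ['--expect-hostname', host],
#       addr=addr, error_ok=True, no_fsid=True)
#
# env_vars, when given, are forwarded as repeated `--env KEY=VALUE` arguments.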
1159 def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
1160 """
1161 Returns all hosts that went through _refresh_host_daemons().
1162
1163 This mitigates a potential race, where a new host was added *after*
1164 ``_refresh_host_daemons()`` was called, but *before*
1165 ``_apply_all_specs()`` was called. Thus we could end up with hosts
1166 where daemons might be running, but we have not yet detected them.
1167 """
1168 return [
1169 h for h in self.inventory.all_specs()
1170 if self.cache.host_had_daemon_refresh(h.hostname)
1171 ]
1172
1173 def _add_host(self, spec):
1174 # type: (HostSpec) -> str
1175 """
1176 Add a host to be managed by the orchestrator.
1177
1178 :param spec: host spec to add
1179 """
1180 assert_valid_host(spec.hostname)
1181 out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
1182 ['--expect-hostname', spec.hostname],
1183 addr=spec.addr,
1184 error_ok=True, no_fsid=True)
1185 if code:
1186 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1187 spec.hostname, spec.addr, err))
1188
1189 self.inventory.add_host(spec)
1190 self.cache.prime_empty_host(spec.hostname)
1191 self.offline_hosts_remove(spec.hostname)
1192 self.event.set() # refresh stray health check
1193 self.log.info('Added host %s' % spec.hostname)
1194 return "Added host '{}'".format(spec.hostname)
1195
1196 @trivial_completion
1197 def add_host(self, spec: HostSpec) -> str:
1198 return self._add_host(spec)
1199
1200 @trivial_completion
1201 def remove_host(self, host):
1202 # type: (str) -> str
1203 """
1204 Remove a host from orchestrator management.
1205
1206 :param host: host name
1207 """
1208 self.inventory.rm_host(host)
1209 self.cache.rm_host(host)
1210 self._reset_con(host)
1211 self.event.set() # refresh stray health check
1212 self.log.info('Removed host %s' % host)
1213 return "Removed host '{}'".format(host)
1214
1215 @trivial_completion
1216 def update_host_addr(self, host, addr) -> str:
1217 self.inventory.set_addr(host, addr)
1218 self._reset_con(host)
1219 self.event.set() # refresh stray health check
1220 self.log.info('Set host %s addr to %s' % (host, addr))
1221 return "Updated host '{}' addr to '{}'".format(host, addr)
1222
1223 @trivial_completion
1224 def get_hosts(self):
1225 # type: () -> List[orchestrator.HostSpec]
1226 """
1227 Return a list of hosts managed by the orchestrator.
1228
1229 Notes:
1230 - skip async: manager reads from cache.
1231 """
1232 return list(self.inventory.all_specs())
1233
1234 @trivial_completion
1235 def add_host_label(self, host, label) -> str:
1236 self.inventory.add_label(host, label)
1237 self.log.info('Added label %s to host %s' % (label, host))
1238 return 'Added label %s to host %s' % (label, host)
1239
1240 @trivial_completion
1241 def remove_host_label(self, host, label) -> str:
1242 self.inventory.rm_label(host, label)
1243 self.log.info('Removed label %s from host %s' % (label, host))
1244 return 'Removed label %s from host %s' % (label, host)
1245
1246 @trivial_completion
1247 def host_ok_to_stop(self, hostname: str):
1248 if hostname not in self.cache.get_hosts():
1249 raise OrchestratorError(f'Cannot find host "{hostname}"')
1250
1251 daemons = self.cache.get_daemons()
1252 daemon_map = defaultdict(lambda: [])
1253 for dd in daemons:
1254 if dd.hostname == hostname:
1255 daemon_map[dd.daemon_type].append(dd.daemon_id)
1256
1257 for daemon_type, daemon_ids in daemon_map.items():
1258 r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
1259 if r.retval:
1260 self.log.error(f'It is NOT safe to stop host {hostname}')
1261 raise orchestrator.OrchestratorError(
1262 r.stderr,
1263 errno=r.retval)
1264
1265 msg = f'It is presumed safe to stop host {hostname}'
1266 self.log.info(msg)
1267 return msg
1268
1269 def get_minimal_ceph_conf(self) -> str:
1270 _, config, _ = self.check_mon_command({
1271 "prefix": "config generate-minimal-conf",
1272 })
1273 extra = self.extra_ceph_conf().conf
1274 if extra:
1275 config += '\n\n' + extra.strip() + '\n'
1276 return config
1277
1278 def _invalidate_daemons_and_kick_serve(self, filter_host=None):
1279 if filter_host:
1280 self.cache.invalidate_host_daemons(filter_host)
1281 else:
1282 for h in self.cache.get_hosts():
1283 # Also discover daemons deployed manually
1284 self.cache.invalidate_host_daemons(h)
1285
1286 self._kick_serve_loop()
1287
1288 @trivial_completion
1289 def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
1290 refresh: bool = False) -> List[orchestrator.ServiceDescription]:
1291 if refresh:
1292 self._invalidate_daemons_and_kick_serve()
1293 self.log.info('Kicked serve() loop to refresh all services')
1294
1295 # <service_map>
1296 sm: Dict[str, orchestrator.ServiceDescription] = {}
1297 osd_count = 0
1298 for h, dm in self.cache.get_daemons_with_volatile_status():
1299 for name, dd in dm.items():
1300 if service_type and service_type != dd.daemon_type:
1301 continue
1302 n: str = dd.service_name()
1303 if service_name and service_name != n:
1304 continue
1305 if dd.daemon_type == 'osd':
1306 """
1307 OSDs do not know their affinity to a spec out of the box.
1308 """
1309 n = f"osd.{dd.osdspec_affinity}"
1310 if not dd.osdspec_affinity:
1311 # If there is no osdspec_affinity, the spec should suffice for displaying
1312 continue
1313 if n in self.spec_store.specs:
1314 spec = self.spec_store.specs[n]
1315 else:
1316 spec = ServiceSpec(
1317 unmanaged=True,
1318 service_type=dd.daemon_type,
1319 service_id=dd.service_id(),
1320 placement=PlacementSpec(
1321 hosts=[dd.hostname]
1322 )
1323 )
1324 if n not in sm:
1325 sm[n] = orchestrator.ServiceDescription(
1326 last_refresh=dd.last_refresh,
1327 container_image_id=dd.container_image_id,
1328 container_image_name=dd.container_image_name,
1329 spec=spec,
1330 events=self.events.get_for_service(spec.service_name()),
1331 )
1332 if n in self.spec_store.specs:
1333 if dd.daemon_type == 'osd':
1334 """
1335 The osd count can't be determined by the Placement spec.
1336 An actual/expected representation cannot be computed here,
1337 so we set the size to the observed osd count for now.
1338 """
1339 osd_count += 1
1340 sm[n].size = osd_count
1341 else:
1342 sm[n].size = spec.placement.get_host_selection_size(
1343 self.inventory.all_specs())
1344
1345 sm[n].created = self.spec_store.spec_created[n]
1346 if service_type == 'nfs':
1347 spec = cast(NFSServiceSpec, spec)
1348 sm[n].rados_config_location = spec.rados_config_location()
1349 else:
1350 sm[n].size = 0
1351 if dd.status == 1:
1352 sm[n].running += 1
1353 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1354 sm[n].last_refresh = dd.last_refresh
1355 if sm[n].container_image_id != dd.container_image_id:
1356 sm[n].container_image_id = 'mix'
1357 if sm[n].container_image_name != dd.container_image_name:
1358 sm[n].container_image_name = 'mix'
1359 for n, spec in self.spec_store.specs.items():
1360 if n in sm:
1361 continue
1362 if service_type is not None and service_type != spec.service_type:
1363 continue
1364 if service_name is not None and service_name != n:
1365 continue
1366 sm[n] = orchestrator.ServiceDescription(
1367 spec=spec,
1368 size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
1369 running=0,
1370 events=self.events.get_for_service(spec.service_name()),
1371 )
1372 if service_type == 'nfs':
1373 spec = cast(NFSServiceSpec, spec)
1374 sm[n].rados_config_location = spec.rados_config_location()
1375 return list(sm.values())
1376
1377 @trivial_completion
1378 def list_daemons(self,
1379 service_name: Optional[str] = None,
1380 daemon_type: Optional[str] = None,
1381 daemon_id: Optional[str] = None,
1382 host: Optional[str] = None,
1383 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
1384 if refresh:
1385 self._invalidate_daemons_and_kick_serve(host)
1386 self.log.info('Kicked serve() loop to refresh all daemons')
1387
1388 result = []
1389 for h, dm in self.cache.get_daemons_with_volatile_status():
1390 if host and h != host:
1391 continue
1392 for name, dd in dm.items():
1393 if daemon_type is not None and daemon_type != dd.daemon_type:
1394 continue
1395 if daemon_id is not None and daemon_id != dd.daemon_id:
1396 continue
1397 if service_name is not None and service_name != dd.service_name():
1398 continue
1399 result.append(dd)
1400 return result
1401
1402 @trivial_completion
1403 def service_action(self, action, service_name) -> List[str]:
1404 args = []
1405 for host, dm in self.cache.daemons.items():
1406 for name, d in dm.items():
1407 if d.matches_service(service_name):
1408 args.append((d.daemon_type, d.daemon_id,
1409 d.hostname, action))
1410 self.log.info('%s service %s' % (action.capitalize(), service_name))
1411 return self._daemon_actions(args)
1412
1413 @forall_hosts
1414 def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
1415 with set_exception_subject('daemon', DaemonDescription(
1416 daemon_type=daemon_type,
1417 daemon_id=daemon_id
1418 ).name()):
1419 return self._daemon_action(daemon_type, daemon_id, host, action)
1420
1421 def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
1422 daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
1423 host=host,
1424 daemon_id=daemon_id,
1425 daemon_type=daemon_type,
1426 )
1427
1428 self._daemon_action_set_image(action, image, daemon_type, daemon_id)
1429
1430 if action == 'redeploy':
1431 if self.daemon_is_self(daemon_type, daemon_id):
1432 self.mgr_service.fail_over()
1433 return '' # unreachable
1434 # stop, recreate the container+unit, then restart
1435 return self._create_daemon(daemon_spec)
1436 elif action == 'reconfig':
1437 return self._create_daemon(daemon_spec, reconfig=True)
1438
1439 actions = {
1440 'start': ['reset-failed', 'start'],
1441 'stop': ['stop'],
1442 'restart': ['reset-failed', 'restart'],
1443 }
1444 name = daemon_spec.name()
1445 for a in actions[action]:
1446 try:
1447 out, err, code = self._run_cephadm(
1448 host, name, 'unit',
1449 ['--name', name, a])
1450 except Exception:
1451 self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
1452 self.cache.invalidate_host_daemons(daemon_spec.host)
1453 msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
1454 self.events.for_daemon(name, 'INFO', msg)
1455 return msg
1456
1457 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str):
1458 if image is not None:
1459 if action != 'redeploy':
1460 raise OrchestratorError(
1461 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1462 if daemon_type not in CEPH_TYPES:
1463 raise OrchestratorError(
1464 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1465 f'types are: {", ".join(CEPH_TYPES)}')
1466
1467 self.check_mon_command({
1468 'prefix': 'config set',
1469 'name': 'container_image',
1470 'value': image,
1471 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1472 })
1473
1474 @trivial_completion
1475 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
1476 d = self.cache.get_daemon(daemon_name)
1477
1478 if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
1479 and not self.mgr_service.mgr_map_has_standby():
1480 raise OrchestratorError(
1481 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1482
1483 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
1484
1485 self.log.info(f'Schedule {action} daemon {daemon_name}')
1486 return self._schedule_daemon_action(daemon_name, action)
1487
1488 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
1489 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
1490
1491 def _schedule_daemon_action(self, daemon_name: str, action: str):
1492 dd = self.cache.get_daemon(daemon_name)
1493 if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
1494 and not self.mgr_service.mgr_map_has_standby():
1495 raise OrchestratorError(
1496 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1497 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
1498 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
1499 self._kick_serve_loop()
1500 return msg
1501
1502 @trivial_completion
1503 def remove_daemons(self, names):
1504 # type: (List[str]) -> List[str]
1505 args = []
1506 for host, dm in self.cache.daemons.items():
1507 for name in names:
1508 if name in dm:
1509 args.append((name, host))
1510 if not args:
1511 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1512 self.log.info('Remove daemons %s' % [a[0] for a in args])
1513 return self._remove_daemons(args)
1514
1515 @trivial_completion
1516 def remove_service(self, service_name) -> str:
1517 self.log.info('Remove service %s' % service_name)
1518 self._trigger_preview_refresh(service_name=service_name)
1519 found = self.spec_store.rm(service_name)
1520 if found:
1521 self._kick_serve_loop()
1522 return 'Removed service %s' % service_name
1523 else:
1524 # must be idempotent: still a success.
1525 return f'Failed to remove service. <{service_name}> was not found.'
1526
1527 @trivial_completion
1528 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
1529 """
1530 Return the storage inventory of hosts matching the given filter.
1531
1532 :param host_filter: host filter
1533
1534 TODO:
1535 - add filtering by label
1536 """
1537 if refresh:
1538 if host_filter and host_filter.hosts:
1539 for h in host_filter.hosts:
1540 self.cache.invalidate_host_devices(h)
1541 else:
1542 for h in self.cache.get_hosts():
1543 self.cache.invalidate_host_devices(h)
1544
1545 self.event.set()
1546 self.log.info('Kicked serve() loop to refresh devices')
1547
1548 result = []
1549 for host, dls in self.cache.devices.items():
1550 if host_filter and host_filter.hosts and host not in host_filter.hosts:
1551 continue
1552 result.append(orchestrator.InventoryHost(host,
1553 inventory.Devices(dls)))
1554 return result
1555
1556 @trivial_completion
1557 def zap_device(self, host, path) -> str:
1558 self.log.info('Zap device %s:%s' % (host, path))
1559 out, err, code = self._run_cephadm(
1560 host, 'osd', 'ceph-volume',
1561 ['--', 'lvm', 'zap', '--destroy', path],
1562 error_ok=True)
1563 self.cache.invalidate_host_devices(host)
1564 if code:
1565 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
1566 return '\n'.join(out + err)
1567
1568 @trivial_completion
1569 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
1570 """
1571 Blink a device light. Calling something like::
1572
1573 lsmcli local-disk-ident-led-on --path $path
1574
1575 If you must, you can customize this via::
1576
1577 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
1578 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
1579
1580 See templates/blink_device_light_cmd.j2
1581 """
1582 @forall_hosts
1583 def blink(host, dev, path):
1584 cmd_line = self.template.render('blink_device_light_cmd.j2',
1585 {
1586 'on': on,
1587 'ident_fault': ident_fault,
1588 'dev': dev,
1589 'path': path
1590 },
1591 host=host)
1592 cmd_args = shlex.split(cmd_line)
1593
1594 out, err, code = self._run_cephadm(
1595 host, 'osd', 'shell', ['--'] + cmd_args,
1596 error_ok=True)
1597 if code:
1598 raise OrchestratorError(
1599 'Unable to affect %s light for %s:%s. Command: %s' % (
1600 ident_fault, host, dev, ' '.join(cmd_args)))
1601 self.log.info('Set %s light for %s:%s %s' % (
1602 ident_fault, host, dev, 'on' if on else 'off'))
1603 return "Set %s light for %s:%s %s" % (
1604 ident_fault, host, dev, 'on' if on else 'off')
1605
1606 return blink(locs)
1607
1608 def get_osd_uuid_map(self, only_up=False):
1609 # type: (bool) -> Dict[str, str]
1610 osd_map = self.get('osd_map')
1611 r = {}
1612 for o in osd_map['osds']:
1613 # only include OSDs that have ever started in this map. this way
1614 # an interrupted osd create can be repeated and succeed the second
1615 # time around.
1616 osd_id = o.get('osd')
1617 if osd_id is None:
1618 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1619 if not only_up or (o['up_from'] > 0):
1620 r[str(osd_id)] = o.get('uuid', '')
1621 return r
1622
1623 def _trigger_preview_refresh(self,
1624 specs: Optional[List[DriveGroupSpec]] = None,
1625 service_name: Optional[str] = None,
1626 ) -> None:
1627 # Only trigger a refresh when a spec has changed
1628 trigger_specs = []
1629 if specs:
1630 for spec in specs:
1631 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
1632 # if the to-be-previewed spec != the stored preview spec, we need to
1633 # trigger a refresh; if the spec has been removed (== None) we need to
1634 # refresh as well.
1635 if not preview_spec or spec != preview_spec:
1636 trigger_specs.append(spec)
1637 if service_name:
1638 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
1639 if not any(trigger_specs):
1640 return None
1641
1642 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
1643 for host in refresh_hosts:
1644 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
1645 self.cache.osdspec_previews_refresh_queue.append(host)
1646
1647 @trivial_completion
1648 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
1649 """
1650 Deprecated. Please use `apply()` instead.
1651
1652 Keeping this around to stay compatible with mgr/dashboard
1653 """
1654 return [self._apply(spec) for spec in specs]
1655
1656 @trivial_completion
1657 def create_osds(self, drive_group: DriveGroupSpec) -> str:
1658 return self.osd_service.create_from_spec(drive_group)
1659
1660 def _preview_osdspecs(self,
1661 osdspecs: Optional[List[DriveGroupSpec]] = None
1662 ):
1663 if not osdspecs:
1664 return {'n/a': [{'error': True,
1665 'message': 'No OSDSpec or matching hosts found.'}]}
1666 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
1667 if not matching_hosts:
1668 return {'n/a': [{'error': True,
1669 'message': 'No OSDSpec or matching hosts found.'}]}
1670 # Is any host still loading previews
1671 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
1672 if pending_hosts:
1673 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1674 return {'n/a': [{'error': True,
1675 'message': 'Preview data is being generated.. '
1676 'Please re-run this command in a bit.'}]}
1677 # drop all hosts that are not in matching_hosts and only select reports that match the requested osdspecs
1678 previews_for_specs = {}
1679 for host, raw_reports in self.cache.osdspec_previews.items():
1680 if host not in matching_hosts:
1681 continue
1682 osd_reports = []
1683 for osd_report in raw_reports:
1684 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
1685 osd_reports.append(osd_report)
1686 previews_for_specs.update({host: osd_reports})
1687 return previews_for_specs
1688
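# Editorial sketch (hypothetical shape, illustrative names): the structure
# returned above is keyed by hostname; each value holds only those cached
# preview reports whose 'osdspec' matches one of the requested service_ids.
def _example_preview_shape(self) -> Dict[str, List[Dict[str, Any]]]:
    return {
        'host1': [{'osdspec': 'my_drivegroup', 'data': {}}],  # ceph-volume report payload elided
        'host2': [],  # matched the requested specs but has nothing to report
    }
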
1689 def _calc_daemon_deps(self, daemon_type, daemon_id):
1690 need = {
1691 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1692 'grafana': ['prometheus'],
1693 'alertmanager': ['mgr', 'alertmanager'],
1694 }
1695 deps = []
1696 for dep_type in need.get(daemon_type, []):
1697 for dd in self.cache.get_daemons_by_service(dep_type):
1698 deps.append(dd.name())
1699 return sorted(deps)
1700
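# Editorial sketch (assumption, not upstream code): the dependency list built
# above is just the sorted daemon names for each required daemon type. For
# 'grafana', for example, it reduces to the names of all prometheus daemons:
def _example_grafana_deps(self) -> List[str]:
    # Equivalent to _calc_daemon_deps('grafana', '<any-id>').
    return sorted(dd.name() for dd in self.cache.get_daemons_by_service('prometheus'))
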
1701 def _create_daemon(self,
1702 daemon_spec: CephadmDaemonSpec,
1703 reconfig=False,
1704 osd_uuid_map: Optional[Dict[str, Any]] = None,
1705 ) -> str:
1706
1707 with set_exception_subject('service', orchestrator.DaemonDescription(
1708 daemon_type=daemon_spec.daemon_type,
1709 daemon_id=daemon_spec.daemon_id,
1710 hostname=daemon_spec.host,
1711 ).service_id(), overwrite=True):
1712
1713 image = ''
1714 start_time = datetime.datetime.utcnow()
1715 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1716
1717 if daemon_spec.daemon_type == 'container':
1718 spec: Optional[CustomContainerSpec] = daemon_spec.spec
1719 if spec is None:
1720 # Return immediately: the required service spec for creating
1721 # this daemon was not provided. The spec is only available
1722 # when the service is applied via the 'orch apply'
1723 # command.
1724 msg = "Failed to {} daemon {} on {}: Required " \
1725 "service specification not provided".format(
1726 'reconfigure' if reconfig else 'deploy',
1727 daemon_spec.name(), daemon_spec.host)
1728 self.log.info(msg)
1729 return msg
1730 image = spec.image
1731 if spec.ports:
1732 ports.extend(spec.ports)
1733
1734 cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
1735 daemon_spec)
1736
1737 # TCP ports to open in the host firewall
1738 if len(ports) > 0:
1739 daemon_spec.extra_args.extend([
1740 '--tcp-ports', ' '.join(map(str, ports))
1741 ])
1742
1743 # OSD deployments need the OSD's fsid, passed via --osd-fsid
1744 if daemon_spec.daemon_type == 'osd':
1745 if not osd_uuid_map:
1746 osd_uuid_map = self.get_osd_uuid_map()
1747 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1748 if not osd_uuid:
1749 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1750 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1751
1752 if reconfig:
1753 daemon_spec.extra_args.append('--reconfig')
1754 if self.allow_ptrace:
1755 daemon_spec.extra_args.append('--allow-ptrace')
1756
1757 if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
1758 self._registry_login(daemon_spec.host, self.registry_url,
1759 self.registry_username, self.registry_password)
1760
1761 daemon_spec.extra_args.extend(['--config-json', '-'])
1762
1763 self.log.info('%s daemon %s on %s' % (
1764 'Reconfiguring' if reconfig else 'Deploying',
1765 daemon_spec.name(), daemon_spec.host))
1766
1767 out, err, code = self._run_cephadm(
1768 daemon_spec.host, daemon_spec.name(), 'deploy',
1769 [
1770 '--name', daemon_spec.name(),
1771 ] + daemon_spec.extra_args,
1772 stdin=json.dumps(cephadm_config),
1773 image=image)
1774 if not code and daemon_spec.host in self.cache.daemons:
1775 # prime cached service state with what we (should have)
1776 # just created
1777 sd = orchestrator.DaemonDescription()
1778 sd.daemon_type = daemon_spec.daemon_type
1779 sd.daemon_id = daemon_spec.daemon_id
1780 sd.hostname = daemon_spec.host
1781 sd.status = 1
1782 sd.status_desc = 'starting'
1783 self.cache.add_daemon(daemon_spec.host, sd)
1784 if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1785 self.requires_post_actions.add(daemon_spec.daemon_type)
1786 self.cache.invalidate_host_daemons(daemon_spec.host)
1787 self.cache.update_daemon_config_deps(
1788 daemon_spec.host, daemon_spec.name(), deps, start_time)
1789 self.cache.save_host(daemon_spec.host)
1790 msg = "{} {} on host '{}'".format(
1791 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1792 if not code:
1793 self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1794 else:
1795 what = 'reconfigure' if reconfig else 'deploy'
1796 self.events.for_daemon(
1797 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1798 return msg
1799
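# Editorial note (illustrative, not generated output): for an OSD daemon the
# extra args assembled by _create_daemon() above end up roughly as
#   ['--tcp-ports', '<ports>', '--osd-fsid', '<uuid>', '--config-json', '-']
# (plus '--reconfig' and/or '--allow-ptrace' when applicable), with the
# generated config passed to 'cephadm deploy' on stdin.
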
1800 @forall_hosts
1801 def _remove_daemons(self, name, host) -> str:
1802 return self._remove_daemon(name, host)
1803
1804 def _remove_daemon(self, name, host) -> str:
1805 """
1806 Remove a daemon
1807 """
1808 (daemon_type, daemon_id) = name.split('.', 1)
1809 daemon = orchestrator.DaemonDescription(
1810 daemon_type=daemon_type,
1811 daemon_id=daemon_id,
1812 hostname=host)
1813
1814 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1815
1816 self.cephadm_services[daemon_type].pre_remove(daemon)
1817
1818 args = ['--name', name, '--force']
1819 self.log.info('Removing daemon %s from %s' % (name, host))
1820 out, err, code = self._run_cephadm(
1821 host, name, 'rm-daemon', args)
1822 if not code:
1823 # remove item from cache
1824 self.cache.rm_daemon(host, name)
1825 self.cache.invalidate_host_daemons(host)
1826
1827 self.cephadm_services[daemon_type].post_remove(daemon)
1828
1829 return "Removed {} from host '{}'".format(name, host)
1830
1831 def _check_pool_exists(self, pool, service_name):
1832 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
1833 if not self.rados.pool_exists(pool):
1834 raise OrchestratorError(f'Cannot find pool "{pool}" for '
1835 f'service {service_name}')
1836
1837 def _add_daemon(self, daemon_type, spec,
1838 create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
1839 """
1840 Add (and place) a daemon. Require explicit host placement. Do not
1841 schedule, and do not apply the related scheduling limitations.
1842 """
1843 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
1844 if not spec.placement.hosts:
1845 raise OrchestratorError('must specify host(s) to deploy on')
1846 count = spec.placement.count or len(spec.placement.hosts)
1847 daemons = self.cache.get_daemons_by_service(spec.service_name())
1848 return self._create_daemons(daemon_type, spec, daemons,
1849 spec.placement.hosts, count,
1850 create_func, config_func)
1851
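# Editorial sketch (assumption, not part of the module): because _add_daemon()
# requires explicit hosts, a caller such as add_mgr() expects a spec roughly
# like the one below; host names are illustrative.
def _example_add_mgr_spec(self) -> ServiceSpec:
    return ServiceSpec(
        service_type='mgr',
        placement=PlacementSpec(hosts=['host1', 'host2']),  # explicit hosts, no scheduling
    )
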
1852 def _create_daemons(self, daemon_type, spec, daemons,
1853 hosts, count,
1854 create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
1855 if count > len(hosts):
1856 raise OrchestratorError('too few hosts: want %d, have %s' % (
1857 count, hosts))
1858
1859 did_config = False
1860
1861 args = [] # type: List[CephadmDaemonSpec]
1862 for host, network, name in hosts:
1863 daemon_id = self.get_unique_name(daemon_type, host, daemons,
1864 prefix=spec.service_id,
1865 forcename=name)
1866
1867 if not did_config and config_func:
1868 if daemon_type == 'rgw':
1869 config_func(spec, daemon_id)
1870 else:
1871 config_func(spec)
1872 did_config = True
1873
1874 daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
1875 host, daemon_id, network, spec)
1876 self.log.debug('Placing %s.%s on host %s' % (
1877 daemon_type, daemon_id, host))
1878 args.append(daemon_spec)
1879
1880 # add to daemon list so next name(s) will also be unique
1881 sd = orchestrator.DaemonDescription(
1882 hostname=host,
1883 daemon_type=daemon_type,
1884 daemon_id=daemon_id,
1885 )
1886 daemons.append(sd)
1887
1888 @forall_hosts
1889 def create_func_map(*args):
1890 daemon_spec = create_func(*args)
1891 return self._create_daemon(daemon_spec)
1892
1893 return create_func_map(args)
1894
1895 @trivial_completion
1896 def apply_mon(self, spec) -> str:
1897 return self._apply(spec)
1898
1899 @trivial_completion
1900 def add_mon(self, spec):
1901 # type: (ServiceSpec) -> List[str]
1902 return self._add_daemon('mon', spec, self.mon_service.prepare_create)
1903
1904 @trivial_completion
1905 def add_mgr(self, spec):
1906 # type: (ServiceSpec) -> List[str]
1907 return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
1908
1909 def _apply(self, spec: GenericSpec) -> str:
1910 if spec.service_type == 'host':
1911 return self._add_host(cast(HostSpec, spec))
1912
1913 if spec.service_type == 'osd':
1914 # _trigger_preview_refresh needs to be smart and should
1915 # only refresh if a change has been detected
1916 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
1917
1918 return self._apply_service_spec(cast(ServiceSpec, spec))
1919
1920 def _plan(self, spec: ServiceSpec):
1921 if spec.service_type == 'osd':
1922 return {'service_name': spec.service_name(),
1923 'service_type': spec.service_type,
1924 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
1925
1926 ha = HostAssignment(
1927 spec=spec,
1928 hosts=self._hosts_with_daemon_inventory(),
1929 get_daemons_func=self.cache.get_daemons_by_service,
1930 )
1931 ha.validate()
1932 hosts = ha.place()
1933
1934 add_daemon_hosts = ha.add_daemon_hosts(hosts)
1935 remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
1936
1937 return {
1938 'service_name': spec.service_name(),
1939 'service_type': spec.service_type,
1940 'add': [hs.hostname for hs in add_daemon_hosts],
1941 'remove': [d.hostname for d in remove_daemon_hosts]
1942 }
1943
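# Editorial sketch (hypothetical values): for a non-OSD spec the dict built by
# _plan() above summarizes what apply() would do, along these lines; OSD specs
# return the per-host preview from _preview_osdspecs() instead.
def _example_plan_result(self) -> Dict[str, Any]:
    return {
        'service_name': 'mds.cephfs',
        'service_type': 'mds',
        'add': ['host2'],  # hosts that would receive a new daemon
        'remove': [],      # existing daemons that would be removed
    }
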
1944 @trivial_completion
1945 def plan(self, specs: List[GenericSpec]) -> List:
1946 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
1947 'to the current inventory setup. If any of these conditions change, the \n'
1948 'preview will be invalid. Please make sure to keep the timeframe between \n'
1949 'planning and applying the specs as short as possible.'}]
1950 if any([spec.service_type == 'host' for spec in specs]):
1951 return [{'error': 'Found <HostSpec>. Previews that include host specifications are not yet supported.'}]
1952 for spec in specs:
1953 results.append(self._plan(cast(ServiceSpec, spec)))
1954 return results
1955
1956 def _apply_service_spec(self, spec: ServiceSpec) -> str:
1957 if spec.placement.is_empty():
1958 # fill in default placement
1959 defaults = {
1960 'mon': PlacementSpec(count=5),
1961 'mgr': PlacementSpec(count=2),
1962 'mds': PlacementSpec(count=2),
1963 'rgw': PlacementSpec(count=2),
1964 'iscsi': PlacementSpec(count=1),
1965 'rbd-mirror': PlacementSpec(count=2),
1966 'nfs': PlacementSpec(count=1),
1967 'grafana': PlacementSpec(count=1),
1968 'alertmanager': PlacementSpec(count=1),
1969 'prometheus': PlacementSpec(count=1),
1970 'node-exporter': PlacementSpec(host_pattern='*'),
1971 'crash': PlacementSpec(host_pattern='*'),
1972 'container': PlacementSpec(count=1),
1973 }
1974 spec.placement = defaults[spec.service_type]
1975 elif spec.service_type in ['mon', 'mgr'] and \
1976 spec.placement.count is not None and \
1977 spec.placement.count < 1:
1978 raise OrchestratorError('cannot scale %s service below 1' % (
1979 spec.service_type))
1980
1981 HostAssignment(
1982 spec=spec,
1983 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
1984 get_daemons_func=self.cache.get_daemons_by_service,
1985 ).validate()
1986
1987 self.log.info('Saving service %s spec with placement %s' % (
1988 spec.service_name(), spec.placement.pretty_str()))
1989 self.spec_store.save(spec)
1990 self._kick_serve_loop()
1991 return "Scheduled %s update..." % spec.service_name()
1992
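# Editorial sketch (assumption): a spec applied without a placement picks up
# one of the defaults above. A hypothetical illustration for 'mds':
def _example_default_mds_placement(self) -> PlacementSpec:
    spec = ServiceSpec(service_type='mds', service_id='cephfs')
    if spec.placement.is_empty():
        spec.placement = PlacementSpec(count=2)  # mirrors the 'mds' default above
    return spec.placement
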
1993 @trivial_completion
1994 def apply(self, specs: List[GenericSpec]) -> List[str]:
1995 results = []
1996 for spec in specs:
1997 results.append(self._apply(spec))
1998 return results
1999
2000 @trivial_completion
2001 def apply_mgr(self, spec) -> str:
2002 return self._apply(spec)
2003
2004 @trivial_completion
2005 def add_mds(self, spec: ServiceSpec) -> List[str]:
2006 return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)
2007
2008 @trivial_completion
2009 def apply_mds(self, spec: ServiceSpec) -> str:
2010 return self._apply(spec)
2011
2012 @trivial_completion
2013 def add_rgw(self, spec) -> List[str]:
2014 return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
2015
2016 @trivial_completion
2017 def apply_rgw(self, spec) -> str:
2018 return self._apply(spec)
2019
2020 @trivial_completion
2021 def add_iscsi(self, spec):
2022 # type: (ServiceSpec) -> List[str]
2023 return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
2024
2025 @trivial_completion
2026 def apply_iscsi(self, spec) -> str:
2027 return self._apply(spec)
2028
2029 @trivial_completion
2030 def add_rbd_mirror(self, spec) -> List[str]:
2031 return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
2032
2033 @trivial_completion
2034 def apply_rbd_mirror(self, spec) -> str:
2035 return self._apply(spec)
2036
2037 @trivial_completion
2038 def add_nfs(self, spec) -> List[str]:
2039 return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
2040
2041 @trivial_completion
2042 def apply_nfs(self, spec) -> str:
2043 return self._apply(spec)
2044
2045 def _get_dashboard_url(self):
2046 # type: () -> str
2047 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2048
2049 @trivial_completion
2050 def add_prometheus(self, spec) -> List[str]:
2051 return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
2052
2053 @trivial_completion
2054 def apply_prometheus(self, spec) -> str:
2055 return self._apply(spec)
2056
2057 @trivial_completion
2058 def add_node_exporter(self, spec):
2059 # type: (ServiceSpec) -> List[str]
2060 return self._add_daemon('node-exporter', spec,
2061 self.node_exporter_service.prepare_create)
2062
2063 @trivial_completion
2064 def apply_node_exporter(self, spec) -> str:
2065 return self._apply(spec)
2066
2067 @trivial_completion
2068 def add_crash(self, spec):
2069 # type: (ServiceSpec) -> List[str]
2070 return self._add_daemon('crash', spec,
2071 self.crash_service.prepare_create)
2072
2073 @trivial_completion
2074 def apply_crash(self, spec) -> str:
2075 return self._apply(spec)
2076
2077 @trivial_completion
2078 def add_grafana(self, spec):
2079 # type: (ServiceSpec) -> List[str]
2080 return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
2081
2082 @trivial_completion
2083 def apply_grafana(self, spec: ServiceSpec) -> str:
2084 return self._apply(spec)
2085
2086 @trivial_completion
2087 def add_alertmanager(self, spec):
2088 # type: (ServiceSpec) -> List[str]
2089 return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
2090
2091 @trivial_completion
2092 def apply_alertmanager(self, spec: ServiceSpec) -> str:
2093 return self._apply(spec)
2094
2095 @trivial_completion
2096 def add_container(self, spec: ServiceSpec) -> List[str]:
2097 return self._add_daemon('container', spec,
2098 self.container_service.prepare_create)
2099
2100 @trivial_completion
2101 def apply_container(self, spec: ServiceSpec) -> str:
2102 return self._apply(spec)
2103
2104 def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
2105 # pick an arbitrary host (the first one in the inventory)...
2106 host = None
2107 for host_name in self.inventory.keys():
2108 host = host_name
2109 break
2110 if not host:
2111 raise OrchestratorError('no hosts defined')
2112 if self.cache.host_needs_registry_login(host) and self.registry_url:
2113 self._registry_login(host, self.registry_url,
2114 self.registry_username, self.registry_password)
2115 out, err, code = self._run_cephadm(
2116 host, '', 'pull', [],
2117 image=image_name,
2118 no_fsid=True,
2119 error_ok=True)
2120 if code:
2121 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2122 image_name, host, '\n'.join(out)))
2123 try:
2124 j = json.loads('\n'.join(out))
2125 r = ContainerInspectInfo(
2126 j['image_id'],
2127 j.get('ceph_version'),
2128 j.get('repo_digest')
2129 )
2130 self.log.debug(f'image {image_name} -> {r}')
2131 return r
2132 except (ValueError, KeyError) as _:
2133 msg = 'Failed to pull %s on %s: %s' % (image_name, host, '\n'.join(out))
2134 self.log.exception(msg)
2135 raise OrchestratorError(msg)
2136
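# Editorial sketch (hypothetical payload): 'cephadm pull' is expected to print
# JSON carrying at least 'image_id' plus optional 'ceph_version' and
# 'repo_digest', which the method above folds into ContainerInspectInfo.
def _example_inspect_parse(self) -> ContainerInspectInfo:
    j = json.loads('{"image_id": "abc123", '
                   '"ceph_version": "ceph version 15.2.8", '
                   '"repo_digest": "docker.io/ceph/ceph@sha256:0"}')
    return ContainerInspectInfo(j['image_id'], j.get('ceph_version'), j.get('repo_digest'))
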
2137 @trivial_completion
2138 def upgrade_check(self, image, version) -> str:
2139 if version:
2140 target_name = self.container_image_base + ':v' + version
2141 elif image:
2142 target_name = image
2143 else:
2144 raise OrchestratorError('must specify either image or version')
2145
2146 image_info = self._get_container_image_info(target_name)
2147 self.log.debug(f'image info {target_name} -> {image_info}')
2148 r = {
2149 'target_name': target_name,
2150 'target_id': image_info.image_id,
2151 'target_version': image_info.ceph_version,
2152 'needs_update': dict(),
2153 'up_to_date': list(),
2154 }
2155 for host, dm in self.cache.daemons.items():
2156 for name, dd in dm.items():
2157 if image_info.image_id == dd.container_image_id:
2158 r['up_to_date'].append(dd.name())
2159 else:
2160 r['needs_update'][dd.name()] = {
2161 'current_name': dd.container_image_name,
2162 'current_id': dd.container_image_id,
2163 'current_version': dd.version,
2164 }
2165 if self.use_repo_digest:
2166 r['target_digest'] = image_info.repo_digest
2167
2168 return json.dumps(r, indent=4, sort_keys=True)
2169
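# Editorial note (shape only, values illustrative): the report returned above
# serializes to JSON along the lines of
#   {"target_name": "...", "target_id": "...", "target_version": "...",
#    "needs_update": {"osd.3": {"current_name": "...", "current_id": "...",
#                               "current_version": "..."}},
#    "up_to_date": ["mgr.a"],
#    "target_digest": "..."}   # only present when use_repo_digest is set
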
2170 @trivial_completion
2171 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
2172 return self.upgrade.upgrade_status()
2173
2174 @trivial_completion
2175 def upgrade_start(self, image, version) -> str:
2176 return self.upgrade.upgrade_start(image, version)
2177
2178 @trivial_completion
2179 def upgrade_pause(self) -> str:
2180 return self.upgrade.upgrade_pause()
2181
2182 @trivial_completion
2183 def upgrade_resume(self) -> str:
2184 return self.upgrade.upgrade_resume()
2185
2186 @trivial_completion
2187 def upgrade_stop(self) -> str:
2188 return self.upgrade.upgrade_stop()
2189
2190 @trivial_completion
2191 def remove_osds(self, osd_ids: List[str],
2192 replace: bool = False,
2193 force: bool = False) -> str:
2194 """
2195 Takes a list of OSDs and schedules them for removal.
2196 The function that takes care of the actual removal is
2197 process_removal_queue().
2198 """
2199
2200 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2201 to_remove_daemons = list()
2202 for daemon in daemons:
2203 if daemon.daemon_id in osd_ids:
2204 to_remove_daemons.append(daemon)
2205 if not to_remove_daemons:
2206 return f"Unable to find OSDs: {osd_ids}"
2207
2208 for daemon in to_remove_daemons:
2209 try:
2210 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2211 replace=replace,
2212 force=force,
2213 hostname=daemon.hostname,
2214 fullname=daemon.name(),
2215 process_started_at=datetime.datetime.utcnow(),
2216 remove_util=self.rm_util))
2217 except NotFoundError:
2218 return f"Unable to find OSDs: {osd_ids}"
2219
2220 # trigger the serve loop to initiate the removal
2221 self._kick_serve_loop()
2222 return "Scheduled OSD(s) for removal"
2223
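# Editorial sketch (hypothetical helper): remove_osds() above only enqueues
# work; the serve loop performs the actual removal later. The queue entry it
# builds for one daemon looks like this:
def _example_remove_entry(self, daemon: orchestrator.DaemonDescription) -> OSD:
    return OSD(osd_id=int(daemon.daemon_id),
               replace=False,
               force=False,
               hostname=daemon.hostname,
               fullname=daemon.name(),
               process_started_at=datetime.datetime.utcnow(),
               remove_util=self.rm_util)
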
2224 @trivial_completion
2225 def stop_remove_osds(self, osd_ids: List[str]):
2226 """
2227 Stops the removal process for a list of OSDs.
2228 This will revert their weight and remove them from the osds_to_remove queue.
2229 """
2230 for osd_id in osd_ids:
2231 try:
2232 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
2233 remove_util=self.rm_util))
2234 except (NotFoundError, KeyError):
2235 return f'Unable to find OSD in the queue: {osd_id}'
2236
2237 # trigger the serve loop to halt the removal
2238 self._kick_serve_loop()
2239 return "Stopped OSD(s) removal"
2240
2241 @trivial_completion
2242 def remove_osds_status(self):
2243 """
2244 The CLI call to retrieve an osd removal report
2245 """
2246 return self.to_remove_osds.all_osds()