# [ceph.git] ceph/src/pybind/mgr/cephadm/module.py (tag v15.2.4)
import json
import errno
import logging
from collections import defaultdict
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast

import datetime
import six
import os
import random
import tempfile
import multiprocessing.pool
import shutil
import subprocess

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host

from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta
from orchestrator._interface import GenericSpec

from . import remotes
from . import utils
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDRemoval, OSDService
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self):
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

try:
    from typing import List
except ImportError:
    pass

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
User root
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ConnectTimeout=30
"""

DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

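# Illustrative sketch (not part of the source): daemon event timestamps
# reported by cephadm are parsed with DATEFMT, e.g.
#
#     datetime.datetime.strptime('2020-07-01T10:12:03.123456', DATEFMT)
#
# while CEPH_DATEFMT is the same format with a trailing 'Z' (UTC) marker.
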
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)


def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
    @wraps(f)
    def forall_hosts_wrapper(*args) -> List[T]:

        # Some weird logic to make calling functions with multiple arguments work.
        if len(args) == 1:
            vals = args[0]
            self = None
        elif len(args) == 2:
            self, vals = args
        else:
            assert False, 'either f([...]) or self.f([...])'

        def do_work(arg):
            if not isinstance(arg, tuple):
                arg = (arg, )
            try:
                if self:
                    return f(self, *arg)
                return f(*arg)
            except Exception:
                logger.exception(f'executing {f.__name__}({arg}) failed.')
                raise

        assert CephadmOrchestrator.instance is not None
        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)

    return forall_hosts_wrapper

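# A minimal usage sketch (hypothetical names, for illustration): @forall_hosts
# turns a per-item method into one that takes a list of argument tuples and
# maps them over the orchestrator's worker pool:
#
#     @forall_hosts
#     def _check_hosts(self, host: str) -> str:
#         return self._check_host(host)
#
#     # self._check_hosts(['host1', 'host2']) runs both checks in parallel
#     # and returns a list of results.
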

class CephadmCompletion(orchestrator.Completion):
    def evaluate(self):
        self.finalize(None)


def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
    """
    Decorator to make CephadmOrchestrator methods return a CephadmCompletion
    object that executes the wrapped method when evaluated.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper

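# Illustrative sketch (not from the source): a method decorated with
# @trivial_completion returns immediately with a CephadmCompletion; the
# wrapped body only runs once the completion is evaluated:
#
#     completion = mgr.add_host(HostSpec('node1'))  # nothing executed yet
#     completion.evaluate()                         # now _add_host() runs
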

@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'container_image_prometheus',
            'default': 'prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'ceph/ceph-grafana:latest',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'prom/alertmanager:v0.20.0',
            'desc': 'Alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'prom/node-exporter:v0.18.1',
            'desc': 'Node exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace.  Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
    ]
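
    # How these options surface at runtime (a sketch, not code from this
    # file): config_notify() below copies each MODULE_OPTIONS entry onto the
    # module as an attribute of the same name, so an operator change such as
    #
    #     ceph config set mgr mgr/cephadm/device_cache_timeout 1800
    #
    # becomes visible here as self.device_cache_timeout == 1800.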

    def __init__(self, *args, **kwargs):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.prometheus_alerts_path = ''

        self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection, remoto.backends.LegacyModuleExecute]]

        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()
        self.rm_util = RemoveUtil(self)

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.offline_hosts: Set[str] = set()

        # services:
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
        }

        self.template = TemplateMgr()

    def shutdown(self):
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self):
        self.log.debug('_kick_serve_loop')
        self.event.set()

    def _check_safe_to_destroy_mon(self, mon_id):
        # type: (str) -> None
        ret, out, err = self.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception as e:
            raise OrchestratorError('failed to parse quorum status: %s' % e)

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            self.log.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

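    # Worked example of the quorum arithmetic above (illustrative): with mons
    # ['a', 'b', 'c'] all in quorum, removing 'c' leaves new_mons and
    # new_quorum both ['a', 'b'], and 2 > 2/2 holds, so removal is safe. With
    # mons ['a', 'b'] but quorum only ['a'], removing 'a' gives new_quorum []
    # and 0 > 1/2 fails, so an OrchestratorError is raised.
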
    def _check_host(self, host):
        if host not in self.inventory:
            return
        self.log.debug(' checking %s' % host)
        try:
            out, err, code = self._run_cephadm(
                host, 'client', 'check-host', [],
                error_ok=True, no_fsid=True)
            self.cache.update_last_host_check(host)
            self.cache.save_host(host)
            if code:
                self.log.debug(' host %s failed check' % host)
                if self.warn_on_failed_host_check:
                    return 'host %s failed check: %s' % (host, err)
            else:
                self.log.debug(' host %s ok' % host)
        except Exception as e:
            self.log.debug(' host %s failed check' % host)
            return 'host %s failed check: %s' % (host, e)

    def _check_for_strays(self):
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.health_checks:
                del self.health_checks[k]
        if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
            ls = self.list_servers()
            managed = self.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                daemons = item.get('services')  # misnomer!
                missing_names = []
                for s in daemons:
                    name = '%s.%s' % (s.get('type'), s.get('id'))
                    if host not in self.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.warn_on_stray_hosts and host_detail:
                self.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                               'not managed by cephadm' % (
                                   len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.warn_on_stray_daemons and daemon_detail:
                self.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemon(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.set_health_checks(self.health_checks)

    def _serve_sleep(self):
        sleep_interval = 600
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        self.event.wait(sleep_interval)
        self.event.clear()

    def serve(self):
        # type: () -> None
        self.log.debug("serve starting")
        while self.run:

            # refresh daemons
            self.log.debug('refreshing hosts')
            bad_hosts = []
            failures = []
            for host in self.cache.get_hosts():
                if self.cache.host_needs_check(host):
                    r = self._check_host(host)
                    if r is not None:
                        bad_hosts.append(r)
                if self.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                if self.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)

                if self.cache.host_needs_osdspec_preview_refresh(host):
                    self.log.debug(f"refreshing OSDSpec previews for {host}")
                    r = self._refresh_host_osdspec_previews(host)
                    if r:
                        failures.append(r)

            health_changed = False
            if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
                del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
                health_changed = True
            if bad_hosts:
                self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                    'severity': 'warning',
                    'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                    'count': len(bad_hosts),
                    'detail': bad_hosts,
                }
                health_changed = True
            if failures:
                self.health_checks['CEPHADM_REFRESH_FAILED'] = {
                    'severity': 'warning',
                    'summary': 'failed to probe daemons or devices',
                    'count': len(failures),
                    'detail': failures,
                }
                health_changed = True
            elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
                del self.health_checks['CEPHADM_REFRESH_FAILED']
                health_changed = True
            if health_changed:
                self.set_health_checks(self.health_checks)

            self._check_for_strays()

            if self.paused:
                self.health_checks['CEPHADM_PAUSED'] = {
                    'severity': 'warning',
                    'summary': 'cephadm background work is paused',
                    'count': 1,
                    'detail': ["'ceph orch resume' to resume"],
                }
                self.set_health_checks(self.health_checks)
            else:
                if 'CEPHADM_PAUSED' in self.health_checks:
                    del self.health_checks['CEPHADM_PAUSED']
                    self.set_health_checks(self.health_checks)

            self.rm_util._remove_osds_bg()

            if self._apply_all_services():
                continue  # did something, refresh

            self._check_daemons()

            if self.upgrade.continue_upgrade():
                continue

            self._serve_sleep()
        self.log.debug("serve exit")

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type, notify_id):
        pass

    def pause(self):
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self):
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError('name %s already in use', forcename)
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError('name %s already in use', name)
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
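
    # Example of the generated names (illustrative): for daemon_type 'mgr'
    # on host 'node1.example.com' this yields something like 'node1.odkrtb'
    # (short hostname plus a random six-letter suffix), while suffix-less
    # types such as 'mon' simply yield 'node1'.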

    def _reconfig_ssh(self):
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = 'root'
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_fname(self, ssh_config_fname):
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host):
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self):
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host):
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self):
        """
        The cephadm orchestrator is available whenever the remoto library
        could be loaded.
        """
        return self.can_run()

    def process(self, completions):
        """
        Evaluate any outstanding completions; cephadm completions are
        trivial, so no separate worker thread is involved.
        """
        if completions:
            self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf=None):
        """
        Set an ssh_config file provided from stdin

        TODO:
          - validation
        """
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty ssh config provided"
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self):
        """
        Clear the ssh_config file previously set via set-ssh-config
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self):
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self):
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self):
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self):
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self):
        return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host, addr=None):
        out, err, code = self._run_cephadm(host, 'client', 'check-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('check-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host, addr=None):
        out, err, code = self._run_cephadm(host, 'client', 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    def _get_connection(self, host):
        """
        Set up a connection for running commands on a remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn, executable):
        """
        Remote validator that accepts a connection object to ensure that a
        certain executable is available, returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing
        the user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

    def _run_cephadm(self,
                     host: str,
                     entity: Optional[str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = None,
                     stdin: Optional[str] = None,
                     no_fsid=False,
                     error_ok=False,
                     image: Optional[str] = None,
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                conn, connr = self._get_connection(addr)
            except OSError as e:
                if error_ok:
                    self.log.exception('failed to establish ssh connection')
                    return [], ["Can't communicate with remote host, possibly because python3 is not installed there"], 1
                raise execnet.gateway_bootstrap.HostNotFound(str(e)) from e

            assert image or entity
            if not image:
                daemon_type = entity.split('.', 1)[0]  # type: ignore
                if daemon_type in CEPH_TYPES or \
                        daemon_type == 'nfs' or \
                        daemon_type == 'iscsi':
                    # get container image
                    ret, image, err = self.check_mon_command({
                        'prefix': 'config get',
                        'who': utils.name_to_config_section(entity),
                        'key': 'container_image',
                    })
                    image = image.strip()  # type: ignore
                elif daemon_type == 'prometheus':
                    image = self.container_image_prometheus
                elif daemon_type == 'grafana':
                    image = self.container_image_grafana
                elif daemon_type == 'alertmanager':
                    image = self.container_image_alertmanager
                elif daemon_type == 'node-exporter':
                    image = self.container_image_node_exporter

            self.log.debug('%s container image %s' % (entity, image))

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]
            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            user = 'root' if self.mode == 'root' else 'cephadm'
            msg = f'''Failed to connect to {host} ({addr}).
Check that the host is reachable and accepts connections using the cephadm SSH key

you may want to run:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > key
> ssh -F ssh_config -i key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise

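    # A typical invocation (illustrative; mirrors calls made elsewhere in
    # this module):
    #
    #     out, err, code = self._run_cephadm(host, 'mon', 'ls', [],
    #                                        no_fsid=True)
    #
    # runs `cephadm ls` on the remote host and returns the stdout lines,
    # stderr lines, and exit code.
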
    def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
        return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param spec: host spec to add
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)

    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)

    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)

    @trivial_completion
    def update_host_addr(self, host, addr):
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @trivial_completion
    def add_host_label(self, host, label):
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        return 'Added label %s to host %s' % (label, host)

    @trivial_completion
    def remove_host_label(self, host, label):
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        return 'Removed label %s from host %s' % (label, host)

    def update_osdspec_previews(self, search_host: str = ''):
        # Set global 'pending' flag for host
        self.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.osd_service.get_previews(search_host))
        self.log.debug("Loading OSDSpec previews to HostCache")
        self.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.cache.loading_osdspec_preview.remove(search_host)

    def _refresh_host_osdspec_previews(self, host) -> bool:
        self.update_osdspec_previews(host)
        self.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return True

    def _refresh_host_daemons(self, host):
        try:
            out, err, code = self._run_cephadm(
                host, 'mon', 'ls', [], no_fsid=True)
            if code:
                return 'host %s cephadm ls returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s scrape failed: %s' % (host, e)
        ls = json.loads(''.join(out))
        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime.datetime.utcnow()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
            sd.daemon_type = d['name'].split('.')[0]
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.version = d.get('version')
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                sd.status = {
                    'running': 1,
                    'stopped': 0,
                    'error': -1,
                    'unknown': -1,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.cache.update_host_daemons(host, dm)
        self.cache.save_host(host)
        return None

    def _refresh_host_devices(self, host):
        try:
            out, err, code = self._run_cephadm(
                host, 'osd',
                'ceph-volume',
                ['--', 'inventory', '--format=json'])
            if code:
                return 'host %s ceph-volume inventory returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s ceph-volume inventory failed: %s' % (host, e)
        devices = json.loads(''.join(out))
        try:
            out, err, code = self._run_cephadm(
                host, 'mon',
                'list-networks',
                [],
                no_fsid=True)
            if code:
                return 'host %s list-networks returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s list-networks failed: %s' % (host, e)
        networks = json.loads(''.join(out))
        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        devices = inventory.Devices.from_json(devices)
        self.cache.update_host_devices_networks(host, devices.devices, networks)
        self.update_osdspec_previews(host)
        self.cache.save_host(host)
        return None

    @trivial_completion
    def describe_service(self, service_type=None, service_name=None,
                         refresh=False):
        if refresh:
            # ugly sync path, FIXME someday perhaps?
            for host in self.inventory.keys():
                self._refresh_host_daemons(host)
        # <service_map>
        sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement
                        spec. It's rather pointless to show an actual/expected
                        representation here, so we're setting running = size
                        for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(self._get_hosts)

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0
                if dd.status == 1:
                    sm[n].running += 1
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self._get_hosts),
                running=0,
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())

    @trivial_completion
    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
                     host=None, refresh=False):
        if refresh:
            # ugly sync path, FIXME someday perhaps?
            if host:
                self._refresh_host_daemons(host)
            else:
                for hostname in self.inventory.keys():
                    self._refresh_host_daemons(hostname)
        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                result.append(dd)
        return result

    @trivial_completion
    def service_action(self, action, service_name):
        args = []
        for host, dm in self.cache.daemons.items():
            for name, d in dm.items():
                if d.matches_service(service_name):
                    args.append((d.daemon_type, d.daemon_id,
                                 d.hostname, action))
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return self._daemon_actions(args)

    @forall_hosts
    def _daemon_actions(self, daemon_type, daemon_id, host, action):
        return self._daemon_action(daemon_type, daemon_id, host, action)

    def _daemon_action(self, daemon_type, daemon_id, host, action):
        if action == 'redeploy':
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_type, daemon_id, host)
        elif action == 'reconfig':
            return self._create_daemon(daemon_type, daemon_id, host,
                                       reconfig=True)

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = '%s.%s' % (daemon_type, daemon_id)
        for a in actions[action]:
            out, err, code = self._run_cephadm(
                host, name, 'unit',
                ['--name', name, a],
                error_ok=True)
        self.cache.invalidate_host_daemons(host)
        return "{} {} from host '{}'".format(action, name, host)

    @trivial_completion
    def daemon_action(self, action, daemon_type, daemon_id):
        args = []
        for host, dm in self.cache.daemons.items():
            for name, d in dm.items():
                if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
                    args.append((d.daemon_type, d.daemon_id,
                                 d.hostname, action))
        if not args:
            raise orchestrator.OrchestratorError(
                'Unable to find %s.%s daemon(s)' % (
                    daemon_type, daemon_id))
        self.log.info('%s daemons %s' % (
            action.capitalize(),
            ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
        return self._daemon_actions(args)

    @trivial_completion
    def remove_daemons(self, names):
        # type: (List[str]) -> List[str]
        args = []
        for host, dm in self.cache.daemons.items():
            for name in names:
                if name in dm:
                    args.append((name, host))
        if not args:
            raise OrchestratorError('Unable to find daemon(s) %s' % (names))
        self.log.info('Remove daemons %s' % [a[0] for a in args])
        return self._remove_daemons(args)

    @trivial_completion
    def remove_service(self, service_name):
        self.log.info('Remove service %s' % service_name)
        self._trigger_preview_refresh(service_name=service_name)
        found = self.spec_store.rm(service_name)
        if found:
            self._kick_serve_loop()
            return ['Removed service %s' % service_name]
        else:
            # must be idempotent: still a success.
            return [f'Failed to remove service. <{service_name}> was not found.']

    @trivial_completion
    def get_inventory(self, host_filter=None, refresh=False):
        """
        Return the storage inventory of hosts matching the given filter.

        :param host_filter: host filter

        TODO:
          - add filtering by label
        """
        if refresh:
            # ugly sync path, FIXME someday perhaps?
            if host_filter:
                for host in host_filter.hosts:
                    self._refresh_host_devices(host)
            else:
                for host in self.inventory.keys():
                    self._refresh_host_devices(host)

        result = []
        for host, dls in self.cache.devices.items():
            if host_filter and host not in host_filter.hosts:
                continue
            result.append(orchestrator.InventoryHost(host,
                                                     inventory.Devices(dls)))
        return result

    @trivial_completion
    def zap_device(self, host, path):
        self.log.info('Zap device %s:%s' % (host, path))
        out, err, code = self._run_cephadm(
            host, 'osd', 'ceph-volume',
            ['--', 'lvm', 'zap', '--destroy', path],
            error_ok=True)
        self.cache.invalidate_host_devices(host)
        if code:
            raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
        return '\n'.join(out + err)

    @trivial_completion
    def blink_device_light(self, ident_fault, on, locs):
        @forall_hosts
        def blink(host, dev, path):
            cmd = [
                'lsmcli',
                'local-disk-%s-led-%s' % (
                    ident_fault,
                    'on' if on else 'off'),
                '--path', path or dev,
            ]
            out, err, code = self._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd,
                error_ok=True)
            if code:
                raise RuntimeError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)

    def get_osd_uuid_map(self, only_up=False):
        # type: (bool) -> Dict[str, str]
        osd_map = self.get('osd_map')
        r = {}
        for o in osd_map['osds']:
            # only include OSDs that have ever started in this map. this way
            # an interrupted osd create can be repeated and succeed the second
            # time around.
            osd_id = o.get('osd')
            if osd_id is None:
                raise OrchestratorError("Could not retrieve osd_id from osd_map")
            if not only_up or (o['up_from'] > 0):
                r[str(osd_id)] = o.get('uuid', '')
        return r

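    # Shape of the returned map (illustrative values):
    #
    #     {'0': '7204e33d-...', '1': 'a341a0b6-...'}
    #
    # i.e. OSD ids as strings mapped to their OSD uuids; _create_daemon()
    # passes these to cephadm via --osd-fsid.
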
    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None,
                                   service_name: Optional[str] = None
                                   ) -> List[str]:
        osdspecs = []
        if service_name:
            self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
            osdspecs = self.spec_store.find(service_name=service_name)
            self.log.debug(f"Found OSDSpecs: {osdspecs}")
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not service_name and not specs:
            # if neither parameter is given, search for all available osdspecs
            osdspecs = self.spec_store.find(service_name='osd')
            self.log.debug(f"Found OSDSpecs: {osdspecs}")
        if not osdspecs:
            self.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])

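    # The trailing sum(..., []) flattens the per-spec host lists into a
    # single list, e.g. sum([['host1', 'host2'], ['host2']], []) evaluates
    # to ['host1', 'host2', 'host2'] (a host may appear once per matching
    # spec).
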
    def resolve_osdspecs_for_host(self, host):
        matching_specs = []
        self.log.debug(f"Finding OSDSpecs for host: <{host}>")
        for spec in self.spec_store.find('osd'):
            if host in spec.placement.filter_matching_hosts(self._get_hosts):
                self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None):
        refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)

    @trivial_completion
    def apply_drivegroups(self, specs: List[DriveGroupSpec]):
        self._trigger_preview_refresh(specs=specs)
        return [self._apply(spec) for spec in specs]

    @trivial_completion
    def create_osds(self, drive_group: DriveGroupSpec):
        return self.osd_service.create(drive_group)

    @trivial_completion
    def preview_osdspecs(self,
                         osdspec_name: Optional[str] = None,
                         osdspecs: Optional[List[DriveGroupSpec]] = None
                         ):
        matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
        if not matching_hosts:
            return {'n/a': [{'error': True,
                             'message': 'No OSDSpec or matching hosts found.'}]}
        # Is any host still loading previews?
        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
        if pending_hosts:
            # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
            return {'n/a': [{'error': True,
                             'message': 'Preview data is being generated... '
                                        'Please try again in a bit.'}]}
        # drop all keys that are not in search_hosts and return preview struct
        return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}

    def _calc_daemon_deps(self, daemon_type, daemon_id):
        need = {
            'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
            'grafana': ['prometheus'],
            'alertmanager': ['mgr', 'alertmanager'],
        }
        deps = []
        for dep_type in need.get(daemon_type, []):
            for dd in self.cache.get_daemons_by_service(dep_type):
                deps.append(dd.name())
        return sorted(deps)

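    # Example (illustrative): in a cluster running mgr.a, alertmanager.a and
    # node-exporter.a, _calc_daemon_deps('prometheus', 'x') would return
    # ['alertmanager.a', 'mgr.a', 'node-exporter.a']; a change in that set
    # later shows up as a config-dependency change for the prometheus daemon.
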
    def _get_config_and_keyring(self, daemon_type, daemon_id,
                                keyring=None,
                                extra_ceph_config=None):
        # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
        # keyring
        if not keyring:
            ename = utils.name_to_auth_entity(daemon_type + '.' + daemon_id)
            ret, keyring, err = self.check_mon_command({
                'prefix': 'auth get',
                'entity': ename,
            })

        # generate config
        ret, config, err = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

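    # The returned dict is serialized to JSON and fed to `cephadm deploy
    # --config-json -` by _create_daemon() below; roughly (illustrative):
    #
    #     {'config': '[global]\n\tfsid = ...\n\tmon_host = ...\n',
    #      'keyring': '[client.crash.node1]\n\tkey = ...\n'}
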
    def _create_daemon(self,
                       daemon_type: str,
                       daemon_id: str,
                       host: str,
                       keyring: Optional[str] = None,
                       extra_args: Optional[List[str]] = None,
                       extra_config: Optional[Dict[str, Any]] = None,
                       reconfig=False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       redeploy=False,
                       ) -> str:

        if not extra_args:
            extra_args = []
        if not extra_config:
            extra_config = {}
        name = '%s.%s' % (daemon_type, daemon_id)

        start_time = datetime.datetime.utcnow()
        deps = []  # type: List[str]
        cephadm_config = {}  # type: Dict[str, Any]
        if daemon_type == 'prometheus':
            cephadm_config, deps = self.prometheus_service.generate_config()
            extra_args.extend(['--config-json', '-'])
        elif daemon_type == 'grafana':
            cephadm_config, deps = self.grafana_service.generate_config()
            extra_args.extend(['--config-json', '-'])
        elif daemon_type == 'nfs':
            cephadm_config, deps = \
                self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
            extra_args.extend(['--config-json', '-'])
        elif daemon_type == 'alertmanager':
            cephadm_config, deps = self.alertmanager_service.generate_config()
            extra_args.extend(['--config-json', '-'])
        elif daemon_type == 'node-exporter':
            cephadm_config, deps = self.node_exporter_service.generate_config()
            extra_args.extend(['--config-json', '-'])
        else:
            # Ceph daemons (mon, mgr, mds, osd, etc.)
            cephadm_config = self._get_config_and_keyring(
                daemon_type, daemon_id,
                keyring=keyring,
                extra_ceph_config=extra_config.pop('config', ''))
            if extra_config:
                cephadm_config.update({'files': extra_config})
            extra_args.extend(['--config-json', '-'])

            # osd deployments need an --osd-uuid arg
            if daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
                extra_args.extend(['--osd-fsid', osd_uuid])

        if reconfig:
            extra_args.append('--reconfig')
        if self.allow_ptrace:
            extra_args.append('--allow-ptrace')

        self.log.info('%s daemon %s on %s' % (
            'Reconfiguring' if reconfig else 'Deploying',
            name, host))

        out, err, code = self._run_cephadm(
            host, name, 'deploy',
            [
                '--name', name,
            ] + extra_args,
            stdin=json.dumps(cephadm_config))
        if not code and host in self.cache.daemons:
            # prime cached service state with what we (should have)
            # just created
            sd = orchestrator.DaemonDescription()
            sd.daemon_type = daemon_type
            sd.daemon_id = daemon_id
            sd.hostname = host
            sd.status = 1
            sd.status_desc = 'starting'
            self.cache.add_daemon(host, sd)
        self.cache.invalidate_host_daemons(host)
        self.cache.update_daemon_config_deps(host, name, deps, start_time)
        self.cache.save_host(host)
        return "{} {} on host '{}'".format(
            'Reconfigured' if reconfig else 'Deployed', name, host)

    @forall_hosts
    def _remove_daemons(self, name, host) -> str:
        return self._remove_daemon(name, host)

    def _remove_daemon(self, name, host) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        if daemon_type == 'mon':
            self._check_safe_to_destroy_mon(daemon_id)

            # remove mon from quorum before we destroy the daemon
            self.log.info('Removing monitor %s from monmap...' % name)
            ret, out, err = self.check_mon_command({
                'prefix': 'mon rm',
                'name': daemon_id,
            })

        args = ['--name', name, '--force']
        self.log.info('Removing daemon %s from %s' % (name, host))
        out, err, code = self._run_cephadm(
            host, name, 'rm-daemon', args)
        if not code:
            # remove item from cache
            self.cache.rm_daemon(host, name)
        self.cache.invalidate_host_daemons(host)
        return "Removed {} from host '{}'".format(name, host)

    def _create_fn(self, service_type: str) -> Callable[..., str]:
        try:
            d: Dict[str, Callable[..., str]] = {
                'mon': self.mon_service.create,
                'mgr': self.mgr_service.create,
                'osd': self.osd_service.create,
                'mds': self.mds_service.create,
                'rgw': self.rgw_service.create,
                'rbd-mirror': self.rbd_mirror_service.create,
                'nfs': self.nfs_service.create,
                'grafana': self.grafana_service.create,
                'alertmanager': self.alertmanager_service.create,
                'prometheus': self.prometheus_service.create,
                'node-exporter': self.node_exporter_service.create,
                'crash': self.crash_service.create,
                'iscsi': self.iscsi_service.create,
            }
            return d[service_type]  # type: ignore
        except KeyError as e:
            self.log.exception(f'unknown service type {service_type}')
            raise OrchestratorError(f'unknown service type {service_type}') from e

    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
        return {
            'mds': self.mds_service.config,
            'rgw': self.rgw_service.config,
            'nfs': self.nfs_service.config,
            'iscsi': self.iscsi_service.config,
        }.get(service_type)

1741 def _apply_service(self, spec: ServiceSpec) -> bool:
1742 """
1743 Schedule a service. Deploy new daemons or remove old ones, depending
1744 on the target label and count specified in the placement.
1745 """
1746 daemon_type = spec.service_type
1747 service_name = spec.service_name()
1748 if spec.unmanaged:
1749 self.log.debug('Skipping unmanaged service %s spec' % service_name)
1750 return False
1751 self.log.debug('Applying service %s spec' % service_name)
1752
1753 create_func = self._create_fn(daemon_type)
1754 config_func = self._config_fn(daemon_type)
1755
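# OSD specs (drive groups) are handled entirely by OSDService;
# the generic count/label placement logic below does not apply to them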
1756 if daemon_type == 'osd':
1757 create_func(spec)
1758 # TODO: returning True here would result in a busy loop
1759 return False
1760
1761 daemons = self.cache.get_daemons_by_service(service_name)
1762
1763 public_network = None
1764 if daemon_type == 'mon':
1765 ret, out, err = self.check_mon_command({
1766 'prefix': 'config get',
1767 'who': 'mon',
1768 'key': 'public_network',
1769 })
1770 if '/' in out:
1771 public_network = out.strip()
1772 self.log.debug('mon public_network is %s' % public_network)
1773
1774 def matches_network(host):
1775 # type: (str) -> bool
1776 if not public_network:
1777 return False
1778 # make sure we have 1 or more IPs for that network on that
1779 # host
1780 return len(self.cache.networks[host].get(public_network, [])) > 0
1781
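# resolve the placement spec against the inventory; for mons, only
# consider new hosts that actually have an IP in the public network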
1782 hosts = HostAssignment(
1783 spec=spec,
1784 get_hosts_func=self._get_hosts,
1785 get_daemons_func=self.cache.get_daemons_by_service,
1786 filter_new_host=matches_network if daemon_type == 'mon' else None,
1787 ).place()
1788
1789 r = False
1790
1791 # sanity check
1792 if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
1793 self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
1794 return False
1795
1796 # add any?
1797 did_config = False
1798 hosts_with_daemons = {d.hostname for d in daemons}
1799 self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
1800 for host, network, name in hosts:
1801 if host not in hosts_with_daemons:
1802 if not did_config and config_func:
1803 config_func(spec)
1804 did_config = True
1805 daemon_id = self.get_unique_name(daemon_type, host, daemons,
1806 prefix=spec.service_id,
1807 forcename=name)
1808 self.log.debug('Placing %s.%s on host %s' % (
1809 daemon_type, daemon_id, host))
1810 if daemon_type == 'mon':
1811 create_func(daemon_id, host, network) # type: ignore
1812 elif daemon_type in ['nfs', 'iscsi']:
1813 create_func(daemon_id, host, spec) # type: ignore
1814 else:
1815 create_func(daemon_id, host) # type: ignore
1816
1817 # add to daemon list so next name(s) will also be unique
1818 sd = orchestrator.DaemonDescription(
1819 hostname=host,
1820 daemon_type=daemon_type,
1821 daemon_id=daemon_id,
1822 )
1823 daemons.append(sd)
1824 r = True
1825
1826 # remove any?
1827 target_hosts = [h.hostname for h in hosts]
1828 for d in daemons:
1829 if d.hostname not in target_hosts:
1830 # NOTE: we are passing the 'force' flag here, which means
1831 # we can delete a mon instance's data.
1832 self._remove_daemon(d.name(), d.hostname)
1833 r = True
1834
1835 return r
1836
1837 def _apply_all_services(self):
1838 r = False
1839 specs = [] # type: List[ServiceSpec]
1840 for sn, spec in self.spec_store.specs.items():
1841 specs.append(spec)
1842 for spec in specs:
1843 try:
1844 if self._apply_service(spec):
1845 r = True
1846 except Exception as e:
1847 self.log.exception('Failed to apply %s spec %s: %s' % (
1848 spec.service_name(), spec, e))
1849 return r
1850
1851 def _check_pool_exists(self, pool, service_name):
1852 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
1853 if not self.rados.pool_exists(pool):
1854 raise OrchestratorError(f'Cannot find pool "{pool}" for '
1855 f'service {service_name}')
1856
1857 def _check_daemons(self):
1858 # get monmap mtime so we can refresh configs when mons change
1859 monmap = self.get('mon_map')
1860 last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
1861 monmap['modified'], CEPH_DATEFMT)
1862 if last_monmap and last_monmap > datetime.datetime.utcnow():
1863 last_monmap = None # just in case clocks are skewed
1864
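# walk all cached daemons: remove orphans, collect the types that need
# post-create checks, and reconfigure anything whose config or
# dependencies are stale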
1865 daemons = self.cache.get_daemons()
1866 daemons_post = defaultdict(list)
1867 for dd in daemons:
1868 # orphan?
1869 spec = self.spec_store.specs.get(dd.service_name(), None)
1870 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
1871 # (mon and mgr specs should always exist; osds aren't matched
1872 # to a service spec)
1873 self.log.info('Removing orphan daemon %s...' % dd.name())
1874 self._remove_daemon(dd.name(), dd.hostname)
1875
1876 # ignore unmanaged services
1877 if spec and spec.unmanaged:
1878 continue
1879
1880 # These daemon types require additional configs after creation
1881 if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
1882 daemons_post[dd.daemon_type].append(dd)
1883
1884 deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
1885 last_deps, last_config = self.cache.get_daemon_last_config_deps(
1886 dd.hostname, dd.name())
1887 if last_deps is None:
1888 last_deps = []
1889 reconfig = False
1890 if not last_config:
1891 self.log.info('Reconfiguring %s (unknown last config time)...' % (
1892 dd.name()))
1893 reconfig = True
1894 elif last_deps != deps:
1895 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
1896 deps))
1897 self.log.info('Reconfiguring %s (dependencies changed)...' % (
1898 dd.name()))
1899 reconfig = True
1900 elif last_monmap and \
1901 last_monmap > last_config and \
1902 dd.daemon_type in CEPH_TYPES:
1903 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
1904 reconfig = True
1905 if reconfig:
1906 self._create_daemon(dd.daemon_type, dd.daemon_id,
1907 dd.hostname, reconfig=True)
1908
1909 # do daemon post actions
1910 for daemon_type, daemon_descs in daemons_post.items():
1911 self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
1912
1913 def _add_daemon(self, daemon_type, spec,
1914 create_func: Callable[..., T], config_func=None) -> List[T]:
1915 """
1916 Add (and place) a daemon. Require explicit host placement. Do not
1917 schedule, and do not apply the related scheduling limitations.
1918 """
1919 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
1920 if not spec.placement.hosts:
1921 raise OrchestratorError('must specify host(s) to deploy on')
1922 count = spec.placement.count or len(spec.placement.hosts)
1923 daemons = self.cache.get_daemons_by_service(spec.service_name())
1924 return self._create_daemons(daemon_type, spec, daemons,
1925 spec.placement.hosts, count,
1926 create_func, config_func)
1927
1928 def _create_daemons(self, daemon_type, spec, daemons,
1929 hosts, count,
1930 create_func: Callable[..., T], config_func=None) -> List[T]:
1931 if count > len(hosts):
1932 raise OrchestratorError('too few hosts: want %d, have %s' % (
1933 count, hosts))
1934
1935 if config_func:
1936 config_func(spec)
1937
1938 args = [] # type: List[tuple]
1939 for host, network, name in hosts:
1940 daemon_id = self.get_unique_name(daemon_type, host, daemons,
1941 prefix=spec.service_id,
1942 forcename=name)
1943 self.log.debug('Placing %s.%s on host %s' % (
1944 daemon_type, daemon_id, host))
1945 if daemon_type == 'mon':
1946 args.append((daemon_id, host, network)) # type: ignore
1947 elif daemon_type in ['nfs', 'iscsi']:
1948 args.append((daemon_id, host, spec)) # type: ignore
1949 else:
1950 args.append((daemon_id, host)) # type: ignore
1951
1952 # add to daemon list so next name(s) will also be unique
1953 sd = orchestrator.DaemonDescription(
1954 hostname=host,
1955 daemon_type=daemon_type,
1956 daemon_id=daemon_id,
1957 )
1958 daemons.append(sd)
1959
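# forall_hosts fans the per-host argument tuples out across the
# module's worker pool, one create_func call per tuple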
1960 @forall_hosts
1961 def create_func_map(*args):
1962 return create_func(*args)
1963
1964 return create_func_map(args)
1965
1966 @trivial_completion
1967 def apply_mon(self, spec):
1968 return self._apply(spec)
1969
1970 @trivial_completion
1971 def add_mon(self, spec):
1972 # type: (ServiceSpec) -> List[str]
1973 return self._add_daemon('mon', spec, self.mon_service.create)
1974
1975 @trivial_completion
1976 def add_mgr(self, spec):
1977 # type: (ServiceSpec) -> List[str]
1978 return self._add_daemon('mgr', spec, self.mgr_service.create)
1979
1980 def _apply(self, spec: GenericSpec) -> str:
1981 if spec.service_type == 'host':
1982 return self._add_host(cast(HostSpec, spec))
1983
1984 return self._apply_service_spec(cast(ServiceSpec, spec))
1985
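# Entry point for spec application, e.g. `ceph orch apply mds <id>
# --placement=...` (illustrative CLI invocation): fill in a default
# placement if none was given, validate it, persist the spec, and let
# the serve loop act on it.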
1986 def _apply_service_spec(self, spec: ServiceSpec) -> str:
1987 if spec.placement.is_empty():
1988 # fill in default placement
1989 defaults = {
1990 'mon': PlacementSpec(count=5),
1991 'mgr': PlacementSpec(count=2),
1992 'mds': PlacementSpec(count=2),
1993 'rgw': PlacementSpec(count=2),
1994 'iscsi': PlacementSpec(count=1),
1995 'rbd-mirror': PlacementSpec(count=2),
1996 'nfs': PlacementSpec(count=1),
1997 'grafana': PlacementSpec(count=1),
1998 'alertmanager': PlacementSpec(count=1),
1999 'prometheus': PlacementSpec(count=1),
2000 'node-exporter': PlacementSpec(host_pattern='*'),
2001 'crash': PlacementSpec(host_pattern='*'),
2002 }
2003 spec.placement = defaults[spec.service_type]
2004 elif spec.service_type in ['mon', 'mgr'] and \
2005 spec.placement.count is not None and \
2006 spec.placement.count < 1:
2007 raise OrchestratorError('cannot scale %s service below 1' % (
2008 spec.service_type))
2009
2010 HostAssignment(
2011 spec=spec,
2012 get_hosts_func=self._get_hosts,
2013 get_daemons_func=self.cache.get_daemons_by_service,
2014 ).validate()
2015
2016 self.log.info('Saving service %s spec with placement %s' % (
2017 spec.service_name(), spec.placement.pretty_str()))
2018 self.spec_store.save(spec)
2019 self._kick_serve_loop()
2020 return "Scheduled %s update..." % spec.service_name()
2021
2022 @trivial_completion
2023 def apply(self, specs: List[GenericSpec]):
2024 results = []
2025 for spec in specs:
2026 results.append(self._apply(spec))
2027 return results
2028
2029 @trivial_completion
2030 def apply_mgr(self, spec):
2031 return self._apply(spec)
2032
2033 @trivial_completion
2034 def add_mds(self, spec: ServiceSpec):
2035 return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
2036
2037 @trivial_completion
2038 def apply_mds(self, spec: ServiceSpec):
2039 return self._apply(spec)
2040
2041 @trivial_completion
2042 def add_rgw(self, spec):
2043 return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
2044
2045 @trivial_completion
2046 def apply_rgw(self, spec):
2047 return self._apply(spec)
2048
2049 @trivial_completion
2050 def add_iscsi(self, spec):
2051 # type: (ServiceSpec) -> List[str]
2052 return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
2053
2054 @trivial_completion
2055 def apply_iscsi(self, spec):
2056 return self._apply(spec)
2057
2058 @trivial_completion
2059 def add_rbd_mirror(self, spec):
2060 return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
2061
2062 @trivial_completion
2063 def apply_rbd_mirror(self, spec):
2064 return self._apply(spec)
2065
2066 @trivial_completion
2067 def add_nfs(self, spec):
2068 return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
2069
2070 @trivial_completion
2071 def apply_nfs(self, spec):
2072 return self._apply(spec)
2073
2074 def _get_dashboard_url(self):
2075 # type: () -> str
2076 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2077
2078 @trivial_completion
2079 def add_prometheus(self, spec):
2080 return self._add_daemon('prometheus', spec, self.prometheus_service.create)
2081
2082 @trivial_completion
2083 def apply_prometheus(self, spec):
2084 return self._apply(spec)
2085
2086 @trivial_completion
2087 def add_node_exporter(self, spec):
2088 # type: (ServiceSpec) -> List[str]
2089 return self._add_daemon('node-exporter', spec,
2090 self.node_exporter_service.create)
2091
2092 @trivial_completion
2093 def apply_node_exporter(self, spec):
2094 return self._apply(spec)
2095
2096 @trivial_completion
2097 def add_crash(self, spec):
2098 # type: (ServiceSpec) -> List[str]
2099 return self._add_daemon('crash', spec,
2100 self.crash_service.create)
2101
2102 @trivial_completion
2103 def apply_crash(self, spec):
2104 return self._apply(spec)
2105
2106 @trivial_completion
2107 def add_grafana(self, spec):
2108 # type: (ServiceSpec) -> List[str]
2109 return self._add_daemon('grafana', spec, self.grafana_service.create)
2110
2111 @trivial_completion
2112 def apply_grafana(self, spec: ServiceSpec):
2113 return self._apply(spec)
2114
2115 @trivial_completion
2116 def add_alertmanager(self, spec):
2117 # type: (ServiceSpec) -> List[str]
2118 return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
2119
2120 @trivial_completion
2121 def apply_alertmanager(self, spec: ServiceSpec):
2122 return self._apply(spec)
2123
2124 def _get_container_image_id(self, image_name):
2125 # pick an arbitrary host (the first one in the inventory)...
2126 host = None
2127 for host_name in self.inventory.keys():
2128 host = host_name
2129 break
2130 if not host:
2131 raise OrchestratorError('no hosts defined')
2132 out, err, code = self._run_cephadm(
2133 host, None, 'pull', [],
2134 image=image_name,
2135 no_fsid=True,
2136 error_ok=True)
2137 if code:
2138 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2139 image_name, host, '\n'.join(out)))
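# `cephadm pull` prints a JSON blob describing the pulled image, e.g.
# {"image_id": "...", "ceph_version": "ceph version 15.2.x (...)"}
# (shape inferred from the keys read below)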
2140 j = json.loads('\n'.join(out))
2141 image_id = j.get('image_id')
2142 ceph_version = j.get('ceph_version')
2143 self.log.debug('image %s -> id %s version %s' %
2144 (image_name, image_id, ceph_version))
2145 return image_id, ceph_version
2146
2147 @trivial_completion
2148 def upgrade_check(self, image, version):
2149 if version:
2150 target_name = self.container_image_base + ':v' + version
2151 elif image:
2152 target_name = image
2153 else:
2154 raise OrchestratorError('must specify either image or version')
2155
2156 target_id, target_version = self._get_container_image_id(target_name)
2157 self.log.debug('Target image %s id %s version %s' % (
2158 target_name, target_id, target_version))
2159 r = {
2160 'target_name': target_name,
2161 'target_id': target_id,
2162 'target_version': target_version,
2163 'needs_update': dict(),
2164 'up_to_date': list(),
2165 }
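# compare every running daemon's container image id against the target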
2166 for host, dm in self.cache.daemons.items():
2167 for name, dd in dm.items():
2168 if target_id == dd.container_image_id:
2169 r['up_to_date'].append(dd.name())
2170 else:
2171 r['needs_update'][dd.name()] = {
2172 'current_name': dd.container_image_name,
2173 'current_id': dd.container_image_id,
2174 'current_version': dd.version,
2175 }
2176 return json.dumps(r, indent=4, sort_keys=True)
2177
2178 @trivial_completion
2179 def upgrade_status(self):
2180 return self.upgrade.upgrade_status()
2181
2182 @trivial_completion
2183 def upgrade_start(self, image, version):
2184 return self.upgrade.upgrade_start(image, version)
2185
2186 @trivial_completion
2187 def upgrade_pause(self):
2188 return self.upgrade.upgrade_pause()
2189
2190 @trivial_completion
2191 def upgrade_resume(self):
2192 return self.upgrade.upgrade_resume()
2193
2194 @trivial_completion
2195 def upgrade_stop(self):
2196 return self.upgrade.upgrade_stop()
2197
2198 @trivial_completion
2199 def remove_osds(self, osd_ids: List[str],
2200 replace: bool = False,
2201 force: bool = False):
2202 """
2203 Takes a list of OSDs and schedules them for removal.
2204 The actual removal is handled asynchronously by the
2205 RemoveUtil removal queue, which is driven from the serve loop.
2206 """
2207
2208 daemons = self.cache.get_daemons_by_service('osd')
2209 found: Set[OSDRemoval] = set()
2210 for daemon in daemons:
2211 if daemon.daemon_id not in osd_ids:
2212 continue
2213 found.add(OSDRemoval(daemon.daemon_id, replace, force,
2214 daemon.hostname, daemon.name(),
2215 datetime.datetime.utcnow(), -1))
2216
2217 not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
2218 if not_found:
2219 raise OrchestratorError('Unable to find OSD: %s' % not_found)
2220
2221 self.rm_util.queue_osds_for_removal(found)
2222
2223 # trigger the serve loop to initiate the removal
2224 self._kick_serve_loop()
2225 return "Scheduled OSD(s) for removal"
2226
2227 @trivial_completion
2228 def remove_osds_status(self):
2229 """
2230 The CLI call to retrieve an osd removal report
2231 """
2232 return self.rm_util.report