]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/module.py
4de556f9c6cb90813e59f91eca3c2aac2e7d6df9
[ceph.git] / ceph / src / pybind / mgr / cephadm / module.py
1 import json
2 import errno
3 import logging
4 from collections import defaultdict
5 from contextlib import contextmanager
6 from functools import wraps
7 from tempfile import TemporaryDirectory
8 from threading import Event
9
10 import string
11 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
12 Any, Set, TYPE_CHECKING, cast, Iterator, Union
13
14 import datetime
15 import six
16 import os
17 import random
18 import tempfile
19 import multiprocessing.pool
20 import shutil
21 import subprocess
22
23 from ceph.deployment import inventory
24 from ceph.deployment.drive_group import DriveGroupSpec
25 from ceph.deployment.service_spec import \
26 NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host
27 from cephadm.services.cephadmservice import CephadmDaemonSpec
28
29 from mgr_module import MgrModule, HandleCommandResult
30 import orchestrator
31 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
32 CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
33 from orchestrator._interface import GenericSpec
34
35 from . import remotes
36 from . import utils
37 from .migrations import Migrations
38 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
39 RbdMirrorService, CrashService, CephadmService
40 from .services.iscsi import IscsiService
41 from .services.nfs import NFSService
42 from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
43 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
44 NodeExporterService
45 from .schedule import HostAssignment, HostPlacementSpec
46 from .inventory import Inventory, SpecStore, HostCache, EventStore
47 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
48 from .template import TemplateMgr
49 from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
50
51 try:
52 import remoto
53 # NOTE(mattoliverau) Patch remoto until remoto PR
54 # (https://github.com/alfredodeza/remoto/pull/56) lands
55 from distutils.version import StrictVersion
56 if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
57 def remoto_has_connection(self):
58 return self.gateway.hasreceiver()
59
60 from remoto.backends import BaseConnection
61 BaseConnection.has_connection = remoto_has_connection
62 import remoto.process
63 import execnet.gateway_bootstrap
64 except ImportError as e:
65 remoto = None
66 remoto_import_error = str(e)
67
68 try:
69 from typing import List
70 except ImportError:
71 pass
72
logger = logging.getLogger(__name__)

# Generic type variable used by CephadmCompletion / trivial_completion.
T = TypeVar('T')

# Baseline ssh_config used when the admin has not supplied one.  Host key
# checking is disabled because cluster hosts are provisioned dynamically.
DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

# Timestamp formats: DATEFMT for values cephadm writes itself,
# CEPH_DATEFMT for mon map timestamps (UTC, trailing 'Z').
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

# The set of core Ceph daemon types (the upgrade order doubles as the list).
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
89
90
class CephadmCompletion(orchestrator.Completion[T]):
    """Completion that resolves immediately when evaluated.

    cephadm runs its real work on the serve() thread, so a completion only
    needs to finalize itself (triggering its on_complete callback chain).
    """

    def evaluate(self):
        # finalize(None) runs the completion's callbacks right away.
        self.finalize(None)
def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """
    Decorator: make a method return a CephadmCompletion that, when
    evaluated, simply calls the wrapped function with the original args.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        def _invoke(_ignored):
            # The completion framework passes the prior result; a trivial
            # completion ignores it and just runs the wrapped callable.
            return f(*args, **kwargs)

        return CephadmCompletion(on_complete=_invoke)

    return wrapper
106
107
@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
    """The cephadm orchestrator mgr module: manages hosts and daemons over SSH."""

    # prefix for per-host entries in the mgr key/value store
    _STORE_HOST_PREFIX = "host"

    # singleton set in __init__ so module-level helpers can reach the module
    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    # Module options; config_notify() mirrors each one onto an attribute of
    # self with the same name.
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'container_image_prometheus',
            'default': 'prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'ceph/ceph-grafana:6.6.2',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'prom/alertmanager:v0.20.0',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'prom/node-exporter:v0.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            'desc': 'log to the "cephadm" cluster log channel"',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace.  Enabling this options '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'type': 'int',
            'default': None,
            'desc': 'internal - do not modify',
            # used to track track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'type': 'bool',
            'default': True,
            'desc': 'manage configs like API endpoints in Dashboard.'
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'type': 'bool',
            'default': False,
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository url'
        },
        {
            'name': 'registry_username',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository username'
        },
        {
            'name': 'registry_password',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository password'
        },
    ]
253
    def __init__(self, *args, **kwargs):
        """Initialize module state: SSH, caches, stores and per-type services."""
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        # mtime of the last seen mon map; used to refresh configs on mon change
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        # restore persisted paused state across mgr restarts
        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.prometheus_alerts_path = ''
            self.migration_current = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None

        # cached remoto connections, keyed by host
        self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]

        self.notify('mon_map', None)
        self.config_notify()

        # read the cephadm binary we ship to remote hosts
        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDQueue()
        self.rm_util.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        # services: one handler object per supported daemon type
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
        }

        self.template = TemplateMgr()

        # daemon names needing post-deploy actions (e.g. dashboard config)
        self.requires_post_actions = set()
378
    def shutdown(self):
        """Stop the serve() loop and tear down the worker pool."""
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        # clearing run and setting the event makes serve() exit its loop
        self.run = False
        self.event.set()
385
    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        """Return the service handler object for *service_type*."""
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]
389
    def _kick_serve_loop(self):
        """Wake the serve() thread so it re-evaluates state immediately."""
        self.log.debug('_kick_serve_loop')
        self.event.set()
393
394 # function responsible for logging single host into custom registry
395 def _registry_login(self, host, url, username, password):
396 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
397 # want to pass info over stdin rather than through normal list of args
398 args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
399 " \"password\": \"" + password + "\"}")
400 out, err, code = self._run_cephadm(
401 host, 'mon', 'registry-login',
402 ['--registry-json', '-'], stdin=args_str, error_ok=True)
403 if code:
404 return f"Host {host} failed to login to {url} as {username} with given password"
405 return
406
407
408 def _check_host(self, host):
409 if host not in self.inventory:
410 return
411 self.log.debug(' checking %s' % host)
412 try:
413 out, err, code = self._run_cephadm(
414 host, cephadmNoImage, 'check-host', [],
415 error_ok=True, no_fsid=True)
416 self.cache.update_last_host_check(host)
417 self.cache.save_host(host)
418 if code:
419 self.log.debug(' host %s failed check' % host)
420 if self.warn_on_failed_host_check:
421 return 'host %s failed check: %s' % (host, err)
422 else:
423 self.log.debug(' host %s ok' % host)
424 except Exception as e:
425 self.log.debug(' host %s failed check' % host)
426 return 'host %s failed check: %s' % (host, e)
427
    def _check_for_strays(self):
        """Refresh CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON health checks.

        Compares daemons the cluster reports (list_servers) against what
        cephadm manages and warns about anything unmanaged.
        """
        self.log.debug('_check_for_strays')
        # start from a clean slate for both stray-related checks
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.health_checks:
                del self.health_checks[k]
        if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
            ls = self.list_servers()
            managed = self.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                daemons = item.get('services')  # misnomer!
                missing_names = []
                for s in daemons:
                    name = '%s.%s' % (s.get('type'), s.get('id'))
                    if host not in self.inventory:
                        # daemon on a host cephadm does not know about
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.warn_on_stray_hosts and host_detail:
                self.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                    'not managed by cephadm' % (
                        len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.warn_on_stray_daemons and daemon_detail:
                self.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemons(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.set_health_checks(self.health_checks)
474
475 def _serve_sleep(self):
476 sleep_interval = 600
477 self.log.debug('Sleeping for %d seconds', sleep_interval)
478 ret = self.event.wait(sleep_interval)
479 self.event.clear()
480
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        while self.run:

            try:

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if not self.paused:
                    self.rm_util.process_removal_queue()

                    # each 'continue' below restarts the pass so state is
                    # re-refreshed before the next step runs
                    self.migration.migrate()
                    if self.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    if self.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.events.from_orch_error(e)

            self._serve_sleep()
        self.log.debug("serve exit")
522
523 def _update_paused_health(self):
524 if self.paused:
525 self.health_checks['CEPHADM_PAUSED'] = {
526 'severity': 'warning',
527 'summary': 'cephadm background work is paused',
528 'count': 1,
529 'detail': ["'ceph orch resume' to resume"],
530 }
531 self.set_health_checks(self.health_checks)
532 else:
533 if 'CEPHADM_PAUSED' in self.health_checks:
534 del self.health_checks['CEPHADM_PAUSED']
535 self.set_health_checks(self.health_checks)
536
    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        # mirror every declared module option onto an attribute of self
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        # wake the serve loop so new option values take effect promptly
        self.event.set()
556
557 def notify(self, notify_type, notify_id):
558 if notify_type == "mon_map":
559 # get monmap mtime so we can refresh configs when mons change
560 monmap = self.get('mon_map')
561 self.last_monmap = datetime.datetime.strptime(
562 monmap['modified'], CEPH_DATEFMT)
563 if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
564 self.last_monmap = None # just in case clocks are skewed
565 if notify_type == "pg_summary":
566 self._trigger_osd_removal()
567
568 def _trigger_osd_removal(self):
569 data = self.get("osd_stats")
570 for osd in data.get('osd_stats', []):
571 if osd.get('num_pgs') == 0:
572 # if _ANY_ osd that is currently in the queue appears to be empty,
573 # start the removal process
574 if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
575 self.log.debug(f"Found empty osd. Starting removal process")
576 # if the osd that is now empty is also part of the removal queue
577 # start the process
578 self.rm_util.process_removal_queue()
579
580 def pause(self):
581 if not self.paused:
582 self.log.info('Paused')
583 self.set_store('pause', 'true')
584 self.paused = True
585 # wake loop so we update the health status
586 self._kick_serve_loop()
587
    def resume(self):
        """Leave the paused state and wake the serve loop."""
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()
596
    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        # these daemon types are one-per-host, so no random suffix is needed
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{forcename} already in use')
            return forcename

        # use the short hostname as the name base
        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                # six random lowercase letters to disambiguate
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    # no random component, so retrying cannot help
                    raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
629
    def _reconfig_ssh(self):
        """Rebuild SSH config, identity files and options from stored state.

        Temp files are kept referenced in self._temp_files so they live (and
        stay on disk) for as long as the options pointing at them are used.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config: stored value wins; otherwise the configured file,
        # falling back to DEFAULT_SSH_CONFIG written to a temp file
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity: write key/pub to temp files for ssh -i
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        # existing connections were made with the old settings; drop them
        self._reset_cons()
680
681 def validate_ssh_config_fname(self, ssh_config_fname):
682 if not os.path.isfile(ssh_config_fname):
683 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
684 ssh_config_fname))
685
686 def _reset_con(self, host):
687 conn, r = self._cons.get(host, (None, None))
688 if conn:
689 self.log.debug('_reset_con close %s' % host)
690 conn.exit()
691 del self._cons[host]
692
693 def _reset_cons(self):
694 for host, conn_and_r in self._cons.items():
695 self.log.debug('_reset_cons close %s' % host)
696 conn, r = conn_and_r
697 conn.exit()
698 self._cons = {}
699
700 def offline_hosts_remove(self, host):
701 if host in self.offline_hosts:
702 self.offline_hosts.remove(host)
703
704
705 @staticmethod
706 def can_run():
707 if remoto is not None:
708 return True, ""
709 else:
710 return False, "loading remoto library:{}".format(
711 remoto_import_error)
712
713 def available(self):
714 """
715 The cephadm orchestrator is always available.
716 """
717 ok, err = self.can_run()
718 if not ok:
719 return ok, err
720 if not self.ssh_key or not self.ssh_pub:
721 return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
722 return True, ''
723
724 def process(self, completions):
725 """
726 Does nothing, as completions are processed in another thread.
727 """
728 if completions:
729 self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
730
731 for p in completions:
732 p.evaluate()
733
734 @orchestrator._cli_write_command(
735 prefix='cephadm set-ssh-config',
736 desc='Set the ssh_config file (use -i <ssh_config>)')
737 def _set_ssh_config(self, inbuf=None):
738 """
739 Set an ssh_config file provided from stdin
740
741 TODO:
742 - validation
743 """
744 if inbuf is None or len(inbuf) == 0:
745 return -errno.EINVAL, "", "empty ssh config provided"
746 if inbuf == self.ssh_config:
747 return 0, "value unchanged", ""
748 self.set_store("ssh_config", inbuf)
749 self.log.info('Set ssh_config')
750 self._reconfig_ssh()
751 return 0, "", ""
752
    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self):
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        # fall back to the default / file-based config
        self._reconfig_ssh()
        return 0, "", ""
765
766 @orchestrator._cli_read_command(
767 prefix='cephadm get-ssh-config',
768 desc='Returns the ssh config as used by cephadm'
769 )
770 def _get_ssh_config(self):
771 if self.ssh_config_file:
772 self.validate_ssh_config_fname(self.ssh_config_file)
773 with open(self.ssh_config_file) as f:
774 return HandleCommandResult(stdout=f.read())
775 ssh_config = self.get_store("ssh_config")
776 if ssh_config:
777 return HandleCommandResult(stdout=ssh_config)
778 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
779
780
    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self):
        """Generate and store a cluster SSH keypair, unless one already exists."""
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',  # no passphrase
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                # NOTE(review): if ssh-keygen fails, these unlinks will raise
                # on the missing files and mask the original error — confirm.
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''
808
809 @orchestrator._cli_write_command(
810 'cephadm set-priv-key',
811 desc='Set cluster SSH private key (use -i <private_key>)')
812 def _set_priv_key(self, inbuf=None):
813 if inbuf is None or len(inbuf) == 0:
814 return -errno.EINVAL, "", "empty private ssh key provided"
815 if inbuf == self.ssh_key:
816 return 0, "value unchanged", ""
817 self.set_store("ssh_identity_key", inbuf)
818 self.log.info('Set ssh private key')
819 self._reconfig_ssh()
820 return 0, "", ""
821
822 @orchestrator._cli_write_command(
823 'cephadm set-pub-key',
824 desc='Set cluster SSH public key (use -i <public_key>)')
825 def _set_pub_key(self, inbuf=None):
826 if inbuf is None or len(inbuf) == 0:
827 return -errno.EINVAL, "", "empty public ssh key provided"
828 if inbuf == self.ssh_pub:
829 return 0, "value unchanged", ""
830 self.set_store("ssh_identity_pub", inbuf)
831 self.log.info('Set ssh public key')
832 self._reconfig_ssh()
833 return 0, "", ""
834
    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self):
        """Remove the stored SSH keypair and drop existing connections."""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''
844
845 @orchestrator._cli_read_command(
846 'cephadm get-pub-key',
847 desc='Show SSH public key for connecting to cluster hosts')
848 def _get_pub_key(self):
849 if self.ssh_pub:
850 return 0, self.ssh_pub, ''
851 else:
852 return -errno.ENOENT, '', 'No cluster SSH key defined'
853
    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self):
        """Return the user cephadm uses for SSH connections."""
        return 0, self.ssh_user, ''
859
    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user):
        """Change the SSH user, verifying connectivity before committing."""
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        # sanity-check the new user against one known host
        host = self.cache.get_hosts()[0]
        r = self._check_host(host)
        if r is not None:
            # connection failed; reset user
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += ' sudo will be used'
        self.log.info(msg)
        return 0, msg, ''
885
    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url=None, username=None, password=None, inbuf=None):
        """Validate, store and distribute custom registry credentials."""
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''
929
    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host, addr=None):
        """CLI handler: run 'cephadm check-host' on *host* and report back."""
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err
954
955 @orchestrator._cli_read_command(
956 'cephadm prepare-host',
957 'name=host,type=CephString '
958 'name=addr,type=CephString,req=false',
959 'Prepare a remote host for use with cephadm')
960 def _prepare_host(self, host, addr=None):
961 out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
962 ['--expect-hostname', host],
963 addr=addr,
964 error_ok=True, no_fsid=True)
965 if code:
966 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
967 # if we have an outstanding health alert for this host, give the
968 # serve thread a kick
969 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
970 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
971 if item.startswith('host %s ' % host):
972 self.event.set()
973 return 0, '%s (%s) ok' % (host, addr), err
974
    def _get_connection(self, host: str):
        """
        Setup a connection for running commands on remote host.
        """
        # reuse a cached connection if it is still alive
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        # quiet remoto's per-connection logger
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        # push our remote helper module to the other side
        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r
1001
1002 def _executable_path(self, conn, executable):
1003 """
1004 Remote validator that accepts a connection object to ensure that a certain
1005 executable is available returning its full path if so.
1006
1007 Otherwise an exception with thorough details will be raised, informing the
1008 user that the executable was not found.
1009 """
1010 executable_path = conn.remote_module.which(executable)
1011 if not executable_path:
1012 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1013 executable, conn.hostname))
1014 self.log.debug("Found executable '{}' at path '{}'".format(executable,
1015 executable_path))
1016 return executable_path
1017
    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str]=None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        """Context manager yielding a (connection, remote module) pair.

        Resolves ``addr`` from the inventory when not given, and converts
        low-level connection failures into OrchestratorError while marking
        the host offline.
        """
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        # optimistically clear the offline flag; it is re-added on failure
        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                # connection setup failed at the OS level; normalize to the
                # HostNotFound handling below
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            msg = f'''Failed to connect to {host} ({addr}).
Check that the host is reachable and accepts connections using the cephadm SSH key

you may want to run:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > key
> ssh -F ssh_config -i key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            # unexpected failure: log with traceback and propagate unchanged
            self.log.exception(ex)
            raise
1059
1060 def _get_container_image(self, daemon_name: str) -> str:
1061 daemon_type = daemon_name.split('.', 1)[0] # type: ignore
1062 if daemon_type in CEPH_TYPES or \
1063 daemon_type == 'nfs' or \
1064 daemon_type == 'iscsi':
1065 # get container image
1066 ret, image, err = self.check_mon_command({
1067 'prefix': 'config get',
1068 'who': utils.name_to_config_section(daemon_name),
1069 'key': 'container_image',
1070 })
1071 image = image.strip() # type: ignore
1072 elif daemon_type == 'prometheus':
1073 image = self.container_image_prometheus
1074 elif daemon_type == 'grafana':
1075 image = self.container_image_grafana
1076 elif daemon_type == 'alertmanager':
1077 image = self.container_image_alertmanager
1078 elif daemon_type == 'node-exporter':
1079 image = self.container_image_node_exporter
1080 else:
1081 assert False, daemon_type
1082
1083 self.log.debug('%s container image %s' % (daemon_name, image))
1084
1085 return image
1086
    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]]= None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :param entity: daemon name (or cephadmNoImage) used to look up the
            container image when ``image`` is not given
        :param error_ok: when True, return the non-zero exit code instead of
            raising OrchestratorError
        :env_vars: in format -> [KEY=VALUE, ..]
        :return: (stdout lines, stderr lines, exit code)
        :raises OrchestratorError: on non-zero exit when ``error_ok`` is False
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            # environment variables are forwarded via repeated --env flags
            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]
            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                # 'root' mode: pipe the bundled cephadm script, with argv and
                # stdin injected as variables, into the remote python
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    # the connection may be left in a bad state; drop it
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                # package mode: use the cephadm binary installed on the host
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
1171
1172
1173 def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
1174 return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
1175
1176 def _add_host(self, spec):
1177 # type: (HostSpec) -> str
1178 """
1179 Add a host to be managed by the orchestrator.
1180
1181 :param host: host name
1182 """
1183 assert_valid_host(spec.hostname)
1184 out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
1185 ['--expect-hostname', spec.hostname],
1186 addr=spec.addr,
1187 error_ok=True, no_fsid=True)
1188 if code:
1189 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1190 spec.hostname, spec.addr, err))
1191
1192 self.inventory.add_host(spec)
1193 self.cache.prime_empty_host(spec.hostname)
1194 self.offline_hosts_remove(spec.hostname)
1195 self.event.set() # refresh stray health check
1196 self.log.info('Added host %s' % spec.hostname)
1197 return "Added host '{}'".format(spec.hostname)
1198
    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        """Orchestrator entry point for adding a host; delegates to _add_host()."""
        return self._add_host(spec)
1202
    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        Drops the host from the inventory and cache and resets any cached
        SSH connection to it.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)
1217
    @trivial_completion
    def update_host_addr(self, host, addr) -> str:
        """Record a new address for ``host`` and drop the cached connection."""
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)
1225
    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache (the inventory), so no
            remote round-trip is needed.
        """
        return list(self.inventory.all_specs())
1236
1237 @trivial_completion
1238 def add_host_label(self, host, label) -> str:
1239 self.inventory.add_label(host, label)
1240 self.log.info('Added label %s to host %s' % (label, host))
1241 return 'Added label %s to host %s' % (label, host)
1242
1243 @trivial_completion
1244 def remove_host_label(self, host, label) -> str:
1245 self.inventory.rm_label(host, label)
1246 self.log.info('Removed label %s to host %s' % (label, host))
1247 return 'Removed label %s from host %s' % (label, host)
1248
1249 @trivial_completion
1250 def host_ok_to_stop(self, hostname: str):
1251 if hostname not in self.cache.get_hosts():
1252 raise OrchestratorError(f'Cannot find host "{hostname}"')
1253
1254 daemons = self.cache.get_daemons()
1255 daemon_map = defaultdict(lambda: [])
1256 for dd in daemons:
1257 if dd.hostname == hostname:
1258 daemon_map[dd.daemon_type].append(dd.daemon_id)
1259
1260 for daemon_type,daemon_ids in daemon_map.items():
1261 r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
1262 if r.retval:
1263 self.log.error(f'It is NOT safe to stop host {hostname}')
1264 raise orchestrator.OrchestratorError(
1265 r.stderr,
1266 errno=r.retval)
1267
1268 msg = f'It is presumed safe to stop host {hostname}'
1269 self.log.info(msg)
1270 return msg
1271
1272 def update_osdspec_previews(self, search_host: str = ''):
1273 # Set global 'pending' flag for host
1274 self.cache.loading_osdspec_preview.add(search_host)
1275 previews = []
1276 # query OSDSpecs for host <search host> and generate/get the preview
1277 # There can be multiple previews for one host due to multiple OSDSpecs.
1278 previews.extend(self.osd_service.get_previews(search_host))
1279 self.log.debug(f"Loading OSDSpec previews to HostCache")
1280 self.cache.osdspec_previews[search_host] = previews
1281 # Unset global 'pending' flag for host
1282 self.cache.loading_osdspec_preview.remove(search_host)
1283
    def _refresh_host_osdspec_previews(self, host) -> bool:
        """Rebuild and persist the OSDSpec previews for ``host``.

        NOTE(review): always returns True; the caller in
        _refresh_hosts_and_daemons appends truthy results to its `failures`
        list — verify this interaction is intended.
        """
        self.update_osdspec_previews(host)
        self.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return True
1289
    def _refresh_hosts_and_daemons(self) -> None:
        """Refresh per-host state (check, daemons, registry login, devices,
        OSDSpec previews, /etc/ceph/ceph.conf) across all hosts and update
        the corresponding health checks.
        """
        bad_hosts = []
        failures = []

        @forall_hosts
        def refresh(host):
            # each step is gated by a per-host staleness test in the cache
            if self.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.cache.host_needs_registry_login(host) and self.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                # NOTE(review): _refresh_host_osdspec_previews returns True on
                # success, so a successful refresh is appended to `failures`
                # here — looks like a bug; verify intended behavior.
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if self.cache.host_needs_new_etc_ceph_ceph_conf(host):
                self.log.debug(f"deploying new /etc/ceph/ceph.conf on `{host}`")
                r = self._deploy_etc_ceph_ceph_conf(host)
                if r:
                    bad_hosts.append(r)

        refresh(self.cache.get_hosts())

        # raise/clear the derived health checks only when something changed
        health_changed = False
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
            health_changed = True
        if bad_hosts:
            self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                'severity': 'warning',
                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                'count': len(bad_hosts),
                'detail': bad_hosts,
            }
            health_changed = True
        if failures:
            self.health_checks['CEPHADM_REFRESH_FAILED'] = {
                'severity': 'warning',
                'summary': 'failed to probe daemons or devices',
                'count': len(failures),
                'detail': failures,
            }
            health_changed = True
        elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
            del self.health_checks['CEPHADM_REFRESH_FAILED']
            health_changed = True
        if health_changed:
            self.set_health_checks(self.health_checks)
1357
    def _refresh_host_daemons(self, host) -> Optional[str]:
        """Scrape `cephadm ls` on ``host`` and rebuild its daemon cache.

        :returns: an error string on failure, else None.
        """
        try:
            out, err, code = self._run_cephadm(
                host, 'mon', 'ls', [], no_fsid=True)
            if code:
                return 'host %s cephadm ls returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s scrape failed: %s' % (host, e)
        ls = json.loads(''.join(out))
        dm = {}
        for d in ls:
            # skip daemons not deployed by cephadm, from other clusters,
            # or with malformed names
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime.datetime.utcnow()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
            sd.daemon_type = d['name'].split('.')[0]
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.version = d.get('version')
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                # map cephadm's state string onto the numeric status code
                sd.status_desc = d['state']
                sd.status = {
                    'running': 1,
                    'stopped': 0,
                    'error': -1,
                    'unknown': -1,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.cache.update_host_daemons(host, dm)
        self.cache.save_host(host)
        return None
1410
    def _refresh_host_devices(self, host) -> Optional[str]:
        """Scrape ceph-volume inventory and networks on ``host`` into the cache.

        :returns: an error string on failure, else None.
        """
        try:
            out, err, code = self._run_cephadm(
                host, 'osd',
                'ceph-volume',
                ['--', 'inventory', '--format=json'])
            if code:
                return 'host %s ceph-volume inventory returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s ceph-volume inventory failed: %s' % (host, e)
        devices = json.loads(''.join(out))
        try:
            out, err, code = self._run_cephadm(
                host, 'mon',
                'list-networks',
                [],
                no_fsid=True)
            if code:
                return 'host %s list-networks returned %d: %s' % (
                    host, code, err)
        except Exception as e:
            return 'host %s list-networks failed: %s' % (host, e)
        networks = json.loads(''.join(out))
        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        devices = inventory.Devices.from_json(devices)
        self.cache.update_host_devices_networks(host, devices.devices, networks)
        # new devices may change which OSDSpec previews apply to this host
        self.update_osdspec_previews(host)
        self.cache.save_host(host)
        return None
1442
    def _deploy_etc_ceph_ceph_conf(self, host: str) -> Optional[str]:
        """Write a minimal ceph.conf to /etc/ceph on ``host``.

        :returns: an error string on failure, else None.
        """
        ret, config, err = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })

        try:
            with self._remote_connection(host) as tpl:
                conn, connr = tpl
                out, err, code = remoto.process.check(
                    conn,
                    ['mkdir', '-p', '/etc/ceph'])
                if code:
                    return f'failed to create /etc/ceph on {host}: {err}'
                # write the conf by piping it into dd on the remote side
                out, err, code = remoto.process.check(
                    conn,
                    ['dd', 'of=/etc/ceph/ceph.conf'],
                    stdin=config.encode('utf-8')
                )
                if code:
                    return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
                # remember when we last pushed a conf so we don't re-deploy
                self.cache.update_last_etc_ceph_ceph_conf(host)
                self.cache.save_host(host)
        except OrchestratorError as e:
            return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
        return None
1468
1469 def _invalidate_daemons_and_kick_serve(self, filter_host=None):
1470 if filter_host:
1471 self.cache.invalidate_host_daemons(filter_host)
1472 else:
1473 for h in self.cache.get_hosts():
1474 # Also discover daemons deployed manually
1475 self.cache.invalidate_host_daemons(h)
1476
1477 self._kick_serve_loop()
1478
    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        """Aggregate cached daemons (plus stored specs) into service summaries.

        :param service_type: optional filter by daemon/service type
        :param service_name: optional filter by exact service name
        :param refresh: invalidate caches and kick serve() first
        """
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        # <service_map>
        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    # no stored spec: synthesize an unmanaged placeholder spec
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    # first daemon seen for this service: create the entry
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        Showing an actual/expected representation cannot be determined
                        here. So we're setting running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0
                if dd.status == 1:
                    sm[n].running += 1
                # track the oldest refresh time across the service's daemons
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                # daemons disagreeing on the image are reported as 'mix'
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        # also include spec-only services that have no deployed daemons yet
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())
1567
1568 @trivial_completion
1569 def list_daemons(self,
1570 service_name: Optional[str] = None,
1571 daemon_type: Optional[str] = None,
1572 daemon_id: Optional[str] = None,
1573 host: Optional[str] = None,
1574 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
1575 if refresh:
1576 self._invalidate_daemons_and_kick_serve(host)
1577 self.log.info('Kicked serve() loop to refresh all daemons')
1578
1579 result = []
1580 for h, dm in self.cache.get_daemons_with_volatile_status():
1581 if host and h != host:
1582 continue
1583 for name, dd in dm.items():
1584 if daemon_type is not None and daemon_type != dd.daemon_type:
1585 continue
1586 if daemon_id is not None and daemon_id != dd.daemon_id:
1587 continue
1588 if service_name is not None and service_name != dd.service_name():
1589 continue
1590 result.append(dd)
1591 return result
1592
1593 @trivial_completion
1594 def service_action(self, action, service_name) -> List[str]:
1595 args = []
1596 for host, dm in self.cache.daemons.items():
1597 for name, d in dm.items():
1598 if d.matches_service(service_name):
1599 args.append((d.daemon_type, d.daemon_id,
1600 d.hostname, action))
1601 self.log.info('%s service %s' % (action.capitalize(), service_name))
1602 return self._daemon_actions(args)
1603
    @forall_hosts
    def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
        # @forall_hosts fans this out over a list of argument tuples; the
        # exception subject attributes any failure to the daemon's name.
        with set_exception_subject('daemon', DaemonDescription(
                daemon_type=daemon_type,
                daemon_id=daemon_id
        ).name()):
            return self._daemon_action(daemon_type, daemon_id, host, action)
1611
    def _daemon_action(self, daemon_type, daemon_id, host, action, image=None):
        """Perform ``action`` (start/stop/restart/redeploy/reconfig) on a daemon.

        :param image: optional new container image; only valid together with
            ``action='redeploy'`` and only for core ceph daemon types.
        """
        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
            host=host,
            daemon_id=daemon_id,
            daemon_type=daemon_type,
        )

        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_TYPES)}')

            # persist the new image for this daemon before redeploying
            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })

        if action == 'redeploy':
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_spec)
        elif action == 'reconfig':
            return self._create_daemon(daemon_spec, reconfig=True)

        # remaining actions map onto systemd unit operations run via cephadm
        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self._run_cephadm(
                    host, name, 'unit',
                    ['--name', name, a])
            except Exception:
                # best effort: log and continue with the next unit action
                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg
1658
    @trivial_completion
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> str:
        """Run ``action`` on one named daemon (optionally with a new image)."""
        d = self.cache.get_daemon(daemon_name)

        self.log.info(f'{action} daemon {daemon_name}')
        return self._daemon_action(d.daemon_type, d.daemon_id,
                                   d.hostname, action, image=image)
1666
1667 @trivial_completion
1668 def remove_daemons(self, names):
1669 # type: (List[str]) -> List[str]
1670 args = []
1671 for host, dm in self.cache.daemons.items():
1672 for name in names:
1673 if name in dm:
1674 args.append((name, host))
1675 if not args:
1676 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1677 self.log.info('Remove daemons %s' % [a[0] for a in args])
1678 return self._remove_daemons(args)
1679
1680 @trivial_completion
1681 def remove_service(self, service_name) -> str:
1682 self.log.info('Remove service %s' % service_name)
1683 self._trigger_preview_refresh(service_name=service_name)
1684 found = self.spec_store.rm(service_name)
1685 if found:
1686 self._kick_serve_loop()
1687 return 'Removed service %s' % service_name
1688 else:
1689 # must be idempotent: still a success.
1690 return f'Failed to remove service. <{service_name}> was not found.'
1691
1692 @trivial_completion
1693 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
1694 """
1695 Return the storage inventory of hosts matching the given filter.
1696
1697 :param host_filter: host filter
1698
1699 TODO:
1700 - add filtering by label
1701 """
1702 if refresh:
1703 if host_filter and host_filter.hosts:
1704 for h in host_filter.hosts:
1705 self.cache.invalidate_host_devices(h)
1706 else:
1707 for h in self.cache.get_hosts():
1708 self.cache.invalidate_host_devices(h)
1709
1710 self.event.set()
1711 self.log.info('Kicked serve() loop to refresh devices')
1712
1713 result = []
1714 for host, dls in self.cache.devices.items():
1715 if host_filter and host_filter.hosts and host not in host_filter.hosts:
1716 continue
1717 result.append(orchestrator.InventoryHost(host,
1718 inventory.Devices(dls)))
1719 return result
1720
1721 @trivial_completion
1722 def zap_device(self, host, path) -> str:
1723 self.log.info('Zap device %s:%s' % (host, path))
1724 out, err, code = self._run_cephadm(
1725 host, 'osd', 'ceph-volume',
1726 ['--', 'lvm', 'zap', '--destroy', path],
1727 error_ok=True)
1728 self.cache.invalidate_host_devices(host)
1729 if code:
1730 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
1731 return '\n'.join(out + err)
1732
    @trivial_completion
    def blink_device_light(self, ident_fault, on, locs) -> List[str]:
        """Turn the ident/fault LED on or off for each (host, dev, path) in locs."""
        @forall_hosts
        def blink(host, dev, path):
            # lsmcli performs the LED control; run it inside the osd container
            cmd = [
                'lsmcli',
                'local-disk-%s-led-%s' % (
                    ident_fault,
                    'on' if on else 'off'),
                '--path', path or dev,
            ]
            out, err, code = self._run_cephadm(
                host, 'osd', 'shell', ['--'] + cmd,
                error_ok=True)
            if code:
                raise OrchestratorError(
                    'Unable to affect %s light for %s:%s. Command: %s' % (
                        ident_fault, host, dev, ' '.join(cmd)))
            self.log.info('Set %s light for %s:%s %s' % (
                ident_fault, host, dev, 'on' if on else 'off'))
            return "Set %s light for %s:%s %s" % (
                ident_fault, host, dev, 'on' if on else 'off')

        return blink(locs)
1757
1758 def get_osd_uuid_map(self, only_up=False):
1759 # type: (bool) -> Dict[str, str]
1760 osd_map = self.get('osd_map')
1761 r = {}
1762 for o in osd_map['osds']:
1763 # only include OSDs that have ever started in this map. this way
1764 # an interrupted osd create can be repeated and succeed the second
1765 # time around.
1766 osd_id = o.get('osd')
1767 if osd_id is None:
1768 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1769 if not only_up or (o['up_from'] > 0):
1770 r[str(osd_id)] = o.get('uuid', '')
1771 return r
1772
    def _trigger_preview_refresh(self,
                                 specs: Optional[List[DriveGroupSpec]] = None,
                                 service_name: Optional[str] = None,
                                 ) -> None:
        """Queue OSDSpec preview refreshes for hosts affected by changed specs."""
        # Only trigger a refresh when a spec has changed
        trigger_specs = []
        if specs:
            for spec in specs:
                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
                # the to-be-preview spec != the actual spec, this means we need to
                # trigger a refresh, if the spec has been removed (==None) we need to
                # refresh as well.
                if not preview_spec or spec != preview_spec:
                    trigger_specs.append(spec)
        if service_name:
            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
        # any() also filters out a [None] from a missing service_name lookup
        if not any(trigger_specs):
            return None

        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
        for host in refresh_hosts:
            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
            self.cache.osdspec_previews_refresh_queue.append(host)
1796
    @trivial_completion
    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
        """
        Deprecated. Please use `apply()` instead.

        Keeping this around to be compatible to mgr/dashboard
        """
        return [self._apply(spec) for spec in specs]
1805
    @trivial_completion
    def create_osds(self, drive_group: DriveGroupSpec) -> str:
        """Create OSDs from a drive group spec (delegates to the OSD service)."""
        return self.osd_service.create_from_spec(drive_group)
1809
1810 def _preview_osdspecs(self,
1811 osdspecs: Optional[List[DriveGroupSpec]] = None
1812 ):
1813 if not osdspecs:
1814 return {'n/a': [{'error': True,
1815 'message': 'No OSDSpec or matching hosts found.'}]}
1816 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
1817 if not matching_hosts:
1818 return {'n/a': [{'error': True,
1819 'message': 'No OSDSpec or matching hosts found.'}]}
1820 # Is any host still loading previews
1821 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
1822 if pending_hosts:
1823 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1824 return {'n/a': [{'error': True,
1825 'message': 'Preview data is being generated.. '
1826 'Please re-run this command in a bit.'}]}
1827 # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
1828 previews_for_specs = {}
1829 for host, raw_reports in self.cache.osdspec_previews.items():
1830 if host not in matching_hosts:
1831 continue
1832 osd_reports = []
1833 for osd_report in raw_reports:
1834 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
1835 osd_reports.append(osd_report)
1836 previews_for_specs.update({host: osd_reports})
1837 return previews_for_specs
1838
1839 def _calc_daemon_deps(self, daemon_type, daemon_id):
1840 need = {
1841 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1842 'grafana': ['prometheus'],
1843 'alertmanager': ['mgr', 'alertmanager'],
1844 }
1845 deps = []
1846 for dep_type in need.get(daemon_type, []):
1847 for dd in self.cache.get_daemons_by_service(dep_type):
1848 deps.append(dd.name())
1849 return sorted(deps)
1850
1851 def _get_config_and_keyring(self, daemon_type, daemon_id, host,
1852 keyring=None,
1853 extra_ceph_config=None):
1854 # type: (str, str, str, Optional[str], Optional[str]) -> Dict[str, Any]
1855 # keyring
1856 if not keyring:
1857 ename = utils.name_to_auth_entity(daemon_type, daemon_id, host=host)
1858 ret, keyring, err = self.check_mon_command({
1859 'prefix': 'auth get',
1860 'entity': ename,
1861 })
1862
1863 # generate config
1864 ret, config, err = self.check_mon_command({
1865 "prefix": "config generate-minimal-conf",
1866 })
1867 if extra_ceph_config:
1868 config += extra_ceph_config
1869
1870 return {
1871 'config': config,
1872 'keyring': keyring,
1873 }
1874
    def _create_daemon(self,
                       daemon_spec: CephadmDaemonSpec,
                       reconfig=False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       redeploy=False,
                       ) -> str:
        """Deploy (or reconfigure) a daemon on its host via `cephadm deploy`.

        :param reconfig: only rewrite config and restart; do not recreate
            the container/unit from scratch
        :param osd_uuid_map: optional pre-fetched osd id -> uuid map (OSDs only)
        :returns: a human-readable status message (also recorded as an event)
        """

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            start_time = datetime.datetime.utcnow()
            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)

            # the config-json payload is piped to cephadm via stdin
            daemon_spec.extra_args.extend(['--config-json', '-'])

            # TCP port to open in the host firewall
            if daemon_spec.ports:
                daemon_spec.extra_args.extend(['--tcp-ports', ' '.join(map(str,daemon_spec.ports))])

            # osd deployments needs an --osd-uuid arg
            if daemon_spec.daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

            if reconfig:
                daemon_spec.extra_args.append('--reconfig')
            if self.allow_ptrace:
                daemon_spec.extra_args.append('--allow-ptrace')

            # make sure the host can pull from a custom registry first
            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
                self._registry_login(daemon_spec.host, self.registry_url, self.registry_username, self.registry_password)

            self.log.info('%s daemon %s on %s' % (
                'Reconfiguring' if reconfig else 'Deploying',
                daemon_spec.name(), daemon_spec.host))

            out, err, code = self._run_cephadm(
                daemon_spec.host, daemon_spec.name(), 'deploy',
                [
                    '--name', daemon_spec.name(),
                ] + daemon_spec.extra_args,
                stdin=json.dumps(cephadm_config))
            if not code and daemon_spec.host in self.cache.daemons:
                # prime cached service state with what we (should have)
                # just created
                sd = orchestrator.DaemonDescription()
                sd.daemon_type = daemon_spec.daemon_type
                sd.daemon_id = daemon_spec.daemon_id
                sd.hostname = daemon_spec.host
                sd.status = 1
                sd.status_desc = 'starting'
                self.cache.add_daemon(daemon_spec.host, sd)
                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
                    self.requires_post_actions.add(daemon_spec.daemon_type)
            self.cache.invalidate_host_daemons(daemon_spec.host)
            self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
            self.cache.save_host(daemon_spec.host)
            msg = "{} {} on host '{}'".format(
                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
            if not code:
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
            else:
                what = 'reconfigure' if reconfig else 'deploy'
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
            return msg
1948
    @forall_hosts
    def _remove_daemons(self, name, host) -> str:
        # parallel (per-host) fan-out wrapper around _remove_daemon
        return self._remove_daemon(name, host)
1952
    def _remove_daemon(self, name, host) -> str:
        """
        Remove a daemon.

        :param name: daemon name in '<type>.<id>' form
        :param host: hostname the daemon runs on
        :return: human-readable status message
        """
        (daemon_type, daemon_id) = name.split('.', 1)

        # scope any raised exception to this daemon's service for health reporting
        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_type,
                daemon_id=daemon_id,
                hostname=host,
        ).service_id(), overwrite=True):

            # give the service type a chance to do its pre-removal work first
            self.cephadm_services[daemon_type].pre_remove(daemon_id)

            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.cache.rm_daemon(host, name)
            # refresh regardless of outcome so stale state doesn't linger
            self.cache.invalidate_host_daemons(host)
            return "Removed {} from host '{}'".format(name, host)
1977
1978 def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
1979 fn = {
1980 'mds': self.mds_service.config,
1981 'rgw': self.rgw_service.config,
1982 'nfs': self.nfs_service.config,
1983 'iscsi': self.iscsi_service.config,
1984 }.get(service_type)
1985 return cast(Callable[[ServiceSpec], None], fn)
1986
    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.

        :param spec: the service spec to reconcile against running daemons
        :return: True if any daemon was created or removed, False otherwise
        """
        daemon_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        config_func = self._config_fn(daemon_type)

        if daemon_type == 'osd':
            # osds go through the drive-group machinery, not host placement
            self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            return False

        daemons = self.cache.get_daemons_by_service(service_name)

        public_network = None
        if daemon_type == 'mon':
            # mons may only be placed on hosts with an IP in the public_network
            ret, out, err = self.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            if '/' in out:
                public_network = out.strip()
                self.log.debug('mon public_network is %s' % public_network)

        def matches_network(host):
            # type: (str) -> bool
            if not public_network:
                return False
            # make sure we have 1 or more IPs for that network on that
            # host
            return len(self.cache.networks[host].get(public_network, [])) > 0

        ha = HostAssignment(
            spec=spec,
            get_hosts_func=self._get_hosts,
            get_daemons_func=self.cache.get_daemons_by_service,
            filter_new_host=matches_network if daemon_type == 'mon' else None,
        )

        hosts: List[HostPlacementSpec] = ha.place()
        self.log.debug('Usable hosts: %s' % hosts)

        r = False

        # sanity check
        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
            return False

        # add any?
        did_config = False

        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)

        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
        self.log.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts)

        for host, network, name in add_daemon_hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            # run the service-level config step once, before the first daemon
            if not did_config and config_func:
                if daemon_type == 'rgw':
                    # rgw config additionally needs the daemon id
                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))

            self.cephadm_services[daemon_type].create(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)
            r = True

        # remove any?
        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            return not r.retval

        # shrink the removal set until the remaining daemons are safe to stop
        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
            # let's find a subset that is ok-to-stop
            remove_daemon_hosts.pop()
        for d in remove_daemon_hosts:
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instances data.
            self._remove_daemon(d.name(), d.hostname)
            r = True

        return r
2100
2101 def _apply_all_services(self):
2102 r = False
2103 specs = [] # type: List[ServiceSpec]
2104 for sn, spec in self.spec_store.specs.items():
2105 specs.append(spec)
2106 for spec in specs:
2107 try:
2108 if self._apply_service(spec):
2109 r = True
2110 except Exception as e:
2111 self.log.exception('Failed to apply %s spec %s: %s' % (
2112 spec.service_name(), spec, e))
2113 self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
2114
2115 return r
2116
2117 def _check_pool_exists(self, pool, service_name):
2118 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
2119 if not self.rados.pool_exists(pool):
2120 raise OrchestratorError(f'Cannot find pool "{pool}" for '
2121 f'service {service_name}')
2122
2123 def _check_daemons(self):
2124
2125 daemons = self.cache.get_daemons()
2126 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
2127 for dd in daemons:
2128 # orphan?
2129 spec = self.spec_store.specs.get(dd.service_name(), None)
2130 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
2131 # (mon and mgr specs should always exist; osds aren't matched
2132 # to a service spec)
2133 self.log.info('Removing orphan daemon %s...' % dd.name())
2134 self._remove_daemon(dd.name(), dd.hostname)
2135
2136 # ignore unmanaged services
2137 if spec and spec.unmanaged:
2138 continue
2139
2140 # These daemon types require additional configs after creation
2141 if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
2142 daemons_post[dd.daemon_type].append(dd)
2143
2144 if self.cephadm_services[dd.daemon_type].get_active_daemon(
2145 self.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
2146 dd.is_active = True
2147 else:
2148 dd.is_active = False
2149
2150 deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
2151 last_deps, last_config = self.cache.get_daemon_last_config_deps(
2152 dd.hostname, dd.name())
2153 if last_deps is None:
2154 last_deps = []
2155 reconfig = False
2156 if not last_config:
2157 self.log.info('Reconfiguring %s (unknown last config time)...'% (
2158 dd.name()))
2159 reconfig = True
2160 elif last_deps != deps:
2161 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
2162 deps))
2163 self.log.info('Reconfiguring %s (dependencies changed)...' % (
2164 dd.name()))
2165 reconfig = True
2166 elif self.last_monmap and \
2167 self.last_monmap > last_config and \
2168 dd.daemon_type in CEPH_TYPES:
2169 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
2170 reconfig = True
2171 if reconfig:
2172 try:
2173 self._create_daemon(
2174 CephadmDaemonSpec(
2175 host=dd.hostname,
2176 daemon_id=dd.daemon_id,
2177 daemon_type=dd.daemon_type),
2178 reconfig=True)
2179 except OrchestratorError as e:
2180 self.events.from_orch_error(e)
2181 if dd.daemon_type in daemons_post:
2182 del daemons_post[dd.daemon_type]
2183 # continue...
2184 except Exception as e:
2185 self.events.for_daemon_from_exception(dd.name(), e)
2186 if dd.daemon_type in daemons_post:
2187 del daemons_post[dd.daemon_type]
2188 # continue...
2189
2190 # do daemon post actions
2191 for daemon_type, daemon_descs in daemons_post.items():
2192 if daemon_type in self.requires_post_actions:
2193 self.requires_post_actions.remove(daemon_type)
2194 self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
2195
2196 def _add_daemon(self, daemon_type, spec,
2197 create_func: Callable[..., T], config_func=None) -> List[T]:
2198 """
2199 Add (and place) a daemon. Require explicit host placement. Do not
2200 schedule, and do not apply the related scheduling limitations.
2201 """
2202 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2203 if not spec.placement.hosts:
2204 raise OrchestratorError('must specify host(s) to deploy on')
2205 count = spec.placement.count or len(spec.placement.hosts)
2206 daemons = self.cache.get_daemons_by_service(spec.service_name())
2207 return self._create_daemons(daemon_type, spec, daemons,
2208 spec.placement.hosts, count,
2209 create_func, config_func)
2210
    def _create_daemons(self, daemon_type, spec, daemons,
                        hosts, count,
                        create_func: Callable[..., T], config_func=None) -> List[T]:
        """
        Create *count* daemons of *daemon_type*, one per entry in *hosts*.

        :param daemons: existing daemons of the service; appended to as names
            are allocated so subsequent names stay unique
        :param create_func: per-daemon creation callback, fanned out per host
        :param config_func: optional one-time service-level config callback
        :raises OrchestratorError: if fewer hosts than the requested count
        """
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        did_config = False

        args = []  # type: List[CephadmDaemonSpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            # run the service-level config step once, before the first daemon
            if not did_config and config_func:
                if daemon_type == 'rgw':
                    # rgw config additionally needs the daemon id
                    config_func(spec, daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        # fan the actual creation out in parallel, per host
        @forall_hosts
        def create_func_map(*args):
            return create_func(*args)

        return create_func_map(args)
2251
    @trivial_completion
    def apply_mon(self, spec) -> str:
        """Schedule an update of the mon service per *spec*."""
        return self._apply(spec)
2255
    @trivial_completion
    def add_mon(self, spec: ServiceSpec) -> List[str]:
        """Create mon daemon(s) on the explicitly given hosts (no scheduling)."""
        return self._add_daemon('mon', spec, self.mon_service.create)
2260
    @trivial_completion
    def add_mgr(self, spec: ServiceSpec) -> List[str]:
        """Create mgr daemon(s) on the explicitly given hosts (no scheduling)."""
        return self._add_daemon('mgr', spec, self.mgr_service.create)
2265
    def _apply(self, spec: GenericSpec) -> str:
        """
        Apply a single spec: host specs add a host, everything else is stored
        as a service spec (osd specs also refresh the osd preview first).
        """
        # refuse to apply while a spec-store migration is pending
        self.migration.verify_no_migration()

        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger preview refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))
2278
2279 def _plan(self, spec: ServiceSpec):
2280 if spec.service_type == 'osd':
2281 return {'service_name': spec.service_name(),
2282 'service_type': spec.service_type,
2283 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
2284
2285 ha = HostAssignment(
2286 spec=spec,
2287 get_hosts_func=self._get_hosts,
2288 get_daemons_func=self.cache.get_daemons_by_service,
2289 )
2290 ha.validate()
2291 hosts = ha.place()
2292
2293 add_daemon_hosts = ha.add_daemon_hosts(hosts)
2294 remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
2295
2296 return {
2297 'service_name': spec.service_name(),
2298 'service_type': spec.service_type,
2299 'add': [hs.hostname for hs in add_daemon_hosts],
2300 'remove': [d.hostname for d in remove_daemon_hosts]
2301 }
2302
2303 @trivial_completion
2304 def plan(self, specs: List[GenericSpec]) -> List:
2305 results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
2306 'to the current inventory setup. If any on these conditions changes, the \n'
2307 'preview will be invalid. Please make sure to have a minimal \n'
2308 'timeframe between planning and applying the specs.'}]
2309 if any([spec.service_type == 'host' for spec in specs]):
2310 return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}]
2311 for spec in specs:
2312 results.append(self._plan(cast(ServiceSpec, spec)))
2313 return results
2314
    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        """
        Validate *spec*, fill in a default placement if it has none, persist it,
        and kick the serve loop to act on it.

        NOTE(review): an unknown service_type with an empty placement raises
        KeyError from the defaults lookup below -- presumably callers validate
        the type beforehand; confirm.
        """
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        # validate placement against known hosts before persisting
        HostAssignment(
            spec=spec,
            get_hosts_func=self._get_hosts,
            get_daemons_func=self.cache.get_daemons_by_service,
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()
2350
2351 @trivial_completion
2352 def apply(self, specs: List[GenericSpec]) -> List[str]:
2353 results = []
2354 for spec in specs:
2355 results.append(self._apply(spec))
2356 return results
2357
    @trivial_completion
    def apply_mgr(self, spec) -> str:
        """Schedule an update of the mgr service per *spec*."""
        return self._apply(spec)
2361
    @trivial_completion
    def add_mds(self, spec: ServiceSpec) -> List[str]:
        """Create mds daemon(s) on the explicitly given hosts (runs mds config first)."""
        return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
2365
    @trivial_completion
    def apply_mds(self, spec: ServiceSpec) -> str:
        """Schedule an update of the mds service per *spec*."""
        return self._apply(spec)
2369
    @trivial_completion
    def add_rgw(self, spec) -> List[str]:
        """Create rgw daemon(s) on the explicitly given hosts (runs rgw config first)."""
        return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
2373
    @trivial_completion
    def apply_rgw(self, spec) -> str:
        """Schedule an update of the rgw service per *spec*."""
        return self._apply(spec)
2377
    @trivial_completion
    def add_iscsi(self, spec: ServiceSpec) -> List[str]:
        """Create iscsi daemon(s) on the explicitly given hosts (runs iscsi config first)."""
        return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
2382
    @trivial_completion
    def apply_iscsi(self, spec) -> str:
        """Schedule an update of the iscsi service per *spec*."""
        return self._apply(spec)
2386
    @trivial_completion
    def add_rbd_mirror(self, spec) -> List[str]:
        """Create rbd-mirror daemon(s) on the explicitly given hosts."""
        return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
2390
    @trivial_completion
    def apply_rbd_mirror(self, spec) -> str:
        """Schedule an update of the rbd-mirror service per *spec*."""
        return self._apply(spec)
2394
    @trivial_completion
    def add_nfs(self, spec) -> List[str]:
        """Create nfs daemon(s) on the explicitly given hosts (runs nfs config first)."""
        return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
2398
    @trivial_completion
    def apply_nfs(self, spec) -> str:
        """Schedule an update of the nfs service per *spec*."""
        return self._apply(spec)
2402
2403 def _get_dashboard_url(self):
2404 # type: () -> str
2405 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2406
    @trivial_completion
    def add_prometheus(self, spec) -> List[str]:
        """Create prometheus daemon(s) on the explicitly given hosts."""
        return self._add_daemon('prometheus', spec, self.prometheus_service.create)
2410
    @trivial_completion
    def apply_prometheus(self, spec) -> str:
        """Schedule an update of the prometheus service per *spec*."""
        return self._apply(spec)
2414
    @trivial_completion
    def add_node_exporter(self, spec: ServiceSpec) -> List[str]:
        """Create node-exporter daemon(s) on the explicitly given hosts."""
        return self._add_daemon('node-exporter', spec,
                                self.node_exporter_service.create)
2420
    @trivial_completion
    def apply_node_exporter(self, spec) -> str:
        """Schedule an update of the node-exporter service per *spec*."""
        return self._apply(spec)
2424
    @trivial_completion
    def add_crash(self, spec: ServiceSpec) -> List[str]:
        """Create crash daemon(s) on the explicitly given hosts."""
        return self._add_daemon('crash', spec,
                                self.crash_service.create)
2430
    @trivial_completion
    def apply_crash(self, spec) -> str:
        """Schedule an update of the crash service per *spec*."""
        return self._apply(spec)
2434
    @trivial_completion
    def add_grafana(self, spec: ServiceSpec) -> List[str]:
        """Create grafana daemon(s) on the explicitly given hosts."""
        return self._add_daemon('grafana', spec, self.grafana_service.create)
2439
    @trivial_completion
    def apply_grafana(self, spec: ServiceSpec) -> str:
        """Schedule an update of the grafana service per *spec*."""
        return self._apply(spec)
2443
    @trivial_completion
    def add_alertmanager(self, spec: ServiceSpec) -> List[str]:
        """Create alertmanager daemon(s) on the explicitly given hosts."""
        return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
2448
    @trivial_completion
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        """Schedule an update of the alertmanager service per *spec*."""
        return self._apply(spec)
2452
2453 def _get_container_image_id(self, image_name):
2454 # pick a random host...
2455 host = None
2456 for host_name in self.inventory.keys():
2457 host = host_name
2458 break
2459 if not host:
2460 raise OrchestratorError('no hosts defined')
2461 if self.cache.host_needs_registry_login(host) and self.registry_url:
2462 self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
2463 out, err, code = self._run_cephadm(
2464 host, '', 'pull', [],
2465 image=image_name,
2466 no_fsid=True,
2467 error_ok=True)
2468 if code:
2469 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2470 image_name, host, '\n'.join(out)))
2471 j = json.loads('\n'.join(out))
2472 image_id = j.get('image_id')
2473 ceph_version = j.get('ceph_version')
2474 self.log.debug('image %s -> id %s version %s' %
2475 (image_name, image_id, ceph_version))
2476 return image_id, ceph_version
2477
2478 @trivial_completion
2479 def upgrade_check(self, image, version) -> str:
2480 if version:
2481 target_name = self.container_image_base + ':v' + version
2482 elif image:
2483 target_name = image
2484 else:
2485 raise OrchestratorError('must specify either image or version')
2486
2487 target_id, target_version = self._get_container_image_id(target_name)
2488 self.log.debug('Target image %s id %s version %s' % (
2489 target_name, target_id, target_version))
2490 r = {
2491 'target_name': target_name,
2492 'target_id': target_id,
2493 'target_version': target_version,
2494 'needs_update': dict(),
2495 'up_to_date': list(),
2496 }
2497 for host, dm in self.cache.daemons.items():
2498 for name, dd in dm.items():
2499 if target_id == dd.container_image_id:
2500 r['up_to_date'].append(dd.name())
2501 else:
2502 r['needs_update'][dd.name()] = {
2503 'current_name': dd.container_image_name,
2504 'current_id': dd.container_image_id,
2505 'current_version': dd.version,
2506 }
2507 return json.dumps(r, indent=4, sort_keys=True)
2508
    @trivial_completion
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        """Return the current upgrade state (delegates to CephadmUpgrade)."""
        return self.upgrade.upgrade_status()
2512
    @trivial_completion
    def upgrade_start(self, image, version) -> str:
        """Begin an upgrade to the given image or version (delegates to CephadmUpgrade)."""
        return self.upgrade.upgrade_start(image, version)
2516
    @trivial_completion
    def upgrade_pause(self) -> str:
        """Pause a running upgrade (delegates to CephadmUpgrade)."""
        return self.upgrade.upgrade_pause()
2520
    @trivial_completion
    def upgrade_resume(self) -> str:
        """Resume a paused upgrade (delegates to CephadmUpgrade)."""
        return self.upgrade.upgrade_resume()
2524
    @trivial_completion
    def upgrade_stop(self) -> str:
        """Abort an in-progress upgrade (delegates to CephadmUpgrade)."""
        return self.upgrade.upgrade_stop()
2528
2529 @trivial_completion
2530 def remove_osds(self, osd_ids: List[str],
2531 replace: bool = False,
2532 force: bool = False) -> str:
2533 """
2534 Takes a list of OSDs and schedules them for removal.
2535 The function that takes care of the actual removal is
2536 process_removal_queue().
2537 """
2538
2539 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2540 to_remove_daemons = list()
2541 for daemon in daemons:
2542 if daemon.daemon_id in osd_ids:
2543 to_remove_daemons.append(daemon)
2544 if not to_remove_daemons:
2545 return f"Unable to find OSDs: {osd_ids}"
2546
2547 for daemon in to_remove_daemons:
2548 try:
2549 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2550 replace=replace,
2551 force=force,
2552 hostname=daemon.hostname,
2553 fullname=daemon.name(),
2554 process_started_at=datetime.datetime.utcnow(),
2555 remove_util=self.rm_util))
2556 except NotFoundError:
2557 return f"Unable to find OSDs: {osd_ids}"
2558
2559 # trigger the serve loop to initiate the removal
2560 self._kick_serve_loop()
2561 return "Scheduled OSD(s) for removal"
2562
    @trivial_completion
    def stop_remove_osds(self, osd_ids: List[str]):
        """
        Stops a `removal` process for a List of OSDs.
        This will revert their weight and remove it from the osds_to_remove queue
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.rm_util))
            except (NotFoundError, KeyError):
                # not currently queued for removal -- report and stop
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"
2579
    @trivial_completion
    def remove_osds_status(self):
        """
        The CLI call to retrieve an osd removal report
        """
        return self.to_remove_osds.all_osds()