# ceph/src/pybind/mgr/cephadm/module.py (as imported for ceph 15.2.8)
import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple

import datetime
import six
import os
import random
import tempfile
import multiprocessing.pool
import subprocess

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
    CustomContainerSpec
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec

from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, \
    str_to_datetime, datetime_to_str

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self):
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

try:
    from typing import List
except ImportError:
    pass

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)


class CephadmCompletion(orchestrator.Completion[T]):
    def evaluate(self):
        self.finalize(None)


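# trivial_completion wraps the Orchestrator handlers below (add_host,
# remove_host, apply_drivegroups, ...) so that they return a CephadmCompletion;
# process() later calls evaluate() on each completion, which runs the wrapped
# function.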
def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """
    Decorator to make a method return a CephadmCompletion object that
    executes the wrapped method once the completion is evaluated.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))

    return wrapper


class ContainerInspectInfo(NamedTuple):
    image_id: str
    ceph_version: Optional[str]
    repo_digest: Optional[str]


@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'container_image_prometheus',
            'default': 'docker.io/prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'docker.io/ceph/ceph-grafana:6.6.2',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'docker.io/prom/alertmanager:v0.20.0',
            'desc': 'Alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'docker.io/prom/node-exporter:v0.18.1',
            'desc': 'Node exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace. Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'container_init',
            'type': 'bool',
            'default': False,
            'desc': 'Run podman/docker with `--init`',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'type': 'int',
            'default': None,
            'desc': 'internal - do not modify',
            # used to track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'type': 'bool',
            'default': True,
            'desc': 'manage configs like API endpoints in Dashboard.'
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'type': 'bool',
            'default': False,
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry url'
        },
        {
            'name': 'registry_username',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry username'
        },
        {
            'name': 'registry_password',
            'type': 'str',
            'default': None,
            'desc': 'Custom registry password'
        },
        {
            'name': 'use_repo_digest',
            'type': 'bool',
            'default': False,
            'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image',
        }
    ]
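    # config_notify() copies each MODULE_OPTIONS entry onto an attribute of the
    # same name; the TYPE_CHECKING block in __init__ mirrors them so mypy knows
    # their types.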

    def __init__(self, *args, **kwargs):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDQueue()
        self.rm_util.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        # services:
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }
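        # service_type -> service handler; resolved via _get_cephadm_service()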

        self.template = TemplateMgr(self)

        self.requires_post_actions = set()

    def shutdown(self):
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self):
        self.log.debug('_kick_serve_loop')
        self.event.set()

    # function responsible for logging single host into custom registry
    def _registry_login(self, host, url, username, password):
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
                    " \"password\": \"" + password + "\"}")
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image):
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type, notify_id):
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = datetime.datetime.strptime(
                monmap['modified'], CEPH_DATEFMT)
            if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self):
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self.rm_util.process_removal_queue()

    def pause(self):
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self):
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

    def _reconfig_ssh(self):
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config):
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not l:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in l:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname):
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host):
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self):
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host):
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self):
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''

    def process(self, completions):
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf=None):
        """
        Set an ssh_config file provided from stdin
        """
        if inbuf == self.ssh_config:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self):
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self):
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self):
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self):
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self):
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self):
        return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user):
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        host = self.cache.get_hosts()[0]
        r = CephadmServe(self)._check_host(host)
        if r is not None:
            # connection failed; reset user to the previous value
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += ' sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url=None, username=None, password=None, inbuf=None):
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host, addr=None):
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host, addr=None):
        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemon's ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime.datetime.utcnow())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            self.log.exception('unable to load extra_ceph_conf')
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

    def _get_connection(self, host: str):
        """
        Setup a connection for running commands on remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn, executable):
        """
        Remote validator that accepts a connection object to ensure that a certain
        executable is available returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise

    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        if daemon_type in CEPH_TYPES or \
                daemon_type == 'nfs' or \
                daemon_type == 'iscsi':
            # get container image
            ret, image, err = self.check_mon_command({
                'prefix': 'config get',
                'who': utils.name_to_config_section(daemon_name),
                'key': 'container_image',
            })
            image = image.strip()  # type: ignore
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved, the necessary information
            # is only available when a container is deployed (given
            # via spec).
            image = None
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            if self.container_init:
                final_args += ['--container-init']

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
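                # In 'root' mode the locally cached cephadm source is shipped to
                # the remote python interpreter over stdin, with the CLI args
                # (and any stdin payload) prepended as injected_argv /
                # injected_stdin.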
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

    def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
        """
        Returns all hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Thus we could end up with hosts
        where daemons might be running, but which we have not yet detected.
        """
        return [
            h for h in self.inventory.all_specs()
            if self.cache.host_had_daemon_refresh(h.hostname)
        ]

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)

    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)

    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)

    @trivial_completion
    def update_host_addr(self, host, addr) -> str:
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @trivial_completion
    def add_host_label(self, host, label) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        return 'Added label %s to host %s' % (label, host)

    @trivial_completion
    def remove_host_label(self, host, label) -> str:
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        return 'Removed label %s from host %s' % (label, host)

    @trivial_completion
    def host_ok_to_stop(self, hostname: str):
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        daemons = self.cache.get_daemons()
        daemon_map = defaultdict(lambda: [])
        for dd in daemons:
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            if r.retval:
                self.log.error(f'It is NOT safe to stop host {hostname}')
                raise orchestrator.OrchestratorError(
                    r.stderr,
                    errno=r.retval)

        msg = f'It is presumed safe to stop host {hostname}'
        self.log.info(msg)
        return msg

    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            config += '\n\n' + extra.strip() + '\n'
        return config

    def _invalidate_daemons_and_kick_serve(self, filter_host=None):
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()

    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        # <service_map>
        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        An actual/expected representation cannot be determined
                        here, so we're setting running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0
                if dd.status == 1:
                    sm[n].running += 1
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())

    @trivial_completion
    def list_daemons(self,
                     service_name: Optional[str] = None,
                     daemon_type: Optional[str] = None,
                     daemon_id: Optional[str] = None,
                     host: Optional[str] = None,
                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve(host)
            self.log.info('Kicked serve() loop to refresh all daemons')

        result = []
        for h, dm in self.cache.get_daemons_with_volatile_status():
            if host and h != host:
                continue
            for name, dd in dm.items():
                if daemon_type is not None and daemon_type != dd.daemon_type:
                    continue
                if daemon_id is not None and daemon_id != dd.daemon_id:
                    continue
                if service_name is not None and service_name != dd.service_name():
                    continue
                result.append(dd)
        return result

    @trivial_completion
    def service_action(self, action, service_name) -> List[str]:
        args = []
        for host, dm in self.cache.daemons.items():
            for name, d in dm.items():
                if d.matches_service(service_name):
                    args.append((d.daemon_type, d.daemon_id,
                                 d.hostname, action))
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        return self._daemon_actions(args)

    @forall_hosts
    def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
        with set_exception_subject('daemon', DaemonDescription(
                daemon_type=daemon_type,
                daemon_id=daemon_id
        ).name()):
            return self._daemon_action(daemon_type, daemon_id, host, action)

    def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str:
        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
            host=host,
            daemon_id=daemon_id,
            daemon_type=daemon_type,
        )

        self._daemon_action_set_image(action, image, daemon_type, daemon_id)

        if action == 'redeploy':
            if self.daemon_is_self(daemon_type, daemon_id):
                self.mgr_service.fail_over()
                return ''  # unreachable
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_spec)
        elif action == 'reconfig':
            return self._create_daemon(daemon_spec, reconfig=True)

        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self._run_cephadm(
                    host, name, 'unit',
                    ['--name', name, a])
            except Exception:
                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg

f91f0fd5
TL
1457 def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str):
1458 if image is not None:
1459 if action != 'redeploy':
1460 raise OrchestratorError(
1461 f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
1462 if daemon_type not in CEPH_TYPES:
1463 raise OrchestratorError(
1464 f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
1465 f'types are: {", ".join(CEPH_TYPES)}')
1466
1467 self.check_mon_command({
1468 'prefix': 'config set',
1469 'name': 'container_image',
1470 'value': image,
1471 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
1472 })
1473
e306af50 1474 @trivial_completion
f91f0fd5 1475 def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
f6b5b4d7
TL
1476 d = self.cache.get_daemon(daemon_name)
1477
f91f0fd5
TL
1478 if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
1479 and not self.mgr_service.mgr_map_has_standby():
1480 raise OrchestratorError(
1481 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1482
1483 self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)
1484
1485 self.log.info(f'Schedule {action} daemon {daemon_name}')
1486 return self._schedule_daemon_action(daemon_name, action)
1487
1488 def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool:
1489 return daemon_type == 'mgr' and daemon_id == self.get_mgr_id()
1490
1491 def _schedule_daemon_action(self, daemon_name: str, action: str):
1492 dd = self.cache.get_daemon(daemon_name)
1493 if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \
1494 and not self.mgr_service.mgr_map_has_standby():
1495 raise OrchestratorError(
1496 f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')
1497 self.cache.schedule_daemon_action(dd.hostname, dd.name(), action)
1498 msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname)
1499 self._kick_serve_loop()
1500 return msg
9f95a23c 1501
e306af50 1502 @trivial_completion
9f95a23c 1503 def remove_daemons(self, names):
e306af50 1504 # type: (List[str]) -> List[str]
9f95a23c
TL
1505 args = []
1506 for host, dm in self.cache.daemons.items():
1507 for name in names:
1508 if name in dm:
1509 args.append((name, host))
1510 if not args:
1511 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1512 self.log.info('Remove daemons %s' % [a[0] for a in args])
1513 return self._remove_daemons(args)
1514
1515 @trivial_completion
f6b5b4d7 1516 def remove_service(self, service_name) -> str:
9f95a23c 1517 self.log.info('Remove service %s' % service_name)
e306af50 1518 self._trigger_preview_refresh(service_name=service_name)
1911f103
TL
1519 found = self.spec_store.rm(service_name)
1520 if found:
1521 self._kick_serve_loop()
f6b5b4d7 1522 return 'Removed service %s' % service_name
1911f103
TL
1523 else:
1524 # must be idempotent: still a success.
f6b5b4d7 1525 return f'Failed to remove service. <{service_name}> was not found.'
9f95a23c
TL
1526
1527 @trivial_completion
f6b5b4d7 1528 def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
9f95a23c
TL
1529 """
1530 Return the storage inventory of hosts matching the given filter.
1531
1532 :param host_filter: host filter
1533
1534 TODO:
1535 - add filtering by label
1536 """
1537 if refresh:
f6b5b4d7
TL
1538 if host_filter and host_filter.hosts:
1539 for h in host_filter.hosts:
1540 self.cache.invalidate_host_devices(h)
9f95a23c 1541 else:
f6b5b4d7
TL
1542 for h in self.cache.get_hosts():
1543 self.cache.invalidate_host_devices(h)
1544
1545 self.event.set()
1546 self.log.info('Kicked serve() loop to refresh devices')
9f95a23c
TL
1547
1548 result = []
1549 for host, dls in self.cache.devices.items():
f6b5b4d7 1550 if host_filter and host_filter.hosts and host not in host_filter.hosts:
9f95a23c
TL
1551 continue
1552 result.append(orchestrator.InventoryHost(host,
1553 inventory.Devices(dls)))
1554 return result
1555
1556 @trivial_completion
f6b5b4d7 1557 def zap_device(self, host, path) -> str:
9f95a23c
TL
1558 self.log.info('Zap device %s:%s' % (host, path))
1559 out, err, code = self._run_cephadm(
1560 host, 'osd', 'ceph-volume',
1561 ['--', 'lvm', 'zap', '--destroy', path],
1562 error_ok=True)
1563 self.cache.invalidate_host_devices(host)
1564 if code:
1565 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
1566 return '\n'.join(out + err)
1567
e306af50 1568 @trivial_completion
f91f0fd5
TL
1569 def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
1570 """
1571 Blink a device light. Calling something like::
1572
1573 lsmcli local-disk-ident-led-on --path $path
1574
1575 If you must, you can customize this via::
1576
1577 ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>'
1578 ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>'
1579
1580 See templates/blink_device_light_cmd.j2
1581 """
e306af50 1582 @forall_hosts
9f95a23c 1583 def blink(host, dev, path):
f91f0fd5
TL
1584 cmd_line = self.template.render('blink_device_light_cmd.j2',
1585 {
1586 'on': on,
1587 'ident_fault': ident_fault,
1588 'dev': dev,
1589 'path': path
1590 },
1591 host=host)
1592 cmd_args = shlex.split(cmd_line)
1593
9f95a23c 1594 out, err, code = self._run_cephadm(
f91f0fd5 1595 host, 'osd', 'shell', ['--'] + cmd_args,
9f95a23c
TL
1596 error_ok=True)
1597 if code:
f6b5b4d7 1598 raise OrchestratorError(
9f95a23c 1599 'Unable to affect %s light for %s:%s. Command: %s' % (
f91f0fd5 1600 ident_fault, host, dev, ' '.join(cmd_args)))
9f95a23c
TL
1601 self.log.info('Set %s light for %s:%s %s' % (
1602 ident_fault, host, dev, 'on' if on else 'off'))
1603 return "Set %s light for %s:%s %s" % (
1604 ident_fault, host, dev, 'on' if on else 'off')
1605
1606 return blink(locs)
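# For reference, an illustrative jinja2 template in the spirit of the shipped
# default (treat the exact wording as an assumption; templates/
# blink_device_light_cmd.j2 is authoritative). The variables on, ident_fault,
# dev and path are the ones rendered above:
#
#   lsmcli local-disk-{{ 'ident' if ident_fault == 'ident' else 'fault' }}-led-{{ 'on' if on else 'off' }} --path '{{ path or dev }}'
#
# A per-host override would be stored with:
#
#   ceph config-key set mgr/cephadm/host1/blink_device_light_cmd '<my jinja2 template>'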
1607
1608 def get_osd_uuid_map(self, only_up=False):
1911f103 1609 # type: (bool) -> Dict[str, str]
9f95a23c
TL
1610 osd_map = self.get('osd_map')
1611 r = {}
1612 for o in osd_map['osds']:
1613 # Only include OSDs that have ever started in this map; this way
1614 # an interrupted osd create can be repeated and succeed the second
1615 # time around.
1911f103
TL
1616 osd_id = o.get('osd')
1617 if osd_id is None:
1618 raise OrchestratorError("Could not retrieve osd_id from osd_map")
1619 if not only_up or (o['up_from'] > 0):
1620 r[str(osd_id)] = o.get('uuid', '')
9f95a23c
TL
1621 return r
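# Shape of the returned mapping (values are illustrative UUIDs):
#
#   {'0': 'c2a3b6e0-...', '1': '9f1bd7aa-...'}
#
# i.e. OSD id (as a string) -> that OSD's uuid from the osdmap, used later to
# pass --osd-fsid when deploying osd daemons.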
1622
e306af50
TL
1623 def _trigger_preview_refresh(self,
1624 specs: Optional[List[DriveGroupSpec]] = None,
f6b5b4d7
TL
1625 service_name: Optional[str] = None,
1626 ) -> None:
1627 # Only trigger a refresh when a spec has changed
1628 trigger_specs = []
1629 if specs:
1630 for spec in specs:
1631 preview_spec = self.spec_store.spec_preview.get(spec.service_name())
1632 # If the to-be-previewed spec differs from the stored preview spec we need
1633 # to trigger a refresh; if the spec has been removed (== None) we need to
1634 # refresh as well.
1635 if not preview_spec or spec != preview_spec:
1636 trigger_specs.append(spec)
1637 if service_name:
1638 trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
1639 if not any(trigger_specs):
1640 return None
1641
1642 refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
e306af50
TL
1643 for host in refresh_hosts:
1644 self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
1645 self.cache.osdspec_previews_refresh_queue.append(host)
1646
9f95a23c 1647 @trivial_completion
f6b5b4d7
TL
1648 def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
1649 """
1650 Deprecated. Please use `apply()` instead.
1651
1652 Keeping this around to stay compatible with mgr/dashboard
1653 """
9f95a23c
TL
1654 return [self._apply(spec) for spec in specs]
1655
1656 @trivial_completion
f6b5b4d7
TL
1657 def create_osds(self, drive_group: DriveGroupSpec) -> str:
1658 return self.osd_service.create_from_spec(drive_group)
9f95a23c 1659
f6b5b4d7
TL
1660 def _preview_osdspecs(self,
1661 osdspecs: Optional[List[DriveGroupSpec]] = None
1662 ):
1663 if not osdspecs:
1664 return {'n/a': [{'error': True,
1665 'message': 'No OSDSpec or matching hosts found.'}]}
1666 matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
e306af50
TL
1667 if not matching_hosts:
1668 return {'n/a': [{'error': True,
1669 'message': 'No OSDSpec or matching hosts found.'}]}
1670 # Is any host still loading previews?
1671 pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
1672 if pending_hosts:
1673 # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
1674 return {'n/a': [{'error': True,
1675 'message': 'Preview data is being generated. '
f6b5b4d7
TL
1676 'Please re-run this command in a bit.'}]}
1677 # drop all hosts that are not in matching_hosts and only select reports that match the requested osdspecs
1678 previews_for_specs = {}
1679 for host, raw_reports in self.cache.osdspec_previews.items():
1680 if host not in matching_hosts:
1681 continue
1682 osd_reports = []
1683 for osd_report in raw_reports:
1684 if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
1685 osd_reports.append(osd_report)
1686 previews_for_specs.update({host: osd_reports})
1687 return previews_for_specs
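# Rough shape of the returned preview data (the 'osdspec' key is taken from the
# filtering above; the per-report payload comes from the ceph-volume report and
# is only sketched here):
#
#   {'host1': [{'osdspec': 'my_drivegroup', ...}],
#    'host2': []}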
9f95a23c
TL
1688
1689 def _calc_daemon_deps(self, daemon_type, daemon_id):
1690 need = {
1691 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
1692 'grafana': ['prometheus'],
801d1391 1693 'alertmanager': ['mgr', 'alertmanager'],
9f95a23c
TL
1694 }
1695 deps = []
1696 for dep_type in need.get(daemon_type, []):
1697 for dd in self.cache.get_daemons_by_service(dep_type):
1698 deps.append(dd.name())
1699 return sorted(deps)
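# Example: for a grafana daemon the computed deps are the names of all
# prometheus daemons known to the cache, e.g. ['prometheus.host1'] (the
# daemon name is illustrative); a later change in that list is what marks the
# dependent daemon for reconfiguration.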
1700
e306af50 1701 def _create_daemon(self,
f6b5b4d7 1702 daemon_spec: CephadmDaemonSpec,
9f95a23c 1703 reconfig=False,
e306af50 1704 osd_uuid_map: Optional[Dict[str, Any]] = None,
e306af50
TL
1705 ) -> str:
1706
f6b5b4d7
TL
1707 with set_exception_subject('service', orchestrator.DaemonDescription(
1708 daemon_type=daemon_spec.daemon_type,
1709 daemon_id=daemon_spec.daemon_id,
1710 hostname=daemon_spec.host,
1711 ).service_id(), overwrite=True):
1712
f91f0fd5 1713 image = ''
f6b5b4d7 1714 start_time = datetime.datetime.utcnow()
f91f0fd5
TL
1715 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1716
1717 if daemon_spec.daemon_type == 'container':
1718 spec: Optional[CustomContainerSpec] = daemon_spec.spec
1719 if spec is None:
1720 # Exit here immediately because the required service
1721 # spec to create a daemon is not provided. This is only
1722 # provided when a service is applied via the 'orch apply'
1723 # command.
1724 msg = "Failed to {} daemon {} on {}: Required " \
1725 "service specification not provided".format(
1726 'reconfigure' if reconfig else 'deploy',
1727 daemon_spec.name(), daemon_spec.host)
1728 self.log.info(msg)
1729 return msg
1730 image = spec.image
1731 if spec.ports:
1732 ports.extend(spec.ports)
1733
1734 cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
1735 daemon_spec)
f6b5b4d7
TL
1736
1737 # TCP ports to open in the host firewall
f91f0fd5
TL
1738 if len(ports) > 0:
1739 daemon_spec.extra_args.extend([
1740 '--tcp-ports', ' '.join(map(str, ports))
1741 ])
9f95a23c
TL
1742
1743 # OSD deployments need an --osd-fsid arg
f6b5b4d7 1744 if daemon_spec.daemon_type == 'osd':
9f95a23c
TL
1745 if not osd_uuid_map:
1746 osd_uuid_map = self.get_osd_uuid_map()
f6b5b4d7 1747 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
9f95a23c 1748 if not osd_uuid:
f6b5b4d7
TL
1749 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1750 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
9f95a23c 1751
f6b5b4d7
TL
1752 if reconfig:
1753 daemon_spec.extra_args.append('--reconfig')
1754 if self.allow_ptrace:
1755 daemon_spec.extra_args.append('--allow-ptrace')
9f95a23c 1756
f6b5b4d7 1757 if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
f91f0fd5
TL
1758 self._registry_login(daemon_spec.host, self.registry_url,
1759 self.registry_username, self.registry_password)
1760
1761 daemon_spec.extra_args.extend(['--config-json', '-'])
9f95a23c 1762
f6b5b4d7
TL
1763 self.log.info('%s daemon %s on %s' % (
1764 'Reconfiguring' if reconfig else 'Deploying',
1765 daemon_spec.name(), daemon_spec.host))
1766
1767 out, err, code = self._run_cephadm(
1768 daemon_spec.host, daemon_spec.name(), 'deploy',
1769 [
1770 '--name', daemon_spec.name(),
1771 ] + daemon_spec.extra_args,
f91f0fd5
TL
1772 stdin=json.dumps(cephadm_config),
1773 image=image)
f6b5b4d7
TL
1774 if not code and daemon_spec.host in self.cache.daemons:
1775 # prime cached service state with what we (should have)
1776 # just created
1777 sd = orchestrator.DaemonDescription()
1778 sd.daemon_type = daemon_spec.daemon_type
1779 sd.daemon_id = daemon_spec.daemon_id
1780 sd.hostname = daemon_spec.host
1781 sd.status = 1
1782 sd.status_desc = 'starting'
1783 self.cache.add_daemon(daemon_spec.host, sd)
f91f0fd5 1784 if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
f6b5b4d7
TL
1785 self.requires_post_actions.add(daemon_spec.daemon_type)
1786 self.cache.invalidate_host_daemons(daemon_spec.host)
f91f0fd5
TL
1787 self.cache.update_daemon_config_deps(
1788 daemon_spec.host, daemon_spec.name(), deps, start_time)
f6b5b4d7
TL
1789 self.cache.save_host(daemon_spec.host)
1790 msg = "{} {} on host '{}'".format(
1791 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1792 if not code:
1793 self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1794 else:
1795 what = 'reconfigure' if reconfig else 'deploy'
f91f0fd5
TL
1796 self.events.for_daemon(
1797 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
f6b5b4d7 1798 return msg
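# Putting the pieces above together, the remote call for an OSD ends up looking
# roughly like the following sketch (daemon name and uuid are illustrative;
# _run_cephadm adds its own wrapper arguments such as the container image and
# cluster fsid):
#
#   cephadm deploy --name osd.3 --osd-fsid <uuid> --config-json -
#
# with the generated cephadm_config JSON supplied on stdin.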
9f95a23c 1799
e306af50
TL
1800 @forall_hosts
1801 def _remove_daemons(self, name, host) -> str:
9f95a23c
TL
1802 return self._remove_daemon(name, host)
1803
e306af50 1804 def _remove_daemon(self, name, host) -> str:
9f95a23c
TL
1805 """
1806 Remove a daemon
1807 """
1808 (daemon_type, daemon_id) = name.split('.', 1)
f91f0fd5
TL
1809 daemon = orchestrator.DaemonDescription(
1810 daemon_type=daemon_type,
1811 daemon_id=daemon_id,
1812 hostname=host)
9f95a23c 1813
f91f0fd5 1814 with set_exception_subject('service', daemon.service_id(), overwrite=True):
f6b5b4d7 1815
f91f0fd5 1816 self.cephadm_services[daemon_type].pre_remove(daemon)
f6b5b4d7
TL
1817
1818 args = ['--name', name, '--force']
1819 self.log.info('Removing daemon %s from %s' % (name, host))
1820 out, err, code = self._run_cephadm(
1821 host, name, 'rm-daemon', args)
1822 if not code:
1823 # remove item from cache
1824 self.cache.rm_daemon(host, name)
1825 self.cache.invalidate_host_daemons(host)
e306af50 1826
f91f0fd5 1827 self.cephadm_services[daemon_type].post_remove(daemon)
9f95a23c 1828
f91f0fd5 1829 return "Removed {} from host '{}'".format(name, host)
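# Hedged CLI sketch (the command name is an assumption based on the standard
# orchestrator interface; the daemon name is illustrative):
#
#   ceph orch daemon rm rgw.myrealm.host1 --force
#
# lands here and translates into 'cephadm rm-daemon --name ... --force' on the
# target host, followed by a cache invalidation for that host.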
9f95a23c 1830
e306af50
TL
1831 def _check_pool_exists(self, pool, service_name):
1832 logger.info(f'Checking pool "{pool}" exists for service {service_name}')
1833 if not self.rados.pool_exists(pool):
1834 raise OrchestratorError(f'Cannot find pool "{pool}" for '
1835 f'service {service_name}')
1836
9f95a23c 1837 def _add_daemon(self, daemon_type, spec,
f91f0fd5 1838 create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
9f95a23c
TL
1839 """
1840 Add (and place) a daemon. Require explicit host placement. Do not
1841 schedule, and do not apply the related scheduling limitations.
1842 """
1843 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
1844 if not spec.placement.hosts:
1845 raise OrchestratorError('must specify host(s) to deploy on')
1846 count = spec.placement.count or len(spec.placement.hosts)
1847 daemons = self.cache.get_daemons_by_service(spec.service_name())
1848 return self._create_daemons(daemon_type, spec, daemons,
1849 spec.placement.hosts, count,
1850 create_func, config_func)
1851
1852 def _create_daemons(self, daemon_type, spec, daemons,
1853 hosts, count,
f91f0fd5 1854 create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]:
9f95a23c
TL
1855 if count > len(hosts):
1856 raise OrchestratorError('too few hosts: want %d, have %s' % (
1857 count, hosts))
1858
f6b5b4d7 1859 did_config = False
9f95a23c 1860
f6b5b4d7 1861 args = [] # type: List[CephadmDaemonSpec]
9f95a23c
TL
1862 for host, network, name in hosts:
1863 daemon_id = self.get_unique_name(daemon_type, host, daemons,
e306af50
TL
1864 prefix=spec.service_id,
1865 forcename=name)
f6b5b4d7
TL
1866
1867 if not did_config and config_func:
1868 if daemon_type == 'rgw':
1869 config_func(spec, daemon_id)
1870 else:
1871 config_func(spec)
1872 did_config = True
1873
f91f0fd5
TL
1874 daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
1875 host, daemon_id, network, spec)
9f95a23c
TL
1876 self.log.debug('Placing %s.%s on host %s' % (
1877 daemon_type, daemon_id, host))
f6b5b4d7 1878 args.append(daemon_spec)
9f95a23c
TL
1879
1880 # add to daemon list so next name(s) will also be unique
1881 sd = orchestrator.DaemonDescription(
1882 hostname=host,
1883 daemon_type=daemon_type,
1884 daemon_id=daemon_id,
1885 )
1886 daemons.append(sd)
1887
e306af50 1888 @forall_hosts
9f95a23c 1889 def create_func_map(*args):
f91f0fd5
TL
1890 daemon_spec = create_func(*args)
1891 return self._create_daemon(daemon_spec)
9f95a23c
TL
1892
1893 return create_func_map(args)
1894
1895 @trivial_completion
f6b5b4d7 1896 def apply_mon(self, spec) -> str:
9f95a23c
TL
1897 return self._apply(spec)
1898
e306af50 1899 @trivial_completion
9f95a23c 1900 def add_mon(self, spec):
e306af50 1901 # type: (ServiceSpec) -> List[str]
f91f0fd5 1902 return self._add_daemon('mon', spec, self.mon_service.prepare_create)
9f95a23c 1903
e306af50
TL
1904 @trivial_completion
1905 def add_mgr(self, spec):
1906 # type: (ServiceSpec) -> List[str]
f91f0fd5 1907 return self._add_daemon('mgr', spec, self.mgr_service.prepare_create)
9f95a23c 1908
e306af50
TL
1909 def _apply(self, spec: GenericSpec) -> str:
1910 if spec.service_type == 'host':
1911 return self._add_host(cast(HostSpec, spec))
9f95a23c 1912
f6b5b4d7
TL
1913 if spec.service_type == 'osd':
1914 # _trigger_preview_refresh needs to be smart and
1915 # should only refresh if a change has been detected
1916 self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
1917
e306af50 1918 return self._apply_service_spec(cast(ServiceSpec, spec))
9f95a23c 1919
f6b5b4d7
TL
1920 def _plan(self, spec: ServiceSpec):
1921 if spec.service_type == 'osd':
1922 return {'service_name': spec.service_name(),
1923 'service_type': spec.service_type,
1924 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
1925
1926 ha = HostAssignment(
1927 spec=spec,
f91f0fd5 1928 hosts=self._hosts_with_daemon_inventory(),
f6b5b4d7
TL
1929 get_daemons_func=self.cache.get_daemons_by_service,
1930 )
1931 ha.validate()
1932 hosts = ha.place()
1933
1934 add_daemon_hosts = ha.add_daemon_hosts(hosts)
1935 remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
1936
1937 return {
1938 'service_name': spec.service_name(),
1939 'service_type': spec.service_type,
1940 'add': [hs.hostname for hs in add_daemon_hosts],
1941 'remove': [d.hostname for d in remove_daemon_hosts]
1942 }
1943
1944 @trivial_completion
1945 def plan(self, specs: List[GenericSpec]) -> List:
1946 results = [{'warning': 'WARNING! Dry runs are snapshots of a certain point in time and are bound \n'
1947 'to the current inventory setup. If any of these conditions change, the \n'
1948 'preview will be invalid. Please keep the timeframe between planning \n'
1949 'and applying the specs as short as possible.'}]
1950 if any([spec.service_type == 'host' for spec in specs]):
1951 return [{'error': 'Found <HostSpec>. Previews that include host specifications are not supported yet.'}]
1952 for spec in specs:
1953 results.append(self._plan(cast(ServiceSpec, spec)))
1954 return results
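# Typically reached via a dry run, e.g. (CLI shape and file name are
# assumptions):
#
#   ceph orch apply -i service_specs.yml --dry-run
#
# which returns the warning above plus one add/remove summary per spec.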
1955
e306af50 1956 def _apply_service_spec(self, spec: ServiceSpec) -> str:
9f95a23c
TL
1957 if spec.placement.is_empty():
1958 # fill in default placement
1959 defaults = {
1960 'mon': PlacementSpec(count=5),
1961 'mgr': PlacementSpec(count=2),
1962 'mds': PlacementSpec(count=2),
1963 'rgw': PlacementSpec(count=2),
1911f103 1964 'iscsi': PlacementSpec(count=1),
9f95a23c 1965 'rbd-mirror': PlacementSpec(count=2),
801d1391 1966 'nfs': PlacementSpec(count=1),
9f95a23c
TL
1967 'grafana': PlacementSpec(count=1),
1968 'alertmanager': PlacementSpec(count=1),
1969 'prometheus': PlacementSpec(count=1),
1970 'node-exporter': PlacementSpec(host_pattern='*'),
1971 'crash': PlacementSpec(host_pattern='*'),
f91f0fd5 1972 'container': PlacementSpec(count=1),
9f95a23c
TL
1973 }
1974 spec.placement = defaults[spec.service_type]
1975 elif spec.service_type in ['mon', 'mgr'] and \
f91f0fd5
TL
1976 spec.placement.count is not None and \
1977 spec.placement.count < 1:
9f95a23c
TL
1978 raise OrchestratorError('cannot scale %s service below 1' % (
1979 spec.service_type))
1980
1981 HostAssignment(
1982 spec=spec,
f91f0fd5 1983 hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh
9f95a23c
TL
1984 get_daemons_func=self.cache.get_daemons_by_service,
1985 ).validate()
1986
1987 self.log.info('Saving service %s spec with placement %s' % (
1988 spec.service_name(), spec.placement.pretty_str()))
1989 self.spec_store.save(spec)
1990 self._kick_serve_loop()
1911f103 1991 return "Scheduled %s update..." % spec.service_name()
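# A minimal sketch of the kind of spec that gets saved here (values are purely
# illustrative; these classes are the ones already imported by this module):
#
#   from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
#   spec = ServiceSpec('rbd-mirror', placement=PlacementSpec(count=2))
#
# An empty placement would instead be filled from the defaults table above
# (count=2 for rbd-mirror), then validated against the full host inventory
# before being written to the spec store.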
9f95a23c
TL
1992
1993 @trivial_completion
f6b5b4d7 1994 def apply(self, specs: List[GenericSpec]) -> List[str]:
e306af50
TL
1995 results = []
1996 for spec in specs:
1997 results.append(self._apply(spec))
1998 return results
9f95a23c
TL
1999
2000 @trivial_completion
f6b5b4d7 2001 def apply_mgr(self, spec) -> str:
9f95a23c
TL
2002 return self._apply(spec)
2003
e306af50 2004 @trivial_completion
f6b5b4d7 2005 def add_mds(self, spec: ServiceSpec) -> List[str]:
f91f0fd5 2006 return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config)
9f95a23c
TL
2007
2008 @trivial_completion
f6b5b4d7 2009 def apply_mds(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2010 return self._apply(spec)
2011
e306af50 2012 @trivial_completion
f6b5b4d7 2013 def add_rgw(self, spec) -> List[str]:
f91f0fd5 2014 return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config)
9f95a23c
TL
2015
2016 @trivial_completion
f6b5b4d7 2017 def apply_rgw(self, spec) -> str:
9f95a23c
TL
2018 return self._apply(spec)
2019
e306af50 2020 @trivial_completion
1911f103 2021 def add_iscsi(self, spec):
e306af50 2022 # type: (ServiceSpec) -> List[str]
f91f0fd5 2023 return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config)
1911f103
TL
2024
2025 @trivial_completion
f6b5b4d7 2026 def apply_iscsi(self, spec) -> str:
1911f103
TL
2027 return self._apply(spec)
2028
e306af50 2029 @trivial_completion
f6b5b4d7 2030 def add_rbd_mirror(self, spec) -> List[str]:
f91f0fd5 2031 return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create)
9f95a23c
TL
2032
2033 @trivial_completion
f6b5b4d7 2034 def apply_rbd_mirror(self, spec) -> str:
9f95a23c
TL
2035 return self._apply(spec)
2036
e306af50 2037 @trivial_completion
f6b5b4d7 2038 def add_nfs(self, spec) -> List[str]:
f91f0fd5 2039 return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config)
801d1391
TL
2040
2041 @trivial_completion
f6b5b4d7 2042 def apply_nfs(self, spec) -> str:
801d1391
TL
2043 return self._apply(spec)
2044
9f95a23c
TL
2045 def _get_dashboard_url(self):
2046 # type: () -> str
2047 return self.get('mgr_map').get('services', {}).get('dashboard', '')
2048
e306af50 2049 @trivial_completion
f6b5b4d7 2050 def add_prometheus(self, spec) -> List[str]:
f91f0fd5 2051 return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create)
9f95a23c
TL
2052
2053 @trivial_completion
f6b5b4d7 2054 def apply_prometheus(self, spec) -> str:
9f95a23c
TL
2055 return self._apply(spec)
2056
e306af50 2057 @trivial_completion
9f95a23c 2058 def add_node_exporter(self, spec):
e306af50 2059 # type: (ServiceSpec) -> List[str]
9f95a23c 2060 return self._add_daemon('node-exporter', spec,
f91f0fd5 2061 self.node_exporter_service.prepare_create)
9f95a23c
TL
2062
2063 @trivial_completion
f6b5b4d7 2064 def apply_node_exporter(self, spec) -> str:
9f95a23c
TL
2065 return self._apply(spec)
2066
e306af50 2067 @trivial_completion
9f95a23c 2068 def add_crash(self, spec):
e306af50 2069 # type: (ServiceSpec) -> List[str]
9f95a23c 2070 return self._add_daemon('crash', spec,
f91f0fd5 2071 self.crash_service.prepare_create)
9f95a23c
TL
2072
2073 @trivial_completion
f6b5b4d7 2074 def apply_crash(self, spec) -> str:
9f95a23c
TL
2075 return self._apply(spec)
2076
e306af50 2077 @trivial_completion
9f95a23c 2078 def add_grafana(self, spec):
e306af50 2079 # type: (ServiceSpec) -> List[str]
f91f0fd5 2080 return self._add_daemon('grafana', spec, self.grafana_service.prepare_create)
9f95a23c
TL
2081
2082 @trivial_completion
f6b5b4d7 2083 def apply_grafana(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2084 return self._apply(spec)
2085
e306af50 2086 @trivial_completion
9f95a23c 2087 def add_alertmanager(self, spec):
e306af50 2088 # type: (ServiceSpec) -> List[str]
f91f0fd5 2089 return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create)
9f95a23c
TL
2090
2091 @trivial_completion
f6b5b4d7 2092 def apply_alertmanager(self, spec: ServiceSpec) -> str:
9f95a23c
TL
2093 return self._apply(spec)
2094
f91f0fd5
TL
2095 @trivial_completion
2096 def add_container(self, spec: ServiceSpec) -> List[str]:
2097 return self._add_daemon('container', spec,
2098 self.container_service.prepare_create)
2099
2100 @trivial_completion
2101 def apply_container(self, spec: ServiceSpec) -> str:
2102 return self._apply(spec)
2103
2104 def _get_container_image_info(self, image_name) -> ContainerInspectInfo:
9f95a23c
TL
2105 # pick a random host...
2106 host = None
e306af50 2107 for host_name in self.inventory.keys():
9f95a23c
TL
2108 host = host_name
2109 break
2110 if not host:
2111 raise OrchestratorError('no hosts defined')
f6b5b4d7 2112 if self.cache.host_needs_registry_login(host) and self.registry_url:
f91f0fd5
TL
2113 self._registry_login(host, self.registry_url,
2114 self.registry_username, self.registry_password)
9f95a23c 2115 out, err, code = self._run_cephadm(
f6b5b4d7 2116 host, '', 'pull', [],
9f95a23c
TL
2117 image=image_name,
2118 no_fsid=True,
2119 error_ok=True)
2120 if code:
2121 raise OrchestratorError('Failed to pull %s on %s: %s' % (
2122 image_name, host, '\n'.join(out)))
f91f0fd5
TL
2123 try:
2124 j = json.loads('\n'.join(out))
2125 r = ContainerInspectInfo(
2126 j['image_id'],
2127 j.get('ceph_version'),
2128 j.get('repo_digest')
2129 )
2130 self.log.debug(f'image {image_name} -> {r}')
2131 return r
2132 except (ValueError, KeyError) as _:
2133 msg = 'Failed to pull %s on %s: %s' % (image_name, host, '\n'.join(out))
2134 self.log.exception(msg)
2135 raise OrchestratorError(msg)
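# The 'cephadm pull' output parsed above is expected to look roughly like this
# (field values are illustrative; only the keys are taken from the code):
#
#   {"image_id": "2bc0b0f4375d...",
#    "ceph_version": "ceph version 15.2.8 (...) octopus (stable)",
#    "repo_digest": "docker.io/ceph/ceph@sha256:..."}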
9f95a23c
TL
2136
2137 @trivial_completion
f6b5b4d7 2138 def upgrade_check(self, image, version) -> str:
9f95a23c
TL
2139 if version:
2140 target_name = self.container_image_base + ':v' + version
2141 elif image:
2142 target_name = image
2143 else:
2144 raise OrchestratorError('must specify either image or version')
2145
f91f0fd5
TL
2146 image_info = self._get_container_image_info(target_name)
2147 self.log.debug(f'image info {image} -> {image_info}')
9f95a23c
TL
2148 r = {
2149 'target_name': target_name,
f91f0fd5
TL
2150 'target_id': image_info.image_id,
2151 'target_version': image_info.ceph_version,
9f95a23c
TL
2152 'needs_update': dict(),
2153 'up_to_date': list(),
2154 }
2155 for host, dm in self.cache.daemons.items():
2156 for name, dd in dm.items():
f91f0fd5 2157 if image_info.image_id == dd.container_image_id:
9f95a23c
TL
2158 r['up_to_date'].append(dd.name())
2159 else:
2160 r['needs_update'][dd.name()] = {
2161 'current_name': dd.container_image_name,
2162 'current_id': dd.container_image_id,
2163 'current_version': dd.version,
2164 }
f91f0fd5
TL
2165 if self.use_repo_digest:
2166 r['target_digest'] = image_info.repo_digest
2167
9f95a23c
TL
2168 return json.dumps(r, indent=4, sort_keys=True)
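# Sketch of the JSON this returns (keys mirror the dict assembled above; values
# and daemon names are illustrative):
#
#   {
#       "target_name": "docker.io/ceph/ceph:v15.2.8",
#       "target_id": "<image id>",
#       "target_version": "ceph version 15.2.8 (...) octopus (stable)",
#       "needs_update": {"osd.0": {"current_name": "...", "current_id": "...", "current_version": "15.2.7"}},
#       "up_to_date": ["mgr.host1.abcdef"]
#   }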
2169
2170 @trivial_completion
f6b5b4d7 2171 def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
e306af50 2172 return self.upgrade.upgrade_status()
9f95a23c
TL
2173
2174 @trivial_completion
f6b5b4d7 2175 def upgrade_start(self, image, version) -> str:
e306af50 2176 return self.upgrade.upgrade_start(image, version)
9f95a23c
TL
2177
2178 @trivial_completion
f6b5b4d7 2179 def upgrade_pause(self) -> str:
e306af50 2180 return self.upgrade.upgrade_pause()
9f95a23c
TL
2181
2182 @trivial_completion
f6b5b4d7 2183 def upgrade_resume(self) -> str:
e306af50 2184 return self.upgrade.upgrade_resume()
9f95a23c
TL
2185
2186 @trivial_completion
f6b5b4d7 2187 def upgrade_stop(self) -> str:
e306af50 2188 return self.upgrade.upgrade_stop()
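# These thin wrappers back the upgrade CLI, roughly as follows (command names
# are assumed from the standard orchestrator interface; the state machine
# itself lives in CephadmUpgrade, see upgrade.py):
#
#   ceph orch upgrade check --ceph-version 15.2.8
#   ceph orch upgrade start --ceph-version 15.2.8   # or --image <name>
#   ceph orch upgrade status
#   ceph orch upgrade pause / resume / stop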
9f95a23c
TL
2189
2190 @trivial_completion
2191 def remove_osds(self, osd_ids: List[str],
2192 replace: bool = False,
f6b5b4d7 2193 force: bool = False) -> str:
9f95a23c
TL
2194 """
2195 Takes a list of OSDs and schedules them for removal.
2196 The function that takes care of the actual removal is
f6b5b4d7 2197 process_removal_queue().
9f95a23c
TL
2198 """
2199
f6b5b4d7
TL
2200 daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
2201 to_remove_daemons = list()
9f95a23c 2202 for daemon in daemons:
f6b5b4d7
TL
2203 if daemon.daemon_id in osd_ids:
2204 to_remove_daemons.append(daemon)
2205 if not to_remove_daemons:
2206 return f"Unable to find OSDs: {osd_ids}"
9f95a23c 2207
f6b5b4d7
TL
2208 for daemon in to_remove_daemons:
2209 try:
2210 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
2211 replace=replace,
2212 force=force,
2213 hostname=daemon.hostname,
2214 fullname=daemon.name(),
2215 process_started_at=datetime.datetime.utcnow(),
2216 remove_util=self.rm_util))
2217 except NotFoundError:
2218 return f"Unable to find OSDs: {osd_ids}"
9f95a23c
TL
2219
2220 # trigger the serve loop to initiate the removal
2221 self._kick_serve_loop()
2222 return "Scheduled OSD(s) for removal"
2223
f6b5b4d7
TL
2224 @trivial_completion
2225 def stop_remove_osds(self, osd_ids: List[str]):
2226 """
2227 Stops a `removal` process for a list of OSDs.
2228 This will revert their weight and remove them from the osds_to_remove queue.
2229 """
2230 for osd_id in osd_ids:
2231 try:
2232 self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
2233 remove_util=self.rm_util))
2234 except (NotFoundError, KeyError):
2235 return f'Unable to find OSD in the queue: {osd_id}'
2236
2237 # trigger the serve loop to halt the removal
2238 self._kick_serve_loop()
2239 return "Stopped OSD(s) removal"
2240
9f95a23c
TL
2241 @trivial_completion
2242 def remove_osds_status(self):
2243 """
2244 The CLI call to retrieve an osd removal report
2245 """
f6b5b4d7 2246 return self.to_remove_osds.all_osds()
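# Companion CLI sketches for the two methods above (command names assumed from
# the standard orchestrator interface):
#
#   ceph orch osd rm stop 0      # -> stop_remove_osds(['0'])
#   ceph orch osd rm status      # -> remove_osds_status()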