[ceph.git] / ceph / src / pybind / mgr / cephadm / module.py
1 import json
2 import errno
3 import logging
4 import time
5 from copy import copy
6 from threading import Event
7 from functools import wraps
8
9 from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
10
11 import string
12 try:
13 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, \
14 Any, NamedTuple, Iterator, Set, Sequence
15 from typing import TYPE_CHECKING, cast
16 except ImportError:
17 TYPE_CHECKING = False # just for type checking
18
19
20 import datetime
21 import six
22 import os
23 import random
24 import tempfile
25 import multiprocessing.pool
26 import re
27 import shutil
28 import subprocess
29 import uuid
30
31 from ceph.deployment import inventory, translate
32 from ceph.deployment.drive_group import DriveGroupSpec
33 from ceph.deployment.drive_selection.selector import DriveSelection
34 from ceph.deployment.service_spec import \
35 HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
36
37 from mgr_module import MgrModule, HandleCommandResult
38 import orchestrator
39 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
40 CLICommandMeta
41
42 from . import remotes
43 from . import utils
44 from .nfs import NFSGanesha
45 from .osd import RemoveUtil, OSDRemoval
46
47
48 try:
49 import remoto
50 import remoto.process
51 import execnet.gateway_bootstrap
52 except ImportError as e:
53 remoto = None
54 remoto_import_error = str(e)
55
56 try:
57 from typing import List
58 except ImportError:
59 pass
60
61 logger = logging.getLogger(__name__)
62
63 DEFAULT_SSH_CONFIG = """
64 Host *
65 User root
66 StrictHostKeyChecking no
67 UserKnownHostsFile /dev/null
68 ConnectTimeout=30
69 """
70
71 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
72 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
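# For illustration, DATEFMT renders timestamps like '2020-04-01T12:34:56.789012';
# CEPH_DATEFMT is the same format with an explicit trailing 'Z' (UTC marker).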
73
74 HOST_CACHE_PREFIX = "host."
75 SPEC_STORE_PREFIX = "spec."
76
77 # ceph daemon types that use the ceph container image.
78 # NOTE: listed in upgrade order!
79 CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
80 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
81
82
83 # for py2 compat
84 try:
85 from tempfile import TemporaryDirectory # py3
86 except ImportError:
87 # define a minimal (but sufficient) equivalent for <= py 3.2
88 class TemporaryDirectory(object): # type: ignore
89 def __init__(self):
90 self.name = tempfile.mkdtemp()
91
92 def __enter__(self):
93 if not self.name:
94 self.name = tempfile.mkdtemp()
95 return self.name
96
97 def cleanup(self):
98 shutil.rmtree(self.name)
99
100 def __exit__(self, exc_type, exc_value, traceback):
101 self.cleanup()
102
103
104 class SpecStore():
105 def __init__(self, mgr):
106 # type: (CephadmOrchestrator) -> None
107 self.mgr = mgr
108 self.specs = {} # type: Dict[str, ServiceSpec]
109 self.spec_created = {} # type: Dict[str, datetime.datetime]
110
111 def load(self):
112 # type: () -> None
113 for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
114 service_name = k[len(SPEC_STORE_PREFIX):]
115 try:
116 v = json.loads(v)
117 spec = ServiceSpec.from_json(v['spec'])
118 created = datetime.datetime.strptime(v['created'], DATEFMT)
119 self.specs[service_name] = spec
120 self.spec_created[service_name] = created
121 self.mgr.log.debug('SpecStore: loaded spec for %s' % (
122 service_name))
123 except Exception as e:
124 self.mgr.log.warning('unable to load spec for %s: %s' % (
125 service_name, e))
126 pass
127
128 def save(self, spec):
129 # type: (ServiceSpec) -> None
130 self.specs[spec.service_name()] = spec
131 self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
132 self.mgr.set_store(
133 SPEC_STORE_PREFIX + spec.service_name(),
134 json.dumps({
135 'spec': spec.to_json(),
136 'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
137 }, sort_keys=True),
138 )
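# For illustration (values hypothetical), saving a plain 'mon' spec results in a
# config-key entry roughly like:
#   key:   spec.mon
#   value: {"created": "2020-04-01T12:00:00.000000",
#           "spec": {"service_type": "mon", "placement": {...}, ...}}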
139
140 def rm(self, service_name):
141 # type: (str) -> bool
142 found = service_name in self.specs
143 if found:
144 del self.specs[service_name]
145 del self.spec_created[service_name]
146 self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
147 return found
148
149 def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
150 specs = []
151 for sn, spec in self.specs.items():
152 if not service_name or \
153 sn == service_name or \
154 sn.startswith(service_name + '.'):
155 specs.append(spec)
156 self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
157 service_name, specs))
158 return specs
159
160 class HostCache():
161 def __init__(self, mgr):
162 # type: (CephadmOrchestrator) -> None
163 self.mgr: CephadmOrchestrator = mgr
164 self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
165 self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
166 self.devices = {} # type: Dict[str, List[inventory.Device]]
167 self.networks = {} # type: Dict[str, Dict[str, List[str]]]
168 self.last_device_update = {} # type: Dict[str, datetime.datetime]
169 self.daemon_refresh_queue = [] # type: List[str]
170 self.device_refresh_queue = [] # type: List[str]
171 self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
172 self.last_host_check = {} # type: Dict[str, datetime.datetime]
173
174 def load(self):
175 # type: () -> None
176 for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
177 host = k[len(HOST_CACHE_PREFIX):]
178 if host not in self.mgr.inventory:
179 self.mgr.log.warning('removing stray HostCache host record %s' % (
180 host))
181 self.mgr.set_store(k, None)
182 try:
183 j = json.loads(v)
184 if 'last_device_update' in j:
185 self.last_device_update[host] = datetime.datetime.strptime(
186 j['last_device_update'], DATEFMT)
187 else:
188 self.device_refresh_queue.append(host)
189 # for services, we ignore the persisted last_*_update
190 # and always trigger a new scrape on mgr restart.
191 self.daemon_refresh_queue.append(host)
192 self.daemons[host] = {}
193 self.devices[host] = []
194 self.networks[host] = {}
195 self.daemon_config_deps[host] = {}
196 for name, d in j.get('daemons', {}).items():
197 self.daemons[host][name] = \
198 orchestrator.DaemonDescription.from_json(d)
199 for d in j.get('devices', []):
200 self.devices[host].append(inventory.Device.from_json(d))
201 self.networks[host] = j.get('networks', {})
202 for name, d in j.get('daemon_config_deps', {}).items():
203 self.daemon_config_deps[host][name] = {
204 'deps': d.get('deps', []),
205 'last_config': datetime.datetime.strptime(
206 d['last_config'], DATEFMT),
207 }
208 if 'last_host_check' in j:
209 self.last_host_check[host] = datetime.datetime.strptime(
210 j['last_host_check'], DATEFMT)
211 self.mgr.log.debug(
212 'HostCache.load: host %s has %d daemons, '
213 '%d devices, %d networks' % (
214 host, len(self.daemons[host]), len(self.devices[host]),
215 len(self.networks[host])))
216 except Exception as e:
217 self.mgr.log.warning('unable to load cached state for %s: %s' % (
218 host, e))
219 pass
220
221 def update_host_daemons(self, host, dm):
222 # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
223 self.daemons[host] = dm
224 self.last_daemon_update[host] = datetime.datetime.utcnow()
225
226 def update_host_devices_networks(self, host, dls, nets):
227 # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
228 self.devices[host] = dls
229 self.networks[host] = nets
230 self.last_device_update[host] = datetime.datetime.utcnow()
231
232 def update_daemon_config_deps(self, host, name, deps, stamp):
233 self.daemon_config_deps[host][name] = {
234 'deps': deps,
235 'last_config': stamp,
236 }
237
238 def update_last_host_check(self, host):
239 # type: (str) -> None
240 self.last_host_check[host] = datetime.datetime.utcnow()
241
242 def prime_empty_host(self, host):
243 # type: (str) -> None
244 """
245 Install an empty entry for a host
246 """
247 self.daemons[host] = {}
248 self.devices[host] = []
249 self.networks[host] = {}
250 self.daemon_config_deps[host] = {}
251 self.daemon_refresh_queue.append(host)
252 self.device_refresh_queue.append(host)
253
254 def invalidate_host_daemons(self, host):
255 # type: (str) -> None
256 self.daemon_refresh_queue.append(host)
257 if host in self.last_daemon_update:
258 del self.last_daemon_update[host]
259 self.mgr.event.set()
260
261 def invalidate_host_devices(self, host):
262 # type: (str) -> None
263 self.device_refresh_queue.append(host)
264 if host in self.last_device_update:
265 del self.last_device_update[host]
266 self.mgr.event.set()
267
268 def save_host(self, host):
269 # type: (str) -> None
270 j = { # type: ignore
271 'daemons': {},
272 'devices': [],
273 'daemon_config_deps': {},
274 }
275 if host in self.last_daemon_update:
276 j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
277 if host in self.last_device_update:
278 j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
279 for name, dd in self.daemons[host].items():
280 j['daemons'][name] = dd.to_json() # type: ignore
281 for d in self.devices[host]:
282 j['devices'].append(d.to_json()) # type: ignore
283 j['networks'] = self.networks[host]
284 for name, depi in self.daemon_config_deps[host].items():
285 j['daemon_config_deps'][name] = { # type: ignore
286 'deps': depi.get('deps', []),
287 'last_config': depi['last_config'].strftime(DATEFMT),
288 }
289 if host in self.last_host_check:
290 j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
291 self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
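# For illustration (hostname hypothetical), the cache for host 'node1' is persisted
# under the key 'host.node1' as a JSON object with the top-level fields built above:
# daemons, devices, networks, daemon_config_deps and the optional
# last_daemon_update / last_device_update / last_host_check timestamps.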
292
293 def rm_host(self, host):
294 # type: (str) -> None
295 if host in self.daemons:
296 del self.daemons[host]
297 if host in self.devices:
298 del self.devices[host]
299 if host in self.networks:
300 del self.networks[host]
301 if host in self.last_daemon_update:
302 del self.last_daemon_update[host]
303 if host in self.last_device_update:
304 del self.last_device_update[host]
305 if host in self.daemon_config_deps:
306 del self.daemon_config_deps[host]
307 self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
308
309 def get_hosts(self):
310 # type: () -> List[str]
311 r = []
312 for host, di in self.daemons.items():
313 r.append(host)
314 return r
315
316 def get_daemons(self):
317 # type: () -> List[orchestrator.DaemonDescription]
318 r = []
319 for host, dm in self.daemons.items():
320 for name, dd in dm.items():
321 r.append(dd)
322 return r
323
324 def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
325 for host, dm in self.daemons.items():
326 if host in self.mgr.offline_hosts:
327 def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
328 ret = copy(dd)
329 ret.status = -1
330 ret.status_desc = 'host is offline'
331 return ret
332 yield host, {name: set_offline(d) for name, d in dm.items()}
333 else:
334 yield host, dm
335
336 def get_daemons_by_service(self, service_name):
337 # type: (str) -> List[orchestrator.DaemonDescription]
338 result = [] # type: List[orchestrator.DaemonDescription]
339 for host, dm in self.daemons.items():
340 for name, d in dm.items():
341 if name.startswith(service_name + '.'):
342 result.append(d)
343 return result
344
345 def get_daemon_names(self):
346 # type: () -> List[str]
347 r = []
348 for host, dm in self.daemons.items():
349 for name, dd in dm.items():
350 r.append(name)
351 return r
352
353 def get_daemon_last_config_deps(self, host, name):
354 if host in self.daemon_config_deps:
355 if name in self.daemon_config_deps[host]:
356 return self.daemon_config_deps[host][name].get('deps', []), \
357 self.daemon_config_deps[host][name].get('last_config', None)
358 return None, None
359
360 def host_needs_daemon_refresh(self, host):
361 # type: (str) -> bool
362 if host in self.mgr.offline_hosts:
363 logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
364 return False
365 if host in self.daemon_refresh_queue:
366 self.daemon_refresh_queue.remove(host)
367 return True
368 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
369 seconds=self.mgr.daemon_cache_timeout)
370 if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
371 return True
372 return False
373
374 def host_needs_device_refresh(self, host):
375 # type: (str) -> bool
376 if host in self.mgr.offline_hosts:
377 logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
378 return False
379 if host in self.device_refresh_queue:
380 self.device_refresh_queue.remove(host)
381 return True
382 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
383 seconds=self.mgr.device_cache_timeout)
384 if host not in self.last_device_update or self.last_device_update[host] < cutoff:
385 return True
386 return False
387
388 def host_needs_check(self, host):
389 # type: (str) -> bool
390 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
391 seconds=self.mgr.host_check_interval)
392 return host not in self.last_host_check or self.last_host_check[host] < cutoff
393
394 def add_daemon(self, host, dd):
395 # type: (str, orchestrator.DaemonDescription) -> None
396 assert host in self.daemons
397 self.daemons[host][dd.name()] = dd
398
399 def rm_daemon(self, host, name):
400 if host in self.daemons:
401 if name in self.daemons[host]:
402 del self.daemons[host][name]
403
404
405 class AsyncCompletion(orchestrator.Completion):
406 def __init__(self,
407 _first_promise=None, # type: Optional[orchestrator.Completion]
408 value=orchestrator._Promise.NO_RESULT, # type: Any
409 on_complete=None, # type: Optional[Callable]
410 name=None, # type: Optional[str]
411 many=False, # type: bool
412 update_progress=False, # type: bool
413 ):
414
415 assert CephadmOrchestrator.instance is not None
416 self.many = many
417 self.update_progress = update_progress
418 if name is None and on_complete is not None:
419 name = getattr(on_complete, '__name__', None)
420 super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
421
422 @property
423 def _progress_reference(self):
424 # type: () -> Optional[orchestrator.ProgressReference]
425 if hasattr(self._on_complete_, 'progress_id'): # type: ignore
426 return self._on_complete_ # type: ignore
427 return None
428
429 @property
430 def _on_complete(self):
431 # type: () -> Optional[Callable]
432 if self._on_complete_ is None:
433 return None
434
435 def callback(result):
436 try:
437 if self.update_progress:
438 assert self.progress_reference
439 self.progress_reference.progress = 1.0
440 self._on_complete_ = None
441 self._finalize(result)
442 except Exception as e:
443 try:
444 self.fail(e)
445 except Exception:
446 logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
447 if 'UNITTEST' in os.environ:
448 assert False
449
450 def error_callback(e):
451 pass
452
453 def run(value):
454 def do_work(*args, **kwargs):
455 assert self._on_complete_ is not None
456 try:
457 res = self._on_complete_(*args, **kwargs)
458 if self.update_progress and self.many:
459 assert self.progress_reference
460 self.progress_reference.progress += 1.0 / len(value)
461 return res
462 except Exception as e:
463 self.fail(e)
464 raise
465
466 assert CephadmOrchestrator.instance
467 if self.many:
468 if not value:
469 logger.info('calling map_async without values')
470 callback([])
471 if six.PY3:
472 CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
473 callback=callback,
474 error_callback=error_callback)
475 else:
476 CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
477 callback=callback)
478 else:
479 if six.PY3:
480 CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
481 callback=callback, error_callback=error_callback)
482 else:
483 CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
484 callback=callback)
485 return self.ASYNC_RESULT
486
487 return run
488
489 @_on_complete.setter
490 def _on_complete(self, inner):
491 # type: (Callable) -> None
492 self._on_complete_ = inner
493
494
495 def ssh_completion(cls=AsyncCompletion, **c_kwargs):
496 # type: (Type[orchestrator.Completion], Any) -> Callable
497 """
498 See ./HACKING.rst for a how-to
499 """
500 def decorator(f):
501 @wraps(f)
502 def wrapper(*args):
503
504 name = f.__name__
505 many = c_kwargs.get('many', False)
506
507 # Some weird logic to make calling functions with multiple arguments work.
508 if len(args) == 1:
509 [value] = args
510 if many and value and isinstance(value[0], tuple):
511 return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
512 else:
513 return cls(on_complete=f, value=value, name=name, **c_kwargs)
514 else:
515 if many:
516 self, value = args
517
518 def call_self(inner_args):
519 if not isinstance(inner_args, tuple):
520 inner_args = (inner_args, )
521 return f(self, *inner_args)
522
523 return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
524 else:
525 return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
526
527 return wrapper
528 return decorator
529
530
531 def async_completion(f):
532 # type: (Callable) -> Callable[..., AsyncCompletion]
533 """
534 See ./HACKING.rst for a how-to
535
536 :param f: wrapped function
537 """
538 return ssh_completion()(f)
539
540
541 def async_map_completion(f):
542 # type: (Callable) -> Callable[..., AsyncCompletion]
543 """
544 See ./HACKING.rst for a how-to
545
546 :param f: wrapped function
547
548 kind of similar to
549
550 >>> def sync_map(f):
551 ... return lambda x: map(f, x)
552
553 """
554 return ssh_completion(many=True)(f)
555
556
557 def trivial_completion(f):
558 # type: (Callable) -> Callable[..., orchestrator.Completion]
559 @wraps(f)
560 def wrapper(*args, **kwargs):
561 return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
562 return wrapper
563
564
565 @six.add_metaclass(CLICommandMeta)
566 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
567
568 _STORE_HOST_PREFIX = "host"
569
570 instance = None
571 NATIVE_OPTIONS = [] # type: List[Any]
572 MODULE_OPTIONS = [
573 {
574 'name': 'ssh_config_file',
575 'type': 'str',
576 'default': None,
577 'desc': 'customized SSH config file to connect to managed hosts',
578 },
579 {
580 'name': 'device_cache_timeout',
581 'type': 'secs',
582 'default': 30 * 60,
583 'desc': 'seconds to cache device inventory',
584 },
585 {
586 'name': 'daemon_cache_timeout',
587 'type': 'secs',
588 'default': 10 * 60,
589 'desc': 'seconds to cache service (daemon) inventory',
590 },
591 {
592 'name': 'host_check_interval',
593 'type': 'secs',
594 'default': 10 * 60,
595 'desc': 'how frequently to perform a host check',
596 },
597 {
598 'name': 'mode',
599 'type': 'str',
600 'enum_allowed': ['root', 'cephadm-package'],
601 'default': 'root',
602 'desc': 'mode for remote execution of cephadm',
603 },
604 {
605 'name': 'container_image_base',
606 'default': 'docker.io/ceph/ceph',
607 'desc': 'Container image name, without the tag',
608 'runtime': True,
609 },
610 {
611 'name': 'warn_on_stray_hosts',
612 'type': 'bool',
613 'default': True,
614 'desc': 'raise a health warning if daemons are detected on a host '
615 'that is not managed by cephadm',
616 },
617 {
618 'name': 'warn_on_stray_daemons',
619 'type': 'bool',
620 'default': True,
621 'desc': 'raise a health warning if daemons are detected '
622 'that are not managed by cephadm',
623 },
624 {
625 'name': 'warn_on_failed_host_check',
626 'type': 'bool',
627 'default': True,
628 'desc': 'raise a health warning if the host check fails',
629 },
630 {
631 'name': 'log_to_cluster',
632 'type': 'bool',
633 'default': True,
634 'desc': 'log to the "cephadm" cluster log channel',
635 },
636 {
637 'name': 'allow_ptrace',
638 'type': 'bool',
639 'default': False,
640 'desc': 'allow SYS_PTRACE capability on ceph containers',
641 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
642 'process with gdb or strace. Enabling this option '
643 'can allow debugging daemons that encounter problems '
644 'at runtime.',
645 },
646 {
647 'name': 'prometheus_alerts_path',
648 'type': 'str',
649 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
650 'desc': 'location of alerts to include in prometheus deployments',
651 },
652 ]
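# These are regular mgr module options; as a usage sketch, a value can be changed
# at runtime with e.g.
#   ceph config set mgr mgr/cephadm/device_cache_timeout 1800
# after which config_notify() below re-reads it via get_module_option().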
653
654 def __init__(self, *args, **kwargs):
655 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
656 self._cluster_fsid = self.get('mon_map')['fsid']
657
658 # for serve()
659 self.run = True
660 self.event = Event()
661
662 if self.get_store('pause'):
663 self.paused = True
664 else:
665 self.paused = False
666
667 # for mypy which does not run the code
668 if TYPE_CHECKING:
669 self.ssh_config_file = None # type: Optional[str]
670 self.device_cache_timeout = 0
671 self.daemon_cache_timeout = 0
672 self.host_check_interval = 0
673 self.mode = ''
674 self.container_image_base = ''
675 self.warn_on_stray_hosts = True
676 self.warn_on_stray_daemons = True
677 self.warn_on_failed_host_check = True
678 self.allow_ptrace = False
679 self.prometheus_alerts_path = ''
680
681 self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
682
683 self.config_notify()
684
685 path = self.get_ceph_option('cephadm_path')
686 try:
687 with open(path, 'r') as f:
688 self._cephadm = f.read()
689 except (IOError, TypeError) as e:
690 raise RuntimeError("unable to read cephadm at '%s': %s" % (
691 path, str(e)))
692
693 self._worker_pool = multiprocessing.pool.ThreadPool(10)
694
695 self._reconfig_ssh()
696
697 CephadmOrchestrator.instance = self
698
699 t = self.get_store('upgrade_state')
700 if t:
701 self.upgrade_state = json.loads(t)
702 else:
703 self.upgrade_state = None
704
705 self.health_checks = {}
706
707 self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
708
709 # load inventory
710 i = self.get_store('inventory')
711 if i:
712 self.inventory: Dict[str, dict] = json.loads(i)
713 else:
714 self.inventory = dict()
715 self.log.debug('Loaded inventory %s' % self.inventory)
716
717 self.cache = HostCache(self)
718 self.cache.load()
719 self.rm_util = RemoveUtil(self)
720
721 self.spec_store = SpecStore(self)
722 self.spec_store.load()
723
724 # ensure the host lists are in sync
725 for h in self.inventory.keys():
726 if h not in self.cache.daemons:
727 self.cache.prime_empty_host(h)
728 for h in self.cache.get_hosts():
729 if h not in self.inventory:
730 self.cache.rm_host(h)
731
732 # in-memory only.
733 self.offline_hosts: Set[str] = set()
734
735 def shutdown(self):
736 self.log.debug('shutdown')
737 self._worker_pool.close()
738 self._worker_pool.join()
739 self.run = False
740 self.event.set()
741
742 def _kick_serve_loop(self):
743 self.log.debug('_kick_serve_loop')
744 self.event.set()
745
746 def _check_safe_to_destroy_mon(self, mon_id):
747 # type: (str) -> None
748 ret, out, err = self.mon_command({
749 'prefix': 'quorum_status',
750 })
751 if ret:
752 raise OrchestratorError('failed to check mon quorum status')
753 try:
754 j = json.loads(out)
755 except Exception as e:
756 raise OrchestratorError('failed to parse quorum status')
757
758 mons = [m['name'] for m in j['monmap']['mons']]
759 if mon_id not in mons:
760 self.log.info('Safe to remove mon.%s: not in monmap (%s)' % (
761 mon_id, mons))
762 return
763 new_mons = [m for m in mons if m != mon_id]
764 new_quorum = [m for m in j['quorum_names'] if m != mon_id]
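# Illustrative arithmetic for the check below: with five mons a-e all in quorum,
# removing 'e' gives new_mons = 4 and new_quorum = 4; 4 > 4/2, so removal is safe.
# With 2 of 3 mons in quorum, removing one quorum member gives new_quorum = 1 and
# new_mons = 2; 1 > 1.0 is false, so the removal is refused.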
765 if len(new_quorum) > len(new_mons) / 2:
766 self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
767 return
768 raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
769
770 def _wait_for_ok_to_stop(self, s):
771 # only wait a little bit; the daemon might go away for some other reason
772 tries = 4
773 while tries > 0:
774 if s.daemon_type not in ['mon', 'osd', 'mds']:
775 self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
776 (s.daemon_type, s.daemon_id))
777 return True
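# For illustration (daemon id hypothetical): for 'osd.3' the check below issues the
# monitor command {'prefix': 'osd ok-to-stop', 'ids': ['3']}.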
778 ret, out, err = self.mon_command({
779 'prefix': '%s ok-to-stop' % s.daemon_type,
780 'ids': [s.daemon_id],
781 })
782 if not self.upgrade_state or self.upgrade_state.get('paused'):
783 return False
784 if ret:
785 self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
786 (s.daemon_type, s.daemon_id))
787 time.sleep(15)
788 tries -= 1
789 else:
790 self.log.info('Upgrade: It is safe to stop %s.%s' %
791 (s.daemon_type, s.daemon_id))
792 return True
793 return False
794
795 def _clear_upgrade_health_checks(self):
796 for k in ['UPGRADE_NO_STANDBY_MGR',
797 'UPGRADE_FAILED_PULL']:
798 if k in self.health_checks:
799 del self.health_checks[k]
800 self.set_health_checks(self.health_checks)
801
802 def _fail_upgrade(self, alert_id, alert):
803 self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
804 alert['summary']))
805 self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
806 self.upgrade_state['paused'] = True
807 self._save_upgrade_state()
808 self.health_checks[alert_id] = alert
809 self.set_health_checks(self.health_checks)
810
811 def _update_upgrade_progress(self, progress):
812 if 'progress_id' not in self.upgrade_state:
813 self.upgrade_state['progress_id'] = str(uuid.uuid4())
814 self._save_upgrade_state()
815 self.remote('progress', 'update', self.upgrade_state['progress_id'],
816 ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
817 ev_progress=progress)
818
819 def _do_upgrade(self):
820 # type: () -> None
821 if not self.upgrade_state:
822 self.log.debug('_do_upgrade no state, exiting')
823 return
824
825 target_name = self.upgrade_state.get('target_name')
826 target_id = self.upgrade_state.get('target_id', None)
827 if not target_id:
828 # need to learn the container hash
829 self.log.info('Upgrade: First pull of %s' % target_name)
830 try:
831 target_id, target_version = self._get_container_image_id(target_name)
832 except OrchestratorError as e:
833 self._fail_upgrade('UPGRADE_FAILED_PULL', {
834 'severity': 'warning',
835 'summary': 'Upgrade: failed to pull target image',
836 'count': 1,
837 'detail': [str(e)],
838 })
839 return
840 self.upgrade_state['target_id'] = target_id
841 self.upgrade_state['target_version'] = target_version
842 self._save_upgrade_state()
843 target_version = self.upgrade_state.get('target_version')
844 self.log.info('Upgrade: Target is %s with id %s' % (target_name,
845 target_id))
846
847 # get all distinct container_image settings
848 image_settings = {}
849 ret, out, err = self.mon_command({
850 'prefix': 'config dump',
851 'format': 'json',
852 })
853 config = json.loads(out)
854 for opt in config:
855 if opt['name'] == 'container_image':
856 image_settings[opt['section']] = opt['value']
857
858 daemons = self.cache.get_daemons()
859 done = 0
860 for daemon_type in CEPH_UPGRADE_ORDER:
861 self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
862 need_upgrade_self = False
863 for d in daemons:
864 if d.daemon_type != daemon_type:
865 continue
866 if d.container_image_id == target_id:
867 self.log.debug('daemon %s.%s version correct' % (
868 daemon_type, d.daemon_id))
869 done += 1
870 continue
871 self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
872 daemon_type, d.daemon_id,
873 d.container_image_name, d.container_image_id, d.version))
874
875 if daemon_type == 'mgr' and \
876 d.daemon_id == self.get_mgr_id():
877 self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
878 self.get_mgr_id())
879 need_upgrade_self = True
880 continue
881
882 # make sure host has latest container image
883 out, err, code = self._run_cephadm(
884 d.hostname, None, 'inspect-image', [],
885 image=target_name, no_fsid=True, error_ok=True)
886 if code or json.loads(''.join(out)).get('image_id') != target_id:
887 self.log.info('Upgrade: Pulling %s on %s' % (target_name,
888 d.hostname))
889 out, err, code = self._run_cephadm(
890 d.hostname, None, 'pull', [],
891 image=target_name, no_fsid=True, error_ok=True)
892 if code:
893 self._fail_upgrade('UPGRADE_FAILED_PULL', {
894 'severity': 'warning',
895 'summary': 'Upgrade: failed to pull target image',
896 'count': 1,
897 'detail': [
898 'failed to pull %s on host %s' % (target_name,
899 d.hostname)],
900 })
901 return
902 r = json.loads(''.join(out))
903 if r.get('image_id') != target_id:
904 self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
905 self.upgrade_state['target_id'] = r['image_id']
906 self._save_upgrade_state()
907 return
908
909 self._update_upgrade_progress(done / len(daemons))
910
911 if not d.container_image_id:
912 if d.container_image_name == target_name:
913 self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
914 continue
915 if not self._wait_for_ok_to_stop(d):
916 return
917 self.log.info('Upgrade: Redeploying %s.%s' %
918 (d.daemon_type, d.daemon_id))
919 ret, out, err = self.mon_command({
920 'prefix': 'config set',
921 'name': 'container_image',
922 'value': target_name,
923 'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
924 })
925 self._daemon_action(
926 d.daemon_type,
927 d.daemon_id,
928 d.hostname,
929 'redeploy'
930 )
931 return
932
933 if need_upgrade_self:
934 mgr_map = self.get('mgr_map')
935 num = len(mgr_map.get('standbys'))
936 if not num:
937 self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
938 'severity': 'warning',
939 'summary': 'Upgrade: Need standby mgr daemon',
940 'count': 1,
941 'detail': [
942 'The upgrade process needs to upgrade the mgr, '
943 'but it needs at least one standby to proceed.',
944 ],
945 })
946 return
947
948 self.log.info('Upgrade: there are %d other already-upgraded '
949 'standby mgrs, failing over' % num)
950
951 self._update_upgrade_progress(done / len(daemons))
952
953 # fail over
954 ret, out, err = self.mon_command({
955 'prefix': 'mgr fail',
956 'who': self.get_mgr_id(),
957 })
958 return
959 elif daemon_type == 'mgr':
960 if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
961 del self.health_checks['UPGRADE_NO_STANDBY_MGR']
962 self.set_health_checks(self.health_checks)
963
964 # make sure 'ceph versions' agrees
965 ret, out, err = self.mon_command({
966 'prefix': 'versions',
967 })
968 j = json.loads(out)
969 for version, count in j.get(daemon_type, {}).items():
970 if version != target_version:
971 self.log.warning(
972 'Upgrade: %d %s daemon(s) are %s != target %s' %
973 (count, daemon_type, version, target_version))
974
975 # push down configs
976 if image_settings.get(daemon_type) != target_name:
977 self.log.info('Upgrade: Setting container_image for all %s...' %
978 daemon_type)
979 ret, out, err = self.mon_command({
980 'prefix': 'config set',
981 'name': 'container_image',
982 'value': target_name,
983 'who': daemon_type,
984 })
985 to_clean = []
986 for section in image_settings.keys():
987 if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
988 to_clean.append(section)
989 if to_clean:
990 self.log.debug('Upgrade: Cleaning up container_image for %s...' %
991 to_clean)
992 for section in to_clean:
993 ret, image, err = self.mon_command({
994 'prefix': 'config rm',
995 'name': 'container_image',
996 'who': section,
997 })
998
999 self.log.info('Upgrade: All %s daemons are up to date.' %
1000 daemon_type)
1001
1002 # clean up
1003 self.log.info('Upgrade: Finalizing container_image settings')
1004 ret, out, err = self.mon_command({
1005 'prefix': 'config set',
1006 'name': 'container_image',
1007 'value': target_name,
1008 'who': 'global',
1009 })
1010 for daemon_type in CEPH_UPGRADE_ORDER:
1011 ret, image, err = self.mon_command({
1012 'prefix': 'config rm',
1013 'name': 'container_image',
1014 'who': utils.name_to_config_section(daemon_type),
1015 })
1016
1017 self.log.info('Upgrade: Complete!')
1018 if 'progress_id' in self.upgrade_state:
1019 self.remote('progress', 'complete',
1020 self.upgrade_state['progress_id'])
1021 self.upgrade_state = None
1022 self._save_upgrade_state()
1023 return
1024
1025 def _check_host(self, host):
1026 if host not in self.inventory:
1027 return
1028 self.log.debug(' checking %s' % host)
1029 try:
1030 out, err, code = self._run_cephadm(
1031 host, 'client', 'check-host', [],
1032 error_ok=True, no_fsid=True)
1033 self.cache.update_last_host_check(host)
1034 self.cache.save_host(host)
1035 if code:
1036 self.log.debug(' host %s failed check' % host)
1037 if self.warn_on_failed_host_check:
1038 return 'host %s failed check: %s' % (host, err)
1039 else:
1040 self.log.debug(' host %s ok' % host)
1041 except Exception as e:
1042 self.log.debug(' host %s failed check' % host)
1043 return 'host %s failed check: %s' % (host, e)
1044
1045 def _check_for_strays(self):
1046 self.log.debug('_check_for_strays')
1047 for k in ['CEPHADM_STRAY_HOST',
1048 'CEPHADM_STRAY_DAEMON']:
1049 if k in self.health_checks:
1050 del self.health_checks[k]
1051 if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
1052 ls = self.list_servers()
1053 managed = self.cache.get_daemon_names()
1054 host_detail = [] # type: List[str]
1055 host_num_daemons = 0
1056 daemon_detail = [] # type: List[str]
1057 for item in ls:
1058 host = item.get('hostname')
1059 daemons = item.get('services') # misnomer!
1060 missing_names = []
1061 for s in daemons:
1062 name = '%s.%s' % (s.get('type'), s.get('id'))
1063 if host not in self.inventory:
1064 missing_names.append(name)
1065 host_num_daemons += 1
1066 if name not in managed:
1067 daemon_detail.append(
1068 'stray daemon %s on host %s not managed by cephadm' % (name, host))
1069 if missing_names:
1070 host_detail.append(
1071 'stray host %s has %d stray daemons: %s' % (
1072 host, len(missing_names), missing_names))
1073 if self.warn_on_stray_hosts and host_detail:
1074 self.health_checks['CEPHADM_STRAY_HOST'] = {
1075 'severity': 'warning',
1076 'summary': '%d stray host(s) with %s daemon(s) '
1077 'not managed by cephadm' % (
1078 len(host_detail), host_num_daemons),
1079 'count': len(host_detail),
1080 'detail': host_detail,
1081 }
1082 if self.warn_on_stray_daemons and daemon_detail:
1083 self.health_checks['CEPHADM_STRAY_DAEMON'] = {
1084 'severity': 'warning',
1085 'summary': '%d stray daemon(s) not managed by cephadm' % (
1086 len(daemon_detail)),
1087 'count': len(daemon_detail),
1088 'detail': daemon_detail,
1089 }
1090 self.set_health_checks(self.health_checks)
1091
1092 def _serve_sleep(self):
1093 sleep_interval = 600
1094 self.log.debug('Sleeping for %d seconds', sleep_interval)
1095 ret = self.event.wait(sleep_interval)
1096 self.event.clear()
1097
1098 def serve(self):
1099 # type: () -> None
1100 self.log.debug("serve starting")
1101 while self.run:
1102
1103 # refresh daemons
1104 self.log.debug('refreshing hosts')
1105 bad_hosts = []
1106 failures = []
1107 for host in self.cache.get_hosts():
1108 if self.cache.host_needs_check(host):
1109 r = self._check_host(host)
1110 if r is not None:
1111 bad_hosts.append(r)
1112 if self.cache.host_needs_daemon_refresh(host):
1113 self.log.debug('refreshing %s daemons' % host)
1114 r = self._refresh_host_daemons(host)
1115 if r:
1116 failures.append(r)
1117 if self.cache.host_needs_device_refresh(host):
1118 self.log.debug('refreshing %s devices' % host)
1119 r = self._refresh_host_devices(host)
1120 if r:
1121 failures.append(r)
1122
1123 health_changed = False
1124 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1125 del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
1126 health_changed = True
1127 if bad_hosts:
1128 self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
1129 'severity': 'warning',
1130 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
1131 'count': len(bad_hosts),
1132 'detail': bad_hosts,
1133 }
1134 health_changed = True
1135 if failures:
1136 self.health_checks['CEPHADM_REFRESH_FAILED'] = {
1137 'severity': 'warning',
1138 'summary': 'failed to probe daemons or devices',
1139 'count': len(failures),
1140 'detail': failures,
1141 }
1142 health_changed = True
1143 elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
1144 del self.health_checks['CEPHADM_REFRESH_FAILED']
1145 health_changed = True
1146 if health_changed:
1147 self.set_health_checks(self.health_checks)
1148
1149
1150
1151 self._check_for_strays()
1152
1153 if self.paused:
1154 self.health_checks['CEPHADM_PAUSED'] = {
1155 'severity': 'warning',
1156 'summary': 'cephadm background work is paused',
1157 'count': 1,
1158 'detail': ["'ceph orch resume' to resume"],
1159 }
1160 self.set_health_checks(self.health_checks)
1161 else:
1162 if 'CEPHADM_PAUSED' in self.health_checks:
1163 del self.health_checks['CEPHADM_PAUSED']
1164 self.set_health_checks(self.health_checks)
1165
1166 self.rm_util._remove_osds_bg()
1167
1168 if self._apply_all_services():
1169 continue # did something, refresh
1170
1171 self._check_daemons()
1172
1173 if self.upgrade_state and not self.upgrade_state.get('paused'):
1174 self._do_upgrade()
1175 continue
1176
1177 self._serve_sleep()
1178 self.log.debug("serve exit")
1179
1180 def config_notify(self):
1181 """
1182 This method is called whenever one of our config options is changed.
1183 """
1184 for opt in self.MODULE_OPTIONS:
1185 setattr(self,
1186 opt['name'], # type: ignore
1187 self.get_module_option(opt['name'])) # type: ignore
1188 self.log.debug(' mgr option %s = %s',
1189 opt['name'], getattr(self, opt['name'])) # type: ignore
1190 for opt in self.NATIVE_OPTIONS:
1191 setattr(self,
1192 opt, # type: ignore
1193 self.get_ceph_option(opt))
1194 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
1195
1196 self.event.set()
1197
1198 def notify(self, notify_type, notify_id):
1199 pass
1200
1201 def pause(self):
1202 if not self.paused:
1203 self.log.info('Paused')
1204 self.set_store('pause', 'true')
1205 self.paused = True
1206 # wake loop so we update the health status
1207 self._kick_serve_loop()
1208
1209 def resume(self):
1210 if self.paused:
1211 self.log.info('Resumed')
1212 self.paused = False
1213 self.set_store('pause', None)
1214 # unconditionally wake loop so that 'orch resume' can be used to kick
1215 # cephadm
1216 self._kick_serve_loop()
1217
1218 def get_unique_name(self, daemon_type, host, existing, prefix=None,
1219 forcename=None):
1220 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
1221 """
1222 Generate a unique random service name
1223 """
1224 suffix = daemon_type not in [
1225 'mon', 'crash', 'nfs',
1226 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
1227 ]
1228 if forcename:
1229 if len([d for d in existing if d.daemon_id == forcename]):
1230 raise orchestrator.OrchestratorValidationError('name %s already in use' % forcename)
1231 return forcename
1232
1233 if '.' in host:
1234 host = host.split('.')[0]
1235 while True:
1236 if prefix:
1237 name = prefix + '.'
1238 else:
1239 name = ''
1240 name += host
1241 if suffix:
1242 name += '.' + ''.join(random.choice(string.ascii_lowercase)
1243 for _ in range(6))
1244 if len([d for d in existing if d.daemon_id == name]):
1245 if not suffix:
1246 raise orchestrator.OrchestratorValidationError('name %s already in use' % name)
1247 self.log.debug('name %s exists, trying again', name)
1248 continue
1249 return name
1250
1251 def get_service_name(self, daemon_type, daemon_id, host):
1252 # type: (str, str, str) -> str
1253 """
1254 Returns the generic service name
1255 """
1256 p = re.compile(r'(.*)\.%s.*' % (host))
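# Illustrative (ids hypothetical): daemon_type='mds', daemon_id='cephfs.host1.abcdef',
# host='host1' yields 'mds.cephfs'; a daemon_id without the host component
# (e.g. a mon named 'host1') is returned unchanged as 'mon.host1'.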
1257 return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
1258
1259 def _save_inventory(self):
1260 self.set_store('inventory', json.dumps(self.inventory))
1261
1262 def _save_upgrade_state(self):
1263 self.set_store('upgrade_state', json.dumps(self.upgrade_state))
1264
1265 def _reconfig_ssh(self):
1266 temp_files = [] # type: list
1267 ssh_options = [] # type: List[str]
1268
1269 # ssh_config
1270 ssh_config_fname = self.ssh_config_file
1271 ssh_config = self.get_store("ssh_config")
1272 if ssh_config is not None or ssh_config_fname is None:
1273 if not ssh_config:
1274 ssh_config = DEFAULT_SSH_CONFIG
1275 f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
1276 os.fchmod(f.fileno(), 0o600)
1277 f.write(ssh_config.encode('utf-8'))
1278 f.flush() # make visible to other processes
1279 temp_files += [f]
1280 ssh_config_fname = f.name
1281 if ssh_config_fname:
1282 self.validate_ssh_config_fname(ssh_config_fname)
1283 ssh_options += ['-F', ssh_config_fname]
1284
1285 # identity
1286 ssh_key = self.get_store("ssh_identity_key")
1287 ssh_pub = self.get_store("ssh_identity_pub")
1288 self.ssh_pub = ssh_pub
1289 self.ssh_key = ssh_key
1290 if ssh_key and ssh_pub:
1291 tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
1292 tkey.write(ssh_key.encode('utf-8'))
1293 os.fchmod(tkey.fileno(), 0o600)
1294 tkey.flush() # make visible to other processes
1295 tpub = open(tkey.name + '.pub', 'w')
1296 os.fchmod(tpub.fileno(), 0o600)
1297 tpub.write(ssh_pub)
1298 tpub.flush() # make visible to other processes
1299 temp_files += [tkey, tpub]
1300 ssh_options += ['-i', tkey.name]
1301
1302 self._temp_files = temp_files
1303 if ssh_options:
1304 self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
1305 else:
1306 self._ssh_options = None
1307
1308 if self.mode == 'root':
1309 self.ssh_user = 'root'
1310 elif self.mode == 'cephadm-package':
1311 self.ssh_user = 'cephadm'
1312
1313 self._reset_cons()
1314
1315 def validate_ssh_config_fname(self, ssh_config_fname):
1316 if not os.path.isfile(ssh_config_fname):
1317 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
1318 ssh_config_fname))
1319
1320 def _reset_con(self, host):
1321 conn, r = self._cons.get(host, (None, None))
1322 if conn:
1323 self.log.debug('_reset_con close %s' % host)
1324 conn.exit()
1325 del self._cons[host]
1326
1327 def _reset_cons(self):
1328 for host, conn_and_r in self._cons.items():
1329 self.log.debug('_reset_cons close %s' % host)
1330 conn, r = conn_and_r
1331 conn.exit()
1332 self._cons = {}
1333
1334 def offline_hosts_remove(self, host):
1335 if host in self.offline_hosts:
1336 self.offline_hosts.remove(host)
1337
1338
1339 @staticmethod
1340 def can_run():
1341 if remoto is not None:
1342 return True, ""
1343 else:
1344 return False, "loading remoto library:{}".format(
1345 remoto_import_error)
1346
1347 def available(self):
1348 """
1349 The cephadm orchestrator is available whenever the remoto library can be loaded (see can_run).
1350 """
1351 return self.can_run()
1352
1353 def process(self, completions):
1354 """
1355 Does nothing, as completions are processed in another thread.
1356 """
1357 if completions:
1358 self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
1359
1360 for p in completions:
1361 p.finalize()
1362
1363 def _require_hosts(self, hosts):
1364 """
1365 Raise an error if any of the given hosts are unregistered.
1366 """
1367 if isinstance(hosts, six.string_types):
1368 hosts = [hosts]
1369 keys = self.inventory.keys()
1370 unregistered_hosts = set(hosts) - keys
1371 if unregistered_hosts:
1372 logger.warning('keys = {}'.format(keys))
1373 raise RuntimeError("Host(s) {} not registered".format(
1374 ", ".join(map(lambda h: "'{}'".format(h),
1375 unregistered_hosts))))
1376
1377 @orchestrator._cli_write_command(
1378 prefix='cephadm set-ssh-config',
1379 desc='Set the ssh_config file (use -i <ssh_config>)')
1380 def _set_ssh_config(self, inbuf=None):
1381 """
1382 Set an ssh_config file provided from stdin
1383
1384 TODO:
1385 - validation
1386 """
1387 if inbuf is None or len(inbuf) == 0:
1388 return -errno.EINVAL, "", "empty ssh config provided"
1389 self.set_store("ssh_config", inbuf)
1390 self.log.info('Set ssh_config')
1391 return 0, "", ""
1392
1393 @orchestrator._cli_write_command(
1394 prefix='cephadm clear-ssh-config',
1395 desc='Clear the ssh_config file')
1396 def _clear_ssh_config(self):
1397 """
1398 Clear any ssh_config previously set via 'cephadm set-ssh-config'
1399 """
1400 self.set_store("ssh_config", None)
1401 self.ssh_config_tmp = None
1402 self.log.info('Cleared ssh_config')
1403 return 0, "", ""
1404
1405 @orchestrator._cli_read_command(
1406 prefix='cephadm get-ssh-config',
1407 desc='Returns the ssh config as used by cephadm'
1408 )
1409 def _get_ssh_config(self):
1410 if self.ssh_config_file:
1411 self.validate_ssh_config_fname(self.ssh_config_file)
1412 with open(self.ssh_config_file) as f:
1413 return HandleCommandResult(stdout=f.read())
1414 ssh_config = self.get_store("ssh_config")
1415 if ssh_config:
1416 return HandleCommandResult(stdout=ssh_config)
1417 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
1418
1419
1420 @orchestrator._cli_write_command(
1421 'cephadm generate-key',
1422 desc='Generate a cluster SSH key (if not present)')
1423 def _generate_key(self):
1424 if not self.ssh_pub or not self.ssh_key:
1425 self.log.info('Generating ssh key...')
1426 tmp_dir = TemporaryDirectory()
1427 path = tmp_dir.name + '/key'
1428 try:
1429 subprocess.check_call([
1430 '/usr/bin/ssh-keygen',
1431 '-C', 'ceph-%s' % self._cluster_fsid,
1432 '-N', '',
1433 '-f', path
1434 ])
1435 with open(path, 'r') as f:
1436 secret = f.read()
1437 with open(path + '.pub', 'r') as f:
1438 pub = f.read()
1439 finally:
1440 os.unlink(path)
1441 os.unlink(path + '.pub')
1442 tmp_dir.cleanup()
1443 self.set_store('ssh_identity_key', secret)
1444 self.set_store('ssh_identity_pub', pub)
1445 self._reconfig_ssh()
1446 return 0, '', ''
1447
1448 @orchestrator._cli_write_command(
1449 'cephadm clear-key',
1450 desc='Clear cluster SSH key')
1451 def _clear_key(self):
1452 self.set_store('ssh_identity_key', None)
1453 self.set_store('ssh_identity_pub', None)
1454 self._reconfig_ssh()
1455 self.log.info('Cleared cluster SSH key')
1456 return 0, '', ''
1457
1458 @orchestrator._cli_read_command(
1459 'cephadm get-pub-key',
1460 desc='Show SSH public key for connecting to cluster hosts')
1461 def _get_pub_key(self):
1462 if self.ssh_pub:
1463 return 0, self.ssh_pub, ''
1464 else:
1465 return -errno.ENOENT, '', 'No cluster SSH key defined'
1466
1467 @orchestrator._cli_read_command(
1468 'cephadm get-user',
1469 desc='Show user for SSHing to cluster hosts')
1470 def _get_user(self):
1471 return 0, self.ssh_user, ''
1472
1473 @orchestrator._cli_read_command(
1474 'cephadm check-host',
1475 'name=host,type=CephString '
1476 'name=addr,type=CephString,req=false',
1477 'Check whether we can access and manage a remote host')
1478 def check_host(self, host, addr=None):
1479 out, err, code = self._run_cephadm(host, 'client', 'check-host',
1480 ['--expect-hostname', host],
1481 addr=addr,
1482 error_ok=True, no_fsid=True)
1483 if code:
1484 return 1, '', ('check-host failed:\n' + '\n'.join(err))
1485 # if we have an outstanding health alert for this host, give the
1486 # serve thread a kick
1487 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1488 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1489 if item.startswith('host %s ' % host):
1490 self.event.set()
1491 return 0, '%s (%s) ok' % (host, addr), err
1492
1493 @orchestrator._cli_read_command(
1494 'cephadm prepare-host',
1495 'name=host,type=CephString '
1496 'name=addr,type=CephString,req=false',
1497 'Prepare a remote host for use with cephadm')
1498 def _prepare_host(self, host, addr=None):
1499 out, err, code = self._run_cephadm(host, 'client', 'prepare-host',
1500 ['--expect-hostname', host],
1501 addr=addr,
1502 error_ok=True, no_fsid=True)
1503 if code:
1504 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
1505 # if we have an outstanding health alert for this host, give the
1506 # serve thread a kick
1507 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1508 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1509 if item.startswith('host %s ' % host):
1510 self.event.set()
1511 return 0, '%s (%s) ok' % (host, addr), err
1512
1513 def _get_connection(self, host):
1514 """
1515 Setup a connection for running commands on remote host.
1516 """
1517 conn_and_r = self._cons.get(host)
1518 if conn_and_r:
1519 self.log.debug('Have connection to %s' % host)
1520 return conn_and_r
1521 n = self.ssh_user + '@' + host
1522 self.log.debug("Opening connection to {} with ssh options '{}'".format(
1523 n, self._ssh_options))
1524 child_logger = self.log.getChild(n)
1525 child_logger.setLevel('WARNING')
1526 conn = remoto.Connection(
1527 n,
1528 logger=child_logger,
1529 ssh_options=self._ssh_options)
1530
1531 r = conn.import_module(remotes)
1532 self._cons[host] = conn, r
1533
1534 return conn, r
1535
1536 def _executable_path(self, conn, executable):
1537 """
1538 Remote validator that accepts a connection object and ensures that a given
1539 executable is available, returning its full path if so.
1540
1541 Otherwise an exception with thorough details will be raised, informing the
1542 user that the executable was not found.
1543 """
1544 executable_path = conn.remote_module.which(executable)
1545 if not executable_path:
1546 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1547 executable, conn.hostname))
1548 self.log.debug("Found executable '{}' at path '{}'".format(executable,
1549 executable_path))
1550 return executable_path
1551
1552 def _run_cephadm(self, host, entity, command, args,
1553 addr=None,
1554 stdin=None,
1555 no_fsid=False,
1556 error_ok=False,
1557 image=None):
1558 # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
1559 """
1560 Run cephadm on the remote host with the given command + args
1561 """
1562 if not addr and host in self.inventory:
1563 addr = self.inventory[host].get('addr', host)
1564
1565 self.offline_hosts_remove(host)
1566
1567 try:
1568 try:
1569 conn, connr = self._get_connection(addr)
1570 except IOError as e:
1571 if error_ok:
1572 self.log.exception('failed to establish ssh connection')
1573 return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
1574 raise
1575
1576 assert image or entity
1577 if not image:
1578 daemon_type = entity.split('.', 1)[0] # type: ignore
1579 if daemon_type in CEPH_TYPES or \
1580 daemon_type == 'nfs':
1581 # get container image
1582 ret, image, err = self.mon_command({
1583 'prefix': 'config get',
1584 'who': utils.name_to_config_section(entity),
1585 'key': 'container_image',
1586 })
1587 image = image.strip() # type: ignore
1588 self.log.debug('%s container image %s' % (entity, image))
1589
1590 final_args = []
1591 if image:
1592 final_args.extend(['--image', image])
1593 final_args.append(command)
1594
1595 if not no_fsid:
1596 final_args += ['--fsid', self._cluster_fsid]
1597 final_args += args
1598
1599 if self.mode == 'root':
1600 self.log.debug('args: %s' % (' '.join(final_args)))
1601 if stdin:
1602 self.log.debug('stdin: %s' % stdin)
1603 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
1604 if stdin:
1605 script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
1606 script += self._cephadm
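# For illustration (image and fsid values hypothetical), the ceph-volume inventory
# call from _refresh_host_devices below produces a prefix like:
#   injected_argv = ["--image", "docker.io/ceph/ceph:v15", "ceph-volume",
#                    "--fsid", "<fsid>", "--", "inventory", "--format=json"]
# an injected_stdin line follows only when stdin was supplied, then the full cephadm
# source, all of which is piped to the remote python interpreter.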
1607 python = connr.choose_python()
1608 if not python:
1609 raise RuntimeError(
1610 'unable to find python on %s (tried %s in %s)' % (
1611 host, remotes.PYTHONS, remotes.PATH))
1612 try:
1613 out, err, code = remoto.process.check(
1614 conn,
1615 [python, '-u'],
1616 stdin=script.encode('utf-8'))
1617 except RuntimeError as e:
1618 self._reset_con(host)
1619 if error_ok:
1620 return [], [str(e)], 1
1621 raise
1622 elif self.mode == 'cephadm-package':
1623 try:
1624 out, err, code = remoto.process.check(
1625 conn,
1626 ['sudo', '/usr/bin/cephadm'] + final_args,
1627 stdin=stdin)
1628 except RuntimeError as e:
1629 self._reset_con(host)
1630 if error_ok:
1631 return [], [str(e)], 1
1632 raise
1633 else:
1634 assert False, 'unsupported mode'
1635
1636 self.log.debug('code: %d' % code)
1637 if out:
1638 self.log.debug('out: %s' % '\n'.join(out))
1639 if err:
1640 self.log.debug('err: %s' % '\n'.join(err))
1641 if code and not error_ok:
1642 raise RuntimeError(
1643 'cephadm exited with an error code: %d, stderr:%s' % (
1644 code, '\n'.join(err)))
1645 return out, err, code
1646
1647 except execnet.gateway_bootstrap.HostNotFound as e:
1648 # this is a misleading exception as it seems to be thrown for
1649 # any sort of connection failure, even those having nothing to
1650 # do with "host not found" (e.g., ssh key permission denied).
1651 self.offline_hosts.add(host)
1652 user = 'root' if self.mode == 'root' else 'cephadm'
1653 msg = f'Failed to connect to {host} ({addr}). ' \
1654 f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
1655 f'you may want to run: \n' \
1656 f'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
1657 raise OrchestratorError(msg) from e
1658 except Exception as ex:
1659 self.log.exception(ex)
1660 raise
1661
1662 def _get_hosts(self, label=None):
1663 # type: (Optional[str]) -> List[str]
1664 r = []
1665 for h, hostspec in self.inventory.items():
1666 if not label or label in hostspec.get('labels', []):
1667 r.append(h)
1668 return r
1669
1670 @async_completion
1671 def add_host(self, spec):
1672 # type: (HostSpec) -> str
1673 """
1674 Add a host to be managed by the orchestrator.
1675
1676 :param spec: HostSpec for the host to add
1677 """
1678 assert_valid_host(spec.hostname)
1679 out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
1680 ['--expect-hostname', spec.hostname],
1681 addr=spec.addr,
1682 error_ok=True, no_fsid=True)
1683 if code:
1684 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1685 spec.hostname, spec.addr, err))
1686
1687 self.inventory[spec.hostname] = spec.to_json()
1688 self._save_inventory()
1689 self.cache.prime_empty_host(spec.hostname)
1690 self.offline_hosts_remove(spec.hostname)
1691 self.event.set() # refresh stray health check
1692 self.log.info('Added host %s' % spec.hostname)
1693 return "Added host '{}'".format(spec.hostname)
1694
1695 @async_completion
1696 def remove_host(self, host):
1697 # type: (str) -> str
1698 """
1699 Remove a host from orchestrator management.
1700
1701 :param host: host name
1702 """
1703 del self.inventory[host]
1704 self._save_inventory()
1705 self.cache.rm_host(host)
1706 self._reset_con(host)
1707 self.event.set() # refresh stray health check
1708 self.log.info('Removed host %s' % host)
1709 return "Removed host '{}'".format(host)
1710
1711 @async_completion
1712 def update_host_addr(self, host, addr):
1713 if host not in self.inventory:
1714 raise OrchestratorError('host %s not registered' % host)
1715 self.inventory[host]['addr'] = addr
1716 self._save_inventory()
1717 self._reset_con(host)
1718 self.event.set() # refresh stray health check
1719 self.log.info('Set host %s addr to %s' % (host, addr))
1720 return "Updated host '{}' addr to '{}'".format(host, addr)
1721
1722 @trivial_completion
1723 def get_hosts(self):
1724 # type: () -> List[orchestrator.HostSpec]
1725 """
1726 Return a list of hosts managed by the orchestrator.
1727
1728 Notes:
1729 - skip async: manager reads from cache.
1730 """
1731 r = []
1732 for hostname, info in self.inventory.items():
1733 r.append(orchestrator.HostSpec(
1734 hostname,
1735 addr=info.get('addr', hostname),
1736 labels=info.get('labels', []),
1737 status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
1738 ))
1739 return r
1740
1741 @async_completion
1742 def add_host_label(self, host, label):
1743 if host not in self.inventory:
1744 raise OrchestratorError('host %s does not exist' % host)
1745
1746 if 'labels' not in self.inventory[host]:
1747 self.inventory[host]['labels'] = list()
1748 if label not in self.inventory[host]['labels']:
1749 self.inventory[host]['labels'].append(label)
1750 self._save_inventory()
1751 self.log.info('Added label %s to host %s' % (label, host))
1752 return 'Added label %s to host %s' % (label, host)
1753
1754 @async_completion
1755 def remove_host_label(self, host, label):
1756 if host not in self.inventory:
1757 raise OrchestratorError('host %s does not exist' % host)
1758
1759 if 'labels' not in self.inventory[host]:
1760 self.inventory[host]['labels'] = list()
1761 if label in self.inventory[host]['labels']:
1762 self.inventory[host]['labels'].remove(label)
1763 self._save_inventory()
1764 self.log.info('Removed label %s from host %s' % (label, host))
1765 return 'Removed label %s from host %s' % (label, host)
1766
1767 def _refresh_host_daemons(self, host):
1768 try:
1769 out, err, code = self._run_cephadm(
1770 host, 'mon', 'ls', [], no_fsid=True)
1771 if code:
1772 return 'host %s cephadm ls returned %d: %s' % (
1773 host, code, err)
1774 except Exception as e:
1775 return 'host %s scrape failed: %s' % (host, e)
1776 ls = json.loads(''.join(out))
1777 dm = {}
1778 for d in ls:
1779 if not d['style'].startswith('cephadm'):
1780 continue
1781 if d['fsid'] != self._cluster_fsid:
1782 continue
1783 if '.' not in d['name']:
1784 continue
1785 sd = orchestrator.DaemonDescription()
1786 sd.last_refresh = datetime.datetime.utcnow()
1787 for k in ['created', 'started', 'last_configured', 'last_deployed']:
1788 v = d.get(k, None)
1789 if v:
1790 setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
1791 sd.daemon_type = d['name'].split('.')[0]
1792 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
1793 sd.hostname = host
1794 sd.container_id = d.get('container_id')
1795 if sd.container_id:
1796 # shorten the hash
1797 sd.container_id = sd.container_id[0:12]
1798 sd.container_image_name = d.get('container_image_name')
1799 sd.container_image_id = d.get('container_image_id')
1800 sd.version = d.get('version')
1801 if 'state' in d:
1802 sd.status_desc = d['state']
1803 sd.status = {
1804 'running': 1,
1805 'stopped': 0,
1806 'error': -1,
1807 'unknown': -1,
1808 }[d['state']]
1809 else:
1810 sd.status_desc = 'unknown'
1811 sd.status = None
1812 dm[sd.name()] = sd
1813 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
1814 self.cache.update_host_daemons(host, dm)
1815 self.cache.save_host(host)
1816 return None
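# Editor's note (illustrative, not part of the original source): each entry of
# `cephadm ls` that survives the filters above looks roughly like
#   {'style': 'cephadm:v1', 'fsid': '<cluster fsid>', 'name': 'mgr.host1.abcdef',
#    'state': 'running', 'container_id': '...', 'version': '...'}
# (values hypothetical) and is folded into a DaemonDescription keyed by its
# 'type.id' name.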
1817
1818 def _refresh_host_devices(self, host):
1819 try:
1820 out, err, code = self._run_cephadm(
1821 host, 'osd',
1822 'ceph-volume',
1823 ['--', 'inventory', '--format=json'])
1824 if code:
1825 return 'host %s ceph-volume inventory returned %d: %s' % (
1826 host, code, err)
1827 except Exception as e:
1828 return 'host %s ceph-volume inventory failed: %s' % (host, e)
1829 devices = json.loads(''.join(out))
1830 try:
1831 out, err, code = self._run_cephadm(
1832 host, 'mon',
1833 'list-networks',
1834 [],
1835 no_fsid=True)
1836 if code:
1837 return 'host %s list-networks returned %d: %s' % (
1838 host, code, err)
1839 except Exception as e:
1840 return 'host %s list-networks failed: %s' % (host, e)
1841 networks = json.loads(''.join(out))
1842 self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
1843 host, len(devices), len(networks)))
1844 devices = inventory.Devices.from_json(devices)
1845 self.cache.update_host_devices_networks(host, devices.devices, networks)
1846 self.cache.save_host(host)
1847 return None
1848
1849 def _get_spec_size(self, spec):
1850 if spec.placement.count:
1851 return spec.placement.count
1852 elif spec.placement.host_pattern:
1853 return len(spec.placement.pattern_matches_hosts(self.inventory.keys()))
1854 elif spec.placement.label:
1855 return len(self._get_hosts(spec.placement.label))
1856 elif spec.placement.hosts:
1857 return len(spec.placement.hosts)
1858 # hmm!
1859 return 0
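# Editor's note: an illustrative sketch (not part of the original module) of how
# _get_spec_size() resolves a placement to an expected daemon count, assuming a
# hypothetical three-host inventory ('host1'..'host3') where only 'host1' carries
# the label 'mon':
#
#   PlacementSpec(count=2)            -> 2
#   PlacementSpec(host_pattern='*')   -> 3    # every inventory host matches
#   PlacementSpec(label='mon')        -> 1    # hosts carrying that label
#   PlacementSpec(hosts=['host1'])    -> 1    # explicit host list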
1860
1861 @trivial_completion
1862 def describe_service(self, service_type=None, service_name=None,
1863 refresh=False):
1864 if refresh:
1865 # ugly sync path, FIXME someday perhaps?
1866 for host, hi in self.inventory.items():
1867 self._refresh_host_daemons(host)
1868 # <service_map>
1869 sm = {} # type: Dict[str, orchestrator.ServiceDescription]
1870 for h, dm in self.cache.get_daemons_with_volatile_status():
1871 for name, dd in dm.items():
1872 if service_type and service_type != dd.daemon_type:
1873 continue
1874 n: str = dd.service_name()
1875 if service_name and service_name != n:
1876 continue
1877 if dd.daemon_type == 'osd':
1878 continue # ignore OSDs for now
1879 if dd.service_name() in self.spec_store.specs:
1880 spec = self.spec_store.specs[dd.service_name()]
1881 else:
1882 spec = ServiceSpec(
1883 unmanaged=True,
1884 service_type=dd.daemon_type,
1885 service_id=dd.service_id(),
1886 placement=PlacementSpec(
1887 hosts=[dd.hostname]
1888 )
1889 )
1890 if n not in sm:
1891 sm[n] = orchestrator.ServiceDescription(
1892 last_refresh=dd.last_refresh,
1893 container_image_id=dd.container_image_id,
1894 container_image_name=dd.container_image_name,
1895 spec=spec,
1896 )
1897 if dd.service_name() in self.spec_store.specs:
1898 sm[n].size = self._get_spec_size(spec)
1899 sm[n].created = self.spec_store.spec_created[dd.service_name()]
1900 if service_type == 'nfs':
1901 spec = cast(NFSServiceSpec, spec)
1902 sm[n].rados_config_location = spec.rados_config_location()
1903 else:
1904 sm[n].size = 0
1905 if dd.status == 1:
1906 sm[n].running += 1
1907 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1908 sm[n].last_refresh = dd.last_refresh
1909 if sm[n].container_image_id != dd.container_image_id:
1910 sm[n].container_image_id = 'mix'
1911 if sm[n].container_image_name != dd.container_image_name:
1912 sm[n].container_image_name = 'mix'
1913 for n, spec in self.spec_store.specs.items():
1914 if n in sm:
1915 continue
1916 if service_type is not None and service_type != spec.service_type:
1917 continue
1918 if service_name is not None and service_name != n:
1919 continue
1920 sm[n] = orchestrator.ServiceDescription(
1921 spec=spec,
1922 size=self._get_spec_size(spec),
1923 running=0,
1924 )
1925 if service_type == 'nfs':
1926 spec = cast(NFSServiceSpec, spec)
1927 sm[n].rados_config_location = spec.rados_config_location()
1928 return list(sm.values())
1929
1930 @trivial_completion
1931 def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
1932 host=None, refresh=False):
1933 if refresh:
1934 # ugly sync path, FIXME someday perhaps?
1935 if host:
1936 self._refresh_host_daemons(host)
1937 else:
1938 for hostname, hi in self.inventory.items():
1939 self._refresh_host_daemons(hostname)
1940 result = []
1941 for h, dm in self.cache.get_daemons_with_volatile_status():
1942 if host and h != host:
1943 continue
1944 for name, dd in dm.items():
1945 if daemon_type is not None and daemon_type != dd.daemon_type:
1946 continue
1947 if daemon_id is not None and daemon_id != dd.daemon_id:
1948 continue
1949 if service_name is not None and service_name != dd.service_name():
1950 continue
1951 result.append(dd)
1952 return result
1953
1954 def service_action(self, action, service_name):
1955 args = []
1956 for host, dm in self.cache.daemons.items():
1957 for name, d in dm.items():
1958 if d.matches_service(service_name):
1959 args.append((d.daemon_type, d.daemon_id,
1960 d.hostname, action))
1961 self.log.info('%s service %s' % (action.capitalize(), service_name))
1962 return self._daemon_actions(args)
1963
1964 @async_map_completion
1965 def _daemon_actions(self, daemon_type, daemon_id, host, action):
1966 return self._daemon_action(daemon_type, daemon_id, host, action)
1967
1968 def _daemon_action(self, daemon_type, daemon_id, host, action):
1969 if action == 'redeploy':
1970 # stop, recreate the container+unit, then restart
1971 return self._create_daemon(daemon_type, daemon_id, host)
1972 elif action == 'reconfig':
1973 return self._create_daemon(daemon_type, daemon_id, host,
1974 reconfig=True)
1975
1976 actions = {
1977 'start': ['reset-failed', 'start'],
1978 'stop': ['stop'],
1979 'restart': ['reset-failed', 'restart'],
1980 }
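# Editor's note (illustrative): for action='restart' on a hypothetical daemon
# 'osd.3', the loop below invokes cephadm's 'unit' command twice on the target
# host, roughly:
#   cephadm ... unit --name osd.3 reset-failed
#   cephadm ... unit --name osd.3 restart
# where '...' elides the flags added by _run_cephadm itself.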
1981 name = '%s.%s' % (daemon_type, daemon_id)
1982 for a in actions[action]:
1983 out, err, code = self._run_cephadm(
1984 host, name, 'unit',
1985 ['--name', name, a],
1986 error_ok=True)
1987 self.cache.invalidate_host_daemons(host)
1988 return "{} {} from host '{}'".format(action, name, host)
1989
1990 def daemon_action(self, action, daemon_type, daemon_id):
1991 args = []
1992 for host, dm in self.cache.daemons.items():
1993 for name, d in dm.items():
1994 if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
1995 args.append((d.daemon_type, d.daemon_id,
1996 d.hostname, action))
1997 if not args:
1998 raise orchestrator.OrchestratorError(
1999 'Unable to find %s.%s daemon(s)' % (
2000 daemon_type, daemon_id))
2001 self.log.info('%s daemons %s' % (
2002 action.capitalize(),
2003 ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
2004 return self._daemon_actions(args)
2005
2006 def remove_daemons(self, names):
2007 # type: (List[str]) -> orchestrator.Completion
2008 args = []
2009 for host, dm in self.cache.daemons.items():
2010 for name in names:
2011 if name in dm:
2012 args.append((name, host))
2013 if not args:
2014 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
2015 self.log.info('Remove daemons %s' % [a[0] for a in args])
2016 return self._remove_daemons(args)
2017
2018 @trivial_completion
2019 def remove_service(self, service_name):
2020 self.log.info('Remove service %s' % service_name)
2021 found = self.spec_store.rm(service_name)
2022 if found:
2023 self._kick_serve_loop()
2024 return ['Removed service %s' % service_name]
2025 else:
2026 # must be idempotent: still a success.
2027 return [f'Failed to remove service. <{service_name}> was not found.']
2028
2029 @trivial_completion
2030 def get_inventory(self, host_filter=None, refresh=False):
2031 """
2032 Return the storage inventory of hosts matching the given filter.
2033
2034 :param host_filter: host filter
2035
2036 TODO:
2037 - add filtering by label
2038 """
2039 if refresh:
2040 # ugly sync path, FIXME someday perhaps?
2041 if host_filter:
2042 for host in host_filter.hosts:
2043 self._refresh_host_devices(host)
2044 else:
2045 for host, hi in self.inventory.items():
2046 self._refresh_host_devices(host)
2047
2048 result = []
2049 for host, dls in self.cache.devices.items():
2050 if host_filter and host not in host_filter.hosts:
2051 continue
2052 result.append(orchestrator.InventoryHost(host,
2053 inventory.Devices(dls)))
2054 return result
2055
2056 @trivial_completion
2057 def zap_device(self, host, path):
2058 self.log.info('Zap device %s:%s' % (host, path))
2059 out, err, code = self._run_cephadm(
2060 host, 'osd', 'ceph-volume',
2061 ['--', 'lvm', 'zap', '--destroy', path],
2062 error_ok=True)
2063 self.cache.invalidate_host_devices(host)
2064 if code:
2065 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
2066 return '\n'.join(out + err)
2067
2068 def blink_device_light(self, ident_fault, on, locs):
2069 @async_map_completion
2070 def blink(host, dev, path):
2071 cmd = [
2072 'lsmcli',
2073 'local-disk-%s-led-%s' % (
2074 ident_fault,
2075 'on' if on else 'off'),
2076 '--path', path or dev,
2077 ]
2078 out, err, code = self._run_cephadm(
2079 host, 'osd', 'shell', ['--'] + cmd,
2080 error_ok=True)
2081 if code:
2082 raise RuntimeError(
2083 'Unable to affect %s light for %s:%s. Command: %s' % (
2084 ident_fault, host, dev, ' '.join(cmd)))
2085 self.log.info('Set %s light for %s:%s %s' % (
2086 ident_fault, host, dev, 'on' if on else 'off'))
2087 return "Set %s light for %s:%s %s" % (
2088 ident_fault, host, dev, 'on' if on else 'off')
2089
2090 return blink(locs)
2091
2092 def get_osd_uuid_map(self, only_up=False):
2093 # type: (bool) -> Dict[str, str]
2094 osd_map = self.get('osd_map')
2095 r = {}
2096 for o in osd_map['osds']:
2097 # only include OSDs that have ever started in this map. this way
2098 # an interrupted osd create can be repeated and succeed the second
2099 # time around.
2100 osd_id = o.get('osd')
2101 if osd_id is None:
2102 raise OrchestratorError("Could not retrieve osd_id from osd_map")
2103 if not only_up or (o['up_from'] > 0):
2104 r[str(osd_id)] = o.get('uuid', '')
2105 return r
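# Editor's note: a sketch of the map built above (hypothetical values):
#   {'0': '8d2cd2b6-...', '3': ''}    # osd id (as string) -> osd uuid
# With only_up=True, OSDs whose 'up_from' is 0 (never started) are omitted, so
# an interrupted osd create can be retried without being mistaken for an
# existing daemon.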
2106
2107 @trivial_completion
2108 def apply_drivegroups(self, specs: List[DriveGroupSpec]):
2109 return [self._apply(spec) for spec in specs]
2110
2111 def find_destroyed_osds(self) -> Dict[str, List[str]]:
2112 osd_host_map: Dict[str, List[str]] = dict()
2113 ret, out, err = self.mon_command({
2114 'prefix': 'osd tree',
2115 'states': ['destroyed'],
2116 'format': 'json'
2117 })
2118 if ret != 0:
2119 raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
2120 try:
2121 tree = json.loads(out)
2122 except json.decoder.JSONDecodeError:
2123 self.log.error(f"Could not decode json -> {out}")
2124 return osd_host_map
2125
2126 nodes = tree.get('nodes', {})
2127 for node in nodes:
2128 if node.get('type') == 'host':
2129 osd_host_map.update(
2130 {node.get('name'): [str(_id) for _id in node.get('children', list())]}
2131 )
2132 return osd_host_map
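# Editor's note: an illustrative sketch (not from the original source) of what
# find_destroyed_osds() builds from `ceph osd tree destroyed --format json`,
# assuming a hypothetical tree containing one host node:
#
#   {"nodes": [{"type": "host", "name": "host1", "children": [3, 7]}, ...]}
#     -> {"host1": ["3", "7"]}
#
# The resulting ids are used as osd_id_claims so re-created OSDs can reuse the
# destroyed OSD ids.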
2133
2134 @trivial_completion
2135 def create_osds(self, drive_group: DriveGroupSpec):
2136 self.log.debug(f"Processing DriveGroup {drive_group}")
2137 ret = []
2138 drive_group.osd_id_claims = self.find_destroyed_osds()
2139 self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
2140 for host, drive_selection in self.prepare_drivegroup(drive_group):
2141 self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
2142 cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
2143 drive_group.osd_id_claims.get(host, []))
2144 if not cmd:
2145 self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
2146 continue
2147 ret_msg = self._create_osd(host, cmd,
2148 replace_osd_ids=drive_group.osd_id_claims.get(host, []))
2149 ret.append(ret_msg)
2150 return ", ".join(ret)
2151
2152 def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
2153 # 1) use fn_filter to determine matching_hosts
2154 matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
2155 # 2) Map the inventory to the InventoryHost object
2156 host_ds_map = []
2157
2158 # set osd_id_claims
2159
2160 def _find_inv_for_host(hostname: str, inventory_dict: dict):
2161 # This is stupid and needs to be loaded with the host
2162 for _host, _inventory in inventory_dict.items():
2163 if _host == hostname:
2164 return _inventory
2165 raise OrchestratorError("No inventory found for host: {}".format(hostname))
2166
2167 # 3) iterate over matching_host and call DriveSelection
2168 self.log.debug(f"Checking matching hosts -> {matching_hosts}")
2169 for host in matching_hosts:
2170 inventory_for_host = _find_inv_for_host(host, self.cache.devices)
2171 self.log.debug(f"Found inventory for host {inventory_for_host}")
2172 drive_selection = DriveSelection(drive_group, inventory_for_host)
2173 self.log.debug(f"Found drive selection {drive_selection}")
2174 host_ds_map.append((host, drive_selection))
2175 return host_ds_map
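# Editor's note (illustrative): prepare_drivegroup() returns one
# (hostname, DriveSelection) pair per host matched by the drive group's
# placement, e.g. for hypothetical hosts 'host1' and 'host2':
#   [('host1', <DriveSelection ...>), ('host2', <DriveSelection ...>)]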
2176
2177 def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
2178 drive_selection: DriveSelection,
2179 osd_id_claims: Optional[List[str]] = None,
2180 preview: bool = False) -> Optional[str]:
2181 self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
2182 cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
2183 self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
2184 return cmd
2185
2186 def preview_drivegroups(self, drive_group_name: Optional[str] = None,
2187 dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
2188 # find drivegroups
2189 if drive_group_name:
2190 drive_groups = cast(List[DriveGroupSpec],
2191 self.spec_store.find(service_name=drive_group_name))
2192 elif dg_specs:
2193 drive_groups = dg_specs
2194 else:
2195 drive_groups = []
2196 ret_all = []
2197 for drive_group in drive_groups:
2198 drive_group.osd_id_claims = self.find_destroyed_osds()
2199 self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
2200 # prepare driveselection
2201 for host, ds in self.prepare_drivegroup(drive_group):
2202 cmd = self.driveselection_to_ceph_volume(drive_group, ds,
2203 drive_group.osd_id_claims.get(host, []), preview=True)
2204 if not cmd:
2205 self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
2206 continue
2207 out, err, code = self._run_ceph_volume_command(host, cmd)
2208 if out:
2209 concat_out = json.loads(" ".join(out))
2210 ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
2211 return ret_all
2212
2213 def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
2214 self._require_hosts(host)
2215
2216 # get bootstrap key
2217 ret, keyring, err = self.mon_command({
2218 'prefix': 'auth get',
2219 'entity': 'client.bootstrap-osd',
2220 })
2221
2222 # generate config
2223 ret, config, err = self.mon_command({
2224 "prefix": "config generate-minimal-conf",
2225 })
2226
2227 j = json.dumps({
2228 'config': config,
2229 'keyring': keyring,
2230 })
2231
2232 split_cmd = cmd.split(' ')
2233 _cmd = ['--config-json', '-', '--']
2234 _cmd.extend(split_cmd)
2235 out, err, code = self._run_cephadm(
2236 host, 'osd', 'ceph-volume',
2237 _cmd,
2238 stdin=j,
2239 error_ok=True)
2240 return out, err, code
2241
2242 def _create_osd(self, host, cmd, replace_osd_ids=None):
2243 out, err, code = self._run_ceph_volume_command(host, cmd)
2244
2245 if code == 1 and ', it is already prepared' in '\n'.join(err):
2246 # HACK: when we create against an existing LV, ceph-volume
2247 # returns an error and the above message. To make this
2248 # command idempotent, tolerate this "error" and continue.
2249 self.log.debug('the device was already prepared; continuing')
2250 code = 0
2251 if code:
2252 raise RuntimeError(
2253 'cephadm exited with an error code: %d, stderr:%s' % (
2254 code, '\n'.join(err)))
2255
2256 # check result
2257 out, err, code = self._run_cephadm(
2258 host, 'osd', 'ceph-volume',
2259 [
2260 '--',
2261 'lvm', 'list',
2262 '--format', 'json',
2263 ])
2264 before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
2265 osds_elems = json.loads('\n'.join(out))
2266 fsid = self._cluster_fsid
2267 osd_uuid_map = self.get_osd_uuid_map()
2268 created = []
2269 for osd_id, osds in osds_elems.items():
2270 for osd in osds:
2271 if osd['tags']['ceph.cluster_fsid'] != fsid:
2272 self.log.debug('mismatched fsid, skipping %s' % osd)
2273 continue
2274 if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
2275 # skip OSDs that already existed before this run, unless they are being replaced
2276 continue
2277 if osd_id not in osd_uuid_map:
2278 self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
2279 continue
2280 if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
2281 self.log.debug('mismatched osd uuid (cluster has %s, osd '
2282 'has %s)' % (
2283 osd_uuid_map.get(osd_id),
2284 osd['tags']['ceph.osd_fsid']))
2285 continue
2286
2287 created.append(osd_id)
2288 self._create_daemon(
2289 'osd', osd_id, host,
2290 osd_uuid_map=osd_uuid_map)
2291
2292 if created:
2293 self.cache.invalidate_host_devices(host)
2294 return "Created osd(s) %s on host '%s'" % (','.join(created), host)
2295 else:
2296 return "Created no osd(s) on host %s; already created?" % host
2297
2298 def _calc_daemon_deps(self, daemon_type, daemon_id):
2299 need = {
2300 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
2301 'grafana': ['prometheus'],
2302 'alertmanager': ['mgr', 'alertmanager'],
2303 }
2304 deps = []
2305 for dep_type in need.get(daemon_type, []):
2306 for dd in self.cache.get_daemons_by_service(dep_type):
2307 deps.append(dd.name())
2308 return sorted(deps)
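# Editor's note: a hedged example of the dependency resolution above, assuming a
# single running prometheus daemon named 'prometheus.host1' (hypothetical):
#   _calc_daemon_deps('grafana', 'x')      -> ['prometheus.host1']
#   _calc_daemon_deps('rgw', 'realm.zone') -> []   # types not in `need` have no deps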
2309
2310 def _get_config_and_keyring(self, daemon_type, daemon_id,
2311 keyring=None,
2312 extra_ceph_config=None):
2313 # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
2314 # keyring
2315 if not keyring:
2316 if daemon_type == 'mon':
2317 ename = 'mon.'
2318 else:
2319 ename = utils.name_to_config_section(daemon_type + '.' + daemon_id)
2320 ret, keyring, err = self.mon_command({
2321 'prefix': 'auth get',
2322 'entity': ename,
2323 })
2324
2325 # generate config
2326 ret, config, err = self.mon_command({
2327 "prefix": "config generate-minimal-conf",
2328 })
2329 if extra_ceph_config:
2330 config += extra_ceph_config
2331
2332 return {
2333 'config': config,
2334 'keyring': keyring,
2335 }
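# Editor's note: the dict returned above is what cephadm receives on stdin via
# '--config-json -'; a sketch with abbreviated, illustrative values:
#   {'config': '# minimal ceph.conf ...\n[global]\n\tfsid = ...\n',
#    'keyring': '[mon.]\n\tkey = ...\n'}
# The real strings come from `config generate-minimal-conf` and `auth get`.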
2336
2337 def _create_daemon(self, daemon_type, daemon_id, host,
2338 keyring=None,
2339 extra_args=None, extra_config=None,
2340 reconfig=False,
2341 osd_uuid_map=None):
2342 if not extra_args:
2343 extra_args = []
2344 if not extra_config:
2345 extra_config = {}
2346 name = '%s.%s' % (daemon_type, daemon_id)
2347
2348 start_time = datetime.datetime.utcnow()
2349 deps = [] # type: List[str]
2350 cephadm_config = {} # type: Dict[str, Any]
2351 if daemon_type == 'prometheus':
2352 cephadm_config, deps = self._generate_prometheus_config()
2353 extra_args.extend(['--config-json', '-'])
2354 elif daemon_type == 'grafana':
2355 cephadm_config, deps = self._generate_grafana_config()
2356 extra_args.extend(['--config-json', '-'])
2357 elif daemon_type == 'nfs':
2358 cephadm_config, deps = \
2359 self._generate_nfs_config(daemon_type, daemon_id, host)
2360 extra_args.extend(['--config-json', '-'])
2361 elif daemon_type == 'alertmanager':
2362 cephadm_config, deps = self._generate_alertmanager_config()
2363 extra_args.extend(['--config-json', '-'])
2364 else:
2365 # Ceph daemons (mon, mgr, mds, osd, etc.)
2366 cephadm_config = self._get_config_and_keyring(
2367 daemon_type, daemon_id,
2368 keyring=keyring,
2369 extra_ceph_config=extra_config.pop('config', ''))
2370 if extra_config:
2371 cephadm_config.update({'files': extra_config})
2372 extra_args.extend(['--config-json', '-'])
2373
2374 # osd deployments need an --osd-fsid arg
2375 if daemon_type == 'osd':
2376 if not osd_uuid_map:
2377 osd_uuid_map = self.get_osd_uuid_map()
2378 osd_uuid = osd_uuid_map.get(daemon_id)
2379 if not osd_uuid:
2380 raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
2381 extra_args.extend(['--osd-fsid', osd_uuid])
2382
2383 if reconfig:
2384 extra_args.append('--reconfig')
2385 if self.allow_ptrace:
2386 extra_args.append('--allow-ptrace')
2387
2388 self.log.info('%s daemon %s on %s' % (
2389 'Reconfiguring' if reconfig else 'Deploying',
2390 name, host))
2391
2392 out, err, code = self._run_cephadm(
2393 host, name, 'deploy',
2394 [
2395 '--name', name,
2396 ] + extra_args,
2397 stdin=json.dumps(cephadm_config))
2398 if not code and host in self.cache.daemons:
2399 # prime cached service state with what we (should have)
2400 # just created
2401 sd = orchestrator.DaemonDescription()
2402 sd.daemon_type = daemon_type
2403 sd.daemon_id = daemon_id
2404 sd.hostname = host
2405 sd.status = 1
2406 sd.status_desc = 'starting'
2407 self.cache.add_daemon(host, sd)
2408 self.cache.invalidate_host_daemons(host)
2409 self.cache.update_daemon_config_deps(host, name, deps, start_time)
2410 self.cache.save_host(host)
2411 return "{} {} on host '{}'".format(
2412 'Reconfigured' if reconfig else 'Deployed', name, host)
2413
2414 @async_map_completion
2415 def _remove_daemons(self, name, host):
2416 return self._remove_daemon(name, host)
2417
2418 def _remove_daemon(self, name, host):
2419 """
2420 Remove a daemon
2421 """
2422 (daemon_type, daemon_id) = name.split('.', 1)
2423 if daemon_type == 'mon':
2424 self._check_safe_to_destroy_mon(daemon_id)
2425
2426 # remove mon from quorum before we destroy the daemon
2427 self.log.info('Removing monitor %s from monmap...' % name)
2428 ret, out, err = self.mon_command({
2429 'prefix': 'mon rm',
2430 'name': daemon_id,
2431 })
2432 if ret:
2433 raise OrchestratorError('failed to remove mon %s from monmap' % (
2434 name))
2435
2436 args = ['--name', name, '--force']
2437 self.log.info('Removing daemon %s from %s' % (name, host))
2438 out, err, code = self._run_cephadm(
2439 host, name, 'rm-daemon', args)
2440 if not code:
2441 # remove item from cache
2442 self.cache.rm_daemon(host, name)
2443 self.cache.invalidate_host_daemons(host)
2444 return "Removed {} from host '{}'".format(name, host)
2445
2446 def _apply_service(self, spec):
2447 """
2448 Schedule a service. Deploy new daemons or remove old ones, depending
2449 on the target label and count specified in the placement.
2450 """
2451 daemon_type = spec.service_type
2452 service_name = spec.service_name()
2453 if spec.unmanaged:
2454 self.log.debug('Skipping unmanaged service %s spec' % service_name)
2455 return False
2456 self.log.debug('Applying service %s spec' % service_name)
2457 create_fns = {
2458 'mon': self._create_mon,
2459 'mgr': self._create_mgr,
2460 'osd': self.create_osds,
2461 'mds': self._create_mds,
2462 'rgw': self._create_rgw,
2463 'rbd-mirror': self._create_rbd_mirror,
2464 'nfs': self._create_nfs,
2465 'grafana': self._create_grafana,
2466 'alertmanager': self._create_alertmanager,
2467 'prometheus': self._create_prometheus,
2468 'node-exporter': self._create_node_exporter,
2469 'crash': self._create_crash,
2470 'iscsi': self._create_iscsi,
2471 }
2472 config_fns = {
2473 'mds': self._config_mds,
2474 'rgw': self._config_rgw,
2475 'nfs': self._config_nfs,
2476 'iscsi': self._config_iscsi,
2477 }
2478 create_func = create_fns.get(daemon_type, None)
2479 if not create_func:
2480 self.log.debug('unrecognized service type %s' % daemon_type)
2481 return False
2482 config_func = config_fns.get(daemon_type, None)
2483
2484 daemons = self.cache.get_daemons_by_service(service_name)
2485
2486 public_network = None
2487 if daemon_type == 'mon':
2488 ret, out, err = self.mon_command({
2489 'prefix': 'config get',
2490 'who': 'mon',
2491 'key': 'public_network',
2492 })
2493 if '/' in out:
2494 public_network = out.strip()
2495 self.log.debug('mon public_network is %s' % public_network)
2496
2497 def matches_network(host):
2498 # type: (str) -> bool
2499 if not public_network:
2500 return False
2501 # make sure we have 1 or more IPs for that network on that
2502 # host
2503 return len(self.cache.networks[host].get(public_network, [])) > 0
2504
2505 hosts = HostAssignment(
2506 spec=spec,
2507 get_hosts_func=self._get_hosts,
2508 get_daemons_func=self.cache.get_daemons_by_service,
2509 filter_new_host=matches_network if daemon_type == 'mon' else None,
2510 ).place()
2511
2512 r = False
2513
2514 if daemon_type == 'osd':
2515 return False if create_func(spec) else True # type: ignore
2516
2517 # sanity check
2518 if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
2519 self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
2520 return False
2521
2522 # add any?
2523 did_config = False
2524 hosts_with_daemons = {d.hostname for d in daemons}
2525 self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
2526 for host, network, name in hosts:
2527 if host not in hosts_with_daemons:
2528 if not did_config and config_func:
2529 config_func(spec)
2530 did_config = True
2531 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2532 spec.service_id, name)
2533 self.log.debug('Placing %s.%s on host %s' % (
2534 daemon_type, daemon_id, host))
2535 if daemon_type == 'mon':
2536 create_func(daemon_id, host, network) # type: ignore
2537 elif daemon_type == 'nfs':
2538 create_func(daemon_id, host, spec) # type: ignore
2539 else:
2540 create_func(daemon_id, host) # type: ignore
2541
2542 # add to daemon list so next name(s) will also be unique
2543 sd = orchestrator.DaemonDescription(
2544 hostname=host,
2545 daemon_type=daemon_type,
2546 daemon_id=daemon_id,
2547 )
2548 daemons.append(sd)
2549 r = True
2550
2551 # remove any?
2552 target_hosts = [h.hostname for h in hosts]
2553 for d in daemons:
2554 if d.hostname not in target_hosts:
2555 # NOTE: we are passing the 'force' flag here, which means
2556 # we can delete a mon instance's data.
2557 self._remove_daemon(d.name(), d.hostname)
2558 r = True
2559
2560 return r
2561
2562 def _apply_all_services(self):
2563 r = False
2564 specs = [] # type: List[ServiceSpec]
2565 for sn, spec in self.spec_store.specs.items():
2566 specs.append(spec)
2567 for spec in specs:
2568 try:
2569 if self._apply_service(spec):
2570 r = True
2571 except Exception as e:
2572 self.log.warning('Failed to apply %s spec %s: %s' % (
2573 spec.service_name(), spec, e))
2574 return r
2575
2576 def _check_daemons(self):
2577 # get monmap mtime so we can refresh configs when mons change
2578 monmap = self.get('mon_map')
2579 last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
2580 monmap['modified'], CEPH_DATEFMT)
2581 if last_monmap and last_monmap > datetime.datetime.utcnow():
2582 last_monmap = None # just in case clocks are skewed
2583
2584 daemons = self.cache.get_daemons()
2585 grafanas = [] # type: List[orchestrator.DaemonDescription]
2586 for dd in daemons:
2587 # orphan?
2588 spec = self.spec_store.specs.get(dd.service_name(), None)
2589 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
2590 # (mon and mgr specs should always exist; osds aren't matched
2591 # to a service spec)
2592 self.log.info('Removing orphan daemon %s...' % dd.name())
2593 self._remove_daemon(dd.name(), dd.hostname)
2594
2595 # ignore unmanaged services
2596 if not spec or spec.unmanaged:
2597 continue
2598
2599 # dependencies?
2600 if dd.daemon_type == 'grafana':
2601 # put running instances at the front of the list
2602 grafanas.insert(0, dd)
2603 deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
2604 last_deps, last_config = self.cache.get_daemon_last_config_deps(
2605 dd.hostname, dd.name())
2606 if last_deps is None:
2607 last_deps = []
2608 reconfig = False
2609 if not last_config:
2610 self.log.info('Reconfiguring %s (unknown last config time)...' % (
2611 dd.name()))
2612 reconfig = True
2613 elif last_deps != deps:
2614 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
2615 deps))
2616 self.log.info('Reconfiguring %s (dependencies changed)...' % (
2617 dd.name()))
2618 reconfig = True
2619 elif last_monmap and \
2620 last_monmap > last_config and \
2621 dd.daemon_type in CEPH_TYPES:
2622 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
2623 reconfig = True
2624 if reconfig:
2625 self._create_daemon(dd.daemon_type, dd.daemon_id,
2626 dd.hostname, reconfig=True)
2627
2628 # make sure the dashboard [does not] reference grafana
2629 try:
2630 current_url = self.get_module_option_ex('dashboard',
2631 'GRAFANA_API_URL')
2632 if grafanas:
2633 host = grafanas[0].hostname
2634 url = 'https://%s:3000' % (self.inventory[host].get('addr',
2635 host))
2636 if current_url != url:
2637 self.log.info('Setting dashboard grafana config to %s' % url)
2638 self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
2639 url)
2640 # FIXME: is it a signed cert??
2641 except Exception as e:
2642 self.log.debug('got exception fetching dashboard grafana state: %s',
2643 e)
2644
2645 def _add_daemon(self, daemon_type, spec,
2646 create_func, config_func=None):
2647 """
2648 Add (and place) a daemon. Require explicit host placement. Do not
2649 schedule, and do not apply the related scheduling limitations.
2650 """
2651 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2652 if not spec.placement.hosts:
2653 raise OrchestratorError('must specify host(s) to deploy on')
2654 count = spec.placement.count or len(spec.placement.hosts)
2655 daemons = self.cache.get_daemons_by_service(spec.service_name())
2656 return self._create_daemons(daemon_type, spec, daemons,
2657 spec.placement.hosts, count,
2658 create_func, config_func)
2659
2660 def _create_daemons(self, daemon_type, spec, daemons,
2661 hosts, count,
2662 create_func, config_func=None):
2663 if count > len(hosts):
2664 raise OrchestratorError('too few hosts: want %d, have %s' % (
2665 count, hosts))
2666
2667 if config_func:
2668 config_func(spec)
2669
2670 args = [] # type: List[tuple]
2671 for host, network, name in hosts:
2672 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2673 spec.service_id, name)
2674 self.log.debug('Placing %s.%s on host %s' % (
2675 daemon_type, daemon_id, host))
2676 if daemon_type == 'mon':
2677 args.append((daemon_id, host, network)) # type: ignore
2678 elif daemon_type == 'nfs':
2679 args.append((daemon_id, host, spec)) # type: ignore
2680 elif daemon_type == 'iscsi':
2681 args.append((daemon_id, host, spec)) # type: ignore
2682 else:
2683 args.append((daemon_id, host)) # type: ignore
2684
2685 # add to daemon list so next name(s) will also be unique
2686 sd = orchestrator.DaemonDescription(
2687 hostname=host,
2688 daemon_type=daemon_type,
2689 daemon_id=daemon_id,
2690 )
2691 daemons.append(sd)
2692
2693 @async_map_completion
2694 def create_func_map(*args):
2695 return create_func(*args)
2696
2697 return create_func_map(args)
2698
2699 @trivial_completion
2700 def apply_mon(self, spec):
2701 return self._apply(spec)
2702
2703 def _create_mon(self, name, host, network):
2704 """
2705 Create a new monitor on the given host.
2706 """
2707 # get mon. key
2708 ret, keyring, err = self.mon_command({
2709 'prefix': 'auth get',
2710 'entity': 'mon.',
2711 })
2712
2713 extra_config = '[mon.%s]\n' % name
2714 if network:
2715 # infer whether this is a CIDR network, addrvec, or plain IP
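# Editor's note, examples of the three accepted forms (values are
# illustrative only):
#   CIDR network: '10.1.0.0/24'
#   addrvec:      '[v2:10.1.2.3:3300,v1:10.1.2.3:6789]'
#   plain IP:     '10.1.2.3'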
2716 if '/' in network:
2717 extra_config += 'public network = %s\n' % network
2718 elif network.startswith('[v') and network.endswith(']'):
2719 extra_config += 'public addrv = %s\n' % network
2720 elif ':' not in network:
2721 extra_config += 'public addr = %s\n' % network
2722 else:
2723 raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
2724 else:
2725 # try to get the public_network from the config
2726 ret, network, err = self.mon_command({
2727 'prefix': 'config get',
2728 'who': 'mon',
2729 'key': 'public_network',
2730 })
2731 network = network.strip() # type: ignore
2732 if ret:
2733 raise RuntimeError('Unable to fetch public_network config option')
2734 if not network:
2735 raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
2736 if '/' not in network:
2737 raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
2738 extra_config += 'public network = %s\n' % network
2739
2740 return self._create_daemon('mon', name, host,
2741 keyring=keyring,
2742 extra_config={'config': extra_config})
2743
2744 def add_mon(self, spec):
2745 # type: (ServiceSpec) -> orchestrator.Completion
2746 return self._add_daemon('mon', spec, self._create_mon)
2747
2748 def _create_mgr(self, mgr_id, host):
2749 """
2750 Create a new manager instance on a host.
2751 """
2752 # get mgr. key
2753 ret, keyring, err = self.mon_command({
2754 'prefix': 'auth get-or-create',
2755 'entity': 'mgr.%s' % mgr_id,
2756 'caps': ['mon', 'profile mgr',
2757 'osd', 'allow *',
2758 'mds', 'allow *'],
2759 })
2760
2761 return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
2762
2763 def add_mgr(self, spec):
2764 # type: (ServiceSpec) -> orchestrator.Completion
2765 return self._add_daemon('mgr', spec, self._create_mgr)
2766
2767 def _apply(self, spec: ServiceSpec) -> str:
2768 if spec.placement.is_empty():
2769 # fill in default placement
2770 defaults = {
2771 'mon': PlacementSpec(count=5),
2772 'mgr': PlacementSpec(count=2),
2773 'mds': PlacementSpec(count=2),
2774 'rgw': PlacementSpec(count=2),
2775 'iscsi': PlacementSpec(count=1),
2776 'rbd-mirror': PlacementSpec(count=2),
2777 'nfs': PlacementSpec(count=1),
2778 'grafana': PlacementSpec(count=1),
2779 'alertmanager': PlacementSpec(count=1),
2780 'prometheus': PlacementSpec(count=1),
2781 'node-exporter': PlacementSpec(host_pattern='*'),
2782 'crash': PlacementSpec(host_pattern='*'),
2783 }
2784 spec.placement = defaults[spec.service_type]
2785 elif spec.service_type in ['mon', 'mgr'] and \
2786 spec.placement.count is not None and \
2787 spec.placement.count < 1:
2788 raise OrchestratorError('cannot scale %s service below 1' % (
2789 spec.service_type))
2790
2791 HostAssignment(
2792 spec=spec,
2793 get_hosts_func=self._get_hosts,
2794 get_daemons_func=self.cache.get_daemons_by_service,
2795 ).validate()
2796
2797 self.log.info('Saving service %s spec with placement %s' % (
2798 spec.service_name(), spec.placement.pretty_str()))
2799 self.spec_store.save(spec)
2800 self._kick_serve_loop()
2801 return "Scheduled %s update..." % spec.service_name()
2802
2803 @trivial_completion
2804 def apply(self, specs: List[ServiceSpec]):
2805 return [self._apply(spec) for spec in specs]
2806
2807 @trivial_completion
2808 def apply_mgr(self, spec):
2809 return self._apply(spec)
2810
2811 def add_mds(self, spec: ServiceSpec):
2812 return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
2813
2814 @trivial_completion
2815 def apply_mds(self, spec: ServiceSpec):
2816 return self._apply(spec)
2817
2818 def _config_mds(self, spec):
2819 # ensure mds_join_fs is set for these daemons
2820 assert spec.service_id
2821 ret, out, err = self.mon_command({
2822 'prefix': 'config set',
2823 'who': 'mds.' + spec.service_id,
2824 'name': 'mds_join_fs',
2825 'value': spec.service_id,
2826 })
2827
2828 def _create_mds(self, mds_id, host):
2829 # get mds. key
2830 ret, keyring, err = self.mon_command({
2831 'prefix': 'auth get-or-create',
2832 'entity': 'mds.' + mds_id,
2833 'caps': ['mon', 'profile mds',
2834 'osd', 'allow rwx',
2835 'mds', 'allow'],
2836 })
2837 return self._create_daemon('mds', mds_id, host, keyring=keyring)
2838
2839 def add_rgw(self, spec):
2840 return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
2841
2842 def _config_rgw(self, spec):
2843 # ensure rgw_realm and rgw_zone are set for these daemons
2844 ret, out, err = self.mon_command({
2845 'prefix': 'config set',
2846 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
2847 'name': 'rgw_zone',
2848 'value': spec.rgw_zone,
2849 })
2850 ret, out, err = self.mon_command({
2851 'prefix': 'config set',
2852 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
2853 'name': 'rgw_realm',
2854 'value': spec.rgw_realm,
2855 })
2856 ret, out, err = self.mon_command({
2857 'prefix': 'config set',
2858 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
2859 'name': 'rgw_frontends',
2860 'value': spec.rgw_frontends_config_value(),
2861 })
2862
2863 if spec.rgw_frontend_ssl_certificate:
2864 if isinstance(spec.rgw_frontend_ssl_certificate, list):
2865 cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
2866 else:
2867 cert_data = spec.rgw_frontend_ssl_certificate
2868 ret, out, err = self.mon_command({
2869 'prefix': 'config-key set',
2870 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
2871 'val': cert_data,
2872 })
2873
2874 if spec.rgw_frontend_ssl_key:
2875 if isinstance(spec.rgw_frontend_ssl_key, list):
2876 key_data = '\n'.join(spec.rgw_frontend_ssl_key)
2877 else:
2878 key_data = spec.rgw_frontend_ssl_key
2879 ret, out, err = self.mon_command({
2880 'prefix': 'config-key set',
2881 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
2882 'val': key_data,
2883 })
2884
2885 logger.info('Saving service %s spec with placement %s' % (
2886 spec.service_name(), spec.placement.pretty_str()))
2887 self.spec_store.save(spec)
2888
2889 def _create_rgw(self, rgw_id, host):
2890 ret, keyring, err = self.mon_command({
2891 'prefix': 'auth get-or-create',
2892 'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
2893 'caps': ['mon', 'allow *',
2894 'mgr', 'allow rw',
2895 'osd', 'allow rwx'],
2896 })
2897 return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
2898
2899 @trivial_completion
2900 def apply_rgw(self, spec):
2901 return self._apply(spec)
2902
2903 def add_iscsi(self, spec):
2904 # type: (ServiceSpec) -> orchestrator.Completion
2905 return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
2906
2907 def _config_iscsi(self, spec):
2908 logger.info('Saving service %s spec with placement %s' % (
2909 spec.service_name(), spec.placement.pretty_str()))
2910 self.spec_store.save(spec)
2911
2912 def _create_iscsi(self, igw_id, host, spec):
2913 ret, keyring, err = self.mon_command({
2914 'prefix': 'auth get-or-create',
2915 'entity': utils.name_to_config_section('iscsi') + '.' + igw_id,
2916 'caps': ['mon', 'allow rw',
2917 'osd', f'allow rwx pool={spec.pool}'],
2918 })
2919
2920 api_secure = 'false' if spec.api_secure is None else spec.api_secure
2921 igw_conf = f"""
2922 # generated by cephadm
2923 [config]
2924 cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
2925 pool = {spec.pool}
2926 trusted_ip_list = {spec.trusted_ip_list or ''}
2927 minimum_gateways = 1
2928 fqdn_enabled = {spec.fqdn_enabled or ''}
2929 api_port = {spec.api_port or ''}
2930 api_user = {spec.api_user or ''}
2931 api_password = {spec.api_password or ''}
2932 api_secure = {api_secure}
2933 """
2934 extra_config = {'iscsi-gateway.cfg': igw_conf}
2935 return self._create_daemon('iscsi', igw_id, host, keyring=keyring,
2936 extra_config=extra_config)
2937
2938 @trivial_completion
2939 def apply_iscsi(self, spec):
2940 return self._apply(spec)
2941
2942 def add_rbd_mirror(self, spec):
2943 return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
2944
2945 def _create_rbd_mirror(self, daemon_id, host):
2946 ret, keyring, err = self.mon_command({
2947 'prefix': 'auth get-or-create',
2948 'entity': 'client.rbd-mirror.' + daemon_id,
2949 'caps': ['mon', 'profile rbd-mirror',
2950 'osd', 'profile rbd'],
2951 })
2952 return self._create_daemon('rbd-mirror', daemon_id, host,
2953 keyring=keyring)
2954
2955 @trivial_completion
2956 def apply_rbd_mirror(self, spec):
2957 return self._apply(spec)
2958
2959 def _generate_nfs_config(self, daemon_type, daemon_id, host):
2960 # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
2961 deps = [] # type: List[str]
2962
2963 # find the matching NFSServiceSpec
2964 # TODO: find the spec and pass via _create_daemon instead ??
2965 service_name = self.get_service_name(daemon_type, daemon_id, host)
2966 specs = self.spec_store.find(service_name)
2967 if not specs:
2968 raise OrchestratorError('Cannot find service spec %s' % (service_name))
2969 elif len(specs) > 1:
2970 raise OrchestratorError('Found multiple service specs for %s' % (service_name))
2971 else:
2972 # cast to keep mypy happy
2973 spec = cast(NFSServiceSpec, specs[0])
2974
2975 nfs = NFSGanesha(self, daemon_id, spec)
2976
2977 # create the keyring
2978 entity = nfs.get_keyring_entity()
2979 keyring = nfs.get_or_create_keyring(entity=entity)
2980
2981 # update the caps after get-or-create, the keyring might already exist!
2982 nfs.update_keyring_caps(entity=entity)
2983
2984 # create the rados config object
2985 nfs.create_rados_config_obj()
2986
2987 # generate the cephadm config
2988 cephadm_config = nfs.get_cephadm_config()
2989 cephadm_config.update(
2990 self._get_config_and_keyring(
2991 daemon_type, daemon_id,
2992 keyring=keyring))
2993
2994 return cephadm_config, deps
2995
2996 def add_nfs(self, spec):
2997 return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
2998
2999 def _config_nfs(self, spec):
3000 logger.info('Saving service %s spec with placement %s' % (
3001 spec.service_name(), spec.placement.pretty_str()))
3002 self.spec_store.save(spec)
3003
3004 def _create_nfs(self, daemon_id, host, spec):
3005 return self._create_daemon('nfs', daemon_id, host)
3006
3007 @trivial_completion
3008 def apply_nfs(self, spec):
3009 return self._apply(spec)
3010
3011 def _generate_prometheus_config(self):
3012 # type: () -> Tuple[Dict[str, Any], List[str]]
3013 deps = [] # type: List[str]
3014
3015 # scrape mgrs
3016 mgr_scrape_list = []
3017 mgr_map = self.get('mgr_map')
3018 port = None
3019 t = mgr_map.get('services', {}).get('prometheus', None)
3020 if t:
3021 t = t.split('/')[2]
3022 mgr_scrape_list.append(t)
3023 port = '9283'
3024 if ':' in t:
3025 port = t.split(':')[1]
3026 # scan all mgrs to generate deps and to get standbys too.
3027 # assume that they are all on the same port as the active mgr.
3028 for dd in self.cache.get_daemons_by_service('mgr'):
3029 # we consider the mgr a dep even if the prometheus module is
3030 # disabled in order to be consistent with _calc_daemon_deps().
3031 deps.append(dd.name())
3032 if not port:
3033 continue
3034 if dd.daemon_id == self.get_mgr_id():
3035 continue
3036 hi = self.inventory.get(dd.hostname, {})
3037 addr = hi.get('addr', dd.hostname)
3038 mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
3039
3040 # scrape node exporters
3041 node_configs = ''
3042 for dd in self.cache.get_daemons_by_service('node-exporter'):
3043 deps.append(dd.name())
3044 hi = self.inventory.get(dd.hostname, {})
3045 addr = hi.get('addr', dd.hostname)
3046 if not node_configs:
3047 node_configs = """
3048 - job_name: 'node'
3049 static_configs:
3050 """
3051 node_configs += """ - targets: {}
3052 labels:
3053 instance: '{}'
3054 """.format([addr.split(':')[0] + ':9100'],
3055 dd.hostname)
3056
3057 # scrape alert managers
3058 alertmgr_configs = ""
3059 alertmgr_targets = []
3060 for dd in self.cache.get_daemons_by_service('alertmanager'):
3061 deps.append(dd.name())
3062 hi = self.inventory.get(dd.hostname, {})
3063 addr = hi.get('addr', dd.hostname)
3064 alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
3065 if alertmgr_targets:
3066 alertmgr_configs = """alerting:
3067 alertmanagers:
3068 - scheme: http
3069 path_prefix: /alertmanager
3070 static_configs:
3071 - targets: [{}]
3072 """.format(", ".join(alertmgr_targets))
3073
3074 # generate the prometheus configuration
3075 r = {
3076 'files': {
3077 'prometheus.yml': """# generated by cephadm
3078 global:
3079 scrape_interval: 5s
3080 evaluation_interval: 10s
3081 rule_files:
3082 - /etc/prometheus/alerting/*
3083 {alertmgr_configs}
3084 scrape_configs:
3085 - job_name: 'ceph'
3086 static_configs:
3087 - targets: {mgr_scrape_list}
3088 labels:
3089 instance: 'ceph_cluster'
3090 {node_configs}
3091 """.format(
3092 mgr_scrape_list=str(mgr_scrape_list),
3093 node_configs=str(node_configs),
3094 alertmgr_configs=str(alertmgr_configs)
3095 ),
3096 },
3097 }
3098
3099 # include alerts, if present in the container
3100 if os.path.exists(self.prometheus_alerts_path):
3101 with open(self.prometheus_alerts_path, "r") as f:
3102 alerts = f.read()
3103 r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
3104
3105 return r, sorted(deps)
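# Editor's note (illustrative): with an active mgr exposing the prometheus
# module on host1 and one node-exporter on host2, the pieces assembled above
# would be roughly
#   mgr_scrape_list = ['host1:9283']
#   deps            = ['mgr.host1.xyz', 'node-exporter.host2']
# and the rendered prometheus.yml scrapes both jobs. Host and daemon names are
# hypothetical.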
3106
3107 def _generate_grafana_config(self):
3108 # type: () -> Tuple[Dict[str, Any], List[str]]
3109 deps = [] # type: List[str]
3110 def generate_grafana_ds_config(hosts: List[str]) -> str:
3111 config = '''# generated by cephadm
3112 deleteDatasources:
3113 {delete_data_sources}
3114
3115 datasources:
3116 {data_sources}
3117 '''
3118 delete_ds_template = '''
3119 - name: '{name}'
3120 orgId: 1\n'''.lstrip('\n')
3121 ds_template = '''
3122 - name: '{name}'
3123 type: 'prometheus'
3124 access: 'proxy'
3125 orgId: 1
3126 url: 'http://{host}:9095'
3127 basicAuth: false
3128 isDefault: {is_default}
3129 editable: false\n'''.lstrip('\n')
3130
3131 delete_data_sources = ''
3132 data_sources = ''
3133 for i, host in enumerate(hosts):
3134 name = "Dashboard %d" % (i + 1)
3135 data_sources += ds_template.format(
3136 name=name,
3137 host=host,
3138 is_default=str(i == 0).lower()
3139 )
3140 delete_data_sources += delete_ds_template.format(
3141 name=name
3142 )
3143 return config.format(
3144 delete_data_sources=delete_data_sources,
3145 data_sources=data_sources,
3146 )
3147
3148 prom_services = [] # type: List[str]
3149 for dd in self.cache.get_daemons_by_service('prometheus'):
3150 prom_services.append(dd.hostname)
3151 deps.append(dd.name())
3152
3153 cert = self.get_store('grafana_crt')
3154 pkey = self.get_store('grafana_key')
3155 if cert and pkey:
3156 try:
3157 verify_tls(cert, pkey)
3158 except ServerConfigException as e:
3159 logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
3160 cert, pkey = None, None
3161 if not (cert and pkey):
3162 cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
3163 self.set_store('grafana_crt', cert)
3164 self.set_store('grafana_key', pkey)
3165 self.mon_command({
3166 'prefix': 'dashboard set-grafana-api-ssl-verify',
3167 'value': 'false',
3168 })
3169
3170
3171
3172 config_file = {
3173 'files': {
3174 "grafana.ini": """# generated by cephadm
3175 [users]
3176 default_theme = light
3177 [auth.anonymous]
3178 enabled = true
3179 org_name = 'Main Org.'
3180 org_role = 'Viewer'
3181 [server]
3182 domain = 'bootstrap.storage.lab'
3183 protocol = https
3184 cert_file = /etc/grafana/certs/cert_file
3185 cert_key = /etc/grafana/certs/cert_key
3186 http_port = 3000
3187 [security]
3188 admin_user = admin
3189 admin_password = admin
3190 allow_embedding = true
3191 """,
3192 'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
3193 'certs/cert_file': '# generated by cephadm\n%s' % cert,
3194 'certs/cert_key': '# generated by cephadm\n%s' % pkey,
3195 }
3196 }
3197 return config_file, sorted(deps)
3198
3199 def _get_dashboard_url(self):
3200 # type: () -> str
3201 return self.get('mgr_map').get('services', {}).get('dashboard', '')
3202
3203 def _generate_alertmanager_config(self):
3204 # type: () -> Tuple[Dict[str, Any], List[str]]
3205 deps = [] # type: List[str]
3206
3207 # dashboard(s)
3208 dashboard_urls = []
3209 mgr_map = self.get('mgr_map')
3210 port = None
3211 proto = None # http: or https:
3212 url = mgr_map.get('services', {}).get('dashboard', None)
3213 if url:
3214 dashboard_urls.append(url)
3215 proto = url.split('/')[0]
3216 port = url.split('/')[2].split(':')[1]
3217 # scan all mgrs to generate deps and to get standbys too.
3218 # assume that they are all on the same port as the active mgr.
3219 for dd in self.cache.get_daemons_by_service('mgr'):
3220 # we consider mgr a dep even if the dashboard is disabled
3221 # in order to be consistent with _calc_daemon_deps().
3222 deps.append(dd.name())
3223 if not port:
3224 continue
3225 if dd.daemon_id == self.get_mgr_id():
3226 continue
3227 hi = self.inventory.get(dd.hostname, {})
3228 addr = hi.get('addr', dd.hostname)
3229 dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
3230 port))
3231
3232 yml = """# generated by cephadm
3233 # See https://prometheus.io/docs/alerting/configuration/ for documentation.
3234
3235 global:
3236 resolve_timeout: 5m
3237
3238 route:
3239 group_by: ['alertname']
3240 group_wait: 10s
3241 group_interval: 10s
3242 repeat_interval: 1h
3243 receiver: 'ceph-dashboard'
3244 receivers:
3245 - name: 'ceph-dashboard'
3246 webhook_configs:
3247 {urls}
3248 """.format(
3249 urls='\n'.join(
3250 [" - url: '{}api/prometheus_receiver'".format(u)
3251 for u in dashboard_urls]
3252 ))
3253 peers = []
3254 port = '9094'
3255 for dd in self.cache.get_daemons_by_service('alertmanager'):
3256 deps.append(dd.name())
3257 hi = self.inventory.get(dd.hostname, {})
3258 addr = hi.get('addr', dd.hostname)
3259 peers.append(addr.split(':')[0] + ':' + port)
3260 return {
3261 "files": {
3262 "alertmanager.yml": yml
3263 },
3264 "peers": peers
3265 }, sorted(deps)
3266
3267 def add_prometheus(self, spec):
3268 return self._add_daemon('prometheus', spec, self._create_prometheus)
3269
3270 def _create_prometheus(self, daemon_id, host):
3271 return self._create_daemon('prometheus', daemon_id, host)
3272
3273 @trivial_completion
3274 def apply_prometheus(self, spec):
3275 return self._apply(spec)
3276
3277 def add_node_exporter(self, spec):
3278 # type: (ServiceSpec) -> AsyncCompletion
3279 return self._add_daemon('node-exporter', spec,
3280 self._create_node_exporter)
3281
3282 @trivial_completion
3283 def apply_node_exporter(self, spec):
3284 return self._apply(spec)
3285
3286 def _create_node_exporter(self, daemon_id, host):
3287 return self._create_daemon('node-exporter', daemon_id, host)
3288
3289 def add_crash(self, spec):
3290 # type: (ServiceSpec) -> AsyncCompletion
3291 return self._add_daemon('crash', spec,
3292 self._create_crash)
3293
3294 @trivial_completion
3295 def apply_crash(self, spec):
3296 return self._apply(spec)
3297
3298 def _create_crash(self, daemon_id, host):
3299 ret, keyring, err = self.mon_command({
3300 'prefix': 'auth get-or-create',
3301 'entity': 'client.crash.' + host,
3302 'caps': ['mon', 'profile crash',
3303 'mgr', 'profile crash'],
3304 })
3305 return self._create_daemon('crash', daemon_id, host, keyring=keyring)
3306
3307 def add_grafana(self, spec):
3308 # type: (ServiceSpec) -> AsyncCompletion
3309 return self._add_daemon('grafana', spec, self._create_grafana)
3310
3311 @trivial_completion
3312 def apply_grafana(self, spec: ServiceSpec):
3313 return self._apply(spec)
3314
3315 def _create_grafana(self, daemon_id, host):
3316 # type: (str, str) -> str
3317 return self._create_daemon('grafana', daemon_id, host)
3318
3319 def add_alertmanager(self, spec):
3320 # type: (ServiceSpec) -> AsyncCompletion
3321 return self._add_daemon('alertmanager', spec, self._create_alertmanager)
3322
3323 @trivial_completion
3324 def apply_alertmanager(self, spec: ServiceSpec):
3325 return self._apply(spec)
3326
3327 def _create_alertmanager(self, daemon_id, host):
3328 return self._create_daemon('alertmanager', daemon_id, host)
3329
3330
3331 def _get_container_image_id(self, image_name):
3332 # pick a random host...
3333 host = None
3334 for host_name, hi in self.inventory.items():
3335 host = host_name
3336 break
3337 if not host:
3338 raise OrchestratorError('no hosts defined')
3339 out, err, code = self._run_cephadm(
3340 host, None, 'pull', [],
3341 image=image_name,
3342 no_fsid=True,
3343 error_ok=True)
3344 if code:
3345 raise OrchestratorError('Failed to pull %s on %s: %s' % (
3346 image_name, host, '\n'.join(out)))
3347 j = json.loads('\n'.join(out))
3348 image_id = j.get('image_id')
3349 ceph_version = j.get('ceph_version')
3350 self.log.debug('image %s -> id %s version %s' %
3351 (image_name, image_id, ceph_version))
3352 return image_id, ceph_version
3353
3354 @trivial_completion
3355 def upgrade_check(self, image, version):
3356 if version:
3357 target_name = self.container_image_base + ':v' + version
3358 elif image:
3359 target_name = image
3360 else:
3361 raise OrchestratorError('must specify either image or version')
3362
3363 target_id, target_version = self._get_container_image_id(target_name)
3364 self.log.debug('Target image %s id %s version %s' % (
3365 target_name, target_id, target_version))
3366 r = {
3367 'target_name': target_name,
3368 'target_id': target_id,
3369 'target_version': target_version,
3370 'needs_update': dict(),
3371 'up_to_date': list(),
3372 }
3373 for host, dm in self.cache.daemons.items():
3374 for name, dd in dm.items():
3375 if target_id == dd.container_image_id:
3376 r['up_to_date'].append(dd.name())
3377 else:
3378 r['needs_update'][dd.name()] = {
3379 'current_name': dd.container_image_name,
3380 'current_id': dd.container_image_id,
3381 'current_version': dd.version,
3382 }
3383 return json.dumps(r, indent=4, sort_keys=True)
3384
3385 @trivial_completion
3386 def upgrade_status(self):
3387 r = orchestrator.UpgradeStatusSpec()
3388 if self.upgrade_state:
3389 r.target_image = self.upgrade_state.get('target_name')
3390 r.in_progress = True
3391 if self.upgrade_state.get('error'):
3392 r.message = 'Error: ' + self.upgrade_state.get('error')
3393 elif self.upgrade_state.get('paused'):
3394 r.message = 'Upgrade paused'
3395 return r
3396
3397 @trivial_completion
3398 def upgrade_start(self, image, version):
3399 if self.mode != 'root':
3400 raise OrchestratorError('upgrade is not supported in %s mode' % (
3401 self.mode))
3402 if version:
3403 try:
3404 (major, minor, patch) = version.split('.')
3405 assert int(minor) >= 0
3406 assert int(patch) >= 0
3407 except:
3408 raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
3409 if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
3410 raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
3411 target_name = self.container_image_base + ':v' + version
3412 elif image:
3413 target_name = image
3414 else:
3415 raise OrchestratorError('must specify either image or version')
3416 if self.upgrade_state:
3417 if self.upgrade_state.get('target_name') != target_name:
3418 raise OrchestratorError(
3419 'Upgrade to %s (not %s) already in progress' %
3420 (self.upgrade_state.get('target_name'), target_name))
3421 if self.upgrade_state.get('paused'):
3422 del self.upgrade_state['paused']
3423 self._save_upgrade_state()
3424 return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
3425 return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
3426 self.upgrade_state = {
3427 'target_name': target_name,
3428 'progress_id': str(uuid.uuid4()),
3429 }
3430 self._update_upgrade_progress(0.0)
3431 self._save_upgrade_state()
3432 self._clear_upgrade_health_checks()
3433 self.event.set()
3434 return 'Initiating upgrade to %s' % (target_name)
3435
3436 @trivial_completion
3437 def upgrade_pause(self):
3438 if not self.upgrade_state:
3439 raise OrchestratorError('No upgrade in progress')
3440 if self.upgrade_state.get('paused'):
3441 return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
3442 self.upgrade_state['paused'] = True
3443 self._save_upgrade_state()
3444 return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
3445
3446 @trivial_completion
3447 def upgrade_resume(self):
3448 if not self.upgrade_state:
3449 raise OrchestratorError('No upgrade in progress')
3450 if not self.upgrade_state.get('paused'):
3451 return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
3452 del self.upgrade_state['paused']
3453 self._save_upgrade_state()
3454 self.event.set()
3455 return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
3456
3457 @trivial_completion
3458 def upgrade_stop(self):
3459 if not self.upgrade_state:
3460 return 'No upgrade in progress'
3461 target_name = self.upgrade_state.get('target_name')
3462 if 'progress_id' in self.upgrade_state:
3463 self.remote('progress', 'complete',
3464 self.upgrade_state['progress_id'])
3465 self.upgrade_state = None
3466 self._save_upgrade_state()
3467 self._clear_upgrade_health_checks()
3468 self.event.set()
3469 return 'Stopped upgrade to %s' % target_name
3470
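# Illustrative sketch (not part of the module): the upgrade_* handlers above
# back the `ceph orch upgrade ...` CLI verbs, roughly along these lines; the
# exact flag names are defined by the orchestrator CLI layer, not here.
#
#   ceph orch upgrade check --ceph-version 15.2.3   # dry-run report (upgrade_check)
#   ceph orch upgrade start --ceph-version 15.2.3   # or --image <image> (upgrade_start)
#   ceph orch upgrade status                        # upgrade_status
#   ceph orch upgrade pause / resume / stop         # upgrade_pause/_resume/_stop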
3471 @trivial_completion
3472 def remove_osds(self, osd_ids: List[str],
3473 replace: bool = False,
3474 force: bool = False):
3475 """
3476 Takes a list of OSD ids and schedules them for removal.
3477 The function that takes care of the actual removal is
3478 _remove_osds_bg().
3479 """
3480
3481 daemons = self.cache.get_daemons_by_service('osd')
3482 found: Set[OSDRemoval] = set()
3483 for daemon in daemons:
3484 if daemon.daemon_id not in osd_ids:
3485 continue
3486 found.add(OSDRemoval(daemon.daemon_id, replace, force,
3487 daemon.hostname, daemon.name(),
3488 datetime.datetime.utcnow(), -1))
3489
3490 not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
3491 if not_found:
3492 raise OrchestratorError('Unable to find OSD: %s' % not_found)
3493
3494 self.rm_util.queue_osds_for_removal(found)
3495
3496 # trigger the serve loop to initiate the removal
3497 self._kick_serve_loop()
3498 return "Scheduled OSD(s) for removal"
3499
3500 @trivial_completion
3501 def remove_osds_status(self):
3502 """
3503 The CLI call to retrieve an osd removal report
3504 """
3505 return self.rm_util.report
3506
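# Illustrative sketch (not part of the module): remove_osds()/remove_osds_status()
# above are typically driven from the CLI, roughly:
#
#   ceph orch osd rm 0 1 --replace    # schedule OSDs 0 and 1 for removal/replacement
#   ceph orch osd rm status           # report queued/ongoing removals
#
# The exact subcommand spelling comes from the orchestrator CLI layer.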
3507
3508 class BaseScheduler(object):
3509 """
3510 Base Scheduler Interface
3511
3512 * requires a placement_spec
3513
3514 `place(host_pool)` needs to return a List[HostPlacementSpec]
3515 """
3516
3517 def __init__(self, placement_spec):
3518 # type: (PlacementSpec) -> None
3519 self.placement_spec = placement_spec
3520
3521 def place(self, host_pool, count=None):
3522 # type: (List, Optional[int]) -> List[HostPlacementSpec]
3523 raise NotImplementedError
3524
3525
3526 class SimpleScheduler(BaseScheduler):
3527 """
3528 The simplest way to pick/schedule a set of hosts:
3529 1) Shuffle the provided host_pool
3530 2) Select up to `count` hosts from the shuffled list
3531 """
3532 def __init__(self, placement_spec):
3533 super(SimpleScheduler, self).__init__(placement_spec)
3534
3535 def place(self, host_pool, count=None):
3536 # type: (List, Optional[int]) -> List[HostPlacementSpec]
3537 if not host_pool:
3538 return []
3539 host_pool = list(host_pool)  # copy so the caller's list is not shuffled in place
3540 # shuffle for pseudo random selection
3541 random.shuffle(host_pool)
3542 return host_pool[:count]
3543
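# Illustrative usage sketch (not part of the module): PlacementSpec and
# HostPlacementSpec are the spec types imported above; the host names are
# made up.
#
#   pool = [HostPlacementSpec('host1', '', ''),
#           HostPlacementSpec('host2', '', ''),
#           HostPlacementSpec('host3', '', '')]
#   scheduler = SimpleScheduler(PlacementSpec(count=2))
#   chosen = scheduler.place(pool, count=2)  # pseudo-random subset of two hosts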
3544
3545 class HostAssignment(object):
3546 """
3547 A class to detect whether hosts are being passed imperatively or declaratively.
3548 If the spec is populated via the `placement.hosts` field (imperative), it will not load
3549 any additional hosts into the list.
3550 If the spec isn't populated, i.e. when only a count or label is present (declarative),
3551 it will use the provided `get_hosts_func` to load hosts from the inventory.
3552
3553 Schedulers can be assigned to pick hosts from the pool.
3554 """
3555
3556 def __init__(self,
3557 spec, # type: ServiceSpec
3558 get_hosts_func, # type: Callable[[Optional[str]],List[str]]
3559 get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
3560
3561 filter_new_host=None, # type: Optional[Callable[[str],bool]]
3562 scheduler=None, # type: Optional[BaseScheduler]
3563 ):
3564 assert spec and get_hosts_func and get_daemons_func
3565 self.spec = spec # type: ServiceSpec
3566 self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
3567 self.get_hosts_func = get_hosts_func
3568 self.get_daemons_func = get_daemons_func
3569 self.filter_new_host = filter_new_host
3570 self.service_name = spec.service_name()
3571
3572
3573 def validate(self):
3574 self.spec.validate()
3575
3576 if self.spec.placement.hosts:
3577 explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
3578 unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
3579 if unknown_hosts:
3580 raise OrchestratorValidationError(
3581 f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
3582
3583 if self.spec.placement.host_pattern:
3584 pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
3585 if not pattern_hostnames:
3586 raise OrchestratorValidationError(
3587 f'Cannot place {self.spec.one_line_str()}: No matching hosts')
3588
3589 if self.spec.placement.label:
3590 label_hostnames = self.get_hosts_func(self.spec.placement.label)
3591 if not label_hostnames:
3592 raise OrchestratorValidationError(
3593 f'Cannot place {self.spec.one_line_str()}: No matching '
3594 f'hosts for label {self.spec.placement.label}')
3595
3596 def place(self):
3597 # type: () -> List[HostPlacementSpec]
3598 """
3599 Load hosts into the spec.placement.hosts container.
3600 """
3601
3602 self.validate()
3603
3604 # count == 0
3605 if self.spec.placement.count == 0:
3606 return []
3607
3608 # respect any explicit host list
3609 if self.spec.placement.hosts and not self.spec.placement.count:
3610 logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
3611 return self.spec.placement.hosts
3612
3613 # respect host_pattern
3614 if self.spec.placement.host_pattern:
3615 candidates = [
3616 HostPlacementSpec(x, '', '')
3617 for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
3618 ]
3619 logger.debug('All hosts: {}'.format(candidates))
3620 return candidates
3621
3622 count = 0
3623 if self.spec.placement.hosts and \
3624 self.spec.placement.count and \
3625 len(self.spec.placement.hosts) >= self.spec.placement.count:
3626 hosts = self.spec.placement.hosts
3627 count = self.spec.placement.count
3628 logger.debug('place %d over provided host list: %s' % (
3629 count, hosts))
3630 elif self.spec.placement.label:
3631 hosts = [
3632 HostPlacementSpec(x, '', '')
3633 for x in self.get_hosts_func(self.spec.placement.label)
3634 ]
3635 if not self.spec.placement.count:
3636 logger.debug('Labeled hosts: {}'.format(hosts))
3637 return hosts
3638 count = self.spec.placement.count
3639 logger.debug('place %d over label %s: %s' % (
3640 count, self.spec.placement.label, hosts))
3641 else:
3642 hosts = [
3643 HostPlacementSpec(x, '', '')
3644 for x in self.get_hosts_func(None)
3645 ]
3646 if self.spec.placement.count:
3647 count = self.spec.placement.count
3648 else:
3649 # this should be a totally empty spec given all of the
3650 # alternative paths above.
3651 assert self.spec.placement.count is None
3652 assert not self.spec.placement.hosts
3653 assert not self.spec.placement.label
3654 count = 1
3655 logger.debug('place %d over all hosts: %s' % (count, hosts))
3656
3657 # we need to select a subset of the candidates
3658
3659 # if a partial host list is provided, always start with that
3660 if len(self.spec.placement.hosts) < count:
3661 chosen = self.spec.placement.hosts
3662 else:
3663 chosen = []
3664
3665 # prefer hosts that already have services
3666 daemons = self.get_daemons_func(self.service_name)
3667 hosts_with_daemons = {d.hostname for d in daemons}
3668 # calc existing daemons (that aren't already in chosen)
3669 chosen_hosts = [hs.hostname for hs in chosen]
3670 existing = [hs for hs in hosts
3671 if hs.hostname in hosts_with_daemons and
3672 hs.hostname not in chosen_hosts]
3673 if len(chosen + existing) >= count:
3674 chosen = chosen + self.scheduler.place(
3675 existing,
3676 count - len(chosen))
3677 logger.debug('Hosts with existing daemons: {}'.format(chosen))
3678 return chosen
3679
3680 need = count - len(existing + chosen)
3681 others = [hs for hs in hosts
3682 if hs.hostname not in hosts_with_daemons]
3683 if self.filter_new_host:
3684 old = others
3685 others = [h for h in others if self.filter_new_host(h.hostname)]
3686 logger.debug('filtered %s down to %s' % (old, others))
3687 chosen = chosen + self.scheduler.place(others, need)
3688 logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
3689 existing, chosen))
3690 return existing + chosen
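# Worked example (illustrative, not part of the module) of the selection above,
# assuming a spec with placement.count = 3 and no explicit hosts/label/pattern:
#
#   get_hosts_func(None)           -> ['host1', 'host2', 'host3', 'host4']
#   get_daemons_func(service_name) -> daemons running on 'host2' and 'host3'
#   place()                        -> host2 + host3 (hosts with existing daemons
#                                     are preferred) + one host chosen
#                                     pseudo-randomly from {host1, host4}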