ceph/src/pybind/mgr/cephadm/module.py
1 import json
2 import errno
3 import logging
4 import time
5 import yaml
6 from threading import Event
7 from functools import wraps
8
9 from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
10
11 import string
12 try:
13 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, \
14 Any, NamedTuple, Iterator, Set, Sequence
15 from typing import TYPE_CHECKING, cast
16 except ImportError:
17 TYPE_CHECKING = False # just for type checking
18
19
20 import datetime
21 import six
22 import os
23 import random
24 import tempfile
25 import multiprocessing.pool
26 import re
27 import shutil
28 import subprocess
29 import uuid
30
31 from ceph.deployment import inventory, translate
32 from ceph.deployment.drive_group import DriveGroupSpec
33 from ceph.deployment.drive_selection import selector
34 from ceph.deployment.service_spec import \
35 HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
36
37 from mgr_module import MgrModule, HandleCommandResult
38 import orchestrator
39 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
40 CLICommandMeta
41
42 from . import remotes
43 from . import utils
44 from .nfs import NFSGanesha
45 from .osd import RemoveUtil, OSDRemoval
46
47
48 try:
49 import remoto
50 import remoto.process
51 import execnet.gateway_bootstrap
52 except ImportError as e:
53 remoto = None
54 remoto_import_error = str(e)
55
56 try:
57 from typing import List
58 except ImportError:
59 pass
60
61 logger = logging.getLogger(__name__)
62
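# fallback ssh_config used when no custom config has been stored or provided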
63 DEFAULT_SSH_CONFIG = ('Host *\n'
64 'User root\n'
65 'StrictHostKeyChecking no\n'
66 'UserKnownHostsFile /dev/null\n')
67
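# timestamp (de)serialization formats used throughout this module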
68 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
69 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
70
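# key prefixes under which HostCache and SpecStore persist their state
# in the mgr's key/value store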
71 HOST_CACHE_PREFIX = "host."
72 SPEC_STORE_PREFIX = "spec."
73
74 # ceph daemon types that use the ceph container image.
75 # NOTE: listed in upgrade order!
76 CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
77 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
78
79
80 # for py2 compat
81 try:
82 from tempfile import TemporaryDirectory # py3
83 except ImportError:
84 # define a minimal (but sufficient) equivalent for py2 (TemporaryDirectory was added in py 3.2)
85 class TemporaryDirectory(object): # type: ignore
86 def __init__(self):
87 self.name = tempfile.mkdtemp()
88
89 def __enter__(self):
90 if not self.name:
91 self.name = tempfile.mkdtemp()
92 return self.name
93
94 def cleanup(self):
95 shutil.rmtree(self.name)
96
97 def __exit__(self, exc_type, exc_value, traceback):
98 self.cleanup()
99
100
101 class SpecStore():
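"""In-memory cache of ServiceSpecs, persisted in the mgr's key/value
store under SPEC_STORE_PREFIX along with their creation time."""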
102 def __init__(self, mgr):
103 # type: (CephadmOrchestrator) -> None
104 self.mgr = mgr
105 self.specs = {} # type: Dict[str, ServiceSpec]
106 self.spec_created = {} # type: Dict[str, datetime.datetime]
107
108 def load(self):
109 # type: () -> None
110 for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
111 service_name = k[len(SPEC_STORE_PREFIX):]
112 try:
113 v = json.loads(v)
114 spec = ServiceSpec.from_json(v['spec'])
115 created = datetime.datetime.strptime(v['created'], DATEFMT)
116 self.specs[service_name] = spec
117 self.spec_created[service_name] = created
118 self.mgr.log.debug('SpecStore: loaded spec for %s' % (
119 service_name))
120 except Exception as e:
121 self.mgr.log.warning('unable to load spec for %s: %s' % (
122 service_name, e))
123 pass
124
125 def save(self, spec):
126 # type: (ServiceSpec) -> None
127 self.specs[spec.service_name()] = spec
128 self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
129 self.mgr.set_store(
130 SPEC_STORE_PREFIX + spec.service_name(),
131 json.dumps({
132 'spec': spec.to_json(),
133 'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
134 }, sort_keys=True),
135 )
136
137 def rm(self, service_name):
138 # type: (str) -> None
139 if service_name in self.specs:
140 del self.specs[service_name]
141 del self.spec_created[service_name]
142 self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
143
144 def find(self, service_name=None):
145 # type: (Optional[str]) -> List[ServiceSpec]
146 specs = []
147 for sn, spec in self.specs.items():
148 if not service_name or \
149 sn == service_name or \
150 sn.startswith(service_name + '.'):
151 specs.append(spec)
152 self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
153 service_name, specs))
154 return specs
155
156 class HostCache():
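"""Per-host cache of daemons, devices, networks and refresh timestamps,
persisted in the mgr's key/value store under HOST_CACHE_PREFIX."""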
157 def __init__(self, mgr):
158 # type: (CephadmOrchestrator) -> None
159 self.mgr = mgr
160 self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
161 self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
162 self.devices = {} # type: Dict[str, List[inventory.Device]]
163 self.networks = {} # type: Dict[str, Dict[str, List[str]]]
164 self.last_device_update = {} # type: Dict[str, datetime.datetime]
165 self.daemon_refresh_queue = [] # type: List[str]
166 self.device_refresh_queue = [] # type: List[str]
167 self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
168 self.last_host_check = {} # type: Dict[str, datetime.datetime]
169
170 def load(self):
171 # type: () -> None
172 for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
173 host = k[len(HOST_CACHE_PREFIX):]
174 if host not in self.mgr.inventory:
175 self.mgr.log.warning('removing stray HostCache host record %s' % (
176 host))
177 self.mgr.set_store(k, None)
178 try:
179 j = json.loads(v)
180 if 'last_device_update' in j:
181 self.last_device_update[host] = datetime.datetime.strptime(
182 j['last_device_update'], DATEFMT)
183 else:
184 self.device_refresh_queue.append(host)
185 # for services, we ignore the persisted last_*_update
186 # and always trigger a new scrape on mgr restart.
187 self.daemon_refresh_queue.append(host)
188 self.daemons[host] = {}
189 self.devices[host] = []
190 self.networks[host] = {}
191 self.daemon_config_deps[host] = {}
192 for name, d in j.get('daemons', {}).items():
193 self.daemons[host][name] = \
194 orchestrator.DaemonDescription.from_json(d)
195 for d in j.get('devices', []):
196 self.devices[host].append(inventory.Device.from_json(d))
197 self.networks[host] = j.get('networks', {})
198 for name, d in j.get('daemon_config_deps', {}).items():
199 self.daemon_config_deps[host][name] = {
200 'deps': d.get('deps', []),
201 'last_config': datetime.datetime.strptime(
202 d['last_config'], DATEFMT),
203 }
204 if 'last_host_check' in j:
205 self.last_host_check[host] = datetime.datetime.strptime(
206 j['last_host_check'], DATEFMT)
207 self.mgr.log.debug(
208 'HostCache.load: host %s has %d daemons, '
209 '%d devices, %d networks' % (
210 host, len(self.daemons[host]), len(self.devices[host]),
211 len(self.networks[host])))
212 except Exception as e:
213 self.mgr.log.warning('unable to load cached state for %s: %s' % (
214 host, e))
215 pass
216
217 def update_host_daemons(self, host, dm):
218 # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
219 self.daemons[host] = dm
220 self.last_daemon_update[host] = datetime.datetime.utcnow()
221
222 def update_host_devices_networks(self, host, dls, nets):
223 # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
224 self.devices[host] = dls
225 self.networks[host] = nets
226 self.last_device_update[host] = datetime.datetime.utcnow()
227
228 def update_daemon_config_deps(self, host, name, deps, stamp):
229 self.daemon_config_deps[host][name] = {
230 'deps': deps,
231 'last_config': stamp,
232 }
233
234 def update_last_host_check(self, host):
235 # type: (str) -> None
236 self.last_host_check[host] = datetime.datetime.utcnow()
237
238 def prime_empty_host(self, host):
239 # type: (str) -> None
240 """
241 Install an empty entry for a host
242 """
243 self.daemons[host] = {}
244 self.devices[host] = []
245 self.networks[host] = {}
246 self.daemon_config_deps[host] = {}
247 self.daemon_refresh_queue.append(host)
248 self.device_refresh_queue.append(host)
249
250 def invalidate_host_daemons(self, host):
251 # type: (str) -> None
252 self.daemon_refresh_queue.append(host)
253 if host in self.last_daemon_update:
254 del self.last_daemon_update[host]
255 self.mgr.event.set()
256
257 def invalidate_host_devices(self, host):
258 # type: (str) -> None
259 self.device_refresh_queue.append(host)
260 if host in self.last_device_update:
261 del self.last_device_update[host]
262 self.mgr.event.set()
263
264 def save_host(self, host):
265 # type: (str) -> None
266 j = { # type: ignore
267 'daemons': {},
268 'devices': [],
269 'daemon_config_deps': {},
270 }
271 if host in self.last_daemon_update:
272 j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
273 if host in self.last_device_update:
274 j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
275 for name, dd in self.daemons[host].items():
276 j['daemons'][name] = dd.to_json() # type: ignore
277 for d in self.devices[host]:
278 j['devices'].append(d.to_json()) # type: ignore
279 j['networks'] = self.networks[host]
280 for name, depi in self.daemon_config_deps[host].items():
281 j['daemon_config_deps'][name] = { # type: ignore
282 'deps': depi.get('deps', []),
283 'last_config': depi['last_config'].strftime(DATEFMT),
284 }
285 if host in self.last_host_check:
286 j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
287 self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
288
289 def rm_host(self, host):
290 # type: (str) -> None
291 if host in self.daemons:
292 del self.daemons[host]
293 if host in self.devices:
294 del self.devices[host]
295 if host in self.networks:
296 del self.networks[host]
297 if host in self.last_daemon_update:
298 del self.last_daemon_update[host]
299 if host in self.last_device_update:
300 del self.last_device_update[host]
301 if host in self.daemon_config_deps:
302 del self.daemon_config_deps[host]
303 self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
304
305 def get_hosts(self):
306 # type: () -> List[str]
307 r = []
308 for host, di in self.daemons.items():
309 r.append(host)
310 return r
311
312 def get_daemons(self):
313 # type: () -> List[orchestrator.DaemonDescription]
314 r = []
315 for host, dm in self.daemons.items():
316 for name, dd in dm.items():
317 r.append(dd)
318 return r
319
320 def get_daemons_by_service(self, service_name):
321 # type: (str) -> List[orchestrator.DaemonDescription]
322 result = [] # type: List[orchestrator.DaemonDescription]
323 for host, dm in self.daemons.items():
324 for name, d in dm.items():
325 if name.startswith(service_name + '.'):
326 result.append(d)
327 return result
328
329 def get_daemon_names(self):
330 # type: () -> List[str]
331 r = []
332 for host, dm in self.daemons.items():
333 for name, dd in dm.items():
334 r.append(name)
335 return r
336
337 def get_daemon_last_config_deps(self, host, name):
338 if host in self.daemon_config_deps:
339 if name in self.daemon_config_deps[host]:
340 return self.daemon_config_deps[host][name].get('deps', []), \
341 self.daemon_config_deps[host][name].get('last_config', None)
342 return None, None
343
344 def host_needs_daemon_refresh(self, host):
345 # type: (str) -> bool
346 if host in self.daemon_refresh_queue:
347 self.daemon_refresh_queue.remove(host)
348 return True
349 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
350 seconds=self.mgr.daemon_cache_timeout)
351 if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
352 return True
353 return False
354
355 def host_needs_device_refresh(self, host):
356 # type: (str) -> bool
357 if host in self.device_refresh_queue:
358 self.device_refresh_queue.remove(host)
359 return True
360 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
361 seconds=self.mgr.device_cache_timeout)
362 if host not in self.last_device_update or self.last_device_update[host] < cutoff:
363 return True
364 return False
365
366 def host_needs_check(self, host):
367 # type: (str) -> bool
368 cutoff = datetime.datetime.utcnow() - datetime.timedelta(
369 seconds=self.mgr.host_check_interval)
370 return host not in self.last_host_check or self.last_host_check[host] < cutoff
371
372 def add_daemon(self, host, dd):
373 # type: (str, orchestrator.DaemonDescription) -> None
374 assert host in self.daemons
375 self.daemons[host][dd.name()] = dd
376
377 def rm_daemon(self, host, name):
378 if host in self.daemons:
379 if name in self.daemons[host]:
380 del self.daemons[host][name]
381
382
383 class AsyncCompletion(orchestrator.Completion):
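"""Completion whose on_complete callback runs in the module's worker
thread pool, optionally mapped over a list of values (many=True)."""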
384 def __init__(self,
385 _first_promise=None, # type: Optional[orchestrator.Completion]
386 value=orchestrator._Promise.NO_RESULT, # type: Any
387 on_complete=None, # type: Optional[Callable]
388 name=None, # type: Optional[str]
389 many=False, # type: bool
390 update_progress=False, # type: bool
391 ):
392
393 assert CephadmOrchestrator.instance is not None
394 self.many = many
395 self.update_progress = update_progress
396 if name is None and on_complete is not None:
397 name = getattr(on_complete, '__name__', None)
398 super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
399
400 @property
401 def _progress_reference(self):
402 # type: () -> Optional[orchestrator.ProgressReference]
403 if hasattr(self._on_complete_, 'progress_id'): # type: ignore
404 return self._on_complete_ # type: ignore
405 return None
406
407 @property
408 def _on_complete(self):
409 # type: () -> Optional[Callable]
410 if self._on_complete_ is None:
411 return None
412
413 def callback(result):
414 try:
415 if self.update_progress:
416 assert self.progress_reference
417 self.progress_reference.progress = 1.0
418 self._on_complete_ = None
419 self._finalize(result)
420 except Exception as e:
421 try:
422 self.fail(e)
423 except Exception:
424 logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
425 if 'UNITTEST' in os.environ:
426 assert False
427
428 def error_callback(e):
429 pass
430
431 def run(value):
432 def do_work(*args, **kwargs):
433 assert self._on_complete_ is not None
434 try:
435 res = self._on_complete_(*args, **kwargs)
436 if self.update_progress and self.many:
437 assert self.progress_reference
438 self.progress_reference.progress += 1.0 / len(value)
439 return res
440 except Exception as e:
441 self.fail(e)
442 raise
443
444 assert CephadmOrchestrator.instance
445 if self.many:
446 if not value:
447 logger.info('calling map_async without values')
448 callback([])
449 if six.PY3:
450 CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
451 callback=callback,
452 error_callback=error_callback)
453 else:
454 CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
455 callback=callback)
456 else:
457 if six.PY3:
458 CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
459 callback=callback, error_callback=error_callback)
460 else:
461 CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
462 callback=callback)
463 return self.ASYNC_RESULT
464
465 return run
466
467 @_on_complete.setter
468 def _on_complete(self, inner):
469 # type: (Callable) -> None
470 self._on_complete_ = inner
471
472
473 def ssh_completion(cls=AsyncCompletion, **c_kwargs):
474 # type: (Type[orchestrator.Completion], Any) -> Callable
475 """
476 See ./HACKING.rst for a how-to
477 """
478 def decorator(f):
479 @wraps(f)
480 def wrapper(*args):
481
482 name = f.__name__
483 many = c_kwargs.get('many', False)
484
485 # Some weird logic to make calling functions with multiple arguments work.
486 if len(args) == 1:
487 [value] = args
488 if many and value and isinstance(value[0], tuple):
489 return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
490 else:
491 return cls(on_complete=f, value=value, name=name, **c_kwargs)
492 else:
493 if many:
494 self, value = args
495
496 def call_self(inner_args):
497 if not isinstance(inner_args, tuple):
498 inner_args = (inner_args, )
499 return f(self, *inner_args)
500
501 return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
502 else:
503 return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
504
505 return wrapper
506 return decorator
507
508
509 def async_completion(f):
510 # type: (Callable) -> Callable[..., AsyncCompletion]
511 """
512 See ./HACKING.rst for a how-to
513
514 :param f: wrapped function
515 """
516 return ssh_completion()(f)
517
518
519 def async_map_completion(f):
520 # type: (Callable) -> Callable[..., AsyncCompletion]
521 """
522 See ./HACKING.rst for a how-to
523
524 :param f: wrapped function
525
526 kind of similar to
527
528 >>> def sync_map(f):
529 ... return lambda x: map(f, x)
530
531 """
532 return ssh_completion(many=True)(f)
533
534
535 def trivial_completion(f):
536 # type: (Callable) -> Callable[..., orchestrator.Completion]
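# Evaluates f immediately and wraps its return value in an AsyncCompletion.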
537 @wraps(f)
538 def wrapper(*args, **kwargs):
539 return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
540 return wrapper
541
542
543 @six.add_metaclass(CLICommandMeta)
544 class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
545
546 _STORE_HOST_PREFIX = "host"
547
548 instance = None
549 NATIVE_OPTIONS = [] # type: List[Any]
550 MODULE_OPTIONS = [
551 {
552 'name': 'ssh_config_file',
553 'type': 'str',
554 'default': None,
555 'desc': 'customized SSH config file to connect to managed hosts',
556 },
557 {
558 'name': 'device_cache_timeout',
559 'type': 'secs',
560 'default': 30 * 60,
561 'desc': 'seconds to cache device inventory',
562 },
563 {
564 'name': 'daemon_cache_timeout',
565 'type': 'secs',
566 'default': 10 * 60,
567 'desc': 'seconds to cache service (daemon) inventory',
568 },
569 {
570 'name': 'host_check_interval',
571 'type': 'secs',
572 'default': 10 * 60,
573 'desc': 'how frequently to perform a host check',
574 },
575 {
576 'name': 'mode',
577 'type': 'str',
578 'enum_allowed': ['root', 'cephadm-package'],
579 'default': 'root',
580 'desc': 'mode for remote execution of cephadm',
581 },
582 {
583 'name': 'container_image_base',
584 'default': 'docker.io/ceph/ceph',
585 'desc': 'Container image name, without the tag',
586 'runtime': True,
587 },
588 {
589 'name': 'warn_on_stray_hosts',
590 'type': 'bool',
591 'default': True,
592 'desc': 'raise a health warning if daemons are detected on a host '
593 'that is not managed by cephadm',
594 },
595 {
596 'name': 'warn_on_stray_daemons',
597 'type': 'bool',
598 'default': True,
599 'desc': 'raise a health warning if daemons are detected '
600 'that are not managed by cephadm',
601 },
602 {
603 'name': 'warn_on_failed_host_check',
604 'type': 'bool',
605 'default': True,
606 'desc': 'raise a health warning if the host check fails',
607 },
608 {
609 'name': 'log_to_cluster',
610 'type': 'bool',
611 'default': True,
612 'desc': 'log to the "cephadm" cluster log channel',
613 },
614 {
615 'name': 'allow_ptrace',
616 'type': 'bool',
617 'default': False,
618 'desc': 'allow SYS_PTRACE capability on ceph containers',
619 'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
620 'process with gdb or strace. Enabling this option '
621 'can allow debugging daemons that encounter problems '
622 'at runtime.',
623 },
624 {
625 'name': 'prometheus_alerts_path',
626 'type': 'str',
627 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
628 'desc': 'location of alerts to include in prometheus deployments',
629 },
630 ]
631
632 def __init__(self, *args, **kwargs):
633 super(CephadmOrchestrator, self).__init__(*args, **kwargs)
634 self._cluster_fsid = self.get('mon_map')['fsid']
635
636 # for serve()
637 self.run = True
638 self.event = Event()
639
640 if self.get_store('pause'):
641 self.paused = True
642 else:
643 self.paused = False
644
645 # for mypy which does not run the code
646 if TYPE_CHECKING:
647 self.ssh_config_file = None # type: Optional[str]
648 self.device_cache_timeout = 0
649 self.daemon_cache_timeout = 0
650 self.host_check_interval = 0
651 self.mode = ''
652 self.container_image_base = ''
653 self.warn_on_stray_hosts = True
654 self.warn_on_stray_daemons = True
655 self.warn_on_failed_host_check = True
656 self.allow_ptrace = False
657 self.prometheus_alerts_path = ''
658
659 self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
660
661 self.config_notify()
662
663 path = self.get_ceph_option('cephadm_path')
664 try:
665 with open(path, 'r') as f:
666 self._cephadm = f.read()
667 except (IOError, TypeError) as e:
668 raise RuntimeError("unable to read cephadm at '%s': %s" % (
669 path, str(e)))
670
671 self._worker_pool = multiprocessing.pool.ThreadPool(10)
672
673 self._reconfig_ssh()
674
675 CephadmOrchestrator.instance = self
676
677 t = self.get_store('upgrade_state')
678 if t:
679 self.upgrade_state = json.loads(t)
680 else:
681 self.upgrade_state = None
682
683 self.health_checks = {}
684
685 self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
686
687 # load inventory
688 i = self.get_store('inventory')
689 if i:
690 self.inventory: Dict[str, dict] = json.loads(i)
691 else:
692 self.inventory = dict()
693 self.log.debug('Loaded inventory %s' % self.inventory)
694
695 self.cache = HostCache(self)
696 self.cache.load()
697 self.rm_util = RemoveUtil(self)
698
699 self.spec_store = SpecStore(self)
700 self.spec_store.load()
701
702 # ensure the host lists are in sync
703 for h in self.inventory.keys():
704 if h not in self.cache.daemons:
705 self.cache.prime_empty_host(h)
706 for h in self.cache.get_hosts():
707 if h not in self.inventory:
708 self.cache.rm_host(h)
709
710 def shutdown(self):
711 self.log.debug('shutdown')
712 self._worker_pool.close()
713 self._worker_pool.join()
714 self.run = False
715 self.event.set()
716
717 def _kick_serve_loop(self):
718 self.log.debug('_kick_serve_loop')
719 self.event.set()
720
721 def _check_safe_to_destroy_mon(self, mon_id):
722 # type: (str) -> None
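# Refuse to remove this mon unless the remaining quorum members would
# still form a strict majority of the remaining monmap.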
723 ret, out, err = self.mon_command({
724 'prefix': 'quorum_status',
725 })
726 if ret:
727 raise OrchestratorError('failed to check mon quorum status')
728 try:
729 j = json.loads(out)
730 except Exception as e:
731 raise OrchestratorError('failed to parse quorum status')
732
733 mons = [m['name'] for m in j['monmap']['mons']]
734 if mon_id not in mons:
735 self.log.info('Safe to remove mon.%s: not in monmap (%s)' % (
736 mon_id, mons))
737 return
738 new_mons = [m for m in mons if m != mon_id]
739 new_quorum = [m for m in j['quorum_names'] if m != mon_id]
740 if len(new_quorum) > len(new_mons) / 2:
741 self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
742 return
743 raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
744
745 def _wait_for_ok_to_stop(self, s):
746 # only wait a little bit; the daemon might go away for some other reason
747 tries = 4
748 while tries > 0:
749 if s.daemon_type not in ['mon', 'osd', 'mds']:
750 self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
751 (s.daemon_type, s.daemon_id))
752 return True
753 ret, out, err = self.mon_command({
754 'prefix': '%s ok-to-stop' % s.daemon_type,
755 'ids': [s.daemon_id],
756 })
757 if not self.upgrade_state or self.upgrade_state.get('paused'):
758 return False
759 if ret:
760 self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
761 (s.daemon_type, s.daemon_id))
762 time.sleep(15)
763 tries -= 1
764 else:
765 self.log.info('Upgrade: It is safe to stop %s.%s' %
766 (s.daemon_type, s.daemon_id))
767 return True
768 return False
769
770 def _clear_upgrade_health_checks(self):
771 for k in ['UPGRADE_NO_STANDBY_MGR',
772 'UPGRADE_FAILED_PULL']:
773 if k in self.health_checks:
774 del self.health_checks[k]
775 self.set_health_checks(self.health_checks)
776
777 def _fail_upgrade(self, alert_id, alert):
778 self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
779 alert['summary']))
780 self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
781 self.upgrade_state['paused'] = True
782 self._save_upgrade_state()
783 self.health_checks[alert_id] = alert
784 self.set_health_checks(self.health_checks)
785
786 def _update_upgrade_progress(self, progress):
787 if 'progress_id' not in self.upgrade_state:
788 self.upgrade_state['progress_id'] = str(uuid.uuid4())
789 self._save_upgrade_state()
790 self.remote('progress', 'update', self.upgrade_state['progress_id'],
791 ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
792 ev_progress=progress)
793
794 def _do_upgrade(self):
795 # type: () -> None
796 if not self.upgrade_state:
797 self.log.debug('_do_upgrade no state, exiting')
798 return
799
800 target_name = self.upgrade_state.get('target_name')
801 target_id = self.upgrade_state.get('target_id', None)
802 if not target_id:
803 # need to learn the container hash
804 self.log.info('Upgrade: First pull of %s' % target_name)
805 try:
806 target_id, target_version = self._get_container_image_id(target_name)
807 except OrchestratorError as e:
808 self._fail_upgrade('UPGRADE_FAILED_PULL', {
809 'severity': 'warning',
810 'summary': 'Upgrade: failed to pull target image',
811 'count': 1,
812 'detail': [str(e)],
813 })
814 return
815 self.upgrade_state['target_id'] = target_id
816 self.upgrade_state['target_version'] = target_version
817 self._save_upgrade_state()
818 target_version = self.upgrade_state.get('target_version')
819 self.log.info('Upgrade: Target is %s with id %s' % (target_name,
820 target_id))
821
822 # get all distinct container_image settings
823 image_settings = {}
824 ret, out, err = self.mon_command({
825 'prefix': 'config dump',
826 'format': 'json',
827 })
828 config = json.loads(out)
829 for opt in config:
830 if opt['name'] == 'container_image':
831 image_settings[opt['section']] = opt['value']
832
833 daemons = self.cache.get_daemons()
834 done = 0
835 for daemon_type in CEPH_UPGRADE_ORDER:
836 self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
837 need_upgrade_self = False
838 for d in daemons:
839 if d.daemon_type != daemon_type:
840 continue
841 if d.container_image_id == target_id:
842 self.log.debug('daemon %s.%s version correct' % (
843 daemon_type, d.daemon_id))
844 done += 1
845 continue
846 self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
847 daemon_type, d.daemon_id,
848 d.container_image_name, d.container_image_id, d.version))
849
850 if daemon_type == 'mgr' and \
851 d.daemon_id == self.get_mgr_id():
852 self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
853 self.get_mgr_id())
854 need_upgrade_self = True
855 continue
856
857 # make sure host has latest container image
858 out, err, code = self._run_cephadm(
859 d.hostname, None, 'inspect-image', [],
860 image=target_name, no_fsid=True, error_ok=True)
861 if code or json.loads(''.join(out)).get('image_id') != target_id:
862 self.log.info('Upgrade: Pulling %s on %s' % (target_name,
863 d.hostname))
864 out, err, code = self._run_cephadm(
865 d.hostname, None, 'pull', [],
866 image=target_name, no_fsid=True, error_ok=True)
867 if code:
868 self._fail_upgrade('UPGRADE_FAILED_PULL', {
869 'severity': 'warning',
870 'summary': 'Upgrade: failed to pull target image',
871 'count': 1,
872 'detail': [
873 'failed to pull %s on host %s' % (target_name,
874 d.hostname)],
875 })
876 return
877 r = json.loads(''.join(out))
878 if r.get('image_id') != target_id:
879 self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
880 self.upgrade_state['target_id'] = r['image_id']
881 self._save_upgrade_state()
882 return
883
884 self._update_upgrade_progress(done / len(daemons))
885
886 if not d.container_image_id:
887 if d.container_image_name == target_name:
888 self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
889 continue
890 if not self._wait_for_ok_to_stop(d):
891 return
892 self.log.info('Upgrade: Redeploying %s.%s' %
893 (d.daemon_type, d.daemon_id))
894 ret, out, err = self.mon_command({
895 'prefix': 'config set',
896 'name': 'container_image',
897 'value': target_name,
898 'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
899 })
900 self._daemon_action(
901 d.daemon_type,
902 d.daemon_id,
903 d.hostname,
904 'redeploy'
905 )
906 return
907
908 if need_upgrade_self:
909 mgr_map = self.get('mgr_map')
910 num = len(mgr_map.get('standbys'))
911 if not num:
912 self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
913 'severity': 'warning',
914 'summary': 'Upgrade: Need standby mgr daemon',
915 'count': 1,
916 'detail': [
917 'The upgrade process needs to upgrade the mgr, '
918 'but it needs at least one standby to proceed.',
919 ],
920 })
921 return
922
923 self.log.info('Upgrade: there are %d other already-upgraded '
924 'standby mgrs, failing over' % num)
925
926 self._update_upgrade_progress(done / len(daemons))
927
928 # fail over
929 ret, out, err = self.mon_command({
930 'prefix': 'mgr fail',
931 'who': self.get_mgr_id(),
932 })
933 return
934 elif daemon_type == 'mgr':
935 if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
936 del self.health_checks['UPGRADE_NO_STANDBY_MGR']
937 self.set_health_checks(self.health_checks)
938
939 # make sure 'ceph versions' agrees
940 ret, out, err = self.mon_command({
941 'prefix': 'versions',
942 })
943 j = json.loads(out)
944 for version, count in j.get(daemon_type, {}).items():
945 if version != target_version:
946 self.log.warning(
947 'Upgrade: %d %s daemon(s) are %s != target %s' %
948 (count, daemon_type, version, target_version))
949
950 # push down configs
951 if image_settings.get(daemon_type) != target_name:
952 self.log.info('Upgrade: Setting container_image for all %s...' %
953 daemon_type)
954 ret, out, err = self.mon_command({
955 'prefix': 'config set',
956 'name': 'container_image',
957 'value': target_name,
958 'who': daemon_type,
959 })
960 to_clean = []
961 for section in image_settings.keys():
962 if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
963 to_clean.append(section)
964 if to_clean:
965 self.log.debug('Upgrade: Cleaning up container_image for %s...' %
966 to_clean)
967 for section in to_clean:
968 ret, image, err = self.mon_command({
969 'prefix': 'config rm',
970 'name': 'container_image',
971 'who': section,
972 })
973
974 self.log.info('Upgrade: All %s daemons are up to date.' %
975 daemon_type)
976
977 # clean up
978 self.log.info('Upgrade: Finalizing container_image settings')
979 ret, out, err = self.mon_command({
980 'prefix': 'config set',
981 'name': 'container_image',
982 'value': target_name,
983 'who': 'global',
984 })
985 for daemon_type in CEPH_UPGRADE_ORDER:
986 ret, image, err = self.mon_command({
987 'prefix': 'config rm',
988 'name': 'container_image',
989 'who': utils.name_to_config_section(daemon_type),
990 })
991
992 self.log.info('Upgrade: Complete!')
993 if 'progress_id' in self.upgrade_state:
994 self.remote('progress', 'complete',
995 self.upgrade_state['progress_id'])
996 self.upgrade_state = None
997 self._save_upgrade_state()
998 return
999
1000 def _check_hosts(self):
1001 self.log.debug('_check_hosts')
1002 bad_hosts = []
1003 hosts = self.inventory.keys()
1004 for host in hosts:
1005 if host not in self.inventory:
1006 continue
1007 self.log.debug(' checking %s' % host)
1008 try:
1009 out, err, code = self._run_cephadm(
1010 host, 'client', 'check-host', [],
1011 error_ok=True, no_fsid=True)
1012 if code:
1013 self.log.debug(' host %s failed check' % host)
1014 if self.warn_on_failed_host_check:
1015 bad_hosts.append('host %s failed check: %s' % (host, err))
1016 else:
1017 self.log.debug(' host %s ok' % host)
1018 except Exception as e:
1019 self.log.debug(' host %s failed check' % host)
1020 bad_hosts.append('host %s failed check: %s' % (host, e))
1021 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1022 del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
1023 if bad_hosts:
1024 self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
1025 'severity': 'warning',
1026 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
1027 'count': len(bad_hosts),
1028 'detail': bad_hosts,
1029 }
1030 self.set_health_checks(self.health_checks)
1031
1032 def _check_host(self, host):
1033 if host not in self.inventory:
1034 return
1035 self.log.debug(' checking %s' % host)
1036 try:
1037 out, err, code = self._run_cephadm(
1038 host, 'client', 'check-host', [],
1039 error_ok=True, no_fsid=True)
1040 self.cache.update_last_host_check(host)
1041 self.cache.save_host(host)
1042 if code:
1043 self.log.debug(' host %s failed check' % host)
1044 if self.warn_on_failed_host_check:
1045 return 'host %s failed check: %s' % (host, err)
1046 else:
1047 self.log.debug(' host %s ok' % host)
1048 except Exception as e:
1049 self.log.debug(' host %s failed check' % host)
1050 return 'host %s failed check: %s' % (host, e)
1051
1052 def _check_for_strays(self):
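# Raise health warnings for hosts and daemons that the cluster reports
# (via list_servers) but that are not managed by cephadm.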
1053 self.log.debug('_check_for_strays')
1054 for k in ['CEPHADM_STRAY_HOST',
1055 'CEPHADM_STRAY_DAEMON']:
1056 if k in self.health_checks:
1057 del self.health_checks[k]
1058 if self.warn_on_stray_hosts or self.warn_on_stray_daemons:
1059 ls = self.list_servers()
1060 managed = self.cache.get_daemon_names()
1061 host_detail = [] # type: List[str]
1062 host_num_daemons = 0
1063 daemon_detail = [] # type: List[str]
1064 for item in ls:
1065 host = item.get('hostname')
1066 daemons = item.get('services') # misnomer!
1067 missing_names = []
1068 for s in daemons:
1069 name = '%s.%s' % (s.get('type'), s.get('id'))
1070 if host not in self.inventory:
1071 missing_names.append(name)
1072 host_num_daemons += 1
1073 if name not in managed:
1074 daemon_detail.append(
1075 'stray daemon %s on host %s not managed by cephadm' % (name, host))
1076 if missing_names:
1077 host_detail.append(
1078 'stray host %s has %d stray daemons: %s' % (
1079 host, len(missing_names), missing_names))
1080 if host_detail:
1081 self.health_checks['CEPHADM_STRAY_HOST'] = {
1082 'severity': 'warning',
1083 'summary': '%d stray host(s) with %d daemon(s) '
1084 'not managed by cephadm' % (
1085 len(host_detail), host_num_daemons),
1086 'count': len(host_detail),
1087 'detail': host_detail,
1088 }
1089 if daemon_detail:
1090 self.health_checks['CEPHADM_STRAY_DAEMON'] = {
1091 'severity': 'warning',
1092 'summary': '%d stray daemon(s) not managed by cephadm' % (
1093 len(daemon_detail)),
1094 'count': len(daemon_detail),
1095 'detail': daemon_detail,
1096 }
1097 self.set_health_checks(self.health_checks)
1098
1099 def _serve_sleep(self):
1100 sleep_interval = 600
1101 self.log.debug('Sleeping for %d seconds', sleep_interval)
1102 ret = self.event.wait(sleep_interval)
1103 self.event.clear()
1104
1105 def serve(self):
1106 # type: () -> None
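# Main background loop: check hosts and refresh their daemon/device
# inventory, update health checks, apply service specs, check daemons,
# drive any in-progress upgrade, then sleep until woken by event.set().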
1107 self.log.debug("serve starting")
1108 while self.run:
1109
1110 # check hosts and refresh daemon/device inventory
1111 self.log.debug('refreshing hosts')
1112 bad_hosts = []
1113 failures = []
1114 for host in self.cache.get_hosts():
1115 if self.cache.host_needs_check(host):
1116 r = self._check_host(host)
1117 if r is not None:
1118 bad_hosts.append(r)
1119 if self.cache.host_needs_daemon_refresh(host):
1120 self.log.debug('refreshing %s daemons' % host)
1121 r = self._refresh_host_daemons(host)
1122 if r:
1123 failures.append(r)
1124 if self.cache.host_needs_device_refresh(host):
1125 self.log.debug('refreshing %s devices' % host)
1126 r = self._refresh_host_devices(host)
1127 if r:
1128 failures.append(r)
1129
1130 health_changed = False
1131 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1132 del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
1133 health_changed = True
1134 if bad_hosts:
1135 self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
1136 'severity': 'warning',
1137 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
1138 'count': len(bad_hosts),
1139 'detail': bad_hosts,
1140 }
1141 health_changed = True
1142 if failures:
1143 self.health_checks['CEPHADM_REFRESH_FAILED'] = {
1144 'severity': 'warning',
1145 'summary': 'failed to probe daemons or devices',
1146 'count': len(failures),
1147 'detail': failures,
1148 }
1149 health_changed = True
1150 elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
1151 del self.health_checks['CEPHADM_REFRESH_FAILED']
1152 health_changed = True
1153 if health_changed:
1154 self.set_health_checks(self.health_checks)
1155
1156
1157
1158 self._check_for_strays()
1159
1160 if self.paused:
1161 self.health_checks['CEPHADM_PAUSED'] = {
1162 'severity': 'warning',
1163 'summary': 'cephadm background work is paused',
1164 'count': 1,
1165 'detail': ["'ceph orch resume' to resume"],
1166 }
1167 self.set_health_checks(self.health_checks)
1168 else:
1169 if 'CEPHADM_PAUSED' in self.health_checks:
1170 del self.health_checks['CEPHADM_PAUSED']
1171 self.set_health_checks(self.health_checks)
1172
1173 self.rm_util._remove_osds_bg()
1174
1175 if self._apply_all_services():
1176 continue # did something, refresh
1177
1178 self._check_daemons()
1179
1180 if self.upgrade_state and not self.upgrade_state.get('paused'):
1181 self._do_upgrade()
1182 continue
1183
1184 self._serve_sleep()
1185 self.log.debug("serve exit")
1186
1187 def config_notify(self):
1188 """
1189 This method is called whenever one of our config options is changed.
1190 """
1191 for opt in self.MODULE_OPTIONS:
1192 setattr(self,
1193 opt['name'], # type: ignore
1194 self.get_module_option(opt['name'])) # type: ignore
1195 self.log.debug(' mgr option %s = %s',
1196 opt['name'], getattr(self, opt['name'])) # type: ignore
1197 for opt in self.NATIVE_OPTIONS:
1198 setattr(self,
1199 opt, # type: ignore
1200 self.get_ceph_option(opt))
1201 self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
1202
1203 self.event.set()
1204
1205 def notify(self, notify_type, notify_id):
1206 pass
1207
1208 def pause(self):
1209 if not self.paused:
1210 self.log.info('Paused')
1211 self.set_store('pause', 'true')
1212 self.paused = True
1213 # wake loop so we update the health status
1214 self._kick_serve_loop()
1215
1216 def resume(self):
1217 if self.paused:
1218 self.log.info('Resumed')
1219 self.paused = False
1220 self.set_store('pause', None)
1221 # unconditionally wake loop so that 'orch resume' can be used to kick
1222 # cephadm
1223 self._kick_serve_loop()
1224
1225 def get_unique_name(self, daemon_type, host, existing, prefix=None,
1226 forcename=None):
1227 # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
1228 """
1229 Generate a unique random service name
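e.g. 'node1' for a mon, or 'node1.abcdef' for daemon types that get
a random suffix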
1230 """
1231 suffix = daemon_type not in [
1232 'mon', 'crash', 'nfs',
1233 'prometheus', 'node-exporter', 'grafana', 'alertmanager',
1234 ]
1235 if forcename:
1236 if len([d for d in existing if d.daemon_id == forcename]):
1237 raise orchestrator.OrchestratorValidationError('name %s already in use', forcename)
1238 return forcename
1239
1240 if '.' in host:
1241 host = host.split('.')[0]
1242 while True:
1243 if prefix:
1244 name = prefix + '.'
1245 else:
1246 name = ''
1247 name += host
1248 if suffix:
1249 name += '.' + ''.join(random.choice(string.ascii_lowercase)
1250 for _ in range(6))
1251 if len([d for d in existing if d.daemon_id == name]):
1252 if not suffix:
1253 raise orchestrator.OrchestratorValidationError('name %s already in use', name)
1254 self.log.debug('name %s exists, trying again', name)
1255 continue
1256 return name
1257
1258 def get_service_name(self, daemon_type, daemon_id, host):
1259 # type: (str, str, str) -> str
1260 """
1261 Returns the generic service name
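e.g. ('mds', 'a.node1.xyzabc', 'node1') -> 'mds.a'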
1262 """
1263 p = re.compile(r'(.*)\.%s.*' % (host))
1264 # strip the host (and any trailing suffix) from the daemon_id
1265 return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
1266
1267 def _save_inventory(self):
1268 self.set_store('inventory', json.dumps(self.inventory))
1269
1270 def _save_upgrade_state(self):
1271 self.set_store('upgrade_state', json.dumps(self.upgrade_state))
1272
1273 def _reconfig_ssh(self):
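# Write the stored ssh_config and identity keypair to temporary files,
# rebuild the ssh options passed to remoto, and reset cached connections.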
1274 temp_files = [] # type: list
1275 ssh_options = [] # type: List[str]
1276
1277 # ssh_config
1278 ssh_config_fname = self.ssh_config_file
1279 ssh_config = self.get_store("ssh_config")
1280 if ssh_config is not None or ssh_config_fname is None:
1281 if not ssh_config:
1282 ssh_config = DEFAULT_SSH_CONFIG
1283 f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
1284 os.fchmod(f.fileno(), 0o600)
1285 f.write(ssh_config.encode('utf-8'))
1286 f.flush() # make visible to other processes
1287 temp_files += [f]
1288 ssh_config_fname = f.name
1289 if ssh_config_fname:
1290 self.validate_ssh_config_fname(ssh_config_fname)
1291 ssh_options += ['-F', ssh_config_fname]
1292
1293 # identity
1294 ssh_key = self.get_store("ssh_identity_key")
1295 ssh_pub = self.get_store("ssh_identity_pub")
1296 self.ssh_pub = ssh_pub
1297 self.ssh_key = ssh_key
1298 if ssh_key and ssh_pub:
1299 tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
1300 tkey.write(ssh_key.encode('utf-8'))
1301 os.fchmod(tkey.fileno(), 0o600)
1302 tkey.flush() # make visible to other processes
1303 tpub = open(tkey.name + '.pub', 'w')
1304 os.fchmod(tpub.fileno(), 0o600)
1305 tpub.write(ssh_pub)
1306 tpub.flush() # make visible to other processes
1307 temp_files += [tkey, tpub]
1308 ssh_options += ['-i', tkey.name]
1309
1310 self._temp_files = temp_files
1311 if ssh_options:
1312 self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
1313 else:
1314 self._ssh_options = None
1315
1316 if self.mode == 'root':
1317 self.ssh_user = 'root'
1318 elif self.mode == 'cephadm-package':
1319 self.ssh_user = 'cephadm'
1320
1321 self._reset_cons()
1322
1323 def validate_ssh_config_fname(self, ssh_config_fname):
1324 if not os.path.isfile(ssh_config_fname):
1325 raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
1326 ssh_config_fname))
1327
1328 def _reset_con(self, host):
1329 conn, r = self._cons.get(host, (None, None))
1330 if conn:
1331 self.log.debug('_reset_con close %s' % host)
1332 conn.exit()
1333 del self._cons[host]
1334
1335 def _reset_cons(self):
1336 for host, conn_and_r in self._cons.items():
1337 self.log.debug('_reset_cons close %s' % host)
1338 conn, r = conn_and_r
1339 conn.exit()
1340 self._cons = {}
1341
1342 @staticmethod
1343 def can_run():
1344 if remoto is not None:
1345 return True, ""
1346 else:
1347 return False, "loading remoto library:{}".format(
1348 remoto_import_error)
1349
1350 def available(self):
1351 """
1352 The cephadm orchestrator is available whenever the remoto library could be loaded.
1353 """
1354 return self.can_run()
1355
1356 def process(self, completions):
1357 """
1358 Completions are processed in another thread; here we only finalize them.
1359 """
1360 if completions:
1361 self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
1362
1363 for p in completions:
1364 p.finalize()
1365
1366 def _require_hosts(self, hosts):
1367 """
1368 Raise an error if any of the given hosts are unregistered.
1369 """
1370 if isinstance(hosts, six.string_types):
1371 hosts = [hosts]
1372 keys = self.inventory.keys()
1373 unregistered_hosts = set(hosts) - keys
1374 if unregistered_hosts:
1375 logger.warning('keys = {}'.format(keys))
1376 raise RuntimeError("Host(s) {} not registered".format(
1377 ", ".join(map(lambda h: "'{}'".format(h),
1378 unregistered_hosts))))
1379
1380 @orchestrator._cli_write_command(
1381 prefix='cephadm set-ssh-config',
1382 desc='Set the ssh_config file (use -i <ssh_config>)')
1383 def _set_ssh_config(self, inbuf=None):
1384 """
1385 Set an ssh_config file provided from stdin
1386
1387 TODO:
1388 - validation
1389 """
1390 if inbuf is None or len(inbuf) == 0:
1391 return -errno.EINVAL, "", "empty ssh config provided"
1392 self.set_store("ssh_config", inbuf)
1393 self.log.info('Set ssh_config')
1394 return 0, "", ""
1395
1396 @orchestrator._cli_write_command(
1397 prefix='cephadm clear-ssh-config',
1398 desc='Clear the ssh_config file')
1399 def _clear_ssh_config(self):
1400 """
1401 Clear the ssh_config previously set via 'cephadm set-ssh-config'
1402 """
1403 self.set_store("ssh_config", None)
1404 self.ssh_config_tmp = None
1405 self.log.info('Cleared ssh_config')
1406 return 0, "", ""
1407
1408 @orchestrator._cli_read_command(
1409 prefix='cephadm get-ssh-config',
1410 desc='Returns the ssh config as used by cephadm'
1411 )
1412 def _get_ssh_config(self):
1413 if self.ssh_config_file:
1414 self.validate_ssh_config_fname(self.ssh_config_file)
1415 with open(self.ssh_config_file) as f:
1416 return HandleCommandResult(stdout=f.read())
1417 ssh_config = self.get_store("ssh_config")
1418 if ssh_config:
1419 return HandleCommandResult(stdout=ssh_config)
1420 return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
1421
1422
1423 @orchestrator._cli_write_command(
1424 'cephadm generate-key',
1425 desc='Generate a cluster SSH key (if not present)')
1426 def _generate_key(self):
1427 if not self.ssh_pub or not self.ssh_key:
1428 self.log.info('Generating ssh key...')
1429 tmp_dir = TemporaryDirectory()
1430 path = tmp_dir.name + '/key'
1431 try:
1432 subprocess.call([
1433 '/usr/bin/ssh-keygen',
1434 '-C', 'ceph-%s' % self._cluster_fsid,
1435 '-N', '',
1436 '-f', path
1437 ])
1438 with open(path, 'r') as f:
1439 secret = f.read()
1440 with open(path + '.pub', 'r') as f:
1441 pub = f.read()
1442 finally:
1443 os.unlink(path)
1444 os.unlink(path + '.pub')
1445 tmp_dir.cleanup()
1446 self.set_store('ssh_identity_key', secret)
1447 self.set_store('ssh_identity_pub', pub)
1448 self._reconfig_ssh()
1449 return 0, '', ''
1450
1451 @orchestrator._cli_write_command(
1452 'cephadm clear-key',
1453 desc='Clear cluster SSH key')
1454 def _clear_key(self):
1455 self.set_store('ssh_identity_key', None)
1456 self.set_store('ssh_identity_pub', None)
1457 self._reconfig_ssh()
1458 self.log.info('Cleared cluster SSH key')
1459 return 0, '', ''
1460
1461 @orchestrator._cli_read_command(
1462 'cephadm get-pub-key',
1463 desc='Show SSH public key for connecting to cluster hosts')
1464 def _get_pub_key(self):
1465 if self.ssh_pub:
1466 return 0, self.ssh_pub, ''
1467 else:
1468 return -errno.ENOENT, '', 'No cluster SSH key defined'
1469
1470 @orchestrator._cli_read_command(
1471 'cephadm get-user',
1472 desc='Show user for SSHing to cluster hosts')
1473 def _get_user(self):
1474 return 0, self.ssh_user, ''
1475
1476 @orchestrator._cli_read_command(
1477 'cephadm check-host',
1478 'name=host,type=CephString '
1479 'name=addr,type=CephString,req=false',
1480 'Check whether we can access and manage a remote host')
1481 def check_host(self, host, addr=None):
1482 out, err, code = self._run_cephadm(host, 'client', 'check-host',
1483 ['--expect-hostname', host],
1484 addr=addr,
1485 error_ok=True, no_fsid=True)
1486 if code:
1487 return 1, '', ('check-host failed:\n' + '\n'.join(err))
1488 # if we have an outstanding health alert for this host, give the
1489 # serve thread a kick
1490 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1491 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1492 if item.startswith('host %s ' % host):
1493 self.event.set()
1494 return 0, '%s (%s) ok' % (host, addr), err
1495
1496 @orchestrator._cli_read_command(
1497 'cephadm prepare-host',
1498 'name=host,type=CephString '
1499 'name=addr,type=CephString,req=false',
1500 'Prepare a remote host for use with cephadm')
1501 def _prepare_host(self, host, addr=None):
1502 out, err, code = self._run_cephadm(host, 'client', 'prepare-host',
1503 ['--expect-hostname', host],
1504 addr=addr,
1505 error_ok=True, no_fsid=True)
1506 if code:
1507 return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
1508 # if we have an outstanding health alert for this host, give the
1509 # serve thread a kick
1510 if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
1511 for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
1512 if item.startswith('host %s ' % host):
1513 self.event.set()
1514 return 0, '%s (%s) ok' % (host, addr), err
1515
1516 def _get_connection(self, host):
1517 """
1518 Setup a connection for running commands on remote host.
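Connections are cached in self._cons and reused until _reset_con() closes them.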
1519 """
1520 conn_and_r = self._cons.get(host)
1521 if conn_and_r:
1522 self.log.debug('Have connection to %s' % host)
1523 return conn_and_r
1524 n = self.ssh_user + '@' + host
1525 self.log.debug("Opening connection to {} with ssh options '{}'".format(
1526 n, self._ssh_options))
1527 child_logger=self.log.getChild(n)
1528 child_logger.setLevel('WARNING')
1529 conn = remoto.Connection(
1530 n,
1531 logger=child_logger,
1532 ssh_options=self._ssh_options)
1533
1534 r = conn.import_module(remotes)
1535 self._cons[host] = conn, r
1536
1537 return conn, r
1538
1539 def _executable_path(self, conn, executable):
1540 """
1541 Remote validator that accepts a connection object to ensure that a certain
1542 executable is available, returning its full path if so.
1543
1544 Otherwise an exception with thorough details will be raised, informing the
1545 user that the executable was not found.
1546 """
1547 executable_path = conn.remote_module.which(executable)
1548 if not executable_path:
1549 raise RuntimeError("Executable '{}' not found on host '{}'".format(
1550 executable, conn.hostname))
1551 self.log.debug("Found executable '{}' at path '{}'".format(executable,
1552 executable_path))
1553 return executable_path
1554
1555 def _run_cephadm(self, host, entity, command, args,
1556 addr=None,
1557 stdin=None,
1558 no_fsid=False,
1559 error_ok=False,
1560 image=None):
1561 # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
1562 """
1563 Run cephadm on the remote host with the given command + args
1564 """
1565 if not addr and host in self.inventory:
1566 addr = self.inventory[host].get('addr', host)
1567
1568 try:
1569 conn, connr = self._get_connection(addr)
1570
1571 assert image or entity
1572 if not image:
1573 daemon_type = entity.split('.', 1)[0] # type: ignore
1574 if daemon_type in CEPH_TYPES:
1575 # get container image
1576 ret, image, err = self.mon_command({
1577 'prefix': 'config get',
1578 'who': utils.name_to_config_section(entity),
1579 'key': 'container_image',
1580 })
1581 image = image.strip() # type: ignore
1582 self.log.debug('%s container image %s' % (entity, image))
1583
1584 final_args = []
1585 if image:
1586 final_args.extend(['--image', image])
1587 final_args.append(command)
1588
1589 if not no_fsid:
1590 final_args += ['--fsid', self._cluster_fsid]
1591 final_args += args
1592
1593 if self.mode == 'root':
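# In 'root' mode the bundled cephadm script is piped to the remote
# python interpreter over ssh, with argv (and optional stdin) injected
# as variables at the top of the script.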
1594 self.log.debug('args: %s' % (' '.join(final_args)))
1595 if stdin:
1596 self.log.debug('stdin: %s' % stdin)
1597 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
1598 if stdin:
1599 script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
1600 script += self._cephadm
1601 python = connr.choose_python()
1602 if not python:
1603 raise RuntimeError(
1604 'unable to find python on %s (tried %s in %s)' % (
1605 host, remotes.PYTHONS, remotes.PATH))
1606 try:
1607 out, err, code = remoto.process.check(
1608 conn,
1609 [python, '-u'],
1610 stdin=script.encode('utf-8'))
1611 except RuntimeError as e:
1612 self._reset_con(host)
1613 if error_ok:
1614 return [], [str(e)], 1
1615 raise
1616 elif self.mode == 'cephadm-package':
1617 try:
1618 out, err, code = remoto.process.check(
1619 conn,
1620 ['sudo', '/usr/bin/cephadm'] + final_args,
1621 stdin=stdin)
1622 except RuntimeError as e:
1623 self._reset_con(host)
1624 if error_ok:
1625 return [], [str(e)], 1
1626 raise
1627 else:
1628 assert False, 'unsupported mode'
1629
1630 self.log.debug('code: %d' % code)
1631 if out:
1632 self.log.debug('out: %s' % '\n'.join(out))
1633 if err:
1634 self.log.debug('err: %s' % '\n'.join(err))
1635 if code and not error_ok:
1636 raise RuntimeError(
1637 'cephadm exited with an error code: %d, stderr:%s' % (
1638 code, '\n'.join(err)))
1639 return out, err, code
1640
1641 except execnet.gateway_bootstrap.HostNotFound as e:
1642 # this is a misleading exception as it seems to be thrown for
1643 # any sort of connection failure, even those having nothing to
1644 # do with "host not found" (e.g., ssh key permission denied).
1645 user = 'root' if self.mode == 'root' else 'cephadm'
1646 msg = f'Failed to connect to {host} ({addr}). ' \
1647 f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
1648 f'you may want to run: \n' \
1649 f'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
1650 raise OrchestratorError(msg) from e
1651 except Exception as ex:
1652 self.log.exception(ex)
1653 raise
1654
1655 def _get_hosts(self, label=None):
1656 # type: (Optional[str]) -> List[str]
1657 r = []
1658 for h, hostspec in self.inventory.items():
1659 if not label or label in hostspec.get('labels', []):
1660 r.append(h)
1661 return r
1662
1663 @async_completion
1664 def add_host(self, spec):
1665 # type: (HostSpec) -> str
1666 """
1667 Add a host to be managed by the orchestrator.
1668
1669 :param host: host name
1670 """
1671 assert_valid_host(spec.hostname)
1672 out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
1673 ['--expect-hostname', spec.hostname],
1674 addr=spec.addr,
1675 error_ok=True, no_fsid=True)
1676 if code:
1677 raise OrchestratorError('New host %s (%s) failed check: %s' % (
1678 spec.hostname, spec.addr, err))
1679
1680 self.inventory[spec.hostname] = spec.to_json()
1681 self._save_inventory()
1682 self.cache.prime_empty_host(spec.hostname)
1683 self.event.set() # refresh stray health check
1684 self.log.info('Added host %s' % spec.hostname)
1685 return "Added host '{}'".format(spec.hostname)
1686
1687 @async_completion
1688 def remove_host(self, host):
1689 # type: (str) -> str
1690 """
1691 Remove a host from orchestrator management.
1692
1693 :param host: host name
1694 """
1695 del self.inventory[host]
1696 self._save_inventory()
1697 self.cache.rm_host(host)
1698 self._reset_con(host)
1699 self.event.set() # refresh stray health check
1700 self.log.info('Removed host %s' % host)
1701 return "Removed host '{}'".format(host)
1702
1703 @async_completion
1704 def update_host_addr(self, host, addr):
1705 if host not in self.inventory:
1706 raise OrchestratorError('host %s not registered' % host)
1707 self.inventory[host]['addr'] = addr
1708 self._save_inventory()
1709 self._reset_con(host)
1710 self.event.set() # refresh stray health check
1711 self.log.info('Set host %s addr to %s' % (host, addr))
1712 return "Updated host '{}' addr to '{}'".format(host, addr)
1713
1714 @trivial_completion
1715 def get_hosts(self):
1716 # type: () -> List[orchestrator.HostSpec]
1717 """
1718 Return a list of hosts managed by the orchestrator.
1719
1720 Notes:
1721 - skip async: manager reads from cache.
1722 """
1723 r = []
1724 for hostname, info in self.inventory.items():
1725 r.append(orchestrator.HostSpec(
1726 hostname,
1727 addr=info.get('addr', hostname),
1728 labels=info.get('labels', []),
1729 status=info.get('status', ''),
1730 ))
1731 return r
1732
1733 @async_completion
1734 def add_host_label(self, host, label):
1735 if host not in self.inventory:
1736 raise OrchestratorError('host %s does not exist' % host)
1737
1738 if 'labels' not in self.inventory[host]:
1739 self.inventory[host]['labels'] = list()
1740 if label not in self.inventory[host]['labels']:
1741 self.inventory[host]['labels'].append(label)
1742 self._save_inventory()
1743 self.log.info('Added label %s to host %s' % (label, host))
1744 return 'Added label %s to host %s' % (label, host)
1745
1746 @async_completion
1747 def remove_host_label(self, host, label):
1748 if host not in self.inventory:
1749 raise OrchestratorError('host %s does not exist' % host)
1750
1751 if 'labels' not in self.inventory[host]:
1752 self.inventory[host]['labels'] = list()
1753 if label in self.inventory[host]['labels']:
1754 self.inventory[host]['labels'].remove(label)
1755 self._save_inventory()
1756 self.log.info('Removed label %s from host %s' % (label, host))
1757 return 'Removed label %s from host %s' % (label, host)
1758
1759 def _refresh_host_daemons(self, host):
1760 try:
1761 out, err, code = self._run_cephadm(
1762 host, 'mon', 'ls', [], no_fsid=True)
1763 if code:
1764 return 'host %s cephadm ls returned %d: %s' % (
1765 host, code, err)
1766 except Exception as e:
1767 return 'host %s scrape failed: %s' % (host, e)
1768 ls = json.loads(''.join(out))
1769 dm = {}
1770 for d in ls:
1771 if not d['style'].startswith('cephadm'):
1772 continue
1773 if d['fsid'] != self._cluster_fsid:
1774 continue
1775 if '.' not in d['name']:
1776 continue
1777 sd = orchestrator.DaemonDescription()
1778 sd.last_refresh = datetime.datetime.utcnow()
1779 for k in ['created', 'started', 'last_configured', 'last_deployed']:
1780 v = d.get(k, None)
1781 if v:
1782 setattr(sd, k, datetime.datetime.strptime(d[k], DATEFMT))
1783 sd.daemon_type = d['name'].split('.')[0]
1784 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
1785 sd.hostname = host
1786 sd.container_id = d.get('container_id')
1787 if sd.container_id:
1788 # shorten the hash
1789 sd.container_id = sd.container_id[0:12]
1790 sd.container_image_name = d.get('container_image_name')
1791 sd.container_image_id = d.get('container_image_id')
1792 sd.version = d.get('version')
1793 if 'state' in d:
1794 sd.status_desc = d['state']
1795 sd.status = {
1796 'running': 1,
1797 'stopped': 0,
1798 'error': -1,
1799 'unknown': -1,
1800 }[d['state']]
1801 else:
1802 sd.status_desc = 'unknown'
1803 sd.status = None
1804 dm[sd.name()] = sd
1805 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
1806 self.cache.update_host_daemons(host, dm)
1807 self.cache.save_host(host)
1808 return None
1809
1810 def _refresh_host_devices(self, host):
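# Refresh the cached device and network inventory for the host by
# running 'ceph-volume inventory' and 'list-networks' via cephadm.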
1811 try:
1812 out, err, code = self._run_cephadm(
1813 host, 'osd',
1814 'ceph-volume',
1815 ['--', 'inventory', '--format=json'])
1816 if code:
1817 return 'host %s ceph-volume inventory returned %d: %s' % (
1818 host, code, err)
1819 except Exception as e:
1820 return 'host %s ceph-volume inventory failed: %s' % (host, e)
1821 devices = json.loads(''.join(out))
1822 try:
1823 out, err, code = self._run_cephadm(
1824 host, 'mon',
1825 'list-networks',
1826 [],
1827 no_fsid=True)
1828 if code:
1829 return 'host %s list-networks returned %d: %s' % (
1830 host, code, err)
1831 except Exception as e:
1832 return 'host %s list-networks failed: %s' % (host, e)
1833 networks = json.loads(''.join(out))
1834 self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
1835 host, len(devices), len(networks)))
1836 devices = inventory.Devices.from_json(devices)
1837 self.cache.update_host_devices_networks(host, devices.devices, networks)
1838 self.cache.save_host(host)
1839 return None
1840
1841 def _get_spec_size(self, spec):
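# Derive the expected number of daemons from the placement spec
# (explicit count, host pattern, label, or explicit host list).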
1842 if spec.placement.count:
1843 return spec.placement.count
1844 elif spec.placement.host_pattern:
1845 return len(spec.placement.pattern_matches_hosts(self.inventory.keys()))
1846 elif spec.placement.label:
1847 return len(self._get_hosts(spec.placement.label))
1848 elif spec.placement.hosts:
1849 return len(spec.placement.hosts)
1850 # no count, hosts, label, or pattern given; we cannot infer a size
1851 return 0
1852
1853 @trivial_completion
1854 def describe_service(self, service_type=None, service_name=None,
1855 refresh=False):
1856 if refresh:
1857 # ugly sync path, FIXME someday perhaps?
1858 for host, hi in self.inventory.items():
1859 self._refresh_host_daemons(host)
1860 # <service_map>
1861 sm = {} # type: Dict[str, orchestrator.ServiceDescription]
1862 for h, dm in self.cache.daemons.items():
1863 for name, dd in dm.items():
1864 if service_type and service_type != dd.daemon_type:
1865 continue
1866 n: str = dd.service_name()
1867 if service_name and service_name != n:
1868 continue
1869 if dd.daemon_type == 'osd':
1870 continue # ignore OSDs for now
1871 spec = None
1872 if dd.service_name() in self.spec_store.specs:
1873 spec = self.spec_store.specs[dd.service_name()]
1874 if n not in sm:
1875 sm[n] = orchestrator.ServiceDescription(
1876 service_name=n,
1877 last_refresh=dd.last_refresh,
1878 container_image_id=dd.container_image_id,
1879 container_image_name=dd.container_image_name,
1880 spec=spec,
1881 )
1882 if spec:
1883 sm[n].size = self._get_spec_size(spec)
1884 sm[n].created = self.spec_store.spec_created[dd.service_name()]
1885 else:
1886 sm[n].size = 0
1887 if dd.status == 1:
1888 sm[n].running += 1
1889 if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh: # type: ignore
1890 sm[n].last_refresh = dd.last_refresh
1891 if sm[n].container_image_id != dd.container_image_id:
1892 sm[n].container_image_id = 'mix'
1893 if sm[n].container_image_name != dd.container_image_name:
1894 sm[n].container_image_name = 'mix'
1895 for n, spec in self.spec_store.specs.items():
1896 if n in sm:
1897 continue
1898 if service_type is not None and service_type != spec.service_type:
1899 continue
1900 if service_name is not None and service_name != n:
1901 continue
1902 sm[n] = orchestrator.ServiceDescription(
1903 service_name=n,
1904 spec=spec,
1905 size=self._get_spec_size(spec),
1906 running=0,
1907 )
1908 return [s for n, s in sm.items()]
1909
1910 @trivial_completion
1911 def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
1912 host=None, refresh=False):
1913 if refresh:
1914 # ugly sync path, FIXME someday perhaps?
1915 if host:
1916 self._refresh_host_daemons(host)
1917 else:
1918 for hostname, hi in self.inventory.items():
1919 self._refresh_host_daemons(hostname)
1920 result = []
1921 for h, dm in self.cache.daemons.items():
1922 if host and h != host:
1923 continue
1924 for name, dd in dm.items():
1925 if daemon_type is not None and daemon_type != dd.daemon_type:
1926 continue
1927 if daemon_id is not None and daemon_id != dd.daemon_id:
1928 continue
1929 if service_name is not None and service_name != dd.service_name():
1930 continue
1931 result.append(dd)
1932 return result
1933
1934 def service_action(self, action, service_name):
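# Fan the requested action out to every daemon that belongs to the
# named service, one async call per daemon.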
1935 args = []
1936 for host, dm in self.cache.daemons.items():
1937 for name, d in dm.items():
1938 if d.matches_service(service_name):
1939 args.append((d.daemon_type, d.daemon_id,
1940 d.hostname, action))
1941 self.log.info('%s service %s' % (action.capitalize(), service_name))
1942 return self._daemon_actions(args)
1943
1944 @async_map_completion
1945 def _daemon_actions(self, daemon_type, daemon_id, host, action):
1946 return self._daemon_action(daemon_type, daemon_id, host, action)
1947
1948 def _daemon_action(self, daemon_type, daemon_id, host, action):
1949 if action == 'redeploy':
1950 # stop, recreate the container+unit, then restart
1951 return self._create_daemon(daemon_type, daemon_id, host)
1952 elif action == 'reconfig':
1953 return self._create_daemon(daemon_type, daemon_id, host,
1954 reconfig=True)
1955
1956 actions = {
1957 'start': ['reset-failed', 'start'],
1958 'stop': ['stop'],
1959 'restart': ['reset-failed', 'restart'],
1960 }
1961 name = '%s.%s' % (daemon_type, daemon_id)
1962 for a in actions[action]:
1963 out, err, code = self._run_cephadm(
1964 host, name, 'unit',
1965 ['--name', name, a],
1966 error_ok=True)
1967 self.cache.invalidate_host_daemons(host)
1968 return "{} {} from host '{}'".format(action, name, host)
1969
1970 def daemon_action(self, action, daemon_type, daemon_id):
1971 args = []
1972 for host, dm in self.cache.daemons.items():
1973 for name, d in dm.items():
1974 if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
1975 args.append((d.daemon_type, d.daemon_id,
1976 d.hostname, action))
1977 if not args:
1978 raise orchestrator.OrchestratorError(
1979 'Unable to find %s.%s daemon(s)' % (
1980 daemon_type, daemon_id))
1981 self.log.info('%s daemons %s' % (
1982 action.capitalize(),
1983 ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
1984 return self._daemon_actions(args)
1985
1986 def remove_daemons(self, names):
1987 # type: (List[str]) -> orchestrator.Completion
1988 args = []
1989 for host, dm in self.cache.daemons.items():
1990 for name in names:
1991 if name in dm:
1992 args.append((name, host))
1993 if not args:
1994 raise OrchestratorError('Unable to find daemon(s) %s' % (names))
1995 self.log.info('Remove daemons %s' % [a[0] for a in args])
1996 return self._remove_daemons(args)
1997
1998 @trivial_completion
1999 def remove_service(self, service_name):
2000 self.log.info('Remove service %s' % service_name)
2001 self.spec_store.rm(service_name)
2002 self._kick_serve_loop()
2003 return ['Removed service %s' % service_name]
2004
2005 @trivial_completion
2006 def get_inventory(self, host_filter=None, refresh=False):
2007 """
2008 Return the storage inventory of hosts matching the given filter.
2009
2010 :param host_filter: host filter
2011
2012 TODO:
2013 - add filtering by label
2014 """
2015 if refresh:
2016 # ugly sync path, FIXME someday perhaps?
2017 if host_filter:
2018 for host in host_filter.hosts:
2019 self._refresh_host_devices(host)
2020 else:
2021 for host, hi in self.inventory.items():
2022 self._refresh_host_devices(host)
2023
2024 result = []
2025 for host, dls in self.cache.devices.items():
2026 if host_filter and host not in host_filter.hosts:
2027 continue
2028 result.append(orchestrator.InventoryHost(host,
2029 inventory.Devices(dls)))
2030 return result
2031
2032 @trivial_completion
2033 def zap_device(self, host, path):
2034 self.log.info('Zap device %s:%s' % (host, path))
2035 out, err, code = self._run_cephadm(
2036 host, 'osd', 'ceph-volume',
2037 ['--', 'lvm', 'zap', '--destroy', path],
2038 error_ok=True)
2039 self.cache.invalidate_host_devices(host)
2040 if code:
2041 raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
2042 return '\n'.join(out + err)
2043
2044 def blink_device_light(self, ident_fault, on, locs):
2045 @async_map_completion
2046 def blink(host, dev, path):
2047 cmd = [
2048 'lsmcli',
2049 'local-disk-%s-led-%s' % (
2050 ident_fault,
2051 'on' if on else 'off'),
2052 '--path', path or dev,
2053 ]
2054 out, err, code = self._run_cephadm(
2055 host, 'osd', 'shell', ['--'] + cmd,
2056 error_ok=True)
2057 if code:
2058 raise RuntimeError(
2059 'Unable to affect %s light for %s:%s. Command: %s' % (
2060 ident_fault, host, dev, ' '.join(cmd)))
2061 self.log.info('Set %s light for %s:%s %s' % (
2062 ident_fault, host, dev, 'on' if on else 'off'))
2063 return "Set %s light for %s:%s %s" % (
2064 ident_fault, host, dev, 'on' if on else 'off')
2065
2066 return blink(locs)
2067
2068 def get_osd_uuid_map(self, only_up=False):
2069 # type: (bool) -> Dict[str,str]
2070 osd_map = self.get('osd_map')
2071 r = {}
2072 for o in osd_map['osds']:
2073 # only include OSDs that have ever started in this map. this way
2074 # an interrupted osd create can be repeated and succeed the second
2075 # time around.
2076 if not only_up or o['up_from'] > 0:
2077 r[str(o['osd'])] = o['uuid']
2078 return r
2079
2080 @trivial_completion
2081 def apply_drivegroups(self, specs: List[DriveGroupSpec]):
2082 return [self._apply(spec) for spec in specs]
2083
2084 @trivial_completion
2085 def create_osds(self, drive_group: DriveGroupSpec):
2086 self.log.debug("Processing DriveGroup {}".format(drive_group))
2087 # 1) use fn_filter to determine matching_hosts
2088 matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
2089 # 2) Map the inventory to the InventoryHost object
2090
2091 def _find_inv_for_host(hostname: str, inventory_dict: dict):
2092 # TODO: this linear scan is inefficient; the inventory should be loaded along with the host
2093 for _host, _inventory in inventory_dict.items():
2094 if _host == hostname:
2095 return _inventory
2096 raise OrchestratorError("No inventory found for host: {}".format(hostname))
2097
2098 ret = []
2099 # 3) iterate over matching_host and call DriveSelection and to_ceph_volume
2100 self.log.debug(f"Checking matching hosts -> {matching_hosts}")
2101 for host in matching_hosts:
2102 inventory_for_host = _find_inv_for_host(host, self.cache.devices)
2103 self.log.debug(f"Found inventory for host {inventory_for_host}")
2104 drive_selection = selector.DriveSelection(drive_group, inventory_for_host)
2105 self.log.debug(f"Found drive selection {drive_selection}")
2106 cmd = translate.to_ceph_volume(drive_group, drive_selection).run()
2107 self.log.debug(f"translated to cmd {cmd}")
2108 if not cmd:
2109 self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
2110 continue
2111 self.log.info('Applying %s on host %s...' % (
2112 drive_group.service_name(), host))
2113 ret_msg = self._create_osd(host, cmd)
2114 ret.append(ret_msg)
2115 return ", ".join(ret)
2116
2117 def _create_osd(self, host, cmd):
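# Run the prepared ceph-volume command on the host (with a minimal
# config and the bootstrap-osd keyring passed on stdin), then compare
# 'ceph-volume lvm list' against the OSD map to find the newly created
# OSD ids and deploy a daemon for each of them.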
2118
2119 self._require_hosts(host)
2120
2121 # get bootstrap key
2122 ret, keyring, err = self.mon_command({
2123 'prefix': 'auth get',
2124 'entity': 'client.bootstrap-osd',
2125 })
2126
2127 # generate config
2128 ret, config, err = self.mon_command({
2129 "prefix": "config generate-minimal-conf",
2130 })
2131
2132 j = json.dumps({
2133 'config': config,
2134 'keyring': keyring,
2135 })
2136
2137 before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
2138
2139 split_cmd = cmd.split(' ')
2140 _cmd = ['--config-json', '-', '--']
2141 _cmd.extend(split_cmd)
2142 out, err, code = self._run_cephadm(
2143 host, 'osd', 'ceph-volume',
2144 _cmd,
2145 stdin=j,
2146 error_ok=True)
2147 if code == 1 and ', it is already prepared' in '\n'.join(err):
2148 # HACK: when we create against an existing LV, ceph-volume
2149 # returns an error and the above message. To make this
2150 # command idempotent, tolerate this "error" and continue.
2151 self.log.debug('the device was already prepared; continuing')
2152 code = 0
2153 if code:
2154 raise RuntimeError(
2155 'cephadm exited with an error code: %d, stderr:%s' % (
2156 code, '\n'.join(err)))
2157
2158 # check result
2159 out, err, code = self._run_cephadm(
2160 host, 'osd', 'ceph-volume',
2161 [
2162 '--',
2163 'lvm', 'list',
2164 '--format', 'json',
2165 ])
2166 osds_elems = json.loads('\n'.join(out))
2167 fsid = self._cluster_fsid
2168 osd_uuid_map = self.get_osd_uuid_map()
2169 created = []
2170 for osd_id, osds in osds_elems.items():
2171 for osd in osds:
2172 if osd['tags']['ceph.cluster_fsid'] != fsid:
2173 self.log.debug('mismatched fsid, skipping %s' % osd)
2174 continue
2175 if osd_id in before_osd_uuid_map:
2176 # this osd existed before we ran prepare
2177 continue
2178 if osd_id not in osd_uuid_map:
2179 self.log.debug('osd id %s does not exist in cluster' % osd_id)
2180 continue
2181 if osd_uuid_map[osd_id] != osd['tags']['ceph.osd_fsid']:
2182 self.log.debug('mismatched osd uuid (cluster has %s, osd '
2183 'has %s)' % (
2184 osd_uuid_map[osd_id],
2185 osd['tags']['ceph.osd_fsid']))
2186 continue
2187
2188 created.append(osd_id)
2189 self._create_daemon(
2190 'osd', osd_id, host,
2191 osd_uuid_map=osd_uuid_map)
2192
2193 if created:
2194 self.cache.invalidate_host_devices(host)
2195 return "Created osd(s) %s on host '%s'" % (','.join(created), host)
2196 else:
2197 return "Created no osd(s) on host %s; already created?" % host
2198
2199 def _calc_daemon_deps(self, daemon_type, daemon_id):
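# Return the sorted list of daemon names this daemon depends on;
# only the monitoring daemons have cross-daemon dependencies.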
2200 need = {
2201 'prometheus': ['mgr', 'alertmanager', 'node-exporter'],
2202 'grafana': ['prometheus'],
2203 'alertmanager': ['mgr', 'alertmanager'],
2204 }
2205 deps = []
2206 for dep_type in need.get(daemon_type, []):
2207 for dd in self.cache.get_daemons_by_service(dep_type):
2208 deps.append(dd.name())
2209 return sorted(deps)
2210
2211 def _get_config_and_keyring(self, daemon_type, daemon_id,
2212 keyring=None,
2213 extra_config=None):
2214 # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
2215 # keyring
2216 if not keyring:
2217 if daemon_type == 'mon':
2218 ename = 'mon.'
2219 else:
2220 ename = utils.name_to_config_section(daemon_type + '.' + daemon_id)
2221 ret, keyring, err = self.mon_command({
2222 'prefix': 'auth get',
2223 'entity': ename,
2224 })
2225
2226 # generate config
2227 ret, config, err = self.mon_command({
2228 "prefix": "config generate-minimal-conf",
2229 })
2230 if extra_config:
2231 config += extra_config
2232
2233 return {
2234 'config': config,
2235 'keyring': keyring,
2236 }
2237
2238 def _create_daemon(self, daemon_type, daemon_id, host,
2239 keyring=None,
2240 extra_args=None, extra_config=None,
2241 reconfig=False,
2242 osd_uuid_map=None):
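# Build the per-daemon configuration (a JSON config for the monitoring
# daemons, or a minimal ceph.conf plus keyring for ceph daemons), hand
# it to 'cephadm deploy' on stdin, and prime the daemon cache on success.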
2243 if not extra_args:
2244 extra_args = []
2245 name = '%s.%s' % (daemon_type, daemon_id)
2246
2247 start_time = datetime.datetime.utcnow()
2248 deps = [] # type: List[str]
2249 cephadm_config = {} # type: Dict[str, Any]
2250 if daemon_type == 'prometheus':
2251 cephadm_config, deps = self._generate_prometheus_config()
2252 extra_args.extend(['--config-json', '-'])
2253 elif daemon_type == 'grafana':
2254 cephadm_config, deps = self._generate_grafana_config()
2255 extra_args.extend(['--config-json', '-'])
2256 elif daemon_type == 'nfs':
2257 cephadm_config, deps = \
2258 self._generate_nfs_config(daemon_type, daemon_id, host)
2259 extra_args.extend(['--config-json', '-'])
2260 elif daemon_type == 'alertmanager':
2261 cephadm_config, deps = self._generate_alertmanager_config()
2262 extra_args.extend(['--config-json', '-'])
2263 else:
2264 # Ceph daemons (mon, mgr, mds, osd, etc.)
2265 cephadm_config = self._get_config_and_keyring(
2266 daemon_type, daemon_id,
2267 keyring=keyring,
2268 extra_config=extra_config)
2269 extra_args.extend(['--config-json', '-'])
2270
2271 # osd deployments need an --osd-fsid arg
2272 if daemon_type == 'osd':
2273 if not osd_uuid_map:
2274 osd_uuid_map = self.get_osd_uuid_map()
2275 osd_uuid = osd_uuid_map.get(daemon_id, None)
2276 if not osd_uuid:
2277 raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
2278 extra_args.extend(['--osd-fsid', osd_uuid])
2279
2280 if reconfig:
2281 extra_args.append('--reconfig')
2282 if self.allow_ptrace:
2283 extra_args.append('--allow-ptrace')
2284
2285 self.log.info('%s daemon %s on %s' % (
2286 'Reconfiguring' if reconfig else 'Deploying',
2287 name, host))
2288
2289 out, err, code = self._run_cephadm(
2290 host, name, 'deploy',
2291 [
2292 '--name', name,
2293 ] + extra_args,
2294 stdin=json.dumps(cephadm_config))
2295 if not code and host in self.cache.daemons:
2296 # prime cached service state with what we (should have)
2297 # just created
2298 sd = orchestrator.DaemonDescription()
2299 sd.daemon_type = daemon_type
2300 sd.daemon_id = daemon_id
2301 sd.hostname = host
2302 sd.status = 1
2303 sd.status_desc = 'starting'
2304 self.cache.add_daemon(host, sd)
2305 self.cache.invalidate_host_daemons(host)
2306 self.cache.update_daemon_config_deps(host, name, deps, start_time)
2307 self.cache.save_host(host)
2308 return "{} {} on host '{}'".format(
2309 'Reconfigured' if reconfig else 'Deployed', name, host)
2310
2311 @async_map_completion
2312 def _remove_daemons(self, name, host):
2313 return self._remove_daemon(name, host)
2314
2315 def _remove_daemon(self, name, host):
2316 """
2317 Remove a daemon
2318 """
2319 (daemon_type, daemon_id) = name.split('.', 1)
2320 if daemon_type == 'mon':
2321 self._check_safe_to_destroy_mon(daemon_id)
2322
2323 # remove mon from quorum before we destroy the daemon
2324 self.log.info('Removing monitor %s from monmap...' % name)
2325 ret, out, err = self.mon_command({
2326 'prefix': 'mon rm',
2327 'name': daemon_id,
2328 })
2329 if ret:
2330 raise OrchestratorError('failed to remove mon %s from monmap' % (
2331 name))
2332
2333 args = ['--name', name, '--force']
2334 self.log.info('Removing daemon %s from %s' % (name, host))
2335 out, err, code = self._run_cephadm(
2336 host, name, 'rm-daemon', args)
2337 if not code:
2338 # remove item from cache
2339 self.cache.rm_daemon(host, name)
2340 self.cache.invalidate_host_daemons(host)
2341 return "Removed {} from host '{}'".format(name, host)
2342
2343 def _apply_service(self, spec):
2344 """
2345 Schedule a service. Deploy new daemons or remove old ones, depending
2346 on the target label and count specified in the placement.
2347 """
2348 daemon_type = spec.service_type
2349 service_name = spec.service_name()
2350 if spec.unmanaged:
2351 self.log.debug('Skipping unmanaged service %s spec' % service_name)
2352 return False
2353 self.log.debug('Applying service %s spec' % service_name)
2354 create_fns = {
2355 'mon': self._create_mon,
2356 'mgr': self._create_mgr,
2357 'osd': self.create_osds,
2358 'mds': self._create_mds,
2359 'rgw': self._create_rgw,
2360 'rbd-mirror': self._create_rbd_mirror,
2361 'nfs': self._create_nfs,
2362 'grafana': self._create_grafana,
2363 'alertmanager': self._create_alertmanager,
2364 'prometheus': self._create_prometheus,
2365 'node-exporter': self._create_node_exporter,
2366 'crash': self._create_crash,
2367 }
2368 config_fns = {
2369 'mds': self._config_mds,
2370 'rgw': self._config_rgw,
2371 'nfs': self._config_nfs,
2372 }
2373 create_func = create_fns.get(daemon_type, None)
2374 if not create_func:
2375 self.log.debug('unrecognized service type %s' % daemon_type)
2376 return False
2377 config_func = config_fns.get(daemon_type, None)
2378
2379 daemons = self.cache.get_daemons_by_service(service_name)
2380
2381 public_network = None
2382 if daemon_type == 'mon':
2383 ret, out, err = self.mon_command({
2384 'prefix': 'config get',
2385 'who': 'mon',
2386 'key': 'public_network',
2387 })
2388 if '/' in out:
2389 public_network = out.strip()
2390 self.log.debug('mon public_network is %s' % public_network)
2391
2392 def matches_network(host):
2393 # type: (str) -> bool
2394 if not public_network:
2395 return False
2396 # make sure we have 1 or more IPs for that network on that
2397 # host
2398 return len(self.cache.networks[host].get(public_network, [])) > 0
2399
2400 hosts = HostAssignment(
2401 spec=spec,
2402 get_hosts_func=self._get_hosts,
2403 get_daemons_func=self.cache.get_daemons_by_service,
2404 filter_new_host=matches_network if daemon_type == 'mon' else None,
2405 ).place()
2406
2407 r = False
2408
2409 if daemon_type == 'osd':
2410 return False if create_func(spec) else True # type: ignore
2411
2412 # sanity check
2413 if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
2414 self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
2415 return False
2416
2417 # add any?
2418 did_config = False
2419 hosts_with_daemons = {d.hostname for d in daemons}
2420 self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
2421 for host, network, name in hosts:
2422 if host not in hosts_with_daemons:
2423 if not did_config and config_func:
2424 config_func(spec)
2425 did_config = True
2426 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2427 spec.service_id, name)
2428 self.log.debug('Placing %s.%s on host %s' % (
2429 daemon_type, daemon_id, host))
2430 if daemon_type == 'mon':
2431 create_func(daemon_id, host, network) # type: ignore
2432 elif daemon_type == 'nfs':
2433 create_func(daemon_id, host, spec) # type: ignore
2434 else:
2435 create_func(daemon_id, host) # type: ignore
2436
2437 # add to daemon list so next name(s) will also be unique
2438 sd = orchestrator.DaemonDescription(
2439 hostname=host,
2440 daemon_type=daemon_type,
2441 daemon_id=daemon_id,
2442 )
2443 daemons.append(sd)
2444 r = True
2445
2446 # remove any?
2447 target_hosts = [h.hostname for h in hosts]
2448 for d in daemons:
2449 if d.hostname not in target_hosts:
2450 # NOTE: we are passing the 'force' flag here, which means
2451 # we can delete a mon instance's data.
2452 self._remove_daemon(d.name(), d.hostname)
2453 r = True
2454
2455 return r
2456
2457 def _apply_all_services(self):
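# Apply every stored service spec; return True if any of them resulted
# in a change (daemons added or removed).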
2458 r = False
2459 specs = [] # type: List[ServiceSpec]
2460 for sn, spec in self.spec_store.specs.items():
2461 specs.append(spec)
2462 for spec in specs:
2463 try:
2464 if self._apply_service(spec):
2465 r = True
2466 except Exception as e:
2467 self.log.warning('Failed to apply %s spec %s: %s' % (
2468 spec.service_name(), spec, e))
2469 return r
2470
2471 def _check_daemons(self):
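# Reconcile running daemons: remove orphans without a spec, reconfigure
# daemons whose dependencies or the monmap have changed, and keep the
# dashboard's grafana URL pointing at a deployed grafana instance.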
2472 # get monmap mtime so we can refresh configs when mons change
2473 monmap = self.get('mon_map')
2474 last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
2475 monmap['modified'], CEPH_DATEFMT)
2476 if last_monmap and last_monmap > datetime.datetime.utcnow():
2477 last_monmap = None # just in case clocks are skewed
2478
2479 daemons = self.cache.get_daemons()
2480 grafanas = [] # type: List[orchestrator.DaemonDescription]
2481 for dd in daemons:
2482 # orphan?
2483 spec = self.spec_store.specs.get(dd.service_name(), None)
2484 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
2485 # (mon and mgr specs should always exist; osds aren't matched
2486 # to a service spec)
2487 self.log.info('Removing orphan daemon %s...' % dd.name())
2488 self._remove_daemon(dd.name(), dd.hostname)
2489
2490 # ignore unmanaged services
2491 if not spec or spec.unmanaged:
2492 continue
2493
2494 # dependencies?
2495 if dd.daemon_type == 'grafana':
2496 # put running instances at the front of the list
2497 grafanas.insert(0, dd)
2498 deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
2499 last_deps, last_config = self.cache.get_daemon_last_config_deps(
2500 dd.hostname, dd.name())
2501 if last_deps is None:
2502 last_deps = []
2503 reconfig = False
2504 if not last_config:
2505 self.log.info('Reconfiguring %s (unknown last config time)...' % (
2506 dd.name()))
2507 reconfig = True
2508 elif last_deps != deps:
2509 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
2510 deps))
2511 self.log.info('Reconfiguring %s (dependencies changed)...' % (
2512 dd.name()))
2513 reconfig = True
2514 elif last_monmap and \
2515 last_monmap > last_config and \
2516 dd.daemon_type in CEPH_TYPES:
2517 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
2518 reconfig = True
2519 if reconfig:
2520 self._create_daemon(dd.daemon_type, dd.daemon_id,
2521 dd.hostname, reconfig=True)
2522
2523 # make sure the dashboard module's grafana URL references a grafana instance we deployed
2524 try:
2525 current_url = self.get_module_option_ex('dashboard',
2526 'GRAFANA_API_URL')
2527 if grafanas:
2528 host = grafanas[0].hostname
2529 url = 'https://%s:3000' % (self.inventory[host].get('addr',
2530 host))
2531 if current_url != url:
2532 self.log.info('Setting dashboard grafana config to %s' % url)
2533 self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
2534 url)
2535 # FIXME: is it a signed cert??
2536 except Exception as e:
2537 self.log.debug('got exception fetching dashboard grafana state: %s',
2538 e)
2539
2540 def _add_daemon(self, daemon_type, spec,
2541 create_func, config_func=None):
2542 """
2543 Add (and place) a daemon. Require explicit host placement. Do not
2544 schedule, and do not apply the related scheduling limitations.
2545 """
2546 self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
2547 if not spec.placement.hosts:
2548 raise OrchestratorError('must specify host(s) to deploy on')
2549 count = spec.placement.count or len(spec.placement.hosts)
2550 daemons = self.cache.get_daemons_by_service(spec.service_name())
2551 return self._create_daemons(daemon_type, spec, daemons,
2552 spec.placement.hosts, count,
2553 create_func, config_func)
2554
2555 def _create_daemons(self, daemon_type, spec, daemons,
2556 hosts, count,
2557 create_func, config_func=None):
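# Validate that enough hosts were given, run the per-service config
# hook once, then create the daemons in parallel via async_map_completion.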
2558 if count > len(hosts):
2559 raise OrchestratorError('too few hosts: want %d, have %s' % (
2560 count, hosts))
2561
2562 if config_func:
2563 config_func(spec)
2564
2565 args = [] # type: List[tuple]
2566 for host, network, name in hosts:
2567 daemon_id = self.get_unique_name(daemon_type, host, daemons,
2568 spec.service_id, name)
2569 self.log.debug('Placing %s.%s on host %s' % (
2570 daemon_type, daemon_id, host))
2571 if daemon_type == 'mon':
2572 args.append((daemon_id, host, network)) # type: ignore
2573 elif daemon_type == 'nfs':
2574 args.append((daemon_id, host, spec)) # type: ignore
2575 else:
2576 args.append((daemon_id, host)) # type: ignore
2577
2578 # add to daemon list so next name(s) will also be unique
2579 sd = orchestrator.DaemonDescription(
2580 hostname=host,
2581 daemon_type=daemon_type,
2582 daemon_id=daemon_id,
2583 )
2584 daemons.append(sd)
2585
2586 @async_map_completion
2587 def create_func_map(*args):
2588 return create_func(*args)
2589
2590 return create_func_map(args)
2591
2592 @trivial_completion
2593 def apply_mon(self, spec):
2594 return self._apply(spec)
2595
2596 def _create_mon(self, name, host, network):
2597 """
2598 Create a new monitor on the given host.
2599 """
2600 # get mon. key
2601 ret, keyring, err = self.mon_command({
2602 'prefix': 'auth get',
2603 'entity': 'mon.',
2604 })
2605
2606 extra_config = '[mon.%s]\n' % name
2607 if network:
2608 # infer whether this is a CIDR network, addrvec, or plain IP
2609 if '/' in network:
2610 extra_config += 'public network = %s\n' % network
2611 elif network.startswith('[v') and network.endswith(']'):
2612 extra_config += 'public addrv = %s\n' % network
2613 elif ':' not in network:
2614 extra_config += 'public addr = %s\n' % network
2615 else:
2616 raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
2617 else:
2618 # try to get the public_network from the config
2619 ret, network, err = self.mon_command({
2620 'prefix': 'config get',
2621 'who': 'mon',
2622 'key': 'public_network',
2623 })
2624 network = network.strip() # type: ignore
2625 if ret:
2626 raise RuntimeError('Unable to fetch public_network config option')
2627 if not network:
2628 raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
2629 if '/' not in network:
2630 raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
2631 extra_config += 'public network = %s\n' % network
2632
2633 return self._create_daemon('mon', name, host,
2634 keyring=keyring,
2635 extra_config=extra_config)
2636
2637 def add_mon(self, spec):
2638 # type: (ServiceSpec) -> orchestrator.Completion
2639 return self._add_daemon('mon', spec, self._create_mon)
2640
2641 def _create_mgr(self, mgr_id, host):
2642 """
2643 Create a new manager instance on a host.
2644 """
2645 # get mgr. key
2646 ret, keyring, err = self.mon_command({
2647 'prefix': 'auth get-or-create',
2648 'entity': 'mgr.%s' % mgr_id,
2649 'caps': ['mon', 'profile mgr',
2650 'osd', 'allow *',
2651 'mds', 'allow *'],
2652 })
2653
2654 return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
2655
2656 def add_mgr(self, spec):
2657 # type: (ServiceSpec) -> orchestrator.Completion
2658 return self._add_daemon('mgr', spec, self._create_mgr)
2659
2660 def _apply(self, spec: ServiceSpec) -> str:
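# Fill in a default placement if none was given, validate the host
# assignment, persist the spec, and kick the serve loop to act on it.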
2661 if spec.placement.is_empty():
2662 # fill in default placement
2663 defaults = {
2664 'mon': PlacementSpec(count=5),
2665 'mgr': PlacementSpec(count=2),
2666 'mds': PlacementSpec(count=2),
2667 'rgw': PlacementSpec(count=2),
2668 'rbd-mirror': PlacementSpec(count=2),
2669 'nfs': PlacementSpec(count=1),
2670 'grafana': PlacementSpec(count=1),
2671 'alertmanager': PlacementSpec(count=1),
2672 'prometheus': PlacementSpec(count=1),
2673 'node-exporter': PlacementSpec(host_pattern='*'),
2674 'crash': PlacementSpec(host_pattern='*'),
2675 }
2676 spec.placement = defaults[spec.service_type]
2677 elif spec.service_type in ['mon', 'mgr'] and \
2678 spec.placement.count is not None and \
2679 spec.placement.count < 1:
2680 raise OrchestratorError('cannot scale %s service below 1' % (
2681 spec.service_type))
2682
2683 HostAssignment(
2684 spec=spec,
2685 get_hosts_func=self._get_hosts,
2686 get_daemons_func=self.cache.get_daemons_by_service,
2687 ).validate()
2688
2689 self.log.info('Saving service %s spec with placement %s' % (
2690 spec.service_name(), spec.placement.pretty_str()))
2691 self.spec_store.save(spec)
2692 self._kick_serve_loop()
2693 return "Scheduled %s update..." % spec.service_type
2694
2695 @trivial_completion
2696 def apply(self, specs: List[ServiceSpec]):
2697 return [self._apply(spec) for spec in specs]
2698
2699 @trivial_completion
2700 def apply_mgr(self, spec):
2701 return self._apply(spec)
2702
2703 def add_mds(self, spec: ServiceSpec):
2704 return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
2705
2706 @trivial_completion
2707 def apply_mds(self, spec: ServiceSpec):
2708 return self._apply(spec)
2709
2710 def _config_mds(self, spec):
2711 # ensure mds_join_fs is set for these daemons
2712 assert spec.service_id
2713 ret, out, err = self.mon_command({
2714 'prefix': 'config set',
2715 'who': 'mds.' + spec.service_id,
2716 'name': 'mds_join_fs',
2717 'value': spec.service_id,
2718 })
2719
2720 def _create_mds(self, mds_id, host):
2721 # get mds. key
2722 ret, keyring, err = self.mon_command({
2723 'prefix': 'auth get-or-create',
2724 'entity': 'mds.' + mds_id,
2725 'caps': ['mon', 'profile mds',
2726 'osd', 'allow rwx',
2727 'mds', 'allow'],
2728 })
2729 return self._create_daemon('mds', mds_id, host, keyring=keyring)
2730
2731 def add_rgw(self, spec):
2732 return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
2733
2734 def _config_rgw(self, spec):
2735 # ensure rgw_realm and rgw_zone are set for these daemons
2736 ret, out, err = self.mon_command({
2737 'prefix': 'config set',
2738 'who': 'client.rgw.' + spec.service_id,
2739 'name': 'rgw_zone',
2740 'value': spec.rgw_zone,
2741 })
2742 ret, out, err = self.mon_command({
2743 'prefix': 'config set',
2744 'who': 'client.rgw.' + spec.service_id,
2745 'name': 'rgw_realm',
2746 'value': spec.rgw_realm,
2747 })
2748 if spec.ssl:
2749 v = 'beast ssl_port=%d' % spec.get_port()
2750 else:
2751 v = 'beast port=%d' % spec.get_port()
2752 ret, out, err = self.mon_command({
2753 'prefix': 'config set',
2754 'who': 'client.rgw.' + spec.service_id,
2755 'name': 'rgw_frontends',
2756 'value': v,
2757 })
2758
2759 def _create_rgw(self, rgw_id, host):
2760 ret, keyring, err = self.mon_command({
2761 'prefix': 'auth get-or-create',
2762 'entity': 'client.rgw.' + rgw_id,
2763 'caps': ['mon', 'allow rw',
2764 'mgr', 'allow rw',
2765 'osd', 'allow rwx'],
2766 })
2767 return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
2768
2769 @trivial_completion
2770 def apply_rgw(self, spec):
2771 return self._apply(spec)
2772
2773 def add_rbd_mirror(self, spec):
2774 return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
2775
2776 def _create_rbd_mirror(self, daemon_id, host):
2777 ret, keyring, err = self.mon_command({
2778 'prefix': 'auth get-or-create',
2779 'entity': 'client.rbd-mirror.' + daemon_id,
2780 'caps': ['mon', 'profile rbd-mirror',
2781 'osd', 'profile rbd'],
2782 })
2783 return self._create_daemon('rbd-mirror', daemon_id, host,
2784 keyring=keyring)
2785
2786 @trivial_completion
2787 def apply_rbd_mirror(self, spec):
2788 return self._apply(spec)
2789
2790 def _generate_nfs_config(self, daemon_type, daemon_id, host):
2791 # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
2792 deps = [] # type: List[str]
2793
2794 # find the matching NFSServiceSpec
2795 # TODO: find the spec and pass via _create_daemon instead ??
2796 service_name = self.get_service_name(daemon_type, daemon_id, host)
2797 specs = self.spec_store.find(service_name)
2798 if not specs:
2799 raise OrchestratorError('Cannot find service spec %s' % (service_name))
2800 elif len(specs) > 1:
2801 raise OrchestratorError('Found multiple service specs for %s' % (service_name))
2802 else:
2803 # cast to keep mypy happy
2804 spec = cast(NFSServiceSpec, specs[0])
2805
2806 nfs = NFSGanesha(self, daemon_id, spec)
2807
2808 # create the keyring
2809 entity = nfs.get_keyring_entity()
2810 keyring = nfs.get_or_create_keyring(entity=entity)
2811
2812 # update the caps after get-or-create, the keyring might already exist!
2813 nfs.update_keyring_caps(entity=entity)
2814
2815 # create the rados config object
2816 nfs.create_rados_config_obj()
2817
2818 # generate the cephadm config
2819 cephadm_config = nfs.get_cephadm_config()
2820 cephadm_config.update(
2821 self._get_config_and_keyring(
2822 daemon_type, daemon_id,
2823 keyring=keyring))
2824
2825 return cephadm_config, deps
2826
2827 def add_nfs(self, spec):
2828 return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
2829
2830 def _config_nfs(self, spec):
2831 logger.info('Saving service %s spec with placement %s' % (
2832 spec.service_name(), spec.placement.pretty_str()))
2833 self.spec_store.save(spec)
2834
2835 def _create_nfs(self, daemon_id, host, spec):
2836 return self._create_daemon('nfs', daemon_id, host)
2837
2838 @trivial_completion
2839 def apply_nfs(self, spec):
2840 return self._apply(spec)
2841
2842 def _generate_prometheus_config(self):
2843 # type: () -> Tuple[Dict[str, Any], List[str]]
2844 deps = [] # type: List[str]
2845
2846 # scrape mgrs
2847 mgr_scrape_list = []
2848 mgr_map = self.get('mgr_map')
2849 port = None
2850 t = mgr_map.get('services', {}).get('prometheus', None)
2851 if t:
2852 t = t.split('/')[2]
2853 mgr_scrape_list.append(t)
2854 port = '9283'
2855 if ':' in t:
2856 port = t.split(':')[1]
2857 # scan all mgrs to generate deps and to get standbys too.
2858 # assume that they are all on the same port as the active mgr.
2859 for dd in self.cache.get_daemons_by_service('mgr'):
2860 # we consider the mgr a dep even if the prometheus module is
2861 # disabled in order to be consistent with _calc_daemon_deps().
2862 deps.append(dd.name())
2863 if not port:
2864 continue
2865 if dd.daemon_id == self.get_mgr_id():
2866 continue
2867 hi = self.inventory.get(dd.hostname, {})
2868 addr = hi.get('addr', dd.hostname)
2869 mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
2870
2871 # scrape node exporters
2872 node_configs = ''
2873 for dd in self.cache.get_daemons_by_service('node-exporter'):
2874 deps.append(dd.name())
2875 hi = self.inventory.get(dd.hostname, {})
2876 addr = hi.get('addr', dd.hostname)
2877 if not node_configs:
2878 node_configs = """
2879 - job_name: 'node'
2880 static_configs:
2881 """
2882 node_configs += """ - targets: {}
2883 labels:
2884 instance: '{}'
2885 """.format([addr.split(':')[0] + ':9100'],
2886 dd.hostname)
2887
2888 # scrape alert managers
2889 alertmgr_configs = ""
2890 alertmgr_targets = []
2891 for dd in self.cache.get_daemons_by_service('alertmanager'):
2892 deps.append(dd.name())
2893 hi = self.inventory.get(dd.hostname, {})
2894 addr = hi.get('addr', dd.hostname)
2895 alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
2896 if alertmgr_targets:
2897 alertmgr_configs = """alerting:
2898 alertmanagers:
2899 - scheme: http
2900 path_prefix: /alertmanager
2901 static_configs:
2902 - targets: [{}]
2903 """.format(", ".join(alertmgr_targets))
2904
2905 # generate the prometheus configuration
2906 r = {
2907 'files': {
2908 'prometheus.yml': """# generated by cephadm
2909 global:
2910 scrape_interval: 5s
2911 evaluation_interval: 10s
2912 rule_files:
2913 - /etc/prometheus/alerting/*
2914 {alertmgr_configs}
2915 scrape_configs:
2916 - job_name: 'ceph'
2917 static_configs:
2918 - targets: {mgr_scrape_list}
2919 labels:
2920 instance: 'ceph_cluster'
2921 {node_configs}
2922 """.format(
2923 mgr_scrape_list=str(mgr_scrape_list),
2924 node_configs=str(node_configs),
2925 alertmgr_configs=str(alertmgr_configs)
2926 ),
2927 },
2928 }
2929
2930 # include alerts, if present in the container
2931 if os.path.exists(self.prometheus_alerts_path):
2932 with open(self.prometheus_alerts_path, "r") as f:
2933 alerts = f.read()
2934 r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
2935
2936 return r, sorted(deps)
2937
2938 def _generate_grafana_config(self):
2939 # type: () -> Tuple[Dict[str, Any], List[str]]
2940 deps = [] # type: List[str]
2941 def generate_grafana_ds_config(hosts: List[str]) -> str:
2942 config = '''# generated by cephadm
2943 deleteDatasources:
2944 {delete_data_sources}
2945
2946 datasources:
2947 {data_sources}
2948 '''
2949 delete_ds_template = '''
2950 - name: '{name}'
2951 orgId: 1\n'''.lstrip('\n')
2952 ds_template = '''
2953 - name: '{name}'
2954 type: 'prometheus'
2955 access: 'proxy'
2956 orgId: 1
2957 url: 'http://{host}:9095'
2958 basicAuth: false
2959 isDefault: {is_default}
2960 editable: false\n'''.lstrip('\n')
2961
2962 delete_data_sources = ''
2963 data_sources = ''
2964 for i, host in enumerate(hosts):
2965 name = "Dashboard %d" % (i + 1)
2966 data_sources += ds_template.format(
2967 name=name,
2968 host=host,
2969 is_default=str(i == 0).lower()
2970 )
2971 delete_data_sources += delete_ds_template.format(
2972 name=name
2973 )
2974 return config.format(
2975 delete_data_sources=delete_data_sources,
2976 data_sources=data_sources,
2977 )
2978
2979 prom_services = [] # type: List[str]
2980 for dd in self.cache.get_daemons_by_service('prometheus'):
2981 prom_services.append(dd.hostname)
2982 deps.append(dd.name())
2983
2984 cert = self.get_store('grafana_crt')
2985 pkey = self.get_store('grafana_key')
2986 if cert and pkey:
2987 try:
2988 verify_tls(cert, pkey)
2989 except ServerConfigException as e:
2990 logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
2991 cert, pkey = None, None
2992 if not (cert and pkey):
2993 cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
2994 self.set_store('grafana_crt', cert)
2995 self.set_store('grafana_key', pkey)
2996
2997 config_file = {
2998 'files': {
2999 "grafana.ini": """# generated by cephadm
3000 [users]
3001 default_theme = light
3002 [auth.anonymous]
3003 enabled = true
3004 org_name = 'Main Org.'
3005 org_role = 'Viewer'
3006 [server]
3007 domain = 'bootstrap.storage.lab'
3008 protocol = https
3009 cert_file = /etc/grafana/certs/cert_file
3010 cert_key = /etc/grafana/certs/cert_key
3011 http_port = 3000
3012 [security]
3013 admin_user = admin
3014 admin_password = admin
3015 allow_embedding = true
3016 """,
3017 'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
3018 'certs/cert_file': '# generated by cephadm\n%s' % cert,
3019 'certs/cert_key': '# generated by cephadm\n%s' % pkey,
3020 }
3021 }
3022 return config_file, sorted(deps)
3023
3024 def _get_dashboard_url(self):
3025 # type: () -> str
3026 return self.get('mgr_map').get('services', {}).get('dashboard', '')
3027
3028 def _generate_alertmanager_config(self):
3029 # type: () -> Tuple[Dict[str, Any], List[str]]
3030 deps = [] # type: List[str]
3031
3032 # dashboard(s)
3033 dashboard_urls = []
3034 mgr_map = self.get('mgr_map')
3035 port = None
3036 proto = None # http: or https:
3037 url = mgr_map.get('services', {}).get('dashboard', None)
3038 if url:
3039 dashboard_urls.append(url)
3040 proto = url.split('/')[0]
3041 port = url.split('/')[2].split(':')[1]
3042 # scan all mgrs to generate deps and to get standbys too.
3043 # assume that they are all on the same port as the active mgr.
3044 for dd in self.cache.get_daemons_by_service('mgr'):
3045 # we consider mgr a dep even if the dashboard is disabled
3046 # in order to be consistent with _calc_daemon_deps().
3047 deps.append(dd.name())
3048 if not port:
3049 continue
3050 if dd.daemon_id == self.get_mgr_id():
3051 continue
3052 hi = self.inventory.get(dd.hostname, {})
3053 addr = hi.get('addr', dd.hostname)
3054 dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
3055 port))
3056
3057 yml = """# generated by cephadm
3058 # See https://prometheus.io/docs/alerting/configuration/ for documentation.
3059
3060 global:
3061 resolve_timeout: 5m
3062
3063 route:
3064 group_by: ['alertname']
3065 group_wait: 10s
3066 group_interval: 10s
3067 repeat_interval: 1h
3068 receiver: 'ceph-dashboard'
3069 receivers:
3070 - name: 'ceph-dashboard'
3071 webhook_configs:
3072 {urls}
3073 """.format(
3074 urls='\n'.join(
3075 [" - url: '{}api/prometheus_receiver'".format(u)
3076 for u in dashboard_urls]
3077 ))
3078 peers = []
3079 port = '9094'
3080 for dd in self.cache.get_daemons_by_service('alertmanager'):
3081 deps.append(dd.name())
3082 hi = self.inventory.get(dd.hostname, {})
3083 addr = hi.get('addr', dd.hostname)
3084 peers.append(addr.split(':')[0] + ':' + port)
3085 return {
3086 "files": {
3087 "alertmanager.yml": yml
3088 },
3089 "peers": peers
3090 }, sorted(deps)
3091
3092 def add_prometheus(self, spec):
3093 return self._add_daemon('prometheus', spec, self._create_prometheus)
3094
3095 def _create_prometheus(self, daemon_id, host):
3096 return self._create_daemon('prometheus', daemon_id, host)
3097
3098 @trivial_completion
3099 def apply_prometheus(self, spec):
3100 return self._apply(spec)
3101
3102 def add_node_exporter(self, spec):
3103 # type: (ServiceSpec) -> AsyncCompletion
3104 return self._add_daemon('node-exporter', spec,
3105 self._create_node_exporter)
3106
3107 @trivial_completion
3108 def apply_node_exporter(self, spec):
3109 return self._apply(spec)
3110
3111 def _create_node_exporter(self, daemon_id, host):
3112 return self._create_daemon('node-exporter', daemon_id, host)
3113
3114 def add_crash(self, spec):
3115 # type: (ServiceSpec) -> AsyncCompletion
3116 return self._add_daemon('crash', spec,
3117 self._create_crash)
3118
3119 @trivial_completion
3120 def apply_crash(self, spec):
3121 return self._apply(spec)
3122
3123 def _create_crash(self, daemon_id, host):
3124 ret, keyring, err = self.mon_command({
3125 'prefix': 'auth get-or-create',
3126 'entity': 'client.crash.' + host,
3127 'caps': ['mon', 'profile crash',
3128 'mgr', 'profile crash'],
3129 })
3130 return self._create_daemon('crash', daemon_id, host, keyring=keyring)
3131
3132 def add_grafana(self, spec):
3133 # type: (ServiceSpec) -> AsyncCompletion
3134 return self._add_daemon('grafana', spec, self._create_grafana)
3135
3136 @trivial_completion
3137 def apply_grafana(self, spec: ServiceSpec):
3138 return self._apply(spec)
3139
3140 def _create_grafana(self, daemon_id, host):
3141 # type: (str, str) -> str
3142 return self._create_daemon('grafana', daemon_id, host)
3143
3144 def add_alertmanager(self, spec):
3145 # type: (ServiceSpec) -> AsyncCompletion
3146 return self._add_daemon('alertmanager', spec, self._create_alertmanager)
3147
3148 @trivial_completion
3149 def apply_alertmanager(self, spec: ServiceSpec):
3150 return self._apply(spec)
3151
3152 def _create_alertmanager(self, daemon_id, host):
3153 return self._create_daemon('alertmanager', daemon_id, host)
3154
3155
3156 def _get_container_image_id(self, image_name):
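# Pull the image on an arbitrary host and return the image id and
# ceph version it ships, as reported by 'cephadm pull'.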
3157 # pick a random host...
3158 host = None
3159 for host_name, hi in self.inventory.items():
3160 host = host_name
3161 break
3162 if not host:
3163 raise OrchestratorError('no hosts defined')
3164 out, err, code = self._run_cephadm(
3165 host, None, 'pull', [],
3166 image=image_name,
3167 no_fsid=True,
3168 error_ok=True)
3169 if code:
3170 raise OrchestratorError('Failed to pull %s on %s: %s' % (
3171 image_name, host, '\n'.join(out)))
3172 j = json.loads('\n'.join(out))
3173 image_id = j.get('image_id')
3174 ceph_version = j.get('ceph_version')
3175 self.log.debug('image %s -> id %s version %s' %
3176 (image_name, image_id, ceph_version))
3177 return image_id, ceph_version
3178
3179 @trivial_completion
3180 def upgrade_check(self, image, version):
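# Compare every cached daemon's container image id against the
# requested target image and report which daemons need an update.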
3181 if version:
3182 target_name = self.container_image_base + ':v' + version
3183 elif image:
3184 target_name = image
3185 else:
3186 raise OrchestratorError('must specify either image or version')
3187
3188 target_id, target_version = self._get_container_image_id(target_name)
3189 self.log.debug('Target image %s id %s version %s' % (
3190 target_name, target_id, target_version))
3191 r = {
3192 'target_name': target_name,
3193 'target_id': target_id,
3194 'target_version': target_version,
3195 'needs_update': dict(),
3196 'up_to_date': list(),
3197 }
3198 for host, dm in self.cache.daemons.items():
3199 for name, dd in dm.items():
3200 if target_id == dd.container_image_id:
3201 r['up_to_date'].append(dd.name())
3202 else:
3203 r['needs_update'][dd.name()] = {
3204 'current_name': dd.container_image_name,
3205 'current_id': dd.container_image_id,
3206 'current_version': dd.version,
3207 }
3208 return json.dumps(r, indent=4, sort_keys=True)
3209
3210 @trivial_completion
3211 def upgrade_status(self):
3212 r = orchestrator.UpgradeStatusSpec()
3213 if self.upgrade_state:
3214 r.target_image = self.upgrade_state.get('target_name')
3215 r.in_progress = True
3216 if self.upgrade_state.get('error'):
3217 r.message = 'Error: ' + self.upgrade_state.get('error')
3218 elif self.upgrade_state.get('paused'):
3219 r.message = 'Upgrade paused'
3220 return r
3221
3222 @trivial_completion
3223 def upgrade_start(self, image, version):
3224 if self.mode != 'root':
3225 raise OrchestratorError('upgrade is not supported in %s mode' % (
3226 self.mode))
3227 if version:
3228 try:
3229 (major, minor, patch) = version.split('.')
3230 assert int(minor) >= 0
3231 assert int(patch) >= 0
3232 except (ValueError, AssertionError):
3233 raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
3234 if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
3235 raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
3236 target_name = self.container_image_base + ':v' + version
3237 elif image:
3238 target_name = image
3239 else:
3240 raise OrchestratorError('must specify either image or version')
3241 if self.upgrade_state:
3242 if self.upgrade_state.get('target_name') != target_name:
3243 raise OrchestratorError(
3244 'Upgrade to %s (not %s) already in progress' %
3245 (self.upgrade_state.get('target_name'), target_name))
3246 if self.upgrade_state.get('paused'):
3247 del self.upgrade_state['paused']
3248 self._save_upgrade_state()
3249 return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
3250 return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
3251 self.upgrade_state = {
3252 'target_name': target_name,
3253 'progress_id': str(uuid.uuid4()),
3254 }
3255 self._update_upgrade_progress(0.0)
3256 self._save_upgrade_state()
3257 self._clear_upgrade_health_checks()
3258 self.event.set()
3259 return 'Initiating upgrade to %s' % (target_name)
3260
3261 @trivial_completion
3262 def upgrade_pause(self):
3263 if not self.upgrade_state:
3264 raise OrchestratorError('No upgrade in progress')
3265 if self.upgrade_state.get('paused'):
3266 return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
3267 self.upgrade_state['paused'] = True
3268 self._save_upgrade_state()
3269 return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
3270
3271 @trivial_completion
3272 def upgrade_resume(self):
3273 if not self.upgrade_state:
3274 raise OrchestratorError('No upgrade in progress')
3275 if not self.upgrade_state.get('paused'):
3276 return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
3277 del self.upgrade_state['paused']
3278 self._save_upgrade_state()
3279 self.event.set()
3280 return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
3281
3282 @trivial_completion
3283 def upgrade_stop(self):
3284 if not self.upgrade_state:
3285 return 'No upgrade in progress'
3286 target_name = self.upgrade_state.get('target_name')
3287 if 'progress_id' in self.upgrade_state:
3288 self.remote('progress', 'complete',
3289 self.upgrade_state['progress_id'])
3290 self.upgrade_state = None
3291 self._save_upgrade_state()
3292 self._clear_upgrade_health_checks()
3293 self.event.set()
3294 return 'Stopped upgrade to %s' % target_name
3295
3296 @trivial_completion
3297 def remove_osds(self, osd_ids: List[str],
3298 replace: bool = False,
3299 force: bool = False):
3300 """
3301 Takes a list of OSDs and schedules them for removal.
3302 The function that takes care of the actual removal is
3303 _remove_osds_bg().
3304 """
3305
3306 daemons = self.cache.get_daemons_by_service('osd')
3307 found: Set[OSDRemoval] = set()
3308 for daemon in daemons:
3309 if daemon.daemon_id not in osd_ids:
3310 continue
3311 found.add(OSDRemoval(daemon.daemon_id, replace, force,
3312 daemon.hostname, daemon.name(),
3313 datetime.datetime.utcnow(), -1))
3314
3315 not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
3316 if not_found:
3317 raise OrchestratorError('Unable to find OSD: %s' % not_found)
3318
3319 self.rm_util.queue_osds_for_removal(found)
3320
3321 # trigger the serve loop to initiate the removal
3322 self._kick_serve_loop()
3323 return "Scheduled OSD(s) for removal"
3324
3325 @trivial_completion
3326 def remove_osds_status(self):
3327 """
3328 The CLI call to retrieve an osd removal report
3329 """
3330 return self.rm_util.report
3331
3332 @trivial_completion
3333 def list_specs(self, service_name=None):
3334 """
3335 Loads all entries from the service_spec mon_store root.
3336 """
3337 return self.spec_store.find(service_name=service_name)
3338
3339
3340 class BaseScheduler(object):
3341 """
3342 Base Scheduler Interface
3343
3344 * requires a placement_spec
3345
3346 `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
3347 """
3348
3349 def __init__(self, placement_spec):
3350 # type: (PlacementSpec) -> None
3351 self.placement_spec = placement_spec
3352
3353 def place(self, host_pool, count=None):
3354 # type: (List, Optional[int]) -> List[HostPlacementSpec]
3355 raise NotImplementedError
3356
3357
3358 class SimpleScheduler(BaseScheduler):
3359 """
3360 The simplest way to pick/schedule a set of hosts:
3361 1) shuffle the provided host_pool
3362 2) select up to :count hosts from the list
3363 """
3364 def __init__(self, placement_spec):
3365 super(SimpleScheduler, self).__init__(placement_spec)
3366
3367 def place(self, host_pool, count=None):
3368 # type: (List, Optional[int]) -> List[HostPlacementSpec]
3369 if not host_pool:
3370 return []
3371 host_pool = [x for x in host_pool]
3372 # shuffle for pseudo random selection
3373 random.shuffle(host_pool)
3374 return host_pool[:count]
3375
3376
3377 class HostAssignment(object):
3378 """
3379 A class to detect whether hosts are being passed imperatively or declaratively.
3380 If the spec is populated via the `placement.hosts` field it will not load
3381 any additional hosts into the list.
3382 If the spec isn't populated, i.e. when only a count or label is present (declarative),
3383 it will use the provided `get_hosts_func` to load hosts from the inventory.
3384
3385 Schedulers can be assigned to pick hosts from the pool.
3386 """
3387
3388 def __init__(self,
3389 spec, # type: ServiceSpec
3390 get_hosts_func, # type: Callable[[Optional[str]],List[str]]
3391 get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
3392
3393 filter_new_host=None, # type: Optional[Callable[[str],bool]]
3394 scheduler=None, # type: Optional[BaseScheduler]
3395 ):
3396 assert spec and get_hosts_func and get_daemons_func
3397 self.spec = spec # type: ServiceSpec
3398 self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
3399 self.get_hosts_func = get_hosts_func
3400 self.get_daemons_func = get_daemons_func
3401 self.filter_new_host = filter_new_host
3402 self.service_name = spec.service_name()
3403
3404
3405 def validate(self):
3406 self.spec.validate()
3407
3408 if self.spec.placement.hosts:
3409 explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
3410 unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
3411 if unknown_hosts:
3412 raise OrchestratorValidationError(
3413 f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
3414
3415 if self.spec.placement.host_pattern:
3416 pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
3417 if not pattern_hostnames:
3418 raise OrchestratorValidationError(
3419 f'Cannot place {self.spec.one_line_str()}: No matching hosts')
3420
3421 if self.spec.placement.label:
3422 label_hostnames = self.get_hosts_func(self.spec.placement.label)
3423 if not label_hostnames:
3424 raise OrchestratorValidationError(
3425 f'Cannot place {self.spec.one_line_str()}: No matching '
3426 f'hosts for label {self.spec.placement.label}')
3427
3428 def place(self):
3429 # type: () -> List[HostPlacementSpec]
3430 """
3431 Load hosts into the spec.placement.hosts container.
3432 """
3433
3434 self.validate()
3435
3436 # count == 0
3437 if self.spec.placement.count == 0:
3438 return []
3439
3440 # respect any explicit host list
3441 if self.spec.placement.hosts and not self.spec.placement.count:
3442 logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
3443 return self.spec.placement.hosts
3444
3445 # respect host_pattern
3446 if self.spec.placement.host_pattern:
3447 candidates = [
3448 HostPlacementSpec(x, '', '')
3449 for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
3450 ]
3451 logger.debug('All hosts: {}'.format(candidates))
3452 return candidates
3453
3454 count = 0
3455 if self.spec.placement.hosts and \
3456 self.spec.placement.count and \
3457 len(self.spec.placement.hosts) >= self.spec.placement.count:
3458 hosts = self.spec.placement.hosts
3459 logger.debug('place %d over provided host list: %s' % (
3460 count, hosts))
3461 count = self.spec.placement.count
3462 elif self.spec.placement.label:
3463 hosts = [
3464 HostPlacementSpec(x, '', '')
3465 for x in self.get_hosts_func(self.spec.placement.label)
3466 ]
3467 if not self.spec.placement.count:
3468 logger.debug('Labeled hosts: {}'.format(hosts))
3469 return hosts
3470 count = self.spec.placement.count
3471 logger.debug('place %d over label %s: %s' % (
3472 count, self.spec.placement.label, hosts))
3473 else:
3474 hosts = [
3475 HostPlacementSpec(x, '', '')
3476 for x in self.get_hosts_func(None)
3477 ]
3478 if self.spec.placement.count:
3479 count = self.spec.placement.count
3480 else:
3481 # this should be a totally empty spec given all of the
3482 # alternative paths above.
3483 assert self.spec.placement.count is None
3484 assert not self.spec.placement.hosts
3485 assert not self.spec.placement.label
3486 count = 1
3487 logger.debug('place %d over all hosts: %s' % (count, hosts))
3488
3489 # we need to select a subset of the candidates
3490
3491 # if a partial host list is provided, always start with that
3492 if len(self.spec.placement.hosts) < count:
3493 chosen = self.spec.placement.hosts
3494 else:
3495 chosen = []
3496
3497 # prefer hosts that already have services
3498 daemons = self.get_daemons_func(self.service_name)
3499 hosts_with_daemons = {d.hostname for d in daemons}
3500 # calc existing daemons (that aren't already in chosen)
3501 chosen_hosts = [hs.hostname for hs in chosen]
3502 existing = [hs for hs in hosts
3503 if hs.hostname in hosts_with_daemons and \
3504 hs.hostname not in chosen_hosts]
3505 if len(chosen + existing) >= count:
3506 chosen = chosen + self.scheduler.place(
3507 existing,
3508 count - len(chosen))
3509 logger.debug('Hosts with existing daemons: {}'.format(chosen))
3510 return chosen
3511
3512 need = count - len(existing + chosen)
3513 others = [hs for hs in hosts
3514 if hs.hostname not in hosts_with_daemons]
3515 if self.filter_new_host:
3516 old = others
3517 others = [h for h in others if self.filter_new_host(h.hostname)]
3518 logger.debug('filtered %s down to %s' % (old, others))
3519 chosen = chosen + self.scheduler.place(others, need)
3520 logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
3521 existing, chosen))
3522 return existing + chosen
3523