import json
import errno
import logging
-import time
-from copy import copy
-from threading import Event
+from collections import defaultdict
from functools import wraps
-
-from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
+from tempfile import TemporaryDirectory
+from threading import Event
import string
-try:
- from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, \
- Any, NamedTuple, Iterator, Set, Sequence
- from typing import TYPE_CHECKING, cast
-except ImportError:
- TYPE_CHECKING = False # just for type checking
-
+from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
+ Any, Set, TYPE_CHECKING, cast
import datetime
import six
import random
import tempfile
import multiprocessing.pool
-import re
import shutil
import subprocess
-import uuid
-from ceph.deployment import inventory, translate
+from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.service_spec import \
- HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
+ NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
CLICommandMeta
+from orchestrator._interface import GenericSpec
from . import remotes
from . import utils
-from .nfs import NFSGanesha
-from .osd import RemoveUtil, OSDRemoval
-
+from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
+ RbdMirrorService, CrashService, CephadmService
+from .services.iscsi import IscsiService
+from .services.nfs import NFSService
+from .services.osd import RemoveUtil, OSDRemoval, OSDService
+from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
+ NodeExporterService
+from .schedule import HostAssignment
+from .inventory import Inventory, SpecStore, HostCache
+from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
+from .template import TemplateMgr
try:
import remoto
+ # NOTE(mattoliverau) Patch remoto until remoto PR
+ # (https://github.com/alfredodeza/remoto/pull/56) lands
+ from distutils.version import StrictVersion
+ if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
+ def remoto_has_connection(self):
+ return self.gateway.hasreceiver()
+
+ from remoto.backends import BaseConnection
+ BaseConnection.has_connection = remoto_has_connection
import remoto.process
import execnet.gateway_bootstrap
except ImportError as e:
logger = logging.getLogger(__name__)
+T = TypeVar('T')
+
DEFAULT_SSH_CONFIG = """
Host *
User root
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
-HOST_CACHE_PREFIX = "host."
-SPEC_STORE_PREFIX = "spec."
-
-# ceph daemon types that use the ceph container image.
-# NOTE: listed in upgrade order!
-CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
-# for py2 compat
-try:
- from tempfile import TemporaryDirectory # py3
-except ImportError:
- # define a minimal (but sufficient) equivalent for <= py 3.2
- class TemporaryDirectory(object): # type: ignore
- def __init__(self):
- self.name = tempfile.mkdtemp()
-
- def __enter__(self):
- if not self.name:
- self.name = tempfile.mkdtemp()
- return self.name
-
- def cleanup(self):
- shutil.rmtree(self.name)
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.cleanup()
-
-
-class SpecStore():
- def __init__(self, mgr):
- # type: (CephadmOrchestrator) -> None
- self.mgr = mgr
- self.specs = {} # type: Dict[str, ServiceSpec]
- self.spec_created = {} # type: Dict[str, datetime.datetime]
+def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
+ @wraps(f)
+ def forall_hosts_wrapper(*args) -> List[T]:
+
+ # Some weird logic to make calling functions with multiple arguments work.
+ if len(args) == 1:
+ vals = args[0]
+ self = None
+ elif len(args) == 2:
+ self, vals = args
+ else:
+ assert False, 'either f([...]) or self.f([...])'
- def load(self):
- # type: () -> None
- for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
- service_name = k[len(SPEC_STORE_PREFIX):]
+ def do_work(arg):
+ if not isinstance(arg, tuple):
+ arg = (arg, )
try:
- v = json.loads(v)
- spec = ServiceSpec.from_json(v['spec'])
- created = datetime.datetime.strptime(v['created'], DATEFMT)
- self.specs[service_name] = spec
- self.spec_created[service_name] = created
- self.mgr.log.debug('SpecStore: loaded spec for %s' % (
- service_name))
+ if self:
+ return f(self, *arg)
+ return f(*arg)
except Exception as e:
- self.mgr.log.warning('unable to load spec for %s: %s' % (
- service_name, e))
- pass
-
- def save(self, spec):
- # type: (ServiceSpec) -> None
- self.specs[spec.service_name()] = spec
- self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
- self.mgr.set_store(
- SPEC_STORE_PREFIX + spec.service_name(),
- json.dumps({
- 'spec': spec.to_json(),
- 'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
- }, sort_keys=True),
- )
-
- def rm(self, service_name):
- # type: (str) -> bool
- found = service_name in self.specs
- if found:
- del self.specs[service_name]
- del self.spec_created[service_name]
- self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
- return found
-
- def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
- specs = []
- for sn, spec in self.specs.items():
- if not service_name or \
- sn == service_name or \
- sn.startswith(service_name + '.'):
- specs.append(spec)
- self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
- service_name, specs))
- return specs
-
-class HostCache():
- def __init__(self, mgr):
- # type: (CephadmOrchestrator) -> None
- self.mgr: CephadmOrchestrator = mgr
- self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
- self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
- self.devices = {} # type: Dict[str, List[inventory.Device]]
- self.networks = {} # type: Dict[str, Dict[str, List[str]]]
- self.last_device_update = {} # type: Dict[str, datetime.datetime]
- self.daemon_refresh_queue = [] # type: List[str]
- self.device_refresh_queue = [] # type: List[str]
- self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
- self.last_host_check = {} # type: Dict[str, datetime.datetime]
-
- def load(self):
- # type: () -> None
- for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
- host = k[len(HOST_CACHE_PREFIX):]
- if host not in self.mgr.inventory:
- self.mgr.log.warning('removing stray HostCache host record %s' % (
- host))
- self.mgr.set_store(k, None)
- try:
- j = json.loads(v)
- if 'last_device_update' in j:
- self.last_device_update[host] = datetime.datetime.strptime(
- j['last_device_update'], DATEFMT)
- else:
- self.device_refresh_queue.append(host)
- # for services, we ignore the persisted last_*_update
- # and always trigger a new scrape on mgr restart.
- self.daemon_refresh_queue.append(host)
- self.daemons[host] = {}
- self.devices[host] = []
- self.networks[host] = {}
- self.daemon_config_deps[host] = {}
- for name, d in j.get('daemons', {}).items():
- self.daemons[host][name] = \
- orchestrator.DaemonDescription.from_json(d)
- for d in j.get('devices', []):
- self.devices[host].append(inventory.Device.from_json(d))
- self.networks[host] = j.get('networks', {})
- for name, d in j.get('daemon_config_deps', {}).items():
- self.daemon_config_deps[host][name] = {
- 'deps': d.get('deps', []),
- 'last_config': datetime.datetime.strptime(
- d['last_config'], DATEFMT),
- }
- if 'last_host_check' in j:
- self.last_host_check[host] = datetime.datetime.strptime(
- j['last_host_check'], DATEFMT)
- self.mgr.log.debug(
- 'HostCache.load: host %s has %d daemons, '
- '%d devices, %d networks' % (
- host, len(self.daemons[host]), len(self.devices[host]),
- len(self.networks[host])))
- except Exception as e:
- self.mgr.log.warning('unable to load cached state for %s: %s' % (
- host, e))
- pass
-
- def update_host_daemons(self, host, dm):
- # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
- self.daemons[host] = dm
- self.last_daemon_update[host] = datetime.datetime.utcnow()
-
- def update_host_devices_networks(self, host, dls, nets):
- # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
- self.devices[host] = dls
- self.networks[host] = nets
- self.last_device_update[host] = datetime.datetime.utcnow()
-
- def update_daemon_config_deps(self, host, name, deps, stamp):
- self.daemon_config_deps[host][name] = {
- 'deps': deps,
- 'last_config': stamp,
- }
-
- def update_last_host_check(self, host):
- # type: (str) -> None
- self.last_host_check[host] = datetime.datetime.utcnow()
-
- def prime_empty_host(self, host):
- # type: (str) -> None
- """
- Install an empty entry for a host
- """
- self.daemons[host] = {}
- self.devices[host] = []
- self.networks[host] = {}
- self.daemon_config_deps[host] = {}
- self.daemon_refresh_queue.append(host)
- self.device_refresh_queue.append(host)
-
- def invalidate_host_daemons(self, host):
- # type: (str) -> None
- self.daemon_refresh_queue.append(host)
- if host in self.last_daemon_update:
- del self.last_daemon_update[host]
- self.mgr.event.set()
-
- def invalidate_host_devices(self, host):
- # type: (str) -> None
- self.device_refresh_queue.append(host)
- if host in self.last_device_update:
- del self.last_device_update[host]
- self.mgr.event.set()
-
- def save_host(self, host):
- # type: (str) -> None
- j = { # type: ignore
- 'daemons': {},
- 'devices': [],
- 'daemon_config_deps': {},
- }
- if host in self.last_daemon_update:
- j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
- if host in self.last_device_update:
- j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
- for name, dd in self.daemons[host].items():
- j['daemons'][name] = dd.to_json() # type: ignore
- for d in self.devices[host]:
- j['devices'].append(d.to_json()) # type: ignore
- j['networks'] = self.networks[host]
- for name, depi in self.daemon_config_deps[host].items():
- j['daemon_config_deps'][name] = { # type: ignore
- 'deps': depi.get('deps', []),
- 'last_config': depi['last_config'].strftime(DATEFMT),
- }
- if host in self.last_host_check:
- j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
- self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
-
- def rm_host(self, host):
- # type: (str) -> None
- if host in self.daemons:
- del self.daemons[host]
- if host in self.devices:
- del self.devices[host]
- if host in self.networks:
- del self.networks[host]
- if host in self.last_daemon_update:
- del self.last_daemon_update[host]
- if host in self.last_device_update:
- del self.last_device_update[host]
- if host in self.daemon_config_deps:
- del self.daemon_config_deps[host]
- self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
-
- def get_hosts(self):
- # type: () -> List[str]
- r = []
- for host, di in self.daemons.items():
- r.append(host)
- return r
-
- def get_daemons(self):
- # type: () -> List[orchestrator.DaemonDescription]
- r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- r.append(dd)
- return r
-
- def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
- for host, dm in self.daemons.items():
- if host in self.mgr.offline_hosts:
- def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
- ret = copy(dd)
- ret.status = -1
- ret.status_desc = 'host is offline'
- return ret
- yield host, {name: set_offline(d) for name, d in dm.items()}
- else:
- yield host, dm
-
- def get_daemons_by_service(self, service_name):
- # type: (str) -> List[orchestrator.DaemonDescription]
- result = [] # type: List[orchestrator.DaemonDescription]
- for host, dm in self.daemons.items():
- for name, d in dm.items():
- if name.startswith(service_name + '.'):
- result.append(d)
- return result
-
- def get_daemon_names(self):
- # type: () -> List[str]
- r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- r.append(name)
- return r
-
- def get_daemon_last_config_deps(self, host, name):
- if host in self.daemon_config_deps:
- if name in self.daemon_config_deps[host]:
- return self.daemon_config_deps[host][name].get('deps', []), \
- self.daemon_config_deps[host][name].get('last_config', None)
- return None, None
-
- def host_needs_daemon_refresh(self, host):
- # type: (str) -> bool
- if host in self.mgr.offline_hosts:
- logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
- return False
- if host in self.daemon_refresh_queue:
- self.daemon_refresh_queue.remove(host)
- return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- seconds=self.mgr.daemon_cache_timeout)
- if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
- return True
- return False
-
- def host_needs_device_refresh(self, host):
- # type: (str) -> bool
- if host in self.mgr.offline_hosts:
- logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
- return False
- if host in self.device_refresh_queue:
- self.device_refresh_queue.remove(host)
- return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- seconds=self.mgr.device_cache_timeout)
- if host not in self.last_device_update or self.last_device_update[host] < cutoff:
- return True
- return False
-
- def host_needs_check(self, host):
- # type: (str) -> bool
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- seconds=self.mgr.host_check_interval)
- return host not in self.last_host_check or self.last_host_check[host] < cutoff
-
- def add_daemon(self, host, dd):
- # type: (str, orchestrator.DaemonDescription) -> None
- assert host in self.daemons
- self.daemons[host][dd.name()] = dd
-
- def rm_daemon(self, host, name):
- if host in self.daemons:
- if name in self.daemons[host]:
- del self.daemons[host][name]
-
-
-class AsyncCompletion(orchestrator.Completion):
- def __init__(self,
- _first_promise=None, # type: Optional[orchestrator.Completion]
- value=orchestrator._Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
- many=False, # type: bool
- update_progress=False, # type: bool
- ):
+ logger.exception(f'executing {f.__name__}({args}) failed.')
+ raise
assert CephadmOrchestrator.instance is not None
- self.many = many
- self.update_progress = update_progress
- if name is None and on_complete is not None:
- name = getattr(on_complete, '__name__', None)
- super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
-
- @property
- def _progress_reference(self):
- # type: () -> Optional[orchestrator.ProgressReference]
- if hasattr(self._on_complete_, 'progress_id'): # type: ignore
- return self._on_complete_ # type: ignore
- return None
-
- @property
- def _on_complete(self):
- # type: () -> Optional[Callable]
- if self._on_complete_ is None:
- return None
-
- def callback(result):
- try:
- if self.update_progress:
- assert self.progress_reference
- self.progress_reference.progress = 1.0
- self._on_complete_ = None
- self._finalize(result)
- except Exception as e:
- try:
- self.fail(e)
- except Exception:
- logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
- if 'UNITTEST' in os.environ:
- assert False
-
- def error_callback(e):
- pass
-
- def run(value):
- def do_work(*args, **kwargs):
- assert self._on_complete_ is not None
- try:
- res = self._on_complete_(*args, **kwargs)
- if self.update_progress and self.many:
- assert self.progress_reference
- self.progress_reference.progress += 1.0 / len(value)
- return res
- except Exception as e:
- self.fail(e)
- raise
-
- assert CephadmOrchestrator.instance
- if self.many:
- if not value:
- logger.info('calling map_async without values')
- callback([])
- if six.PY3:
- CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
- callback=callback,
- error_callback=error_callback)
- else:
- CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
- callback=callback)
- else:
- if six.PY3:
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback, error_callback=error_callback)
- else:
- CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
- callback=callback)
- return self.ASYNC_RESULT
-
- return run
-
- @_on_complete.setter
- def _on_complete(self, inner):
- # type: (Callable) -> None
- self._on_complete_ = inner
-
-
-def ssh_completion(cls=AsyncCompletion, **c_kwargs):
- # type: (Type[orchestrator.Completion], Any) -> Callable
- """
- See ./HACKING.rst for a how-to
- """
- def decorator(f):
- @wraps(f)
- def wrapper(*args):
-
- name = f.__name__
- many = c_kwargs.get('many', False)
-
- # Some weired logic to make calling functions with multiple arguments work.
- if len(args) == 1:
- [value] = args
- if many and value and isinstance(value[0], tuple):
- return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
- else:
- return cls(on_complete=f, value=value, name=name, **c_kwargs)
- else:
- if many:
- self, value = args
+ return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
- def call_self(inner_args):
- if not isinstance(inner_args, tuple):
- inner_args = (inner_args, )
- return f(self, *inner_args)
- return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
- else:
- return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
+ return forall_hosts_wrapper
- return wrapper
- return decorator
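+# NOTE: usage sketch (illustration only, not part of this change). forall_hosts
+# replaces async_map_completion: the decorated function is invoked once per element
+# of the list it is called with, fanned out over the mgr worker pool, and the
+# results come back as a plain list. Callers pass a list of values or argument
+# tuples, e.g. (mirroring _remove_daemons below, with hypothetical names):
+#
+#     @forall_hosts
+#     def _remove_daemons(self, name, host) -> str:
+#         return self._remove_daemon(name, host)
+#
+#     self._remove_daemons([('mon.a', 'host1'), ('mon.b', 'host2')])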
+class CephadmCompletion(orchestrator.Completion):
+ def evaluate(self):
+ self.finalize(None)
-def async_completion(f):
- # type: (Callable) -> Callable[..., AsyncCompletion]
+def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
"""
- See ./HACKING.rst for a how-to
-
- :param f: wrapped function
- """
- return ssh_completion()(f)
-
-
-def async_map_completion(f):
- # type: (Callable) -> Callable[..., AsyncCompletion]
- """
- See ./HACKING.rst for a how-to
-
- :param f: wrapped function
-
- kind of similar to
-
- >>> def sync_map(f):
- ... return lambda x: map(f, x)
-
+ Decorator to make methods return a CephadmCompletion
+ object that executes the wrapped function when the completion is evaluated.
"""
- return ssh_completion(many=True)(f)
-
-def trivial_completion(f):
- # type: (Callable) -> Callable[..., orchestrator.Completion]
@wraps(f)
def wrapper(*args, **kwargs):
- return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
+ return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
+
return wrapper
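+# NOTE: usage sketch (illustration only). Orchestrator entry points wrapped with
+# @trivial_completion return a CephadmCompletion that runs the wrapped call only
+# when it is evaluated, which process() now does via p.evaluate() (see below):
+#
+#     @trivial_completion
+#     def add_host(self, spec: HostSpec) -> str:
+#         return self._add_host(spec)
+#
+#     completion = mgr.add_host(HostSpec('host1'))  # 'host1' is a hypothetical host
+#     mgr.process([completion])                     # _add_host runs here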
'desc': 'Container image name, without the tag',
'runtime': True,
},
+ {
+ 'name': 'container_image_prometheus',
+ 'default': 'prom/prometheus:v2.18.1',
+ 'desc': 'Prometheus container image',
+ },
+ {
+ 'name': 'container_image_grafana',
+ 'default': 'ceph/ceph-grafana:latest',
+ 'desc': 'Grafana container image',
+ },
+ {
+ 'name': 'container_image_alertmanager',
+ 'default': 'prom/alertmanager:v0.20.0',
+ 'desc': 'Alertmanager container image',
+ },
+ {
+ 'name': 'container_image_node_exporter',
+ 'default': 'prom/node-exporter:v0.18.1',
+ 'desc': 'Node exporter container image',
+ },
{
'name': 'warn_on_stray_hosts',
'type': 'bool',
self.host_check_interval = 0
self.mode = ''
self.container_image_base = ''
+ self.container_image_prometheus = ''
+ self.container_image_grafana = ''
+ self.container_image_alertmanager = ''
+ self.container_image_node_exporter = ''
self.warn_on_stray_hosts = True
self.warn_on_stray_daemons = True
self.warn_on_failed_host_check = True
CephadmOrchestrator.instance = self
- t = self.get_store('upgrade_state')
- if t:
- self.upgrade_state = json.loads(t)
- else:
- self.upgrade_state = None
+ self.upgrade = CephadmUpgrade(self)
self.health_checks = {}
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
- # load inventory
- i = self.get_store('inventory')
- if i:
- self.inventory: Dict[str, dict] = json.loads(i)
- else:
- self.inventory = dict()
- self.log.debug('Loaded inventory %s' % self.inventory)
+ self.inventory = Inventory(self)
self.cache = HostCache(self)
self.cache.load()
# in-memory only.
self.offline_hosts: Set[str] = set()
+ # services:
+ self.osd_service = OSDService(self)
+ self.nfs_service = NFSService(self)
+ self.mon_service = MonService(self)
+ self.mgr_service = MgrService(self)
+ self.mds_service = MdsService(self)
+ self.rgw_service = RgwService(self)
+ self.rbd_mirror_service = RbdMirrorService(self)
+ self.grafana_service = GrafanaService(self)
+ self.alertmanager_service = AlertmanagerService(self)
+ self.prometheus_service = PrometheusService(self)
+ self.node_exporter_service = NodeExporterService(self)
+ self.crash_service = CrashService(self)
+ self.iscsi_service = IscsiService(self)
+ self.cephadm_services = {
+ 'mon': self.mon_service,
+ 'mgr': self.mgr_service,
+ 'osd': self.osd_service,
+ 'mds': self.mds_service,
+ 'rgw': self.rgw_service,
+ 'rbd-mirror': self.rbd_mirror_service,
+ 'nfs': self.nfs_service,
+ 'grafana': self.grafana_service,
+ 'alertmanager': self.alertmanager_service,
+ 'prometheus': self.prometheus_service,
+ 'node-exporter': self.node_exporter_service,
+ 'crash': self.crash_service,
+ 'iscsi': self.iscsi_service,
+ }
+
+ self.template = TemplateMgr()
+
def shutdown(self):
self.log.debug('shutdown')
self._worker_pool.close()
self.run = False
self.event.set()
+ def _get_cephadm_service(self, service_type: str) -> CephadmService:
+ assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
+ return self.cephadm_services[service_type]
+
def _kick_serve_loop(self):
self.log.debug('_kick_serve_loop')
self.event.set()
def _check_safe_to_destroy_mon(self, mon_id):
# type: (str) -> None
- ret, out, err = self.mon_command({
+ ret, out, err = self.check_mon_command({
'prefix': 'quorum_status',
})
- if ret:
- raise OrchestratorError('failed to check mon quorum status')
try:
j = json.loads(out)
except Exception as e:
return
raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
- def _wait_for_ok_to_stop(self, s):
- # only wait a little bit; the service might go away for something
- tries = 4
- while tries > 0:
- if s.daemon_type not in ['mon', 'osd', 'mds']:
- self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- return True
- ret, out, err = self.mon_command({
- 'prefix': '%s ok-to-stop' % s.daemon_type,
- 'ids': [s.daemon_id],
- })
- if not self.upgrade_state or self.upgrade_state.get('paused'):
- return False
- if ret:
- self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- time.sleep(15)
- tries -= 1
- else:
- self.log.info('Upgrade: It is safe to stop %s.%s' %
- (s.daemon_type, s.daemon_id))
- return True
- return False
-
- def _clear_upgrade_health_checks(self):
- for k in ['UPGRADE_NO_STANDBY_MGR',
- 'UPGRADE_FAILED_PULL']:
- if k in self.health_checks:
- del self.health_checks[k]
- self.set_health_checks(self.health_checks)
-
- def _fail_upgrade(self, alert_id, alert):
- self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
- alert['summary']))
- self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
- self.upgrade_state['paused'] = True
- self._save_upgrade_state()
- self.health_checks[alert_id] = alert
- self.set_health_checks(self.health_checks)
-
- def _update_upgrade_progress(self, progress):
- if 'progress_id' not in self.upgrade_state:
- self.upgrade_state['progress_id'] = str(uuid.uuid4())
- self._save_upgrade_state()
- self.remote('progress', 'update', self.upgrade_state['progress_id'],
- ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
- ev_progress=progress)
-
- def _do_upgrade(self):
- # type: () -> None
- if not self.upgrade_state:
- self.log.debug('_do_upgrade no state, exiting')
- return
-
- target_name = self.upgrade_state.get('target_name')
- target_id = self.upgrade_state.get('target_id', None)
- if not target_id:
- # need to learn the container hash
- self.log.info('Upgrade: First pull of %s' % target_name)
- try:
- target_id, target_version = self._get_container_image_id(target_name)
- except OrchestratorError as e:
- self._fail_upgrade('UPGRADE_FAILED_PULL', {
- 'severity': 'warning',
- 'summary': 'Upgrade: failed to pull target image',
- 'count': 1,
- 'detail': [str(e)],
- })
- return
- self.upgrade_state['target_id'] = target_id
- self.upgrade_state['target_version'] = target_version
- self._save_upgrade_state()
- target_version = self.upgrade_state.get('target_version')
- self.log.info('Upgrade: Target is %s with id %s' % (target_name,
- target_id))
-
- # get all distinct container_image settings
- image_settings = {}
- ret, out, err = self.mon_command({
- 'prefix': 'config dump',
- 'format': 'json',
- })
- config = json.loads(out)
- for opt in config:
- if opt['name'] == 'container_image':
- image_settings[opt['section']] = opt['value']
-
- daemons = self.cache.get_daemons()
- done = 0
- for daemon_type in CEPH_UPGRADE_ORDER:
- self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
- need_upgrade_self = False
- for d in daemons:
- if d.daemon_type != daemon_type:
- continue
- if d.container_image_id == target_id:
- self.log.debug('daemon %s.%s version correct' % (
- daemon_type, d.daemon_id))
- done += 1
- continue
- self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
- daemon_type, d.daemon_id,
- d.container_image_name, d.container_image_id, d.version))
-
- if daemon_type == 'mgr' and \
- d.daemon_id == self.get_mgr_id():
- self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
- self.get_mgr_id())
- need_upgrade_self = True
- continue
-
- # make sure host has latest container image
- out, err, code = self._run_cephadm(
- d.hostname, None, 'inspect-image', [],
- image=target_name, no_fsid=True, error_ok=True)
- if code or json.loads(''.join(out)).get('image_id') != target_id:
- self.log.info('Upgrade: Pulling %s on %s' % (target_name,
- d.hostname))
- out, err, code = self._run_cephadm(
- d.hostname, None, 'pull', [],
- image=target_name, no_fsid=True, error_ok=True)
- if code:
- self._fail_upgrade('UPGRADE_FAILED_PULL', {
- 'severity': 'warning',
- 'summary': 'Upgrade: failed to pull target image',
- 'count': 1,
- 'detail': [
- 'failed to pull %s on host %s' % (target_name,
- d.hostname)],
- })
- return
- r = json.loads(''.join(out))
- if r.get('image_id') != target_id:
- self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
- self.upgrade_state['target_id'] = r['image_id']
- self._save_upgrade_state()
- return
-
- self._update_upgrade_progress(done / len(daemons))
-
- if not d.container_image_id:
- if d.container_image_name == target_name:
- self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
- continue
- if not self._wait_for_ok_to_stop(d):
- return
- self.log.info('Upgrade: Redeploying %s.%s' %
- (d.daemon_type, d.daemon_id))
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'name': 'container_image',
- 'value': target_name,
- 'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
- })
- self._daemon_action(
- d.daemon_type,
- d.daemon_id,
- d.hostname,
- 'redeploy'
- )
- return
-
- if need_upgrade_self:
- mgr_map = self.get('mgr_map')
- num = len(mgr_map.get('standbys'))
- if not num:
- self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
- 'severity': 'warning',
- 'summary': 'Upgrade: Need standby mgr daemon',
- 'count': 1,
- 'detail': [
- 'The upgrade process needs to upgrade the mgr, '
- 'but it needs at least one standby to proceed.',
- ],
- })
- return
-
- self.log.info('Upgrade: there are %d other already-upgraded '
- 'standby mgrs, failing over' % num)
-
- self._update_upgrade_progress(done / len(daemons))
-
- # fail over
- ret, out, err = self.mon_command({
- 'prefix': 'mgr fail',
- 'who': self.get_mgr_id(),
- })
- return
- elif daemon_type == 'mgr':
- if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
- del self.health_checks['UPGRADE_NO_STANDBY_MGR']
- self.set_health_checks(self.health_checks)
-
- # make sure 'ceph versions' agrees
- ret, out, err = self.mon_command({
- 'prefix': 'versions',
- })
- j = json.loads(out)
- for version, count in j.get(daemon_type, {}).items():
- if version != target_version:
- self.log.warning(
- 'Upgrade: %d %s daemon(s) are %s != target %s' %
- (count, daemon_type, version, target_version))
-
- # push down configs
- if image_settings.get(daemon_type) != target_name:
- self.log.info('Upgrade: Setting container_image for all %s...' %
- daemon_type)
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'name': 'container_image',
- 'value': target_name,
- 'who': daemon_type,
- })
- to_clean = []
- for section in image_settings.keys():
- if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
- to_clean.append(section)
- if to_clean:
- self.log.debug('Upgrade: Cleaning up container_image for %s...' %
- to_clean)
- for section in to_clean:
- ret, image, err = self.mon_command({
- 'prefix': 'config rm',
- 'name': 'container_image',
- 'who': section,
- })
-
- self.log.info('Upgrade: All %s daemons are up to date.' %
- daemon_type)
-
- # clean up
- self.log.info('Upgrade: Finalizing container_image settings')
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'name': 'container_image',
- 'value': target_name,
- 'who': 'global',
- })
- for daemon_type in CEPH_UPGRADE_ORDER:
- ret, image, err = self.mon_command({
- 'prefix': 'config rm',
- 'name': 'container_image',
- 'who': utils.name_to_config_section(daemon_type),
- })
-
- self.log.info('Upgrade: Complete!')
- if 'progress_id' in self.upgrade_state:
- self.remote('progress', 'complete',
- self.upgrade_state['progress_id'])
- self.upgrade_state = None
- self._save_upgrade_state()
- return
-
def _check_host(self, host):
if host not in self.inventory:
return
if r:
failures.append(r)
+ if self.cache.host_needs_osdspec_preview_refresh(host):
+ self.log.debug(f"refreshing OSDSpec previews for {host}")
+ r = self._refresh_host_osdspec_previews(host)
+ if r:
+ failures.append(r)
+
health_changed = False
if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
if health_changed:
self.set_health_checks(self.health_checks)
-
-
self._check_for_strays()
if self.paused:
self._check_daemons()
- if self.upgrade_state and not self.upgrade_state.get('paused'):
- self._do_upgrade()
+ if self.upgrade.continue_upgrade():
continue
self._serve_sleep()
continue
return name
- def get_service_name(self, daemon_type, daemon_id, host):
- # type: (str, str, str) -> (str)
- """
- Returns the generic service name
- """
- p = re.compile(r'(.*)\.%s.*' % (host))
- return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
-
- def _save_inventory(self):
- self.set_store('inventory', json.dumps(self.inventory))
-
- def _save_upgrade_state(self):
- self.set_store('upgrade_state', json.dumps(self.upgrade_state))
-
def _reconfig_ssh(self):
temp_files = [] # type: list
ssh_options = [] # type: List[str]
self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
for p in completions:
- p.finalize()
-
- def _require_hosts(self, hosts):
- """
- Raise an error if any of the given hosts are unregistered.
- """
- if isinstance(hosts, six.string_types):
- hosts = [hosts]
- keys = self.inventory.keys()
- unregistered_hosts = set(hosts) - keys
- if unregistered_hosts:
- logger.warning('keys = {}'.format(keys))
- raise RuntimeError("Host(s) {} not registered".format(
- ", ".join(map(lambda h: "'{}'".format(h),
- unregistered_hosts))))
+ p.evaluate()
@orchestrator._cli_write_command(
prefix='cephadm set-ssh-config',
self._reconfig_ssh()
return 0, '', ''
+ @orchestrator._cli_write_command(
+ 'cephadm set-priv-key',
+ desc='Set cluster SSH private key (use -i <private_key>)')
+ def _set_priv_key(self, inbuf=None):
+ if inbuf is None or len(inbuf) == 0:
+ return -errno.EINVAL, "", "empty private ssh key provided"
+ self.set_store("ssh_identity_key", inbuf)
+ self.log.info('Set ssh private key')
+ self._reconfig_ssh()
+ return 0, "", ""
+
+ @orchestrator._cli_write_command(
+ 'cephadm set-pub-key',
+ desc='Set cluster SSH public key (use -i <public_key>)')
+ def _set_pub_key(self, inbuf=None):
+ if inbuf is None or len(inbuf) == 0:
+ return -errno.EINVAL, "", "empty public ssh key provided"
+ self.set_store("ssh_identity_pub", inbuf)
+ self.log.info('Set ssh public key')
+ self._reconfig_ssh()
+ return 0, "", ""
+
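+ # NOTE: CLI usage sketch (illustration only); both commands read their payload
+ # from a file passed with -i, mirroring set-ssh-config:
+ #   ceph cephadm set-priv-key -i /path/to/ssh_key        (hypothetical path)
+ #   ceph cephadm set-pub-key -i /path/to/ssh_key.pub     (hypothetical path)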
@orchestrator._cli_write_command(
'cephadm clear-key',
desc='Clear cluster SSH key')
"""
Setup a connection for running commands on remote host.
"""
- conn_and_r = self._cons.get(host)
- if conn_and_r:
- self.log.debug('Have connection to %s' % host)
- return conn_and_r
+ conn, r = self._cons.get(host, (None, None))
+ if conn:
+ if conn.has_connection():
+ self.log.debug('Have connection to %s' % host)
+ return conn, r
+ else:
+ self._reset_con(host)
n = self.ssh_user + '@' + host
self.log.debug("Opening connection to {} with ssh options '{}'".format(
n, self._ssh_options))
executable_path))
return executable_path
- def _run_cephadm(self, host, entity, command, args,
- addr=None,
- stdin=None,
+ def _run_cephadm(self,
+ host: str,
+ entity: Optional[str],
+ command: str,
+ args: List[str],
+ addr: Optional[str] = None,
+ stdin: Optional[str] = None,
no_fsid=False,
error_ok=False,
- image=None):
- # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
+ image: Optional[str] = None,
+ env_vars: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str], int]:
"""
Run cephadm on the remote host with the given command + args
+
+ :param env_vars: environment variables to pass to cephadm, each as a "KEY=VALUE" string
"""
if not addr and host in self.inventory:
- addr = self.inventory[host].get('addr', host)
+ addr = self.inventory.get_addr(host)
self.offline_hosts_remove(host)
try:
try:
conn, connr = self._get_connection(addr)
- except IOError as e:
+ except OSError as e:
if error_ok:
self.log.exception('failed to establish ssh connection')
return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
- raise
+ raise execnet.gateway_bootstrap.HostNotFound(str(e)) from e
assert image or entity
if not image:
- daemon_type = entity.split('.', 1)[0] # type: ignore
+ daemon_type = entity.split('.', 1)[0] # type: ignore
if daemon_type in CEPH_TYPES or \
- daemon_type == 'nfs':
+ daemon_type == 'nfs' or \
+ daemon_type == 'iscsi':
# get container image
- ret, image, err = self.mon_command({
+ ret, image, err = self.check_mon_command({
'prefix': 'config get',
'who': utils.name_to_config_section(entity),
'key': 'container_image',
})
- image = image.strip() # type: ignore
+ image = image.strip() # type: ignore
+ elif daemon_type == 'prometheus':
+ image = self.container_image_prometheus
+ elif daemon_type == 'grafana':
+ image = self.container_image_grafana
+ elif daemon_type == 'alertmanager':
+ image = self.container_image_alertmanager
+ elif daemon_type == 'node-exporter':
+ image = self.container_image_node_exporter
+
self.log.debug('%s container image %s' % (entity, image))
final_args = []
+
+ if env_vars:
+ for env_var_pair in env_vars:
+ final_args.extend(['--env', env_var_pair])
+
if image:
final_args.extend(['--image', image])
final_args.append(command)
final_args += ['--fsid', self._cluster_fsid]
final_args += args
+ self.log.debug('args: %s' % (' '.join(final_args)))
if self.mode == 'root':
- self.log.debug('args: %s' % (' '.join(final_args)))
if stdin:
self.log.debug('stdin: %s' % stdin)
script = 'injected_argv = ' + json.dumps(final_args) + '\n'
# do with "host not found" (e.g., ssh key permission denied).
self.offline_hosts.add(host)
user = 'root' if self.mode == 'root' else 'cephadm'
- msg = f'Failed to connect to {host} ({addr}). ' \
- f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
- f'you may want to run: \n' \
- f'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
+ msg = f'''Failed to connect to {host} ({addr}).
+Check that the host is reachable and accepts connections using the cephadm SSH key
+
+you may want to run:
+> ceph cephadm get-ssh-config > ssh_config
+> ceph config-key get mgr/cephadm/ssh_identity_key > key
+> ssh -F ssh_config -i key {user}@{host}'''
raise OrchestratorError(msg) from e
except Exception as ex:
self.log.exception(ex)
raise
- def _get_hosts(self, label=None):
- # type: (Optional[str]) -> List[str]
- r = []
- for h, hostspec in self.inventory.items():
- if not label or label in hostspec.get('labels', []):
- r.append(h)
- return r
+ def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
+ return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
- @async_completion
- def add_host(self, spec):
+ def _add_host(self, spec):
# type: (HostSpec) -> str
"""
Add a host to be managed by the orchestrator.
raise OrchestratorError('New host %s (%s) failed check: %s' % (
spec.hostname, spec.addr, err))
- self.inventory[spec.hostname] = spec.to_json()
- self._save_inventory()
+ self.inventory.add_host(spec)
self.cache.prime_empty_host(spec.hostname)
self.offline_hosts_remove(spec.hostname)
self.event.set() # refresh stray health check
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}'".format(spec.hostname)
- @async_completion
+ @trivial_completion
+ def add_host(self, spec: HostSpec) -> str:
+ return self._add_host(spec)
+
+ @trivial_completion
def remove_host(self, host):
# type: (str) -> str
"""
:param host: host name
"""
- del self.inventory[host]
- self._save_inventory()
+ self.inventory.rm_host(host)
self.cache.rm_host(host)
self._reset_con(host)
self.event.set() # refresh stray health check
self.log.info('Removed host %s' % host)
return "Removed host '{}'".format(host)
- @async_completion
+ @trivial_completion
def update_host_addr(self, host, addr):
- if host not in self.inventory:
- raise OrchestratorError('host %s not registered' % host)
- self.inventory[host]['addr'] = addr
- self._save_inventory()
+ self.inventory.set_addr(host, addr)
self._reset_con(host)
self.event.set() # refresh stray health check
self.log.info('Set host %s addr to %s' % (host, addr))
Notes:
- skip async: manager reads from cache.
"""
- r = []
- for hostname, info in self.inventory.items():
- r.append(orchestrator.HostSpec(
- hostname,
- addr=info.get('addr', hostname),
- labels=info.get('labels', []),
- status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
- ))
- return r
+ return list(self.inventory.all_specs())
- @async_completion
+ @trivial_completion
def add_host_label(self, host, label):
- if host not in self.inventory:
- raise OrchestratorError('host %s does not exist' % host)
-
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label not in self.inventory[host]['labels']:
- self.inventory[host]['labels'].append(label)
- self._save_inventory()
+ self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
- @async_completion
+ @trivial_completion
def remove_host_label(self, host, label):
- if host not in self.inventory:
- raise OrchestratorError('host %s does not exist' % host)
-
- if 'labels' not in self.inventory[host]:
- self.inventory[host]['labels'] = list()
- if label in self.inventory[host]['labels']:
- self.inventory[host]['labels'].remove(label)
- self._save_inventory()
+ self.inventory.rm_label(host, label)
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
+ def update_osdspec_previews(self, search_host: str = ''):
+ # Set global 'pending' flag for host
+ self.cache.loading_osdspec_preview.add(search_host)
+ previews = []
+ # query OSDSpecs for host <search host> and generate/get the preview
+ # There can be multiple previews for one host due to multiple OSDSpecs.
+ previews.extend(self.osd_service.get_previews(search_host))
+ self.log.debug(f"Loading OSDSpec previews to HostCache")
+ self.cache.osdspec_previews[search_host] = previews
+ # Unset global 'pending' flag for host
+ self.cache.loading_osdspec_preview.remove(search_host)
+
+ def _refresh_host_osdspec_previews(self, host) -> bool:
+ self.update_osdspec_previews(host)
+ self.cache.save_host(host)
+ self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
+ return True
+
def _refresh_host_daemons(self, host):
try:
out, err, code = self._run_cephadm(
sd.container_image_name = d.get('container_image_name')
sd.container_image_id = d.get('container_image_id')
sd.version = d.get('version')
+ if sd.daemon_type == 'osd':
+ sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
if 'state' in d:
sd.status_desc = d['state']
sd.status = {
host, len(devices), len(networks)))
devices = inventory.Devices.from_json(devices)
self.cache.update_host_devices_networks(host, devices.devices, networks)
+ self.update_osdspec_previews(host)
self.cache.save_host(host)
return None
- def _get_spec_size(self, spec):
- if spec.placement.count:
- return spec.placement.count
- elif spec.placement.host_pattern:
- return len(spec.placement.pattern_matches_hosts(self.inventory.keys()))
- elif spec.placement.label:
- return len(self._get_hosts(spec.placement.label))
- elif spec.placement.hosts:
- return len(spec.placement.hosts)
- # hmm!
- return 0
-
@trivial_completion
def describe_service(self, service_type=None, service_name=None,
refresh=False):
if refresh:
# ugly sync path, FIXME someday perhaps?
- for host, hi in self.inventory.items():
+ for host in self.inventory.keys():
self._refresh_host_daemons(host)
# <service_map>
sm = {} # type: Dict[str, orchestrator.ServiceDescription]
+ osd_count = 0
for h, dm in self.cache.get_daemons_with_volatile_status():
for name, dd in dm.items():
if service_type and service_type != dd.daemon_type:
if service_name and service_name != n:
continue
if dd.daemon_type == 'osd':
- continue # ignore OSDs for now
- if dd.service_name() in self.spec_store.specs:
- spec = self.spec_store.specs[dd.service_name()]
+ """
+ OSDs do not know the affinity to their spec out of the box.
+ """
+ n = f"osd.{dd.osdspec_affinity}"
+ if n in self.spec_store.specs:
+ spec = self.spec_store.specs[n]
else:
spec = ServiceSpec(
unmanaged=True,
container_image_name=dd.container_image_name,
spec=spec,
)
- if dd.service_name() in self.spec_store.specs:
- sm[n].size = self._get_spec_size(spec)
- sm[n].created = self.spec_store.spec_created[dd.service_name()]
+ if n in self.spec_store.specs:
+ if dd.daemon_type == 'osd':
+ """
+ The osd count can't be determined by the Placement spec.
+ It's rather pointless to show a actual/expected representation
+ here. So we're setting running = size for now.
+ """
+ osd_count += 1
+ sm[n].size = osd_count
+ else:
+ sm[n].size = spec.placement.get_host_selection_size(self._get_hosts)
+
+ sm[n].created = self.spec_store.spec_created[n]
if service_type == 'nfs':
spec = cast(NFSServiceSpec, spec)
sm[n].rados_config_location = spec.rados_config_location()
continue
sm[n] = orchestrator.ServiceDescription(
spec=spec,
- size=self._get_spec_size(spec),
+ size=spec.placement.get_host_selection_size(self._get_hosts),
running=0,
)
if service_type == 'nfs':
if host:
self._refresh_host_daemons(host)
else:
- for hostname, hi in self.inventory.items():
+ for hostname in self.inventory.keys():
self._refresh_host_daemons(hostname)
result = []
for h, dm in self.cache.get_daemons_with_volatile_status():
result.append(dd)
return result
+ @trivial_completion
def service_action(self, action, service_name):
args = []
for host, dm in self.cache.daemons.items():
self.log.info('%s service %s' % (action.capitalize(), service_name))
return self._daemon_actions(args)
- @async_map_completion
+ @forall_hosts
def _daemon_actions(self, daemon_type, daemon_id, host, action):
return self._daemon_action(daemon_type, daemon_id, host, action)
self.cache.invalidate_host_daemons(host)
return "{} {} from host '{}'".format(action, name, host)
+ @trivial_completion
def daemon_action(self, action, daemon_type, daemon_id):
args = []
for host, dm in self.cache.daemons.items():
','.join(['%s.%s' % (a[0], a[1]) for a in args])))
return self._daemon_actions(args)
+ @trivial_completion
def remove_daemons(self, names):
- # type: (List[str]) -> orchestrator.Completion
+ # type: (List[str]) -> List[str]
args = []
for host, dm in self.cache.daemons.items():
for name in names:
@trivial_completion
def remove_service(self, service_name):
self.log.info('Remove service %s' % service_name)
+ self._trigger_preview_refresh(service_name=service_name)
found = self.spec_store.rm(service_name)
if found:
self._kick_serve_loop()
for host in host_filter.hosts:
self._refresh_host_devices(host)
else:
- for host, hi in self.inventory.items():
+ for host in self.inventory.keys():
self._refresh_host_devices(host)
result = []
raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
return '\n'.join(out + err)
+ @trivial_completion
def blink_device_light(self, ident_fault, on, locs):
- @async_map_completion
+ @forall_hosts
def blink(host, dev, path):
cmd = [
'lsmcli',
r[str(osd_id)] = o.get('uuid', '')
return r
+ def resolve_hosts_for_osdspecs(self,
+ specs: Optional[List[DriveGroupSpec]] = None,
+ service_name: Optional[str] = None
+ ) -> List[str]:
+ osdspecs = []
+ if service_name:
+ self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
+ osdspecs = self.spec_store.find(service_name=service_name)
+ self.log.debug(f"Found OSDSpecs: {osdspecs}")
+ if specs:
+ osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+ if not service_name and not specs:
+ # if neither parameter is provided, search for all available osdspecs
+ osdspecs = self.spec_store.find(service_name='osd')
+ self.log.debug(f"Found OSDSpecs: {osdspecs}")
+ if not osdspecs:
+ self.log.debug("No OSDSpecs found")
+ return []
+ return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])
+
+ def resolve_osdspecs_for_host(self, host):
+ matching_specs = []
+ self.log.debug(f"Finding OSDSpecs for host: <{host}>")
+ for spec in self.spec_store.find('osd'):
+ if host in spec.placement.filter_matching_hosts(self._get_hosts):
+ self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+ matching_specs.append(spec)
+ return matching_specs
+
+ def _trigger_preview_refresh(self,
+ specs: Optional[List[DriveGroupSpec]] = None,
+ service_name: Optional[str] = None):
+ refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+ for host in refresh_hosts:
+ self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
+ self.cache.osdspec_previews_refresh_queue.append(host)
+
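+ # NOTE: flow summary, as wired up in this change: _trigger_preview_refresh()
+ # queues matching hosts on cache.osdspec_previews_refresh_queue; the serve loop
+ # then calls _refresh_host_osdspec_previews() for hosts where
+ # cache.host_needs_osdspec_preview_refresh() is true, which regenerates
+ # cache.osdspec_previews via osd_service.get_previews() and saves the host cache.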
@trivial_completion
def apply_drivegroups(self, specs: List[DriveGroupSpec]):
+ self._trigger_preview_refresh(specs=specs)
return [self._apply(spec) for spec in specs]
- def find_destroyed_osds(self) -> Dict[str, List[str]]:
- osd_host_map: Dict[str, List[str]] = dict()
- ret, out, err = self.mon_command({
- 'prefix': 'osd tree',
- 'states': ['destroyed'],
- 'format': 'json'
- })
- if ret != 0:
- raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
- try:
- tree = json.loads(out)
- except json.decoder.JSONDecodeError:
- self.log.error(f"Could not decode json -> {out}")
- return osd_host_map
-
- nodes = tree.get('nodes', {})
- for node in nodes:
- if node.get('type') == 'host':
- osd_host_map.update(
- {node.get('name'): [str(_id) for _id in node.get('children', list())]}
- )
- return osd_host_map
-
@trivial_completion
def create_osds(self, drive_group: DriveGroupSpec):
- self.log.debug(f"Processing DriveGroup {drive_group}")
- ret = []
- drive_group.osd_id_claims = self.find_destroyed_osds()
- self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
- for host, drive_selection in self.prepare_drivegroup(drive_group):
- self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
- cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
- drive_group.osd_id_claims.get(host, []))
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
- continue
- ret_msg = self._create_osd(host, cmd,
- replace_osd_ids=drive_group.osd_id_claims.get(host, []))
- ret.append(ret_msg)
- return ", ".join(ret)
-
- def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
- # 1) use fn_filter to determine matching_hosts
- matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
- # 2) Map the inventory to the InventoryHost object
- host_ds_map = []
-
- # set osd_id_claims
-
- def _find_inv_for_host(hostname: str, inventory_dict: dict):
- # This is stupid and needs to be loaded with the host
- for _host, _inventory in inventory_dict.items():
- if _host == hostname:
- return _inventory
- raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
- # 3) iterate over matching_host and call DriveSelection
- self.log.debug(f"Checking matching hosts -> {matching_hosts}")
- for host in matching_hosts:
- inventory_for_host = _find_inv_for_host(host, self.cache.devices)
- self.log.debug(f"Found inventory for host {inventory_for_host}")
- drive_selection = DriveSelection(drive_group, inventory_for_host)
- self.log.debug(f"Found drive selection {drive_selection}")
- host_ds_map.append((host, drive_selection))
- return host_ds_map
-
- def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
- drive_selection: DriveSelection,
- osd_id_claims: Optional[List[str]] = None,
- preview: bool = False) -> Optional[str]:
- self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
- cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
- self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
- return cmd
-
- def preview_drivegroups(self, drive_group_name: Optional[str] = None,
- dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
- # find drivegroups
- if drive_group_name:
- drive_groups = cast(List[DriveGroupSpec],
- self.spec_store.find(service_name=drive_group_name))
- elif dg_specs:
- drive_groups = dg_specs
- else:
- drive_groups = []
- ret_all = []
- for drive_group in drive_groups:
- drive_group.osd_id_claims = self.find_destroyed_osds()
- self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
- # prepare driveselection
- for host, ds in self.prepare_drivegroup(drive_group):
- cmd = self.driveselection_to_ceph_volume(drive_group, ds,
- drive_group.osd_id_claims.get(host, []), preview=True)
- if not cmd:
- self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
- continue
- out, err, code = self._run_ceph_volume_command(host, cmd)
- if out:
- concat_out = json.loads(" ".join(out))
- ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
- return ret_all
-
- def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
- self._require_hosts(host)
-
- # get bootstrap key
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get',
- 'entity': 'client.bootstrap-osd',
- })
-
- # generate config
- ret, config, err = self.mon_command({
- "prefix": "config generate-minimal-conf",
- })
-
- j = json.dumps({
- 'config': config,
- 'keyring': keyring,
- })
-
- split_cmd = cmd.split(' ')
- _cmd = ['--config-json', '-', '--']
- _cmd.extend(split_cmd)
- out, err, code = self._run_cephadm(
- host, 'osd', 'ceph-volume',
- _cmd,
- stdin=j,
- error_ok=True)
- return out, err, code
-
- def _create_osd(self, host, cmd, replace_osd_ids=None):
- out, err, code = self._run_ceph_volume_command(host, cmd)
-
- if code == 1 and ', it is already prepared' in '\n'.join(err):
- # HACK: when we create against an existing LV, ceph-volume
- # returns an error and the above message. To make this
- # command idempotent, tolerate this "error" and continue.
- self.log.debug('the device was already prepared; continuing')
- code = 0
- if code:
- raise RuntimeError(
- 'cephadm exited with an error code: %d, stderr:%s' % (
- code, '\n'.join(err)))
-
- # check result
- out, err, code = self._run_cephadm(
- host, 'osd', 'ceph-volume',
- [
- '--',
- 'lvm', 'list',
- '--format', 'json',
- ])
- before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
- osds_elems = json.loads('\n'.join(out))
- fsid = self._cluster_fsid
- osd_uuid_map = self.get_osd_uuid_map()
- created = []
- for osd_id, osds in osds_elems.items():
- for osd in osds:
- if osd['tags']['ceph.cluster_fsid'] != fsid:
- self.log.debug('mismatched fsid, skipping %s' % osd)
- continue
- if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
- # if it exists but is part of the replacement operation, don't skip
- continue
- if osd_id not in osd_uuid_map:
- self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
- continue
- if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
- self.log.debug('mismatched osd uuid (cluster has %s, osd '
- 'has %s)' % (
- osd_uuid_map.get(osd_id),
- osd['tags']['ceph.osd_fsid']))
- continue
-
- created.append(osd_id)
- self._create_daemon(
- 'osd', osd_id, host,
- osd_uuid_map=osd_uuid_map)
+ return self.osd_service.create(drive_group)
- if created:
- self.cache.invalidate_host_devices(host)
- return "Created osd(s) %s on host '%s'" % (','.join(created), host)
- else:
- return "Created no osd(s) on host %s; already created?" % host
+ @trivial_completion
+ def preview_osdspecs(self,
+ osdspec_name: Optional[str] = None,
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ):
+ matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+ if not matching_hosts:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ # Is any host still loading previews
+ pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
+ if pending_hosts:
+ # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
+ return {'n/a': [{'error': True,
+ 'message': 'Preview data is being generated... '
+ 'Please try again in a bit.'}]}
+ # drop all keys that are not in search_hosts and return preview struct
+ return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
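+ # NOTE: return-shape sketch (an assumption, based on the shape produced by the
+ # removed preview_drivegroups code): on success the result maps hostname to a
+ # list of preview entries, e.g.
+ #   {'host1': [{'data': {...}, 'drivegroup': '<service_id>', 'host': 'host1'}]}
+ # while the 'n/a' entries above signal "no match" or "still loading".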
def _calc_daemon_deps(self, daemon_type, daemon_id):
need = {
# type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
# keyring
if not keyring:
- if daemon_type == 'mon':
- ename = 'mon.'
- else:
- ename = utils.name_to_config_section(daemon_type + '.' + daemon_id)
- ret, keyring, err = self.mon_command({
+ ename = utils.name_to_auth_entity(daemon_type + '.' + daemon_id)
+ ret, keyring, err = self.check_mon_command({
'prefix': 'auth get',
'entity': ename,
})
# generate config
- ret, config, err = self.mon_command({
+ ret, config, err = self.check_mon_command({
"prefix": "config generate-minimal-conf",
})
if extra_ceph_config:
'keyring': keyring,
}
- def _create_daemon(self, daemon_type, daemon_id, host,
- keyring=None,
- extra_args=None, extra_config=None,
+ def _create_daemon(self,
+ daemon_type: str,
+ daemon_id: str,
+ host: str,
+ keyring: Optional[str] = None,
+ extra_args: Optional[List[str]] = None,
+ extra_config: Optional[Dict[str, Any]] = None,
reconfig=False,
- osd_uuid_map=None):
+ osd_uuid_map: Optional[Dict[str, Any]] = None,
+ redeploy=False,
+ ) -> str:
+
if not extra_args:
extra_args = []
if not extra_config:
start_time = datetime.datetime.utcnow()
deps = [] # type: List[str]
- cephadm_config = {} # type: Dict[str, Any]
+ cephadm_config = {} # type: Dict[str, Any]
if daemon_type == 'prometheus':
- cephadm_config, deps = self._generate_prometheus_config()
+ cephadm_config, deps = self.prometheus_service.generate_config()
extra_args.extend(['--config-json', '-'])
elif daemon_type == 'grafana':
- cephadm_config, deps = self._generate_grafana_config()
+ cephadm_config, deps = self.grafana_service.generate_config()
extra_args.extend(['--config-json', '-'])
elif daemon_type == 'nfs':
cephadm_config, deps = \
- self._generate_nfs_config(daemon_type, daemon_id, host)
+ self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
extra_args.extend(['--config-json', '-'])
elif daemon_type == 'alertmanager':
- cephadm_config, deps = self._generate_alertmanager_config()
+ cephadm_config, deps = self.alertmanager_service.generate_config()
+ extra_args.extend(['--config-json', '-'])
+ elif daemon_type == 'node-exporter':
+ cephadm_config, deps = self.node_exporter_service.generate_config()
extra_args.extend(['--config-json', '-'])
else:
# Ceph.daemons (mon, mgr, mds, osd, etc)
osd_uuid_map = self.get_osd_uuid_map()
osd_uuid = osd_uuid_map.get(daemon_id)
if not osd_uuid:
- raise OrchestratorError('osd.%d not in osdmap' % daemon_id)
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
extra_args.extend(['--osd-fsid', osd_uuid])
if reconfig:
return "{} {} on host '{}'".format(
'Reconfigured' if reconfig else 'Deployed', name, host)
- @async_map_completion
- def _remove_daemons(self, name, host):
+ @forall_hosts
+ def _remove_daemons(self, name, host) -> str:
return self._remove_daemon(name, host)
- def _remove_daemon(self, name, host):
+ def _remove_daemon(self, name, host) -> str:
"""
Remove a daemon
"""
# remove mon from quorum before we destroy the daemon
self.log.info('Removing monitor %s from monmap...' % name)
- ret, out, err = self.mon_command({
+ ret, out, err = self.check_mon_command({
'prefix': 'mon rm',
'name': daemon_id,
})
- if ret:
- raise OrchestratorError('failed to remove mon %s from monmap' % (
- name))
args = ['--name', name, '--force']
self.log.info('Removing daemon %s from %s' % (name, host))
self.cache.invalidate_host_daemons(host)
return "Removed {} from host '{}'".format(name, host)
- def _apply_service(self, spec):
+ def _create_fn(self, service_type: str) -> Callable[..., str]:
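+ # Dispatch table: map each service type to the create() method of its cephadm service helper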
+ try:
+ d: Dict[str, Callable[..., str]] = {
+ 'mon': self.mon_service.create,
+ 'mgr': self.mgr_service.create,
+ 'osd': self.osd_service.create,
+ 'mds': self.mds_service.create,
+ 'rgw': self.rgw_service.create,
+ 'rbd-mirror': self.rbd_mirror_service.create,
+ 'nfs': self.nfs_service.create,
+ 'grafana': self.grafana_service.create,
+ 'alertmanager': self.alertmanager_service.create,
+ 'prometheus': self.prometheus_service.create,
+ 'node-exporter': self.node_exporter_service.create,
+ 'crash': self.crash_service.create,
+ 'iscsi': self.iscsi_service.create,
+ }
+ return d[service_type] # type: ignore
+ except KeyError as e:
+ self.log.exception(f'unknown service type {service_type}')
+ raise OrchestratorError(f'unknown service type {service_type}') from e
+
+ def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
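+ # Only some service types need an extra config step before their daemons are created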
+ return {
+ 'mds': self.mds_service.config,
+ 'rgw': self.rgw_service.config,
+ 'nfs': self.nfs_service.config,
+ 'iscsi': self.iscsi_service.config,
+ }.get(service_type)
+
+ def _apply_service(self, spec: ServiceSpec) -> bool:
"""
Schedule a service. Deploy new daemons or remove old ones, depending
on the target label and count specified in the placement.
self.log.debug('Skipping unmanaged service %s spec' % service_name)
return False
self.log.debug('Applying service %s spec' % service_name)
- create_fns = {
- 'mon': self._create_mon,
- 'mgr': self._create_mgr,
- 'osd': self.create_osds,
- 'mds': self._create_mds,
- 'rgw': self._create_rgw,
- 'rbd-mirror': self._create_rbd_mirror,
- 'nfs': self._create_nfs,
- 'grafana': self._create_grafana,
- 'alertmanager': self._create_alertmanager,
- 'prometheus': self._create_prometheus,
- 'node-exporter': self._create_node_exporter,
- 'crash': self._create_crash,
- 'iscsi': self._create_iscsi,
- }
- config_fns = {
- 'mds': self._config_mds,
- 'rgw': self._config_rgw,
- 'nfs': self._config_nfs,
- 'iscsi': self._config_iscsi,
- }
- create_func = create_fns.get(daemon_type, None)
- if not create_func:
- self.log.debug('unrecognized service type %s' % daemon_type)
+
+ create_func = self._create_fn(daemon_type)
+ config_func = self._config_fn(daemon_type)
+
+ if daemon_type == 'osd':
+ create_func(spec)
+ # TODO: returning True here would result in a busy loop
return False
- config_func = config_fns.get(daemon_type, None)
daemons = self.cache.get_daemons_by_service(service_name)
public_network = None
if daemon_type == 'mon':
- ret, out, err = self.mon_command({
+ ret, out, err = self.check_mon_command({
'prefix': 'config get',
'who': 'mon',
'key': 'public_network',
r = False
- if daemon_type == 'osd':
- return False if create_func(spec) else True # type: ignore
-
# sanity check
if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
config_func(spec)
did_config = True
daemon_id = self.get_unique_name(daemon_type, host, daemons,
- spec.service_id, name)
+ prefix=spec.service_id,
+ forcename=name)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
if daemon_type == 'mon':
create_func(daemon_id, host, network) # type: ignore
- elif daemon_type == 'nfs':
+ elif daemon_type in ['nfs', 'iscsi']:
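+ # nfs and iscsi daemons need the full service spec at creation time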
create_func(daemon_id, host, spec) # type: ignore
else:
- create_func(daemon_id, host) # type: ignore
+ create_func(daemon_id, host) # type: ignore
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
if self._apply_service(spec):
r = True
except Exception as e:
- self.log.warning('Failed to apply %s spec %s: %s' % (
+ self.log.exception('Failed to apply %s spec %s: %s' % (
spec.service_name(), spec, e))
return r
+ def _check_pool_exists(self, pool, service_name):
+ logger.info(f'Checking pool "{pool}" exists for service {service_name}')
+ if not self.rados.pool_exists(pool):
+ raise OrchestratorError(f'Cannot find pool "{pool}" for '
+ f'service {service_name}')
+
def _check_daemons(self):
# get monmap mtime so we can refresh configs when mons change
monmap = self.get('mon_map')
last_monmap = None # just in case clocks are skewed
daemons = self.cache.get_daemons()
- grafanas = [] # type: List[orchestrator.DaemonDescription]
+ daemons_post = defaultdict(list)
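+ # collect daemons that need service-specific post-create checks, keyed by daemon type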
for dd in daemons:
# orphan?
spec = self.spec_store.specs.get(dd.service_name(), None)
self._remove_daemon(dd.name(), dd.hostname)
# ignore unmanaged services
- if not spec or spec.unmanaged:
+ if spec and spec.unmanaged:
continue
- # dependencies?
- if dd.daemon_type == 'grafana':
- # put running instances at the front of the list
- grafanas.insert(0, dd)
+ # These daemon types require additional configs after creation
+ if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+ daemons_post[dd.daemon_type].append(dd)
+
deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
last_deps, last_config = self.cache.get_daemon_last_config_deps(
dd.hostname, dd.name())
self._create_daemon(dd.daemon_type, dd.daemon_id,
dd.hostname, reconfig=True)
- # make sure the dashboard [does not] references grafana
- try:
- current_url = self.get_module_option_ex('dashboard',
- 'GRAFANA_API_URL')
- if grafanas:
- host = grafanas[0].hostname
- url = 'https://%s:3000' % (self.inventory[host].get('addr',
- host))
- if current_url != url:
- self.log.info('Setting dashboard grafana config to %s' % url)
- self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
- url)
- # FIXME: is it a signed cert??
- except Exception as e:
- self.log.debug('got exception fetching dashboard grafana state: %s',
- e)
+ # do daemon post actions
+ for daemon_type, daemon_descs in daemons_post.items():
+ self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
def _add_daemon(self, daemon_type, spec,
- create_func, config_func=None):
+ create_func: Callable[..., T], config_func=None) -> List[T]:
"""
Add (and place) a daemon. Require explicit host placement. Do not
schedule, and do not apply the related scheduling limitations.
def _create_daemons(self, daemon_type, spec, daemons,
hosts, count,
- create_func, config_func=None):
+ create_func: Callable[..., T], config_func=None) -> List[T]:
if count > len(hosts):
raise OrchestratorError('too few hosts: want %d, have %s' % (
count, hosts))
args = [] # type: List[tuple]
for host, network, name in hosts:
daemon_id = self.get_unique_name(daemon_type, host, daemons,
- spec.service_id, name)
+ prefix=spec.service_id,
+ forcename=name)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
if daemon_type == 'mon':
args.append((daemon_id, host, network)) # type: ignore
- elif daemon_type == 'nfs':
- args.append((daemon_id, host, spec)) # type: ignore
- elif daemon_type == 'iscsi':
+ elif daemon_type in ['nfs', 'iscsi']:
args.append((daemon_id, host, spec)) # type: ignore
else:
args.append((daemon_id, host)) # type: ignore
)
daemons.append(sd)
- @async_map_completion
+ @forall_hosts
def create_func_map(*args):
return create_func(*args)
def apply_mon(self, spec):
return self._apply(spec)
- def _create_mon(self, name, host, network):
- """
- Create a new monitor on the given host.
- """
- # get mon. key
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get',
- 'entity': 'mon.',
- })
-
- extra_config = '[mon.%s]\n' % name
- if network:
- # infer whether this is a CIDR network, addrvec, or plain IP
- if '/' in network:
- extra_config += 'public network = %s\n' % network
- elif network.startswith('[v') and network.endswith(']'):
- extra_config += 'public addrv = %s\n' % network
- elif ':' not in network:
- extra_config += 'public addr = %s\n' % network
- else:
- raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
- else:
- # try to get the public_network from the config
- ret, network, err = self.mon_command({
- 'prefix': 'config get',
- 'who': 'mon',
- 'key': 'public_network',
- })
- network = network.strip() # type: ignore
- if ret:
- raise RuntimeError('Unable to fetch cluster_network config option')
- if not network:
- raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
- if '/' not in network:
- raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
- extra_config += 'public network = %s\n' % network
-
- return self._create_daemon('mon', name, host,
- keyring=keyring,
- extra_config={'config': extra_config})
-
+ @trivial_completion
def add_mon(self, spec):
- # type: (ServiceSpec) -> orchestrator.Completion
- return self._add_daemon('mon', spec, self._create_mon)
+ # type: (ServiceSpec) -> List[str]
+ return self._add_daemon('mon', spec, self.mon_service.create)
- def _create_mgr(self, mgr_id, host):
- """
- Create a new manager instance on a host.
- """
- # get mgr. key
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': 'mgr.%s' % mgr_id,
- 'caps': ['mon', 'profile mgr',
- 'osd', 'allow *',
- 'mds', 'allow *'],
- })
+ @trivial_completion
+ def add_mgr(self, spec):
+ # type: (ServiceSpec) -> List[str]
+ return self._add_daemon('mgr', spec, self.mgr_service.create)
- return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
+ def _apply(self, spec: GenericSpec) -> str:
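+ # host specs are added directly; everything else is scheduled as a service spec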
+ if spec.service_type == 'host':
+ return self._add_host(cast(HostSpec, spec))
- def add_mgr(self, spec):
- # type: (ServiceSpec) -> orchestrator.Completion
- return self._add_daemon('mgr', spec, self._create_mgr)
+ return self._apply_service_spec(cast(ServiceSpec, spec))
- def _apply(self, spec: ServiceSpec) -> str:
+ def _apply_service_spec(self, spec: ServiceSpec) -> str:
if spec.placement.is_empty():
# fill in default placement
defaults = {
return "Scheduled %s update..." % spec.service_name()
@trivial_completion
- def apply(self, specs: List[ServiceSpec]):
- return [self._apply(spec) for spec in specs]
+ def apply(self, specs: List[GenericSpec]):
+ results = []
+ for spec in specs:
+ results.append(self._apply(spec))
+ return results
@trivial_completion
def apply_mgr(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_mds(self, spec: ServiceSpec):
- return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
+ return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
@trivial_completion
def apply_mds(self, spec: ServiceSpec):
return self._apply(spec)
- def _config_mds(self, spec):
- # ensure mds_join_fs is set for these daemons
- assert spec.service_id
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'who': 'mds.' + spec.service_id,
- 'name': 'mds_join_fs',
- 'value': spec.service_id,
- })
-
- def _create_mds(self, mds_id, host):
- # get mgr. key
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': 'mds.' + mds_id,
- 'caps': ['mon', 'profile mds',
- 'osd', 'allow rwx',
- 'mds', 'allow'],
- })
- return self._create_daemon('mds', mds_id, host, keyring=keyring)
-
+ @trivial_completion
def add_rgw(self, spec):
- return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
-
- def _config_rgw(self, spec):
- # ensure rgw_realm and rgw_zone is set for these daemons
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
- 'name': 'rgw_zone',
- 'value': spec.rgw_zone,
- })
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
- 'name': 'rgw_realm',
- 'value': spec.rgw_realm,
- })
- ret, out, err = self.mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
- 'name': 'rgw_frontends',
- 'value': spec.rgw_frontends_config_value(),
- })
-
- if spec.rgw_frontend_ssl_certificate:
- if isinstance(spec.rgw_frontend_ssl_certificate, list):
- cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
- else:
- cert_data = spec.rgw_frontend_ssl_certificate
- ret, out, err = self.mon_command({
- 'prefix': 'config-key set',
- 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
- 'val': cert_data,
- })
-
- if spec.rgw_frontend_ssl_key:
- if isinstance(spec.rgw_frontend_ssl_key, list):
- key_data = '\n'.join(spec.rgw_frontend_ssl_key)
- else:
- key_data = spec.rgw_frontend_ssl_key
- ret, out, err = self.mon_command({
- 'prefix': 'config-key set',
- 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
- 'val': key_data,
- })
-
- logger.info('Saving service %s spec with placement %s' % (
- spec.service_name(), spec.placement.pretty_str()))
- self.spec_store.save(spec)
-
- def _create_rgw(self, rgw_id, host):
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
- 'caps': ['mon', 'allow *',
- 'mgr', 'allow rw',
- 'osd', 'allow rwx'],
- })
- return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
+ return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
@trivial_completion
def apply_rgw(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_iscsi(self, spec):
- # type: (ServiceSpec) -> orchestrator.Completion
- return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
-
- def _config_iscsi(self, spec):
- logger.info('Saving service %s spec with placement %s' % (
- spec.service_name(), spec.placement.pretty_str()))
- self.spec_store.save(spec)
-
- def _create_iscsi(self, igw_id, host, spec):
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': utils.name_to_config_section('iscsi') + '.' + igw_id,
- 'caps': ['mon', 'allow rw',
- 'osd', f'allow rwx pool={spec.pool}'],
- })
-
- api_secure = 'false' if spec.api_secure is None else spec.api_secure
- igw_conf = f"""
-# generated by cephadm
-[config]
-cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
-pool = {spec.pool}
-trusted_ip_list = {spec.trusted_ip_list or ''}
-minimum_gateways = 1
-fqdn_enabled = {spec.fqdn_enabled or ''}
-api_port = {spec.api_port or ''}
-api_user = {spec.api_user or ''}
-api_password = {spec.api_password or ''}
-api_secure = {api_secure}
-"""
- extra_config = {'iscsi-gateway.cfg': igw_conf}
- return self._create_daemon('iscsi', igw_id, host, keyring=keyring,
- extra_config=extra_config)
+ # type: (ServiceSpec) -> List[str]
+ return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
@trivial_completion
def apply_iscsi(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_rbd_mirror(self, spec):
- return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
-
- def _create_rbd_mirror(self, daemon_id, host):
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': 'client.rbd-mirror.' + daemon_id,
- 'caps': ['mon', 'profile rbd-mirror',
- 'osd', 'profile rbd'],
- })
- return self._create_daemon('rbd-mirror', daemon_id, host,
- keyring=keyring)
+ return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
@trivial_completion
def apply_rbd_mirror(self, spec):
return self._apply(spec)
- def _generate_nfs_config(self, daemon_type, daemon_id, host):
- # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
- deps = [] # type: List[str]
-
- # find the matching NFSServiceSpec
- # TODO: find the spec and pass via _create_daemon instead ??
- service_name = self.get_service_name(daemon_type, daemon_id, host)
- specs = self.spec_store.find(service_name)
- if not specs:
- raise OrchestratorError('Cannot find service spec %s' % (service_name))
- elif len(specs) > 1:
- raise OrchestratorError('Found multiple service specs for %s' % (service_name))
- else:
- # cast to keep mypy happy
- spec = cast(NFSServiceSpec, specs[0])
-
- nfs = NFSGanesha(self, daemon_id, spec)
-
- # create the keyring
- entity = nfs.get_keyring_entity()
- keyring = nfs.get_or_create_keyring(entity=entity)
-
- # update the caps after get-or-create, the keyring might already exist!
- nfs.update_keyring_caps(entity=entity)
-
- # create the rados config object
- nfs.create_rados_config_obj()
-
- # generate the cephadm config
- cephadm_config = nfs.get_cephadm_config()
- cephadm_config.update(
- self._get_config_and_keyring(
- daemon_type, daemon_id,
- keyring=keyring))
-
- return cephadm_config, deps
-
+ @trivial_completion
def add_nfs(self, spec):
- return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
-
- def _config_nfs(self, spec):
- logger.info('Saving service %s spec with placement %s' % (
- spec.service_name(), spec.placement.pretty_str()))
- self.spec_store.save(spec)
-
- def _create_nfs(self, daemon_id, host, spec):
- return self._create_daemon('nfs', daemon_id, host)
+ return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
@trivial_completion
def apply_nfs(self, spec):
return self._apply(spec)
- def _generate_prometheus_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
- deps = [] # type: List[str]
-
- # scrape mgrs
- mgr_scrape_list = []
- mgr_map = self.get('mgr_map')
- port = None
- t = mgr_map.get('services', {}).get('prometheus', None)
- if t:
- t = t.split('/')[2]
- mgr_scrape_list.append(t)
- port = '9283'
- if ':' in t:
- port = t.split(':')[1]
- # scan all mgrs to generate deps and to get standbys too.
- # assume that they are all on the same port as the active mgr.
- for dd in self.cache.get_daemons_by_service('mgr'):
- # we consider the mgr a dep even if the prometheus module is
- # disabled in order to be consistent with _calc_daemon_deps().
- deps.append(dd.name())
- if not port:
- continue
- if dd.daemon_id == self.get_mgr_id():
- continue
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
- mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
-
- # scrape node exporters
- node_configs = ''
- for dd in self.cache.get_daemons_by_service('node-exporter'):
- deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
- if not node_configs:
- node_configs = """
- - job_name: 'node'
- static_configs:
-"""
- node_configs += """ - targets: {}
- labels:
- instance: '{}'
-""".format([addr.split(':')[0] + ':9100'],
- dd.hostname)
-
- # scrape alert managers
- alertmgr_configs = ""
- alertmgr_targets = []
- for dd in self.cache.get_daemons_by_service('alertmanager'):
- deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
- alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
- if alertmgr_targets:
- alertmgr_configs = """alerting:
- alertmanagers:
- - scheme: http
- path_prefix: /alertmanager
- static_configs:
- - targets: [{}]
-""".format(", ".join(alertmgr_targets))
-
- # generate the prometheus configuration
- r = {
- 'files': {
- 'prometheus.yml': """# generated by cephadm
-global:
- scrape_interval: 5s
- evaluation_interval: 10s
-rule_files:
- - /etc/prometheus/alerting/*
-{alertmgr_configs}
-scrape_configs:
- - job_name: 'ceph'
- static_configs:
- - targets: {mgr_scrape_list}
- labels:
- instance: 'ceph_cluster'
-{node_configs}
-""".format(
- mgr_scrape_list=str(mgr_scrape_list),
- node_configs=str(node_configs),
- alertmgr_configs=str(alertmgr_configs)
- ),
- },
- }
-
- # include alerts, if present in the container
- if os.path.exists(self.prometheus_alerts_path):
- with open(self.prometheus_alerts_path, "r") as f:
- alerts = f.read()
- r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
-
- return r, sorted(deps)
-
- def _generate_grafana_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
- deps = [] # type: List[str]
- def generate_grafana_ds_config(hosts: List[str]) -> str:
- config = '''# generated by cephadm
-deleteDatasources:
-{delete_data_sources}
-
-datasources:
-{data_sources}
-'''
- delete_ds_template = '''
- - name: '{name}'
- orgId: 1\n'''.lstrip('\n')
- ds_template = '''
- - name: '{name}'
- type: 'prometheus'
- access: 'proxy'
- orgId: 1
- url: 'http://{host}:9095'
- basicAuth: false
- isDefault: {is_default}
- editable: false\n'''.lstrip('\n')
-
- delete_data_sources = ''
- data_sources = ''
- for i, host in enumerate(hosts):
- name = "Dashboard %d" % (i + 1)
- data_sources += ds_template.format(
- name=name,
- host=host,
- is_default=str(i == 0).lower()
- )
- delete_data_sources += delete_ds_template.format(
- name=name
- )
- return config.format(
- delete_data_sources=delete_data_sources,
- data_sources=data_sources,
- )
-
- prom_services = [] # type: List[str]
- for dd in self.cache.get_daemons_by_service('prometheus'):
- prom_services.append(dd.hostname)
- deps.append(dd.name())
-
- cert = self.get_store('grafana_crt')
- pkey = self.get_store('grafana_key')
- if cert and pkey:
- try:
- verify_tls(cert, pkey)
- except ServerConfigException as e:
- logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
- cert, pkey = None, None
- if not (cert and pkey):
- cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
- self.set_store('grafana_crt', cert)
- self.set_store('grafana_key', pkey)
- self.mon_command({
- 'prefix': 'dashboard set-grafana-api-ssl-verify',
- 'value': 'false',
- })
-
-
-
- config_file = {
- 'files': {
- "grafana.ini": """# generated by cephadm
-[users]
- default_theme = light
-[auth.anonymous]
- enabled = true
- org_name = 'Main Org.'
- org_role = 'Viewer'
-[server]
- domain = 'bootstrap.storage.lab'
- protocol = https
- cert_file = /etc/grafana/certs/cert_file
- cert_key = /etc/grafana/certs/cert_key
- http_port = 3000
-[security]
- admin_user = admin
- admin_password = admin
- allow_embedding = true
-""",
- 'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
- 'certs/cert_file': '# generated by cephadm\n%s' % cert,
- 'certs/cert_key': '# generated by cephadm\n%s' % pkey,
- }
- }
- return config_file, sorted(deps)
-
def _get_dashboard_url(self):
# type: () -> str
return self.get('mgr_map').get('services', {}).get('dashboard', '')
- def _generate_alertmanager_config(self):
- # type: () -> Tuple[Dict[str, Any], List[str]]
- deps = [] # type: List[str]
-
- # dashboard(s)
- dashboard_urls = []
- mgr_map = self.get('mgr_map')
- port = None
- proto = None # http: or https:
- url = mgr_map.get('services', {}).get('dashboard', None)
- if url:
- dashboard_urls.append(url)
- proto = url.split('/')[0]
- port = url.split('/')[2].split(':')[1]
- # scan all mgrs to generate deps and to get standbys too.
- # assume that they are all on the same port as the active mgr.
- for dd in self.cache.get_daemons_by_service('mgr'):
- # we consider mgr a dep even if the dashboard is disabled
- # in order to be consistent with _calc_daemon_deps().
- deps.append(dd.name())
- if not port:
- continue
- if dd.daemon_id == self.get_mgr_id():
- continue
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
- dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
- port))
-
- yml = """# generated by cephadm
-# See https://prometheus.io/docs/alerting/configuration/ for documentation.
-
-global:
- resolve_timeout: 5m
-
-route:
- group_by: ['alertname']
- group_wait: 10s
- group_interval: 10s
- repeat_interval: 1h
- receiver: 'ceph-dashboard'
-receivers:
-- name: 'ceph-dashboard'
- webhook_configs:
-{urls}
-""".format(
- urls='\n'.join(
- [" - url: '{}api/prometheus_receiver'".format(u)
- for u in dashboard_urls]
- ))
- peers = []
- port = '9094'
- for dd in self.cache.get_daemons_by_service('alertmanager'):
- deps.append(dd.name())
- hi = self.inventory.get(dd.hostname, {})
- addr = hi.get('addr', dd.hostname)
- peers.append(addr.split(':')[0] + ':' + port)
- return {
- "files": {
- "alertmanager.yml": yml
- },
- "peers": peers
- }, sorted(deps)
-
+ @trivial_completion
def add_prometheus(self, spec):
- return self._add_daemon('prometheus', spec, self._create_prometheus)
-
- def _create_prometheus(self, daemon_id, host):
- return self._create_daemon('prometheus', daemon_id, host)
+ return self._add_daemon('prometheus', spec, self.prometheus_service.create)
@trivial_completion
def apply_prometheus(self, spec):
return self._apply(spec)
+ @trivial_completion
def add_node_exporter(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
+ # type: (ServiceSpec) -> List[str]
return self._add_daemon('node-exporter', spec,
- self._create_node_exporter)
+ self.node_exporter_service.create)
@trivial_completion
def apply_node_exporter(self, spec):
return self._apply(spec)
- def _create_node_exporter(self, daemon_id, host):
- return self._create_daemon('node-exporter', daemon_id, host)
-
+ @trivial_completion
def add_crash(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
+ # type: (ServiceSpec) -> List[str]
return self._add_daemon('crash', spec,
- self._create_crash)
+ self.crash_service.create)
@trivial_completion
def apply_crash(self, spec):
return self._apply(spec)
- def _create_crash(self, daemon_id, host):
- ret, keyring, err = self.mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': 'client.crash.' + host,
- 'caps': ['mon', 'profile crash',
- 'mgr', 'profile crash'],
- })
- return self._create_daemon('crash', daemon_id, host, keyring=keyring)
-
+ @trivial_completion
def add_grafana(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
- return self._add_daemon('grafana', spec, self._create_grafana)
+ # type: (ServiceSpec) -> List[str]
+ return self._add_daemon('grafana', spec, self.grafana_service.create)
@trivial_completion
def apply_grafana(self, spec: ServiceSpec):
return self._apply(spec)
- def _create_grafana(self, daemon_id, host):
- # type: (str, str) -> str
- return self._create_daemon('grafana', daemon_id, host)
-
+ @trivial_completion
def add_alertmanager(self, spec):
- # type: (ServiceSpec) -> AsyncCompletion
- return self._add_daemon('alertmanager', spec, self._create_alertmanager)
+ # type: (ServiceSpec) -> List[str]
+ return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
@trivial_completion
def apply_alertmanager(self, spec: ServiceSpec):
return self._apply(spec)
- def _create_alertmanager(self, daemon_id, host):
- return self._create_daemon('alertmanager', daemon_id, host)
-
-
def _get_container_image_id(self, image_name):
# pick a random host...
host = None
- for host_name, hi in self.inventory.items():
+ for host_name in self.inventory.keys():
host = host_name
break
if not host:
@trivial_completion
def upgrade_status(self):
- r = orchestrator.UpgradeStatusSpec()
- if self.upgrade_state:
- r.target_image = self.upgrade_state.get('target_name')
- r.in_progress = True
- if self.upgrade_state.get('error'):
- r.message = 'Error: ' + self.upgrade_state.get('error')
- elif self.upgrade_state.get('paused'):
- r.message = 'Upgrade paused'
- return r
+ return self.upgrade.upgrade_status()
@trivial_completion
def upgrade_start(self, image, version):
- if self.mode != 'root':
- raise OrchestratorError('upgrade is not supported in %s mode' % (
- self.mode))
- if version:
- try:
- (major, minor, patch) = version.split('.')
- assert int(minor) >= 0
- assert int(patch) >= 0
- except:
- raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
- if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
- raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
- target_name = self.container_image_base + ':v' + version
- elif image:
- target_name = image
- else:
- raise OrchestratorError('must specify either image or version')
- if self.upgrade_state:
- if self.upgrade_state.get('target_name') != target_name:
- raise OrchestratorError(
- 'Upgrade to %s (not %s) already in progress' %
- (self.upgrade_state.get('target_name'), target_name))
- if self.upgrade_state.get('paused'):
- del self.upgrade_state['paused']
- self._save_upgrade_state()
- return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
- return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
- self.upgrade_state = {
- 'target_name': target_name,
- 'progress_id': str(uuid.uuid4()),
- }
- self._update_upgrade_progress(0.0)
- self._save_upgrade_state()
- self._clear_upgrade_health_checks()
- self.event.set()
- return 'Initiating upgrade to %s' % (target_name)
+ return self.upgrade.upgrade_start(image, version)
@trivial_completion
def upgrade_pause(self):
- if not self.upgrade_state:
- raise OrchestratorError('No upgrade in progress')
- if self.upgrade_state.get('paused'):
- return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
- self.upgrade_state['paused'] = True
- self._save_upgrade_state()
- return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+ return self.upgrade.upgrade_pause()
@trivial_completion
def upgrade_resume(self):
- if not self.upgrade_state:
- raise OrchestratorError('No upgrade in progress')
- if not self.upgrade_state.get('paused'):
- return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
- del self.upgrade_state['paused']
- self._save_upgrade_state()
- self.event.set()
- return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+ return self.upgrade.upgrade_resume()
@trivial_completion
def upgrade_stop(self):
- if not self.upgrade_state:
- return 'No upgrade in progress'
- target_name = self.upgrade_state.get('target_name')
- if 'progress_id' in self.upgrade_state:
- self.remote('progress', 'complete',
- self.upgrade_state['progress_id'])
- self.upgrade_state = None
- self._save_upgrade_state()
- self._clear_upgrade_health_checks()
- self.event.set()
- return 'Stopped upgrade to %s' % target_name
+ return self.upgrade.upgrade_stop()
@trivial_completion
def remove_osds(self, osd_ids: List[str],
The CLI call to retrieve an osd removal report
"""
return self.rm_util.report
-
-
-class BaseScheduler(object):
- """
- Base Scheduler Interface
-
- * requires a placement_spec
-
- `place(host_pool)` needs to return a List[HostPlacementSpec, ..]
- """
-
- def __init__(self, placement_spec):
- # type: (PlacementSpec) -> None
- self.placement_spec = placement_spec
-
- def place(self, host_pool, count=None):
- # type: (List, Optional[int]) -> List[HostPlacementSpec]
- raise NotImplementedError
-
-
-class SimpleScheduler(BaseScheduler):
- """
- The most simple way to pick/schedule a set of hosts.
- 1) Shuffle the provided host_pool
- 2) Select from list up to :count
- """
- def __init__(self, placement_spec):
- super(SimpleScheduler, self).__init__(placement_spec)
-
- def place(self, host_pool, count=None):
- # type: (List, Optional[int]) -> List[HostPlacementSpec]
- if not host_pool:
- return []
- host_pool = [x for x in host_pool]
- # shuffle for pseudo random selection
- random.shuffle(host_pool)
- return host_pool[:count]
-
-
-class HostAssignment(object):
- """
- A class to detect if hosts are being passed imperative or declarative
- If the spec is populated via the `hosts/hosts` field it will not load
- any hosts into the list.
- If the spec isn't populated, i.e. when only num or label is present (declarative)
- it will use the provided `get_host_func` to load it from the inventory.
-
- Schedulers can be assigned to pick hosts from the pool.
- """
-
- def __init__(self,
- spec, # type: ServiceSpec
- get_hosts_func, # type: Callable[[Optional[str]],List[str]]
- get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
-
- filter_new_host=None, # type: Optional[Callable[[str],bool]]
- scheduler=None, # type: Optional[BaseScheduler]
- ):
- assert spec and get_hosts_func and get_daemons_func
- self.spec = spec # type: ServiceSpec
- self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
- self.get_hosts_func = get_hosts_func
- self.get_daemons_func = get_daemons_func
- self.filter_new_host = filter_new_host
- self.service_name = spec.service_name()
-
-
- def validate(self):
- self.spec.validate()
-
- if self.spec.placement.hosts:
- explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
- unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
- if unknown_hosts:
- raise OrchestratorValidationError(
- f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
-
- if self.spec.placement.host_pattern:
- pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
- if not pattern_hostnames:
- raise OrchestratorValidationError(
- f'Cannot place {self.spec.one_line_str()}: No matching hosts')
-
- if self.spec.placement.label:
- label_hostnames = self.get_hosts_func(self.spec.placement.label)
- if not label_hostnames:
- raise OrchestratorValidationError(
- f'Cannot place {self.spec.one_line_str()}: No matching '
- f'hosts for label {self.spec.placement.label}')
-
- def place(self):
- # type: () -> List[HostPlacementSpec]
- """
- Load hosts into the spec.placement.hosts container.
- """
-
- self.validate()
-
- # count == 0
- if self.spec.placement.count == 0:
- return []
-
- # respect any explicit host list
- if self.spec.placement.hosts and not self.spec.placement.count:
- logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
- return self.spec.placement.hosts
-
- # respect host_pattern
- if self.spec.placement.host_pattern:
- candidates = [
- HostPlacementSpec(x, '', '')
- for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
- ]
- logger.debug('All hosts: {}'.format(candidates))
- return candidates
-
- count = 0
- if self.spec.placement.hosts and \
- self.spec.placement.count and \
- len(self.spec.placement.hosts) >= self.spec.placement.count:
- hosts = self.spec.placement.hosts
- logger.debug('place %d over provided host list: %s' % (
- count, hosts))
- count = self.spec.placement.count
- elif self.spec.placement.label:
- hosts = [
- HostPlacementSpec(x, '', '')
- for x in self.get_hosts_func(self.spec.placement.label)
- ]
- if not self.spec.placement.count:
- logger.debug('Labeled hosts: {}'.format(hosts))
- return hosts
- count = self.spec.placement.count
- logger.debug('place %d over label %s: %s' % (
- count, self.spec.placement.label, hosts))
- else:
- hosts = [
- HostPlacementSpec(x, '', '')
- for x in self.get_hosts_func(None)
- ]
- if self.spec.placement.count:
- count = self.spec.placement.count
- else:
- # this should be a totally empty spec given all of the
- # alternative paths above.
- assert self.spec.placement.count is None
- assert not self.spec.placement.hosts
- assert not self.spec.placement.label
- count = 1
- logger.debug('place %d over all hosts: %s' % (count, hosts))
-
- # we need to select a subset of the candidates
-
- # if a partial host list is provided, always start with that
- if len(self.spec.placement.hosts) < count:
- chosen = self.spec.placement.hosts
- else:
- chosen = []
-
- # prefer hosts that already have services
- daemons = self.get_daemons_func(self.service_name)
- hosts_with_daemons = {d.hostname for d in daemons}
- # calc existing daemons (that aren't already in chosen)
- chosen_hosts = [hs.hostname for hs in chosen]
- existing = [hs for hs in hosts
- if hs.hostname in hosts_with_daemons and \
- hs.hostname not in chosen_hosts]
- if len(chosen + existing) >= count:
- chosen = chosen + self.scheduler.place(
- existing,
- count - len(chosen))
- logger.debug('Hosts with existing daemons: {}'.format(chosen))
- return chosen
-
- need = count - len(existing + chosen)
- others = [hs for hs in hosts
- if hs.hostname not in hosts_with_daemons]
- if self.filter_new_host:
- old = others
- others = [h for h in others if self.filter_new_host(h.hostname)]
- logger.debug('filtered %s down to %s' % (old, hosts))
- chosen = chosen + self.scheduler.place(others, need)
- logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
- existing, chosen))
- return existing + chosen