import errno
import logging
from collections import defaultdict
+from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast
+ Any, Set, TYPE_CHECKING, cast, Iterator, Union
import datetime
import six
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
- NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
+ NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host
+from cephadm.services.cephadmservice import CephadmDaemonSpec
from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
- CLICommandMeta
+ CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec
from . import remotes
from . import utils
+from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
RbdMirrorService, CrashService, CephadmService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDRemoval, OSDService
+from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
-from .schedule import HostAssignment
-from .inventory import Inventory, SpecStore, HostCache
+from .schedule import HostAssignment, HostPlacementSpec
+from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
+from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
try:
import remoto
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
-def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
- @wraps(f)
- def forall_hosts_wrapper(*args) -> List[T]:
-
- # Some weired logic to make calling functions with multiple arguments work.
- if len(args) == 1:
- vals = args[0]
- self = None
- elif len(args) == 2:
- self, vals = args
- else:
- assert 'either f([...]) or self.f([...])'
-
- def do_work(arg):
- if not isinstance(arg, tuple):
- arg = (arg, )
- try:
- if self:
- return f(self, *arg)
- return f(*arg)
- except Exception as e:
- logger.exception(f'executing {f.__name__}({args}) failed.')
- raise
-
- assert CephadmOrchestrator.instance is not None
- return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
-
-
- return forall_hosts_wrapper
-
-
-class CephadmCompletion(orchestrator.Completion):
+class CephadmCompletion(orchestrator.Completion[T]):
def evaluate(self):
self.finalize(None)
-def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
+def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
"""
Decorator to make CephadmOrchestrator methods return
a completion object that executes itself.
instance = None
NATIVE_OPTIONS = [] # type: List[Any]
- MODULE_OPTIONS = [
+ MODULE_OPTIONS: List[dict] = [
{
'name': 'ssh_config_file',
'type': 'str',
},
{
'name': 'container_image_grafana',
- 'default': 'ceph/ceph-grafana:latest',
+ 'default': 'ceph/ceph-grafana:6.6.2',
'desc': 'Grafana container image',
},
{
'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
'desc': 'location of alerts to include in prometheus deployments',
},
+ {
+ 'name': 'migration_current',
+ 'type': 'int',
+ 'default': None,
+ 'desc': 'internal - do not modify',
+ # used to track spec and other data migrations.
+ },
+ {
+ 'name': 'config_dashboard',
+ 'type': 'bool',
+ 'default': True,
+ 'desc': 'manage configs like API endpoints in Dashboard.'
+ },
+ {
+ 'name': 'manage_etc_ceph_ceph_conf',
+ 'type': 'bool',
+ 'default': False,
+ 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
+ },
+ {
+ 'name': 'registry_url',
+ 'type': 'str',
+ 'default': None,
+ 'desc': 'Custom repository url'
+ },
+ {
+ 'name': 'registry_username',
+ 'type': 'str',
+ 'default': None,
+ 'desc': 'Custom repository username'
+ },
+ {
+ 'name': 'registry_password',
+ 'type': 'str',
+ 'default': None,
+ 'desc': 'Custom repository password'
+ },
]
def __init__(self, *args, **kwargs):
super(CephadmOrchestrator, self).__init__(*args, **kwargs)
self._cluster_fsid = self.get('mon_map')['fsid']
+ self.last_monmap: Optional[datetime.datetime] = None
# for serve()
self.run = True
self.warn_on_failed_host_check = True
self.allow_ptrace = False
self.prometheus_alerts_path = ''
+ self.migration_current = None
+ self.config_dashboard = True
+ self.manage_etc_ceph_ceph_conf = True
+ self.registry_url: Optional[str] = None
+ self.registry_username: Optional[str] = None
+ self.registry_password: Optional[str] = None
self._cons = {} # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
+
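+ # replay the mon_map notification once so last_monmap is populated at startup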
+ self.notify('mon_map', None)
self.config_notify()
path = self.get_ceph_option('cephadm_path')
self.cache = HostCache(self)
self.cache.load()
+
self.rm_util = RemoveUtil(self)
+ self.to_remove_osds = OSDQueue()
+ self.rm_util.load_from_store()
self.spec_store = SpecStore(self)
self.spec_store.load()
if h not in self.inventory:
self.cache.rm_host(h)
+
# in-memory only.
+ self.events = EventStore(self)
self.offline_hosts: Set[str] = set()
+ self.migration = Migrations(self)
+
# services:
self.osd_service = OSDService(self)
self.nfs_service = NFSService(self)
self.template = TemplateMgr()
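+ # daemon types that still need their post-deploy actions run
+ # (populated in _create_daemon, consumed in _check_daemons)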
+ self.requires_post_actions = set()
+
def shutdown(self):
self.log.debug('shutdown')
self._worker_pool.close()
self.log.debug('_kick_serve_loop')
self.event.set()
- def _check_safe_to_destroy_mon(self, mon_id):
- # type: (str) -> None
- ret, out, err = self.check_mon_command({
- 'prefix': 'quorum_status',
- })
- try:
- j = json.loads(out)
- except Exception as e:
- raise OrchestratorError('failed to parse quorum status')
+ # function responsible for logging a single host into a custom registry
+ def _registry_login(self, host, url, username, password):
+ self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
+ # pass the login info over stdin rather than through the normal list of args
+ args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
+ " \"password\": \"" + password + "\"}")
+ out, err, code = self._run_cephadm(
+ host, 'mon', 'registry-login',
+ ['--registry-json', '-'], stdin=args_str, error_ok=True)
+ if code:
+ return f"Host {host} failed to login to {url} as {username} with given password"
+ return
- mons = [m['name'] for m in j['monmap']['mons']]
- if mon_id not in mons:
- self.log.info('Safe to remove mon.%s: not in monmap (%s)' % (
- mon_id, mons))
- return
- new_mons = [m for m in mons if m != mon_id]
- new_quorum = [m for m in j['quorum_names'] if m != mon_id]
- if len(new_quorum) > len(new_mons) / 2:
- self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
- return
- raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
def _check_host(self, host):
if host not in self.inventory:
self.log.debug(' checking %s' % host)
try:
out, err, code = self._run_cephadm(
- host, 'client', 'check-host', [],
+ host, cephadmNoImage, 'check-host', [],
error_ok=True, no_fsid=True)
self.cache.update_last_host_check(host)
self.cache.save_host(host)
ret = self.event.wait(sleep_interval)
self.event.clear()
- def serve(self):
- # type: () -> None
+ def serve(self) -> None:
+ """
+ The main loop of cephadm.
+
+ A command handler will typically change the declarative state
+ of cephadm. This loop will then attempt to apply this new state.
+ """
self.log.debug("serve starting")
while self.run:
- # refresh daemons
- self.log.debug('refreshing hosts')
- bad_hosts = []
- failures = []
- for host in self.cache.get_hosts():
- if self.cache.host_needs_check(host):
- r = self._check_host(host)
- if r is not None:
- bad_hosts.append(r)
- if self.cache.host_needs_daemon_refresh(host):
- self.log.debug('refreshing %s daemons' % host)
- r = self._refresh_host_daemons(host)
- if r:
- failures.append(r)
- if self.cache.host_needs_device_refresh(host):
- self.log.debug('refreshing %s devices' % host)
- r = self._refresh_host_devices(host)
- if r:
- failures.append(r)
-
- if self.cache.host_needs_osdspec_preview_refresh(host):
- self.log.debug(f"refreshing OSDSpec previews for {host}")
- r = self._refresh_host_osdspec_previews(host)
- if r:
- failures.append(r)
-
- health_changed = False
- if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
- del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
- health_changed = True
- if bad_hosts:
- self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
- 'severity': 'warning',
- 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
- 'count': len(bad_hosts),
- 'detail': bad_hosts,
- }
- health_changed = True
- if failures:
- self.health_checks['CEPHADM_REFRESH_FAILED'] = {
- 'severity': 'warning',
- 'summary': 'failed to probe daemons or devices',
- 'count': len(failures),
- 'detail': failures,
- }
- health_changed = True
- elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
- del self.health_checks['CEPHADM_REFRESH_FAILED']
- health_changed = True
- if health_changed:
- self.set_health_checks(self.health_checks)
+ try:
- self._check_for_strays()
+ # refresh daemons
+ self.log.debug('refreshing hosts and daemons')
+ self._refresh_hosts_and_daemons()
- if self.paused:
- self.health_checks['CEPHADM_PAUSED'] = {
- 'severity': 'warning',
- 'summary': 'cephadm background work is paused',
- 'count': 1,
- 'detail': ["'ceph orch resume' to resume"],
- }
- self.set_health_checks(self.health_checks)
- else:
- if 'CEPHADM_PAUSED' in self.health_checks:
- del self.health_checks['CEPHADM_PAUSED']
- self.set_health_checks(self.health_checks)
+ self._check_for_strays()
- self.rm_util._remove_osds_bg()
+ self._update_paused_health()
- if self._apply_all_services():
- continue # did something, refresh
+ if not self.paused:
+ self.rm_util.process_removal_queue()
- self._check_daemons()
+ self.migration.migrate()
+ if self.migration.is_migration_ongoing():
+ continue
- if self.upgrade.continue_upgrade():
- continue
+ if self._apply_all_services():
+ continue # did something, refresh
+
+ self._check_daemons()
+
+ if self.upgrade.continue_upgrade():
+ continue
+
+ except OrchestratorError as e:
+ if e.event_subject:
+ self.events.from_orch_error(e)
self._serve_sleep()
self.log.debug("serve exit")
+ def _update_paused_health(self):
+ if self.paused:
+ self.health_checks['CEPHADM_PAUSED'] = {
+ 'severity': 'warning',
+ 'summary': 'cephadm background work is paused',
+ 'count': 1,
+ 'detail': ["'ceph orch resume' to resume"],
+ }
+ self.set_health_checks(self.health_checks)
+ else:
+ if 'CEPHADM_PAUSED' in self.health_checks:
+ del self.health_checks['CEPHADM_PAUSED']
+ self.set_health_checks(self.health_checks)
+
def config_notify(self):
"""
This method is called whenever one of our config options is changed.
+
+ TODO: this method should be moved into mgr_module.py
"""
for opt in self.MODULE_OPTIONS:
setattr(self,
self.event.set()
def notify(self, notify_type, notify_id):
- pass
+ if notify_type == "mon_map":
+ # get monmap mtime so we can refresh configs when mons change
+ monmap = self.get('mon_map')
+ self.last_monmap = datetime.datetime.strptime(
+ monmap['modified'], CEPH_DATEFMT)
+ if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
+ self.last_monmap = None # just in case clocks are skewed
+ if notify_type == "pg_summary":
+ self._trigger_osd_removal()
+
+ def _trigger_osd_removal(self):
+ data = self.get("osd_stats")
+ for osd in data.get('osd_stats', []):
+ if osd.get('num_pgs') == 0:
+ # if _ANY_ osd that is currently in the queue appears to be empty,
+ # start the removal process
+ if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
+ self.log.debug(f"Found empty osd. Starting removal process")
+ # if the osd that is now empty is also part of the removal queue
+ # start the process
+ self.rm_util.process_removal_queue()
def pause(self):
if not self.paused:
]
if forcename:
if len([d for d in existing if d.daemon_id == forcename]):
- raise orchestrator.OrchestratorValidationError('name %s already in use', forcename)
+ raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{forcename} already in use')
return forcename
if '.' in host:
for _ in range(6))
if len([d for d in existing if d.daemon_id == name]):
if not suffix:
- raise orchestrator.OrchestratorValidationError('name %s already in use', name)
+ raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{name} already in use')
self.log.debug('name %s exists, trying again', name)
continue
return name
if ssh_config_fname:
self.validate_ssh_config_fname(ssh_config_fname)
ssh_options += ['-F', ssh_config_fname]
+ self.ssh_config = ssh_config
# identity
ssh_key = self.get_store("ssh_identity_key")
self._ssh_options = None
if self.mode == 'root':
- self.ssh_user = 'root'
+ self.ssh_user = self.get_store('ssh_user', default='root')
elif self.mode == 'cephadm-package':
self.ssh_user = 'cephadm'
"""
The cephadm orchestrator is always available.
"""
- return self.can_run()
+ ok, err = self.can_run()
+ if not ok:
+ return ok, err
+ if not self.ssh_key or not self.ssh_pub:
+ return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
+ return True, ''
def process(self, completions):
"""
"""
if inbuf is None or len(inbuf) == 0:
return -errno.EINVAL, "", "empty ssh config provided"
+ if inbuf == self.ssh_config:
+ return 0, "value unchanged", ""
self.set_store("ssh_config", inbuf)
self.log.info('Set ssh_config')
+ self._reconfig_ssh()
return 0, "", ""
@orchestrator._cli_write_command(
self.set_store("ssh_config", None)
self.ssh_config_tmp = None
self.log.info('Cleared ssh_config')
+ self._reconfig_ssh()
return 0, "", ""
@orchestrator._cli_read_command(
def _set_priv_key(self, inbuf=None):
if inbuf is None or len(inbuf) == 0:
return -errno.EINVAL, "", "empty private ssh key provided"
+ if inbuf == self.ssh_key:
+ return 0, "value unchanged", ""
self.set_store("ssh_identity_key", inbuf)
self.log.info('Set ssh private key')
self._reconfig_ssh()
def _set_pub_key(self, inbuf=None):
if inbuf is None or len(inbuf) == 0:
return -errno.EINVAL, "", "empty public ssh key provided"
+ if inbuf == self.ssh_pub:
+ return 0, "value unchanged", ""
self.set_store("ssh_identity_pub", inbuf)
self.log.info('Set ssh public key')
self._reconfig_ssh()
def _get_user(self):
return 0, self.ssh_user, ''
+ @orchestrator._cli_read_command(
+ 'cephadm set-user',
+ 'name=user,type=CephString',
+ 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
+ def set_ssh_user(self, user):
+ current_user = self.ssh_user
+ if user == current_user:
+ return 0, "value unchanged", ""
+
+ self.set_store('ssh_user', user)
+ self._reconfig_ssh()
+
+ host = self.cache.get_hosts()[0]
+ r = self._check_host(host)
+ if r is not None:
+ # connection failed; reset the user
+ self.set_store('ssh_user', current_user)
+ self._reconfig_ssh()
+ return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
+
+ msg = 'ssh user set to %s' % user
+ if user != 'root':
+ msg += ', sudo will be used'
+ self.log.info(msg)
+ return 0, msg, ''
+
+ @orchestrator._cli_read_command(
+ 'cephadm registry-login',
+ "name=url,type=CephString,req=false "
+ "name=username,type=CephString,req=false "
+ "name=password,type=CephString,req=false",
+ 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
+ def registry_login(self, url=None, username=None, password=None, inbuf=None):
+ # if password not given in command line, get it through file input
+ if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
+ return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
+ "or -i <login credentials json file>")
+ elif not (url and username and password):
+ login_info = json.loads(inbuf)
+ if "url" in login_info and "username" in login_info and "password" in login_info:
+ url = login_info["url"]
+ username = login_info["username"]
+ password = login_info["password"]
+ else:
+ return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
+ "Please setup json file as\n"
+ "{\n"
+ " \"url\": \"REGISTRY_URL\",\n"
+ " \"username\": \"REGISTRY_USERNAME\",\n"
+ " \"password\": \"REGISTRY_PASSWORD\"\n"
+ "}\n")
+ # verify login info works by attempting login on random host
+ host = None
+ for host_name in self.inventory.keys():
+ host = host_name
+ break
+ if not host:
+ raise OrchestratorError('no hosts defined')
+ r = self._registry_login(host, url, username, password)
+ if r is not None:
+ return 1, '', r
+ # if logins succeeded, store info
+ self.log.debug("Host logins successful. Storing login info.")
+ self.set_module_option('registry_url', url)
+ self.set_module_option('registry_username', username)
+ self.set_module_option('registry_password', password)
+ # distribute new login info to all hosts
+ self.cache.distribute_new_registry_login_info()
+ return 0, "registry login scheduled", ''
+
@orchestrator._cli_read_command(
'cephadm check-host',
'name=host,type=CephString '
'name=addr,type=CephString,req=false',
'Check whether we can access and manage a remote host')
def check_host(self, host, addr=None):
- out, err, code = self._run_cephadm(host, 'client', 'check-host',
- ['--expect-hostname', host],
- addr=addr,
- error_ok=True, no_fsid=True)
- if code:
- return 1, '', ('check-host failed:\n' + '\n'.join(err))
+ try:
+ out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
+ ['--expect-hostname', host],
+ addr=addr,
+ error_ok=True, no_fsid=True)
+ if code:
+ return 1, '', ('check-host failed:\n' + '\n'.join(err))
+ except OrchestratorError as e:
+ self.log.exception(f"check-host failed for '{host}'")
+ return 1, '', ('check-host failed:\n' +
+ f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
# if we have an outstanding health alert for this host, give the
# serve thread a kick
if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
'name=addr,type=CephString,req=false',
'Prepare a remote host for use with cephadm')
def _prepare_host(self, host, addr=None):
- out, err, code = self._run_cephadm(host, 'client', 'prepare-host',
+ out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
['--expect-hostname', host],
addr=addr,
error_ok=True, no_fsid=True)
self.event.set()
return 0, '%s (%s) ok' % (host, addr), err
- def _get_connection(self, host):
+ def _get_connection(self, host: str):
"""
Setup a connection for running commands on remote host.
"""
conn = remoto.Connection(
n,
logger=child_logger,
- ssh_options=self._ssh_options)
+ ssh_options=self._ssh_options,
+ sudo=True if self.ssh_user != 'root' else False)
r = conn.import_module(remotes)
self._cons[host] = conn, r
executable_path))
return executable_path
- def _run_cephadm(self,
- host: str,
- entity: Optional[str],
- command: str,
- args: List[str],
- addr: Optional[str] = None,
- stdin: Optional[str] = None,
- no_fsid=False,
- error_ok=False,
- image: Optional[str] = None,
- env_vars: Optional[List[str]] = None,
- ) -> Tuple[List[str], List[str], int]:
- """
- Run cephadm on the remote host with the given command + args
-
- :env_vars: in format -> [KEY=VALUE, ..]
- """
+ @contextmanager
+ def _remote_connection(self,
+ host: str,
+ addr: Optional[str] = None,
+ ) -> Iterator[Tuple["BaseConnection", Any]]:
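+ """
+ Yield a (conn, connr) pair for the given host; connection failures are
+ translated into OrchestratorError with a hint on how to debug ssh access.
+ """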
if not addr and host in self.inventory:
addr = self.inventory.get_addr(host)
try:
try:
+ if not addr:
+ raise OrchestratorError("host address is empty")
conn, connr = self._get_connection(addr)
except OSError as e:
- if error_ok:
- self.log.exception('failed to establish ssh connection')
- return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
- raise execnet.gateway_bootstrap.HostNotFound(str(e)) from e
+ self._reset_con(host)
+ msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
+ raise execnet.gateway_bootstrap.HostNotFound(msg)
+ yield (conn, connr)
+
+ except execnet.gateway_bootstrap.HostNotFound as e:
+ # this is a misleading exception as it seems to be thrown for
+ # any sort of connection failure, even those having nothing to
+ # do with "host not found" (e.g., ssh key permission denied).
+ self.offline_hosts.add(host)
+ self._reset_con(host)
+
+ user = self.ssh_user if self.mode == 'root' else 'cephadm'
+ msg = f'''Failed to connect to {host} ({addr}).
+Check that the host is reachable and accepts connections using the cephadm SSH key
+
+you may want to run:
+> ceph cephadm get-ssh-config > ssh_config
+> ceph config-key get mgr/cephadm/ssh_identity_key > key
+> ssh -F ssh_config -i key {user}@{host}'''
+ raise OrchestratorError(msg) from e
+ except Exception as ex:
+ self.log.exception(ex)
+ raise
+
+ def _get_container_image(self, daemon_name: str) -> str:
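+ """Resolve the container image for a daemon: ceph, nfs and iscsi daemons
+ use the 'container_image' config option, monitoring daemons use the
+ corresponding module options."""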
+ daemon_type = daemon_name.split('.', 1)[0] # type: ignore
+ if daemon_type in CEPH_TYPES or \
+ daemon_type == 'nfs' or \
+ daemon_type == 'iscsi':
+ # get container image
+ ret, image, err = self.check_mon_command({
+ 'prefix': 'config get',
+ 'who': utils.name_to_config_section(daemon_name),
+ 'key': 'container_image',
+ })
+ image = image.strip() # type: ignore
+ elif daemon_type == 'prometheus':
+ image = self.container_image_prometheus
+ elif daemon_type == 'grafana':
+ image = self.container_image_grafana
+ elif daemon_type == 'alertmanager':
+ image = self.container_image_alertmanager
+ elif daemon_type == 'node-exporter':
+ image = self.container_image_node_exporter
+ else:
+ assert False, daemon_type
+
+ self.log.debug('%s container image %s' % (daemon_name, image))
+
+ return image
+
+ def _run_cephadm(self,
+ host: str,
+ entity: Union[CephadmNoImage, str],
+ command: str,
+ args: List[str],
+ addr: Optional[str] = "",
+ stdin: Optional[str] = "",
+ no_fsid: Optional[bool] = False,
+ error_ok: Optional[bool] = False,
+ image: Optional[str] = "",
+ env_vars: Optional[List[str]] = None,
+ ) -> Tuple[List[str], List[str], int]:
+ """
+ Run cephadm on the remote host with the given command + args
+
+ :env_vars: in format -> [KEY=VALUE, ..]
+ """
+ with self._remote_connection(host, addr) as tpl:
+ conn, connr = tpl
assert image or entity
- if not image:
- daemon_type = entity.split('.', 1)[0] # type: ignore
- if daemon_type in CEPH_TYPES or \
- daemon_type == 'nfs' or \
- daemon_type == 'iscsi':
- # get container image
- ret, image, err = self.check_mon_command({
- 'prefix': 'config get',
- 'who': utils.name_to_config_section(entity),
- 'key': 'container_image',
- })
- image = image.strip() # type: ignore
- elif daemon_type == 'prometheus':
- image = self.container_image_prometheus
- elif daemon_type == 'grafana':
- image = self.container_image_grafana
- elif daemon_type == 'alertmanager':
- image = self.container_image_alertmanager
- elif daemon_type == 'node-exporter':
- image = self.container_image_node_exporter
-
- self.log.debug('%s container image %s' % (entity, image))
+ if not image and entity is not cephadmNoImage:
+ image = self._get_container_image(entity)
final_args = []
host, remotes.PYTHONS, remotes.PATH))
try:
out, err, code = remoto.process.check(
- conn,
- [python, '-u'],
- stdin=script.encode('utf-8'))
+ conn,
+ [python, '-u'],
+ stdin=script.encode('utf-8'))
except RuntimeError as e:
self._reset_con(host)
if error_ok:
if err:
self.log.debug('err: %s' % '\n'.join(err))
if code and not error_ok:
- raise RuntimeError(
+ raise OrchestratorError(
'cephadm exited with an error code: %d, stderr:%s' % (
code, '\n'.join(err)))
return out, err, code
- except execnet.gateway_bootstrap.HostNotFound as e:
- # this is a misleading exception as it seems to be thrown for
- # any sort of connection failure, even those having nothing to
- # do with "host not found" (e.g., ssh key permission denied).
- self.offline_hosts.add(host)
- user = 'root' if self.mode == 'root' else 'cephadm'
- msg = f'''Failed to connect to {host} ({addr}).
-Check that the host is reachable and accepts connections using the cephadm SSH key
-
-you may want to run:
-> ceph cephadm get-ssh-config > ssh_config
-> ceph config-key get mgr/cephadm/ssh_identity_key > key
-> ssh -F ssh_config -i key {user}@{host}'''
- raise OrchestratorError(msg) from e
- except Exception as ex:
- self.log.exception(ex)
- raise
def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
:param host: host name
"""
assert_valid_host(spec.hostname)
- out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
+ out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
['--expect-hostname', spec.hostname],
addr=spec.addr,
error_ok=True, no_fsid=True)
return "Removed host '{}'".format(host)
@trivial_completion
- def update_host_addr(self, host, addr):
+ def update_host_addr(self, host, addr) -> str:
self.inventory.set_addr(host, addr)
self._reset_con(host)
self.event.set() # refresh stray health check
return list(self.inventory.all_specs())
@trivial_completion
- def add_host_label(self, host, label):
+ def add_host_label(self, host, label) -> str:
self.inventory.add_label(host, label)
self.log.info('Added label %s to host %s' % (label, host))
return 'Added label %s to host %s' % (label, host)
@trivial_completion
- def remove_host_label(self, host, label):
+ def remove_host_label(self, host, label) -> str:
self.inventory.rm_label(host, label)
self.log.info('Removed label %s to host %s' % (label, host))
return 'Removed label %s from host %s' % (label, host)
+ @trivial_completion
+ def host_ok_to_stop(self, hostname: str):
+ if hostname not in self.cache.get_hosts():
+ raise OrchestratorError(f'Cannot find host "{hostname}"')
+
+ daemons = self.cache.get_daemons()
+ daemon_map = defaultdict(lambda: [])
+ for dd in daemons:
+ if dd.hostname == hostname:
+ daemon_map[dd.daemon_type].append(dd.daemon_id)
+
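+ # the host is ok to stop only if every daemon type present on it
+ # reports ok-to-stop for all of its daemons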
+ for daemon_type, daemon_ids in daemon_map.items():
+ r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+ if r.retval:
+ self.log.error(f'It is NOT safe to stop host {hostname}')
+ raise orchestrator.OrchestratorError(
+ r.stderr,
+ errno=r.retval)
+
+ msg = f'It is presumed safe to stop host {hostname}'
+ self.log.info(msg)
+ return msg
+
def update_osdspec_previews(self, search_host: str = ''):
# Set global 'pending' flag for host
self.cache.loading_osdspec_preview.add(search_host)
self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
return True
- def _refresh_host_daemons(self, host):
+ def _refresh_hosts_and_daemons(self) -> None:
+ bad_hosts = []
+ failures = []
+
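+ # refresh() runs for every host in parallel via the forall_hosts worker-pool decorator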
+ @forall_hosts
+ def refresh(host):
+ if self.cache.host_needs_check(host):
+ r = self._check_host(host)
+ if r is not None:
+ bad_hosts.append(r)
+ if self.cache.host_needs_daemon_refresh(host):
+ self.log.debug('refreshing %s daemons' % host)
+ r = self._refresh_host_daemons(host)
+ if r:
+ failures.append(r)
+
+ if self.cache.host_needs_registry_login(host) and self.registry_url:
+ self.log.debug(f"Logging `{host}` into custom registry")
+ r = self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
+ if r:
+ bad_hosts.append(r)
+
+ if self.cache.host_needs_device_refresh(host):
+ self.log.debug('refreshing %s devices' % host)
+ r = self._refresh_host_devices(host)
+ if r:
+ failures.append(r)
+
+ if self.cache.host_needs_osdspec_preview_refresh(host):
+ self.log.debug(f"refreshing OSDSpec previews for {host}")
+ r = self._refresh_host_osdspec_previews(host)
+ if r:
+ failures.append(r)
+
+ if self.cache.host_needs_new_etc_ceph_ceph_conf(host):
+ self.log.debug(f"deploying new /etc/ceph/ceph.conf on `{host}`")
+ r = self._deploy_etc_ceph_ceph_conf(host)
+ if r:
+ bad_hosts.append(r)
+
+ refresh(self.cache.get_hosts())
+
+ health_changed = False
+ if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
+ del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
+ health_changed = True
+ if bad_hosts:
+ self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
+ 'severity': 'warning',
+ 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
+ 'count': len(bad_hosts),
+ 'detail': bad_hosts,
+ }
+ health_changed = True
+ if failures:
+ self.health_checks['CEPHADM_REFRESH_FAILED'] = {
+ 'severity': 'warning',
+ 'summary': 'failed to probe daemons or devices',
+ 'count': len(failures),
+ 'detail': failures,
+ }
+ health_changed = True
+ elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
+ del self.health_checks['CEPHADM_REFRESH_FAILED']
+ health_changed = True
+ if health_changed:
+ self.set_health_checks(self.health_checks)
+
+ def _refresh_host_daemons(self, host) -> Optional[str]:
try:
out, err, code = self._run_cephadm(
host, 'mon', 'ls', [], no_fsid=True)
self.cache.save_host(host)
return None
- def _refresh_host_devices(self, host):
+ def _refresh_host_devices(self, host) -> Optional[str]:
try:
out, err, code = self._run_cephadm(
host, 'osd',
self.cache.save_host(host)
return None
+ def _deploy_etc_ceph_ceph_conf(self, host: str) -> Optional[str]:
+ ret, config, err = self.check_mon_command({
+ "prefix": "config generate-minimal-conf",
+ })
+
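+ # write the minimal conf to /etc/ceph/ceph.conf on the host over the SSH
+ # connection (dd reads the file contents from stdin)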
+ try:
+ with self._remote_connection(host) as tpl:
+ conn, connr = tpl
+ out, err, code = remoto.process.check(
+ conn,
+ ['mkdir', '-p', '/etc/ceph'])
+ if code:
+ return f'failed to create /etc/ceph on {host}: {err}'
+ out, err, code = remoto.process.check(
+ conn,
+ ['dd', 'of=/etc/ceph/ceph.conf'],
+ stdin=config.encode('utf-8')
+ )
+ if code:
+ return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
+ self.cache.update_last_etc_ceph_ceph_conf(host)
+ self.cache.save_host(host)
+ except OrchestratorError as e:
+ return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
+ return None
+
+ def _invalidate_daemons_and_kick_serve(self, filter_host=None):
+ if filter_host:
+ self.cache.invalidate_host_daemons(filter_host)
+ else:
+ for h in self.cache.get_hosts():
+ # Also discover daemons deployed manually
+ self.cache.invalidate_host_daemons(h)
+
+ self._kick_serve_loop()
+
@trivial_completion
- def describe_service(self, service_type=None, service_name=None,
- refresh=False):
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.ServiceDescription]:
if refresh:
- # ugly sync path, FIXME someday perhaps?
- for host in self.inventory.keys():
- self._refresh_host_daemons(host)
+ self._invalidate_daemons_and_kick_serve()
+ self.log.info('Kicked serve() loop to refresh all services')
+
# <service_map>
- sm = {} # type: Dict[str, orchestrator.ServiceDescription]
+ sm: Dict[str, orchestrator.ServiceDescription] = {}
osd_count = 0
for h, dm in self.cache.get_daemons_with_volatile_status():
for name, dd in dm.items():
OSDs do not know the affinity to their spec out of the box.
"""
n = f"osd.{dd.osdspec_affinity}"
+ if not dd.osdspec_affinity:
+ # If there is no osdspec_affinity, the spec should suffice for displaying
+ continue
if n in self.spec_store.specs:
spec = self.spec_store.specs[n]
else:
container_image_id=dd.container_image_id,
container_image_name=dd.container_image_name,
spec=spec,
+ events=self.events.get_for_service(spec.service_name()),
)
if n in self.spec_store.specs:
if dd.daemon_type == 'osd':
"""
The osd count can't be determined by the Placement spec.
- It's rather pointless to show a actual/expected representation
+ An actual/expected representation cannot be determined
here. So we're setting running = size for now.
"""
osd_count += 1
sm[n].size = osd_count
else:
- sm[n].size = spec.placement.get_host_selection_size(self._get_hosts)
+ sm[n].size = spec.placement.get_host_selection_size(
+ self.inventory.all_specs())
sm[n].created = self.spec_store.spec_created[n]
if service_type == 'nfs':
continue
sm[n] = orchestrator.ServiceDescription(
spec=spec,
- size=spec.placement.get_host_selection_size(self._get_hosts),
+ size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
running=0,
+ events=self.events.get_for_service(spec.service_name()),
)
if service_type == 'nfs':
spec = cast(NFSServiceSpec, spec)
return list(sm.values())
@trivial_completion
- def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
- host=None, refresh=False):
+ def list_daemons(self,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.DaemonDescription]:
if refresh:
- # ugly sync path, FIXME someday perhaps?
- if host:
- self._refresh_host_daemons(host)
- else:
- for hostname in self.inventory.keys():
- self._refresh_host_daemons(hostname)
+ self._invalidate_daemons_and_kick_serve(host)
+ self.log.info('Kicked serve() loop to refresh all daemons')
+
result = []
for h, dm in self.cache.get_daemons_with_volatile_status():
if host and h != host:
return result
@trivial_completion
- def service_action(self, action, service_name):
+ def service_action(self, action, service_name) -> List[str]:
args = []
for host, dm in self.cache.daemons.items():
for name, d in dm.items():
return self._daemon_actions(args)
@forall_hosts
- def _daemon_actions(self, daemon_type, daemon_id, host, action):
- return self._daemon_action(daemon_type, daemon_id, host, action)
+ def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
+ with set_exception_subject('daemon', DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id
+ ).name()):
+ return self._daemon_action(daemon_type, daemon_id, host, action)
+
+ def _daemon_action(self, daemon_type, daemon_id, host, action, image=None):
+ daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+ host=host,
+ daemon_id=daemon_id,
+ daemon_type=daemon_type,
+ )
+
+ if image is not None:
+ if action != 'redeploy':
+ raise OrchestratorError(
+ f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
+ if daemon_type not in CEPH_TYPES:
+ raise OrchestratorError(
+ f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
+ f'types are: {", ".join(CEPH_TYPES)}')
+
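+ # point this daemon's 'container_image' config option at the new image
+ # so the redeploy below picks it up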
+ self.check_mon_command({
+ 'prefix': 'config set',
+ 'name': 'container_image',
+ 'value': image,
+ 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
+ })
- def _daemon_action(self, daemon_type, daemon_id, host, action):
if action == 'redeploy':
# stop, recreate the container+unit, then restart
- return self._create_daemon(daemon_type, daemon_id, host)
+ return self._create_daemon(daemon_spec)
elif action == 'reconfig':
- return self._create_daemon(daemon_type, daemon_id, host,
- reconfig=True)
+ return self._create_daemon(daemon_spec, reconfig=True)
actions = {
'start': ['reset-failed', 'start'],
'stop': ['stop'],
'restart': ['reset-failed', 'restart'],
}
- name = '%s.%s' % (daemon_type, daemon_id)
+ name = daemon_spec.name()
for a in actions[action]:
- out, err, code = self._run_cephadm(
- host, name, 'unit',
- ['--name', name, a],
- error_ok=True)
- self.cache.invalidate_host_daemons(host)
- return "{} {} from host '{}'".format(action, name, host)
+ try:
+ out, err, code = self._run_cephadm(
+ host, name, 'unit',
+ ['--name', name, a])
+ except Exception:
+ self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
+ self.events.for_daemon(name, 'INFO', msg)
+ return msg
@trivial_completion
- def daemon_action(self, action, daemon_type, daemon_id):
- args = []
- for host, dm in self.cache.daemons.items():
- for name, d in dm.items():
- if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
- args.append((d.daemon_type, d.daemon_id,
- d.hostname, action))
- if not args:
- raise orchestrator.OrchestratorError(
- 'Unable to find %s.%s daemon(s)' % (
- daemon_type, daemon_id))
- self.log.info('%s daemons %s' % (
- action.capitalize(),
- ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
- return self._daemon_actions(args)
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
+ d = self.cache.get_daemon(daemon_name)
+
+ self.log.info(f'{action} daemon {daemon_name}')
+ return self._daemon_action(d.daemon_type, d.daemon_id,
+ d.hostname, action, image=image)
@trivial_completion
def remove_daemons(self, names):
return self._remove_daemons(args)
@trivial_completion
- def remove_service(self, service_name):
+ def remove_service(self, service_name) -> str:
self.log.info('Remove service %s' % service_name)
self._trigger_preview_refresh(service_name=service_name)
found = self.spec_store.rm(service_name)
if found:
self._kick_serve_loop()
- return ['Removed service %s' % service_name]
+ return 'Removed service %s' % service_name
else:
# must be idempotent: still a success.
- return [f'Failed to remove service. <{service_name}> was not found.']
+ return f'Failed to remove service. <{service_name}> was not found.'
@trivial_completion
- def get_inventory(self, host_filter=None, refresh=False):
+ def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
"""
Return the storage inventory of hosts matching the given filter.
- add filtering by label
"""
if refresh:
- # ugly sync path, FIXME someday perhaps?
- if host_filter:
- for host in host_filter.hosts:
- self._refresh_host_devices(host)
+ if host_filter and host_filter.hosts:
+ for h in host_filter.hosts:
+ self.cache.invalidate_host_devices(h)
else:
- for host in self.inventory.keys():
- self._refresh_host_devices(host)
+ for h in self.cache.get_hosts():
+ self.cache.invalidate_host_devices(h)
+
+ self.event.set()
+ self.log.info('Kicked serve() loop to refresh devices')
result = []
for host, dls in self.cache.devices.items():
- if host_filter and host not in host_filter.hosts:
+ if host_filter and host_filter.hosts and host not in host_filter.hosts:
continue
result.append(orchestrator.InventoryHost(host,
inventory.Devices(dls)))
return result
@trivial_completion
- def zap_device(self, host, path):
+ def zap_device(self, host, path) -> str:
self.log.info('Zap device %s:%s' % (host, path))
out, err, code = self._run_cephadm(
host, 'osd', 'ceph-volume',
return '\n'.join(out + err)
@trivial_completion
- def blink_device_light(self, ident_fault, on, locs):
+ def blink_device_light(self, ident_fault, on, locs) -> List[str]:
@forall_hosts
def blink(host, dev, path):
cmd = [
host, 'osd', 'shell', ['--'] + cmd,
error_ok=True)
if code:
- raise RuntimeError(
+ raise OrchestratorError(
'Unable to affect %s light for %s:%s. Command: %s' % (
ident_fault, host, dev, ' '.join(cmd)))
self.log.info('Set %s light for %s:%s %s' % (
r[str(osd_id)] = o.get('uuid', '')
return r
- def resolve_hosts_for_osdspecs(self,
- specs: Optional[List[DriveGroupSpec]] = None,
- service_name: Optional[str] = None
- ) -> List[str]:
- osdspecs = []
- if service_name:
- self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
- osdspecs = self.spec_store.find(service_name=service_name)
- self.log.debug(f"Found OSDSpecs: {osdspecs}")
- if specs:
- osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
- if not service_name and not specs:
- # if neither parameters are fulfilled, search for all available osdspecs
- osdspecs = self.spec_store.find(service_name='osd')
- self.log.debug(f"Found OSDSpecs: {osdspecs}")
- if not osdspecs:
- self.log.debug("No OSDSpecs found")
- return []
- return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])
-
- def resolve_osdspecs_for_host(self, host):
- matching_specs = []
- self.log.debug(f"Finding OSDSpecs for host: <{host}>")
- for spec in self.spec_store.find('osd'):
- if host in spec.placement.filter_matching_hosts(self._get_hosts):
- self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
- matching_specs.append(spec)
- return matching_specs
-
def _trigger_preview_refresh(self,
specs: Optional[List[DriveGroupSpec]] = None,
- service_name: Optional[str] = None):
- refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+ service_name: Optional[str] = None,
+ ) -> None:
+ # Only trigger a refresh when a spec has changed
+ trigger_specs = []
+ if specs:
+ for spec in specs:
+ preview_spec = self.spec_store.spec_preview.get(spec.service_name())
+ # if the to-be-previewed spec != the actual spec, we need to trigger
+ # a refresh; if the spec has been removed (== None) we need to
+ # refresh as well.
+ if not preview_spec or spec != preview_spec:
+ trigger_specs.append(spec)
+ if service_name:
+ trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
+ if not any(trigger_specs):
+ return None
+
+ refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
for host in refresh_hosts:
self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
self.cache.osdspec_previews_refresh_queue.append(host)
@trivial_completion
- def apply_drivegroups(self, specs: List[DriveGroupSpec]):
- self._trigger_preview_refresh(specs=specs)
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
+ """
+ Deprecated. Please use `apply()` instead.
+
+ Keeping this around to stay compatible with mgr/dashboard.
+ """
return [self._apply(spec) for spec in specs]
@trivial_completion
- def create_osds(self, drive_group: DriveGroupSpec):
- return self.osd_service.create(drive_group)
+ def create_osds(self, drive_group: DriveGroupSpec) -> str:
+ return self.osd_service.create_from_spec(drive_group)
- @trivial_completion
- def preview_osdspecs(self,
- osdspec_name: Optional[str] = None,
- osdspecs: Optional[List[DriveGroupSpec]] = None
- ):
- matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+ def _preview_osdspecs(self,
+ osdspecs: Optional[List[DriveGroupSpec]] = None
+ ):
+ if not osdspecs:
+ return {'n/a': [{'error': True,
+ 'message': 'No OSDSpec or matching hosts found.'}]}
+ matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
if not matching_hosts:
return {'n/a': [{'error': True,
'message': 'No OSDSpec or matching hosts found.'}]}
# Report 'pending' when any of the matching hosts is still loading previews (flag is True)
return {'n/a': [{'error': True,
'message': 'Preview data is being generated.. '
- 'Please try again in a bit.'}]}
- # drop all keys that are not in search_hosts and return preview struct
- return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
+ 'Please re-run this command in a bit.'}]}
+ # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs
+ previews_for_specs = {}
+ for host, raw_reports in self.cache.osdspec_previews.items():
+ if host not in matching_hosts:
+ continue
+ osd_reports = []
+ for osd_report in raw_reports:
+ if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
+ osd_reports.append(osd_report)
+ previews_for_specs.update({host: osd_reports})
+ return previews_for_specs
def _calc_daemon_deps(self, daemon_type, daemon_id):
need = {
deps.append(dd.name())
return sorted(deps)
- def _get_config_and_keyring(self, daemon_type, daemon_id,
+ def _get_config_and_keyring(self, daemon_type, daemon_id, host,
keyring=None,
extra_ceph_config=None):
- # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
+ # type: (str, str, str, Optional[str], Optional[str]) -> Dict[str, Any]
# keyring
if not keyring:
- ename = utils.name_to_auth_entity(daemon_type + '.' + daemon_id)
+ ename = utils.name_to_auth_entity(daemon_type, daemon_id, host=host)
ret, keyring, err = self.check_mon_command({
'prefix': 'auth get',
'entity': ename,
}
def _create_daemon(self,
- daemon_type: str,
- daemon_id: str,
- host: str,
- keyring: Optional[str] = None,
- extra_args: Optional[List[str]] = None,
- extra_config: Optional[Dict[str, Any]] = None,
+ daemon_spec: CephadmDaemonSpec,
reconfig=False,
osd_uuid_map: Optional[Dict[str, Any]] = None,
redeploy=False,
) -> str:
- if not extra_args:
- extra_args = []
- if not extra_config:
- extra_config = {}
- name = '%s.%s' % (daemon_type, daemon_id)
-
- start_time = datetime.datetime.utcnow()
- deps = [] # type: List[str]
- cephadm_config = {} # type: Dict[str, Any]
- if daemon_type == 'prometheus':
- cephadm_config, deps = self.prometheus_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'grafana':
- cephadm_config, deps = self.grafana_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'nfs':
- cephadm_config, deps = \
- self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'alertmanager':
- cephadm_config, deps = self.alertmanager_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- elif daemon_type == 'node-exporter':
- cephadm_config, deps = self.node_exporter_service.generate_config()
- extra_args.extend(['--config-json', '-'])
- else:
- # Ceph.daemons (mon, mgr, mds, osd, etc)
- cephadm_config = self._get_config_and_keyring(
- daemon_type, daemon_id,
- keyring=keyring,
- extra_ceph_config=extra_config.pop('config', ''))
- if extra_config:
- cephadm_config.update({'files': extra_config})
- extra_args.extend(['--config-json', '-'])
+
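+ # tag any OrchestratorError raised below with this daemon's service
+ # so it can be recorded as a service event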
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_spec.daemon_type,
+ daemon_id=daemon_spec.daemon_id,
+ hostname=daemon_spec.host,
+ ).service_id(), overwrite=True):
+
+ start_time = datetime.datetime.utcnow()
+ cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
+
+ daemon_spec.extra_args.extend(['--config-json', '-'])
+
+ # TCP port to open in the host firewall
+ if daemon_spec.ports:
+ daemon_spec.extra_args.extend(['--tcp-ports', ' '.join(map(str, daemon_spec.ports))])
# osd deployments needs an --osd-uuid arg
- if daemon_type == 'osd':
+ if daemon_spec.daemon_type == 'osd':
if not osd_uuid_map:
osd_uuid_map = self.get_osd_uuid_map()
- osd_uuid = osd_uuid_map.get(daemon_id)
+ osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
if not osd_uuid:
- raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
- extra_args.extend(['--osd-fsid', osd_uuid])
+ raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+ daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
- if reconfig:
- extra_args.append('--reconfig')
- if self.allow_ptrace:
- extra_args.append('--allow-ptrace')
+ if reconfig:
+ daemon_spec.extra_args.append('--reconfig')
+ if self.allow_ptrace:
+ daemon_spec.extra_args.append('--allow-ptrace')
- self.log.info('%s daemon %s on %s' % (
- 'Reconfiguring' if reconfig else 'Deploying',
- name, host))
+ if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
+ self._registry_login(daemon_spec.host, self.registry_url, self.registry_username, self.registry_password)
- out, err, code = self._run_cephadm(
- host, name, 'deploy',
- [
- '--name', name,
- ] + extra_args,
- stdin=json.dumps(cephadm_config))
- if not code and host in self.cache.daemons:
- # prime cached service state with what we (should have)
- # just created
- sd = orchestrator.DaemonDescription()
- sd.daemon_type = daemon_type
- sd.daemon_id = daemon_id
- sd.hostname = host
- sd.status = 1
- sd.status_desc = 'starting'
- self.cache.add_daemon(host, sd)
- self.cache.invalidate_host_daemons(host)
- self.cache.update_daemon_config_deps(host, name, deps, start_time)
- self.cache.save_host(host)
- return "{} {} on host '{}'".format(
- 'Reconfigured' if reconfig else 'Deployed', name, host)
+ self.log.info('%s daemon %s on %s' % (
+ 'Reconfiguring' if reconfig else 'Deploying',
+ daemon_spec.name(), daemon_spec.host))
+
+ out, err, code = self._run_cephadm(
+ daemon_spec.host, daemon_spec.name(), 'deploy',
+ [
+ '--name', daemon_spec.name(),
+ ] + daemon_spec.extra_args,
+ stdin=json.dumps(cephadm_config))
+ if not code and daemon_spec.host in self.cache.daemons:
+ # prime cached service state with what we (should have)
+ # just created
+ sd = orchestrator.DaemonDescription()
+ sd.daemon_type = daemon_spec.daemon_type
+ sd.daemon_id = daemon_spec.daemon_id
+ sd.hostname = daemon_spec.host
+ sd.status = 1
+ sd.status_desc = 'starting'
+ self.cache.add_daemon(daemon_spec.host, sd)
+ if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+ self.requires_post_actions.add(daemon_spec.daemon_type)
+ self.cache.invalidate_host_daemons(daemon_spec.host)
+ self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+ self.cache.save_host(daemon_spec.host)
+ msg = "{} {} on host '{}'".format(
+ 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+ if not code:
+ self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+ else:
+ what = 'reconfigure' if reconfig else 'deploy'
+ self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+ return msg
@forall_hosts
def _remove_daemons(self, name, host) -> str:
Remove a daemon
"""
(daemon_type, daemon_id) = name.split('.', 1)
- if daemon_type == 'mon':
- self._check_safe_to_destroy_mon(daemon_id)
- # remove mon from quorum before we destroy the daemon
- self.log.info('Removing monitor %s from monmap...' % name)
- ret, out, err = self.check_mon_command({
- 'prefix': 'mon rm',
- 'name': daemon_id,
- })
+ with set_exception_subject('service', orchestrator.DaemonDescription(
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ hostname=host,
+ ).service_id(), overwrite=True):
- args = ['--name', name, '--force']
- self.log.info('Removing daemon %s from %s' % (name, host))
- out, err, code = self._run_cephadm(
- host, name, 'rm-daemon', args)
- if not code:
- # remove item from cache
- self.cache.rm_daemon(host, name)
- self.cache.invalidate_host_daemons(host)
- return "Removed {} from host '{}'".format(name, host)
-
- def _create_fn(self, service_type: str) -> Callable[..., str]:
- try:
- d: Dict[str, function] = {
- 'mon': self.mon_service.create,
- 'mgr': self.mgr_service.create,
- 'osd': self.osd_service.create,
- 'mds': self.mds_service.create,
- 'rgw': self.rgw_service.create,
- 'rbd-mirror': self.rbd_mirror_service.create,
- 'nfs': self.nfs_service.create,
- 'grafana': self.grafana_service.create,
- 'alertmanager': self.alertmanager_service.create,
- 'prometheus': self.prometheus_service.create,
- 'node-exporter': self.node_exporter_service.create,
- 'crash': self.crash_service.create,
- 'iscsi': self.iscsi_service.create,
- }
- return d[service_type] # type: ignore
- except KeyError:
- self.log.exception(f'unknown service type {service_type}')
- raise OrchestratorError(f'unknown service type {service_type}') from e
+
+ self.cephadm_services[daemon_type].pre_remove(daemon_id)
+
+ args = ['--name', name, '--force']
+ self.log.info('Removing daemon %s from %s' % (name, host))
+ out, err, code = self._run_cephadm(
+ host, name, 'rm-daemon', args)
+ if not code:
+ # remove item from cache
+ self.cache.rm_daemon(host, name)
+ self.cache.invalidate_host_daemons(host)
+ return "Removed {} from host '{}'".format(name, host)
def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
- return {
+ fn = {
'mds': self.mds_service.config,
'rgw': self.rgw_service.config,
'nfs': self.nfs_service.config,
'iscsi': self.iscsi_service.config,
}.get(service_type)
+ return cast(Callable[[ServiceSpec], None], fn)
def _apply_service(self, spec: ServiceSpec) -> bool:
"""
daemon_type = spec.service_type
service_name = spec.service_name()
if spec.unmanaged:
- self.log.debug('Skipping unmanaged service %s spec' % service_name)
+ self.log.debug('Skipping unmanaged service %s' % service_name)
+ return False
+ if spec.preview_only:
+ self.log.debug('Skipping preview_only service %s' % service_name)
return False
self.log.debug('Applying service %s spec' % service_name)
- create_func = self._create_fn(daemon_type)
config_func = self._config_fn(daemon_type)
if daemon_type == 'osd':
- create_func(spec)
+ self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
# TODO: return True would result in a busy loop
return False
# host
return len(self.cache.networks[host].get(public_network, [])) > 0
- hosts = HostAssignment(
+ ha = HostAssignment(
spec=spec,
get_hosts_func=self._get_hosts,
get_daemons_func=self.cache.get_daemons_by_service,
filter_new_host=matches_network if daemon_type == 'mon' else None,
- ).place()
+ )
+
+ hosts: List[HostPlacementSpec] = ha.place()
+ self.log.debug('Usable hosts: %s' % hosts)
r = False
# add any?
did_config = False
- hosts_with_daemons = {d.hostname for d in daemons}
- self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
- for host, network, name in hosts:
- if host not in hosts_with_daemons:
- if not did_config and config_func:
- config_func(spec)
- did_config = True
- daemon_id = self.get_unique_name(daemon_type, host, daemons,
- prefix=spec.service_id,
- forcename=name)
- self.log.debug('Placing %s.%s on host %s' % (
- daemon_type, daemon_id, host))
- if daemon_type == 'mon':
- create_func(daemon_id, host, network) # type: ignore
- elif daemon_type in ['nfs', 'iscsi']:
- create_func(daemon_id, host, spec) # type: ignore
+
+ add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+ self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+ remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+ self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
+ for host, network, name in add_daemon_hosts:
+ daemon_id = self.get_unique_name(daemon_type, host, daemons,
+ prefix=spec.service_id,
+ forcename=name)
+
+ if not did_config and config_func:
+ if daemon_type == 'rgw':
+ rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
+ rgw_config_func(cast(RGWSpec, spec), daemon_id)
else:
- create_func(daemon_id, host) # type: ignore
+ config_func(spec)
+ did_config = True
- # add to daemon list so next name(s) will also be unique
- sd = orchestrator.DaemonDescription(
- hostname=host,
- daemon_type=daemon_type,
- daemon_id=daemon_id,
- )
- daemons.append(sd)
- r = True
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
+ self.log.debug('Placing %s.%s on host %s' % (
+ daemon_type, daemon_id, host))
+
+ self.cephadm_services[daemon_type].create(daemon_spec)
+
+ # add to daemon list so next name(s) will also be unique
+ sd = orchestrator.DaemonDescription(
+ hostname=host,
+ daemon_type=daemon_type,
+ daemon_id=daemon_id,
+ )
+ daemons.append(sd)
+ r = True
# remove any?
- target_hosts = [h.hostname for h in hosts]
- for d in daemons:
- if d.hostname not in target_hosts:
- # NOTE: we are passing the 'force' flag here, which means
- # we can delete a mon instances data.
- self._remove_daemon(d.name(), d.hostname)
- r = True
+ def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
+ daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
+ r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+ return not r.retval
+
+ while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
+ # let's find a subset that is ok-to-stop
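+ # pop() drops an arbitrary daemon from the set; anything dropped here
+ # stays deployed until a later serve() iteration reconsiders it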
+ remove_daemon_hosts.pop()
+ for d in remove_daemon_hosts:
+ # NOTE: we are passing the 'force' flag here, which means
+ # we can delete a mon instance's data.
+ self._remove_daemon(d.name(), d.hostname)
+ r = True
return r
except Exception as e:
self.log.exception('Failed to apply %s spec %s: %s' % (
spec.service_name(), spec, e))
+ self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+
return r
def _check_pool_exists(self, pool, service_name):
f'service {service_name}')
def _check_daemons(self):
- # get monmap mtime so we can refresh configs when mons change
- monmap = self.get('mon_map')
- last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
- monmap['modified'], CEPH_DATEFMT)
- if last_monmap and last_monmap > datetime.datetime.utcnow():
- last_monmap = None # just in case clocks are skewed
daemons = self.cache.get_daemons()
- daemons_post = defaultdict(list)
+ daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
for dd in daemons:
# orphan?
spec = self.spec_store.specs.get(dd.service_name(), None)
continue
# These daemon types require additional configs after creation
- if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+ if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
daemons_post[dd.daemon_type].append(dd)
+
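+            # Mark whether this daemon is the service's currently active
+            # instance, as reported by get_active_daemon().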
+            active_daemon_id = self.cephadm_services[dd.daemon_type].get_active_daemon(
+                self.cache.get_daemons_by_service(dd.service_name())).daemon_id
+            dd.is_active = (active_daemon_id == dd.daemon_id)
deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
last_deps, last_config = self.cache.get_daemon_last_config_deps(
self.log.info('Reconfiguring %s (dependencies changed)...' % (
dd.name()))
reconfig = True
- elif last_monmap and \
- last_monmap > last_config and \
+ elif self.last_monmap and \
+ self.last_monmap > last_config and \
dd.daemon_type in CEPH_TYPES:
self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
reconfig = True
if reconfig:
- self._create_daemon(dd.daemon_type, dd.daemon_id,
- dd.hostname, reconfig=True)
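+                # If the reconfig fails, record the failure as an event and
+                # drop this daemon type from the post-action list below.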
+ try:
+ self._create_daemon(
+ CephadmDaemonSpec(
+ host=dd.hostname,
+ daemon_id=dd.daemon_id,
+ daemon_type=dd.daemon_type),
+ reconfig=True)
+ except OrchestratorError as e:
+ self.events.from_orch_error(e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
+ except Exception as e:
+ self.events.for_daemon_from_exception(dd.name(), e)
+ if dd.daemon_type in daemons_post:
+ del daemons_post[dd.daemon_type]
+ # continue...
# do daemon post actions
for daemon_type, daemon_descs in daemons_post.items():
- self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
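+            # Run the post-creation check only for daemon types flagged as
+            # needing it, and clear the flag so it is not repeated needlessly.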
+ if daemon_type in self.requires_post_actions:
+ self.requires_post_actions.remove(daemon_type)
+ self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
def _add_daemon(self, daemon_type, spec,
create_func: Callable[..., T], config_func=None) -> List[T]:
raise OrchestratorError('too few hosts: want %d, have %s' % (
count, hosts))
- if config_func:
- config_func(spec)
+ did_config = False
- args = [] # type: List[tuple]
+ args = [] # type: List[CephadmDaemonSpec]
for host, network, name in hosts:
daemon_id = self.get_unique_name(daemon_type, host, daemons,
prefix=spec.service_id,
forcename=name)
+
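+            # Call the service-level config hook once; the rgw hook also
+            # takes the daemon id.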
+ if not did_config and config_func:
+ if daemon_type == 'rgw':
+ config_func(spec, daemon_id)
+ else:
+ config_func(spec)
+ did_config = True
+
+ daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
self.log.debug('Placing %s.%s on host %s' % (
daemon_type, daemon_id, host))
- if daemon_type == 'mon':
- args.append((daemon_id, host, network)) # type: ignore
- elif daemon_type in ['nfs', 'iscsi']:
- args.append((daemon_id, host, spec)) # type: ignore
- else:
- args.append((daemon_id, host)) # type: ignore
+ args.append(daemon_spec)
# add to daemon list so next name(s) will also be unique
sd = orchestrator.DaemonDescription(
return create_func_map(args)
@trivial_completion
- def apply_mon(self, spec):
+ def apply_mon(self, spec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('mgr', spec, self.mgr_service.create)
def _apply(self, spec: GenericSpec) -> str:
+ self.migration.verify_no_migration()
+
if spec.service_type == 'host':
return self._add_host(cast(HostSpec, spec))
+ if spec.service_type == 'osd':
+            # _trigger_preview_refresh needs to be smart and
+            # should only refresh if a change has been detected
+ self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
+
return self._apply_service_spec(cast(ServiceSpec, spec))
+ def _plan(self, spec: ServiceSpec):
+ if spec.service_type == 'osd':
+ return {'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
+
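+        # For all other service types, run the scheduler in dry-run mode and
+        # report which hosts would gain or lose daemons.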
+ ha = HostAssignment(
+ spec=spec,
+ get_hosts_func=self._get_hosts,
+ get_daemons_func=self.cache.get_daemons_by_service,
+ )
+ ha.validate()
+ hosts = ha.place()
+
+ add_daemon_hosts = ha.add_daemon_hosts(hosts)
+ remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
+
+ return {
+ 'service_name': spec.service_name(),
+ 'service_type': spec.service_type,
+ 'add': [hs.hostname for hs in add_daemon_hosts],
+ 'remove': [d.hostname for d in remove_daemon_hosts]
+ }
+
+ @trivial_completion
+ def plan(self, specs: List[GenericSpec]) -> List:
+        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
+                               'to the current inventory setup. If any of these conditions changes, the \n'
+                               'preview will be invalid. Please make sure to have a minimal \n'
+                               'timeframe between planning and applying the specs.'}]
+ if any([spec.service_type == 'host' for spec in specs]):
+            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}]
+ for spec in specs:
+ results.append(self._plan(cast(ServiceSpec, spec)))
+ return results
+
def _apply_service_spec(self, spec: ServiceSpec) -> str:
if spec.placement.is_empty():
# fill in default placement
return "Scheduled %s update..." % spec.service_name()
@trivial_completion
- def apply(self, specs: List[GenericSpec]):
+ def apply(self, specs: List[GenericSpec]) -> List[str]:
results = []
for spec in specs:
results.append(self._apply(spec))
return results
@trivial_completion
- def apply_mgr(self, spec):
+ def apply_mgr(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_mds(self, spec: ServiceSpec):
+ def add_mds(self, spec: ServiceSpec) -> List[str]:
return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
@trivial_completion
- def apply_mds(self, spec: ServiceSpec):
+ def apply_mds(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
- def add_rgw(self, spec):
+ def add_rgw(self, spec) -> List[str]:
return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
@trivial_completion
- def apply_rgw(self, spec):
+ def apply_rgw(self, spec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
@trivial_completion
- def apply_iscsi(self, spec):
+ def apply_iscsi(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_rbd_mirror(self, spec):
+ def add_rbd_mirror(self, spec) -> List[str]:
return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
@trivial_completion
- def apply_rbd_mirror(self, spec):
+ def apply_rbd_mirror(self, spec) -> str:
return self._apply(spec)
@trivial_completion
- def add_nfs(self, spec):
+ def add_nfs(self, spec) -> List[str]:
return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
@trivial_completion
- def apply_nfs(self, spec):
+ def apply_nfs(self, spec) -> str:
return self._apply(spec)
def _get_dashboard_url(self):
return self.get('mgr_map').get('services', {}).get('dashboard', '')
@trivial_completion
- def add_prometheus(self, spec):
+ def add_prometheus(self, spec) -> List[str]:
return self._add_daemon('prometheus', spec, self.prometheus_service.create)
@trivial_completion
- def apply_prometheus(self, spec):
+ def apply_prometheus(self, spec) -> str:
return self._apply(spec)
@trivial_completion
self.node_exporter_service.create)
@trivial_completion
- def apply_node_exporter(self, spec):
+ def apply_node_exporter(self, spec) -> str:
return self._apply(spec)
@trivial_completion
self.crash_service.create)
@trivial_completion
- def apply_crash(self, spec):
+ def apply_crash(self, spec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('grafana', spec, self.grafana_service.create)
@trivial_completion
- def apply_grafana(self, spec: ServiceSpec):
+ def apply_grafana(self, spec: ServiceSpec) -> str:
return self._apply(spec)
@trivial_completion
return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
@trivial_completion
- def apply_alertmanager(self, spec: ServiceSpec):
+ def apply_alertmanager(self, spec: ServiceSpec) -> str:
return self._apply(spec)
def _get_container_image_id(self, image_name):
break
if not host:
raise OrchestratorError('no hosts defined')
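+        # Log in to the configured registry first (if this host needs it) so
+        # the image pull below can succeed on hosts that require authentication.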
+ if self.cache.host_needs_registry_login(host) and self.registry_url:
+ self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
out, err, code = self._run_cephadm(
- host, None, 'pull', [],
+ host, '', 'pull', [],
image=image_name,
no_fsid=True,
error_ok=True)
return image_id, ceph_version
@trivial_completion
- def upgrade_check(self, image, version):
+ def upgrade_check(self, image, version) -> str:
if version:
target_name = self.container_image_base + ':v' + version
elif image:
return json.dumps(r, indent=4, sort_keys=True)
@trivial_completion
- def upgrade_status(self):
+ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
return self.upgrade.upgrade_status()
@trivial_completion
- def upgrade_start(self, image, version):
+ def upgrade_start(self, image, version) -> str:
return self.upgrade.upgrade_start(image, version)
@trivial_completion
- def upgrade_pause(self):
+ def upgrade_pause(self) -> str:
return self.upgrade.upgrade_pause()
@trivial_completion
- def upgrade_resume(self):
+ def upgrade_resume(self) -> str:
return self.upgrade.upgrade_resume()
@trivial_completion
- def upgrade_stop(self):
+ def upgrade_stop(self) -> str:
return self.upgrade.upgrade_stop()
@trivial_completion
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False):
+ force: bool = False) -> str:
"""
Takes a list of OSDs and schedules them for removal.
The function that takes care of the actual removal is
- _remove_osds_bg().
+ process_removal_queue().
"""
- daemons = self.cache.get_daemons_by_service('osd')
- found: Set[OSDRemoval] = set()
+ daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
+ to_remove_daemons = list()
for daemon in daemons:
- if daemon.daemon_id not in osd_ids:
- continue
- found.add(OSDRemoval(daemon.daemon_id, replace, force,
- daemon.hostname, daemon.name(),
- datetime.datetime.utcnow(), -1))
+ if daemon.daemon_id in osd_ids:
+ to_remove_daemons.append(daemon)
+ if not to_remove_daemons:
+ return f"Unable to find OSDs: {osd_ids}"
- not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
- if not_found:
- raise OrchestratorError('Unable to find OSD: %s' % not_found)
-
- self.rm_util.queue_osds_for_removal(found)
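+        # Removals are queue based: wrap each daemon in an OSD object and
+        # enqueue it; process_removal_queue() handles the actual removal from
+        # the serve loop.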
+ for daemon in to_remove_daemons:
+ try:
+ self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
+ replace=replace,
+ force=force,
+ hostname=daemon.hostname,
+ fullname=daemon.name(),
+ process_started_at=datetime.datetime.utcnow(),
+ remove_util=self.rm_util))
+ except NotFoundError:
+ return f"Unable to find OSDs: {osd_ids}"
# trigger the serve loop to initiate the removal
self._kick_serve_loop()
return "Scheduled OSD(s) for removal"
+ @trivial_completion
+ def stop_remove_osds(self, osd_ids: List[str]):
+ """
+        Stops a `removal` process for a list of OSDs.
+        This will revert their weight and remove them from the osds_to_remove queue.
+ """
+ for osd_id in osd_ids:
+ try:
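+                # Note: a stub OSD carrying only the id should be enough here,
+                # assuming the queue matches entries by osd_id.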
+ self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
+ remove_util=self.rm_util))
+ except (NotFoundError, KeyError):
+ return f'Unable to find OSD in the queue: {osd_id}'
+
+ # trigger the serve loop to halt the removal
+ self._kick_serve_loop()
+ return "Stopped OSD(s) removal"
+
@trivial_completion
def remove_osds_status(self):
"""
The CLI call to retrieve an osd removal report
"""
- return self.rm_util.report
+ return self.to_remove_osds.all_osds()