+import json
+import re
import logging
-from typing import TYPE_CHECKING, List
+import subprocess
+from abc import ABCMeta, abstractmethod
+from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, Optional, Dict, Tuple
-from mgr_module import MonCommandFailed
+from mgr_module import HandleCommandResult, MonCommandFailed
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from orchestrator import OrchestratorError, DaemonDescription
logger = logging.getLogger(__name__)
+ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
-class CephadmService:
+
+class CephadmDaemonSpec(Generic[ServiceSpecs]):
+ # typing.NamedTuple + Generic is broken in py36
+ def __init__(self, host: str, daemon_id,
+ spec: Optional[ServiceSpecs]=None,
+ network: Optional[str]=None,
+ keyring: Optional[str]=None,
+ extra_args: Optional[List[str]]=None,
+ extra_config: Optional[Dict[str, Any]]=None,
+ daemon_type: Optional[str]=None,
+ ports: Optional[List[int]]=None,):
+ """
+ Used for
+ * deploying new daemons. then everything is set
+ * redeploying existing daemons, then only the first three attrs are set.
+
+ Would be great to have a consistent usage where all properties are set.
+ """
+ self.host: str = host
+ self.daemon_id = daemon_id
+ daemon_type = daemon_type or (spec.service_type if spec else None)
+ assert daemon_type is not None
+ self.daemon_type: str = daemon_type
+
+ # would be great to have the spec always available:
+ self.spec: Optional[ServiceSpecs] = spec
+
+ # mons
+ self.network = network
+
+ # for run_cephadm.
+ self.keyring: Optional[str] = keyring
+
+ # For run_cephadm. Would be great to have more expressive names.
+ self.extra_args: List[str] = extra_args or []
+ self.extra_config: Dict[str, Any] = extra_config or {}
+
+ # TCP ports used by the daemon
+ self.ports: List[int] = ports or []
+
+
+ def name(self) -> str:
+ return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+
+class CephadmService(metaclass=ABCMeta):
"""
Base class for service types. Often providing a create() and config() fn.
"""
+
+ @property
+ @abstractmethod
+ def TYPE(self):
+ pass
+
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
+ def make_daemon_spec(self, host, daemon_id, network, spec: ServiceSpecs) -> CephadmDaemonSpec:
+ return CephadmDaemonSpec(
+ host=host,
+ daemon_id=daemon_id,
+ spec=spec,
+ network=network
+ )
+
+ def create(self, daemon_spec: CephadmDaemonSpec):
+ raise NotImplementedError()
+
+ def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+ # Ceph.daemons (mon, mgr, mds, osd, etc)
+ cephadm_config = self.mgr._get_config_and_keyring(
+ daemon_spec.daemon_type,
+ daemon_spec.daemon_id,
+ host=daemon_spec.host,
+ keyring=daemon_spec.keyring,
+ extra_ceph_config=daemon_spec.extra_config.pop('config', ''))
+
+
+ if daemon_spec.extra_config:
+ cephadm_config.update({'files': daemon_spec.extra_config})
+
+ return cephadm_config, []
+
def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
"""The post actions needed to be done after daemons are checked"""
+ if self.mgr.config_dashboard:
+ if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+ self.config_dashboard(daemon_descrs)
+ else:
+ logger.debug('Dashboard is not enabled. Skip configuration.')
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+ """Config dashboard settings."""
raise NotImplementedError()
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
- raise NotImplementedError()
+ # if this is called for a service type where it hasn't explicitly been
+ # defined, return empty Daemon Desc
+ return DaemonDescription()
- def _inventory_get_addr(self, hostname: str):
+ def _inventory_get_addr(self, hostname: str) -> str:
"""Get a host's address with its hostname."""
return self.mgr.inventory.get_addr(hostname)
get_mon_cmd: str,
set_mon_cmd: str,
service_url: str):
- """A helper to get and set service_url via Dashboard's MON command."""
+ """A helper to get and set service_url via Dashboard's MON command.
+
+ If result of get_mon_cmd differs from service_url, set_mon_cmd will
+ be sent to set the service_url.
+ """
+ def get_set_cmd_dicts(out: str) -> List[dict]:
+ cmd_dict = {
+ 'prefix': set_mon_cmd,
+ 'value': service_url
+ }
+ return [cmd_dict] if service_url != out else []
+
+ self._check_and_set_dashboard(
+ service_name=service_name,
+ get_cmd=get_mon_cmd,
+ get_set_cmd_dicts=get_set_cmd_dicts
+ )
+
+ def _check_and_set_dashboard(self,
+ service_name: str,
+ get_cmd: str,
+ get_set_cmd_dicts: Callable[[str], List[dict]]):
+ """A helper to set configs in the Dashboard.
+
+ The method is useful for the pattern:
+ - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
+ gateways.
+ - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
+ - Determine if the config need to be update. NOTE: This step is important because if a
+ Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
+ kicks the serve() loop and the logic using this method is likely to be called again.
+ A config should be updated only when needed.
+ - Update a config in Dashboard by using a Dashboard command.
+
+ :param service_name: the service name to be used for logging
+ :type service_name: str
+ :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
+ :type get_cmd: str
+ :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
+ e.g.
+ [
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@aaa:5000',
+ 'name': 'aaa'
+ },
+ {
+ 'prefix': 'dashboard iscsi-gateway-add',
+ 'service_url': 'http://admin:admin@bbb:5000',
+ 'name': 'bbb'
+ }
+ ]
+ The function should return empty list if no command need to be sent.
+ :type get_set_cmd_dicts: Callable[[str], List[dict]]
+ """
+
try:
_, out, _ = self.mgr.check_mon_command({
- 'prefix': get_mon_cmd
+ 'prefix': get_cmd
})
except MonCommandFailed as e:
- logger.warning('Failed to get service URL for %s: %s', service_name, e)
+ logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
return
- if out.strip() != service_url:
+ cmd_dicts = get_set_cmd_dicts(out.strip())
+ for cmd_dict in cmd_dicts:
try:
- logger.info(
- 'Setting service URL %s for %s in the Dashboard', service_url, service_name)
- _, out, _ = self.mgr.check_mon_command({
- 'prefix': set_mon_cmd,
- 'value': service_url,
- })
+ logger.info('Setting Dashboard config for %s: command: %s', service_name, cmd_dict)
+ _, out, _ = self.mgr.check_mon_command(cmd_dict)
except MonCommandFailed as e:
- logger.warning('Failed to set service URL %s for %s in the Dashboard: %s',
- service_url, service_name, e)
+ logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
+
+
+
+ def ok_to_stop(self, daemon_ids: List[str]) -> HandleCommandResult:
+ names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+ out = f'It is presumed safe to stop {names}'
+ err = f'It is NOT safe to stop {names}'
+
+ if self.TYPE not in ['mon', 'osd', 'mds']:
+ logger.info(out)
+ return HandleCommandResult(0, out, None)
+
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': f'{self.TYPE} ok-to-stop',
+ 'ids': daemon_ids,
+ }))
+
+ if r.retval:
+ err = f'{err}: {r.stderr}' if r.stderr else err
+ logger.error(err)
+ return HandleCommandResult(r.retval, r.stdout, err)
+ out = f'{out}: {r.stdout}' if r.stdout else out
+ logger.info(out)
+ return HandleCommandResult(r.retval, out, r.stderr)
+
+ def pre_remove(self, daemon_id: str) -> None:
+ """
+ Called before the daemon is removed.
+ """
+ pass
class MonService(CephadmService):
- def create(self, name, host, network):
+ TYPE = 'mon'
+
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
"""
Create a new monitor on the given host.
"""
+ assert self.TYPE == daemon_spec.daemon_type
+ name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+
# get mon. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get',
'who': 'mon',
'key': 'public_network',
})
- network = network.strip() # type: ignore
+ network = network.strip() if network else network
if not network:
raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
if '/' not in network:
raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
extra_config += 'public network = %s\n' % network
- return self.mgr._create_daemon('mon', name, host,
- keyring=keyring,
- extra_config={'config': extra_config})
+ daemon_spec.extra_config = {'config': extra_config}
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
+
+ def _check_safe_to_destroy(self, mon_id: str) -> None:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'quorum_status',
+ })
+ try:
+ j = json.loads(out)
+ except Exception as e:
+ raise OrchestratorError('failed to parse quorum status')
+
+ mons = [m['name'] for m in j['monmap']['mons']]
+ if mon_id not in mons:
+ logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
+ mon_id, mons))
+ return
+ new_mons = [m for m in mons if m != mon_id]
+ new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+ if len(new_quorum) > len(new_mons) / 2:
+ logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
+ return
+ raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
+
+
+ def pre_remove(self, daemon_id: str) -> None:
+ self._check_safe_to_destroy(daemon_id)
+
+ # remove mon from quorum before we destroy the daemon
+ logger.info('Removing monitor %s from monmap...' % daemon_id)
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mon rm',
+ 'name': daemon_id,
+ })
class MgrService(CephadmService):
- def create(self, mgr_id, host):
+ TYPE = 'mgr'
+
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
"""
Create a new manager instance on a host.
"""
+ assert self.TYPE == daemon_spec.daemon_type
+ mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
+
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'mds', 'allow *'],
})
- return self.mgr._create_daemon('mgr', mgr_id, host, keyring=keyring)
+
+ # Retrieve ports used by manager modules
+ # In the case of the dashboard port and with several manager daemons
+ # running in different hosts, it exists the possibility that the
+ # user has decided to use different dashboard ports in each server
+ # If this is the case then the dashboard port opened will be only the used
+ # as default.
+ ports = []
+ config_ports = ''
+ ret, mgr_services, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr services',
+ })
+ if mgr_services:
+ mgr_endpoints = json.loads(mgr_services)
+ for end_point in mgr_endpoints.values():
+ port = re.search(r':\d+/', end_point)
+ if port:
+ ports.append(int(port[0][1:-1]))
+
+ if ports:
+ daemon_spec.ports = ports
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ active_mgr_str = self.mgr.get('mgr_map')['active_name']
+ for daemon in daemon_descrs:
+ if daemon.daemon_id == active_mgr_str:
+ return daemon
+ # if no active mgr found, return empty Daemon Desc
+ return DaemonDescription()
class MdsService(CephadmService):
- def config(self, spec: ServiceSpec):
- # ensure mds_join_fs is set for these daemons
+ TYPE = 'mds'
+
+ def config(self, spec: ServiceSpec) -> None:
+ assert self.TYPE == spec.service_type
assert spec.service_id
+
+ # ensure mds_join_fs is set for these daemons
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
'who': 'mds.' + spec.service_id,
'value': spec.service_id,
})
- def create(self, mds_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+ assert self.TYPE == daemon_spec.daemon_type
+ mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+
# get mgr. key
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'mds.' + mds_id,
'caps': ['mon', 'profile mds',
- 'osd', 'allow rwx',
+ 'osd', 'allow rw tag cephfs *=*',
'mds', 'allow'],
})
- return self.mgr._create_daemon('mds', mds_id, host, keyring=keyring)
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
+
+ def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+ active_mds_strs = list()
+ for fs in self.mgr.get('fs_map')['filesystems']:
+ mds_map = fs['mdsmap']
+ if mds_map is not None:
+ for mds_id, mds_status in mds_map['info'].items():
+ if mds_status['state'] == 'up:active':
+ active_mds_strs.append(mds_status['name'])
+ if len(active_mds_strs) != 0:
+ for daemon in daemon_descrs:
+ if daemon.daemon_id in active_mds_strs:
+ return daemon
+ # if no mds found, return empty Daemon Desc
+ return DaemonDescription()
class RgwService(CephadmService):
- def config(self, spec: RGWSpec):
+ TYPE = 'rgw'
+
+ def config(self, spec: RGWSpec, rgw_id: str):
+ assert self.TYPE == spec.service_type
+
+ # create realm, zonegroup, and zone if needed
+ self.create_realm_zonegroup_zone(spec, rgw_id)
+
# ensure rgw_realm and rgw_zone is set for these daemons
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config set',
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
- def create(self, rgw_id, host) -> str:
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+ assert self.TYPE == daemon_spec.daemon_type
+ rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+ keyring = self.get_keyring(rgw_id)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
+
+ def get_keyring(self, rgw_id: str):
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
'mgr', 'allow rw',
'osd', 'allow rwx'],
})
- return self.mgr._create_daemon('rgw', rgw_id, host, keyring=keyring)
+ return keyring
+
+ def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str):
+ if utils.get_cluster_health(self.mgr) != 'HEALTH_OK':
+ raise OrchestratorError('Health not ok, will try again when health ok')
+
+ #get keyring needed to run rados commands and strip out just the keyring
+ keyring = self.get_keyring(rgw_id).split('key = ',1)[1].rstrip()
+
+ # We can call radosgw-admin within the container, because cephadm gives the MGR the required keyring permissions
+
+ def get_realms() -> List[str]:
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'realm', 'list',
+ '--format=json']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out = result.stdout
+ if not out:
+ return []
+ try:
+ j = json.loads(out)
+ return j.get('realms', [])
+ except Exception as e:
+ raise OrchestratorError('failed to parse realm info')
+
+ def create_realm():
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'realm', 'create',
+ '--rgw-realm=%s'%spec.rgw_realm,
+ '--default']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.mgr.log.info('created realm: %s'%spec.rgw_realm)
+
+ def get_zonegroups() -> List[str]:
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'zonegroup', 'list',
+ '--format=json']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out = result.stdout
+ if not out:
+ return []
+ try:
+ j = json.loads(out)
+ return j.get('zonegroups', [])
+ except Exception as e:
+ raise OrchestratorError('failed to parse zonegroup info')
+
+ def create_zonegroup():
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'zonegroup', 'create',
+ '--rgw-zonegroup=default',
+ '--master', '--default']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.mgr.log.info('created zonegroup: default')
+
+ def create_zonegroup_if_required():
+ zonegroups = get_zonegroups()
+ if 'default' not in zonegroups:
+ create_zonegroup()
+
+ def get_zones() -> List[str]:
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'zone', 'list',
+ '--format=json']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ out = result.stdout
+ if not out:
+ return []
+ try:
+ j = json.loads(out)
+ return j.get('zones', [])
+ except Exception as e:
+ raise OrchestratorError('failed to parse zone info')
+
+ def create_zone():
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'zone', 'create',
+ '--rgw-zonegroup=default',
+ '--rgw-zone=%s'%spec.rgw_zone,
+ '--master', '--default']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.mgr.log.info('created zone: %s'%spec.rgw_zone)
+
+
+ changes = False
+ realms = get_realms()
+ if spec.rgw_realm not in realms:
+ create_realm()
+ changes = True
+
+ zones = get_zones()
+ if spec.rgw_zone not in zones:
+ create_zonegroup_if_required()
+ create_zone()
+ changes = True
+
+ # update period if changes were made
+ if changes:
+ cmd = ['radosgw-admin',
+ '--key=%s'%keyring,
+ '--user', 'rgw.%s'%rgw_id,
+ 'period', 'update',
+ '--rgw-realm=%s'%spec.rgw_realm,
+ '--commit']
+ result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ self.mgr.log.info('updated period')
class RbdMirrorService(CephadmService):
- def create(self, daemon_id, host) -> str:
+ TYPE = 'rbd-mirror'
+
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.rbd-mirror.' + daemon_id,
'caps': ['mon', 'profile rbd-mirror',
'osd', 'profile rbd'],
})
- return self.mgr._create_daemon('rbd-mirror', daemon_id, host,
- keyring=keyring)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)
class CrashService(CephadmService):
- def create(self, daemon_id, host) -> str:
+ TYPE = 'crash'
+
+ def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': 'client.crash.' + host,
'caps': ['mon', 'profile crash',
'mgr', 'profile crash'],
})
- return self.mgr._create_daemon('crash', daemon_id, host, keyring=keyring)
+
+ daemon_spec.keyring = keyring
+
+ return self.mgr._create_daemon(daemon_spec)