+import errno
import json
-import re
import logging
-import subprocess
+import re
+import socket
+import time
from abc import ABCMeta, abstractmethod
-from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, \
- Optional, Dict, Any, Tuple, NewType
+from typing import TYPE_CHECKING, List, Callable, TypeVar, \
+ Optional, Dict, Any, Tuple, NewType, cast
from mgr_module import HandleCommandResult, MonCommandFailed
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
-from orchestrator import OrchestratorError, DaemonDescription
+from mgr_util import build_url
+from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator._interface import daemon_type_to_service
from cephadm import utils
if TYPE_CHECKING:
AuthEntity = NewType('AuthEntity', str)
-class CephadmDaemonSpec(Generic[ServiceSpecs]):
+class CephadmDaemonDeploySpec:
# typing.NamedTuple + Generic is broken in py36
def __init__(self, host: str, daemon_id: str,
- spec: Optional[ServiceSpecs] = None,
+ service_name: str,
network: Optional[str] = None,
keyring: Optional[str] = None,
extra_args: Optional[List[str]] = None,
ceph_conf: str = '',
extra_files: Optional[Dict[str, Any]] = None,
daemon_type: Optional[str] = None,
- ports: Optional[List[int]] = None,):
+ ip: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None):
"""
- Used for
- * deploying new daemons. then everything is set
- * redeploying existing daemons, then only the first three attrs are set.
-
- Would be great to have a consistent usage where all properties are set.
+        A data structure to encapsulate `cephadm deploy` ...
"""
self.host: str = host
self.daemon_id = daemon_id
- daemon_type = daemon_type or (spec.service_type if spec else None)
+ self.service_name = service_name
+ daemon_type = daemon_type or (service_name.split('.')[0])
assert daemon_type is not None
self.daemon_type: str = daemon_type
- # would be great to have the spec always available:
- self.spec: Optional[ServiceSpecs] = spec
-
# mons
self.network = network
self.extra_files = extra_files or {}
# TCP ports used by the daemon
- self.ports: List[int] = ports or []
+ self.ports: List[int] = ports or []
+ self.ip: Optional[str] = ip
+
+ # values to be populated during generate_config calls
+ # and then used in _run_cephadm
+ self.final_config: Dict[str, Any] = {}
+ self.deps: List[str] = []
+
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
+
+ self.extra_container_args = extra_container_args
def name(self) -> str:
return '%s.%s' % (self.daemon_type, self.daemon_id)
return files
+ @staticmethod
+ def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
+ assert dd.hostname
+ assert dd.daemon_id
+ assert dd.daemon_type
+ return CephadmDaemonDeploySpec(
+ host=dd.hostname,
+ daemon_id=dd.daemon_id,
+ daemon_type=dd.daemon_type,
+ service_name=dd.service_name(),
+ ip=dd.ip,
+ ports=dd.ports,
+ rank=dd.rank,
+ rank_generation=dd.rank_generation,
+ extra_container_args=dd.extra_container_args,
+ )
+
+ def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
+ return DaemonDescription(
+ daemon_type=self.daemon_type,
+ daemon_id=self.daemon_id,
+ service_name=self.service_name,
+ hostname=self.host,
+ status=status,
+ status_desc=status_desc,
+ ip=self.ip,
+ ports=self.ports,
+ rank=self.rank,
+ rank_generation=self.rank_generation,
+ extra_container_args=self.extra_container_args,
+ )
+
class CephadmService(metaclass=ABCMeta):
"""
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr: "CephadmOrchestrator" = mgr
- def make_daemon_spec(self, host: str,
- daemon_id: str,
- netowrk: str,
- spec: ServiceSpecs) -> CephadmDaemonSpec:
- return CephadmDaemonSpec(
+ def allow_colo(self) -> bool:
+ """
+ Return True if multiple daemons of the same type can colocate on
+ the same host.
+ """
+ return False
+
+ def primary_daemon_type(self) -> str:
+ """
+ This is the type of the primary (usually only) daemon to be deployed.
+ """
+ return self.TYPE
+
+ def per_host_daemon_type(self) -> Optional[str]:
+ """
+ If defined, this type of daemon will be deployed once for each host
+ containing one or more daemons of the primary type.
+ """
+ return None
+
+ def ranked(self) -> bool:
+ """
+ If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+ generation (0, 1, ...) to each daemon we create/deploy.
+ """
+ return False
+
+ def fence_old_ranks(self,
+ spec: ServiceSpec,
+ rank_map: Dict[int, Dict[int, Optional[str]]],
+ num_ranks: int) -> None:
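+        # ranked services (see ranked() above) are expected to override this
+        # so that daemons from older rank generations are fenced off; the
+        # bare assert below keeps the base implementation unreachable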
+ assert False
+
+ def make_daemon_spec(
+ self,
+ host: str,
+ daemon_id: str,
+ network: str,
+ spec: ServiceSpecs,
+ daemon_type: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ ip: Optional[str] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ ) -> CephadmDaemonDeploySpec:
+ try:
+ eca = spec.extra_container_args
+ except AttributeError:
+ eca = None
+ return CephadmDaemonDeploySpec(
host=host,
daemon_id=daemon_id,
- spec=spec,
- network=netowrk
+ service_name=spec.service_name(),
+ network=network,
+ daemon_type=daemon_type,
+ ports=ports,
+ ip=ip,
+ rank=rank,
+ rank_generation=rank_generation,
+ extra_container_args=eca,
)
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
raise NotImplementedError()
- def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
raise NotImplementedError()
+ def config(self, spec: ServiceSpec) -> None:
+ """
+ Configure the cluster for this service. Only called *once* per
+ service apply. Not for every daemon.
+ """
+ pass
+
def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
"""The post actions needed to be done after daemons are checked"""
if self.mgr.config_dashboard:
# defined, return empty Daemon Desc
return DaemonDescription()
- def _inventory_get_addr(self, hostname: str) -> str:
- """Get a host's address with its hostname."""
- return self.mgr.inventory.get_addr(hostname)
+ def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
+ ret, keyring, err = self.mgr.mon_command({
+ 'prefix': 'auth get-or-create',
+ 'entity': entity,
+ 'caps': caps,
+ })
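+        # 'auth get-or-create' fails if the entity already exists with
+        # different caps, so fall back to updating the caps in place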
+ if err:
+ ret, out, err = self.mgr.mon_command({
+ 'prefix': 'auth caps',
+ 'entity': entity,
+ 'caps': caps,
+ })
+ if err:
+ self.mgr.log.warning(f"Unable to update caps for {entity}")
+ return keyring
+
+ def _inventory_get_fqdn(self, hostname: str) -> str:
+        """Get a host's FQDN from its hostname.
+
+ If the FQDN can't be resolved, the address from the inventory will
+ be returned instead.
+ """
+ addr = self.mgr.inventory.get_addr(hostname)
+ return socket.getfqdn(addr)
def _set_service_url_on_dashboard(self,
service_name: str,
cmd_dicts = get_set_cmd_dicts(out.strip())
for cmd_dict in list(cmd_dicts):
try:
- logger.info('Setting Dashboard config for %s: command: %s', service_name, cmd_dict)
- _, out, _ = self.mgr.check_mon_command(cmd_dict)
+ inbuf = cmd_dict.pop('inbuf', None)
+ _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
except MonCommandFailed as e:
logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
- def ok_to_stop(self, daemon_ids: List[str]) -> HandleCommandResult:
+ def ok_to_stop_osd(
+ self,
+ osds: List[str],
+ known: Optional[List[str]] = None, # output argument
+ force: bool = False) -> HandleCommandResult:
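+        # ask the mons whether these osds can be stopped; 'max' lets the mon
+        # expand the set (up to 16 osds total) with any additional osds that
+        # can safely stop at the same time, reported back via 'known'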
+ r = HandleCommandResult(*self.mgr.mon_command({
+ 'prefix': "osd ok-to-stop",
+ 'ids': osds,
+ 'max': 16,
+ }))
+ j = None
+ try:
+ j = json.loads(r.stdout)
+ except json.decoder.JSONDecodeError:
+ self.mgr.log.warning("osd ok-to-stop didn't return structured result")
+ raise
+ if r.retval:
+ return r
+ if known is not None and j and j.get('ok_to_stop'):
+ self.mgr.log.debug(f"got {j}")
+ known.extend([f'osd.{x}' for x in j.get('osds', [])])
+ return HandleCommandResult(
+ 0,
+ f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
+ ''
+ )
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
- out = f'It is presumed safe to stop {names}'
- err = f'It is NOT safe to stop {names}'
+ out = f'It appears safe to stop {",".join(names)}'
+ err = f'It is NOT safe to stop {",".join(names)} at this time'
if self.TYPE not in ['mon', 'osd', 'mds']:
- logger.info(out)
- return HandleCommandResult(0, out, None)
+ logger.debug(out)
+ return HandleCommandResult(0, out)
+
+ if self.TYPE == 'osd':
+ return self.ok_to_stop_osd(daemon_ids, known, force)
r = HandleCommandResult(*self.mgr.mon_command({
'prefix': f'{self.TYPE} ok-to-stop',
if r.retval:
err = f'{err}: {r.stderr}' if r.stderr else err
- logger.error(err)
+ logger.debug(err)
return HandleCommandResult(r.retval, r.stdout, err)
out = f'{out}: {r.stdout}' if r.stdout else out
- logger.info(out)
+ logger.debug(out)
return HandleCommandResult(r.retval, out, r.stderr)
+ def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
+        # Provides a warning about whether it is possible to stop <n> daemons in a service
+ names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
+ number_of_running_daemons = len(
+ [daemon
+ for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
+ if daemon.status == DaemonDescriptionStatus.running])
+ if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
+ return False, f'It is presumed safe to stop {names}'
+
+ num_daemons_left = number_of_running_daemons - len(daemon_ids)
+
+ def plural(count: int) -> str:
+ return 'daemon' if count == 1 else 'daemons'
+
+ left_count = "no" if num_daemons_left == 0 else num_daemons_left
+
+ if alert:
+ out = (f'ALERT: Cannot stop {names} in {service} service. '
+ f'Not enough remaining {service} daemons. '
+ f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
+ else:
+ out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
+ f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
+ f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
+ return True, out
+
def pre_remove(self, daemon: DaemonDescription) -> None:
"""
Called before the daemon is removed.
"""
- assert self.TYPE == daemon.daemon_type
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
- def post_remove(self, daemon: DaemonDescription) -> None:
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
"""
Called after the daemon is removed.
"""
- assert self.TYPE == daemon.daemon_type
+ assert daemon.daemon_type is not None
+ assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
+ def purge(self, service_name: str) -> None:
+ """Called to carry out any purge tasks following service removal"""
+ logger.debug(f'Purge called for {self.TYPE} - no action taken')
+
class CephService(CephadmService):
- def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
# Ceph.daemons (mon, mgr, mds, osd, etc)
cephadm_config = self.get_config_and_keyring(
daemon_spec.daemon_type,
return cephadm_config, []
- def post_remove(self, daemon: DaemonDescription) -> None:
- super().post_remove(daemon)
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
self.remove_keyring(daemon)
def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
"""
Map the daemon id to a cephx keyring entity name
"""
- if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
+        # although this maps daemon ids to entity names, self.TYPE within
+        # the CephService class refers to service types, not daemon types
+ if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
- elif self.TYPE == 'crash':
+ elif self.TYPE in ['crash', 'agent']:
if host == "":
- raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ raise OrchestratorError(
+ f'Host not provided to generate <{self.TYPE}> auth entity name')
return AuthEntity(f'client.{self.TYPE}.{host}')
elif self.TYPE == 'mon':
return AuthEntity('mon.')
}
def remove_keyring(self, daemon: DaemonDescription) -> None:
+ assert daemon.daemon_id is not None
+ assert daemon.hostname is not None
daemon_id: str = daemon.daemon_id
host: str = daemon.hostname
- if daemon_id == 'mon':
- # do not remove the mon keyring
- return
+ assert daemon.daemon_type != 'mon'
entity = self.get_auth_entity(daemon_id, host=host)
- logger.info(f'Remove keyring: {entity}')
- ret, out, err = self.mgr.check_mon_command({
+ logger.info(f'Removing key for {entity}')
+ ret, out, err = self.mgr.mon_command({
'prefix': 'auth rm',
'entity': entity,
})
class MonService(CephService):
TYPE = 'mon'
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
"""
Create a new monitor on the given host.
"""
assert self.TYPE == daemon_spec.daemon_type
- name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+ name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
# get mon. key
ret, keyring, err = self.mgr.check_mon_command({
daemon_spec.ceph_conf = extra_config
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
return daemon_spec
def _check_safe_to_destroy(self, mon_id: str) -> None:
})
try:
j = json.loads(out)
- except Exception as e:
+ except Exception:
raise OrchestratorError('failed to parse quorum status')
mons = [m['name'] for m in j['monmap']['mons']]
def pre_remove(self, daemon: DaemonDescription) -> None:
super().pre_remove(daemon)
+ assert daemon.daemon_id is not None
daemon_id: str = daemon.daemon_id
self._check_safe_to_destroy(daemon_id)
'name': daemon_id,
})
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ # Do not remove the mon keyring.
+ # super().post_remove(daemon)
+ pass
+
class MgrService(CephService):
TYPE = 'mgr'
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def allow_colo(self) -> bool:
+ if self.mgr.get_ceph_option('mgr_standby_modules'):
+ # traditional mgr mode: standby daemons' modules listen on
+ # ports and redirect to the primary. we must not schedule
+ # multiple mgrs on the same host or else ports will
+ # conflict.
+ return False
+ else:
+ # standby daemons do nothing, and therefore port conflicts
+ # are not a concern.
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
"""
Create a new manager instance on a host.
"""
assert self.TYPE == daemon_spec.daemon_type
- mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
+ mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
# get mgr. key
- ret, keyring, err = self.mgr.check_mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': self.get_auth_entity(mgr_id),
- 'caps': ['mon', 'profile mgr',
- 'osd', 'allow *',
- 'mds', 'allow *'],
- })
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
+ ['mon', 'profile mgr',
+ 'osd', 'allow *',
+ 'mds', 'allow *'])
# Retrieve ports used by manager modules
# In the case of the dashboard port and with several manager daemons
# If this is the case then the dashboard port opened will be only the used
# as default.
ports = []
- config_ports = ''
ret, mgr_services, err = self.mgr.check_mon_command({
'prefix': 'mgr services',
})
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
return daemon_spec
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
for daemon in daemon_descrs:
+ assert daemon.daemon_type is not None
+ assert daemon.daemon_id is not None
if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
return daemon
# if no active mgr found, return empty Daemon Desc
return DaemonDescription()
def fail_over(self) -> None:
- if not self.mgr_map_has_standby():
- raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
- 'daemon', 'mgr' + self.mgr.get_mgr_id()))
-
- self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
- 'INFO', 'Failing over to other MGR')
- logger.info('Failing over to other MGR')
-
- # fail over
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'mgr fail',
- 'who': self.mgr.get_mgr_id(),
- })
+ # this has been seen to sometimes transiently fail even when there are multiple
+ # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
+ class NoStandbyError(OrchestratorError):
+ pass
+ no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+ 'daemon', 'mgr' + self.mgr.get_mgr_id()))
+ for sleep_secs in [2, 8, 15]:
+ try:
+ if not self.mgr_map_has_standby():
+ raise no_standby_exc
+ self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+ 'INFO', 'Failing over to other MGR')
+ logger.info('Failing over to other MGR')
+
+ # fail over
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'mgr fail',
+ 'who': self.mgr.get_mgr_id(),
+ })
+ return
+ except NoStandbyError:
+ logger.info(
+ f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+ time.sleep(sleep_secs)
+ raise no_standby_exc
def mgr_map_has_standby(self) -> bool:
"""
num = len(mgr_map.get('standbys'))
return bool(num)
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
+
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+ active = self.get_active_daemon(mgr_daemons).daemon_id
+ if active in daemon_ids:
+            warn_message = 'ALERT: Cannot stop the active Mgr daemon. Please switch the active Mgr with \'ceph mgr fail %s\'' % active
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ return HandleCommandResult(0, warn_message, '')
+
class MdsService(CephService):
TYPE = 'mds'
+ def allow_colo(self) -> bool:
+ return True
+
def config(self, spec: ServiceSpec) -> None:
assert self.TYPE == spec.service_type
assert spec.service_id
'value': spec.service_id,
})
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
- mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+ mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
- # get mgr. key
- ret, keyring, err = self.mgr.check_mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': self.get_auth_entity(mds_id),
- 'caps': ['mon', 'profile mds',
- 'osd', 'allow rw tag cephfs *=*',
- 'mds', 'allow'],
- })
+ # get mds. key
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
+ ['mon', 'profile mds',
+ 'osd', 'allow rw tag cephfs *=*',
+ 'mds', 'allow'])
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
return daemon_spec
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
# if no mds found, return empty Daemon Desc
return DaemonDescription()
+ def purge(self, service_name: str) -> None:
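+        # drop the per-service mds_join_fs setting once the service is removed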
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': service_name,
+ 'name': 'mds_join_fs',
+ })
+
class RgwService(CephService):
TYPE = 'rgw'
- def config(self, spec: RGWSpec, rgw_id: str) -> None:
- assert self.TYPE == spec.service_type
+ def allow_colo(self) -> bool:
+ return True
- # create realm, zonegroup, and zone if needed
- self.create_realm_zonegroup_zone(spec, rgw_id)
+ def config(self, spec: RGWSpec) -> None: # type: ignore
+ assert self.TYPE == spec.service_type
- # ensure rgw_realm and rgw_zone is set for these daemons
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
- 'name': 'rgw_zone',
- 'value': spec.rgw_zone,
- })
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
- 'name': 'rgw_realm',
- 'value': spec.rgw_realm,
- })
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config set',
- 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
- 'name': 'rgw_frontends',
- 'value': spec.rgw_frontends_config_value(),
- })
+ # set rgw_realm and rgw_zone, if present
+ if spec.rgw_realm:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_realm',
+ 'value': spec.rgw_realm,
+ })
+ if spec.rgw_zone:
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+ 'name': 'rgw_zone',
+ 'value': spec.rgw_zone,
+ })
if spec.rgw_frontend_ssl_certificate:
if isinstance(spec.rgw_frontend_ssl_certificate, list):
% spec.rgw_frontend_ssl_certificate)
ret, out, err = self.mgr.check_mon_command({
'prefix': 'config-key set',
- 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
+ 'key': f'rgw/cert/{spec.service_name()}',
'val': cert_data,
})
- if spec.rgw_frontend_ssl_key:
- if isinstance(spec.rgw_frontend_ssl_key, list):
- key_data = '\n'.join(spec.rgw_frontend_ssl_key)
- elif isinstance(spec.rgw_frontend_ssl_certificate, str):
- key_data = spec.rgw_frontend_ssl_key
- else:
- raise OrchestratorError(
- 'Invalid rgw_frontend_ssl_key: %s'
- % spec.rgw_frontend_ssl_key)
- ret, out, err = self.mgr.check_mon_command({
- 'prefix': 'config-key set',
- 'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
- 'val': key_data,
- })
-
+ # TODO: fail, if we don't have a spec
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
+ self.mgr.trigger_connect_dashboard_rgw()
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
- rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
+ rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
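+        # the stored service spec drives the port, ssl, and frontend settings below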
+ spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
keyring = self.get_keyring(rgw_id)
+ if daemon_spec.ports:
+ port = daemon_spec.ports[0]
+ else:
+            # this is a redeploy of an older instance that doesn't have an explicitly
+ # assigned port, in which case we can assume there is only 1 per host
+ # and it matches the spec.
+ port = spec.get_port()
+
+ # configure frontend
+ args = []
+ ftype = spec.rgw_frontend_type or "beast"
+ if ftype == 'beast':
+ if spec.ssl:
+ if daemon_spec.ip:
+ args.append(
+ f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"ssl_port={port}")
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ elif ftype == 'civetweb':
+ if spec.ssl:
+ if daemon_spec.ip:
+ # note the 's' suffix on port
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
+ else:
+ args.append(f"port={port}s") # note the 's' suffix on port
+ args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+ else:
+ if daemon_spec.ip:
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+ else:
+ args.append(f"port={port}")
+ frontend = f'{ftype} {" ".join(args)}'
+
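+        # store the computed frontend line as per-daemon config; post_remove
+        # cleans it up again when the daemon is removed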
+ ret, out, err = self.mgr.check_mon_command({
+ 'prefix': 'config set',
+ 'who': utils.name_to_config_section(daemon_spec.name()),
+ 'name': 'rgw_frontends',
+ 'value': frontend
+ })
+
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
def get_keyring(self, rgw_id: str) -> str:
- ret, keyring, err = self.mgr.check_mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': self.get_auth_entity(rgw_id),
- 'caps': ['mon', 'allow *',
- 'mgr', 'allow rw',
- 'osd', 'allow rwx'],
- })
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
+ ['mon', 'allow *',
+ 'mgr', 'allow rw',
+ 'osd', 'allow rwx tag rgw *=*'])
return keyring
- def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str) -> None:
- if utils.get_cluster_health(self.mgr) != 'HEALTH_OK':
- raise OrchestratorError('Health not ok, will try agin when health ok')
-
- # get keyring needed to run rados commands and strip out just the keyring
- keyring = self.get_keyring(rgw_id).split('key = ', 1)[1].rstrip()
-
- # We can call radosgw-admin within the container, cause cephadm gives the MGR the required keyring permissions
-
- def get_realms() -> List[str]:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'realm', 'list',
- '--format=json']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = result.stdout
- if not out:
- return []
- try:
- j = json.loads(out)
- return j.get('realms', [])
- except Exception as e:
- raise OrchestratorError('failed to parse realm info')
-
- def create_realm() -> None:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'realm', 'create',
- '--rgw-realm=%s' % spec.rgw_realm,
- '--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.mgr.log.info('created realm: %s' % spec.rgw_realm)
-
- def get_zonegroups() -> List[str]:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'zonegroup', 'list',
- '--format=json']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = result.stdout
- if not out:
- return []
- try:
- j = json.loads(out)
- return j.get('zonegroups', [])
- except Exception as e:
- raise OrchestratorError('failed to parse zonegroup info')
-
- def create_zonegroup() -> None:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'zonegroup', 'create',
- '--rgw-zonegroup=default',
- '--master', '--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.mgr.log.info('created zonegroup: default')
-
- def create_zonegroup_if_required() -> None:
- zonegroups = get_zonegroups()
- if 'default' not in zonegroups:
- create_zonegroup()
-
- def get_zones() -> List[str]:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'zone', 'list',
- '--format=json']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- out = result.stdout
- if not out:
- return []
- try:
- j = json.loads(out)
- return j.get('zones', [])
- except Exception as e:
- raise OrchestratorError('failed to parse zone info')
-
- def create_zone() -> None:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'zone', 'create',
- '--rgw-zonegroup=default',
- '--rgw-zone=%s' % spec.rgw_zone,
- '--master', '--default']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.mgr.log.info('created zone: %s' % spec.rgw_zone)
-
- changes = False
- realms = get_realms()
- if spec.rgw_realm not in realms:
- create_realm()
- changes = True
-
- zones = get_zones()
- if spec.rgw_zone not in zones:
- create_zonegroup_if_required()
- create_zone()
- changes = True
-
- # update period if changes were made
- if changes:
- cmd = ['radosgw-admin',
- '--key=%s' % keyring,
- '--user', 'rgw.%s' % rgw_id,
- 'period', 'update',
- '--rgw-realm=%s' % spec.rgw_realm,
- '--commit']
- result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- self.mgr.log.info('updated period')
+ def purge(self, service_name: str) -> None:
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_realm',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(service_name),
+ 'name': 'rgw_zone',
+ })
+ self.mgr.check_mon_command({
+ 'prefix': 'config-key rm',
+ 'key': f'rgw/cert/{service_name}',
+ })
+ self.mgr.trigger_connect_dashboard_rgw()
+
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+ self.mgr.check_mon_command({
+ 'prefix': 'config rm',
+ 'who': utils.name_to_config_section(daemon.name()),
+ 'name': 'rgw_frontends',
+ })
+
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+ # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
+ # if no load balancer, warn if > 1 daemon, block if only 1 daemon
+ def ingress_present() -> bool:
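+            # a status of 1 corresponds to DaemonDescriptionStatus.running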
+ running_ingress_daemons = [
+ daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
+ running_haproxy_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
+ running_keepalived_daemons = [
+ daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
+ # check that there is at least one haproxy and keepalived daemon running
+ if running_haproxy_daemons and running_keepalived_daemons:
+ return True
+ return False
+
+        # if only 1 rgw, alert user (this cannot be bypassed with --force)
+ warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ # if reached here, there is > 1 rgw daemon.
+ # Say okay if load balancer present or force flag set
+ if ingress_present() or force:
+ return HandleCommandResult(0, warn_message, '')
+
+ # if reached here, > 1 RGW daemon, no load balancer and no force flag.
+ # Provide warning
+ warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+ def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+ self.mgr.trigger_connect_dashboard_rgw()
class RbdMirrorService(CephService):
TYPE = 'rbd-mirror'
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def allow_colo(self) -> bool:
+ return True
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
- daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
- ret, keyring, err = self.mgr.check_mon_command({
- 'prefix': 'auth get-or-create',
- 'entity': self.get_auth_entity(daemon_id),
- 'caps': ['mon', 'profile rbd-mirror',
- 'osd', 'profile rbd'],
- })
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
+ ['mon', 'profile rbd-mirror',
+ 'osd', 'profile rbd'])
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
return daemon_spec
+ def ok_to_stop(
+ self,
+ daemon_ids: List[str],
+ force: bool = False,
+ known: Optional[List[str]] = None # output argument
+ ) -> HandleCommandResult:
+        # if only 1 rbd-mirror, alert user (this cannot be bypassed with --force)
+ warn, warn_message = self._enough_daemons_to_stop(
+ self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
+ if warn:
+ return HandleCommandResult(-errno.EBUSY, '', warn_message)
+ return HandleCommandResult(0, warn_message, '')
+
class CrashService(CephService):
TYPE = 'crash'
- def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
+ ['mon', 'profile crash',
+ 'mgr', 'profile crash'])
+
+ daemon_spec.keyring = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+ return daemon_spec
+
+
+class CephfsMirrorService(CephService):
+ TYPE = 'cephfs-mirror'
+
+ def config(self, spec: ServiceSpec) -> None:
+ # make sure mirroring module is enabled
+ mgr_map = self.mgr.get('mgr_map')
+ mod_name = 'mirroring'
+ if mod_name not in mgr_map.get('services', {}):
+ self.mgr.check_mon_command({
+ 'prefix': 'mgr module enable',
+ 'module': mod_name
+ })
+ # we shouldn't get here (mon will tell the mgr to respawn), but no
+ # harm done if we do.
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
- 'entity': self.get_auth_entity(daemon_id, host=host),
- 'caps': ['mon', 'profile crash',
- 'mgr', 'profile crash'],
+ 'entity': self.get_auth_entity(daemon_spec.daemon_id),
+ 'caps': ['mon', 'profile cephfs-mirror',
+ 'mds', 'allow r',
+ 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
+ 'mgr', 'allow r'],
})
daemon_spec.keyring = keyring
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+ return daemon_spec
+
+
+class CephadmAgent(CephService):
+ TYPE = 'agent'
+
+ def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+ assert self.TYPE == daemon_spec.daemon_type
+ daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+ if not self.mgr.cherrypy_thread:
+ raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+ keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+ daemon_spec.keyring = keyring
+ self.mgr.agent_cache.agent_keys[host] = keyring
+
+ daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
+
+ def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+ try:
+ assert self.mgr.cherrypy_thread
+ assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+ assert self.mgr.cherrypy_thread.server_port
+ except Exception:
+ raise OrchestratorError(
+ 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
+
+ cfg = {'target_ip': self.mgr.get_mgr_ip(),
+ 'target_port': self.mgr.cherrypy_thread.server_port,
+ 'refresh_period': self.mgr.agent_refresh_rate,
+ 'listener_port': self.mgr.agent_starting_port,
+ 'host': daemon_spec.host,
+ 'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
+
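+        # generate a cert/key pair for the agent's listener, tied to this
+        # host's inventory address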
+ listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
+ self.mgr.inventory.get_addr(daemon_spec.host))
+ config = {
+ 'agent.json': json.dumps(cfg),
+ 'keyring': daemon_spec.keyring,
+ 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ 'listener.crt': listener_cert,
+ 'listener.key': listener_key,
+ }
+
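+        # the returned deps are compared on subsequent passes; a change in the
+        # mgr ip, endpoint port, root cert, or device_enhanced_scan setting
+        # causes the agent to be reconfigured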
+ return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
+ self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+ str(self.mgr.get_module_option('device_enhanced_scan'))])