]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / cephadmservice.py
index c779ff34f180a82e327b80c66e718bf5a7037e58..8abb0e63a2c102181d7937c3cf8f99c357a4fbb9 100644 (file)
@@ -1,16 +1,20 @@
+import errno
 import json
-import re
 import logging
-import subprocess
+import re
+import socket
+import time
 from abc import ABCMeta, abstractmethod
-from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, \
-    Optional, Dict, Any, Tuple, NewType
+from typing import TYPE_CHECKING, List, Callable, TypeVar, \
+    Optional, Dict, Any, Tuple, NewType, cast
 
 from mgr_module import HandleCommandResult, MonCommandFailed
 
 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
-from orchestrator import OrchestratorError, DaemonDescription
+from mgr_util import build_url
+from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
+from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
 
 if TYPE_CHECKING:
@@ -22,33 +26,31 @@ ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
 AuthEntity = NewType('AuthEntity', str)
 
 
-class CephadmDaemonSpec(Generic[ServiceSpecs]):
+class CephadmDaemonDeploySpec:
     # typing.NamedTuple + Generic is broken in py36
     def __init__(self, host: str, daemon_id: str,
-                 spec: Optional[ServiceSpecs] = None,
+                 service_name: str,
                  network: Optional[str] = None,
                  keyring: Optional[str] = None,
                  extra_args: Optional[List[str]] = None,
                  ceph_conf: str = '',
                  extra_files: Optional[Dict[str, Any]] = None,
                  daemon_type: Optional[str] = None,
-                 ports: Optional[List[int]] = None,):
+                 ip: Optional[str] = None,
+                 ports: Optional[List[int]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None):
         """
-        Used for
-        * deploying new daemons. then everything is set
-        * redeploying existing daemons, then only the first three attrs are set.
-
-        Would be great to have a consistent usage where all properties are set.
+        A data struction to encapsulate `cephadm deploy ...
         """
         self.host: str = host
         self.daemon_id = daemon_id
-        daemon_type = daemon_type or (spec.service_type if spec else None)
+        self.service_name = service_name
+        daemon_type = daemon_type or (service_name.split('.')[0])
         assert daemon_type is not None
         self.daemon_type: str = daemon_type
 
-        # would be great to have the spec always available:
-        self.spec: Optional[ServiceSpecs] = spec
-
         # mons
         self.network = network
 
@@ -62,7 +64,18 @@ class CephadmDaemonSpec(Generic[ServiceSpecs]):
         self.extra_files = extra_files or {}
 
         # TCP ports used by the daemon
-        self.ports:  List[int] = ports or []
+        self.ports: List[int] = ports or []
+        self.ip: Optional[str] = ip
+
+        # values to be populated during generate_config calls
+        # and then used in _run_cephadm
+        self.final_config: Dict[str, Any] = {}
+        self.deps: List[str] = []
+
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
+
+        self.extra_container_args = extra_container_args
 
     def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
@@ -74,6 +87,38 @@ class CephadmDaemonSpec(Generic[ServiceSpecs]):
 
         return files
 
+    @staticmethod
+    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
+        assert dd.hostname
+        assert dd.daemon_id
+        assert dd.daemon_type
+        return CephadmDaemonDeploySpec(
+            host=dd.hostname,
+            daemon_id=dd.daemon_id,
+            daemon_type=dd.daemon_type,
+            service_name=dd.service_name(),
+            ip=dd.ip,
+            ports=dd.ports,
+            rank=dd.rank,
+            rank_generation=dd.rank_generation,
+            extra_container_args=dd.extra_container_args,
+        )
+
+    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
+        return DaemonDescription(
+            daemon_type=self.daemon_type,
+            daemon_id=self.daemon_id,
+            service_name=self.service_name,
+            hostname=self.host,
+            status=status,
+            status_desc=status_desc,
+            ip=self.ip,
+            ports=self.ports,
+            rank=self.rank,
+            rank_generation=self.rank_generation,
+            extra_container_args=self.extra_container_args,
+        )
+
 
 class CephadmService(metaclass=ABCMeta):
     """
@@ -88,23 +133,81 @@ class CephadmService(metaclass=ABCMeta):
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
 
-    def make_daemon_spec(self, host: str,
-                         daemon_id: str,
-                         netowrk: str,
-                         spec: ServiceSpecs) -> CephadmDaemonSpec:
-        return CephadmDaemonSpec(
+    def allow_colo(self) -> bool:
+        """
+        Return True if multiple daemons of the same type can colocate on
+        the same host.
+        """
+        return False
+
+    def primary_daemon_type(self) -> str:
+        """
+        This is the type of the primary (usually only) daemon to be deployed.
+        """
+        return self.TYPE
+
+    def per_host_daemon_type(self) -> Optional[str]:
+        """
+        If defined, this type of daemon will be deployed once for each host
+        containing one or more daemons of the primary type.
+        """
+        return None
+
+    def ranked(self) -> bool:
+        """
+        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+        generation (0, 1, ...) to each daemon we create/deploy.
+        """
+        return False
+
+    def fence_old_ranks(self,
+                        spec: ServiceSpec,
+                        rank_map: Dict[int, Dict[int, Optional[str]]],
+                        num_ranks: int) -> None:
+        assert False
+
+    def make_daemon_spec(
+            self,
+            host: str,
+            daemon_id: str,
+            network: str,
+            spec: ServiceSpecs,
+            daemon_type: Optional[str] = None,
+            ports: Optional[List[int]] = None,
+            ip: Optional[str] = None,
+            rank: Optional[int] = None,
+            rank_generation: Optional[int] = None,
+    ) -> CephadmDaemonDeploySpec:
+        try:
+            eca = spec.extra_container_args
+        except AttributeError:
+            eca = None
+        return CephadmDaemonDeploySpec(
             host=host,
             daemon_id=daemon_id,
-            spec=spec,
-            network=netowrk
+            service_name=spec.service_name(),
+            network=network,
+            daemon_type=daemon_type,
+            ports=ports,
+            ip=ip,
+            rank=rank,
+            rank_generation=rank_generation,
+            extra_container_args=eca,
         )
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         raise NotImplementedError()
 
-    def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         raise NotImplementedError()
 
+    def config(self, spec: ServiceSpec) -> None:
+        """
+        Configure the cluster for this service. Only called *once* per
+        service apply. Not for every daemon.
+        """
+        pass
+
     def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
         """The post actions needed to be done after daemons are checked"""
         if self.mgr.config_dashboard:
@@ -122,9 +225,30 @@ class CephadmService(metaclass=ABCMeta):
         # defined, return empty Daemon Desc
         return DaemonDescription()
 
-    def _inventory_get_addr(self, hostname: str) -> str:
-        """Get a host's address with its hostname."""
-        return self.mgr.inventory.get_addr(hostname)
+    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
+        ret, keyring, err = self.mgr.mon_command({
+            'prefix': 'auth get-or-create',
+            'entity': entity,
+            'caps': caps,
+        })
+        if err:
+            ret, out, err = self.mgr.mon_command({
+                'prefix': 'auth caps',
+                'entity': entity,
+                'caps': caps,
+            })
+            if err:
+                self.mgr.log.warning(f"Unable to update caps for {entity}")
+        return keyring
+
+    def _inventory_get_fqdn(self, hostname: str) -> str:
+        """Get a host's FQDN with its hostname.
+
+           If the FQDN can't be resolved, the address from the inventory will
+           be returned instead.
+        """
+        addr = self.mgr.inventory.get_addr(hostname)
+        return socket.getfqdn(addr)
 
     def _set_service_url_on_dashboard(self,
                                       service_name: str,
@@ -197,19 +321,54 @@ class CephadmService(metaclass=ABCMeta):
         cmd_dicts = get_set_cmd_dicts(out.strip())
         for cmd_dict in list(cmd_dicts):
             try:
-                logger.info('Setting Dashboard config for %s: command: %s', service_name, cmd_dict)
-                _, out, _ = self.mgr.check_mon_command(cmd_dict)
+                inbuf = cmd_dict.pop('inbuf', None)
+                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
             except MonCommandFailed as e:
                 logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
 
-    def ok_to_stop(self, daemon_ids: List[str]) -> HandleCommandResult:
+    def ok_to_stop_osd(
+            self,
+            osds: List[str],
+            known: Optional[List[str]] = None,  # output argument
+            force: bool = False) -> HandleCommandResult:
+        r = HandleCommandResult(*self.mgr.mon_command({
+            'prefix': "osd ok-to-stop",
+            'ids': osds,
+            'max': 16,
+        }))
+        j = None
+        try:
+            j = json.loads(r.stdout)
+        except json.decoder.JSONDecodeError:
+            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
+            raise
+        if r.retval:
+            return r
+        if known is not None and j and j.get('ok_to_stop'):
+            self.mgr.log.debug(f"got {j}")
+            known.extend([f'osd.{x}' for x in j.get('osds', [])])
+        return HandleCommandResult(
+            0,
+            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
+            ''
+        )
+
+    def ok_to_stop(
+            self,
+            daemon_ids: List[str],
+            force: bool = False,
+            known: Optional[List[str]] = None    # output argument
+    ) -> HandleCommandResult:
         names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
-        out = f'It is presumed safe to stop {names}'
-        err = f'It is NOT safe to stop {names}'
+        out = f'It appears safe to stop {",".join(names)}'
+        err = f'It is NOT safe to stop {",".join(names)} at this time'
 
         if self.TYPE not in ['mon', 'osd', 'mds']:
-            logger.info(out)
-            return HandleCommandResult(0, out, None)
+            logger.debug(out)
+            return HandleCommandResult(0, out)
+
+        if self.TYPE == 'osd':
+            return self.ok_to_stop_osd(daemon_ids, known, force)
 
         r = HandleCommandResult(*self.mgr.mon_command({
             'prefix': f'{self.TYPE} ok-to-stop',
@@ -218,30 +377,63 @@ class CephadmService(metaclass=ABCMeta):
 
         if r.retval:
             err = f'{err}: {r.stderr}' if r.stderr else err
-            logger.error(err)
+            logger.debug(err)
             return HandleCommandResult(r.retval, r.stdout, err)
 
         out = f'{out}: {r.stdout}' if r.stdout else out
-        logger.info(out)
+        logger.debug(out)
         return HandleCommandResult(r.retval, out, r.stderr)
 
+    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
+        # Provides a warning about if it possible or not to stop <n> daemons in a service
+        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
+        number_of_running_daemons = len(
+            [daemon
+             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
+             if daemon.status == DaemonDescriptionStatus.running])
+        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
+            return False, f'It is presumed safe to stop {names}'
+
+        num_daemons_left = number_of_running_daemons - len(daemon_ids)
+
+        def plural(count: int) -> str:
+            return 'daemon' if count == 1 else 'daemons'
+
+        left_count = "no" if num_daemons_left == 0 else num_daemons_left
+
+        if alert:
+            out = (f'ALERT: Cannot stop {names} in {service} service. '
+                   f'Not enough remaining {service} daemons. '
+                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
+        else:
+            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
+                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
+                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
+        return True, out
+
     def pre_remove(self, daemon: DaemonDescription) -> None:
         """
         Called before the daemon is removed.
         """
-        assert self.TYPE == daemon.daemon_type
+        assert daemon.daemon_type is not None
+        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
         logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
         """
         Called after the daemon is removed.
         """
-        assert self.TYPE == daemon.daemon_type
+        assert daemon.daemon_type is not None
+        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
         logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
 
+    def purge(self, service_name: str) -> None:
+        """Called to carry out any purge tasks following service removal"""
+        logger.debug(f'Purge called for {self.TYPE} - no action taken')
+
 
 class CephService(CephadmService):
-    def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         # Ceph.daemons (mon, mgr, mds, osd, etc)
         cephadm_config = self.get_config_and_keyring(
             daemon_spec.daemon_type,
@@ -255,19 +447,22 @@ class CephService(CephadmService):
 
         return cephadm_config, []
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
-        super().post_remove(daemon)
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
         self.remove_keyring(daemon)
 
     def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
         """
         Map the daemon id to a cephx keyring entity name
         """
-        if self.TYPE in ['rgw', 'rbd-mirror', 'nfs', "iscsi"]:
+        # despite this mapping entity names to daemons, self.TYPE within
+        # the CephService class refers to service types, not daemon types
+        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
             return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
-        elif self.TYPE == 'crash':
+        elif self.TYPE in ['crash', 'agent']:
             if host == "":
-                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+                raise OrchestratorError(
+                    f'Host not provided to generate <{self.TYPE}> auth entity name')
             return AuthEntity(f'client.{self.TYPE}.{host}')
         elif self.TYPE == 'mon':
             return AuthEntity('mon.')
@@ -302,17 +497,17 @@ class CephService(CephadmService):
         }
 
     def remove_keyring(self, daemon: DaemonDescription) -> None:
+        assert daemon.daemon_id is not None
+        assert daemon.hostname is not None
         daemon_id: str = daemon.daemon_id
         host: str = daemon.hostname
 
-        if daemon_id == 'mon':
-            # do not remove the mon keyring
-            return
+        assert daemon.daemon_type != 'mon'
 
         entity = self.get_auth_entity(daemon_id, host=host)
 
-        logger.info(f'Remove keyring: {entity}')
-        ret, out, err = self.mgr.check_mon_command({
+        logger.info(f'Removing key for {entity}')
+        ret, out, err = self.mgr.mon_command({
             'prefix': 'auth rm',
             'entity': entity,
         })
@@ -321,12 +516,12 @@ class CephService(CephadmService):
 class MonService(CephService):
     TYPE = 'mon'
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         """
         Create a new monitor on the given host.
         """
         assert self.TYPE == daemon_spec.daemon_type
-        name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
 
         # get mon. key
         ret, keyring, err = self.mgr.check_mon_command({
@@ -367,6 +562,8 @@ class MonService(CephService):
         daemon_spec.ceph_conf = extra_config
         daemon_spec.keyring = keyring
 
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
         return daemon_spec
 
     def _check_safe_to_destroy(self, mon_id: str) -> None:
@@ -375,7 +572,7 @@ class MonService(CephService):
         })
         try:
             j = json.loads(out)
-        except Exception as e:
+        except Exception:
             raise OrchestratorError('failed to parse quorum status')
 
         mons = [m['name'] for m in j['monmap']['mons']]
@@ -395,6 +592,7 @@ class MonService(CephService):
     def pre_remove(self, daemon: DaemonDescription) -> None:
         super().pre_remove(daemon)
 
+        assert daemon.daemon_id is not None
         daemon_id: str = daemon.daemon_id
         self._check_safe_to_destroy(daemon_id)
 
@@ -405,25 +603,39 @@ class MonService(CephService):
             'name': daemon_id,
         })
 
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        # Do not remove the mon keyring.
+        # super().post_remove(daemon)
+        pass
+
 
 class MgrService(CephService):
     TYPE = 'mgr'
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def allow_colo(self) -> bool:
+        if self.mgr.get_ceph_option('mgr_standby_modules'):
+            # traditional mgr mode: standby daemons' modules listen on
+            # ports and redirect to the primary.  we must not schedule
+            # multiple mgrs on the same host or else ports will
+            # conflict.
+            return False
+        else:
+            # standby daemons do nothing, and therefore port conflicts
+            # are not a concern.
+            return True
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         """
         Create a new manager instance on a host.
         """
         assert self.TYPE == daemon_spec.daemon_type
-        mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
+        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
 
         # get mgr. key
-        ret, keyring, err = self.mgr.check_mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': self.get_auth_entity(mgr_id),
-            'caps': ['mon', 'profile mgr',
-                     'osd', 'allow *',
-                     'mds', 'allow *'],
-        })
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
+                                             ['mon', 'profile mgr',
+                                              'osd', 'allow *',
+                                              'mds', 'allow *'])
 
         # Retrieve ports used by manager modules
         # In the case of the dashboard port and with several manager daemons
@@ -432,7 +644,6 @@ class MgrService(CephService):
         # If this is the case then the dashboard port opened will be only the used
         # as default.
         ports = []
-        config_ports = ''
         ret, mgr_services, err = self.mgr.check_mon_command({
             'prefix': 'mgr services',
         })
@@ -448,29 +659,45 @@ class MgrService(CephService):
 
         daemon_spec.keyring = keyring
 
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
         return daemon_spec
 
     def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
         for daemon in daemon_descrs:
+            assert daemon.daemon_type is not None
+            assert daemon.daemon_id is not None
             if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                 return daemon
         # if no active mgr found, return empty Daemon Desc
         return DaemonDescription()
 
     def fail_over(self) -> None:
-        if not self.mgr_map_has_standby():
-            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
-                'daemon', 'mgr' + self.mgr.get_mgr_id()))
-
-        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
-                                   'INFO', 'Failing over to other MGR')
-        logger.info('Failing over to other MGR')
-
-        # fail over
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'mgr fail',
-            'who': self.mgr.get_mgr_id(),
-        })
+        # this has been seen to sometimes transiently fail even when there are multiple
+        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
+        class NoStandbyError(OrchestratorError):
+            pass
+        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+            'daemon', 'mgr' + self.mgr.get_mgr_id()))
+        for sleep_secs in [2, 8, 15]:
+            try:
+                if not self.mgr_map_has_standby():
+                    raise no_standby_exc
+                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+                                           'INFO', 'Failing over to other MGR')
+                logger.info('Failing over to other MGR')
+
+                # fail over
+                ret, out, err = self.mgr.check_mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.mgr.get_mgr_id(),
+                })
+                return
+            except NoStandbyError:
+                logger.info(
+                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+                time.sleep(sleep_secs)
+        raise no_standby_exc
 
     def mgr_map_has_standby(self) -> bool:
         """
@@ -481,10 +708,33 @@ class MgrService(CephService):
         num = len(mgr_map.get('standbys'))
         return bool(num)
 
+    def ok_to_stop(
+            self,
+            daemon_ids: List[str],
+            force: bool = False,
+            known: Optional[List[str]] = None  # output argument
+    ) -> HandleCommandResult:
+        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
+
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
+        if warn:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
+        active = self.get_active_daemon(mgr_daemons).daemon_id
+        if active in daemon_ids:
+            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        return HandleCommandResult(0, warn_message, '')
+
 
 class MdsService(CephService):
     TYPE = 'mds'
 
+    def allow_colo(self) -> bool:
+        return True
+
     def config(self, spec: ServiceSpec) -> None:
         assert self.TYPE == spec.service_type
         assert spec.service_id
@@ -497,20 +747,19 @@ class MdsService(CephService):
             'value': spec.service_id,
         })
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
-        mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
 
-        # get mgr. key
-        ret, keyring, err = self.mgr.check_mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': self.get_auth_entity(mds_id),
-            'caps': ['mon', 'profile mds',
-                     'osd', 'allow rw tag cephfs *=*',
-                     'mds', 'allow'],
-        })
+        # get mds. key
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
+                                             ['mon', 'profile mds',
+                                              'osd', 'allow rw tag cephfs *=*',
+                                              'mds', 'allow'])
         daemon_spec.keyring = keyring
 
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
         return daemon_spec
 
     def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
@@ -528,35 +777,38 @@ class MdsService(CephService):
         # if no mds found, return empty Daemon Desc
         return DaemonDescription()
 
+    def purge(self, service_name: str) -> None:
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': service_name,
+            'name': 'mds_join_fs',
+        })
+
 
 class RgwService(CephService):
     TYPE = 'rgw'
 
-    def config(self, spec: RGWSpec, rgw_id: str) -> None:
-        assert self.TYPE == spec.service_type
+    def allow_colo(self) -> bool:
+        return True
 
-        # create realm, zonegroup, and zone if needed
-        self.create_realm_zonegroup_zone(spec, rgw_id)
+    def config(self, spec: RGWSpec) -> None:  # type: ignore
+        assert self.TYPE == spec.service_type
 
-        # ensure rgw_realm and rgw_zone is set for these daemons
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
-            'name': 'rgw_zone',
-            'value': spec.rgw_zone,
-        })
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
-            'name': 'rgw_realm',
-            'value': spec.rgw_realm,
-        })
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
-            'name': 'rgw_frontends',
-            'value': spec.rgw_frontends_config_value(),
-        })
+        # set rgw_realm and rgw_zone, if present
+        if spec.rgw_realm:
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+                'name': 'rgw_realm',
+                'value': spec.rgw_realm,
+            })
+        if spec.rgw_zone:
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+                'name': 'rgw_zone',
+                'value': spec.rgw_zone,
+            })
 
         if spec.rgw_frontend_ssl_certificate:
             if isinstance(spec.rgw_frontend_ssl_certificate, list):
@@ -569,199 +821,273 @@ class RgwService(CephService):
                     % spec.rgw_frontend_ssl_certificate)
             ret, out, err = self.mgr.check_mon_command({
                 'prefix': 'config-key set',
-                'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
+                'key': f'rgw/cert/{spec.service_name()}',
                 'val': cert_data,
             })
 
-        if spec.rgw_frontend_ssl_key:
-            if isinstance(spec.rgw_frontend_ssl_key, list):
-                key_data = '\n'.join(spec.rgw_frontend_ssl_key)
-            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
-                key_data = spec.rgw_frontend_ssl_key
-            else:
-                raise OrchestratorError(
-                    'Invalid rgw_frontend_ssl_key: %s'
-                    % spec.rgw_frontend_ssl_key)
-            ret, out, err = self.mgr.check_mon_command({
-                'prefix': 'config-key set',
-                'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
-                'val': key_data,
-            })
-
+        # TODO: fail, if we don't have a spec
         logger.info('Saving service %s spec with placement %s' % (
             spec.service_name(), spec.placement.pretty_str()))
         self.mgr.spec_store.save(spec)
+        self.mgr.trigger_connect_dashboard_rgw()
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
-        rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
+        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
+        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
 
         keyring = self.get_keyring(rgw_id)
 
+        if daemon_spec.ports:
+            port = daemon_spec.ports[0]
+        else:
+            # this is a redeploy of older instance that doesn't have an explicitly
+            # assigned port, in which case we can assume there is only 1 per host
+            # and it matches the spec.
+            port = spec.get_port()
+
+        # configure frontend
+        args = []
+        ftype = spec.rgw_frontend_type or "beast"
+        if ftype == 'beast':
+            if spec.ssl:
+                if daemon_spec.ip:
+                    args.append(
+                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+                else:
+                    args.append(f"ssl_port={port}")
+                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+            else:
+                if daemon_spec.ip:
+                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+                else:
+                    args.append(f"port={port}")
+        elif ftype == 'civetweb':
+            if spec.ssl:
+                if daemon_spec.ip:
+                    # note the 's' suffix on port
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
+                else:
+                    args.append(f"port={port}s")  # note the 's' suffix on port
+                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
+            else:
+                if daemon_spec.ip:
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
+                else:
+                    args.append(f"port={port}")
+        frontend = f'{ftype} {" ".join(args)}'
+
+        ret, out, err = self.mgr.check_mon_command({
+            'prefix': 'config set',
+            'who': utils.name_to_config_section(daemon_spec.name()),
+            'name': 'rgw_frontends',
+            'value': frontend
+        })
+
         daemon_spec.keyring = keyring
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
 
         return daemon_spec
 
     def get_keyring(self, rgw_id: str) -> str:
-        ret, keyring, err = self.mgr.check_mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': self.get_auth_entity(rgw_id),
-            'caps': ['mon', 'allow *',
-                     'mgr', 'allow rw',
-                     'osd', 'allow rwx'],
-        })
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
+                                             ['mon', 'allow *',
+                                              'mgr', 'allow rw',
+                                              'osd', 'allow rwx tag rgw *=*'])
         return keyring
 
-    def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str) -> None:
-        if utils.get_cluster_health(self.mgr) != 'HEALTH_OK':
-            raise OrchestratorError('Health not ok, will try agin when health ok')
-
-        # get keyring needed to run rados commands and strip out just the keyring
-        keyring = self.get_keyring(rgw_id).split('key = ', 1)[1].rstrip()
-
-        # We can call radosgw-admin within the container, cause cephadm gives the MGR the required keyring permissions
-
-        def get_realms() -> List[str]:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'realm', 'list',
-                   '--format=json']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            out = result.stdout
-            if not out:
-                return []
-            try:
-                j = json.loads(out)
-                return j.get('realms', [])
-            except Exception as e:
-                raise OrchestratorError('failed to parse realm info')
-
-        def create_realm() -> None:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'realm', 'create',
-                   '--rgw-realm=%s' % spec.rgw_realm,
-                   '--default']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            self.mgr.log.info('created realm: %s' % spec.rgw_realm)
-
-        def get_zonegroups() -> List[str]:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'zonegroup', 'list',
-                   '--format=json']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            out = result.stdout
-            if not out:
-                return []
-            try:
-                j = json.loads(out)
-                return j.get('zonegroups', [])
-            except Exception as e:
-                raise OrchestratorError('failed to parse zonegroup info')
-
-        def create_zonegroup() -> None:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'zonegroup', 'create',
-                   '--rgw-zonegroup=default',
-                   '--master', '--default']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            self.mgr.log.info('created zonegroup: default')
-
-        def create_zonegroup_if_required() -> None:
-            zonegroups = get_zonegroups()
-            if 'default' not in zonegroups:
-                create_zonegroup()
-
-        def get_zones() -> List[str]:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'zone', 'list',
-                   '--format=json']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            out = result.stdout
-            if not out:
-                return []
-            try:
-                j = json.loads(out)
-                return j.get('zones', [])
-            except Exception as e:
-                raise OrchestratorError('failed to parse zone info')
-
-        def create_zone() -> None:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'zone', 'create',
-                   '--rgw-zonegroup=default',
-                   '--rgw-zone=%s' % spec.rgw_zone,
-                   '--master', '--default']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            self.mgr.log.info('created zone: %s' % spec.rgw_zone)
-
-        changes = False
-        realms = get_realms()
-        if spec.rgw_realm not in realms:
-            create_realm()
-            changes = True
-
-        zones = get_zones()
-        if spec.rgw_zone not in zones:
-            create_zonegroup_if_required()
-            create_zone()
-            changes = True
-
-        # update period if changes were made
-        if changes:
-            cmd = ['radosgw-admin',
-                   '--key=%s' % keyring,
-                   '--user', 'rgw.%s' % rgw_id,
-                   'period', 'update',
-                   '--rgw-realm=%s' % spec.rgw_realm,
-                   '--commit']
-            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-            self.mgr.log.info('updated period')
+    def purge(self, service_name: str) -> None:
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(service_name),
+            'name': 'rgw_realm',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(service_name),
+            'name': 'rgw_zone',
+        })
+        self.mgr.check_mon_command({
+            'prefix': 'config-key rm',
+            'key': f'rgw/cert/{service_name}',
+        })
+        self.mgr.trigger_connect_dashboard_rgw()
+
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
+        self.mgr.check_mon_command({
+            'prefix': 'config rm',
+            'who': utils.name_to_config_section(daemon.name()),
+            'name': 'rgw_frontends',
+        })
+
+    def ok_to_stop(
+            self,
+            daemon_ids: List[str],
+            force: bool = False,
+            known: Optional[List[str]] = None  # output argument
+    ) -> HandleCommandResult:
+        # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
+        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
+        def ingress_present() -> bool:
+            running_ingress_daemons = [
+                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
+            running_haproxy_daemons = [
+                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
+            running_keepalived_daemons = [
+                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
+            # check that there is at least one haproxy and keepalived daemon running
+            if running_haproxy_daemons and running_keepalived_daemons:
+                return True
+            return False
+
+        # if only 1 rgw, alert user (this is not passable with --force)
+        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
+        if warn:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+        # if reached here, there is > 1 rgw daemon.
+        # Say okay if load balancer present or force flag set
+        if ingress_present() or force:
+            return HandleCommandResult(0, warn_message, '')
+
+        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
+        # Provide warning
+        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
+        return HandleCommandResult(-errno.EBUSY, '', warn_message)
+
+    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+        self.mgr.trigger_connect_dashboard_rgw()
 
 
 class RbdMirrorService(CephService):
     TYPE = 'rbd-mirror'
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def allow_colo(self) -> bool:
+        return True
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
-        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
 
-        ret, keyring, err = self.mgr.check_mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': self.get_auth_entity(daemon_id),
-            'caps': ['mon', 'profile rbd-mirror',
-                     'osd', 'profile rbd'],
-        })
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
+                                             ['mon', 'profile rbd-mirror',
+                                              'osd', 'profile rbd'])
 
         daemon_spec.keyring = keyring
 
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
         return daemon_spec
 
+    def ok_to_stop(
+            self,
+            daemon_ids: List[str],
+            force: bool = False,
+            known: Optional[List[str]] = None  # output argument
+    ) -> HandleCommandResult:
+        # if only 1 rbd-mirror, alert user (this is not passable with --force)
+        warn, warn_message = self._enough_daemons_to_stop(
+            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
+        if warn:
+            return HandleCommandResult(-errno.EBUSY, '', warn_message)
+        return HandleCommandResult(0, warn_message, '')
+
 
 class CrashService(CephService):
     TYPE = 'crash'
 
-    def prepare_create(self, daemon_spec: CephadmDaemonSpec) -> CephadmDaemonSpec:
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
         daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
 
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
+                                             ['mon', 'profile crash',
+                                              'mgr', 'profile crash'])
+
+        daemon_spec.keyring = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+
+class CephfsMirrorService(CephService):
+    TYPE = 'cephfs-mirror'
+
+    def config(self, spec: ServiceSpec) -> None:
+        # make sure mirroring module is enabled
+        mgr_map = self.mgr.get('mgr_map')
+        mod_name = 'mirroring'
+        if mod_name not in mgr_map.get('services', {}):
+            self.mgr.check_mon_command({
+                'prefix': 'mgr module enable',
+                'module': mod_name
+            })
+            # we shouldn't get here (mon will tell the mgr to respawn), but no
+            # harm done if we do.
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
-            'entity': self.get_auth_entity(daemon_id, host=host),
-            'caps': ['mon', 'profile crash',
-                     'mgr', 'profile crash'],
+            'entity': self.get_auth_entity(daemon_spec.daemon_id),
+            'caps': ['mon', 'profile cephfs-mirror',
+                     'mds', 'allow r',
+                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
+                     'mgr', 'allow r'],
         })
 
         daemon_spec.keyring = keyring
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+        return daemon_spec
+
+
+class CephadmAgent(CephService):
+    TYPE = 'agent'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.cherrypy_thread:
+            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
+        self.mgr.agent_cache.agent_keys[host] = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
 
         return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        try:
+            assert self.mgr.cherrypy_thread
+            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+            assert self.mgr.cherrypy_thread.server_port
+        except Exception:
+            raise OrchestratorError(
+                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
+
+        cfg = {'target_ip': self.mgr.get_mgr_ip(),
+               'target_port': self.mgr.cherrypy_thread.server_port,
+               'refresh_period': self.mgr.agent_refresh_rate,
+               'listener_port': self.mgr.agent_starting_port,
+               'host': daemon_spec.host,
+               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
+
+        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
+            self.mgr.inventory.get_addr(daemon_spec.host))
+        config = {
+            'agent.json': json.dumps(cfg),
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+            'listener.crt': listener_cert,
+            'listener.key': listener_key,
+        }
+
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
+                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+                               str(self.mgr.get_module_option('device_enhanced_scan'))])