]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / cephadmservice.py
index f5ec2d908c42b51509f093ffd88e7f0c2e904b9b..8abb0e63a2c102181d7937c3cf8f99c357a4fbb9 100644 (file)
@@ -2,6 +2,8 @@ import errno
 import json
 import logging
 import re
+import socket
+import time
 from abc import ABCMeta, abstractmethod
 from typing import TYPE_CHECKING, List, Callable, TypeVar, \
     Optional, Dict, Any, Tuple, NewType, cast
@@ -10,6 +12,7 @@ from mgr_module import HandleCommandResult, MonCommandFailed
 
 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from mgr_util import build_url
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
@@ -34,7 +37,10 @@ class CephadmDaemonDeploySpec:
                  extra_files: Optional[Dict[str, Any]] = None,
                  daemon_type: Optional[str] = None,
                  ip: Optional[str] = None,
-                 ports: Optional[List[int]] = None):
+                 ports: Optional[List[int]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None):
         """
         A data struction to encapsulate `cephadm deploy ...
         """
@@ -66,6 +72,11 @@ class CephadmDaemonDeploySpec:
         self.final_config: Dict[str, Any] = {}
         self.deps: List[str] = []
 
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
+
+        self.extra_container_args = extra_container_args
+
     def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
@@ -88,17 +99,24 @@ class CephadmDaemonDeploySpec:
             service_name=dd.service_name(),
             ip=dd.ip,
             ports=dd.ports,
+            rank=dd.rank,
+            rank_generation=dd.rank_generation,
+            extra_container_args=dd.extra_container_args,
         )
 
     def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
         return DaemonDescription(
             daemon_type=self.daemon_type,
             daemon_id=self.daemon_id,
+            service_name=self.service_name,
             hostname=self.host,
             status=status,
             status_desc=status_desc,
             ip=self.ip,
             ports=self.ports,
+            rank=self.rank,
+            rank_generation=self.rank_generation,
+            extra_container_args=self.extra_container_args,
         )
 
 
@@ -116,23 +134,54 @@ class CephadmService(metaclass=ABCMeta):
         self.mgr: "CephadmOrchestrator" = mgr
 
     def allow_colo(self) -> bool:
+        """
+        Return True if multiple daemons of the same type can colocate on
+        the same host.
+        """
         return False
 
+    def primary_daemon_type(self) -> str:
+        """
+        This is the type of the primary (usually only) daemon to be deployed.
+        """
+        return self.TYPE
+
     def per_host_daemon_type(self) -> Optional[str]:
+        """
+        If defined, this type of daemon will be deployed once for each host
+        containing one or more daemons of the primary type.
+        """
         return None
 
-    def primary_daemon_type(self) -> str:
-        return self.TYPE
+    def ranked(self) -> bool:
+        """
+        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
+        generation (0, 1, ...) to each daemon we create/deploy.
+        """
+        return False
+
+    def fence_old_ranks(self,
+                        spec: ServiceSpec,
+                        rank_map: Dict[int, Dict[int, Optional[str]]],
+                        num_ranks: int) -> None:
+        assert False
 
     def make_daemon_spec(
-            self, host: str,
+            self,
+            host: str,
             daemon_id: str,
             network: str,
             spec: ServiceSpecs,
             daemon_type: Optional[str] = None,
             ports: Optional[List[int]] = None,
             ip: Optional[str] = None,
+            rank: Optional[int] = None,
+            rank_generation: Optional[int] = None,
     ) -> CephadmDaemonDeploySpec:
+        try:
+            eca = spec.extra_container_args
+        except AttributeError:
+            eca = None
         return CephadmDaemonDeploySpec(
             host=host,
             daemon_id=daemon_id,
@@ -141,6 +190,9 @@ class CephadmService(metaclass=ABCMeta):
             daemon_type=daemon_type,
             ports=ports,
             ip=ip,
+            rank=rank,
+            rank_generation=rank_generation,
+            extra_container_args=eca,
         )
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
@@ -149,7 +201,7 @@ class CephadmService(metaclass=ABCMeta):
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
         raise NotImplementedError()
 
-    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
+    def config(self, spec: ServiceSpec) -> None:
         """
         Configure the cluster for this service. Only called *once* per
         service apply. Not for every daemon.
@@ -189,9 +241,14 @@ class CephadmService(metaclass=ABCMeta):
                 self.mgr.log.warning(f"Unable to update caps for {entity}")
         return keyring
 
-    def _inventory_get_addr(self, hostname: str) -> str:
-        """Get a host's address with its hostname."""
-        return self.mgr.inventory.get_addr(hostname)
+    def _inventory_get_fqdn(self, hostname: str) -> str:
+        """Get a host's FQDN with its hostname.
+
+           If the FQDN can't be resolved, the address from the inventory will
+           be returned instead.
+        """
+        addr = self.mgr.inventory.get_addr(hostname)
+        return socket.getfqdn(addr)
 
     def _set_service_url_on_dashboard(self,
                                       service_name: str,
@@ -362,7 +419,7 @@ class CephadmService(metaclass=ABCMeta):
         assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
         logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
         """
         Called after the daemon is removed.
         """
@@ -390,8 +447,8 @@ class CephService(CephadmService):
 
         return cephadm_config, []
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
-        super().post_remove(daemon)
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
         self.remove_keyring(daemon)
 
     def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
@@ -402,9 +459,10 @@ class CephService(CephadmService):
         # the CephService class refers to service types, not daemon types
         if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
             return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
-        elif self.TYPE == 'crash':
+        elif self.TYPE in ['crash', 'agent']:
             if host == "":
-                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+                raise OrchestratorError(
+                    f'Host not provided to generate <{self.TYPE}> auth entity name')
             return AuthEntity(f'client.{self.TYPE}.{host}')
         elif self.TYPE == 'mon':
             return AuthEntity('mon.')
@@ -444,9 +502,7 @@ class CephService(CephadmService):
         daemon_id: str = daemon.daemon_id
         host: str = daemon.hostname
 
-        if daemon_id == 'mon':
-            # do not remove the mon keyring
-            return
+        assert daemon.daemon_type != 'mon'
 
         entity = self.get_auth_entity(daemon_id, host=host)
 
@@ -547,10 +603,27 @@ class MonService(CephService):
             'name': daemon_id,
         })
 
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        # Do not remove the mon keyring.
+        # super().post_remove(daemon)
+        pass
+
 
 class MgrService(CephService):
     TYPE = 'mgr'
 
+    def allow_colo(self) -> bool:
+        if self.mgr.get_ceph_option('mgr_standby_modules'):
+            # traditional mgr mode: standby daemons' modules listen on
+            # ports and redirect to the primary.  we must not schedule
+            # multiple mgrs on the same host or else ports will
+            # conflict.
+            return False
+        else:
+            # standby daemons do nothing, and therefore port conflicts
+            # are not a concern.
+            return True
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         """
         Create a new manager instance on a host.
@@ -600,19 +673,31 @@ class MgrService(CephService):
         return DaemonDescription()
 
     def fail_over(self) -> None:
-        if not self.mgr_map_has_standby():
-            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
-                'daemon', 'mgr' + self.mgr.get_mgr_id()))
-
-        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
-                                   'INFO', 'Failing over to other MGR')
-        logger.info('Failing over to other MGR')
-
-        # fail over
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'mgr fail',
-            'who': self.mgr.get_mgr_id(),
-        })
+        # this has been seen to sometimes transiently fail even when there are multiple
+        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
+        class NoStandbyError(OrchestratorError):
+            pass
+        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+            'daemon', 'mgr' + self.mgr.get_mgr_id()))
+        for sleep_secs in [2, 8, 15]:
+            try:
+                if not self.mgr_map_has_standby():
+                    raise no_standby_exc
+                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+                                           'INFO', 'Failing over to other MGR')
+                logger.info('Failing over to other MGR')
+
+                # fail over
+                ret, out, err = self.mgr.check_mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.mgr.get_mgr_id(),
+                })
+                return
+            except NoStandbyError:
+                logger.info(
+                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+                time.sleep(sleep_secs)
+        raise no_standby_exc
 
     def mgr_map_has_standby(self) -> bool:
         """
@@ -650,7 +735,7 @@ class MdsService(CephService):
     def allow_colo(self) -> bool:
         return True
 
-    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
+    def config(self, spec: ServiceSpec) -> None:
         assert self.TYPE == spec.service_type
         assert spec.service_id
 
@@ -706,7 +791,7 @@ class RgwService(CephService):
     def allow_colo(self) -> bool:
         return True
 
-    def config(self, spec: RGWSpec, rgw_id: str) -> None:  # type: ignore
+    def config(self, spec: RGWSpec) -> None:  # type: ignore
         assert self.TYPE == spec.service_type
 
         # set rgw_realm and rgw_zone, if present
@@ -744,6 +829,7 @@ class RgwService(CephService):
         logger.info('Saving service %s spec with placement %s' % (
             spec.service_name(), spec.placement.pretty_str()))
         self.mgr.spec_store.save(spec)
+        self.mgr.trigger_connect_dashboard_rgw()
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
@@ -766,25 +852,27 @@ class RgwService(CephService):
         if ftype == 'beast':
             if spec.ssl:
                 if daemon_spec.ip:
-                    args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
+                    args.append(
+                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"ssl_port={port}")
                 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
             else:
                 if daemon_spec.ip:
-                    args.append(f"endpoint={daemon_spec.ip}:{port}")
+                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"port={port}")
         elif ftype == 'civetweb':
             if spec.ssl:
                 if daemon_spec.ip:
-                    args.append(f"port={daemon_spec.ip}:{port}s")  # note the 's' suffix on port
+                    # note the 's' suffix on port
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                 else:
                     args.append(f"port={port}s")  # note the 's' suffix on port
                 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
             else:
                 if daemon_spec.ip:
-                    args.append(f"port={daemon_spec.ip}:{port}")
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"port={port}")
         frontend = f'{ftype} {" ".join(args)}'
@@ -823,9 +911,10 @@ class RgwService(CephService):
             'prefix': 'config-key rm',
             'key': f'rgw/cert/{service_name}',
         })
+        self.mgr.trigger_connect_dashboard_rgw()
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
-        super().post_remove(daemon)
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
         self.mgr.check_mon_command({
             'prefix': 'config rm',
             'who': utils.name_to_config_section(daemon.name()),
@@ -867,6 +956,9 @@ class RgwService(CephService):
         warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
         return HandleCommandResult(-errno.EBUSY, '', warn_message)
 
+    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
+        self.mgr.trigger_connect_dashboard_rgw()
+
 
 class RbdMirrorService(CephService):
     TYPE = 'rbd-mirror'
@@ -923,13 +1015,25 @@ class CrashService(CephService):
 class CephfsMirrorService(CephService):
     TYPE = 'cephfs-mirror'
 
+    def config(self, spec: ServiceSpec) -> None:
+        # make sure mirroring module is enabled
+        mgr_map = self.mgr.get('mgr_map')
+        mod_name = 'mirroring'
+        if mod_name not in mgr_map.get('services', {}):
+            self.mgr.check_mon_command({
+                'prefix': 'mgr module enable',
+                'module': mod_name
+            })
+            # we shouldn't get here (mon will tell the mgr to respawn), but no
+            # harm done if we do.
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
 
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
             'entity': self.get_auth_entity(daemon_spec.daemon_id),
-            'caps': ['mon', 'allow r',
+            'caps': ['mon', 'profile cephfs-mirror',
                      'mds', 'allow r',
                      'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                      'mgr', 'allow r'],
@@ -938,3 +1042,52 @@ class CephfsMirrorService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
         return daemon_spec
+
+
+class CephadmAgent(CephService):
+    TYPE = 'agent'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.cherrypy_thread:
+            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
+        self.mgr.agent_cache.agent_keys[host] = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        try:
+            assert self.mgr.cherrypy_thread
+            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+            assert self.mgr.cherrypy_thread.server_port
+        except Exception:
+            raise OrchestratorError(
+                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
+
+        cfg = {'target_ip': self.mgr.get_mgr_ip(),
+               'target_port': self.mgr.cherrypy_thread.server_port,
+               'refresh_period': self.mgr.agent_refresh_rate,
+               'listener_port': self.mgr.agent_starting_port,
+               'host': daemon_spec.host,
+               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
+
+        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
+            self.mgr.inventory.get_addr(daemon_spec.host))
+        config = {
+            'agent.json': json.dumps(cfg),
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+            'listener.crt': listener_cert,
+            'listener.key': listener_key,
+        }
+
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
+                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+                               str(self.mgr.get_module_option('device_enhanced_scan'))])