import ceph quincy 17.2.1

diff --git a/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py b/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
index 7a7d6b4073a5e99ea587955189c12ebbfac018b7..8abb0e63a2c102181d7937c3cf8f99c357a4fbb9 100644
--- a/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
+++ b/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
@@ -2,6 +2,8 @@ import errno
 import json
 import logging
 import re
+import socket
+import time
 from abc import ABCMeta, abstractmethod
 from typing import TYPE_CHECKING, List, Callable, TypeVar, \
     Optional, Dict, Any, Tuple, NewType, cast
@@ -10,6 +12,7 @@ from mgr_module import HandleCommandResult, MonCommandFailed
 
 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from mgr_util import build_url
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
 from orchestrator._interface import daemon_type_to_service
 from cephadm import utils
@@ -36,7 +39,8 @@ class CephadmDaemonDeploySpec:
                  ip: Optional[str] = None,
                  ports: Optional[List[int]] = None,
                  rank: Optional[int] = None,
-                 rank_generation: Optional[int] = None):
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None):
         """
         A data structure to encapsulate `cephadm deploy ...`
         """
@@ -71,6 +75,8 @@ class CephadmDaemonDeploySpec:
         self.rank: Optional[int] = rank
         self.rank_generation: Optional[int] = rank_generation
 
+        self.extra_container_args = extra_container_args
+
     def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
@@ -95,6 +101,7 @@ class CephadmDaemonDeploySpec:
             ports=dd.ports,
             rank=dd.rank,
             rank_generation=dd.rank_generation,
+            extra_container_args=dd.extra_container_args,
         )
 
     def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
@@ -109,6 +116,7 @@ class CephadmDaemonDeploySpec:
             ports=self.ports,
             rank=self.rank,
             rank_generation=self.rank_generation,
+            extra_container_args=self.extra_container_args,
         )
 
 
@@ -170,6 +178,10 @@ class CephadmService(metaclass=ABCMeta):
             rank: Optional[int] = None,
             rank_generation: Optional[int] = None,
     ) -> CephadmDaemonDeploySpec:
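+        # tolerate specs without an extra_container_args attribute; fall back to None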
+        try:
+            eca = spec.extra_container_args
+        except AttributeError:
+            eca = None
         return CephadmDaemonDeploySpec(
             host=host,
             daemon_id=daemon_id,
@@ -180,6 +192,7 @@ class CephadmService(metaclass=ABCMeta):
             ip=ip,
             rank=rank,
             rank_generation=rank_generation,
+            extra_container_args=eca,
         )
 
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
@@ -228,9 +241,14 @@ class CephadmService(metaclass=ABCMeta):
                 self.mgr.log.warning(f"Unable to update caps for {entity}")
         return keyring
 
-    def _inventory_get_addr(self, hostname: str) -> str:
-        """Get a host's address with its hostname."""
-        return self.mgr.inventory.get_addr(hostname)
+    def _inventory_get_fqdn(self, hostname: str) -> str:
+        """Get a host's FQDN with its hostname.
+
+           If the FQDN can't be resolved, the address from the inventory will
+           be returned instead.
+        """
+        addr = self.mgr.inventory.get_addr(hostname)
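+        # socket.getfqdn() returns its argument unchanged if it cannot be resolved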
+        return socket.getfqdn(addr)
 
     def _set_service_url_on_dashboard(self,
                                       service_name: str,
@@ -401,7 +419,7 @@ class CephadmService(metaclass=ABCMeta):
         assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
         logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
         """
         Called after the daemon is removed.
         """
@@ -429,8 +447,8 @@ class CephService(CephadmService):
 
         return cephadm_config, []
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
-        super().post_remove(daemon)
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
         self.remove_keyring(daemon)
 
     def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
@@ -441,9 +459,10 @@ class CephService(CephadmService):
         # the CephService class refers to service types, not daemon types
         if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
             return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
-        elif self.TYPE == 'crash':
+        elif self.TYPE in ['crash', 'agent']:
             if host == "":
-                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+                raise OrchestratorError(
+                    f'Host not provided to generate <{self.TYPE}> auth entity name')
             return AuthEntity(f'client.{self.TYPE}.{host}')
         elif self.TYPE == 'mon':
             return AuthEntity('mon.')
@@ -483,9 +502,7 @@ class CephService(CephadmService):
         daemon_id: str = daemon.daemon_id
         host: str = daemon.hostname
 
-        if daemon_id == 'mon':
-            # do not remove the mon keyring
-            return
+        assert daemon.daemon_type != 'mon'
 
         entity = self.get_auth_entity(daemon_id, host=host)
 
@@ -586,6 +603,11 @@ class MonService(CephService):
             'name': daemon_id,
         })
 
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        # Do not remove the mon keyring.
+        # super().post_remove(daemon)
+        pass
+
 
 class MgrService(CephService):
     TYPE = 'mgr'
@@ -651,19 +673,31 @@ class MgrService(CephService):
         return DaemonDescription()
 
     def fail_over(self) -> None:
-        if not self.mgr_map_has_standby():
-            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
-                'daemon', 'mgr' + self.mgr.get_mgr_id()))
-
-        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
-                                   'INFO', 'Failing over to other MGR')
-        logger.info('Failing over to other MGR')
-
-        # fail over
-        ret, out, err = self.mgr.check_mon_command({
-            'prefix': 'mgr fail',
-            'who': self.mgr.get_mgr_id(),
-        })
+        # The standby check has been seen to fail transiently even when multiple
+        # mgr daemons are running, so retry a few times before giving up.
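+        # Use a dedicated exception type so only the missing-standby case is retried.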
+        class NoStandbyError(OrchestratorError):
+            pass
+        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
+            'daemon', 'mgr' + self.mgr.get_mgr_id()))
+        for sleep_secs in [2, 8, 15]:
+            try:
+                if not self.mgr_map_has_standby():
+                    raise no_standby_exc
+                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
+                                           'INFO', 'Failing over to other MGR')
+                logger.info('Failing over to other MGR')
+
+                # fail over
+                ret, out, err = self.mgr.check_mon_command({
+                    'prefix': 'mgr fail',
+                    'who': self.mgr.get_mgr_id(),
+                })
+                return
+            except NoStandbyError:
+                logger.info(
+                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
+                time.sleep(sleep_secs)
+        raise no_standby_exc
 
     def mgr_map_has_standby(self) -> bool:
         """
@@ -818,25 +852,27 @@ class RgwService(CephService):
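+        # build_url wraps IPv6 addresses in brackets; with no scheme it yields
+        # '//host:port', so the leading slashes are stripped below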
         if ftype == 'beast':
             if spec.ssl:
                 if daemon_spec.ip:
-                    args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
+                    args.append(
+                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"ssl_port={port}")
                 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
             else:
                 if daemon_spec.ip:
-                    args.append(f"endpoint={daemon_spec.ip}:{port}")
+                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"port={port}")
         elif ftype == 'civetweb':
             if spec.ssl:
                 if daemon_spec.ip:
-                    args.append(f"port={daemon_spec.ip}:{port}s")  # note the 's' suffix on port
+                    # note the 's' suffix on port
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                 else:
                     args.append(f"port={port}s")  # note the 's' suffix on port
                 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
             else:
                 if daemon_spec.ip:
-                    args.append(f"port={daemon_spec.ip}:{port}")
+                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"port={port}")
         frontend = f'{ftype} {" ".join(args)}'
@@ -877,8 +913,8 @@ class RgwService(CephService):
         })
         self.mgr.trigger_connect_dashboard_rgw()
 
-    def post_remove(self, daemon: DaemonDescription) -> None:
-        super().post_remove(daemon)
+    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
         self.mgr.check_mon_command({
             'prefix': 'config rm',
             'who': utils.name_to_config_section(daemon.name()),
@@ -979,6 +1015,18 @@ class CrashService(CephService):
 class CephfsMirrorService(CephService):
     TYPE = 'cephfs-mirror'
 
+    def config(self, spec: ServiceSpec) -> None:
+        # make sure mirroring module is enabled
+        mgr_map = self.mgr.get('mgr_map')
+        mod_name = 'mirroring'
+        if mod_name not in mgr_map.get('services', {}):
+            self.mgr.check_mon_command({
+                'prefix': 'mgr module enable',
+                'module': mod_name
+            })
+            # Enabling the module normally makes the mon tell this mgr to respawn,
+            # so execution rarely continues past this point; no harm done if it does.
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
 
@@ -994,3 +1042,52 @@ class CephfsMirrorService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
         return daemon_spec
+
+
+class CephadmAgent(CephService):
+    TYPE = 'agent'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.cherrypy_thread:
+            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
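+        # keep a copy of the agent keyring in the mgr-side cache for this host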
+        self.mgr.agent_cache.agent_keys[host] = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        try:
+            assert self.mgr.cherrypy_thread
+            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+            assert self.mgr.cherrypy_thread.server_port
+        except Exception:
+            raise OrchestratorError(
+                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
+
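+        # agent.json: where to report (mgr IP/port), refresh period, listener port, and scan settings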
+        cfg = {'target_ip': self.mgr.get_mgr_ip(),
+               'target_port': self.mgr.cherrypy_thread.server_port,
+               'refresh_period': self.mgr.agent_refresh_rate,
+               'listener_port': self.mgr.agent_starting_port,
+               'host': daemon_spec.host,
+               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
+
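+        # generate a cert/key pair for the agent's listener, issued for the host's inventory address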
+        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
+            self.mgr.inventory.get_addr(daemon_spec.host))
+        config = {
+            'agent.json': json.dumps(cfg),
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+            'listener.crt': listener_cert,
+            'listener.key': listener_key,
+        }
+
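+        # deps: cephadm uses these values to detect when the agent needs to be reconfigured or redeployed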
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
+                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+                               str(self.mgr.get_module_option('device_enhanced_scan'))])