]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / cephadmservice.py
index e7c15132d79cba16fa24618ee5ab5977ca4083c4..5ab8810db0edf0a7a474b73ef1215592fd8a07c7 100644 (file)
@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, List, Callable, TypeVar, \
 
 from mgr_module import HandleCommandResult, MonCommandFailed
 
-from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec
+from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec, MONSpec
 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
 from mgr_util import build_url, merge_dicts
 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
@@ -169,13 +169,13 @@ class CephadmService(metaclass=ABCMeta):
         """
         return False
 
-    def primary_daemon_type(self) -> str:
+    def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
         """
         This is the type of the primary (usually only) daemon to be deployed.
         """
         return self.TYPE
 
-    def per_host_daemon_type(self) -> Optional[str]:
+    def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
         """
         If defined, this type of daemon will be deployed once for each host
         containing one or more daemons of the primary type.
@@ -249,7 +249,7 @@ class CephadmService(metaclass=ABCMeta):
         raise NotImplementedError()
 
     def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
-        # if this is called for a service type where it hasn't explcitly been
+        # if this is called for a service type where it hasn't explicitly been
         # defined, return empty Daemon Desc
         return DaemonDescription()
 
@@ -599,22 +599,29 @@ class MonService(CephService):
 
         return daemon_spec
 
-    def _check_safe_to_destroy(self, mon_id: str) -> None:
+    def config(self, spec: ServiceSpec) -> None:
+        assert self.TYPE == spec.service_type
+        self.set_crush_locations(self.mgr.cache.get_daemons_by_type('mon'), spec)
+
+    def _get_quorum_status(self) -> Dict[Any, Any]:
         ret, out, err = self.mgr.check_mon_command({
             'prefix': 'quorum_status',
         })
         try:
             j = json.loads(out)
-        except Exception:
-            raise OrchestratorError('failed to parse quorum status')
+        except Exception as e:
+            raise OrchestratorError(f'failed to parse mon quorum status: {e}')
+        return j
 
-        mons = [m['name'] for m in j['monmap']['mons']]
+    def _check_safe_to_destroy(self, mon_id: str) -> None:
+        quorum_status = self._get_quorum_status()
+        mons = [m['name'] for m in quorum_status['monmap']['mons']]
         if mon_id not in mons:
             logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                 mon_id, mons))
             return
         new_mons = [m for m in mons if m != mon_id]
-        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+        new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
         if len(new_quorum) > len(new_mons) / 2:
             logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                         (mon_id, new_quorum, new_mons))
@@ -641,6 +648,66 @@ class MonService(CephService):
         # super().post_remove(daemon)
         pass
 
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        daemon_spec.final_config, daemon_spec.deps = super().generate_config(daemon_spec)
+
+        # realistically, we expect there to always be a mon spec
+        # in a real deployment, but the way teuthology deploys some daemons
+        # it's possible there might not be. For that reason we need to
+        # verify the service is present in the spec store.
+        if daemon_spec.service_name in self.mgr.spec_store:
+            mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
+            if mon_spec.crush_locations:
+                if daemon_spec.host in mon_spec.crush_locations:
+                    # the --crush-location flag only supports a single bucket=loc pair so
+                    # others will have to be handled later. The idea is to set the flag
+                    # for the first bucket=loc pair in the list in order to facilitate
+                    # replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
+                    c_loc = mon_spec.crush_locations[daemon_spec.host][0]
+                    daemon_spec.final_config['crush_location'] = c_loc
+
+        return daemon_spec.final_config, daemon_spec.deps
+
+    def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
+        logger.debug('Setting mon crush locations from spec')
+        if not daemon_descrs:
+            return
+        assert self.TYPE == spec.service_type
+        mon_spec = cast(MONSpec, spec)
+
+        if not mon_spec.crush_locations:
+            return
+
+        quorum_status = self._get_quorum_status()
+        mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
+        for dd in daemon_descrs:
+            assert dd.daemon_id is not None
+            assert dd.hostname is not None
+            if dd.hostname not in mon_spec.crush_locations:
+                continue
+            if dd.daemon_id not in mons_in_monmap:
+                continue
+            # expected format for crush_locations from the quorum status is
+            # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
+            try:
+                current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
+            except (KeyError, IndexError) as e:
+                logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
+                               'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
+            desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
+            logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
+            logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
+            if current_crush_locs != desired_crush_locs:
+                logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
+                try:
+                    ret, out, err = self.mgr.check_mon_command({
+                        'prefix': 'mon set_location',
+                        'name': dd.daemon_id,
+                        'args': mon_spec.crush_locations[dd.hostname]
+                    })
+                except Exception as e:
+                    logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
+
 
 class MgrService(CephService):
     TYPE = 'mgr'
@@ -690,6 +757,7 @@ class MgrService(CephService):
         if ports:
             daemon_spec.ports = ports
 
+        daemon_spec.ports.append(self.mgr.service_discovery_port)
         daemon_spec.keyring = keyring
 
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
@@ -827,7 +895,7 @@ class RgwService(CephService):
     def config(self, spec: RGWSpec) -> None:  # type: ignore
         assert self.TYPE == spec.service_type
 
-        # set rgw_realm and rgw_zone, if present
+        # set rgw_realm rgw_zonegroup and rgw_zone, if present
         if spec.rgw_realm:
             ret, out, err = self.mgr.check_mon_command({
                 'prefix': 'config set',
@@ -835,6 +903,13 @@ class RgwService(CephService):
                 'name': 'rgw_realm',
                 'value': spec.rgw_realm,
             })
+        if spec.rgw_zonegroup:
+            ret, out, err = self.mgr.check_mon_command({
+                'prefix': 'config set',
+                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
+                'name': 'rgw_zonegroup',
+                'value': spec.rgw_zonegroup,
+            })
         if spec.rgw_zone:
             ret, out, err = self.mgr.check_mon_command({
                 'prefix': 'config set',
@@ -908,6 +983,12 @@ class RgwService(CephService):
                     args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                 else:
                     args.append(f"port={port}")
+        else:
+            raise OrchestratorError(f'Invalid rgw_frontend_type parameter: {ftype}. Valid values are: beast, civetweb.')
+
+        if spec.rgw_frontend_extra_args is not None:
+            args.extend(spec.rgw_frontend_extra_args)
+
         frontend = f'{ftype} {" ".join(args)}'
 
         ret, out, err = self.mgr.check_mon_command({
@@ -1112,7 +1193,7 @@ class CephadmAgent(CephService):
         assert self.TYPE == daemon_spec.daemon_type
         daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
 
-        if not self.mgr.cherrypy_thread:
+        if not self.mgr.http_server.agent:
             raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
 
         keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
@@ -1124,31 +1205,31 @@ class CephadmAgent(CephService):
         return daemon_spec
 
     def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        agent = self.mgr.http_server.agent
         try:
-            assert self.mgr.cherrypy_thread
-            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
-            assert self.mgr.cherrypy_thread.server_port
+            assert agent
+            assert agent.ssl_certs.get_root_cert()
+            assert agent.server_port
         except Exception:
             raise OrchestratorError(
                 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
 
         cfg = {'target_ip': self.mgr.get_mgr_ip(),
-               'target_port': self.mgr.cherrypy_thread.server_port,
+               'target_port': agent.server_port,
                'refresh_period': self.mgr.agent_refresh_rate,
                'listener_port': self.mgr.agent_starting_port,
                'host': daemon_spec.host,
                'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
 
-        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
-            self.mgr.inventory.get_addr(daemon_spec.host))
+        listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
         config = {
             'agent.json': json.dumps(cfg),
             'keyring': daemon_spec.keyring,
-            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+            'root_cert.pem': agent.ssl_certs.get_root_cert(),
             'listener.crt': listener_cert,
             'listener.key': listener_key,
         }
 
-        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
-                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
+                               agent.ssl_certs.get_root_cert(),
                                str(self.mgr.get_module_option('device_enhanced_scan'))])