]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import 15.2.5
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / cephadmservice.py
index 6acbb60493b5ca0a90a96d6f1f1411dfeeac9cc3..c42cc121d61fcda4e8425d2791c34d28de6acb37 100644 (file)
@@ -1,7 +1,11 @@
+import json
+import re
 import logging
-from typing import TYPE_CHECKING, List
+import subprocess
+from abc import ABCMeta, abstractmethod
+from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic,  Optional, Dict, Any, Tuple
 
-from mgr_module import MonCommandFailed
+from mgr_module import HandleCommandResult, MonCommandFailed
 
 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
 from orchestrator import OrchestratorError, DaemonDescription
@@ -12,22 +16,110 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
 
-class CephadmService:
+
+class CephadmDaemonSpec(Generic[ServiceSpecs]):
+    # typing.NamedTuple + Generic is broken in py36
+    def __init__(self, host: str, daemon_id,
+                 spec: Optional[ServiceSpecs]=None,
+                 network: Optional[str]=None,
+                 keyring: Optional[str]=None,
+                 extra_args: Optional[List[str]]=None,
+                 extra_config: Optional[Dict[str, Any]]=None,
+                 daemon_type: Optional[str]=None,
+                 ports: Optional[List[int]]=None,):
+        """
+        Used for
+        * deploying new daemons. then everything is set
+        * redeploying existing daemons, then only the first three attrs are set.
+
+        Would be great to have a consistent usage where all properties are set.
+        """
+        self.host: str = host
+        self.daemon_id = daemon_id
+        daemon_type = daemon_type or (spec.service_type if spec else None)
+        assert daemon_type is not None
+        self.daemon_type: str = daemon_type
+
+        # would be great to have the spec always available:
+        self.spec: Optional[ServiceSpecs] = spec
+
+        # mons
+        self.network = network
+
+        # for run_cephadm.
+        self.keyring: Optional[str] = keyring
+
+        # For run_cephadm. Would be great to have more expressive names.
+        self.extra_args: List[str] = extra_args or []
+        self.extra_config: Dict[str, Any] = extra_config or {}
+
+        # TCP ports used by the daemon
+        self.ports:  List[int] = ports or []
+
+
+    def name(self) -> str:
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+
+class CephadmService(metaclass=ABCMeta):
     """
     Base class for service types. Often providing a create() and config() fn.
     """
+
+    @property
+    @abstractmethod
+    def TYPE(self):
+        pass
+
     def __init__(self, mgr: "CephadmOrchestrator"):
         self.mgr: "CephadmOrchestrator" = mgr
 
+    def make_daemon_spec(self, host, daemon_id, netowrk, spec: ServiceSpecs) -> CephadmDaemonSpec:
+        return CephadmDaemonSpec(
+            host=host,
+            daemon_id=daemon_id,
+            spec=spec,
+            network=netowrk
+        )
+
+    def create(self, daemon_spec: CephadmDaemonSpec):
+        raise NotImplementedError()
+
+    def generate_config(self, daemon_spec: CephadmDaemonSpec) -> Tuple[Dict[str, Any], List[str]]:
+        # Ceph.daemons (mon, mgr, mds, osd, etc)
+        cephadm_config = self.mgr._get_config_and_keyring(
+            daemon_spec.daemon_type,
+            daemon_spec.daemon_id,
+            host=daemon_spec.host,
+            keyring=daemon_spec.keyring,
+            extra_ceph_config=daemon_spec.extra_config.pop('config', ''))
+
+
+        if daemon_spec.extra_config:
+            cephadm_config.update({'files': daemon_spec.extra_config})
+
+        return cephadm_config, []
+
     def daemon_check_post(self, daemon_descrs: List[DaemonDescription]):
         """The post actions needed to be done after daemons are checked"""
+        if self.mgr.config_dashboard:
+            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
+                self.config_dashboard(daemon_descrs)
+            else:
+                logger.debug('Dashboard is not enabled. Skip configuration.')
+
+    def config_dashboard(self, daemon_descrs: List[DaemonDescription]):
+        """Config dashboard settings."""
         raise NotImplementedError()
 
     def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
-        raise NotImplementedError()
+        # if this is called for a service type where it hasn't explcitly been
+        # defined, return empty Daemon Desc
+        return DaemonDescription()
 
-    def _inventory_get_addr(self, hostname: str):
+    def _inventory_get_addr(self, hostname: str) -> str:
         """Get a host's address with its hostname."""
         return self.mgr.inventory.get_addr(hostname)
 
@@ -36,32 +128,118 @@ class CephadmService:
                                       get_mon_cmd: str,
                                       set_mon_cmd: str,
                                       service_url: str):
-        """A helper to get and set service_url via Dashboard's MON command."""
+        """A helper to get and set service_url via Dashboard's MON command.
+
+           If result of get_mon_cmd differs from service_url, set_mon_cmd will
+           be sent to set the service_url.
+        """
+        def get_set_cmd_dicts(out: str) -> List[dict]:
+            cmd_dict = {
+                'prefix': set_mon_cmd,
+                'value': service_url
+            }
+            return [cmd_dict] if service_url != out else []
+
+        self._check_and_set_dashboard(
+            service_name=service_name,
+            get_cmd=get_mon_cmd,
+            get_set_cmd_dicts=get_set_cmd_dicts
+        )
+
+    def _check_and_set_dashboard(self,
+                                 service_name: str,
+                                 get_cmd: str,
+                                 get_set_cmd_dicts: Callable[[str], List[dict]]):
+        """A helper to set configs in the Dashboard.
+
+        The method is useful for the pattern:
+            - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
+              gateways.
+            - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
+            - Determine if the config need to be update. NOTE: This step is important because if a
+              Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
+              kicks the serve() loop and the logic using this method is likely to be called again.
+              A config should be updated only when needed.
+            - Update a config in Dashboard by using a Dashboard command.
+
+        :param service_name: the service name to be used for logging
+        :type service_name: str
+        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
+        :type get_cmd: str
+        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
+            e.g.
+            [
+                {
+                   'prefix': 'dashboard iscsi-gateway-add',
+                   'service_url': 'http://admin:admin@aaa:5000',
+                   'name': 'aaa'
+                },
+                {
+                    'prefix': 'dashboard iscsi-gateway-add',
+                    'service_url': 'http://admin:admin@bbb:5000',
+                    'name': 'bbb'
+                }
+            ]
+            The function should return empty list if no command need to be sent.
+        :type get_set_cmd_dicts: Callable[[str], List[dict]]
+        """
+
         try:
             _, out, _ = self.mgr.check_mon_command({
-                'prefix': get_mon_cmd
+                'prefix': get_cmd
             })
         except MonCommandFailed as e:
-            logger.warning('Failed to get service URL for %s: %s', service_name, e)
+            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
             return
-        if out.strip() != service_url:
+        cmd_dicts = get_set_cmd_dicts(out.strip())
+        for cmd_dict in list(cmd_dicts):
             try:
-                logger.info(
-                    'Setting service URL %s for %s in the Dashboard', service_url, service_name)
-                _, out, _ = self.mgr.check_mon_command({
-                    'prefix': set_mon_cmd,
-                    'value': service_url,
-                })
+                logger.info('Setting Dashboard config for %s: command: %s', service_name, cmd_dict)
+                _, out, _ = self.mgr.check_mon_command(cmd_dict)
             except MonCommandFailed as e:
-                logger.warning('Failed to set service URL %s for %s in the Dashboard: %s',
-                               service_url, service_name, e)
+                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
+
+
+
+    def ok_to_stop(self, daemon_ids: List[str]) -> HandleCommandResult:
+        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
+        out = f'It is presumed safe to stop {names}'
+        err = f'It is NOT safe to stop {names}'
+
+        if self.TYPE not in ['mon', 'osd', 'mds']:
+            logger.info(out)
+            return HandleCommandResult(0, out, None)
+
+        r = HandleCommandResult(*self.mgr.mon_command({
+            'prefix': f'{self.TYPE} ok-to-stop',
+            'ids': daemon_ids,
+        }))
+
+        if r.retval:
+            err = f'{err}: {r.stderr}' if r.stderr else err
+            logger.error(err)
+            return HandleCommandResult(r.retval, r.stdout, err)
 
+        out = f'{out}: {r.stdout}' if r.stdout else out
+        logger.info(out)
+        return HandleCommandResult(r.retval, out, r.stderr)
+
+    def pre_remove(self, daemon_id: str) -> None:
+        """
+        Called before the daemon is removed.
+        """
+        pass
 
 class MonService(CephadmService):
-    def create(self, name, host, network):
+    TYPE = 'mon'
+
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
         """
         Create a new monitor on the given host.
         """
+        assert self.TYPE == daemon_spec.daemon_type
+        name, host, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
+
         # get mon. key
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get',
@@ -86,23 +264,61 @@ class MonService(CephadmService):
                 'who': 'mon',
                 'key': 'public_network',
             })
-            network = network.strip() # type: ignore
+            network = network.strip() if network else network
             if not network:
                 raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
             if '/' not in network:
                 raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
             extra_config += 'public network = %s\n' % network
 
-        return self.mgr._create_daemon('mon', name, host,
-                                       keyring=keyring,
-                                       extra_config={'config': extra_config})
+        daemon_spec.extra_config={'config': extra_config}
+        daemon_spec.keyring=keyring
+
+        return self.mgr._create_daemon(daemon_spec)
+
+    def _check_safe_to_destroy(self, mon_id: str) -> None:
+        ret, out, err = self.mgr.check_mon_command({
+            'prefix': 'quorum_status',
+        })
+        try:
+            j = json.loads(out)
+        except Exception as e:
+            raise OrchestratorError('failed to parse quorum status')
+
+        mons = [m['name'] for m in j['monmap']['mons']]
+        if mon_id not in mons:
+            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
+                mon_id, mons))
+            return
+        new_mons = [m for m in mons if m != mon_id]
+        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
+        if len(new_quorum) > len(new_mons) / 2:
+            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
+            return
+        raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
+
+
+    def pre_remove(self, daemon_id: str) -> None:
+        self._check_safe_to_destroy(daemon_id)
+
+        # remove mon from quorum before we destroy the daemon
+        logger.info('Removing monitor %s from monmap...' % daemon_id)
+        ret, out, err = self.mgr.check_mon_command({
+            'prefix': 'mon rm',
+            'name': daemon_id,
+        })
 
 
 class MgrService(CephadmService):
-    def create(self, mgr_id, host):
+    TYPE = 'mgr'
+
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
         """
         Create a new manager instance on a host.
         """
+        assert self.TYPE == daemon_spec.daemon_type
+        mgr_id, host = daemon_spec.daemon_id, daemon_spec.host
+
         # get mgr. key
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
@@ -112,13 +328,49 @@ class MgrService(CephadmService):
                      'mds', 'allow *'],
         })
 
-        return self.mgr._create_daemon('mgr', mgr_id, host, keyring=keyring)
+
+        # Retrieve ports used by manager modules
+        # In the case of the dashboard port and with several manager daemons
+        # running in different hosts, it exists the possibility that the
+        # user has decided to use different dashboard ports in each server
+        # If this is the case then the dashboard port opened will be only the used
+        # as default.
+        ports = []
+        config_ports = ''
+        ret, mgr_services, err = self.mgr.check_mon_command({
+                'prefix': 'mgr services',
+        })
+        if mgr_services:
+            mgr_endpoints = json.loads(mgr_services)
+            for end_point in mgr_endpoints.values():
+                port = re.search('\:\d+\/', end_point)
+                if port:
+                    ports.append(int(port[0][1:-1]))
+
+        if ports:
+            daemon_spec.ports = ports
+
+        daemon_spec.keyring = keyring
+
+        return self.mgr._create_daemon(daemon_spec)
+    
+    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+        active_mgr_str = self.mgr.get('mgr_map')['active_name']
+        for daemon in daemon_descrs:
+            if daemon.daemon_id == active_mgr_str:
+                return daemon
+        # if no active mgr found, return empty Daemon Desc
+        return DaemonDescription()
 
 
 class MdsService(CephadmService):
-    def config(self, spec: ServiceSpec):
-        # ensure mds_join_fs is set for these daemons
+    TYPE = 'mds'
+
+    def config(self, spec: ServiceSpec) -> None:
+        assert self.TYPE == spec.service_type
         assert spec.service_id
+
+        # ensure mds_join_fs is set for these daemons
         ret, out, err = self.mgr.check_mon_command({
             'prefix': 'config set',
             'who': 'mds.' + spec.service_id,
@@ -126,20 +378,47 @@ class MdsService(CephadmService):
             'value': spec.service_id,
         })
 
-    def create(self, mds_id, host) -> str:
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+        assert self.TYPE == daemon_spec.daemon_type
+        mds_id, host = daemon_spec.daemon_id, daemon_spec.host
+
         # get mgr. key
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
             'entity': 'mds.' + mds_id,
             'caps': ['mon', 'profile mds',
-                     'osd', 'allow rwx',
+                     'osd', 'allow rw tag cephfs *=*',
                      'mds', 'allow'],
         })
-        return self.mgr._create_daemon('mds', mds_id, host, keyring=keyring)
+        daemon_spec.keyring = keyring
+
+        return self.mgr._create_daemon(daemon_spec)
+    
+    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
+        active_mds_strs = list()
+        for fs in self.mgr.get('fs_map')['filesystems']:
+            mds_map = fs['mdsmap']
+            if mds_map is not None:
+                for mds_id, mds_status in mds_map['info'].items():
+                    if mds_status['state'] == 'up:active':
+                        active_mds_strs.append(mds_status['name'])
+        if len(active_mds_strs) != 0:
+            for daemon in daemon_descrs:
+                if daemon.daemon_id in active_mds_strs:
+                    return daemon
+        # if no mds found, return empty Daemon Desc
+        return DaemonDescription()
 
 
 class RgwService(CephadmService):
-    def config(self, spec: RGWSpec):
+    TYPE = 'rgw'
+
+    def config(self, spec: RGWSpec, rgw_id: str):
+        assert self.TYPE == spec.service_type
+
+        # create realm, zonegroup, and zone if needed
+        self.create_realm_zonegroup_zone(spec, rgw_id)
+
         # ensure rgw_realm and rgw_zone is set for these daemons
         ret, out, err = self.mgr.check_mon_command({
             'prefix': 'config set',
@@ -194,7 +473,17 @@ class RgwService(CephadmService):
             spec.service_name(), spec.placement.pretty_str()))
         self.mgr.spec_store.save(spec)
 
-    def create(self, rgw_id, host) -> str:
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+        assert self.TYPE == daemon_spec.daemon_type
+        rgw_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        keyring = self.get_keyring(rgw_id)
+
+        daemon_spec.keyring = keyring
+
+        return self.mgr._create_daemon(daemon_spec)
+
+    def get_keyring(self, rgw_id: str):
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
             'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
@@ -202,27 +491,159 @@ class RgwService(CephadmService):
                      'mgr', 'allow rw',
                      'osd', 'allow rwx'],
         })
-        return self.mgr._create_daemon('rgw', rgw_id, host, keyring=keyring)
+        return keyring
+
+    def create_realm_zonegroup_zone(self, spec: RGWSpec, rgw_id: str):
+        if utils.get_cluster_health(self.mgr) != 'HEALTH_OK':
+            raise OrchestratorError('Health not ok, will try agin when health ok')
+
+        #get keyring needed to run rados commands and strip out just the keyring
+        keyring = self.get_keyring(rgw_id).split('key = ',1)[1].rstrip()
+
+        # We can call radosgw-admin within the container, cause cephadm gives the MGR the required keyring permissions
+
+        def get_realms() -> List[str]:
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'realm', 'list',
+                   '--format=json']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            out = result.stdout
+            if not out:
+                return []
+            try:
+                j = json.loads(out)
+                return j.get('realms', [])
+            except Exception as e:
+                raise OrchestratorError('failed to parse realm info')
+
+        def create_realm():
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'realm', 'create',
+                   '--rgw-realm=%s'%spec.rgw_realm,
+                   '--default']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            self.mgr.log.info('created realm: %s'%spec.rgw_realm)
+
+        def get_zonegroups() -> List[str]:
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'zonegroup', 'list',
+                   '--format=json']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            out = result.stdout
+            if not out:
+                return []
+            try:
+                j = json.loads(out)
+                return j.get('zonegroups', [])
+            except Exception as e:
+                raise OrchestratorError('failed to parse zonegroup info')
+
+        def create_zonegroup():
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'zonegroup', 'create',
+                   '--rgw-zonegroup=default',
+                   '--master', '--default']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            self.mgr.log.info('created zonegroup: default')
+
+        def create_zonegroup_if_required():
+            zonegroups = get_zonegroups()
+            if 'default' not in zonegroups:
+                create_zonegroup()
+
+        def get_zones() -> List[str]:
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'zone', 'list',
+                   '--format=json']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            out = result.stdout
+            if not out:
+                return []
+            try:
+                j = json.loads(out)
+                return j.get('zones', [])
+            except Exception as e:
+                raise OrchestratorError('failed to parse zone info')
+
+        def create_zone():
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'zone', 'create',
+                   '--rgw-zonegroup=default',
+                   '--rgw-zone=%s'%spec.rgw_zone,
+                   '--master', '--default']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            self.mgr.log.info('created zone: %s'%spec.rgw_zone)
+
+
+        changes = False
+        realms = get_realms()
+        if spec.rgw_realm not in realms:
+            create_realm()
+            changes = True
+
+        zones = get_zones()
+        if spec.rgw_zone not in zones:
+            create_zonegroup_if_required()
+            create_zone()
+            changes = True
+
+        # update period if changes were made
+        if changes:
+            cmd = ['radosgw-admin',
+                   '--key=%s'%keyring,
+                   '--user', 'rgw.%s'%rgw_id,
+                   'period', 'update',
+                   '--rgw-realm=%s'%spec.rgw_realm,
+                   '--commit']
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            self.mgr.log.info('updated period')
 
 
 class RbdMirrorService(CephadmService):
-    def create(self, daemon_id, host) -> str:
+    TYPE = 'rbd-mirror'
+
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
             'entity': 'client.rbd-mirror.' + daemon_id,
             'caps': ['mon', 'profile rbd-mirror',
                      'osd', 'profile rbd'],
         })
-        return self.mgr._create_daemon('rbd-mirror', daemon_id, host,
-                                       keyring=keyring)
+
+        daemon_spec.keyring = keyring
+
+        return self.mgr._create_daemon(daemon_spec)
 
 
 class CrashService(CephadmService):
-    def create(self, daemon_id, host) -> str:
+    TYPE = 'crash'
+
+    def create(self, daemon_spec: CephadmDaemonSpec) -> str:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
         ret, keyring, err = self.mgr.check_mon_command({
             'prefix': 'auth get-or-create',
             'entity': 'client.crash.' + host,
             'caps': ['mon', 'profile crash',
                      'mgr', 'profile crash'],
         })
-        return self.mgr._create_daemon('crash', daemon_id, host, keyring=keyring)
+
+        daemon_spec.keyring = keyring
+
+        return self.mgr._create_daemon(daemon_spec)