git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
index 6f14a78cde3f504ae6026b5ad405687b3673b39e..b0ccf73570b7772f4d2957efea31d50e69c6886d 100644 (file)
@@ -15,10 +15,10 @@ import re
 
 from collections import namedtuple, OrderedDict
 from contextlib import contextmanager
-from functools import wraps, reduce
+from functools import wraps, reduce, update_wrapper
 
 from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
-    Sequence, Dict, cast
+    Sequence, Dict, cast, Mapping
 
 try:
     from typing import Protocol  # Protocol was added in Python 3.8
@@ -31,7 +31,7 @@ import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    IscsiServiceSpec, IngressSpec
+    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.hostspec import HostSpec, SpecValidationError
 from ceph.utils import datetime_to_str, str_to_datetime
@@ -124,6 +124,10 @@ def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
         try:
             return OrchResult(f(*args, **kwargs))
         except Exception as e:
+            logger.exception(e)
+            import os
+            if 'UNITTEST' in os.environ:
+                raise  # This makes debugging of Tracebacks from unittests a bit easier
             return OrchResult(None, exception=e)
 
     return cast(Callable[..., OrchResult[T]], wrapper)
@@ -343,7 +347,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_host(self, host: str) -> OrchResult[str]:
+    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
         """
         Remove a host from the orchestrator inventory.
 
@@ -351,6 +355,14 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+        """
+        drain all daemons from a host
+
+        :param hostname: hostname
+        """
+        raise NotImplementedError()
+
     def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
         """
         Update a host's address
@@ -368,13 +380,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
+    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+        """
+        Return hosts metadata (gather_facts).
+        """
+        raise NotImplementedError()
+
     def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
+    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a host label
         """
@@ -448,11 +466,13 @@ class Orchestrator(object):
             'node-exporter': self.apply_node_exporter,
             'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
             'prometheus': self.apply_prometheus,
+            'loki': self.apply_loki,
+            'promtail': self.apply_promtail,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
             'ingress': self.apply_ingress,
+            'snmp-gateway': self.apply_snmp_gateway,
             'host': self.add_host,
-            'cephadm-exporter': self.apply_cephadm_exporter,
         }
 
         def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
@@ -476,7 +496,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name: str) -> OrchResult[str]:
+    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a service (a collection of daemons).
 
@@ -540,13 +560,17 @@ class Orchestrator(object):
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> OrchResult[str]:
+                    force: bool = False,
+                    zap: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
-        Note that this can only remove OSDs that were successfully
-        created (i.e. got an OSD ID).
+        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+        .. note:: this can only remove OSDs that were successfully
+            created (i.e. got an OSD ID).
         """
         raise NotImplementedError()
 
@@ -588,7 +612,7 @@ class Orchestrator(object):
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
+    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
@@ -620,6 +644,14 @@ class Orchestrator(object):
         """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
+    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing Loki daemon(s)"""
+        raise NotImplementedError()
+
+    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing Promtail daemon(s)"""
+        raise NotImplementedError()
+
     def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
@@ -632,14 +664,18 @@ class Orchestrator(object):
         """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
-    def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
-        """Update an existing cephadm exporter daemon"""
+    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+        """Update an existing snmp gateway service"""
         raise NotImplementedError()
 
     def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
+    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
+        raise NotImplementedError()
+
+    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
         raise NotImplementedError()
 
     def upgrade_pause(self) -> OrchResult[str]:
@@ -697,10 +733,13 @@ def daemon_type_to_service(dtype: str) -> str:
         'alertmanager': 'alertmanager',
         'prometheus': 'prometheus',
         'node-exporter': 'node-exporter',
+        'loki': 'loki',
+        'promtail': 'promtail',
         'crash': 'crash',
         'crashcollector': 'crash',  # Specific Rook Daemon
         'container': 'container',
-        'cephadm-exporter': 'cephadm-exporter',
+        'agent': 'agent',
+        'snmp-gateway': 'snmp-gateway',
     }
     return mapping[dtype]
 
@@ -720,20 +759,28 @@ def service_to_daemon_types(stype: str) -> List[str]:
         'grafana': ['grafana'],
         'alertmanager': ['alertmanager'],
         'prometheus': ['prometheus'],
+        'loki': ['loki'],
+        'promtail': ['promtail'],
         'node-exporter': ['node-exporter'],
         'crash': ['crash'],
         'container': ['container'],
-        'cephadm-exporter': ['cephadm-exporter'],
+        'agent': ['agent'],
+        'snmp-gateway': ['snmp-gateway'],
     }
     return mapping[stype]
 
 
+KNOWN_DAEMON_TYPES: List[str] = list(
+    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
+
+
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
     def __init__(self) -> None:
         self.in_progress = False  # Is an upgrade underway?
         self.target_image: Optional[str] = None
         self.services_complete: List[str] = []  # Which daemon types are fully updated?
+        self.which: str = '<unknown>'  # for if user specified daemon types, services or hosts
         self.progress: Optional[str] = None  # How many of the daemons have we upgraded
         self.message = ""  # Freeform description
 
@@ -750,9 +797,23 @@ def handle_type_error(method: FuncT) -> FuncT:
 
 
 class DaemonDescriptionStatus(enum.IntEnum):
+    unknown = -2
     error = -1
     stopped = 0
     running = 1
+    starting = 2  #: Daemon is deployed, but not yet running
+
+    @staticmethod
+    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+        if status is None:
+            status = DaemonDescriptionStatus.unknown
+        return {
+            DaemonDescriptionStatus.unknown: 'unknown',
+            DaemonDescriptionStatus.error: 'error',
+            DaemonDescriptionStatus.stopped: 'stopped',
+            DaemonDescriptionStatus.running: 'running',
+            DaemonDescriptionStatus.starting: 'starting',
+        }.get(status, '<unknown>')
 
 
 class DaemonDescription(object):
@@ -790,13 +851,17 @@ class DaemonDescription(object):
                  memory_usage: Optional[int] = None,
                  memory_request: Optional[int] = None,
                  memory_limit: Optional[int] = None,
+                 cpu_percentage: Optional[str] = None,
                  service_name: Optional[str] = None,
                  ports: Optional[List[int]] = None,
                  ip: Optional[str] = None,
                  deployed_by: Optional[List[str]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None,
                  ) -> None:
 
-        # Host is at the same granularity as InventoryHost
+        #: Host is at the same granularity as InventoryHost
         self.hostname: Optional[str] = hostname
 
         # Not everyone runs in containers, but enough people do to
@@ -807,27 +872,32 @@ class DaemonDescription(object):
         self.container_image_name = container_image_name  # image friendly name
         self.container_image_digests = container_image_digests  # reg hashes
 
-        # The type of service (osd, mon, mgr, etc.)
+        #: The type of service (osd, mon, mgr, etc.)
         self.daemon_type = daemon_type
 
-        # The orchestrator will have picked some names for daemons,
-        # typically either based on hostnames or on pod names.
-        # This is the <foo> in mds.<foo>, the ID that will appear
-        # in the FSMap/ServiceMap.
+        #: The orchestrator will have picked some names for daemons,
+        #: typically either based on hostnames or on pod names.
+        #: This is the <foo> in mds.<foo>, the ID that will appear
+        #: in the FSMap/ServiceMap.
         self.daemon_id: Optional[str] = daemon_id
+        self.daemon_name = self.name()
+
+        #: Some daemon types have a numeric rank assigned
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
 
         self._service_name: Optional[str] = service_name
 
-        # Service version that was deployed
+        #: Service version that was deployed
         self.version = version
 
-        # Service status: -1 error, 0 stopped, 1 running
-        self.status = status
+        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+        self._status = status
 
-        # Service status description when status == error.
+        #: Service status description when status == error.
         self.status_desc = status_desc
 
-        # datetime when this info was last refreshed
+        #: datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
 
         self.created: Optional[datetime.datetime] = created
@@ -835,7 +905,7 @@ class DaemonDescription(object):
         self.last_configured: Optional[datetime.datetime] = last_configured
         self.last_deployed: Optional[datetime.datetime] = last_deployed
 
-        # Affinity to a certain OSDSpec
+        #: Affinity to a certain OSDSpec
         self.osdspec_affinity: Optional[str] = osdspec_affinity
 
         self.events: List[OrchestratorEvent] = events or []
@@ -844,6 +914,8 @@ class DaemonDescription(object):
         self.memory_request: Optional[int] = memory_request
         self.memory_limit: Optional[int] = memory_limit
 
+        self.cpu_percentage: Optional[str] = cpu_percentage
+
         self.ports: Optional[List[int]] = ports
         self.ip: Optional[str] = ip
 
@@ -851,6 +923,17 @@ class DaemonDescription(object):
 
         self.is_active = is_active
 
+        self.extra_container_args = extra_container_args
+
+    @property
+    def status(self) -> Optional[DaemonDescriptionStatus]:
+        return self._status
+
+    @status.setter
+    def status(self, new: DaemonDescriptionStatus) -> None:
+        self._status = new
+        self.status_desc = DaemonDescriptionStatus.to_str(new)
+
     def get_port_summary(self) -> str:
         if not self.ports:
             return ''
@@ -879,7 +962,7 @@ class DaemonDescription(object):
         if self.daemon_type == 'osd':
             if self.osdspec_affinity and self.osdspec_affinity != 'None':
                 return self.osdspec_affinity
-            return 'unmanaged'
+            return ''
 
         def _match() -> str:
             assert self.daemon_id is not None
@@ -939,10 +1022,15 @@ class DaemonDescription(object):
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
     def to_json(self) -> dict:
         out: Dict[str, Any] = OrderedDict()
         out['daemon_type'] = self.daemon_type
         out['daemon_id'] = self.daemon_id
+        out['service_name'] = self._service_name
+        out['daemon_name'] = self.name()
         out['hostname'] = self.hostname
         out['container_id'] = self.container_id
         out['container_image_id'] = self.container_image_id
@@ -951,6 +1039,7 @@ class DaemonDescription(object):
         out['memory_usage'] = self.memory_usage
         out['memory_request'] = self.memory_request
         out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
         out['version'] = self.version
         out['status'] = self.status.value if self.status is not None else None
         out['status_desc'] = self.status_desc
@@ -959,6 +1048,8 @@ class DaemonDescription(object):
         out['is_active'] = self.is_active
         out['ports'] = self.ports
         out['ip'] = self.ip
+        out['rank'] = self.rank
+        out['rank_generation'] = self.rank_generation
 
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
@@ -973,6 +1064,42 @@ class DaemonDescription(object):
             del out[e]
         return out
 
+    def to_dict(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['daemon_name'] = self.name()
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
+        out['version'] = self.version
+        out['status'] = self.status.value if self.status is not None else None
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+
+        for k in ['last_refresh', 'created', 'started', 'last_deployed',
+                  'last_configured']:
+            if getattr(self, k):
+                out[k] = datetime_to_str(getattr(self, k))
+
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
+
     @classmethod
     @handle_type_error
     def from_json(cls, data: dict) -> 'DaemonDescription':
@@ -984,6 +1111,17 @@ class DaemonDescription(object):
                 c[k] = str_to_datetime(c[k])
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
         status_int = c.pop('status', None)
+        if 'daemon_name' in c:
+            del c['daemon_name']
+        if 'service_name' in c and c['service_name'].startswith('osd.'):
+            # if the service_name is an osd.NNN (numeric osd id) then
+            # ignore it -- it is not a valid service_name and
+            # (presumably) came from an older version of cephadm.
+            try:
+                int(c['service_name'][4:])
+                del c['service_name']
+            except ValueError:
+                pass
         status = DaemonDescriptionStatus(status_int) if status_int is not None else None
         return cls(events=events, status=status, **c)
 
@@ -993,7 +1131,7 @@ class DaemonDescription(object):
 
     @staticmethod
     def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
-        return dumper.represent_dict(data.to_json().items())
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
@@ -1016,7 +1154,6 @@ class ServiceDescription(object):
                  spec: ServiceSpec,
                  container_image_id: Optional[str] = None,
                  container_image_name: Optional[str] = None,
-                 rados_config_location: Optional[str] = None,
                  service_url: Optional[str] = None,
                  last_refresh: Optional[datetime.datetime] = None,
                  created: Optional[datetime.datetime] = None,
@@ -1032,10 +1169,6 @@ class ServiceDescription(object):
         self.container_image_id = container_image_id      # image hash
         self.container_image_name = container_image_name  # image friendly name
 
-        # Location of the service configuration when stored in rados
-        # object. Format: "rados://<pool>/[<namespace/>]<object>"
-        self.rados_config_location = rados_config_location
-
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
         self.service_url = service_url
@@ -1074,7 +1207,6 @@ class ServiceDescription(object):
         status = {
             'container_image_id': self.container_image_id,
             'container_image_name': self.container_image_name,
-            'rados_config_location': self.rados_config_location,
             'service_url': self.service_url,
             'size': self.size,
             'running': self.running,
@@ -1092,6 +1224,28 @@ class ServiceDescription(object):
             out['events'] = [e.to_json() for e in self.events]
         return out
 
+    def to_dict(self) -> OrderedDict:
+        out = self.spec.to_json()
+        status = {
+            'container_image_id': self.container_image_id,
+            'container_image_name': self.container_image_name,
+            'service_url': self.service_url,
+            'size': self.size,
+            'running': self.running,
+            'last_refresh': self.last_refresh,
+            'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
+        }
+        for k in ['last_refresh', 'created']:
+            if getattr(self, k):
+                status[k] = datetime_to_str(getattr(self, k))
+        status = {k: v for (k, v) in status.items() if v is not None}
+        out['status'] = status
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+        return out
+
     @classmethod
     @handle_type_error
     def from_json(cls, data: dict) -> 'ServiceDescription':
@@ -1108,8 +1262,8 @@ class ServiceDescription(object):
         return cls(spec=spec, events=events, **c_status)
 
     @staticmethod
-    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
-        return dumper.represent_dict(data.to_json().items())
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
@@ -1120,13 +1274,14 @@ class InventoryFilter(object):
     When fetching inventory, use this filter to avoid unnecessarily
     scanning the whole estate.
 
-    Typical use: filter by host when presenting UI workflow for configuring
-                 a particular server.
-                 filter by label when not all of estate is Ceph servers,
-                 and we want to only learn about the Ceph servers.
-                 filter by label when we are interested particularly
-                 in e.g. OSD servers.
+    Typical use:
 
+      filter by host when presenting UI workflow for configuring
+      a particular server.
+      filter by label when not all of estate is Ceph servers,
+      and we want to only learn about the Ceph servers.
+      filter by label when we are interested particularly
+      in e.g. OSD servers.
     """
 
     def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
@@ -1249,6 +1404,15 @@ class OrchestratorEvent:
         created = datetime_to_str(self.created)
         return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
 
+    def to_dict(self) -> dict:
+        # Convert events data to dict.
+        return {
+            'created': datetime_to_str(self.created),
+            'subject': self.kind_subject(),
+            'level': self.level,
+            'message': self.message
+        }
+
     @classmethod
     @handle_type_error
     def from_json(cls, data: str) -> "OrchestratorEvent":
@@ -1271,6 +1435,9 @@ class OrchestratorEvent:
         return self.created == other.created and self.kind == other.kind \
             and self.subject == other.subject and self.message == other.message
 
+    def __repr__(self) -> str:
+        return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
 
 def _mk_orch_methods(cls: Any) -> Any:
     # Needs to be defined outside of for.
@@ -1281,9 +1448,10 @@ def _mk_orch_methods(cls: Any) -> Any:
             return completion
         return inner
 
-    for meth in Orchestrator.__dict__:
-        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
-            setattr(cls, meth, shim(meth))
+    for name, method in Orchestrator.__dict__.items():
+        if not name.startswith('_') and name not in ['is_orchestrator_module']:
+            remote_call = update_wrapper(shim(name), method)
+            setattr(cls, name, remote_call)
     return cls