from collections import namedtuple, OrderedDict
from contextlib import contextmanager
-from functools import wraps, reduce
+from functools import wraps, reduce, update_wrapper
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Sequence, Dict, cast
+ Sequence, Dict, cast, Mapping
try:
from typing import Protocol # Protocol was added in Python 3.8
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, IngressSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime
try:
return OrchResult(f(*args, **kwargs))
except Exception as e:
+ logger.exception(e)
+ import os
+ if 'UNITTEST' in os.environ:
+ raise # This makes debugging of Tracebacks from unittests a bit easier
return OrchResult(None, exception=e)
return cast(Callable[..., OrchResult[T]], wrapper)
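+
+ # Illustrative usage sketch: the wrapper above is applied as a decorator to
+ # Orchestrator methods so callers always receive an OrchResult instead of a
+ # raised exception (the decorator name handle_orch_error is assumed here;
+ # only the wrapping behaviour is taken from this module):
+ #
+ #     @handle_orch_error
+ #     def get_hosts(self) -> List[HostSpec]:
+ #         raise RuntimeError('boom')
+ #
+ #     # calling get_hosts() now returns OrchResult(None, exception=RuntimeError('boom'))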
"""
raise NotImplementedError()
- def remove_host(self, host: str) -> OrchResult[str]:
+ def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
"""
Remove a host from the orchestrator inventory.
"""
raise NotImplementedError()
+ def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+ """
+ Drain all daemons from a host.
+
+ :param hostname: hostname
+ """
+ raise NotImplementedError()
+
def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
"""
Update a host's address
"""
raise NotImplementedError()
+ def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+ """
+ Return hosts' metadata (gather_facts).
+ """
+ raise NotImplementedError()
+
def add_host_label(self, host: str, label: str) -> OrchResult[str]:
"""
Add a host label
"""
raise NotImplementedError()
- def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
"""
Remove a host label
"""
'node-exporter': self.apply_node_exporter,
'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
'prometheus': self.apply_prometheus,
+ 'loki': self.apply_loki,
+ 'promtail': self.apply_promtail,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'ingress': self.apply_ingress,
+ 'snmp-gateway': self.apply_snmp_gateway,
'host': self.add_host,
- 'cephadm-exporter': self.apply_cephadm_exporter,
}
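+ # Illustrative note: this table lets the enclosing apply() method (name
+ # assumed from context) dispatch a ServiceSpec by its service_type, roughly
+ #     mapping[spec.service_type](spec)
+ # which is why each new service type (loki, promtail, snmp-gateway) gets an
+ # entry here alongside its apply_* method.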
def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
"""
raise NotImplementedError()
- def remove_service(self, service_name: str) -> OrchResult[str]:
+ def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
"""
Remove a service (a collection of daemons).
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> OrchResult[str]:
+ force: bool = False,
+ zap: bool = False) -> OrchResult[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
:param force: Forces the OSD removal process without waiting for the data to be drained first.
- Note that this can only remove OSDs that were successfully
- created (i.e. got an OSD ID).
+ :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+ .. note:: this can only remove OSDs that were successfully
+ created (i.e. got an OSD ID).
"""
raise NotImplementedError()
"""Update mgr cluster"""
raise NotImplementedError()
- def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
+ def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
"""Update MDS cluster"""
raise NotImplementedError()
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
+ def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Loki daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Promtail daemon(s)"""
+ raise NotImplementedError()
+
def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
- """Update an existing cephadm exporter daemon"""
+ def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+ """Update an existing snmp gateway service"""
raise NotImplementedError()
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
+ raise NotImplementedError()
+
+ def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+ hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
raise NotImplementedError()
def upgrade_pause(self) -> OrchResult[str]:
'alertmanager': 'alertmanager',
'prometheus': 'prometheus',
'node-exporter': 'node-exporter',
+ 'loki': 'loki',
+ 'promtail': 'promtail',
'crash': 'crash',
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
- 'cephadm-exporter': 'cephadm-exporter',
+ 'agent': 'agent',
+ 'snmp-gateway': 'snmp-gateway',
}
return mapping[dtype]
'grafana': ['grafana'],
'alertmanager': ['alertmanager'],
'prometheus': ['prometheus'],
+ 'loki': ['loki'],
+ 'promtail': ['promtail'],
'node-exporter': ['node-exporter'],
'crash': ['crash'],
'container': ['container'],
- 'cephadm-exporter': ['cephadm-exporter'],
+ 'agent': ['agent'],
+ 'snmp-gateway': ['snmp-gateway'],
}
return mapping[stype]
+KNOWN_DAEMON_TYPES: List[str] = list(
+ sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
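+ # Illustrative note: sum(..., []) with an empty-list start value simply
+ # concatenates the per-service daemon-type lists, e.g.
+ #     sum(([1, 2], [3]), []) == [1, 2, 3]
+ # so KNOWN_DAEMON_TYPES is the flat list of every daemon type the
+ # orchestrator knows about.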
+
+
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
def __init__(self) -> None:
self.in_progress = False # Is an upgrade underway?
self.target_image: Optional[str] = None
self.services_complete: List[str] = [] # Which daemon types are fully updated?
+ self.which: str = '<unknown>'  # set if the user restricted the upgrade to specific daemon types, services, or hosts
self.progress: Optional[str] = None # How many of the daemons have we upgraded
self.message = "" # Freeform description
class DaemonDescriptionStatus(enum.IntEnum):
+ unknown = -2
error = -1
stopped = 0
running = 1
+ starting = 2 #: Daemon is deployed, but not yet running
+
+ @staticmethod
+ def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+ if status is None:
+ status = DaemonDescriptionStatus.unknown
+ return {
+ DaemonDescriptionStatus.unknown: 'unknown',
+ DaemonDescriptionStatus.error: 'error',
+ DaemonDescriptionStatus.stopped: 'stopped',
+ DaemonDescriptionStatus.running: 'running',
+ DaemonDescriptionStatus.starting: 'starting',
+ }.get(status, '<unknown>')
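+
+ # Illustrative usage sketch: to_str() tolerates None and unmapped values, so
+ # display code never has to guard the status:
+ #     DaemonDescriptionStatus.to_str(DaemonDescriptionStatus.starting)  # 'starting'
+ #     DaemonDescriptionStatus.to_str(None)                              # 'unknown'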
class DaemonDescription(object):
memory_usage: Optional[int] = None,
memory_request: Optional[int] = None,
memory_limit: Optional[int] = None,
+ cpu_percentage: Optional[str] = None,
service_name: Optional[str] = None,
ports: Optional[List[int]] = None,
ip: Optional[str] = None,
deployed_by: Optional[List[str]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None,
) -> None:
- # Host is at the same granularity as InventoryHost
+ #: Host is at the same granularity as InventoryHost
self.hostname: Optional[str] = hostname
# Not everyone runs in containers, but enough people do to
self.container_image_name = container_image_name # image friendly name
self.container_image_digests = container_image_digests # reg hashes
- # The type of service (osd, mon, mgr, etc.)
+ #: The type of service (osd, mon, mgr, etc.)
self.daemon_type = daemon_type
- # The orchestrator will have picked some names for daemons,
- # typically either based on hostnames or on pod names.
- # This is the <foo> in mds.<foo>, the ID that will appear
- # in the FSMap/ServiceMap.
+ #: The orchestrator will have picked some names for daemons,
+ #: typically either based on hostnames or on pod names.
+ #: This is the <foo> in mds.<foo>, the ID that will appear
+ #: in the FSMap/ServiceMap.
self.daemon_id: Optional[str] = daemon_id
+ self.daemon_name = self.name()
+
+ #: Some daemon types have a numeric rank assigned
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
self._service_name: Optional[str] = service_name
- # Service version that was deployed
+ #: Service version that was deployed
self.version = version
- # Service status: -1 error, 0 stopped, 1 running
- self.status = status
+ # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+ self._status = status
- # Service status description when status == error.
+ #: Service status description when status == error.
self.status_desc = status_desc
- # datetime when this info was last refreshed
+ #: datetime when this info was last refreshed
self.last_refresh: Optional[datetime.datetime] = last_refresh
self.created: Optional[datetime.datetime] = created
self.last_configured: Optional[datetime.datetime] = last_configured
self.last_deployed: Optional[datetime.datetime] = last_deployed
- # Affinity to a certain OSDSpec
+ #: Affinity to a certain OSDSpec
self.osdspec_affinity: Optional[str] = osdspec_affinity
self.events: List[OrchestratorEvent] = events or []
self.memory_request: Optional[int] = memory_request
self.memory_limit: Optional[int] = memory_limit
+ self.cpu_percentage: Optional[str] = cpu_percentage
+
self.ports: Optional[List[int]] = ports
self.ip: Optional[str] = ip
self.is_active = is_active
+ self.extra_container_args = extra_container_args
+
+ @property
+ def status(self) -> Optional[DaemonDescriptionStatus]:
+ return self._status
+
+ @status.setter
+ def status(self, new: DaemonDescriptionStatus) -> None:
+ self._status = new
+ self.status_desc = DaemonDescriptionStatus.to_str(new)
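+ # Illustrative note: assigning through the setter keeps the human-readable
+ # description in sync with the enum, e.g. after
+ #     dd.status = DaemonDescriptionStatus.starting
+ # dd.status_desc is automatically 'starting'.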
+
def get_port_summary(self) -> str:
if not self.ports:
return ''
if self.daemon_type == 'osd':
if self.osdspec_affinity and self.osdspec_affinity != 'None':
return self.osdspec_affinity
- return 'unmanaged'
+ return ''
def _match() -> str:
assert self.daemon_id is not None
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
+ def __str__(self) -> str:
+ return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
def to_json(self) -> dict:
out: Dict[str, Any] = OrderedDict()
out['daemon_type'] = self.daemon_type
out['daemon_id'] = self.daemon_id
+ out['service_name'] = self._service_name
+ out['daemon_name'] = self.name()
out['hostname'] = self.hostname
out['container_id'] = self.container_id
out['container_image_id'] = self.container_image_id
out['memory_usage'] = self.memory_usage
out['memory_request'] = self.memory_request
out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
out['version'] = self.version
out['status'] = self.status.value if self.status is not None else None
out['status_desc'] = self.status_desc
out['is_active'] = self.is_active
out['ports'] = self.ports
out['ip'] = self.ip
+ out['rank'] = self.rank
+ out['rank_generation'] = self.rank_generation
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
del out[e]
return out
+ def to_dict(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
+ out['daemon_type'] = self.daemon_type
+ out['daemon_id'] = self.daemon_id
+ out['daemon_name'] = self.name()
+ out['hostname'] = self.hostname
+ out['container_id'] = self.container_id
+ out['container_image_id'] = self.container_image_id
+ out['container_image_name'] = self.container_image_name
+ out['container_image_digests'] = self.container_image_digests
+ out['memory_usage'] = self.memory_usage
+ out['memory_request'] = self.memory_request
+ out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
+ out['version'] = self.version
+ out['status'] = self.status.value if self.status is not None else None
+ out['status_desc'] = self.status_desc
+ if self.daemon_type == 'osd':
+ out['osdspec_affinity'] = self.osdspec_affinity
+ out['is_active'] = self.is_active
+ out['ports'] = self.ports
+ out['ip'] = self.ip
+
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if getattr(self, k):
+ out[k] = datetime_to_str(getattr(self, k))
+
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+
+ empty = [k for k, v in out.items() if v is None]
+ for e in empty:
+ del out[e]
+ return out
+
@classmethod
@handle_type_error
def from_json(cls, data: dict) -> 'DaemonDescription':
c[k] = str_to_datetime(c[k])
events = [OrchestratorEvent.from_json(e) for e in event_strs]
status_int = c.pop('status', None)
+ if 'daemon_name' in c:
+ del c['daemon_name']
+ if 'service_name' in c and c['service_name'].startswith('osd.'):
+ # if the service_name is an osd.NNN (numeric OSD id) then
+ # ignore it -- it is not a valid service_name and
+ # (presumably) came from an older version of cephadm.
+ try:
+ int(c['service_name'][4:])
+ del c['service_name']
+ except ValueError:
+ pass
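+ # Illustrative note: the check above drops purely numeric names like
+ # 'osd.3' (int('3') succeeds, so the key is deleted) while keeping real
+ # service names like 'osd.foo' (int('foo') raises ValueError before the
+ # del runs).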
status = DaemonDescriptionStatus(status_int) if status_int is not None else None
return cls(events=events, status=status, **c)
@staticmethod
def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
- return dumper.represent_dict(data.to_json().items())
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
spec: ServiceSpec,
container_image_id: Optional[str] = None,
container_image_name: Optional[str] = None,
- rados_config_location: Optional[str] = None,
service_url: Optional[str] = None,
last_refresh: Optional[datetime.datetime] = None,
created: Optional[datetime.datetime] = None,
self.container_image_id = container_image_id # image hash
self.container_image_name = container_image_name # image friendly name
- # Location of the service configuration when stored in rados
- # object. Format: "rados://<pool>/[<namespace/>]<object>"
- self.rados_config_location = rados_config_location
-
# If the service exposes REST-like API, this attribute should hold
# the URL.
self.service_url = service_url
status = {
'container_image_id': self.container_image_id,
'container_image_name': self.container_image_name,
- 'rados_config_location': self.rados_config_location,
'service_url': self.service_url,
'size': self.size,
'running': self.running,
out['events'] = [e.to_json() for e in self.events]
return out
+ def to_dict(self) -> OrderedDict:
+ out = self.spec.to_json()
+ status = {
+ 'container_image_id': self.container_image_id,
+ 'container_image_name': self.container_image_name,
+ 'service_url': self.service_url,
+ 'size': self.size,
+ 'running': self.running,
+ 'last_refresh': self.last_refresh,
+ 'created': self.created,
+ 'virtual_ip': self.virtual_ip,
+ 'ports': self.ports if self.ports else None,
+ }
+ for k in ['last_refresh', 'created']:
+ if getattr(self, k):
+ status[k] = datetime_to_str(getattr(self, k))
+ status = {k: v for (k, v) in status.items() if v is not None}
+ out['status'] = status
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+ return out
+
@classmethod
@handle_type_error
def from_json(cls, data: dict) -> 'ServiceDescription':
return cls(spec=spec, events=events, **c_status)
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
- return dumper.represent_dict(data.to_json().items())
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
When fetching inventory, use this filter to avoid unnecessarily
scanning the whole estate.
- Typical use: filter by host when presenting UI workflow for configuring
- a particular server.
- filter by label when not all of estate is Ceph servers,
- and we want to only learn about the Ceph servers.
- filter by label when we are interested particularly
- in e.g. OSD servers.
+ Typical use:
+ filter by host when presenting UI workflow for configuring
+ a particular server.
+ filter by label when not all of the estate is Ceph servers,
+ and we want to only learn about the Ceph servers.
+ filter by label when we are interested particularly
+ in e.g. OSD servers.
"""
def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
created = datetime_to_str(self.created)
return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+ def to_dict(self) -> dict:
+ # Convert events data to dict.
+ return {
+ 'created': datetime_to_str(self.created),
+ 'subject': self.kind_subject(),
+ 'level': self.level,
+ 'message': self.message
+ }
+
@classmethod
@handle_type_error
def from_json(cls, data: str) -> "OrchestratorEvent":
return self.created == other.created and self.kind == other.kind \
and self.subject == other.subject and self.message == other.message
+ def __repr__(self) -> str:
+ return f'OrchestratorEvent.from_json({self.to_json()!r})'
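+ # Illustrative note: the repr is shaped as a from_json() call, so pasting it
+ # back into a REPL should (in principle) reconstruct an equivalent event,
+ # which is handy when events show up in test failures.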
+
def _mk_orch_methods(cls: Any) -> Any:
# Needs to be defined outside of the for loop.
return completion
return inner
- for meth in Orchestrator.__dict__:
- if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
- setattr(cls, meth, shim(meth))
+ for name, method in Orchestrator.__dict__.items():
+ if not name.startswith('_') and name not in ['is_orchestrator_module']:
+ remote_call = update_wrapper(shim(name), method)
+ setattr(cls, name, remote_call)
return cls
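+
+ # Illustrative note: update_wrapper() copies __name__, __doc__ and related
+ # metadata from each Orchestrator interface method onto the generated
+ # remote-call shim, so the methods installed on the class keep their original
+ # names and docstrings (unlike the bare shim used before).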