from collections import namedtuple, OrderedDict
from contextlib import contextmanager
-from functools import wraps, reduce
+from functools import wraps, reduce, update_wrapper
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
Sequence, Dict, cast, Mapping
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, IngressSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.hostspec import HostSpec, SpecValidationError
from ceph.utils import datetime_to_str, str_to_datetime
return OrchResult(f(*args, **kwargs))
except Exception as e:
logger.exception(e)
+ import os
+ if 'UNITTEST' in os.environ:
+ raise # This makes debugging of Tracebacks from unittests a bit easier
return OrchResult(None, exception=e)
return cast(Callable[..., OrchResult[T]], wrapper)
"""
raise NotImplementedError()
- def drain_host(self, hostname: str) -> OrchResult[str]:
+ def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
"""
drain all daemons from a host
"""
raise NotImplementedError()
- def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
"""
Remove a host label
"""
'node-exporter': self.apply_node_exporter,
'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
'prometheus': self.apply_prometheus,
+ 'loki': self.apply_loki,
+ 'promtail': self.apply_promtail,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
'ingress': self.apply_ingress,
+ 'snmp-gateway': self.apply_snmp_gateway,
'host': self.add_host,
- 'cephadm-exporter': self.apply_cephadm_exporter,
}
def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
:param force: Forces the OSD removal process without waiting for the data to be drained first.
:param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
- Note that this can only remove OSDs that were successfully
- created (i.e. got an OSD ID).
+
+
+ .. note:: this can only remove OSDs that were successfully
+ created (i.e. got an OSD ID).
"""
raise NotImplementedError()
"""Update mgr cluster"""
raise NotImplementedError()
- def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]:
+ def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
"""Update MDS cluster"""
raise NotImplementedError()
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
+ def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Loki daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Promtail daemon(s)"""
+ raise NotImplementedError()
+
def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def apply_cephadm_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
- """Update an existing cephadm exporter daemon"""
+ def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+ """Update an existing snmp gateway service"""
raise NotImplementedError()
def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_ls(self, image: Optional[str], tags: bool) -> OrchResult[Dict[Any, Any]]:
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
raise NotImplementedError()
- def upgrade_start(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
+ def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+ hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
raise NotImplementedError()
def upgrade_pause(self) -> OrchResult[str]:
'alertmanager': 'alertmanager',
'prometheus': 'prometheus',
'node-exporter': 'node-exporter',
+ 'loki': 'loki',
+ 'promtail': 'promtail',
'crash': 'crash',
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
- 'cephadm-exporter': 'cephadm-exporter',
+ 'agent': 'agent',
+ 'snmp-gateway': 'snmp-gateway',
}
return mapping[dtype]
'grafana': ['grafana'],
'alertmanager': ['alertmanager'],
'prometheus': ['prometheus'],
+ 'loki': ['loki'],
+ 'promtail': ['promtail'],
'node-exporter': ['node-exporter'],
'crash': ['crash'],
'container': ['container'],
- 'cephadm-exporter': ['cephadm-exporter'],
+ 'agent': ['agent'],
+ 'snmp-gateway': ['snmp-gateway'],
}
return mapping[stype]
self.in_progress = False # Is an upgrade underway?
self.target_image: Optional[str] = None
self.services_complete: List[str] = [] # Which daemon types are fully updated?
+ self.which: str = '<unknown>' # for if user specified daemon types, services or hosts
self.progress: Optional[str] = None # How many of the daemons have we upgraded
self.message = "" # Freeform description
class DaemonDescriptionStatus(enum.IntEnum):
+ unknown = -2
error = -1
stopped = 0
running = 1
+ starting = 2 #: Daemon is deployed, but not yet running
+
+ @staticmethod
+ def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+ if status is None:
+ status = DaemonDescriptionStatus.unknown
+ return {
+ DaemonDescriptionStatus.unknown: 'unknown',
+ DaemonDescriptionStatus.error: 'error',
+ DaemonDescriptionStatus.stopped: 'stopped',
+ DaemonDescriptionStatus.running: 'running',
+ DaemonDescriptionStatus.starting: 'starting',
+ }.get(status, '<unknown>')
class DaemonDescription(object):
memory_usage: Optional[int] = None,
memory_request: Optional[int] = None,
memory_limit: Optional[int] = None,
+ cpu_percentage: Optional[str] = None,
service_name: Optional[str] = None,
ports: Optional[List[int]] = None,
ip: Optional[str] = None,
deployed_by: Optional[List[str]] = None,
rank: Optional[int] = None,
rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None,
) -> None:
- # Host is at the same granularity as InventoryHost
+ #: Host is at the same granularity as InventoryHost
self.hostname: Optional[str] = hostname
# Not everyone runs in containers, but enough people do to
self.container_image_name = container_image_name # image friendly name
self.container_image_digests = container_image_digests # reg hashes
- # The type of service (osd, mon, mgr, etc.)
+ #: The type of service (osd, mon, mgr, etc.)
self.daemon_type = daemon_type
- # The orchestrator will have picked some names for daemons,
- # typically either based on hostnames or on pod names.
- # This is the <foo> in mds.<foo>, the ID that will appear
- # in the FSMap/ServiceMap.
+ #: The orchestrator will have picked some names for daemons,
+ #: typically either based on hostnames or on pod names.
+ #: This is the <foo> in mds.<foo>, the ID that will appear
+ #: in the FSMap/ServiceMap.
self.daemon_id: Optional[str] = daemon_id
self.daemon_name = self.name()
- # Some daemon types have a numeric rank assigned
+ #: Some daemon types have a numeric rank assigned
self.rank: Optional[int] = rank
self.rank_generation: Optional[int] = rank_generation
self._service_name: Optional[str] = service_name
- # Service version that was deployed
+ #: Service version that was deployed
self.version = version
- # Service status: -1 error, 0 stopped, 1 running
- self.status = status
+ # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+ self._status = status
- # Service status description when status == error.
+ #: Service status description when status == error.
self.status_desc = status_desc
- # datetime when this info was last refreshed
+ #: datetime when this info was last refreshed
self.last_refresh: Optional[datetime.datetime] = last_refresh
self.created: Optional[datetime.datetime] = created
self.last_configured: Optional[datetime.datetime] = last_configured
self.last_deployed: Optional[datetime.datetime] = last_deployed
- # Affinity to a certain OSDSpec
+ #: Affinity to a certain OSDSpec
self.osdspec_affinity: Optional[str] = osdspec_affinity
self.events: List[OrchestratorEvent] = events or []
self.memory_request: Optional[int] = memory_request
self.memory_limit: Optional[int] = memory_limit
+ self.cpu_percentage: Optional[str] = cpu_percentage
+
self.ports: Optional[List[int]] = ports
self.ip: Optional[str] = ip
self.is_active = is_active
+ self.extra_container_args = extra_container_args
+
+ @property
+ def status(self) -> Optional[DaemonDescriptionStatus]:
+ return self._status
+
+ @status.setter
+ def status(self, new: DaemonDescriptionStatus) -> None:
+ self._status = new
+ self.status_desc = DaemonDescriptionStatus.to_str(new)
+
def get_port_summary(self) -> str:
if not self.ports:
return ''
if self.daemon_type == 'osd':
if self.osdspec_affinity and self.osdspec_affinity != 'None':
return self.osdspec_affinity
- return 'unmanaged'
+ return ''
def _match() -> str:
assert self.daemon_id is not None
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
+ def __str__(self) -> str:
+ return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
def to_json(self) -> dict:
out: Dict[str, Any] = OrderedDict()
out['daemon_type'] = self.daemon_type
out['memory_usage'] = self.memory_usage
out['memory_request'] = self.memory_request
out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
out['version'] = self.version
out['status'] = self.status.value if self.status is not None else None
out['status_desc'] = self.status_desc
out['memory_usage'] = self.memory_usage
out['memory_request'] = self.memory_request
out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
out['version'] = self.version
out['status'] = self.status.value if self.status is not None else None
out['status_desc'] = self.status_desc
status_int = c.pop('status', None)
if 'daemon_name' in c:
del c['daemon_name']
+ if 'service_name' in c and c['service_name'].startswith('osd.'):
+ # if the service_name is a osd.NNN (numeric osd id) then
+ # ignore it -- it is not a valid service_name and
+ # (presumably) came from an older version of cephadm.
+ try:
+ int(c['service_name'][4:])
+ del c['service_name']
+ except ValueError:
+ pass
status = DaemonDescriptionStatus(status_int) if status_int is not None else None
return cls(events=events, status=status, **c)
When fetching inventory, use this filter to avoid unnecessarily
scanning the whole estate.
- Typical use: filter by host when presenting UI workflow for configuring
- a particular server.
- filter by label when not all of estate is Ceph servers,
- and we want to only learn about the Ceph servers.
- filter by label when we are interested particularly
- in e.g. OSD servers.
+ Typical use:
+ filter by host when presentig UI workflow for configuring
+ a particular server.
+ filter by label when not all of estate is Ceph servers,
+ and we want to only learn about the Ceph servers.
+ filter by label when we are interested particularly
+ in e.g. OSD servers.
"""
def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
return self.created == other.created and self.kind == other.kind \
and self.subject == other.subject and self.message == other.message
+ def __repr__(self) -> str:
+ return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
def _mk_orch_methods(cls: Any) -> Any:
# Needs to be defined outside of for.
return completion
return inner
- for meth in Orchestrator.__dict__:
- if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
- setattr(cls, meth, shim(meth))
+ for name, method in Orchestrator.__dict__.items():
+ if not name.startswith('_') and name not in ['is_orchestrator_module']:
+ remote_call = update_wrapper(shim(name), method)
+ setattr(cls, name, remote_call)
return cls