import time
import uuid
-from collections import namedtuple
+from collections import namedtuple, OrderedDict
+from contextlib import contextmanager
from functools import wraps
+import yaml
+
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
ServiceSpecValidationError, IscsiServiceSpec
from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.hostspec import HostSpec
from mgr_module import MgrModule, CLICommand, HandleCommandResult
try:
from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Type, Sequence, Dict, cast
+ Type, Sequence, Dict, cast
except ImportError:
pass
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+T = TypeVar('T')
+
class OrchestratorError(Exception):
"""
It's not intended for programming errors or orchestrator internal errors.
"""
+ def __init__(self,
+ msg: str,
+ errno: int = -errno.EINVAL,
+ event_kind_subject: Optional[Tuple[str, str]] = None):
+ super(Exception, self).__init__(msg)
+ self.errno = errno
+ # See OrchestratorEvent.subject
+ self.event_subject = event_kind_subject
+
class NoOrchestrator(OrchestratorError):
"""
No orchestrator in configured.
"""
+
def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
- super(NoOrchestrator, self).__init__(msg)
+ super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
class OrchestratorValidationError(OrchestratorError):
"""
+@contextmanager
+def set_exception_subject(kind, subject, overwrite=False):
+    """
+    Context manager that tags an OrchestratorError raised inside its body
+    with the event subject ``(kind, subject)`` and then re-raises it.
+
+    :param kind: first element of the subject tuple stored on the exception
+    :param subject: second element of the subject tuple stored on the exception
+    :param overwrite: if True, replace a subject already set on the exception;
+        otherwise an existing subject is kept
+    :raises OrchestratorError: always re-raised after tagging
+    """
+    try:
+        yield
+    except OrchestratorError as e:
+        # OrchestratorError.__init__ always creates ``event_subject`` (None by
+        # default), so a hasattr() check is always true and would turn
+        # ``overwrite`` into a no-op. Only fill in the subject when none has
+        # been recorded yet, unless the caller explicitly asks to overwrite.
+        if overwrite or getattr(e, 'event_subject', None) is None:
+            e.event_subject = (kind, subject)
+        raise
+
+
def handle_exception(prefix, cmd_args, desc, perm, func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
- except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
+ except (OrchestratorError, ServiceSpecValidationError) as e:
# Do not print Traceback for expected errors.
+ return HandleCommandResult(e.errno, stderr=str(e))
+ except ImportError as e:
return HandleCommandResult(-errno.ENOENT, stderr=str(e))
except NotImplementedError:
msg = 'This Orchestrator does not support `{}`'.format(prefix)
"""
def __init__(cls, name, bases, dct):
super(CLICommandMeta, cls).__init__(name, bases, dct)
- dispatch = {} # type: Dict[str, CLICommand]
+ dispatch: Dict[str, CLICommand] = {}
for v in dct.values():
try:
dispatch[v._prefix] = v._cli_command
RUNNING = 2
FINISHED = 3 # we have a final result
- NO_RESULT = _no_result() # type: None
+ NO_RESULT: None = _no_result()
ASYNC_RESULT = object()
def __init__(self,
- _first_promise=None, # type: Optional["_Promise"]
- value=NO_RESULT, # type: Optional[Any]
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
+ _first_promise: Optional["_Promise"] = None,
+ value: Optional[Any] = NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ name: Optional[str] = None,
):
self._on_complete_ = on_complete
self._name = name
- self._next_promise = None # type: Optional[_Promise]
+ self._next_promise: Optional[_Promise] = None
self._state = self.INITIALIZED
- self._exception = None # type: Optional[Exception]
+ self._exception: Optional[Exception] = None
# Value of this _Promise. may be an intermediate result.
self._value = value
# _Promise is not a continuation monad, as `_result` is of type
# T instead of (T -> r) -> r. Therefore we need to store the first promise here.
- self._first_promise = _first_promise or self # type: '_Promise'
+ self._first_promise: '_Promise' = _first_promise or self
@property
- def _exception(self):
- # type: () -> Optional[Exception]
+ def _exception(self) -> Optional[Exception]:
return getattr(self, '_exception_', None)
@_exception.setter
self._serialized_exception_ = pickle.dumps(e)
@property
- def _serialized_exception(self):
- # type: () -> Optional[bytes]
+ def _serialized_exception(self) -> Optional[bytes]:
return getattr(self, '_serialized_exception_', None)
-
-
@property
- def _on_complete(self):
- # type: () -> Optional[Callable]
+ def _on_complete(self) -> Optional[Callable]:
# https://github.com/python/mypy/issues/4125
return self._on_complete_
@_on_complete.setter
- def _on_complete(self, val):
- # type: (Optional[Callable]) -> None
+ def _on_complete(self, val: Optional[Callable]) -> None:
self._on_complete_ = val
-
def __repr__(self):
- name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
+ name = self._name or getattr(self._on_complete, '__name__',
+ '??') if self._on_complete else 'None'
val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
- self.__class__, self._state, val, self._on_complete, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
+ self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
+ next, '_progress_reference', 'NA'), repr(self._next_promise)
)
def pretty_print_1(self):
}[self._state]
return '{} {}({}),'.format(prefix, name, val)
- def then(self, on_complete):
- # type: (Any, Callable) -> Any
+ def then(self: Any, on_complete: Callable) -> Any:
"""
Call ``on_complete`` as soon as this promise is finalized.
"""
self._set_next_promise(self.__class__(_first_promise=self._first_promise))
return self._next_promise
- def _set_next_promise(self, next):
- # type: (_Promise) -> None
+ def _set_next_promise(self, next: '_Promise') -> None:
assert self is not next
assert self._state in (self.INITIALIZED, self.RUNNING)
# asynchronous promise
pass
-
def propagate_to_next(self):
self._state = self.FINISHED
logger.debug('finalized {}'.format(repr(self)))
if self._next_promise:
self._next_promise._finalize()
- def fail(self, e):
- # type: (Exception) -> None
+ def fail(self, e: Exception) -> None:
"""
Sets the whole completion to be faild with this exception and end the
evaluation.
assert other not in self
self._last_promise()._set_next_promise(other)
- def _last_promise(self):
- # type: () -> _Promise
+ def _last_promise(self) -> '_Promise':
return list(iter(self))[-1]
class ProgressReference(object):
def __init__(self,
- message, # type: str
+ message: str,
mgr,
- completion=None # type: Optional[Callable[[], Completion]]
- ):
+ completion: Optional[Callable[[], 'Completion']] = None
+ ):
"""
ProgressReference can be used within Completions::
#: The completion can already have a result, before the write
#: operation is effective. progress == 1 means, the services are
#: created / removed.
- self.completion = completion # type: Optional[Callable[[], Completion]]
+ self.completion: Optional[Callable[[], Completion]] = completion
#: if a orchestrator module can provide a more detailed
#: progress information, it needs to also call ``progress.update()``.
try:
if self.effective:
self.mgr.remote("progress", "complete", self.progress_id)
- self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
+ self.mgr.all_progress_references = [
+ p for p in self.mgr.all_progress_references if p is not self]
else:
self.mgr.remote("progress", "update", self.progress_id, self.message,
progress,
self.progress = 1
-class Completion(_Promise):
+class Completion(_Promise, Generic[T]):
"""
Combines multiple promises into one overall operation.
+---------------+ +-----------------+
"""
+
def __init__(self,
- _first_promise=None, # type: Optional["Completion"]
- value=_Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- name=None, # type: Optional[str]
+ _first_promise: Optional["Completion"] = None,
+ value: Any = _Promise.NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ name: Optional[str] = None,
):
super(Completion, self).__init__(_first_promise, value, on_complete, name)
@property
- def _progress_reference(self):
- # type: () -> Optional[ProgressReference]
+ def _progress_reference(self) -> Optional[ProgressReference]:
if hasattr(self._on_complete, 'progress_id'):
return self._on_complete # type: ignore
return None
@property
- def progress_reference(self):
- # type: () -> Optional[ProgressReference]
+ def progress_reference(self) -> Optional[ProgressReference]:
"""
ProgressReference. Marks this completion
as a write completeion.
"""
- references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
+ references = [c._progress_reference for c in iter(
+ self) if c._progress_reference is not None]
if references:
assert len(references) == 1
return references[0]
return None
@classmethod
- def with_progress(cls, # type: Any
- message, # type: str
+ def with_progress(cls: Any,
+ message: str,
mgr,
- _first_promise=None, # type: Optional["Completion"]
- value=_Promise.NO_RESULT, # type: Any
- on_complete=None, # type: Optional[Callable]
- calc_percent=None # type: Optional[Callable[[], Any]]
- ):
- # type: (...) -> Any
+ _first_promise: Optional["Completion"] = None,
+ value: Any = _Promise.NO_RESULT,
+ on_complete: Optional[Callable] = None,
+ calc_percent: Optional[Callable[[], Any]] = None
+ ) -> Any:
c = cls(
_first_promise=_first_promise,
return c._first_promise
def add_progress(self,
- message, # type: str
+ message: str,
mgr,
- calc_percent=None # type: Optional[Callable[[], Any]]
+ calc_percent: Optional[Callable[[], Any]] = None
):
return self.then(
on_complete=ProgressReference(
)
)
- def fail(self, e):
+ def fail(self, e: Exception):
super(Completion, self).fail(e)
if self._progress_reference:
self._progress_reference.fail()
- def finalize(self, result=_Promise.NO_RESULT):
+ def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
if self._first_promise._state == self.INITIALIZED:
self._first_promise._finalize(result)
@property
- def result(self):
+ def result(self) -> T:
"""
The result of the operation that we were waited
for. Only valid after calling Orchestrator.process() on this
"""
last = self._last_promise()
assert last._state == _Promise.FINISHED
- return last._value
+ return cast(T, last._value)
- def result_str(self):
+ def result_str(self) -> str:
"""Force a string."""
if self.result is None:
return ''
return str(self.result)
@property
- def exception(self):
- # type: () -> Optional[Exception]
+ def exception(self) -> Optional[Exception]:
return self._last_promise()._exception
@property
- def serialized_exception(self):
- # type: () -> Optional[bytes]
+ def serialized_exception(self) -> Optional[bytes]:
return self._last_promise()._serialized_exception
@property
- def has_result(self):
- # type: () -> bool
+ def has_result(self) -> bool:
"""
Has the operation already a result?
return self._last_promise()._state == _Promise.FINISHED
@property
- def is_errored(self):
- # type: () -> bool
+ def is_errored(self) -> bool:
"""
Has the completion failed. Default implementation looks for
self.exception. Can be overwritten.
return self.exception is not None
@property
- def needs_result(self):
- # type: () -> bool
+ def needs_result(self) -> bool:
"""
Could the external operation be deemed as complete,
or should we wait?
return not self.is_errored and not self.has_result
@property
- def is_finished(self):
- # type: () -> bool
+ def is_finished(self) -> bool:
"""
Could the external operation be deemed as complete,
or should we wait?
return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-def pretty_print(completions):
- # type: (Sequence[Completion]) -> str
+def pretty_print(completions: Sequence[Completion]) -> str:
return ', '.join(c.pretty_print() for c in completions)
-def raise_if_exception(c):
- # type: (Completion) -> None
+def raise_if_exception(c: Completion) -> None:
"""
:raises OrchestratorError: Some user error or a config error.
:raises Exception: Some internal error
raise e
-class TrivialReadCompletion(Completion):
+class TrivialReadCompletion(Completion[T]):
"""
This is the trivial completion simply wrapping a result.
"""
- def __init__(self, result):
+
+ def __init__(self, result: T):
super(TrivialReadCompletion, self).__init__()
if result:
self.finalize(result)
return True
@_hide_in_features
- def available(self):
- # type: () -> Tuple[bool, str]
+ def available(self) -> Tuple[bool, str]:
"""
Report whether we can talk to the orchestrator. This is the
place to give the user a meaningful message if the orchestrator
raise NotImplementedError()
@_hide_in_features
- def process(self, completions):
- # type: (List[Completion]) -> None
+ def process(self, completions: List[Completion]) -> None:
"""
Given a list of Completion instances, process any which are
incomplete.
}
return features
- def cancel_completions(self):
- # type: () -> None
+ def cancel_completions(self) -> None:
"""
Cancels ongoing completions. Unstuck the mgr.
"""
raise NotImplementedError()
- def pause(self):
- # type: () -> None
+ def pause(self) -> None:
raise NotImplementedError()
- def resume(self):
- # type: () -> None
+ def resume(self) -> None:
raise NotImplementedError()
- def add_host(self, host_spec):
- # type: (HostSpec) -> Completion
+ def add_host(self, host_spec: HostSpec) -> Completion[str]:
"""
Add a host to the orchestrator inventory.
"""
raise NotImplementedError()
- def remove_host(self, host):
- # type: (str) -> Completion
+ def remove_host(self, host: str) -> Completion[str]:
"""
Remove a host from the orchestrator inventory.
"""
raise NotImplementedError()
- def update_host_addr(self, host, addr):
- # type: (str, str) -> Completion
+ def update_host_addr(self, host: str, addr: str) -> Completion[str]:
"""
Update a host's address
"""
raise NotImplementedError()
- def get_hosts(self):
- # type: () -> Completion
+ def get_hosts(self) -> Completion[List[HostSpec]]:
"""
Report the hosts in the cluster.
"""
raise NotImplementedError()
- def add_host_label(self, host, label):
- # type: (str, str) -> Completion
+ def add_host_label(self, host: str, label: str) -> Completion[str]:
"""
Add a host label
"""
raise NotImplementedError()
- def remove_host_label(self, host, label):
- # type: (str, str) -> Completion
+ def remove_host_label(self, host: str, label: str) -> Completion[str]:
"""
Remove a host label
"""
raise NotImplementedError()
- def get_inventory(self, host_filter=None, refresh=False):
- # type: (Optional[InventoryFilter], bool) -> Completion
+ def host_ok_to_stop(self, hostname: str) -> Completion:
+ """
+ Check if the specified host can be safely stopped without reducing availability
+
+ :param hostname: hostname
+ """
+ raise NotImplementedError()
+
+ def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.
"""
raise NotImplementedError()
- def describe_service(self, service_type=None, service_name=None, refresh=False):
- # type: (Optional[str], Optional[str], bool) -> Completion
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
"""
raise NotImplementedError()
- def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
- # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
+ def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
"""
Describe a daemon (of any kind) that is already configured in
the orchestrator.
"""
raise NotImplementedError()
- def apply(self, specs: List["GenericSpec"]) -> Completion:
+ def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
"""
Applies any spec
"""
completion = completion.then(next)
return completion
- def remove_daemons(self, names):
- # type: (List[str]) -> Completion
+ def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
+ """
+ Plan (Dry-run, Preview) a List of Specs.
+ """
+ raise NotImplementedError()
+
+ def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
"""
Remove specific daemon(s).
"""
raise NotImplementedError()
- def remove_service(self, service_name):
- # type: (str) -> Completion
+ def remove_service(self, service_name: str) -> Completion[str]:
"""
Remove a service (a collection of daemons).
"""
raise NotImplementedError()
- def service_action(self, action, service_name):
- # type: (str, str) -> Completion
+ def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
"""
Perform an action (start/stop/reload) on a service (i.e., all daemons
providing the logical service).
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
- :param service_type: e.g. "mds", "rgw", ...
- :param service_name: name of logical service ("cephfs", "us-east", ...)
+ :param service_name: service_type + '.' + service_id
+ (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
:rtype: Completion
"""
- #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ # assert action in ["start", "stop", "reload", "restart", "redeploy"]
raise NotImplementedError()
- def daemon_action(self, action, daemon_type, daemon_id):
- # type: (str, str, str) -> Completion
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> Completion[str]:
"""
Perform an action (start/stop/reload) on a daemon.
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
- :param name: name of daemon
+ :param daemon_name: name of daemon
+ :param image: Container image when redeploying that daemon
:rtype: Completion
"""
- #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+ # assert action in ["start", "stop", "reload", "restart", "redeploy"]
raise NotImplementedError()
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> Completion
+ def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
""" Update OSD cluster """
raise NotImplementedError()
def preview_osdspecs(self,
osdspec_name: Optional[str] = 'osd',
osdspecs: Optional[List[DriveGroupSpec]] = None
- ) -> Completion:
+ ) -> Completion[str]:
""" Get a preview for OSD deployments """
raise NotImplementedError()
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> Completion:
+ force: bool = False) -> Completion[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
"""
raise NotImplementedError()
- def remove_osds_status(self):
- # type: () -> Completion
+ def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+ """
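+ TODO: document. Presumably cancels the pending removal of the given OSDs (cf. ``remove_osds``).
+
+ :param osd_ids: list of OSD IDs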
+ """
+ raise NotImplementedError()
+
+ def remove_osds_status(self) -> Completion:
"""
Returns a status of the ongoing OSD removal operations.
"""
raise NotImplementedError()
- def blink_device_light(self, ident_fault, on, locations):
- # type: (str, bool, List[DeviceLightLoc]) -> Completion
+ def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
"""
raise NotImplementedError()
- def zap_device(self, host, path):
- # type: (str, str) -> Completion
+ def zap_device(self, host: str, path: str) -> Completion[str]:
"""Zap/Erase a device (DESTROYS DATA)"""
raise NotImplementedError()
- def add_mon(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create mon daemon(s)"""
raise NotImplementedError()
- def apply_mon(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
"""Update mon cluster"""
raise NotImplementedError()
- def add_mgr(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create mgr daemon(s)"""
raise NotImplementedError()
- def apply_mgr(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
"""Update mgr cluster"""
raise NotImplementedError()
- def add_mds(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create MDS daemon(s)"""
raise NotImplementedError()
- def apply_mds(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
"""Update MDS cluster"""
raise NotImplementedError()
- def add_rgw(self, spec):
- # type: (RGWSpec) -> Completion
+ def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
"""Create RGW daemon(s)"""
raise NotImplementedError()
- def apply_rgw(self, spec):
- # type: (RGWSpec) -> Completion
+ def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
"""Update RGW cluster"""
raise NotImplementedError()
- def add_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create rbd-mirror daemon(s)"""
raise NotImplementedError()
- def apply_rbd_mirror(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
"""Update rbd-mirror cluster"""
raise NotImplementedError()
- def add_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion
+ def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
"""Create NFS daemon(s)"""
raise NotImplementedError()
- def apply_nfs(self, spec):
- # type: (NFSServiceSpec) -> Completion
+ def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
"""Update NFS cluster"""
raise NotImplementedError()
- def add_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion
+ def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
"""Create iscsi daemon(s)"""
raise NotImplementedError()
- def apply_iscsi(self, spec):
- # type: (IscsiServiceSpec) -> Completion
+ def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
"""Update iscsi cluster"""
raise NotImplementedError()
- def add_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create new prometheus daemon"""
raise NotImplementedError()
- def apply_prometheus(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
"""Update prometheus cluster"""
raise NotImplementedError()
- def add_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new Node-Exporter service"""
raise NotImplementedError()
- def apply_node_exporter(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def add_crash(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new crash service"""
raise NotImplementedError()
- def apply_crash(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
- def add_grafana(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new Node-Exporter service"""
raise NotImplementedError()
- def apply_grafana(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def add_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion
+ def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
"""Create a new AlertManager service"""
raise NotImplementedError()
- def apply_alertmanager(self, spec):
- # type: (ServiceSpec) -> Completion
+ def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
"""Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def upgrade_check(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion
+ def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
raise NotImplementedError()
- def upgrade_start(self, image, version):
- # type: (Optional[str], Optional[str]) -> Completion
+ def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
raise NotImplementedError()
- def upgrade_pause(self):
- # type: () -> Completion
+ def upgrade_pause(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_resume(self):
- # type: () -> Completion
+ def upgrade_resume(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_stop(self):
- # type: () -> Completion
+ def upgrade_stop(self) -> Completion[str]:
raise NotImplementedError()
- def upgrade_status(self):
- # type: () -> Completion
+ def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
raise NotImplementedError()
@_hide_in_features
- def upgrade_available(self):
- # type: () -> Completion
+ def upgrade_available(self) -> Completion:
"""
Report on what versions are available to upgrade to
raise NotImplementedError()
-class HostSpec(object):
- """
- Information about hosts. Like e.g. ``kubectl get nodes``
- """
- def __init__(self,
- hostname, # type: str
- addr=None, # type: Optional[str]
- labels=None, # type: Optional[List[str]]
- status=None, # type: Optional[str]
- ):
- self.service_type = 'host'
-
- #: the bare hostname on the host. Not the FQDN.
- self.hostname = hostname # type: str
-
- #: DNS name or IP address to reach it
- self.addr = addr or hostname # type: str
-
- #: label(s), if any
- self.labels = labels or [] # type: List[str]
-
- #: human readable status
- self.status = status or '' # type: str
-
- def to_json(self):
- return {
- 'hostname': self.hostname,
- 'addr': self.addr,
- 'labels': self.labels,
- 'status': self.status,
- }
-
- @classmethod
- def from_json(cls, host_spec):
- _cls = cls(host_spec['hostname'],
- host_spec['addr'] if 'addr' in host_spec else None,
- host_spec['labels'] if 'labels' in host_spec else None)
- return _cls
-
- def __repr__(self):
- args = [self.hostname] # type: List[Any]
- if self.addr is not None:
- args.append(self.addr)
- if self.labels:
- args.append(self.labels)
- if self.status:
- args.append(self.status)
-
- return "<HostSpec>({})".format(', '.join(map(repr, args)))
-
- def __eq__(self, other):
- # Let's omit `status` for the moment, as it is still the very same host.
- return self.hostname == other.hostname and \
- self.addr == other.addr and \
- self.labels == other.labels
-
GenericSpec = Union[ServiceSpec, HostSpec]
-def json_to_generic_spec(spec):
- # type: (dict) -> GenericSpec
+
+def json_to_generic_spec(spec: dict) -> GenericSpec:
if 'service_type' in spec and spec['service_type'] == 'host':
return HostSpec.from_json(spec)
else:
return ServiceSpec.from_json(spec)
+
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
def __init__(self):
started=None,
last_configured=None,
osdspec_affinity=None,
- last_deployed=None):
+ last_deployed=None,
+ events: Optional[List['OrchestratorEvent']] = None,
+ is_active: bool=False):
+
# Host is at the same granularity as InventoryHost
- self.hostname = hostname
+ self.hostname: str = hostname
# Not everyone runs in containers, but enough people do to
# justify having the container_id (runtime id) and container_image
# typically either based on hostnames or on pod names.
# This is the <foo> in mds.<foo>, the ID that will appear
# in the FSMap/ServiceMap.
- self.daemon_id = daemon_id
+ self.daemon_id: str = daemon_id
# Service version that was deployed
self.version = version
self.status_desc = status_desc
# datetime when this info was last refreshed
- self.last_refresh = last_refresh # type: Optional[datetime.datetime]
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
- self.created = created # type: Optional[datetime.datetime]
- self.started = started # type: Optional[datetime.datetime]
- self.last_configured = last_configured # type: Optional[datetime.datetime]
- self.last_deployed = last_deployed # type: Optional[datetime.datetime]
+ self.created: Optional[datetime.datetime] = created
+ self.started: Optional[datetime.datetime] = started
+ self.last_configured: Optional[datetime.datetime] = last_configured
+ self.last_deployed: Optional[datetime.datetime] = last_deployed
# Affinity to a certain OSDSpec
- self.osdspec_affinity = osdspec_affinity # type: Optional[str]
+ self.osdspec_affinity: Optional[str] = osdspec_affinity
+
+ self.events: List[OrchestratorEvent] = events or []
+
+ self.is_active = is_active
def name(self):
return '%s.%s' % (self.daemon_type, self.daemon_id)
- def matches_service(self, service_name):
- # type: (Optional[str]) -> bool
+ def matches_service(self, service_name: Optional[str]) -> bool:
if service_name:
return self.name().startswith(service_name + '.')
return False
def service_id(self):
+ if self.daemon_type == 'osd' and self.osdspec_affinity:
+ return self.osdspec_affinity
+
def _match():
- err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \
- f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
+ err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
+ f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
if not self.hostname:
# TODO: can a DaemonDescription exist without a hostname?
# daemon_id == "service_id"
return self.daemon_id
- if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']:
+ if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
return _match()
return self.daemon_id
def service_name(self):
- if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
+ if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
return f'{self.daemon_type}.{self.service_id()}'
return self.daemon_type
id=self.daemon_id)
def to_json(self):
- out = {
- 'hostname': self.hostname,
- 'container_id': self.container_id,
- 'container_image_id': self.container_image_id,
- 'container_image_name': self.container_image_name,
- 'daemon_id': self.daemon_id,
- 'daemon_type': self.daemon_type,
- 'version': self.version,
- 'status': self.status,
- 'status_desc': self.status_desc,
- }
+ out = OrderedDict()
+ out['daemon_type'] = self.daemon_type
+ out['daemon_id'] = self.daemon_id
+ out['hostname'] = self.hostname
+ out['container_id'] = self.container_id
+ out['container_image_id'] = self.container_image_id
+ out['container_image_name'] = self.container_image_name
+ out['version'] = self.version
+ out['status'] = self.status
+ out['status_desc'] = self.status_desc
+ if self.daemon_type == 'osd':
+ out['osdspec_affinity'] = self.osdspec_affinity
+ out['is_active'] = self.is_active
+
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if getattr(self, k):
out[k] = getattr(self, k).strftime(DATEFMT)
- return {k: v for (k, v) in out.items() if v is not None}
+
+ if self.events:
+ out['events'] = [e.to_json() for e in self.events]
+
+ empty = [k for k, v in out.items() if v is None]
+ for e in empty:
+ del out[e]
+ return out
@classmethod
@handle_type_error
def from_json(cls, data):
c = data.copy()
+ event_strs = c.pop('events', [])
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if k in c:
c[k] = datetime.datetime.strptime(c[k], DATEFMT)
- return cls(**c)
+ events = [OrchestratorEvent.from_json(e) for e in event_strs]
+ return cls(events=events, **c)
def __copy__(self):
# feel free to change this:
return DaemonDescription.from_json(self.to_json())
+ @staticmethod
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):  # YAML form of a DaemonDescription is its to_json() mapping
+ return dumper.represent_dict(data.to_json().items())  # presumably .items() (not the dict) keeps PyYAML from sorting keys -- TODO confirm
+
+
+yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
+
+
class ServiceDescription(object):
"""
For responding to queries about the status of a particular service,
last_refresh=None,
created=None,
size=0,
- running=0):
+ running=0,
+ events: Optional[List['OrchestratorEvent']] = None):
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# (image name)
self.running = running
# datetime when this info was last refreshed
- self.last_refresh = last_refresh # type: Optional[datetime.datetime]
- self.created = created # type: Optional[datetime.datetime]
+ self.last_refresh: Optional[datetime.datetime] = last_refresh
+ self.created: Optional[datetime.datetime] = created
self.spec: ServiceSpec = spec
+ self.events: List[OrchestratorEvent] = events or []
+
def service_type(self):
return self.spec.service_type
def __repr__(self):
return f"<ServiceDescription of {self.spec.one_line_str()}>"
- def to_json(self):
+ def to_json(self) -> OrderedDict:
out = self.spec.to_json()
status = {
'container_image_id': self.container_image_id,
'size': self.size,
'running': self.running,
'last_refresh': self.last_refresh,
- 'created': self.created
+ 'created': self.created,
}
for k in ['last_refresh', 'created']:
if getattr(self, k):
status[k] = getattr(self, k).strftime(DATEFMT)
status = {k: v for (k, v) in status.items() if v is not None}
out['status'] = status
+ if self.events:
+ out['events'] = [e.to_json() for e in self.events]
return out
@classmethod
def from_json(cls, data: dict):
c = data.copy()
status = c.pop('status', {})
+ event_strs = c.pop('events', [])
spec = ServiceSpec.from_json(c)
c_status = status.copy()
for k in ['last_refresh', 'created']:
if k in c_status:
c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
- return cls(spec=spec, **c_status)
+ events = [OrchestratorEvent.from_json(e) for e in event_strs]
+ return cls(spec=spec, events=events, **c_status)
+
+ @staticmethod
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription'):  # YAML form of a ServiceDescription is its to_json() mapping
+ return dumper.represent_dict(data.to_json().items())  # presumably .items() (not the dict) keeps PyYAML from sorting keys -- TODO confirm
+
+
+yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
class InventoryFilter(object):
in e.g. OSD servers.
"""
- def __init__(self, labels=None, hosts=None):
- # type: (Optional[List[str]], Optional[List[str]]) -> None
+
+ def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
#: Optional: get info about hosts matching labels
self.labels = labels
When fetching inventory, all Devices are grouped inside of an
InventoryHost.
"""
- def __init__(self, name, devices=None, labels=None, addr=None):
- # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
+
+ def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
if devices is None:
devices = inventory.Devices([])
if labels is None:
except TypeError as e:
raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
-
@classmethod
def from_nested_items(cls, hosts):
devs = inventory.Devices.from_json
return "<InventoryHost>({name})".format(name=self.name)
@staticmethod
- def get_host_names(hosts):
- # type: (List[InventoryHost]) -> List[str]
+ def get_host_names(hosts: List['InventoryHost']) -> List[str]:
return [host.name for host in hosts]
def __eq__(self, other):
__slots__ = ()
+class OrchestratorEvent:
+ """
+ Similar to K8s Events.
+
+ An "important" log message attached to either a service or a daemon
+ (``kind``), identified by ``subject``, with a severity ``level`` of
+ INFO or ERROR. ``to_json()`` renders the event as a single string and
+ ``from_json()`` parses that string back using ``regex_v1``.
+ """
+ INFO = 'INFO'
+ ERROR = 'ERROR'
+ regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)  # groups: created, kind, subject, level, message
+
+ def __init__(self, created: Union[str, datetime.datetime], kind: str, subject: str, level: str, message: str) -> None:
+ if isinstance(created, str):
+ created = datetime.datetime.strptime(created, DATEFMT)
+ self.created: datetime.datetime = created
+
+ assert kind in "service daemon".split()
+ self.kind: str = kind
+
+ # service name, or daemon name or something
+ self.subject: str = subject
+
+ # Events are not meant for debugging. Debug output should end up in the log.
+ assert level in "INFO ERROR".split()
+ self.level = level
+
+ self.message: str = message
+
+ __slots__ = ('created', 'kind', 'subject', 'level', 'message')
+
+ def kind_subject(self) -> str:
+ return f'{self.kind}:{self.subject}'
+
+ def to_json(self) -> str:
+ # Returns one plain string per event (not a dict) to keep a long list of events readable.
+ created = self.created.strftime(DATEFMT)
+ return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+
+ @classmethod
+ @handle_type_error
+ def from_json(cls, data) -> "OrchestratorEvent":
+ """
+ >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
+ '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+
+ :param data: event string in the format produced by ``to_json()``
+ :return: the event built from the groups matched by ``regex_v1``
+ :raises ValueError: if ``data`` does not match ``regex_v1``
+ """
+ match = cls.regex_v1.match(data)
+ if match:
+ return cls(*match.groups())
+ raise ValueError(f'Unable to match: "{data}"')
+
+ def __eq__(self, other) -> bool:  # NOTE(review): ``level`` is not part of the comparison -- confirm this is intended
+ if not isinstance(other, OrchestratorEvent):
+ return False
+
+ return self.created == other.created and self.kind == other.kind \
+ and self.subject == other.subject and self.message == other.message
+
+
def _mk_orch_methods(cls):
# Needs to be defined outside of the for loop.
# Otherwise meth is always bound to the last key
... self.orch_client.set_mgr(self.mgr))
"""
- def set_mgr(self, mgr):
- # type: (MgrModule) -> None
+ def set_mgr(self, mgr: MgrModule) -> None:
"""
Usable in the Dashboard, which uses a global ``mgr``
"""
raise NotImplementedError(f'{o} does not implement {meth}') from e
raise
- def _orchestrator_wait(self, completions):
- # type: (List[Completion]) -> None
+ def _orchestrator_wait(self, completions: List[Completion]) -> None:
"""
Wait for completions to complete (reads) or
become persistent (writes).