import copy
import datetime
+import enum
import errno
import logging
import pickle
import re
-import time
-import uuid
from collections import namedtuple, OrderedDict
from contextlib import contextmanager
-from functools import wraps
+from functools import wraps, reduce, update_wrapper
+
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+ Sequence, Dict, cast, Mapping
+
+try:
+ from typing import Protocol # Protocol was added in Python 3.8
+except ImportError:
+ class Protocol: # type: ignore
+ pass
+
import yaml
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- ServiceSpecValidationError, IscsiServiceSpec
+ IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.hostspec import HostSpec
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+from ceph.utils import datetime_to_str, str_to_datetime
from mgr_module import MgrModule, CLICommand, HandleCommandResult
-try:
- from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
- Type, Sequence, Dict, cast
-except ImportError:
- pass
logger = logging.getLogger(__name__)
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
-
T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
class OrchestratorError(Exception):
def __init__(self,
msg: str,
errno: int = -errno.EINVAL,
- event_kind_subject: Optional[Tuple[str, str]] = None):
+ event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
super(Exception, self).__init__(msg)
self.errno = errno
# See OrchestratorEvent.subject
No orchestrator in configured.
"""
- def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
+ def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
@contextmanager
-def set_exception_subject(kind, subject, overwrite=False):
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
try:
yield
except OrchestratorError as e:
raise
-def handle_exception(prefix, cmd_args, desc, perm, func):
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
@wraps(func)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return func(*args, **kwargs)
- except (OrchestratorError, ServiceSpecValidationError) as e:
+ except (OrchestratorError, SpecValidationError) as e:
# Do not print Traceback for expected errors.
return HandleCommandResult(e.errno, stderr=str(e))
except ImportError as e:
msg = 'This Orchestrator does not support `{}`'.format(prefix)
return HandleCommandResult(-errno.ENOENT, stderr=msg)
- # misuse partial to copy `wrapper`
- wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
+ # misuse lambda to copy `wrapper`
+ wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731
wrapper_copy._prefix = prefix # type: ignore
- wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm) # type: ignore
+ wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore
+ wrapper_copy._cli_command.store_func_metadata(func) # type: ignore
wrapper_copy._cli_command.func = wrapper_copy # type: ignore
- return wrapper_copy
+ return cast(FuncT, wrapper_copy)
+
+
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+ """
+ Decorator to make Orchestrator methods return
+ an OrchResult.
+ """
+
+ @wraps(f)
+ def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+ try:
+ return OrchResult(f(*args, **kwargs))
+ except Exception as e:
+ logger.exception(e)
+ import os
+ if 'UNITTEST' in os.environ:
+ raise # This makes debugging of Tracebacks from unittests a bit easier
+ return OrchResult(None, exception=e)
+
+ return cast(Callable[..., OrchResult[T]], wrapper)
-def _cli_command(perm):
- def inner_cli_command(prefix, cmd_args="", desc=""):
- return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
+class InnerCliCommandCallable(Protocol):
+ def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
+ ...
+
+
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+ def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
+ return lambda func: handle_exception(prefix, perm, func)
return inner_cli_command
We make use of CLICommand, except for the use of the global variable.
"""
- def __init__(cls, name, bases, dct):
+ def __init__(cls, name: str, bases: Any, dct: Any) -> None:
super(CLICommandMeta, cls).__init__(name, bases, dct)
dispatch: Dict[str, CLICommand] = {}
for v in dct.values():
except AttributeError:
pass
- def handle_command(self, inbuf, cmd):
+ def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
if cmd['prefix'] not in dispatch:
return self.handle_command(inbuf, cmd)
cls.handle_command = handle_command
-def _no_result():
- return object()
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
"""
- A completion may need multiple promises to be fulfilled. `_Promise` is one
- step.
-
- Typically ``Orchestrator`` implementations inherit from this class to
- build their own way of finishing a step to fulfil a future.
-
- They are not exposed in the orchestrator interface and can be seen as a
- helper to build orchestrator modules.
+ Stores a result and an exception. Mainly to circumvent the
+ MgrModule.remote() method that hides all exceptions and for
+ handling different sub-interpreters.
"""
- INITIALIZED = 1 # We have a parent completion and a next completion
- RUNNING = 2
- FINISHED = 3 # we have a final result
- NO_RESULT: None = _no_result()
- ASYNC_RESULT = object()
-
- def __init__(self,
- _first_promise: Optional["_Promise"] = None,
- value: Optional[Any] = NO_RESULT,
- on_complete: Optional[Callable] = None,
- name: Optional[str] = None,
- ):
- self._on_complete_ = on_complete
- self._name = name
- self._next_promise: Optional[_Promise] = None
+ def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+ self.result = result
+ self.serialized_exception: Optional[bytes] = None
+ self.exception_str: str = ''
+ self.set_exception(exception)
- self._state = self.INITIALIZED
- self._exception: Optional[Exception] = None
+ __slots__ = 'result', 'serialized_exception', 'exception_str'
- # Value of this _Promise. may be an intermediate result.
- self._value = value
+ def set_exception(self, e: Optional[Exception]) -> None:
+ if e is None:
+ self.serialized_exception = None
+ self.exception_str = ''
+ return
- # _Promise is not a continuation monad, as `_result` is of type
- # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
- self._first_promise: '_Promise' = _first_promise or self
-
- @property
- def _exception(self) -> Optional[Exception]:
- return getattr(self, '_exception_', None)
-
- @_exception.setter
- def _exception(self, e):
- self._exception_ = e
+ self.exception_str = f'{type(e)}: {str(e)}'
try:
- self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+ self.serialized_exception = pickle.dumps(e)
except pickle.PicklingError:
logger.error(f"failed to pickle {e}")
if isinstance(e, Exception):
else:
e = Exception(str(e))
# degenerate to a plain Exception
- self._serialized_exception_ = pickle.dumps(e)
-
- @property
- def _serialized_exception(self) -> Optional[bytes]:
- return getattr(self, '_serialized_exception_', None)
-
- @property
- def _on_complete(self) -> Optional[Callable]:
- # https://github.com/python/mypy/issues/4125
- return self._on_complete_
-
- @_on_complete.setter
- def _on_complete(self, val: Optional[Callable]) -> None:
- self._on_complete_ = val
-
- def __repr__(self):
- name = self._name or getattr(self._on_complete, '__name__',
- '??') if self._on_complete else 'None'
- val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
- return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
- self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
- next, '_progress_reference', 'NA'), repr(self._next_promise)
- )
-
- def pretty_print_1(self):
- if self._name:
- name = self._name
- elif self._on_complete is None:
- name = 'lambda x: x'
- elif hasattr(self._on_complete, '__name__'):
- name = getattr(self._on_complete, '__name__')
- else:
- name = self._on_complete.__class__.__name__
- val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
- prefix = {
- self.INITIALIZED: ' ',
- self.RUNNING: ' >>>',
- self.FINISHED: '(done)'
- }[self._state]
- return '{} {}({}),'.format(prefix, name, val)
-
- def then(self: Any, on_complete: Callable) -> Any:
- """
- Call ``on_complete`` as soon as this promise is finalized.
- """
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- if self._next_promise is not None:
- return self._next_promise.then(on_complete)
-
- if self._on_complete is not None:
- self._set_next_promise(self.__class__(
- _first_promise=self._first_promise,
- on_complete=on_complete
- ))
- return self._next_promise
-
- else:
- self._on_complete = on_complete
- self._set_next_promise(self.__class__(_first_promise=self._first_promise))
- return self._next_promise
-
- def _set_next_promise(self, next: '_Promise') -> None:
- assert self is not next
- assert self._state in (self.INITIALIZED, self.RUNNING)
-
- self._next_promise = next
- assert self._next_promise is not None
- for p in iter(self._next_promise):
- p._first_promise = self._first_promise
-
- def _finalize(self, value=NO_RESULT):
- """
- Sets this promise to complete.
-
- Orchestrators may choose to use this helper function.
-
- :param value: new value.
- """
- if self._state not in (self.INITIALIZED, self.RUNNING):
- raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
- self._state = self.RUNNING
-
- if value is not self.NO_RESULT:
- self._value = value
- assert self._value is not self.NO_RESULT, repr(self)
-
- if self._on_complete:
- try:
- next_result = self._on_complete(self._value)
- except Exception as e:
- self.fail(e)
- return
- else:
- next_result = self._value
-
- if isinstance(next_result, _Promise):
- # hack: _Promise is not a continuation monad.
- next_result = next_result._first_promise # type: ignore
- assert next_result not in self, repr(self._first_promise) + repr(next_result)
- assert self not in next_result
- next_result._append_promise(self._next_promise)
- self._set_next_promise(next_result)
- assert self._next_promise
- if self._next_promise._value is self.NO_RESULT:
- self._next_promise._value = self._value
- self.propagate_to_next()
- elif next_result is not self.ASYNC_RESULT:
- # simple map. simply forward
- if self._next_promise:
- self._next_promise._value = next_result
- else:
- # Hack: next_result is of type U, _value is of type T
- self._value = next_result # type: ignore
- self.propagate_to_next()
- else:
- # asynchronous promise
- pass
-
- def propagate_to_next(self):
- self._state = self.FINISHED
- logger.debug('finalized {}'.format(repr(self)))
- if self._next_promise:
- self._next_promise._finalize()
-
- def fail(self, e: Exception) -> None:
- """
- Sets the whole completion to be faild with this exception and end the
- evaluation.
- """
- if self._state == self.FINISHED:
- raise ValueError(
- 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
- assert self._state in (self.INITIALIZED, self.RUNNING)
- logger.exception('_Promise failed')
- self._exception = e
- self._value = f'_exception: {e}'
- if self._next_promise:
- self._next_promise.fail(e)
- self._state = self.FINISHED
-
- def __contains__(self, item):
- return any(item is p for p in iter(self._first_promise))
-
- def __iter__(self):
- yield self
- elem = self._next_promise
- while elem is not None:
- yield elem
- elem = elem._next_promise
-
- def _append_promise(self, other):
- if other is not None:
- assert self not in other
- assert other not in self
- self._last_promise()._set_next_promise(other)
-
- def _last_promise(self) -> '_Promise':
- return list(iter(self))[-1]
-
-
-class ProgressReference(object):
- def __init__(self,
- message: str,
- mgr,
- completion: Optional[Callable[[], 'Completion']] = None
- ):
- """
- ProgressReference can be used within Completions::
-
- +---------------+ +---------------------------------+
- | | then | |
- | My Completion | +--> | on_complete=ProgressReference() |
- | | | |
- +---------------+ +---------------------------------+
-
- See :func:`Completion.with_progress` for an easy way to create
- a progress reference
-
- """
- super(ProgressReference, self).__init__()
- self.progress_id = str(uuid.uuid4())
- self.message = message
- self.mgr = mgr
-
- #: The completion can already have a result, before the write
- #: operation is effective. progress == 1 means, the services are
- #: created / removed.
- self.completion: Optional[Callable[[], Completion]] = completion
-
- #: if a orchestrator module can provide a more detailed
- #: progress information, it needs to also call ``progress.update()``.
- self.progress = 0.0
-
- self._completion_has_result = False
- self.mgr.all_progress_references.append(self)
-
- def __str__(self):
- """
- ``__str__()`` is used for determining the message for progress events.
- """
- return self.message or super(ProgressReference, self).__str__()
-
- def __call__(self, arg):
- self._completion_has_result = True
- self.progress = 1.0
- return arg
-
- @property
- def progress(self):
- return self._progress
-
- @progress.setter
- def progress(self, progress):
- assert progress <= 1.0
- self._progress = progress
- try:
- if self.effective:
- self.mgr.remote("progress", "complete", self.progress_id)
- self.mgr.all_progress_references = [
- p for p in self.mgr.all_progress_references if p is not self]
- else:
- self.mgr.remote("progress", "update", self.progress_id, self.message,
- progress,
- [("origin", "orchestrator")])
- except ImportError:
- # If the progress module is disabled that's fine,
- # they just won't see the output.
- pass
-
- @property
- def effective(self):
- return self.progress == 1 and self._completion_has_result
-
- def update(self):
- def progress_run(progress):
- self.progress = progress
- if self.completion:
- c = self.completion().then(progress_run)
- self.mgr.process([c._first_promise])
- else:
- self.progress = 1
-
- def fail(self):
- self._completion_has_result = True
- self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
- """
- Combines multiple promises into one overall operation.
-
- Completions are composable by being able to
- call one completion from another completion. I.e. making them re-usable
- using Promises E.g.::
-
- >>> #doctest: +SKIP
- ... return Orchestrator().get_hosts().then(self._create_osd)
-
- where ``get_hosts`` returns a Completion of list of hosts and
- ``_create_osd`` takes a list of hosts.
-
- The concept behind this is to store the computation steps
- explicit and then explicitly evaluate the chain:
-
- >>> #doctest: +SKIP
- ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
- ... p.finalize(2)
- ... assert p.result = "4"
-
- or graphically::
-
- +---------------+ +-----------------+
- | | then | |
- | lambda x: x*x | +--> | lambda x: str(x)|
- | | | |
- +---------------+ +-----------------+
-
- """
-
- def __init__(self,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- name: Optional[str] = None,
- ):
- super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
- @property
- def _progress_reference(self) -> Optional[ProgressReference]:
- if hasattr(self._on_complete, 'progress_id'):
- return self._on_complete # type: ignore
- return None
-
- @property
- def progress_reference(self) -> Optional[ProgressReference]:
- """
- ProgressReference. Marks this completion
- as a write completeion.
- """
-
- references = [c._progress_reference for c in iter(
- self) if c._progress_reference is not None]
- if references:
- assert len(references) == 1
- return references[0]
- return None
-
- @classmethod
- def with_progress(cls: Any,
- message: str,
- mgr,
- _first_promise: Optional["Completion"] = None,
- value: Any = _Promise.NO_RESULT,
- on_complete: Optional[Callable] = None,
- calc_percent: Optional[Callable[[], Any]] = None
- ) -> Any:
-
- c = cls(
- _first_promise=_first_promise,
- value=value,
- on_complete=on_complete
- ).add_progress(message, mgr, calc_percent)
-
- return c._first_promise
-
- def add_progress(self,
- message: str,
- mgr,
- calc_percent: Optional[Callable[[], Any]] = None
- ):
- return self.then(
- on_complete=ProgressReference(
- message=message,
- mgr=mgr,
- completion=calc_percent
- )
- )
-
- def fail(self, e: Exception):
- super(Completion, self).fail(e)
- if self._progress_reference:
- self._progress_reference.fail()
-
- def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
- if self._first_promise._state == self.INITIALIZED:
- self._first_promise._finalize(result)
-
- @property
- def result(self) -> T:
- """
- The result of the operation that we were waited
- for. Only valid after calling Orchestrator.process() on this
- completion.
- """
- last = self._last_promise()
- assert last._state == _Promise.FINISHED
- return cast(T, last._value)
+ self.serialized_exception = pickle.dumps(e)
def result_str(self) -> str:
"""Force a string."""
return '\n'.join(str(x) for x in self.result)
return str(self.result)
- @property
- def exception(self) -> Optional[Exception]:
- return self._last_promise()._exception
-
- @property
- def serialized_exception(self) -> Optional[bytes]:
- return self._last_promise()._serialized_exception
- @property
- def has_result(self) -> bool:
- """
- Has the operation already a result?
-
- For Write operations, it can already have a
- result, if the orchestrator's configuration is
- persistently written. Typically this would
- indicate that an update had been written to
- a manifest, but that the update had not
- necessarily been pushed out to the cluster.
-
- :return:
- """
- return self._last_promise()._state == _Promise.FINISHED
-
- @property
- def is_errored(self) -> bool:
- """
- Has the completion failed. Default implementation looks for
- self.exception. Can be overwritten.
- """
- return self.exception is not None
-
- @property
- def needs_result(self) -> bool:
- """
- Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a read operation only if it is not complete.
- """
- return not self.is_errored and not self.has_result
-
- @property
- def is_finished(self) -> bool:
- """
- Could the external operation be deemed as complete,
- or should we wait?
- We must wait for a read operation only if it is not complete.
- """
- return self.is_errored or (self.has_result)
-
- def pretty_print(self):
-
- reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
- return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions: Sequence[Completion]) -> str:
- return ', '.join(c.pretty_print() for c in completions)
-
-
-def raise_if_exception(c: Completion) -> None:
+def raise_if_exception(c: OrchResult[T]) -> T:
"""
- :raises OrchestratorError: Some user error or a config error.
- :raises Exception: Some internal error
+ Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
"""
if c.serialized_exception is not None:
try:
e = pickle.loads(c.serialized_exception)
except (KeyError, AttributeError):
- raise Exception('{}: {}'.format(type(c.exception), c.exception))
+ raise Exception(c.exception_str)
raise e
+ assert c.result is not None, 'OrchResult should either have an exception or a result'
+ return c.result
-class TrivialReadCompletion(Completion[T]):
- """
- This is the trivial completion simply wrapping a result.
- """
-
- def __init__(self, result: T):
- super(TrivialReadCompletion, self).__init__()
- if result:
- self.finalize(result)
-
-
-def _hide_in_features(f):
- f._hide_in_features = True
+def _hide_in_features(f: FuncT) -> FuncT:
+ f._hide_in_features = True # type: ignore
return f
"""
@_hide_in_features
- def is_orchestrator_module(self):
+ def is_orchestrator_module(self) -> bool:
"""
Enable other modules to interrogate this module to discover
whether it's usable as an orchestrator module.
return True
@_hide_in_features
- def available(self) -> Tuple[bool, str]:
+ def available(self) -> Tuple[bool, str, Dict[str, Any]]:
"""
Report whether we can talk to the orchestrator. This is the
place to give the user a meaningful message if the orchestrator
... if OrchestratorClientMixin().available()[0]: # wrong.
... OrchestratorClientMixin().get_hosts()
- :return: two-tuple of boolean, string
+ :return: boolean representing whether the module is available/usable
+ :return: string describing any error
+ :return: dict containing any module specific information
"""
raise NotImplementedError()
@_hide_in_features
- def process(self, completions: List[Completion]) -> None:
- """
- Given a list of Completion instances, process any which are
- incomplete.
-
- Callers should inspect the detail of each completion to identify
- partial completion/progress information, and present that information
- to the user.
-
- This method should not block, as this would make it slow to query
- a status, while other long running operations are in progress.
- """
- raise NotImplementedError()
-
- @_hide_in_features
- def get_feature_set(self):
+ def get_feature_set(self) -> Dict[str, dict]:
"""Describes which methods this orchestrator implements
.. note::
def resume(self) -> None:
raise NotImplementedError()
- def add_host(self, host_spec: HostSpec) -> Completion[str]:
+ def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
"""
Add a host to the orchestrator inventory.
"""
raise NotImplementedError()
- def remove_host(self, host: str) -> Completion[str]:
+ def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
"""
Remove a host from the orchestrator inventory.
"""
raise NotImplementedError()
- def update_host_addr(self, host: str, addr: str) -> Completion[str]:
+ def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+ """
+ drain all daemons from a host
+
+ :param hostname: hostname
+ """
+ raise NotImplementedError()
+
+ def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
"""
Update a host's address
"""
raise NotImplementedError()
- def get_hosts(self) -> Completion[List[HostSpec]]:
+ def get_hosts(self) -> OrchResult[List[HostSpec]]:
"""
Report the hosts in the cluster.
"""
raise NotImplementedError()
- def add_host_label(self, host: str, label: str) -> Completion[str]:
+ def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+ """
+ Return hosts metadata(gather_facts).
+ """
+ raise NotImplementedError()
+
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
"""
Add a host label
"""
raise NotImplementedError()
- def remove_host_label(self, host: str, label: str) -> Completion[str]:
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
"""
Remove a host label
"""
raise NotImplementedError()
- def host_ok_to_stop(self, hostname: str) -> Completion:
+ def host_ok_to_stop(self, hostname: str) -> OrchResult:
"""
Check if the specified host can be safely stopped without reducing availability
"""
raise NotImplementedError()
- def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
+ def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+ """
+ Place a host in maintenance, stopping daemons and disabling it's systemd target
+ """
+ raise NotImplementedError()
+
+ def exit_host_maintenance(self, hostname: str) -> OrchResult:
+ """
+ Return a host from maintenance, restarting the clusters systemd target
+ """
+ raise NotImplementedError()
+
+ def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
"""
Returns something that was created by `ceph-volume inventory`.
"""
raise NotImplementedError()
- def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
+ def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
the orchestrator. For example, when viewing an OSD in the dashboard
"""
raise NotImplementedError()
- def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
+ def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
"""
Describe a daemon (of any kind) that is already configured in
the orchestrator.
"""
raise NotImplementedError()
- def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
+ @handle_orch_error
+ def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
"""
Applies any spec
"""
- fns: Dict[str, function] = {
+ fns: Dict[str, Callable[..., OrchResult[str]]] = {
'alertmanager': self.apply_alertmanager,
'crash': self.apply_crash,
'grafana': self.apply_grafana,
'mon': self.apply_mon,
'nfs': self.apply_nfs,
'node-exporter': self.apply_node_exporter,
- 'osd': lambda dg: self.apply_drivegroups([dg]),
+ 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore
'prometheus': self.apply_prometheus,
+ 'loki': self.apply_loki,
+ 'promtail': self.apply_promtail,
'rbd-mirror': self.apply_rbd_mirror,
'rgw': self.apply_rgw,
+ 'ingress': self.apply_ingress,
+ 'snmp-gateway': self.apply_snmp_gateway,
'host': self.add_host,
}
- def merge(ls, r):
- if isinstance(ls, list):
- return ls + [r]
- return [ls, r]
-
- spec, *specs = specs
+ def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741
+ l_res = raise_if_exception(l)
+ r_res = raise_if_exception(r)
+ l_res.append(r_res)
+ return OrchResult(l_res)
+ return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
- fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
- completion = fn(spec)
- for s in specs:
- def next(ls):
- fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
- return fn(s).then(lambda r: merge(ls, r))
- completion = completion.then(next)
- return completion
-
- def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
+ def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
"""
Plan (Dry-run, Preview) a List of Specs.
"""
raise NotImplementedError()
- def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
+ def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
"""
Remove specific daemon(s).
"""
raise NotImplementedError()
- def remove_service(self, service_name: str) -> Completion[str]:
+ def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
"""
Remove a service (a collection of daemons).
"""
raise NotImplementedError()
- def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
+ def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
"""
Perform an action (start/stop/reload) on a service (i.e., all daemons
providing the logical service).
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
:param service_name: service_type + '.' + service_id
(e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
- :rtype: Completion
+ :rtype: OrchResult
"""
# assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
+ def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
"""
Perform an action (start/stop/reload) on a daemon.
:param action: one of "start", "stop", "restart", "redeploy", "reconfig"
:param daemon_name: name of daemon
:param image: Container image when redeploying that daemon
- :rtype: Completion
+ :rtype: OrchResult
"""
# assert action in ["start", "stop", "reload, "restart", "redeploy"]
raise NotImplementedError()
- def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
+ def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
"""
Create one or more OSDs within a single Drive Group.
"""
raise NotImplementedError()
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
+ def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
""" Update OSD cluster """
raise NotImplementedError()
def set_unmanaged_flag(self,
unmanaged_flag: bool,
service_type: str = 'osd',
- service_name=None
+ service_name: Optional[str] = None
) -> HandleCommandResult:
raise NotImplementedError()
def preview_osdspecs(self,
osdspec_name: Optional[str] = 'osd',
osdspecs: Optional[List[DriveGroupSpec]] = None
- ) -> Completion[str]:
+ ) -> OrchResult[str]:
""" Get a preview for OSD deployments """
raise NotImplementedError()
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
- force: bool = False) -> Completion[str]:
+ force: bool = False,
+ zap: bool = False) -> OrchResult[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
:param force: Forces the OSD removal process without waiting for the data to be drained first.
- Note that this can only remove OSDs that were successfully
- created (i.e. got an OSD ID).
+ :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+ .. note:: this can only remove OSDs that were successfully
+ created (i.e. got an OSD ID).
"""
raise NotImplementedError()
- def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+ def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
"""
TODO
"""
raise NotImplementedError()
- def remove_osds_status(self) -> Completion:
+ def remove_osds_status(self) -> OrchResult:
"""
Returns a status of the ongoing OSD removal operations.
"""
raise NotImplementedError()
- def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
+ def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
"""
Instructs the orchestrator to enable or disable either the ident or the fault LED.
"""
raise NotImplementedError()
- def zap_device(self, host: str, path: str) -> Completion[str]:
+ def zap_device(self, host: str, path: str) -> OrchResult[str]:
"""Zap/Erase a device (DESTROYS DATA)"""
raise NotImplementedError()
- def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create mon daemon(s)"""
+ def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
+ """Create daemons daemon(s) for unmanaged services"""
raise NotImplementedError()
- def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update mon cluster"""
raise NotImplementedError()
- def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create mgr daemon(s)"""
- raise NotImplementedError()
-
- def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update mgr cluster"""
raise NotImplementedError()
- def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create MDS daemon(s)"""
- raise NotImplementedError()
-
- def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
"""Update MDS cluster"""
raise NotImplementedError()
- def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
- """Create RGW daemon(s)"""
- raise NotImplementedError()
-
- def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
+ def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
"""Update RGW cluster"""
raise NotImplementedError()
- def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create rbd-mirror daemon(s)"""
+ def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
+ """Update ingress daemons"""
raise NotImplementedError()
- def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update rbd-mirror cluster"""
raise NotImplementedError()
- def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
- """Create NFS daemon(s)"""
- raise NotImplementedError()
-
- def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
+ def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
"""Update NFS cluster"""
raise NotImplementedError()
- def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
- """Create iscsi daemon(s)"""
- raise NotImplementedError()
-
- def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
+ def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
"""Update iscsi cluster"""
raise NotImplementedError()
- def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create new prometheus daemon"""
- raise NotImplementedError()
-
- def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update prometheus cluster"""
raise NotImplementedError()
- def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create a new Node-Exporter service"""
+ def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
- def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
- """Update existing a Node-Exporter daemon(s)"""
+ def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Loki daemon(s)"""
raise NotImplementedError()
- def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create a new crash service"""
+ def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a Promtail daemon(s)"""
raise NotImplementedError()
- def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
+ def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a crash daemon(s)"""
raise NotImplementedError()
- def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create a new Node-Exporter service"""
+ def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update existing a grafana service"""
raise NotImplementedError()
- def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
- """Update existing a Node-Exporter daemon(s)"""
+ def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
+ """Update an existing AlertManager daemon(s)"""
raise NotImplementedError()
- def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
- """Create a new AlertManager service"""
+ def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+ """Update an existing snmp gateway service"""
raise NotImplementedError()
- def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
- """Update an existing AlertManager daemon(s)"""
+ def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
raise NotImplementedError()
- def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+ def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+ hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_pause(self) -> Completion[str]:
+ def upgrade_pause(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_resume(self) -> Completion[str]:
+ def upgrade_resume(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_stop(self) -> Completion[str]:
+ def upgrade_stop(self) -> OrchResult[str]:
raise NotImplementedError()
- def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
+ def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
"""
If an upgrade is currently underway, report on where
we are in the process, or if some error has occurred.
raise NotImplementedError()
@_hide_in_features
- def upgrade_available(self) -> Completion:
+ def upgrade_available(self) -> OrchResult:
"""
Report on what versions are available to upgrade to
return ServiceSpec.from_json(spec)
+def daemon_type_to_service(dtype: str) -> str:
+ mapping = {
+ 'mon': 'mon',
+ 'mgr': 'mgr',
+ 'mds': 'mds',
+ 'rgw': 'rgw',
+ 'osd': 'osd',
+ 'haproxy': 'ingress',
+ 'keepalived': 'ingress',
+ 'iscsi': 'iscsi',
+ 'rbd-mirror': 'rbd-mirror',
+ 'cephfs-mirror': 'cephfs-mirror',
+ 'nfs': 'nfs',
+ 'grafana': 'grafana',
+ 'alertmanager': 'alertmanager',
+ 'prometheus': 'prometheus',
+ 'node-exporter': 'node-exporter',
+ 'loki': 'loki',
+ 'promtail': 'promtail',
+ 'crash': 'crash',
+ 'crashcollector': 'crash', # Specific Rook Daemon
+ 'container': 'container',
+ 'agent': 'agent',
+ 'snmp-gateway': 'snmp-gateway',
+ }
+ return mapping[dtype]
+
+
+def service_to_daemon_types(stype: str) -> List[str]:
+ mapping = {
+ 'mon': ['mon'],
+ 'mgr': ['mgr'],
+ 'mds': ['mds'],
+ 'rgw': ['rgw'],
+ 'osd': ['osd'],
+ 'ingress': ['haproxy', 'keepalived'],
+ 'iscsi': ['iscsi'],
+ 'rbd-mirror': ['rbd-mirror'],
+ 'cephfs-mirror': ['cephfs-mirror'],
+ 'nfs': ['nfs'],
+ 'grafana': ['grafana'],
+ 'alertmanager': ['alertmanager'],
+ 'prometheus': ['prometheus'],
+ 'loki': ['loki'],
+ 'promtail': ['promtail'],
+ 'node-exporter': ['node-exporter'],
+ 'crash': ['crash'],
+ 'container': ['container'],
+ 'agent': ['agent'],
+ 'snmp-gateway': ['snmp-gateway'],
+ }
+ return mapping[stype]
+
+
+KNOWN_DAEMON_TYPES: List[str] = list(
+ sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
+
+
class UpgradeStatusSpec(object):
# Orchestrator's report on what's going on with any ongoing upgrade
- def __init__(self):
+ def __init__(self) -> None:
self.in_progress = False # Is an upgrade underway?
- self.target_image = None
- self.services_complete = [] # Which daemon types are fully updated?
+ self.target_image: Optional[str] = None
+ self.services_complete: List[str] = [] # Which daemon types are fully updated?
+ self.which: str = '<unknown>' # for if user specified daemon types, services or hosts
+ self.progress: Optional[str] = None # How many of the daemons have we upgraded
self.message = "" # Freeform description
-def handle_type_error(method):
+def handle_type_error(method: FuncT) -> FuncT:
@wraps(method)
- def inner(cls, *args, **kwargs):
+ def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
try:
return method(cls, *args, **kwargs)
except TypeError as e:
error_msg = '{}: {}'.format(cls.__name__, e)
raise OrchestratorValidationError(error_msg)
- return inner
+ return cast(FuncT, inner)
+
+
+class DaemonDescriptionStatus(enum.IntEnum):
+ unknown = -2
+ error = -1
+ stopped = 0
+ running = 1
+ starting = 2 #: Daemon is deployed, but not yet running
+
+ @staticmethod
+ def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+ if status is None:
+ status = DaemonDescriptionStatus.unknown
+ return {
+ DaemonDescriptionStatus.unknown: 'unknown',
+ DaemonDescriptionStatus.error: 'error',
+ DaemonDescriptionStatus.stopped: 'stopped',
+ DaemonDescriptionStatus.running: 'running',
+ DaemonDescriptionStatus.starting: 'starting',
+ }.get(status, '<unknown>')
class DaemonDescription(object):
"""
def __init__(self,
- daemon_type=None,
- daemon_id=None,
- hostname=None,
- container_id=None,
- container_image_id=None,
- container_image_name=None,
- version=None,
- status=None,
- status_desc=None,
- last_refresh=None,
- created=None,
- started=None,
- last_configured=None,
- osdspec_affinity=None,
- last_deployed=None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ hostname: Optional[str] = None,
+ container_id: Optional[str] = None,
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ container_image_digests: Optional[List[str]] = None,
+ version: Optional[str] = None,
+ status: Optional[DaemonDescriptionStatus] = None,
+ status_desc: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ started: Optional[datetime.datetime] = None,
+ last_configured: Optional[datetime.datetime] = None,
+ osdspec_affinity: Optional[str] = None,
+ last_deployed: Optional[datetime.datetime] = None,
events: Optional[List['OrchestratorEvent']] = None,
- is_active: bool = False):
-
- # Host is at the same granularity as InventoryHost
- self.hostname: str = hostname
+ is_active: bool = False,
+ memory_usage: Optional[int] = None,
+ memory_request: Optional[int] = None,
+ memory_limit: Optional[int] = None,
+ cpu_percentage: Optional[str] = None,
+ service_name: Optional[str] = None,
+ ports: Optional[List[int]] = None,
+ ip: Optional[str] = None,
+ deployed_by: Optional[List[str]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None,
+ ) -> None:
+
+ #: Host is at the same granularity as InventoryHost
+ self.hostname: Optional[str] = hostname
# Not everyone runs in containers, but enough people do to
# justify having the container_id (runtime id) and container_image
# (image name)
self.container_id = container_id # runtime id
- self.container_image_id = container_image_id # image hash
+ self.container_image_id = container_image_id # image id locally
self.container_image_name = container_image_name # image friendly name
+ self.container_image_digests = container_image_digests # reg hashes
- # The type of service (osd, mon, mgr, etc.)
+ #: The type of service (osd, mon, mgr, etc.)
self.daemon_type = daemon_type
- # The orchestrator will have picked some names for daemons,
- # typically either based on hostnames or on pod names.
- # This is the <foo> in mds.<foo>, the ID that will appear
- # in the FSMap/ServiceMap.
- self.daemon_id: str = daemon_id
+ #: The orchestrator will have picked some names for daemons,
+ #: typically either based on hostnames or on pod names.
+ #: This is the <foo> in mds.<foo>, the ID that will appear
+ #: in the FSMap/ServiceMap.
+ self.daemon_id: Optional[str] = daemon_id
+ self.daemon_name = self.name()
+
+ #: Some daemon types have a numeric rank assigned
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
- # Service version that was deployed
+ self._service_name: Optional[str] = service_name
+
+ #: Service version that was deployed
self.version = version
- # Service status: -1 error, 0 stopped, 1 running
- self.status = status
+ # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+ self._status = status
- # Service status description when status == -1.
+ #: Service status description when status == error.
self.status_desc = status_desc
- # datetime when this info was last refreshed
+ #: datetime when this info was last refreshed
self.last_refresh: Optional[datetime.datetime] = last_refresh
self.created: Optional[datetime.datetime] = created
self.last_configured: Optional[datetime.datetime] = last_configured
self.last_deployed: Optional[datetime.datetime] = last_deployed
- # Affinity to a certain OSDSpec
+ #: Affinity to a certain OSDSpec
self.osdspec_affinity: Optional[str] = osdspec_affinity
self.events: List[OrchestratorEvent] = events or []
+ self.memory_usage: Optional[int] = memory_usage
+ self.memory_request: Optional[int] = memory_request
+ self.memory_limit: Optional[int] = memory_limit
+
+ self.cpu_percentage: Optional[str] = cpu_percentage
+
+ self.ports: Optional[List[int]] = ports
+ self.ip: Optional[str] = ip
+
+ self.deployed_by = deployed_by
+
self.is_active = is_active
- def name(self):
+ self.extra_container_args = extra_container_args
+
+ @property
+ def status(self) -> Optional[DaemonDescriptionStatus]:
+ return self._status
+
+ @status.setter
+ def status(self, new: DaemonDescriptionStatus) -> None:
+ self._status = new
+ self.status_desc = DaemonDescriptionStatus.to_str(new)
+
+ def get_port_summary(self) -> str:
+ if not self.ports:
+ return ''
+ return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
+
+ def name(self) -> str:
return '%s.%s' % (self.daemon_type, self.daemon_id)
def matches_service(self, service_name: Optional[str]) -> bool:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
if service_name:
- return self.name().startswith(service_name + '.')
+ return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
return False
- def service_id(self):
- if self.daemon_type == 'osd' and self.osdspec_affinity:
- return self.osdspec_affinity
+ def service_id(self) -> str:
+ assert self.daemon_id is not None
+ assert self.daemon_type is not None
+
+ if self._service_name:
+ if '.' in self._service_name:
+ return self._service_name.split('.', 1)[1]
+ else:
+ return ''
+
+ if self.daemon_type == 'osd':
+ if self.osdspec_affinity and self.osdspec_affinity != 'None':
+ return self.osdspec_affinity
+ return ''
- def _match():
+ def _match() -> str:
+ assert self.daemon_id is not None
err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
# daemon_id == "service_id"
return self.daemon_id
- if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
+ if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
return _match()
return self.daemon_id
- def service_name(self):
- if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
- return f'{self.daemon_type}.{self.service_id()}'
- return self.daemon_type
+ def service_name(self) -> str:
+ if self._service_name:
+ return self._service_name
+ assert self.daemon_type is not None
+ if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+ return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
+ return daemon_type_to_service(self.daemon_type)
- def __repr__(self):
+ def __repr__(self) -> str:
return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
id=self.daemon_id)
- def to_json(self):
- out = OrderedDict()
+ def __str__(self) -> str:
+ return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
+ def to_json(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
out['daemon_type'] = self.daemon_type
out['daemon_id'] = self.daemon_id
+ out['service_name'] = self._service_name
+ out['daemon_name'] = self.name()
out['hostname'] = self.hostname
out['container_id'] = self.container_id
out['container_image_id'] = self.container_image_id
out['container_image_name'] = self.container_image_name
+ out['container_image_digests'] = self.container_image_digests
+ out['memory_usage'] = self.memory_usage
+ out['memory_request'] = self.memory_request
+ out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
out['version'] = self.version
- out['status'] = self.status
+ out['status'] = self.status.value if self.status is not None else None
out['status_desc'] = self.status_desc
if self.daemon_type == 'osd':
out['osdspec_affinity'] = self.osdspec_affinity
out['is_active'] = self.is_active
+ out['ports'] = self.ports
+ out['ip'] = self.ip
+ out['rank'] = self.rank
+ out['rank_generation'] = self.rank_generation
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if getattr(self, k):
- out[k] = getattr(self, k).strftime(DATEFMT)
+ out[k] = datetime_to_str(getattr(self, k))
if self.events:
out['events'] = [e.to_json() for e in self.events]
del out[e]
return out
+ def to_dict(self) -> dict:
+ out: Dict[str, Any] = OrderedDict()
+ out['daemon_type'] = self.daemon_type
+ out['daemon_id'] = self.daemon_id
+ out['daemon_name'] = self.name()
+ out['hostname'] = self.hostname
+ out['container_id'] = self.container_id
+ out['container_image_id'] = self.container_image_id
+ out['container_image_name'] = self.container_image_name
+ out['container_image_digests'] = self.container_image_digests
+ out['memory_usage'] = self.memory_usage
+ out['memory_request'] = self.memory_request
+ out['memory_limit'] = self.memory_limit
+ out['cpu_percentage'] = self.cpu_percentage
+ out['version'] = self.version
+ out['status'] = self.status.value if self.status is not None else None
+ out['status_desc'] = self.status_desc
+ if self.daemon_type == 'osd':
+ out['osdspec_affinity'] = self.osdspec_affinity
+ out['is_active'] = self.is_active
+ out['ports'] = self.ports
+ out['ip'] = self.ip
+
+ for k in ['last_refresh', 'created', 'started', 'last_deployed',
+ 'last_configured']:
+ if getattr(self, k):
+ out[k] = datetime_to_str(getattr(self, k))
+
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+
+ empty = [k for k, v in out.items() if v is None]
+ for e in empty:
+ del out[e]
+ return out
+
@classmethod
@handle_type_error
- def from_json(cls, data):
+ def from_json(cls, data: dict) -> 'DaemonDescription':
c = data.copy()
event_strs = c.pop('events', [])
for k in ['last_refresh', 'created', 'started', 'last_deployed',
'last_configured']:
if k in c:
- c[k] = datetime.datetime.strptime(c[k], DATEFMT)
+ c[k] = str_to_datetime(c[k])
events = [OrchestratorEvent.from_json(e) for e in event_strs]
- return cls(events=events, **c)
+ status_int = c.pop('status', None)
+ if 'daemon_name' in c:
+ del c['daemon_name']
+ if 'service_name' in c and c['service_name'].startswith('osd.'):
+ # if the service_name is a osd.NNN (numeric osd id) then
+ # ignore it -- it is not a valid service_name and
+ # (presumably) came from an older version of cephadm.
+ try:
+ int(c['service_name'][4:])
+ del c['service_name']
+ except ValueError:
+ pass
+ status = DaemonDescriptionStatus(status_int) if status_int is not None else None
+ return cls(events=events, status=status, **c)
- def __copy__(self):
+ def __copy__(self) -> 'DaemonDescription':
# feel free to change this:
return DaemonDescription.from_json(self.to_json())
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
- return dumper.represent_dict(data.to_json().items())
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
def __init__(self,
spec: ServiceSpec,
- container_image_id=None,
- container_image_name=None,
- rados_config_location=None,
- service_url=None,
- last_refresh=None,
- created=None,
- size=0,
- running=0,
- events: Optional[List['OrchestratorEvent']] = None):
+ container_image_id: Optional[str] = None,
+ container_image_name: Optional[str] = None,
+ service_url: Optional[str] = None,
+ last_refresh: Optional[datetime.datetime] = None,
+ created: Optional[datetime.datetime] = None,
+ deleted: Optional[datetime.datetime] = None,
+ size: int = 0,
+ running: int = 0,
+ events: Optional[List['OrchestratorEvent']] = None,
+ virtual_ip: Optional[str] = None,
+ ports: List[int] = []) -> None:
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# (image name)
self.container_image_id = container_image_id # image hash
self.container_image_name = container_image_name # image friendly name
- # Location of the service configuration when stored in rados
- # object. Format: "rados://<pool>/[<namespace/>]<object>"
- self.rados_config_location = rados_config_location
-
# If the service exposes REST-like API, this attribute should hold
# the URL.
self.service_url = service_url
# datetime when this info was last refreshed
self.last_refresh: Optional[datetime.datetime] = last_refresh
self.created: Optional[datetime.datetime] = created
+ self.deleted: Optional[datetime.datetime] = deleted
self.spec: ServiceSpec = spec
self.events: List[OrchestratorEvent] = events or []
- def service_type(self):
+ self.virtual_ip = virtual_ip
+ self.ports = ports
+
+ def service_type(self) -> str:
return self.spec.service_type
- def __repr__(self):
+ def __repr__(self) -> str:
return f"<ServiceDescription of {self.spec.one_line_str()}>"
+ def get_port_summary(self) -> str:
+ if not self.ports:
+ return ''
+ return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+
def to_json(self) -> OrderedDict:
out = self.spec.to_json()
status = {
'container_image_id': self.container_image_id,
'container_image_name': self.container_image_name,
- 'rados_config_location': self.rados_config_location,
'service_url': self.service_url,
'size': self.size,
'running': self.running,
'last_refresh': self.last_refresh,
'created': self.created,
+ 'virtual_ip': self.virtual_ip,
+ 'ports': self.ports if self.ports else None,
}
for k in ['last_refresh', 'created']:
if getattr(self, k):
- status[k] = getattr(self, k).strftime(DATEFMT)
+ status[k] = datetime_to_str(getattr(self, k))
status = {k: v for (k, v) in status.items() if v is not None}
out['status'] = status
if self.events:
out['events'] = [e.to_json() for e in self.events]
return out
+ def to_dict(self) -> OrderedDict:
+ out = self.spec.to_json()
+ status = {
+ 'container_image_id': self.container_image_id,
+ 'container_image_name': self.container_image_name,
+ 'service_url': self.service_url,
+ 'size': self.size,
+ 'running': self.running,
+ 'last_refresh': self.last_refresh,
+ 'created': self.created,
+ 'virtual_ip': self.virtual_ip,
+ 'ports': self.ports if self.ports else None,
+ }
+ for k in ['last_refresh', 'created']:
+ if getattr(self, k):
+ status[k] = datetime_to_str(getattr(self, k))
+ status = {k: v for (k, v) in status.items() if v is not None}
+ out['status'] = status
+ if self.events:
+ out['events'] = [e.to_dict() for e in self.events]
+ return out
+
@classmethod
@handle_type_error
- def from_json(cls, data: dict):
+ def from_json(cls, data: dict) -> 'ServiceDescription':
c = data.copy()
status = c.pop('status', {})
event_strs = c.pop('events', [])
c_status = status.copy()
for k in ['last_refresh', 'created']:
if k in c_status:
- c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
+ c_status[k] = str_to_datetime(c_status[k])
events = [OrchestratorEvent.from_json(e) for e in event_strs]
return cls(spec=spec, events=events, **c_status)
@staticmethod
- def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
- return dumper.represent_dict(data.to_json().items())
+ def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+ return dumper.represent_dict(cast(Mapping, data.to_json().items()))
yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
When fetching inventory, use this filter to avoid unnecessarily
scanning the whole estate.
- Typical use: filter by host when presenting UI workflow for configuring
- a particular server.
- filter by label when not all of estate is Ceph servers,
- and we want to only learn about the Ceph servers.
- filter by label when we are interested particularly
- in e.g. OSD servers.
+ Typical use:
+ filter by host when presentig UI workflow for configuring
+ a particular server.
+ filter by label when not all of estate is Ceph servers,
+ and we want to only learn about the Ceph servers.
+ filter by label when we are interested particularly
+ in e.g. OSD servers.
"""
def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
self.devices = devices
self.labels = labels
- def to_json(self):
+ def to_json(self) -> dict:
return {
'name': self.name,
'addr': self.addr,
}
@classmethod
- def from_json(cls, data):
+ def from_json(cls, data: dict) -> 'InventoryHost':
try:
_data = copy.deepcopy(data)
name = _data.pop('name')
raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
@classmethod
- def from_nested_items(cls, hosts):
+ def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
devs = inventory.Devices.from_json
return [cls(item[0], devs(item[1].data)) for item in hosts]
- def __repr__(self):
+ def __repr__(self) -> str:
return "<InventoryHost>({name})".format(name=self.name)
@staticmethod
def get_host_names(hosts: List['InventoryHost']) -> List[str]:
return [host.name for host in hosts]
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
return self.name == other.name and self.devices == other.devices
ERROR = 'ERROR'
regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
- def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
+ def __init__(self, created: Union[str, datetime.datetime], kind: str,
+ subject: str, level: str, message: str) -> None:
if isinstance(created, str):
- created = datetime.datetime.strptime(created, DATEFMT)
+ created = str_to_datetime(created)
self.created: datetime.datetime = created
assert kind in "service daemon".split()
def to_json(self) -> str:
# Make a long list of events readable.
- created = self.created.strftime(DATEFMT)
+ created = datetime_to_str(self.created)
return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+ def to_dict(self) -> dict:
+ # Convert events data to dict.
+ return {
+ 'created': datetime_to_str(self.created),
+ 'subject': self.kind_subject(),
+ 'level': self.level,
+ 'message': self.message
+ }
+
@classmethod
@handle_type_error
- def from_json(cls, data) -> "OrchestratorEvent":
+ def from_json(cls, data: str) -> "OrchestratorEvent":
"""
>>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
- '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+ '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
:param data:
:return:
return cls(*match.groups())
raise ValueError(f'Unable to match: "{data}"')
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if not isinstance(other, OrchestratorEvent):
return False
return self.created == other.created and self.kind == other.kind \
and self.subject == other.subject and self.message == other.message
+ def __repr__(self) -> str:
+ return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
-def _mk_orch_methods(cls):
+def _mk_orch_methods(cls: Any) -> Any:
# Needs to be defined outside of for.
# Otherwise meth is always bound to last key
- def shim(method_name):
- def inner(self, *args, **kwargs):
+ def shim(method_name: str) -> Callable:
+ def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
completion = self._oremote(method_name, args, kwargs)
return completion
return inner
- for meth in Orchestrator.__dict__:
- if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
- setattr(cls, meth, shim(meth))
+ for name, method in Orchestrator.__dict__.items():
+ if not name.startswith('_') and name not in ['is_orchestrator_module']:
+ remote_call = update_wrapper(shim(name), method)
+ setattr(cls, name, remote_call)
return cls
>>> class MyModule(OrchestratorClientMixin):
... def func(self):
... completion = self.add_host('somehost') # calls `_oremote()`
- ... self._orchestrator_wait([completion])
... self.log.debug(completion.result)
.. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties
- def __get_mgr(self):
+ def __get_mgr(self) -> Any:
try:
return self.__mgr
except AttributeError:
return self
- def _oremote(self, meth, args, kwargs):
+ def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
"""
Helper for invoking `remote` on whichever orchestrator is enabled
if meth not in f_set or not f_set[meth]['available']:
raise NotImplementedError(f'{o} does not implement {meth}') from e
raise
-
- def _orchestrator_wait(self, completions: List[Completion]) -> None:
- """
- Wait for completions to complete (reads) or
- become persistent (writes).
-
- Waits for writes to be *persistent* but not *effective*.
-
- :param completions: List of Completions
- :raises NoOrchestrator:
- :raises RuntimeError: something went wrong while calling the process method.
- :raises ImportError: no `orchestrator` module or backend not found.
- """
- while any(not c.has_result for c in completions):
- self.process(completions)
- self.__get_mgr().log.info("Operations pending: %s",
- sum(1 for c in completions if not c.has_result))
- if any(c.needs_result for c in completions):
- time.sleep(1)
- else:
- break