X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fpybind%2Fmgr%2Forchestrator%2F_interface.py;h=b0ccf73570b7772f4d2957efea31d50e69c6886d;hb=33c7a0ef2143973309014ab28861a6fa401a5aa5;hp=a27c000f99b5b362c47c63f94d6100305943becd;hpb=e306af509c4d4816a1f73b17a825ea5186fa0030;p=ceph.git diff --git a/ceph/src/pybind/mgr/orchestrator/_interface.py b/ceph/src/pybind/mgr/orchestrator/_interface.py index a27c000f9..b0ccf7357 100644 --- a/ceph/src/pybind/mgr/orchestrator/_interface.py +++ b/ceph/src/pybind/mgr/orchestrator/_interface.py @@ -7,32 +7,42 @@ Please see the ceph-mgr module developer's guide for more information. import copy import datetime +import enum import errno import logging import pickle import re -import time -import uuid -from collections import namedtuple -from functools import wraps +from collections import namedtuple, OrderedDict +from contextlib import contextmanager +from functools import wraps, reduce, update_wrapper + +from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ + Sequence, Dict, cast, Mapping + +try: + from typing import Protocol # Protocol was added in Python 3.8 +except ImportError: + class Protocol: # type: ignore + pass + + +import yaml from ceph.deployment import inventory from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \ - ServiceSpecValidationError, IscsiServiceSpec + IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec from ceph.deployment.drive_group import DriveGroupSpec +from ceph.deployment.hostspec import HostSpec, SpecValidationError +from ceph.utils import datetime_to_str, str_to_datetime from mgr_module import MgrModule, CLICommand, HandleCommandResult -try: - from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \ - Type, Sequence, Dict, cast -except ImportError: - pass logger = logging.getLogger(__name__) -DATEFMT = '%Y-%m-%dT%H:%M:%S.%f' +T = TypeVar('T') +FuncT = TypeVar('FuncT', bound=Callable[..., Any]) class OrchestratorError(Exception): @@ -44,13 +54,23 @@ class OrchestratorError(Exception): It's not intended for programming errors or orchestrator internal errors. """ + def __init__(self, + msg: str, + errno: int = -errno.EINVAL, + event_kind_subject: Optional[Tuple[str, str]] = None) -> None: + super(Exception, self).__init__(msg) + self.errno = errno + # See OrchestratorEvent.subject + self.event_subject = event_kind_subject + class NoOrchestrator(OrchestratorError): """ No orchestrator in configured. """ - def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"): - super(NoOrchestrator, self).__init__(msg) + + def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None: + super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT) class OrchestratorValidationError(OrchestratorError): @@ -59,30 +79,68 @@ class OrchestratorValidationError(OrchestratorError): """ -def handle_exception(prefix, cmd_args, desc, perm, func): +@contextmanager +def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]: + try: + yield + except OrchestratorError as e: + if overwrite or hasattr(e, 'event_subject'): + e.event_subject = (kind, subject) + raise + + +def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT: @wraps(func) - def wrapper(*args, **kwargs): + def wrapper(*args: Any, **kwargs: Any) -> Any: try: return func(*args, **kwargs) - except (OrchestratorError, ImportError, ServiceSpecValidationError) as e: + except (OrchestratorError, SpecValidationError) as e: # Do not print Traceback for expected errors. + return HandleCommandResult(e.errno, stderr=str(e)) + except ImportError as e: return HandleCommandResult(-errno.ENOENT, stderr=str(e)) except NotImplementedError: msg = 'This Orchestrator does not support `{}`'.format(prefix) return HandleCommandResult(-errno.ENOENT, stderr=msg) - # misuse partial to copy `wrapper` - wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) + # misuse lambda to copy `wrapper` + wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs) # noqa: E731 wrapper_copy._prefix = prefix # type: ignore - wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm) # type: ignore + wrapper_copy._cli_command = CLICommand(prefix, perm) # type: ignore + wrapper_copy._cli_command.store_func_metadata(func) # type: ignore wrapper_copy._cli_command.func = wrapper_copy # type: ignore - return wrapper_copy + return cast(FuncT, wrapper_copy) + + +def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']: + """ + Decorator to make Orchestrator methods return + an OrchResult. + """ + + @wraps(f) + def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]: + try: + return OrchResult(f(*args, **kwargs)) + except Exception as e: + logger.exception(e) + import os + if 'UNITTEST' in os.environ: + raise # This makes debugging of Tracebacks from unittests a bit easier + return OrchResult(None, exception=e) + return cast(Callable[..., OrchResult[T]], wrapper) -def _cli_command(perm): - def inner_cli_command(prefix, cmd_args="", desc=""): - return lambda func: handle_exception(prefix, cmd_args, desc, perm, func) + +class InnerCliCommandCallable(Protocol): + def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]: + ... + + +def _cli_command(perm: str) -> InnerCliCommandCallable: + def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]: + return lambda func: handle_exception(prefix, perm, func) return inner_cli_command @@ -97,16 +155,16 @@ class CLICommandMeta(type): We make use of CLICommand, except for the use of the global variable. """ - def __init__(cls, name, bases, dct): + def __init__(cls, name: str, bases: Any, dct: Any) -> None: super(CLICommandMeta, cls).__init__(name, bases, dct) - dispatch = {} # type: Dict[str, CLICommand] + dispatch: Dict[str, CLICommand] = {} for v in dct.values(): try: dispatch[v._prefix] = v._cli_command except AttributeError: pass - def handle_command(self, inbuf, cmd): + def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any: if cmd['prefix'] not in dispatch: return self.handle_command(inbuf, cmd) @@ -116,58 +174,30 @@ class CLICommandMeta(type): cls.handle_command = handle_command -def _no_result(): - return object() - - -class _Promise(object): +class OrchResult(Generic[T]): """ - A completion may need multiple promises to be fulfilled. `_Promise` is one - step. - - Typically ``Orchestrator`` implementations inherit from this class to - build their own way of finishing a step to fulfil a future. - - They are not exposed in the orchestrator interface and can be seen as a - helper to build orchestrator modules. + Stores a result and an exception. Mainly to circumvent the + MgrModule.remote() method that hides all exceptions and for + handling different sub-interpreters. """ - INITIALIZED = 1 # We have a parent completion and a next completion - RUNNING = 2 - FINISHED = 3 # we have a final result - - NO_RESULT = _no_result() # type: None - ASYNC_RESULT = object() - - def __init__(self, - _first_promise=None, # type: Optional["_Promise"] - value=NO_RESULT, # type: Optional[Any] - on_complete=None, # type: Optional[Callable] - name=None, # type: Optional[str] - ): - self._on_complete_ = on_complete - self._name = name - self._next_promise = None # type: Optional[_Promise] - self._state = self.INITIALIZED - self._exception = None # type: Optional[Exception] + def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None: + self.result = result + self.serialized_exception: Optional[bytes] = None + self.exception_str: str = '' + self.set_exception(exception) - # Value of this _Promise. may be an intermediate result. - self._value = value + __slots__ = 'result', 'serialized_exception', 'exception_str' - # _Promise is not a continuation monad, as `_result` is of type - # T instead of (T -> r) -> r. Therefore we need to store the first promise here. - self._first_promise = _first_promise or self # type: '_Promise' + def set_exception(self, e: Optional[Exception]) -> None: + if e is None: + self.serialized_exception = None + self.exception_str = '' + return - @property - def _exception(self): - # type: () -> Optional[Exception] - return getattr(self, '_exception_', None) - - @_exception.setter - def _exception(self, e): - self._exception_ = e + self.exception_str = f'{type(e)}: {str(e)}' try: - self._serialized_exception_ = pickle.dumps(e) if e is not None else None + self.serialized_exception = pickle.dumps(e) except pickle.PicklingError: logger.error(f"failed to pickle {e}") if isinstance(e, Exception): @@ -175,376 +205,9 @@ class _Promise(object): else: e = Exception(str(e)) # degenerate to a plain Exception - self._serialized_exception_ = pickle.dumps(e) - - @property - def _serialized_exception(self): - # type: () -> Optional[bytes] - return getattr(self, '_serialized_exception_', None) - - - - @property - def _on_complete(self): - # type: () -> Optional[Callable] - # https://github.com/python/mypy/issues/4125 - return self._on_complete_ - - @_on_complete.setter - def _on_complete(self, val): - # type: (Optional[Callable]) -> None - self._on_complete_ = val - - - def __repr__(self): - name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None' - val = repr(self._value) if self._value is not self.NO_RESULT else 'NA' - return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format( - self.__class__, self._state, val, self._on_complete, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise) - ) - - def pretty_print_1(self): - if self._name: - name = self._name - elif self._on_complete is None: - name = 'lambda x: x' - elif hasattr(self._on_complete, '__name__'): - name = getattr(self._on_complete, '__name__') - else: - name = self._on_complete.__class__.__name__ - val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...' - prefix = { - self.INITIALIZED: ' ', - self.RUNNING: ' >>>', - self.FINISHED: '(done)' - }[self._state] - return '{} {}({}),'.format(prefix, name, val) - - def then(self, on_complete): - # type: (Any, Callable) -> Any - """ - Call ``on_complete`` as soon as this promise is finalized. - """ - assert self._state in (self.INITIALIZED, self.RUNNING) - - if self._next_promise is not None: - return self._next_promise.then(on_complete) - - if self._on_complete is not None: - self._set_next_promise(self.__class__( - _first_promise=self._first_promise, - on_complete=on_complete - )) - return self._next_promise - - else: - self._on_complete = on_complete - self._set_next_promise(self.__class__(_first_promise=self._first_promise)) - return self._next_promise - - def _set_next_promise(self, next): - # type: (_Promise) -> None - assert self is not next - assert self._state in (self.INITIALIZED, self.RUNNING) - - self._next_promise = next - assert self._next_promise is not None - for p in iter(self._next_promise): - p._first_promise = self._first_promise - - def _finalize(self, value=NO_RESULT): - """ - Sets this promise to complete. - - Orchestrators may choose to use this helper function. - - :param value: new value. - """ - if self._state not in (self.INITIALIZED, self.RUNNING): - raise ValueError('finalize: {} already finished. {}'.format(repr(self), value)) - - self._state = self.RUNNING - - if value is not self.NO_RESULT: - self._value = value - assert self._value is not self.NO_RESULT, repr(self) - - if self._on_complete: - try: - next_result = self._on_complete(self._value) - except Exception as e: - self.fail(e) - return - else: - next_result = self._value - - if isinstance(next_result, _Promise): - # hack: _Promise is not a continuation monad. - next_result = next_result._first_promise # type: ignore - assert next_result not in self, repr(self._first_promise) + repr(next_result) - assert self not in next_result - next_result._append_promise(self._next_promise) - self._set_next_promise(next_result) - assert self._next_promise - if self._next_promise._value is self.NO_RESULT: - self._next_promise._value = self._value - self.propagate_to_next() - elif next_result is not self.ASYNC_RESULT: - # simple map. simply forward - if self._next_promise: - self._next_promise._value = next_result - else: - # Hack: next_result is of type U, _value is of type T - self._value = next_result # type: ignore - self.propagate_to_next() - else: - # asynchronous promise - pass - - - def propagate_to_next(self): - self._state = self.FINISHED - logger.debug('finalized {}'.format(repr(self))) - if self._next_promise: - self._next_promise._finalize() - - def fail(self, e): - # type: (Exception) -> None - """ - Sets the whole completion to be faild with this exception and end the - evaluation. - """ - if self._state == self.FINISHED: - raise ValueError( - 'Invalid State: called fail, but Completion is already finished: {}'.format(str(e))) - assert self._state in (self.INITIALIZED, self.RUNNING) - logger.exception('_Promise failed') - self._exception = e - self._value = f'_exception: {e}' - if self._next_promise: - self._next_promise.fail(e) - self._state = self.FINISHED - - def __contains__(self, item): - return any(item is p for p in iter(self._first_promise)) - - def __iter__(self): - yield self - elem = self._next_promise - while elem is not None: - yield elem - elem = elem._next_promise - - def _append_promise(self, other): - if other is not None: - assert self not in other - assert other not in self - self._last_promise()._set_next_promise(other) - - def _last_promise(self): - # type: () -> _Promise - return list(iter(self))[-1] - - -class ProgressReference(object): - def __init__(self, - message, # type: str - mgr, - completion=None # type: Optional[Callable[[], Completion]] - ): - """ - ProgressReference can be used within Completions:: - - +---------------+ +---------------------------------+ - | | then | | - | My Completion | +--> | on_complete=ProgressReference() | - | | | | - +---------------+ +---------------------------------+ - - See :func:`Completion.with_progress` for an easy way to create - a progress reference - - """ - super(ProgressReference, self).__init__() - self.progress_id = str(uuid.uuid4()) - self.message = message - self.mgr = mgr - - #: The completion can already have a result, before the write - #: operation is effective. progress == 1 means, the services are - #: created / removed. - self.completion = completion # type: Optional[Callable[[], Completion]] - - #: if a orchestrator module can provide a more detailed - #: progress information, it needs to also call ``progress.update()``. - self.progress = 0.0 - - self._completion_has_result = False - self.mgr.all_progress_references.append(self) - - def __str__(self): - """ - ``__str__()`` is used for determining the message for progress events. - """ - return self.message or super(ProgressReference, self).__str__() - - def __call__(self, arg): - self._completion_has_result = True - self.progress = 1.0 - return arg - - @property - def progress(self): - return self._progress - - @progress.setter - def progress(self, progress): - assert progress <= 1.0 - self._progress = progress - try: - if self.effective: - self.mgr.remote("progress", "complete", self.progress_id) - self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self] - else: - self.mgr.remote("progress", "update", self.progress_id, self.message, - progress, - [("origin", "orchestrator")]) - except ImportError: - # If the progress module is disabled that's fine, - # they just won't see the output. - pass - - @property - def effective(self): - return self.progress == 1 and self._completion_has_result - - def update(self): - def progress_run(progress): - self.progress = progress - if self.completion: - c = self.completion().then(progress_run) - self.mgr.process([c._first_promise]) - else: - self.progress = 1 - - def fail(self): - self._completion_has_result = True - self.progress = 1 - - -class Completion(_Promise): - """ - Combines multiple promises into one overall operation. - - Completions are composable by being able to - call one completion from another completion. I.e. making them re-usable - using Promises E.g.:: - - >>> #doctest: +SKIP - ... return Orchestrator().get_hosts().then(self._create_osd) - - where ``get_hosts`` returns a Completion of list of hosts and - ``_create_osd`` takes a list of hosts. - - The concept behind this is to store the computation steps - explicit and then explicitly evaluate the chain: - - >>> #doctest: +SKIP - ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x)) - ... p.finalize(2) - ... assert p.result = "4" - - or graphically:: - - +---------------+ +-----------------+ - | | then | | - | lambda x: x*x | +--> | lambda x: str(x)| - | | | | - +---------------+ +-----------------+ - - """ - def __init__(self, - _first_promise=None, # type: Optional["Completion"] - value=_Promise.NO_RESULT, # type: Any - on_complete=None, # type: Optional[Callable] - name=None, # type: Optional[str] - ): - super(Completion, self).__init__(_first_promise, value, on_complete, name) - - @property - def _progress_reference(self): - # type: () -> Optional[ProgressReference] - if hasattr(self._on_complete, 'progress_id'): - return self._on_complete # type: ignore - return None + self.serialized_exception = pickle.dumps(e) - @property - def progress_reference(self): - # type: () -> Optional[ProgressReference] - """ - ProgressReference. Marks this completion - as a write completeion. - """ - - references = [c._progress_reference for c in iter(self) if c._progress_reference is not None] - if references: - assert len(references) == 1 - return references[0] - return None - - @classmethod - def with_progress(cls, # type: Any - message, # type: str - mgr, - _first_promise=None, # type: Optional["Completion"] - value=_Promise.NO_RESULT, # type: Any - on_complete=None, # type: Optional[Callable] - calc_percent=None # type: Optional[Callable[[], Any]] - ): - # type: (...) -> Any - - c = cls( - _first_promise=_first_promise, - value=value, - on_complete=on_complete - ).add_progress(message, mgr, calc_percent) - - return c._first_promise - - def add_progress(self, - message, # type: str - mgr, - calc_percent=None # type: Optional[Callable[[], Any]] - ): - return self.then( - on_complete=ProgressReference( - message=message, - mgr=mgr, - completion=calc_percent - ) - ) - - def fail(self, e): - super(Completion, self).fail(e) - if self._progress_reference: - self._progress_reference.fail() - - def finalize(self, result=_Promise.NO_RESULT): - if self._first_promise._state == self.INITIALIZED: - self._first_promise._finalize(result) - - @property - def result(self): - """ - The result of the operation that we were waited - for. Only valid after calling Orchestrator.process() on this - completion. - """ - last = self._last_promise() - assert last._state == _Promise.FINISHED - return last._value - - def result_str(self): + def result_str(self) -> str: """Force a string.""" if self.result is None: return '' @@ -552,99 +215,23 @@ class Completion(_Promise): return '\n'.join(str(x) for x in self.result) return str(self.result) - @property - def exception(self): - # type: () -> Optional[Exception] - return self._last_promise()._exception - - @property - def serialized_exception(self): - # type: () -> Optional[bytes] - return self._last_promise()._serialized_exception - - @property - def has_result(self): - # type: () -> bool - """ - Has the operation already a result? - - For Write operations, it can already have a - result, if the orchestrator's configuration is - persistently written. Typically this would - indicate that an update had been written to - a manifest, but that the update had not - necessarily been pushed out to the cluster. - - :return: - """ - return self._last_promise()._state == _Promise.FINISHED - - @property - def is_errored(self): - # type: () -> bool - """ - Has the completion failed. Default implementation looks for - self.exception. Can be overwritten. - """ - return self.exception is not None - - @property - def needs_result(self): - # type: () -> bool - """ - Could the external operation be deemed as complete, - or should we wait? - We must wait for a read operation only if it is not complete. - """ - return not self.is_errored and not self.has_result - - @property - def is_finished(self): - # type: () -> bool - """ - Could the external operation be deemed as complete, - or should we wait? - We must wait for a read operation only if it is not complete. - """ - return self.is_errored or (self.has_result) - - def pretty_print(self): - - reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise)) - return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs) - - -def pretty_print(completions): - # type: (Sequence[Completion]) -> str - return ', '.join(c.pretty_print() for c in completions) - -def raise_if_exception(c): - # type: (Completion) -> None +def raise_if_exception(c: OrchResult[T]) -> T: """ - :raises OrchestratorError: Some user error or a config error. - :raises Exception: Some internal error + Due to different sub-interpreters, this MUST not be in the `OrchResult` class. """ if c.serialized_exception is not None: try: e = pickle.loads(c.serialized_exception) except (KeyError, AttributeError): - raise Exception('{}: {}'.format(type(c.exception), c.exception)) + raise Exception(c.exception_str) raise e + assert c.result is not None, 'OrchResult should either have an exception or a result' + return c.result -class TrivialReadCompletion(Completion): - """ - This is the trivial completion simply wrapping a result. - """ - def __init__(self, result): - super(TrivialReadCompletion, self).__init__() - if result: - self.finalize(result) - - -def _hide_in_features(f): - f._hide_in_features = True +def _hide_in_features(f: FuncT) -> FuncT: + f._hide_in_features = True # type: ignore return f @@ -672,7 +259,7 @@ class Orchestrator(object): """ @_hide_in_features - def is_orchestrator_module(self): + def is_orchestrator_module(self) -> bool: """ Enable other modules to interrogate this module to discover whether it's usable as an orchestrator module. @@ -682,8 +269,7 @@ class Orchestrator(object): return True @_hide_in_features - def available(self): - # type: () -> Tuple[bool, str] + def available(self) -> Tuple[bool, str, Dict[str, Any]]: """ Report whether we can talk to the orchestrator. This is the place to give the user a meaningful message if the orchestrator @@ -704,28 +290,14 @@ class Orchestrator(object): ... if OrchestratorClientMixin().available()[0]: # wrong. ... OrchestratorClientMixin().get_hosts() - :return: two-tuple of boolean, string + :return: boolean representing whether the module is available/usable + :return: string describing any error + :return: dict containing any module specific information """ raise NotImplementedError() @_hide_in_features - def process(self, completions): - # type: (List[Completion]) -> None - """ - Given a list of Completion instances, process any which are - incomplete. - - Callers should inspect the detail of each completion to identify - partial completion/progress information, and present that information - to the user. - - This method should not block, as this would make it slow to query - a status, while other long running operations are in progress. - """ - raise NotImplementedError() - - @_hide_in_features - def get_feature_set(self): + def get_feature_set(self) -> Dict[str, dict]: """Describes which methods this orchestrator implements .. note:: @@ -755,23 +327,19 @@ class Orchestrator(object): } return features - def cancel_completions(self): - # type: () -> None + def cancel_completions(self) -> None: """ Cancels ongoing completions. Unstuck the mgr. """ raise NotImplementedError() - def pause(self): - # type: () -> None + def pause(self) -> None: raise NotImplementedError() - def resume(self): - # type: () -> None + def resume(self) -> None: raise NotImplementedError() - def add_host(self, host_spec): - # type: (HostSpec) -> Completion + def add_host(self, host_spec: HostSpec) -> OrchResult[str]: """ Add a host to the orchestrator inventory. @@ -779,8 +347,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_host(self, host): - # type: (str) -> Completion + def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]: """ Remove a host from the orchestrator inventory. @@ -788,8 +355,15 @@ class Orchestrator(object): """ raise NotImplementedError() - def update_host_addr(self, host, addr): - # type: (str, str) -> Completion + def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]: + """ + drain all daemons from a host + + :param hostname: hostname + """ + raise NotImplementedError() + + def update_host_addr(self, host: str, addr: str) -> OrchResult[str]: """ Update a host's address @@ -798,8 +372,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def get_hosts(self): - # type: () -> Completion + def get_hosts(self) -> OrchResult[List[HostSpec]]: """ Report the hosts in the cluster. @@ -807,22 +380,45 @@ class Orchestrator(object): """ raise NotImplementedError() - def add_host_label(self, host, label): - # type: (str, str) -> Completion + def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]: + """ + Return hosts metadata(gather_facts). + """ + raise NotImplementedError() + + def add_host_label(self, host: str, label: str) -> OrchResult[str]: """ Add a host label """ raise NotImplementedError() - def remove_host_label(self, host, label): - # type: (str, str) -> Completion + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: """ Remove a host label """ raise NotImplementedError() - def get_inventory(self, host_filter=None, refresh=False): - # type: (Optional[InventoryFilter], bool) -> Completion + def host_ok_to_stop(self, hostname: str) -> OrchResult: + """ + Check if the specified host can be safely stopped without reducing availability + + :param host: hostname + """ + raise NotImplementedError() + + def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult: + """ + Place a host in maintenance, stopping daemons and disabling it's systemd target + """ + raise NotImplementedError() + + def exit_host_maintenance(self, hostname: str) -> OrchResult: + """ + Return a host from maintenance, restarting the clusters systemd target + """ + raise NotImplementedError() + + def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]: """ Returns something that was created by `ceph-volume inventory`. @@ -830,8 +426,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def describe_service(self, service_type=None, service_name=None, refresh=False): - # type: (Optional[str], Optional[str], bool) -> Completion + def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]: """ Describe a service (of any kind) that is already configured in the orchestrator. For example, when viewing an OSD in the dashboard @@ -845,8 +440,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False): - # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion + def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]: """ Describe a daemon (of any kind) that is already configured in the orchestrator. @@ -855,11 +449,12 @@ class Orchestrator(object): """ raise NotImplementedError() - def apply(self, specs: List["GenericSpec"]) -> Completion: + @handle_orch_error + def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]: """ Applies any spec """ - fns: Dict[str, function] = { + fns: Dict[str, Callable[..., OrchResult[str]]] = { 'alertmanager': self.apply_alertmanager, 'crash': self.apply_crash, 'grafana': self.apply_grafana, @@ -869,31 +464,31 @@ class Orchestrator(object): 'mon': self.apply_mon, 'nfs': self.apply_nfs, 'node-exporter': self.apply_node_exporter, - 'osd': lambda dg: self.apply_drivegroups([dg]), + 'osd': lambda dg: self.apply_drivegroups([dg]), # type: ignore 'prometheus': self.apply_prometheus, + 'loki': self.apply_loki, + 'promtail': self.apply_promtail, 'rbd-mirror': self.apply_rbd_mirror, 'rgw': self.apply_rgw, + 'ingress': self.apply_ingress, + 'snmp-gateway': self.apply_snmp_gateway, 'host': self.add_host, } - def merge(ls, r): - if isinstance(ls, list): - return ls + [r] - return [ls, r] + def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]: # noqa: E741 + l_res = raise_if_exception(l) + r_res = raise_if_exception(r) + l_res.append(r_res) + return OrchResult(l_res) + return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([]))) - spec, *specs = specs - - fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) - completion = fn(spec) - for s in specs: - def next(ls): - fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type]) - return fn(s).then(lambda r: merge(ls, r)) - completion = completion.then(next) - return completion + def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]: + """ + Plan (Dry-run, Preview) a List of Specs. + """ + raise NotImplementedError() - def remove_daemons(self, names): - # type: (List[str]) -> Completion + def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]: """ Remove specific daemon(s). @@ -901,8 +496,7 @@ class Orchestrator(object): """ raise NotImplementedError() - def remove_service(self, service_name): - # type: (str) -> Completion + def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]: """ Remove a service (a collection of daemons). @@ -910,34 +504,32 @@ class Orchestrator(object): """ raise NotImplementedError() - def service_action(self, action, service_name): - # type: (str, str) -> Completion + def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]: """ Perform an action (start/stop/reload) on a service (i.e., all daemons providing the logical service). :param action: one of "start", "stop", "restart", "redeploy", "reconfig" - :param service_type: e.g. "mds", "rgw", ... - :param service_name: name of logical service ("cephfs", "us-east", ...) - :rtype: Completion + :param service_name: service_type + '.' + service_id + (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...) + :rtype: OrchResult """ - #assert action in ["start", "stop", "reload, "restart", "redeploy"] + # assert action in ["start", "stop", "reload, "restart", "redeploy"] raise NotImplementedError() - def daemon_action(self, action, daemon_type, daemon_id): - # type: (str, str, str) -> Completion + def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]: """ Perform an action (start/stop/reload) on a daemon. :param action: one of "start", "stop", "restart", "redeploy", "reconfig" - :param name: name of daemon - :rtype: Completion + :param daemon_name: name of daemon + :param image: Container image when redeploying that daemon + :rtype: OrchResult """ - #assert action in ["start", "stop", "reload, "restart", "redeploy"] + # assert action in ["start", "stop", "reload, "restart", "redeploy"] raise NotImplementedError() - def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> Completion + def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]: """ Create one or more OSDs within a single Drive Group. @@ -948,45 +540,53 @@ class Orchestrator(object): """ raise NotImplementedError() - def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion: + def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: """ Update OSD cluster """ raise NotImplementedError() def set_unmanaged_flag(self, unmanaged_flag: bool, service_type: str = 'osd', - service_name=None + service_name: Optional[str] = None ) -> HandleCommandResult: raise NotImplementedError() def preview_osdspecs(self, osdspec_name: Optional[str] = 'osd', osdspecs: Optional[List[DriveGroupSpec]] = None - ) -> Completion: + ) -> OrchResult[str]: """ Get a preview for OSD deployments """ raise NotImplementedError() def remove_osds(self, osd_ids: List[str], replace: bool = False, - force: bool = False) -> Completion: + force: bool = False, + zap: bool = False) -> OrchResult[str]: """ :param osd_ids: list of OSD IDs :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace` :param force: Forces the OSD removal process without waiting for the data to be drained first. - Note that this can only remove OSDs that were successfully - created (i.e. got an OSD ID). + :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA) + + + .. note:: this can only remove OSDs that were successfully + created (i.e. got an OSD ID). + """ + raise NotImplementedError() + + def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult: + """ + TODO """ raise NotImplementedError() - def remove_osds_status(self): - # type: () -> Completion + def remove_osds_status(self) -> OrchResult: """ Returns a status of the ongoing OSD removal operations. """ raise NotImplementedError() - def blink_device_light(self, ident_fault, on, locations): - # type: (str, bool, List[DeviceLightLoc]) -> Completion + def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]: """ Instructs the orchestrator to enable or disable either the ident or the fault LED. @@ -996,153 +596,98 @@ class Orchestrator(object): """ raise NotImplementedError() - def zap_device(self, host, path): - # type: (str, str) -> Completion + def zap_device(self, host: str, path: str) -> OrchResult[str]: """Zap/Erase a device (DESTROYS DATA)""" raise NotImplementedError() - def add_mon(self, spec): - # type: (ServiceSpec) -> Completion - """Create mon daemon(s)""" + def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]: + """Create daemons daemon(s) for unmanaged services""" raise NotImplementedError() - def apply_mon(self, spec): - # type: (ServiceSpec) -> Completion + def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]: """Update mon cluster""" raise NotImplementedError() - def add_mgr(self, spec): - # type: (ServiceSpec) -> Completion - """Create mgr daemon(s)""" - raise NotImplementedError() - - def apply_mgr(self, spec): - # type: (ServiceSpec) -> Completion + def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]: """Update mgr cluster""" raise NotImplementedError() - def add_mds(self, spec): - # type: (ServiceSpec) -> Completion - """Create MDS daemon(s)""" - raise NotImplementedError() - - def apply_mds(self, spec): - # type: (ServiceSpec) -> Completion + def apply_mds(self, spec: MDSSpec) -> OrchResult[str]: """Update MDS cluster""" raise NotImplementedError() - def add_rgw(self, spec): - # type: (RGWSpec) -> Completion - """Create RGW daemon(s)""" - raise NotImplementedError() - - def apply_rgw(self, spec): - # type: (RGWSpec) -> Completion + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: """Update RGW cluster""" raise NotImplementedError() - def add_rbd_mirror(self, spec): - # type: (ServiceSpec) -> Completion - """Create rbd-mirror daemon(s)""" + def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]: + """Update ingress daemons""" raise NotImplementedError() - def apply_rbd_mirror(self, spec): - # type: (ServiceSpec) -> Completion + def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]: """Update rbd-mirror cluster""" raise NotImplementedError() - def add_nfs(self, spec): - # type: (NFSServiceSpec) -> Completion - """Create NFS daemon(s)""" - raise NotImplementedError() - - def apply_nfs(self, spec): - # type: (NFSServiceSpec) -> Completion + def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]: """Update NFS cluster""" raise NotImplementedError() - def add_iscsi(self, spec): - # type: (IscsiServiceSpec) -> Completion - """Create iscsi daemon(s)""" - raise NotImplementedError() - - def apply_iscsi(self, spec): - # type: (IscsiServiceSpec) -> Completion + def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]: """Update iscsi cluster""" raise NotImplementedError() - def add_prometheus(self, spec): - # type: (ServiceSpec) -> Completion - """Create new prometheus daemon""" - raise NotImplementedError() - - def apply_prometheus(self, spec): - # type: (ServiceSpec) -> Completion + def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]: """Update prometheus cluster""" raise NotImplementedError() - def add_node_exporter(self, spec): - # type: (ServiceSpec) -> Completion - """Create a new Node-Exporter service""" + def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a Node-Exporter daemon(s)""" raise NotImplementedError() - def apply_node_exporter(self, spec): - # type: (ServiceSpec) -> Completion - """Update existing a Node-Exporter daemon(s)""" + def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a Loki daemon(s)""" raise NotImplementedError() - def add_crash(self, spec): - # type: (ServiceSpec) -> Completion - """Create a new crash service""" + def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a Promtail daemon(s)""" raise NotImplementedError() - def apply_crash(self, spec): - # type: (ServiceSpec) -> Completion + def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]: """Update existing a crash daemon(s)""" raise NotImplementedError() - def add_grafana(self, spec): - # type: (ServiceSpec) -> Completion - """Create a new Node-Exporter service""" + def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]: + """Update existing a grafana service""" raise NotImplementedError() - def apply_grafana(self, spec): - # type: (ServiceSpec) -> Completion - """Update existing a Node-Exporter daemon(s)""" + def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]: + """Update an existing AlertManager daemon(s)""" raise NotImplementedError() - def add_alertmanager(self, spec): - # type: (ServiceSpec) -> Completion - """Create a new AlertManager service""" + def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]: + """Update an existing snmp gateway service""" raise NotImplementedError() - def apply_alertmanager(self, spec): - # type: (ServiceSpec) -> Completion - """Update an existing AlertManager daemon(s)""" + def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]: raise NotImplementedError() - def upgrade_check(self, image, version): - # type: (Optional[str], Optional[str]) -> Completion + def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]: raise NotImplementedError() - def upgrade_start(self, image, version): - # type: (Optional[str], Optional[str]) -> Completion + def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]], + hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]: raise NotImplementedError() - def upgrade_pause(self): - # type: () -> Completion + def upgrade_pause(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_resume(self): - # type: () -> Completion + def upgrade_resume(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_stop(self): - # type: () -> Completion + def upgrade_stop(self) -> OrchResult[str]: raise NotImplementedError() - def upgrade_status(self): - # type: () -> Completion + def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']: """ If an upgrade is currently underway, report on where we are in the process, or if some error has occurred. @@ -1152,8 +697,7 @@ class Orchestrator(object): raise NotImplementedError() @_hide_in_features - def upgrade_available(self): - # type: () -> Completion + def upgrade_available(self) -> OrchResult: """ Report on what versions are available to upgrade to @@ -1162,89 +706,114 @@ class Orchestrator(object): raise NotImplementedError() -class HostSpec(object): - """ - Information about hosts. Like e.g. ``kubectl get nodes`` - """ - def __init__(self, - hostname, # type: str - addr=None, # type: Optional[str] - labels=None, # type: Optional[List[str]] - status=None, # type: Optional[str] - ): - self.service_type = 'host' - - #: the bare hostname on the host. Not the FQDN. - self.hostname = hostname # type: str - - #: DNS name or IP address to reach it - self.addr = addr or hostname # type: str - - #: label(s), if any - self.labels = labels or [] # type: List[str] - - #: human readable status - self.status = status or '' # type: str - - def to_json(self): - return { - 'hostname': self.hostname, - 'addr': self.addr, - 'labels': self.labels, - 'status': self.status, - } - - @classmethod - def from_json(cls, host_spec): - _cls = cls(host_spec['hostname'], - host_spec['addr'] if 'addr' in host_spec else None, - host_spec['labels'] if 'labels' in host_spec else None) - return _cls - - def __repr__(self): - args = [self.hostname] # type: List[Any] - if self.addr is not None: - args.append(self.addr) - if self.labels: - args.append(self.labels) - if self.status: - args.append(self.status) - - return "({})".format(', '.join(map(repr, args))) - - def __eq__(self, other): - # Let's omit `status` for the moment, as it is still the very same host. - return self.hostname == other.hostname and \ - self.addr == other.addr and \ - self.labels == other.labels - GenericSpec = Union[ServiceSpec, HostSpec] -def json_to_generic_spec(spec): - # type: (dict) -> GenericSpec + +def json_to_generic_spec(spec: dict) -> GenericSpec: if 'service_type' in spec and spec['service_type'] == 'host': return HostSpec.from_json(spec) else: return ServiceSpec.from_json(spec) + +def daemon_type_to_service(dtype: str) -> str: + mapping = { + 'mon': 'mon', + 'mgr': 'mgr', + 'mds': 'mds', + 'rgw': 'rgw', + 'osd': 'osd', + 'haproxy': 'ingress', + 'keepalived': 'ingress', + 'iscsi': 'iscsi', + 'rbd-mirror': 'rbd-mirror', + 'cephfs-mirror': 'cephfs-mirror', + 'nfs': 'nfs', + 'grafana': 'grafana', + 'alertmanager': 'alertmanager', + 'prometheus': 'prometheus', + 'node-exporter': 'node-exporter', + 'loki': 'loki', + 'promtail': 'promtail', + 'crash': 'crash', + 'crashcollector': 'crash', # Specific Rook Daemon + 'container': 'container', + 'agent': 'agent', + 'snmp-gateway': 'snmp-gateway', + } + return mapping[dtype] + + +def service_to_daemon_types(stype: str) -> List[str]: + mapping = { + 'mon': ['mon'], + 'mgr': ['mgr'], + 'mds': ['mds'], + 'rgw': ['rgw'], + 'osd': ['osd'], + 'ingress': ['haproxy', 'keepalived'], + 'iscsi': ['iscsi'], + 'rbd-mirror': ['rbd-mirror'], + 'cephfs-mirror': ['cephfs-mirror'], + 'nfs': ['nfs'], + 'grafana': ['grafana'], + 'alertmanager': ['alertmanager'], + 'prometheus': ['prometheus'], + 'loki': ['loki'], + 'promtail': ['promtail'], + 'node-exporter': ['node-exporter'], + 'crash': ['crash'], + 'container': ['container'], + 'agent': ['agent'], + 'snmp-gateway': ['snmp-gateway'], + } + return mapping[stype] + + +KNOWN_DAEMON_TYPES: List[str] = list( + sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), [])) + + class UpgradeStatusSpec(object): # Orchestrator's report on what's going on with any ongoing upgrade - def __init__(self): + def __init__(self) -> None: self.in_progress = False # Is an upgrade underway? - self.target_image = None - self.services_complete = [] # Which daemon types are fully updated? + self.target_image: Optional[str] = None + self.services_complete: List[str] = [] # Which daemon types are fully updated? + self.which: str = '' # for if user specified daemon types, services or hosts + self.progress: Optional[str] = None # How many of the daemons have we upgraded self.message = "" # Freeform description -def handle_type_error(method): +def handle_type_error(method: FuncT) -> FuncT: @wraps(method) - def inner(cls, *args, **kwargs): + def inner(cls: Any, *args: Any, **kwargs: Any) -> Any: try: return method(cls, *args, **kwargs) except TypeError as e: error_msg = '{}: {}'.format(cls.__name__, e) raise OrchestratorValidationError(error_msg) - return inner + return cast(FuncT, inner) + + +class DaemonDescriptionStatus(enum.IntEnum): + unknown = -2 + error = -1 + stopped = 0 + running = 1 + starting = 2 #: Daemon is deployed, but not yet running + + @staticmethod + def to_str(status: Optional['DaemonDescriptionStatus']) -> str: + if status is None: + status = DaemonDescriptionStatus.unknown + return { + DaemonDescriptionStatus.unknown: 'unknown', + DaemonDescriptionStatus.error: 'error', + DaemonDescriptionStatus.stopped: 'stopped', + DaemonDescriptionStatus.running: 'running', + DaemonDescriptionStatus.starting: 'starting', + }.get(status, '') class DaemonDescription(object): @@ -1261,73 +830,144 @@ class DaemonDescription(object): """ def __init__(self, - daemon_type=None, - daemon_id=None, - hostname=None, - container_id=None, - container_image_id=None, - container_image_name=None, - version=None, - status=None, - status_desc=None, - last_refresh=None, - created=None, - started=None, - last_configured=None, - osdspec_affinity=None, - last_deployed=None): - # Host is at the same granularity as InventoryHost - self.hostname = hostname + daemon_type: Optional[str] = None, + daemon_id: Optional[str] = None, + hostname: Optional[str] = None, + container_id: Optional[str] = None, + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + container_image_digests: Optional[List[str]] = None, + version: Optional[str] = None, + status: Optional[DaemonDescriptionStatus] = None, + status_desc: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + started: Optional[datetime.datetime] = None, + last_configured: Optional[datetime.datetime] = None, + osdspec_affinity: Optional[str] = None, + last_deployed: Optional[datetime.datetime] = None, + events: Optional[List['OrchestratorEvent']] = None, + is_active: bool = False, + memory_usage: Optional[int] = None, + memory_request: Optional[int] = None, + memory_limit: Optional[int] = None, + cpu_percentage: Optional[str] = None, + service_name: Optional[str] = None, + ports: Optional[List[int]] = None, + ip: Optional[str] = None, + deployed_by: Optional[List[str]] = None, + rank: Optional[int] = None, + rank_generation: Optional[int] = None, + extra_container_args: Optional[List[str]] = None, + ) -> None: + + #: Host is at the same granularity as InventoryHost + self.hostname: Optional[str] = hostname # Not everyone runs in containers, but enough people do to # justify having the container_id (runtime id) and container_image # (image name) self.container_id = container_id # runtime id - self.container_image_id = container_image_id # image hash + self.container_image_id = container_image_id # image id locally self.container_image_name = container_image_name # image friendly name + self.container_image_digests = container_image_digests # reg hashes - # The type of service (osd, mon, mgr, etc.) + #: The type of service (osd, mon, mgr, etc.) self.daemon_type = daemon_type - # The orchestrator will have picked some names for daemons, - # typically either based on hostnames or on pod names. - # This is the in mds., the ID that will appear - # in the FSMap/ServiceMap. - self.daemon_id = daemon_id + #: The orchestrator will have picked some names for daemons, + #: typically either based on hostnames or on pod names. + #: This is the in mds., the ID that will appear + #: in the FSMap/ServiceMap. + self.daemon_id: Optional[str] = daemon_id + self.daemon_name = self.name() - # Service version that was deployed + #: Some daemon types have a numeric rank assigned + self.rank: Optional[int] = rank + self.rank_generation: Optional[int] = rank_generation + + self._service_name: Optional[str] = service_name + + #: Service version that was deployed self.version = version - # Service status: -1 error, 0 stopped, 1 running - self.status = status + # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting + self._status = status - # Service status description when status == -1. + #: Service status description when status == error. self.status_desc = status_desc - # datetime when this info was last refreshed - self.last_refresh = last_refresh # type: Optional[datetime.datetime] + #: datetime when this info was last refreshed + self.last_refresh: Optional[datetime.datetime] = last_refresh + + self.created: Optional[datetime.datetime] = created + self.started: Optional[datetime.datetime] = started + self.last_configured: Optional[datetime.datetime] = last_configured + self.last_deployed: Optional[datetime.datetime] = last_deployed + + #: Affinity to a certain OSDSpec + self.osdspec_affinity: Optional[str] = osdspec_affinity + + self.events: List[OrchestratorEvent] = events or [] + + self.memory_usage: Optional[int] = memory_usage + self.memory_request: Optional[int] = memory_request + self.memory_limit: Optional[int] = memory_limit + + self.cpu_percentage: Optional[str] = cpu_percentage + + self.ports: Optional[List[int]] = ports + self.ip: Optional[str] = ip - self.created = created # type: Optional[datetime.datetime] - self.started = started # type: Optional[datetime.datetime] - self.last_configured = last_configured # type: Optional[datetime.datetime] - self.last_deployed = last_deployed # type: Optional[datetime.datetime] + self.deployed_by = deployed_by - # Affinity to a certain OSDSpec - self.osdspec_affinity = osdspec_affinity # type: Optional[str] + self.is_active = is_active - def name(self): + self.extra_container_args = extra_container_args + + @property + def status(self) -> Optional[DaemonDescriptionStatus]: + return self._status + + @status.setter + def status(self, new: DaemonDescriptionStatus) -> None: + self._status = new + self.status_desc = DaemonDescriptionStatus.to_str(new) + + def get_port_summary(self) -> str: + if not self.ports: + return '' + return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}" + + def name(self) -> str: return '%s.%s' % (self.daemon_type, self.daemon_id) - def matches_service(self, service_name): - # type: (Optional[str]) -> bool + def matches_service(self, service_name: Optional[str]) -> bool: + assert self.daemon_id is not None + assert self.daemon_type is not None if service_name: - return self.name().startswith(service_name + '.') + return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.') return False - def service_id(self): - def _match(): - err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \ - f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'") + def service_id(self) -> str: + assert self.daemon_id is not None + assert self.daemon_type is not None + + if self._service_name: + if '.' in self._service_name: + return self._service_name.split('.', 1)[1] + else: + return '' + + if self.daemon_type == 'osd': + if self.osdspec_affinity and self.osdspec_affinity != 'None': + return self.osdspec_affinity + return '' + + def _match() -> str: + assert self.daemon_id is not None + err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " + f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'") if not self.hostname: # TODO: can a DaemonDescription exist without a hostname? @@ -1358,55 +998,145 @@ class DaemonDescription(object): if len(v) in [3, 4]: return '.'.join(v[0:2]) + if self.daemon_type == 'iscsi': + v = self.daemon_id.split('.') + return '.'.join(v[0:-1]) + # daemon_id == "service_id" return self.daemon_id - if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']: + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: return _match() return self.daemon_id - def service_name(self): - if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']: - return f'{self.daemon_type}.{self.service_id()}' - return self.daemon_type + def service_name(self) -> str: + if self._service_name: + return self._service_name + assert self.daemon_type is not None + if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID: + return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}' + return daemon_type_to_service(self.daemon_type) - def __repr__(self): + def __repr__(self) -> str: return "({type}.{id})".format(type=self.daemon_type, id=self.daemon_id) - def to_json(self): - out = { - 'hostname': self.hostname, - 'container_id': self.container_id, - 'container_image_id': self.container_image_id, - 'container_image_name': self.container_image_name, - 'daemon_id': self.daemon_id, - 'daemon_type': self.daemon_type, - 'version': self.version, - 'status': self.status, - 'status_desc': self.status_desc, - } + def __str__(self) -> str: + return f"{self.name()} in status {self.status_desc} on {self.hostname}" + + def to_json(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['service_name'] = self._service_name + out['daemon_name'] = self.name() + out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + out['rank'] = self.rank + out['rank_generation'] = self.rank_generation + + for k in ['last_refresh', 'created', 'started', 'last_deployed', + 'last_configured']: + if getattr(self, k): + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_json() for e in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out + + def to_dict(self) -> dict: + out: Dict[str, Any] = OrderedDict() + out['daemon_type'] = self.daemon_type + out['daemon_id'] = self.daemon_id + out['daemon_name'] = self.name() + out['hostname'] = self.hostname + out['container_id'] = self.container_id + out['container_image_id'] = self.container_image_id + out['container_image_name'] = self.container_image_name + out['container_image_digests'] = self.container_image_digests + out['memory_usage'] = self.memory_usage + out['memory_request'] = self.memory_request + out['memory_limit'] = self.memory_limit + out['cpu_percentage'] = self.cpu_percentage + out['version'] = self.version + out['status'] = self.status.value if self.status is not None else None + out['status_desc'] = self.status_desc + if self.daemon_type == 'osd': + out['osdspec_affinity'] = self.osdspec_affinity + out['is_active'] = self.is_active + out['ports'] = self.ports + out['ip'] = self.ip + for k in ['last_refresh', 'created', 'started', 'last_deployed', 'last_configured']: if getattr(self, k): - out[k] = getattr(self, k).strftime(DATEFMT) - return {k: v for (k, v) in out.items() if v is not None} + out[k] = datetime_to_str(getattr(self, k)) + + if self.events: + out['events'] = [e.to_dict() for e in self.events] + + empty = [k for k, v in out.items() if v is None] + for e in empty: + del out[e] + return out @classmethod @handle_type_error - def from_json(cls, data): + def from_json(cls, data: dict) -> 'DaemonDescription': c = data.copy() + event_strs = c.pop('events', []) for k in ['last_refresh', 'created', 'started', 'last_deployed', 'last_configured']: if k in c: - c[k] = datetime.datetime.strptime(c[k], DATEFMT) - return cls(**c) + c[k] = str_to_datetime(c[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + status_int = c.pop('status', None) + if 'daemon_name' in c: + del c['daemon_name'] + if 'service_name' in c and c['service_name'].startswith('osd.'): + # if the service_name is a osd.NNN (numeric osd id) then + # ignore it -- it is not a valid service_name and + # (presumably) came from an older version of cephadm. + try: + int(c['service_name'][4:]) + del c['service_name'] + except ValueError: + pass + status = DaemonDescriptionStatus(status_int) if status_int is not None else None + return cls(events=events, status=status, **c) - def __copy__(self): + def __copy__(self) -> 'DaemonDescription': # feel free to change this: return DaemonDescription.from_json(self.to_json()) + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer) + + class ServiceDescription(object): """ For responding to queries about the status of a particular service, @@ -1422,24 +1152,23 @@ class ServiceDescription(object): def __init__(self, spec: ServiceSpec, - container_image_id=None, - container_image_name=None, - rados_config_location=None, - service_url=None, - last_refresh=None, - created=None, - size=0, - running=0): + container_image_id: Optional[str] = None, + container_image_name: Optional[str] = None, + service_url: Optional[str] = None, + last_refresh: Optional[datetime.datetime] = None, + created: Optional[datetime.datetime] = None, + deleted: Optional[datetime.datetime] = None, + size: int = 0, + running: int = 0, + events: Optional[List['OrchestratorEvent']] = None, + virtual_ip: Optional[str] = None, + ports: List[int] = []) -> None: # Not everyone runs in containers, but enough people do to # justify having the container_image_id (image hash) and container_image # (image name) self.container_image_id = container_image_id # image hash self.container_image_name = container_image_name # image friendly name - # Location of the service configuration when stored in rados - # object. Format: "rados:///[]" - self.rados_config_location = rados_config_location - # If the service exposes REST-like API, this attribute should hold # the URL. self.service_url = service_url @@ -1451,48 +1180,93 @@ class ServiceDescription(object): self.running = running # datetime when this info was last refreshed - self.last_refresh = last_refresh # type: Optional[datetime.datetime] - self.created = created # type: Optional[datetime.datetime] + self.last_refresh: Optional[datetime.datetime] = last_refresh + self.created: Optional[datetime.datetime] = created + self.deleted: Optional[datetime.datetime] = deleted self.spec: ServiceSpec = spec - def service_type(self): + self.events: List[OrchestratorEvent] = events or [] + + self.virtual_ip = virtual_ip + self.ports = ports + + def service_type(self) -> str: return self.spec.service_type - def __repr__(self): + def __repr__(self) -> str: return f"" - def to_json(self): + def get_port_summary(self) -> str: + if not self.ports: + return '' + return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}" + + def to_json(self) -> OrderedDict: out = self.spec.to_json() status = { 'container_image_id': self.container_image_id, 'container_image_name': self.container_image_name, - 'rados_config_location': self.rados_config_location, 'service_url': self.service_url, 'size': self.size, 'running': self.running, 'last_refresh': self.last_refresh, - 'created': self.created + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, } for k in ['last_refresh', 'created']: if getattr(self, k): - status[k] = getattr(self, k).strftime(DATEFMT) + status[k] = datetime_to_str(getattr(self, k)) status = {k: v for (k, v) in status.items() if v is not None} out['status'] = status + if self.events: + out['events'] = [e.to_json() for e in self.events] + return out + + def to_dict(self) -> OrderedDict: + out = self.spec.to_json() + status = { + 'container_image_id': self.container_image_id, + 'container_image_name': self.container_image_name, + 'service_url': self.service_url, + 'size': self.size, + 'running': self.running, + 'last_refresh': self.last_refresh, + 'created': self.created, + 'virtual_ip': self.virtual_ip, + 'ports': self.ports if self.ports else None, + } + for k in ['last_refresh', 'created']: + if getattr(self, k): + status[k] = datetime_to_str(getattr(self, k)) + status = {k: v for (k, v) in status.items() if v is not None} + out['status'] = status + if self.events: + out['events'] = [e.to_dict() for e in self.events] return out @classmethod @handle_type_error - def from_json(cls, data: dict): + def from_json(cls, data: dict) -> 'ServiceDescription': c = data.copy() status = c.pop('status', {}) + event_strs = c.pop('events', []) spec = ServiceSpec.from_json(c) c_status = status.copy() for k in ['last_refresh', 'created']: if k in c_status: - c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT) - return cls(spec=spec, **c_status) + c_status[k] = str_to_datetime(c_status[k]) + events = [OrchestratorEvent.from_json(e) for e in event_strs] + return cls(spec=spec, events=events, **c_status) + + @staticmethod + def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any: + return dumper.represent_dict(cast(Mapping, data.to_json().items())) + + +yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer) class InventoryFilter(object): @@ -1500,16 +1274,17 @@ class InventoryFilter(object): When fetching inventory, use this filter to avoid unnecessarily scanning the whole estate. - Typical use: filter by host when presenting UI workflow for configuring - a particular server. - filter by label when not all of estate is Ceph servers, - and we want to only learn about the Ceph servers. - filter by label when we are interested particularly - in e.g. OSD servers. + Typical use: + filter by host when presentig UI workflow for configuring + a particular server. + filter by label when not all of estate is Ceph servers, + and we want to only learn about the Ceph servers. + filter by label when we are interested particularly + in e.g. OSD servers. """ - def __init__(self, labels=None, hosts=None): - # type: (Optional[List[str]], Optional[List[str]]) -> None + + def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None: #: Optional: get info about hosts matching labels self.labels = labels @@ -1523,8 +1298,8 @@ class InventoryHost(object): When fetching inventory, all Devices are groups inside of an InventoryHost. """ - def __init__(self, name, devices=None, labels=None, addr=None): - # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None + + def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None: if devices is None: devices = inventory.Devices([]) if labels is None: @@ -1536,7 +1311,7 @@ class InventoryHost(object): self.devices = devices self.labels = labels - def to_json(self): + def to_json(self) -> dict: return { 'name': self.name, 'addr': self.addr, @@ -1545,7 +1320,7 @@ class InventoryHost(object): } @classmethod - def from_json(cls, data): + def from_json(cls, data: dict) -> 'InventoryHost': try: _data = copy.deepcopy(data) name = _data.pop('name') @@ -1562,21 +1337,19 @@ class InventoryHost(object): except TypeError as e: raise OrchestratorValidationError('Failed to read inventory: {}'.format(e)) - @classmethod - def from_nested_items(cls, hosts): + def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']: devs = inventory.Devices.from_json return [cls(item[0], devs(item[1].data)) for item in hosts] - def __repr__(self): + def __repr__(self) -> str: return "({name})".format(name=self.name) @staticmethod - def get_host_names(hosts): - # type: (List[InventoryHost]) -> List[str] + def get_host_names(hosts: List['InventoryHost']) -> List[str]: return [host.name for host in hosts] - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return self.name == other.name and self.devices == other.devices @@ -1593,18 +1366,92 @@ class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])): __slots__ = () -def _mk_orch_methods(cls): +class OrchestratorEvent: + """ + Similar to K8s Events. + + Some form of "important" log message attached to something. + """ + INFO = 'INFO' + ERROR = 'ERROR' + regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE) + + def __init__(self, created: Union[str, datetime.datetime], kind: str, + subject: str, level: str, message: str) -> None: + if isinstance(created, str): + created = str_to_datetime(created) + self.created: datetime.datetime = created + + assert kind in "service daemon".split() + self.kind: str = kind + + # service name, or daemon danem or something + self.subject: str = subject + + # Events are not meant for debugging. debugs should end in the log. + assert level in "INFO ERROR".split() + self.level = level + + self.message: str = message + + __slots__ = ('created', 'kind', 'subject', 'level', 'message') + + def kind_subject(self) -> str: + return f'{self.kind}:{self.subject}' + + def to_json(self) -> str: + # Make a long list of events readable. + created = datetime_to_str(self.created) + return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"' + + def to_dict(self) -> dict: + # Convert events data to dict. + return { + 'created': datetime_to_str(self.created), + 'subject': self.kind_subject(), + 'level': self.level, + 'message': self.message + } + + @classmethod + @handle_type_error + def from_json(cls, data: str) -> "OrchestratorEvent": + """ + >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json() + '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"' + + :param data: + :return: + """ + match = cls.regex_v1.match(data) + if match: + return cls(*match.groups()) + raise ValueError(f'Unable to match: "{data}"') + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, OrchestratorEvent): + return False + + return self.created == other.created and self.kind == other.kind \ + and self.subject == other.subject and self.message == other.message + + def __repr__(self) -> str: + return f'OrchestratorEvent.from_json({self.to_json()!r})' + + +def _mk_orch_methods(cls: Any) -> Any: # Needs to be defined outside of for. # Otherwise meth is always bound to last key - def shim(method_name): - def inner(self, *args, **kwargs): + def shim(method_name: str) -> Callable: + def inner(self: Any, *args: Any, **kwargs: Any) -> Any: completion = self._oremote(method_name, args, kwargs) return completion return inner - for meth in Orchestrator.__dict__: - if not meth.startswith('_') and meth not in ['is_orchestrator_module']: - setattr(cls, meth, shim(meth)) + for name, method in Orchestrator.__dict__.items(): + if not name.startswith('_') and name not in ['is_orchestrator_module']: + remote_call = update_wrapper(shim(name), method) + setattr(cls, name, remote_call) return cls @@ -1620,7 +1467,6 @@ class OrchestratorClientMixin(Orchestrator): >>> class MyModule(OrchestratorClientMixin): ... def func(self): ... completion = self.add_host('somehost') # calls `_oremote()` - ... self._orchestrator_wait([completion]) ... self.log.debug(completion.result) .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`. @@ -1636,21 +1482,20 @@ class OrchestratorClientMixin(Orchestrator): ... self.orch_client.set_mgr(self.mgr)) """ - def set_mgr(self, mgr): - # type: (MgrModule) -> None + def set_mgr(self, mgr: MgrModule) -> None: """ Useable in the Dashbord that uses a global ``mgr`` """ self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties - def __get_mgr(self): + def __get_mgr(self) -> Any: try: return self.__mgr except AttributeError: return self - def _oremote(self, meth, args, kwargs): + def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any: """ Helper for invoking `remote` on whichever orchestrator is enabled @@ -1678,25 +1523,3 @@ class OrchestratorClientMixin(Orchestrator): if meth not in f_set or not f_set[meth]['available']: raise NotImplementedError(f'{o} does not implement {meth}') from e raise - - def _orchestrator_wait(self, completions): - # type: (List[Completion]) -> None - """ - Wait for completions to complete (reads) or - become persistent (writes). - - Waits for writes to be *persistent* but not *effective*. - - :param completions: List of Completions - :raises NoOrchestrator: - :raises RuntimeError: something went wrong while calling the process method. - :raises ImportError: no `orchestrator` module or backend not found. - """ - while any(not c.has_result for c in completions): - self.process(completions) - self.__get_mgr().log.info("Operations pending: %s", - sum(1 for c in completions if not c.has_result)) - if any(c.needs_result for c in completions): - time.sleep(1) - else: - break