diff --git a/ceph/src/pybind/mgr/orchestrator/_interface.py b/ceph/src/pybind/mgr/orchestrator/_interface.py
index a27c000f99b5b362c47c63f94d6100305943becd..b0ccf73570b7772f4d2957efea31d50e69c6886d 100644
--- a/ceph/src/pybind/mgr/orchestrator/_interface.py
+++ b/ceph/src/pybind/mgr/orchestrator/_interface.py
@@ -7,32 +7,42 @@ Please see the ceph-mgr module developer's guide for more information.
 
 import copy
 import datetime
+import enum
 import errno
 import logging
 import pickle
 import re
-import time
-import uuid
 
-from collections import namedtuple
-from functools import wraps
+from collections import namedtuple, OrderedDict
+from contextlib import contextmanager
+from functools import wraps, reduce, update_wrapper
+
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+    Sequence, Dict, cast, Mapping
+
+try:
+    from typing import Protocol  # Protocol was added in Python 3.8
+except ImportError:
+    class Protocol:  # type: ignore
+        pass
+
+
+import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    ServiceSpecValidationError, IscsiServiceSpec
+    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
 from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+from ceph.utils import datetime_to_str, str_to_datetime
 
 from mgr_module import MgrModule, CLICommand, HandleCommandResult
 
-try:
-    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
-    Type, Sequence, Dict, cast
-except ImportError:
-    pass
 
 logger = logging.getLogger(__name__)
 
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
 
 
 class OrchestratorError(Exception):
@@ -44,13 +54,23 @@ class OrchestratorError(Exception):
     It's not intended for programming errors or orchestrator internal errors.
     """
 
+    def __init__(self,
+                 msg: str,
+                 errno: int = -errno.EINVAL,
+                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
+        super(Exception, self).__init__(msg)
+        self.errno = errno
+        # See OrchestratorEvent.subject
+        self.event_subject = event_kind_subject
+
 
 class NoOrchestrator(OrchestratorError):
     """
     No orchestrator is configured.
     """
-    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
-        super(NoOrchestrator, self).__init__(msg)
+
+    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
+        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
 
 
 class OrchestratorValidationError(OrchestratorError):
@@ -59,30 +79,68 @@ class OrchestratorValidationError(OrchestratorError):
     """
 
 
-def handle_exception(prefix, cmd_args, desc, perm, func):
+@contextmanager
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
+    try:
+        yield
+    except OrchestratorError as e:
+        if overwrite or hasattr(e, 'event_subject'):
+            e.event_subject = (kind, subject)
+        raise
+
+
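As an illustration of the new set_exception_subject() context manager, a minimal sketch (the daemon name and the import path via the orchestrator package are assumptions): any OrchestratorError escaping the block is tagged with a (kind, subject) pair that can later be turned into an OrchestratorEvent for that subject.

    from orchestrator import OrchestratorError, set_exception_subject

    def restart_daemon(daemon_name: str) -> None:
        with set_exception_subject('daemon', daemon_name):
            # any OrchestratorError raised in here gets event_subject set to
            # ('daemon', daemon_name) before it propagates to the caller
            raise OrchestratorError(f'failed to restart {daemon_name}')
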
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
     @wraps(func)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
         try:
             return func(*args, **kwargs)
-        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
+        except (OrchestratorError, SpecValidationError) as e:
             # Do not print Traceback for expected errors.
+            return HandleCommandResult(e.errno, stderr=str(e))
+        except ImportError as e:
             return HandleCommandResult(-errno.ENOENT, stderr=str(e))
         except NotImplementedError:
             msg = 'This Orchestrator does not support `{}`'.format(prefix)
             return HandleCommandResult(-errno.ENOENT, stderr=msg)
 
-    # misuse partial to copy `wrapper`
-    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
+    # misuse lambda to copy `wrapper`
+    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
     wrapper_copy._prefix = prefix  # type: ignore
-    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
+    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
+    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
     wrapper_copy._cli_command.func = wrapper_copy  # type: ignore
 
-    return wrapper_copy
+    return cast(FuncT, wrapper_copy)
+
+
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+    """
+    Decorator to make Orchestrator methods return
+    an OrchResult.
+    """
+
+    @wraps(f)
+    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+        try:
+            return OrchResult(f(*args, **kwargs))
+        except Exception as e:
+            logger.exception(e)
+            import os
+            if 'UNITTEST' in os.environ:
+                raise  # This makes debugging of Tracebacks from unittests a bit easier
+            return OrchResult(None, exception=e)
 
+    return cast(Callable[..., OrchResult[T]], wrapper)
 
-def _cli_command(perm):
-    def inner_cli_command(prefix, cmd_args="", desc=""):
-        return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
+
+class InnerCliCommandCallable(Protocol):
+    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
+        ...
+
+
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
+        return lambda func: handle_exception(prefix, perm, func)
     return inner_cli_command
 
 
@@ -97,16 +155,16 @@ class CLICommandMeta(type):
 
     We make use of CLICommand, except for the use of the global variable.
     """
-    def __init__(cls, name, bases, dct):
+    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
         super(CLICommandMeta, cls).__init__(name, bases, dct)
-        dispatch = {}  # type: Dict[str, CLICommand]
+        dispatch: Dict[str, CLICommand] = {}
         for v in dct.values():
             try:
                 dispatch[v._prefix] = v._cli_command
             except AttributeError:
                 pass
 
-        def handle_command(self, inbuf, cmd):
+        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
             if cmd['prefix'] not in dispatch:
                 return self.handle_command(inbuf, cmd)
 
@@ -116,58 +174,30 @@ class CLICommandMeta(type):
         cls.handle_command = handle_command
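
To show how the simplified decorator factory and CLICommandMeta fit together, a hedged sketch written as if it lived in this module (the command prefix, class and method names are invented): _cli_command('r') builds a read-only decorator, handle_exception registers a CLICommand whose argument metadata comes from the handler's signature, and the metaclass wires all collected handlers into handle_command().

    from mgr_module import HandleCommandResult

    read_command = _cli_command('r')  # permission 'r'; the name is illustrative

    class ToyCli(metaclass=CLICommandMeta):
        @read_command('orch toy status')
        def _status(self, detail: bool = False) -> HandleCommandResult:
            # OrchestratorError / SpecValidationError raised in here would be
            # turned into a non-zero HandleCommandResult by handle_exception
            return HandleCommandResult(stdout='everything is fine')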
 
 
-def _no_result():
-    return object()
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
     """
-    A completion may need multiple promises to be fulfilled. `_Promise` is one
-    step.
-
-    Typically ``Orchestrator`` implementations inherit from this class to
-    build their own way of finishing a step to fulfil a future.
-
-    They are not exposed in the orchestrator interface and can be seen as a
-    helper to build orchestrator modules.
+    Stores a result and an exception. Mainly used to circumvent the
+    MgrModule.remote() method, which hides all exceptions, and to cope
+    with different sub-interpreters.
     """
-    INITIALIZED = 1  # We have a parent completion and a next completion
-    RUNNING = 2
-    FINISHED = 3  # we have a final result
-
-    NO_RESULT = _no_result()  # type: None
-    ASYNC_RESULT = object()
-
-    def __init__(self,
-                 _first_promise=None,  # type: Optional["_Promise"]
-                 value=NO_RESULT,  # type: Optional[Any]
-                 on_complete=None,    # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
-                 ):
-        self._on_complete_ = on_complete
-        self._name = name
-        self._next_promise = None  # type: Optional[_Promise]
 
-        self._state = self.INITIALIZED
-        self._exception = None  # type: Optional[Exception]
+    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+        self.result = result
+        self.serialized_exception: Optional[bytes] = None
+        self.exception_str: str = ''
+        self.set_exception(exception)
 
-        # Value of this _Promise. may be an intermediate result.
-        self._value = value
+    __slots__ = 'result', 'serialized_exception', 'exception_str'
 
-        # _Promise is not a continuation monad, as `_result` is of type
-        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
-        self._first_promise = _first_promise or self  # type: '_Promise'
+    def set_exception(self, e: Optional[Exception]) -> None:
+        if e is None:
+            self.serialized_exception = None
+            self.exception_str = ''
+            return
 
-    @property
-    def _exception(self):
-        # type: () -> Optional[Exception]
-        return getattr(self, '_exception_', None)
-
-    @_exception.setter
-    def _exception(self, e):
-        self._exception_ = e
+        self.exception_str = f'{type(e)}: {str(e)}'
         try:
-            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+            self.serialized_exception = pickle.dumps(e)
         except pickle.PicklingError:
             logger.error(f"failed to pickle {e}")
             if isinstance(e, Exception):
@@ -175,376 +205,9 @@ class _Promise(object):
             else:
                 e = Exception(str(e))
             # degenerate to a plain Exception
-            self._serialized_exception_ = pickle.dumps(e)
-
-    @property
-    def _serialized_exception(self):
-        # type: () -> Optional[bytes]
-        return getattr(self, '_serialized_exception_', None)
-
-
-
-    @property
-    def _on_complete(self):
-        # type: () -> Optional[Callable]
-        # https://github.com/python/mypy/issues/4125
-        return self._on_complete_
-
-    @_on_complete.setter
-    def _on_complete(self, val):
-        # type: (Optional[Callable]) -> None
-        self._on_complete_ = val
-
-
-    def __repr__(self):
-        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
-        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
-        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
-            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
-        )
-
-    def pretty_print_1(self):
-        if self._name:
-            name = self._name
-        elif self._on_complete is None:
-            name = 'lambda x: x'
-        elif hasattr(self._on_complete, '__name__'):
-            name = getattr(self._on_complete, '__name__')
-        else:
-            name = self._on_complete.__class__.__name__
-        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
-        prefix = {
-            self.INITIALIZED: '      ',
-            self.RUNNING:     '   >>>',
-            self.FINISHED:    '(done)'
-        }[self._state]
-        return '{} {}({}),'.format(prefix, name, val)
-
-    def then(self, on_complete):
-        # type: (Any, Callable) -> Any
-        """
-        Call ``on_complete`` as soon as this promise is finalized.
-        """
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        if self._next_promise is not None:
-            return self._next_promise.then(on_complete)
-
-        if self._on_complete is not None:
-            self._set_next_promise(self.__class__(
-                _first_promise=self._first_promise,
-                on_complete=on_complete
-            ))
-            return self._next_promise
-
-        else:
-            self._on_complete = on_complete
-            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
-            return self._next_promise
-
-    def _set_next_promise(self, next):
-        # type: (_Promise) -> None
-        assert self is not next
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        self._next_promise = next
-        assert self._next_promise is not None
-        for p in iter(self._next_promise):
-            p._first_promise = self._first_promise
-
-    def _finalize(self, value=NO_RESULT):
-        """
-        Sets this promise to complete.
-
-        Orchestrators may choose to use this helper function.
-
-        :param value: new value.
-        """
-        if self._state not in (self.INITIALIZED, self.RUNNING):
-            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
-        self._state = self.RUNNING
-
-        if value is not self.NO_RESULT:
-            self._value = value
-        assert self._value is not self.NO_RESULT, repr(self)
-
-        if self._on_complete:
-            try:
-                next_result = self._on_complete(self._value)
-            except Exception as e:
-                self.fail(e)
-                return
-        else:
-            next_result = self._value
-
-        if isinstance(next_result, _Promise):
-            # hack: _Promise is not a continuation monad.
-            next_result = next_result._first_promise  # type: ignore
-            assert next_result not in self, repr(self._first_promise) + repr(next_result)
-            assert self not in next_result
-            next_result._append_promise(self._next_promise)
-            self._set_next_promise(next_result)
-            assert self._next_promise
-            if self._next_promise._value is self.NO_RESULT:
-                self._next_promise._value = self._value
-            self.propagate_to_next()
-        elif next_result is not self.ASYNC_RESULT:
-            # simple map. simply forward
-            if self._next_promise:
-                self._next_promise._value = next_result
-            else:
-                # Hack: next_result is of type U, _value is of type T
-                self._value = next_result  # type: ignore
-            self.propagate_to_next()
-        else:
-            # asynchronous promise
-            pass
-
-
-    def propagate_to_next(self):
-        self._state = self.FINISHED
-        logger.debug('finalized {}'.format(repr(self)))
-        if self._next_promise:
-            self._next_promise._finalize()
-
-    def fail(self, e):
-        # type: (Exception) -> None
-        """
-        Sets the whole completion to be faild with this exception and end the
-        evaluation.
-        """
-        if self._state == self.FINISHED:
-            raise ValueError(
-                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-        logger.exception('_Promise failed')
-        self._exception = e
-        self._value = f'_exception: {e}'
-        if self._next_promise:
-            self._next_promise.fail(e)
-        self._state = self.FINISHED
-
-    def __contains__(self, item):
-        return any(item is p for p in iter(self._first_promise))
-
-    def __iter__(self):
-        yield self
-        elem = self._next_promise
-        while elem is not None:
-            yield elem
-            elem = elem._next_promise
-
-    def _append_promise(self, other):
-        if other is not None:
-            assert self not in other
-            assert other not in self
-            self._last_promise()._set_next_promise(other)
-
-    def _last_promise(self):
-        # type: () -> _Promise
-        return list(iter(self))[-1]
-
-
-class ProgressReference(object):
-    def __init__(self,
-                 message,  # type: str
-                 mgr,
-                 completion=None  # type: Optional[Callable[[], Completion]]
-                ):
-        """
-        ProgressReference can be used within Completions::
-
-            +---------------+      +---------------------------------+
-            |               | then |                                 |
-            | My Completion | +--> | on_complete=ProgressReference() |
-            |               |      |                                 |
-            +---------------+      +---------------------------------+
-
-        See :func:`Completion.with_progress` for an easy way to create
-        a progress reference
-
-        """
-        super(ProgressReference, self).__init__()
-        self.progress_id = str(uuid.uuid4())
-        self.message = message
-        self.mgr = mgr
-
-        #: The completion can already have a result, before the write
-        #: operation is effective. progress == 1 means, the services are
-        #: created / removed.
-        self.completion = completion  # type: Optional[Callable[[], Completion]]
-
-        #: if a orchestrator module can provide a more detailed
-        #: progress information, it needs to also call ``progress.update()``.
-        self.progress = 0.0
-
-        self._completion_has_result = False
-        self.mgr.all_progress_references.append(self)
-
-    def __str__(self):
-        """
-        ``__str__()`` is used for determining the message for progress events.
-        """
-        return self.message or super(ProgressReference, self).__str__()
-
-    def __call__(self, arg):
-        self._completion_has_result = True
-        self.progress = 1.0
-        return arg
-
-    @property
-    def progress(self):
-        return self._progress
-
-    @progress.setter
-    def progress(self, progress):
-        assert progress <= 1.0
-        self._progress = progress
-        try:
-            if self.effective:
-                self.mgr.remote("progress", "complete", self.progress_id)
-                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
-            else:
-                self.mgr.remote("progress", "update", self.progress_id, self.message,
-                                progress,
-                                [("origin", "orchestrator")])
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
-
-    @property
-    def effective(self):
-        return self.progress == 1 and self._completion_has_result
-
-    def update(self):
-        def progress_run(progress):
-            self.progress = progress
-        if self.completion:
-            c = self.completion().then(progress_run)
-            self.mgr.process([c._first_promise])
-        else:
-            self.progress = 1
-
-    def fail(self):
-        self._completion_has_result = True
-        self.progress = 1
-
-
-class Completion(_Promise):
-    """
-    Combines multiple promises into one overall operation.
-
-    Completions are composable by being able to
-    call one completion from another completion. I.e. making them re-usable
-    using Promises E.g.::
-
-        >>> #doctest: +SKIP
-        ... return Orchestrator().get_hosts().then(self._create_osd)
-
-    where ``get_hosts`` returns a Completion of list of hosts and
-    ``_create_osd`` takes a list of hosts.
-
-    The concept behind this is to store the computation steps
-    explicit and then explicitly evaluate the chain:
-
-        >>> #doctest: +SKIP
-        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
-        ... p.finalize(2)
-        ... assert p.result = "4"
-
-    or graphically::
-
-        +---------------+      +-----------------+
-        |               | then |                 |
-        | lambda x: x*x | +--> | lambda x: str(x)|
-        |               |      |                 |
-        +---------------+      +-----------------+
-
-    """
-    def __init__(self,
-                 _first_promise=None,  # type: Optional["Completion"]
-                 value=_Promise.NO_RESULT,  # type: Any
-                 on_complete=None,  # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
-                 ):
-        super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self):
-        # type: () -> Optional[ProgressReference]
-        if hasattr(self._on_complete, 'progress_id'):
-            return self._on_complete  # type: ignore
-        return None
+            self.serialized_exception = pickle.dumps(e)
 
-    @property
-    def progress_reference(self):
-        # type: () -> Optional[ProgressReference]
-        """
-        ProgressReference. Marks this completion
-        as a write completeion.
-        """
-
-        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
-        if references:
-            assert len(references) == 1
-            return references[0]
-        return None
-
-    @classmethod
-    def with_progress(cls,  # type: Any
-                      message,  # type: str
-                      mgr,
-                      _first_promise=None,  # type: Optional["Completion"]
-                      value=_Promise.NO_RESULT,  # type: Any
-                      on_complete=None,  # type: Optional[Callable]
-                      calc_percent=None  # type: Optional[Callable[[], Any]]
-                      ):
-        # type: (...) -> Any
-
-        c = cls(
-            _first_promise=_first_promise,
-            value=value,
-            on_complete=on_complete
-        ).add_progress(message, mgr, calc_percent)
-
-        return c._first_promise
-
-    def add_progress(self,
-                     message,  # type: str
-                     mgr,
-                     calc_percent=None  # type: Optional[Callable[[], Any]]
-                     ):
-        return self.then(
-            on_complete=ProgressReference(
-                message=message,
-                mgr=mgr,
-                completion=calc_percent
-            )
-        )
-
-    def fail(self, e):
-        super(Completion, self).fail(e)
-        if self._progress_reference:
-            self._progress_reference.fail()
-
-    def finalize(self, result=_Promise.NO_RESULT):
-        if self._first_promise._state == self.INITIALIZED:
-            self._first_promise._finalize(result)
-
-    @property
-    def result(self):
-        """
-        The result of the operation that we were waited
-        for.  Only valid after calling Orchestrator.process() on this
-        completion.
-        """
-        last = self._last_promise()
-        assert last._state == _Promise.FINISHED
-        return last._value
-
-    def result_str(self):
+    def result_str(self) -> str:
         """Force a string."""
         if self.result is None:
             return ''
@@ -552,99 +215,23 @@ class Completion(_Promise):
             return '\n'.join(str(x) for x in self.result)
         return str(self.result)
 
-    @property
-    def exception(self):
-        # type: () -> Optional[Exception]
-        return self._last_promise()._exception
-
-    @property
-    def serialized_exception(self):
-        # type: () -> Optional[bytes]
-        return self._last_promise()._serialized_exception
-
-    @property
-    def has_result(self):
-        # type: () -> bool
-        """
-        Has the operation already a result?
-
-        For Write operations, it can already have a
-        result, if the orchestrator's configuration is
-        persistently written. Typically this would
-        indicate that an update had been written to
-        a manifest, but that the update had not
-        necessarily been pushed out to the cluster.
-
-        :return:
-        """
-        return self._last_promise()._state == _Promise.FINISHED
-
-    @property
-    def is_errored(self):
-        # type: () -> bool
-        """
-        Has the completion failed. Default implementation looks for
-        self.exception. Can be overwritten.
-        """
-        return self.exception is not None
-
-    @property
-    def needs_result(self):
-        # type: () -> bool
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return not self.is_errored and not self.has_result
-
-    @property
-    def is_finished(self):
-        # type: () -> bool
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return self.is_errored or (self.has_result)
-
-    def pretty_print(self):
-
-        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
-        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions):
-    # type: (Sequence[Completion]) -> str
-    return ', '.join(c.pretty_print() for c in completions)
-
 
-def raise_if_exception(c):
-    # type: (Completion) -> None
+def raise_if_exception(c: OrchResult[T]) -> T:
     """
-    :raises OrchestratorError: Some user error or a config error.
-    :raises Exception: Some internal error
+    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
     """
     if c.serialized_exception is not None:
         try:
             e = pickle.loads(c.serialized_exception)
         except (KeyError, AttributeError):
-            raise Exception('{}: {}'.format(type(c.exception), c.exception))
+            raise Exception(c.exception_str)
         raise e
+    assert c.result is not None, 'OrchResult should either have an exception or a result'
+    return c.result
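
A small consumer-side sketch (values invented): an OrchResult either carries a result or a pickled exception, and raise_if_exception() is the one place where that exception is re-raised in the calling interpreter.

    from orchestrator import OrchResult, raise_if_exception

    ok: OrchResult[int] = OrchResult(42)
    assert raise_if_exception(ok) == 42

    failed: OrchResult[int] = OrchResult(None, exception=RuntimeError('backend down'))
    try:
        raise_if_exception(failed)
    except RuntimeError as e:
        print(f'orchestrator call failed: {e}')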
 
 
-class TrivialReadCompletion(Completion):
-    """
-    This is the trivial completion simply wrapping a result.
-    """
-    def __init__(self, result):
-        super(TrivialReadCompletion, self).__init__()
-        if result:
-            self.finalize(result)
-
-
-def _hide_in_features(f):
-    f._hide_in_features = True
+def _hide_in_features(f: FuncT) -> FuncT:
+    f._hide_in_features = True  # type: ignore
     return f
 
 
@@ -672,7 +259,7 @@ class Orchestrator(object):
     """
 
     @_hide_in_features
-    def is_orchestrator_module(self):
+    def is_orchestrator_module(self) -> bool:
         """
         Enable other modules to interrogate this module to discover
         whether it's usable as an orchestrator module.
@@ -682,8 +269,7 @@ class Orchestrator(object):
         return True
 
     @_hide_in_features
-    def available(self):
-        # type: () -> Tuple[bool, str]
+    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
         """
         Report whether we can talk to the orchestrator.  This is the
         place to give the user a meaningful message if the orchestrator
@@ -704,28 +290,14 @@ class Orchestrator(object):
                 ... if OrchestratorClientMixin().available()[0]:  # wrong.
                 ...     OrchestratorClientMixin().get_hosts()
 
-        :return: two-tuple of boolean, string
+        :return: boolean representing whether the module is available/usable
+        :return: string describing any error
+        :return: dict containing any module specific information
         """
         raise NotImplementedError()
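
To illustrate the widened available() signature (a third dict element for module specific information), a hedged sketch of an implementation; the attribute and the returned values are made up:

    from typing import Any, Dict, Tuple
    from orchestrator import Orchestrator

    class MiniOrchestrator(Orchestrator):
        def available(self) -> Tuple[bool, str, Dict[str, Any]]:
            # '_backend_ready' is an invented stand-in for real backend state;
            # the third element is free-form module specific info
            if not getattr(self, '_backend_ready', True):
                return False, 'backend not initialized', {}
            return True, '', {'workers': 2}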
 
     @_hide_in_features
-    def process(self, completions):
-        # type: (List[Completion]) -> None
-        """
-        Given a list of Completion instances, process any which are
-        incomplete.
-
-        Callers should inspect the detail of each completion to identify
-        partial completion/progress information, and present that information
-        to the user.
-
-        This method should not block, as this would make it slow to query
-        a status, while other long running operations are in progress.
-        """
-        raise NotImplementedError()
-
-    @_hide_in_features
-    def get_feature_set(self):
+    def get_feature_set(self) -> Dict[str, dict]:
         """Describes which methods this orchestrator implements
 
         .. note::
@@ -755,23 +327,19 @@ class Orchestrator(object):
                     }
         return features
 
-    def cancel_completions(self):
-        # type: () -> None
+    def cancel_completions(self) -> None:
         """
         Cancels ongoing completions. Unsticks the mgr.
         """
         raise NotImplementedError()
 
-    def pause(self):
-        # type: () -> None
+    def pause(self) -> None:
         raise NotImplementedError()
 
-    def resume(self):
-        # type: () -> None
+    def resume(self) -> None:
         raise NotImplementedError()
 
-    def add_host(self, host_spec):
-        # type: (HostSpec) -> Completion
+    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
         """
         Add a host to the orchestrator inventory.
 
@@ -779,8 +347,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_host(self, host):
-        # type: (str) -> Completion
+    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
         """
         Remove a host from the orchestrator inventory.
 
@@ -788,8 +355,15 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_host_addr(self, host, addr):
-        # type: (str, str) -> Completion
+    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+        """
+        Drain all daemons from a host.
+
+        :param hostname: hostname
+        """
+        raise NotImplementedError()
+
+    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
         """
         Update a host's address
 
@@ -798,8 +372,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def get_hosts(self):
-        # type: () -> Completion
+    def get_hosts(self) -> OrchResult[List[HostSpec]]:
         """
         Report the hosts in the cluster.
 
@@ -807,22 +380,45 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def add_host_label(self, host, label):
-        # type: (str, str) -> Completion
+    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+        """
+        Return host metadata (gather_facts).
+        """
+        raise NotImplementedError()
+
+    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host, label):
-        # type: (str, str) -> Completion
+    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a host label
         """
         raise NotImplementedError()
 
-    def get_inventory(self, host_filter=None, refresh=False):
-        # type: (Optional[InventoryFilter], bool) -> Completion
+    def host_ok_to_stop(self, hostname: str) -> OrchResult:
+        """
+        Check if the specified host can be safely stopped without reducing availability
+
+        :param hostname: hostname
+        """
+        raise NotImplementedError()
+
+    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+        """
+        Place a host in maintenance, stopping daemons and disabling its systemd target
+        """
+        raise NotImplementedError()
+
+    def exit_host_maintenance(self, hostname: str) -> OrchResult:
+        """
+        Return a host from maintenance, restarting the cluster's systemd target
+        """
+        raise NotImplementedError()
+
+    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -830,8 +426,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def describe_service(self, service_type=None, service_name=None, refresh=False):
-        # type: (Optional[str], Optional[str], bool) -> Completion
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -845,8 +440,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
-        # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
+    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
@@ -855,11 +449,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply(self, specs: List["GenericSpec"]) -> Completion:
+    @handle_orch_error
+    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
         """
         Applies any spec
         """
-        fns: Dict[str, function] = {
+        fns: Dict[str, Callable[..., OrchResult[str]]] = {
             'alertmanager': self.apply_alertmanager,
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
@@ -869,31 +464,31 @@ class Orchestrator(object):
             'mon': self.apply_mon,
             'nfs': self.apply_nfs,
             'node-exporter': self.apply_node_exporter,
-            'osd': lambda dg: self.apply_drivegroups([dg]),
+            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
             'prometheus': self.apply_prometheus,
+            'loki': self.apply_loki,
+            'promtail': self.apply_promtail,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
+            'ingress': self.apply_ingress,
+            'snmp-gateway': self.apply_snmp_gateway,
             'host': self.add_host,
         }
 
-        def merge(ls, r):
-            if isinstance(ls, list):
-                return ls + [r]
-            return [ls, r]
+        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
+            l_res = raise_if_exception(l)
+            r_res = raise_if_exception(r)
+            l_res.append(r_res)
+            return OrchResult(l_res)
+        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
 
-        spec, *specs = specs
-
-        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-        completion = fn(spec)
-        for s in specs:
-            def next(ls):
-                fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-                return fn(s).then(lambda r: merge(ls, r))
-            completion = completion.then(next)
-        return completion
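
For the reworked apply() above, a usage sketch (the specs are invented and `orch` stands for any concrete backend): each spec is dispatched by its service_type to the matching apply_*/add_host method, and the per-spec status strings are folded into a single OrchResult[List[str]] by the reduce/merge step.

    from typing import List
    from ceph.deployment.hostspec import HostSpec
    from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
    from orchestrator import Orchestrator, raise_if_exception

    def apply_example(orch: Orchestrator) -> List[str]:
        specs = [
            HostSpec('node1', addr='10.0.0.1'),                    # dispatched to add_host
            ServiceSpec('mgr', placement=PlacementSpec(count=3)),  # dispatched to apply_mgr
        ]
        # apply() is wrapped by @handle_orch_error, so unwrap the combined result
        return raise_if_exception(orch.apply(specs))
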
+    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
+        """
+        Plan (Dry-run, Preview) a List of Specs.
+        """
+        raise NotImplementedError()
 
-    def remove_daemons(self, names):
-        # type: (List[str]) -> Completion
+    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
         """
         Remove specific daemon(s).
 
@@ -901,8 +496,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name):
-        # type: (str) -> Completion
+    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a service (a collection of daemons).
 
@@ -910,34 +504,32 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def service_action(self, action, service_name):
-        # type: (str, str) -> Completion
+    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
-        :param service_type: e.g. "mds", "rgw", ...
-        :param service_name: name of logical service ("cephfs", "us-east", ...)
-        :rtype: Completion
+        :param service_name: service_type + '.' + service_id
+                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
+        :rtype: OrchResult
         """
-        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
         raise NotImplementedError()
 
-    def daemon_action(self, action, daemon_type, daemon_id):
-        # type: (str, str, str) -> Completion
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
-        :param name: name of daemon
-        :rtype: Completion
+        :param daemon_name: name of daemon
+        :param image: Container image when redeploying that daemon
+        :rtype: OrchResult
         """
-        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+        # assert action in ["start", "stop", "reload", "restart", "redeploy"]
         raise NotImplementedError()
 
-    def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> Completion
+    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -948,45 +540,53 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
         """ Update OSD cluster """
         raise NotImplementedError()
 
     def set_unmanaged_flag(self,
                            unmanaged_flag: bool,
                            service_type: str = 'osd',
-                           service_name=None
+                           service_name: Optional[str] = None
                            ) -> HandleCommandResult:
         raise NotImplementedError()
 
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ) -> Completion:
+                         ) -> OrchResult[str]:
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> Completion:
+                    force: bool = False,
+                    zap: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
-        Note that this can only remove OSDs that were successfully
-        created (i.e. got an OSD ID).
+        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+        .. note:: this can only remove OSDs that were successfully
+            created (i.e. got an OSD ID).
+        """
+        raise NotImplementedError()
+
+    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
+        """
+        TODO
         """
         raise NotImplementedError()
 
-    def remove_osds_status(self):
-        # type: () -> Completion
+    def remove_osds_status(self) -> OrchResult:
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
-    def blink_device_light(self, ident_fault, on, locations):
-        # type: (str, bool, List[DeviceLightLoc]) -> Completion
+    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
@@ -996,153 +596,98 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def zap_device(self, host, path):
-        # type: (str, str) -> Completion
+    def zap_device(self, host: str, path: str) -> OrchResult[str]:
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
-    def add_mon(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create mon daemon(s)"""
+    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
+        """Create daemons daemon(s) for unmanaged services"""
         raise NotImplementedError()
 
-    def apply_mon(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mon cluster"""
         raise NotImplementedError()
 
-    def add_mgr(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create mgr daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mgr(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def add_mds(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create MDS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mds(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
-    def add_rgw(self, spec):
-        # type: (RGWSpec) -> Completion
-        """Create RGW daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_rgw(self, spec):
-        # type: (RGWSpec) -> Completion
+    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
         """Update RGW cluster"""
         raise NotImplementedError()
 
-    def add_rbd_mirror(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create rbd-mirror daemon(s)"""
+    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
+        """Update ingress daemons"""
         raise NotImplementedError()
 
-    def apply_rbd_mirror(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
-    def add_nfs(self, spec):
-        # type: (NFSServiceSpec) -> Completion
-        """Create NFS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> Completion
+    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
         """Update NFS cluster"""
         raise NotImplementedError()
 
-    def add_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> Completion
-        """Create iscsi daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> Completion
+    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
         """Update iscsi cluster"""
         raise NotImplementedError()
 
-    def add_prometheus(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create new prometheus daemon"""
-        raise NotImplementedError()
-
-    def apply_prometheus(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
 
-    def add_node_exporter(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create a new Node-Exporter service"""
+    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
-    def apply_node_exporter(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Loki daemon(s)"""
         raise NotImplementedError()
 
-    def add_crash(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create a new crash service"""
+    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Promtail daemon(s)"""
         raise NotImplementedError()
 
-    def apply_crash(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
-    def add_grafana(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create a new Node-Exporter service"""
+    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a grafana service"""
         raise NotImplementedError()
 
-    def apply_grafana(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
-    def add_alertmanager(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Create a new AlertManager service"""
+    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+        """Update an existing snmp gateway service"""
         raise NotImplementedError()
 
-    def apply_alertmanager(self, spec):
-        # type: (ServiceSpec) -> Completion
-        """Update an existing AlertManager daemon(s)"""
+    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_check(self, image, version):
-        # type: (Optional[str], Optional[str]) -> Completion
+    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image, version):
-        # type: (Optional[str], Optional[str]) -> Completion
+    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_pause(self):
-        # type: () -> Completion
+    def upgrade_pause(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_resume(self):
-        # type: () -> Completion
+    def upgrade_resume(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_stop(self):
-        # type: () -> Completion
+    def upgrade_stop(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_status(self):
-        # type: () -> Completion
+    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -1152,8 +697,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def upgrade_available(self):
-        # type: () -> Completion
+    def upgrade_available(self) -> OrchResult:
         """
         Report on what versions are available to upgrade to
 
@@ -1162,89 +706,114 @@ class Orchestrator(object):
         raise NotImplementedError()
 
 
-class HostSpec(object):
-    """
-    Information about hosts. Like e.g. ``kubectl get nodes``
-    """
-    def __init__(self,
-                 hostname,  # type: str
-                 addr=None,  # type: Optional[str]
-                 labels=None,  # type: Optional[List[str]]
-                 status=None,  # type: Optional[str]
-                 ):
-        self.service_type = 'host'
-
-        #: the bare hostname on the host. Not the FQDN.
-        self.hostname = hostname  # type: str
-
-        #: DNS name or IP address to reach it
-        self.addr = addr or hostname  # type: str
-
-        #: label(s), if any
-        self.labels = labels or []  # type: List[str]
-
-        #: human readable status
-        self.status = status or ''  # type: str
-
-    def to_json(self):
-        return {
-            'hostname': self.hostname,
-            'addr': self.addr,
-            'labels': self.labels,
-            'status': self.status,
-        }
-
-    @classmethod
-    def from_json(cls, host_spec):
-        _cls = cls(host_spec['hostname'],
-                   host_spec['addr'] if 'addr' in host_spec else None,
-                   host_spec['labels'] if 'labels' in host_spec else None)
-        return _cls
-
-    def __repr__(self):
-        args = [self.hostname]  # type: List[Any]
-        if self.addr is not None:
-            args.append(self.addr)
-        if self.labels:
-            args.append(self.labels)
-        if self.status:
-            args.append(self.status)
-
-        return "<HostSpec>({})".format(', '.join(map(repr, args)))
-
-    def __eq__(self, other):
-        # Let's omit `status` for the moment, as it is still the very same host.
-        return self.hostname == other.hostname and \
-               self.addr == other.addr and \
-               self.labels == other.labels
-
 GenericSpec = Union[ServiceSpec, HostSpec]
 
-def json_to_generic_spec(spec):
-    # type: (dict) -> GenericSpec
+
+def json_to_generic_spec(spec: dict) -> GenericSpec:
     if 'service_type' in spec and spec['service_type'] == 'host':
         return HostSpec.from_json(spec)
     else:
         return ServiceSpec.from_json(spec)
 
+
+def daemon_type_to_service(dtype: str) -> str:
+    mapping = {
+        'mon': 'mon',
+        'mgr': 'mgr',
+        'mds': 'mds',
+        'rgw': 'rgw',
+        'osd': 'osd',
+        'haproxy': 'ingress',
+        'keepalived': 'ingress',
+        'iscsi': 'iscsi',
+        'rbd-mirror': 'rbd-mirror',
+        'cephfs-mirror': 'cephfs-mirror',
+        'nfs': 'nfs',
+        'grafana': 'grafana',
+        'alertmanager': 'alertmanager',
+        'prometheus': 'prometheus',
+        'node-exporter': 'node-exporter',
+        'loki': 'loki',
+        'promtail': 'promtail',
+        'crash': 'crash',
+        'crashcollector': 'crash',  # Specific Rook Daemon
+        'container': 'container',
+        'agent': 'agent',
+        'snmp-gateway': 'snmp-gateway',
+    }
+    return mapping[dtype]
+
+
+def service_to_daemon_types(stype: str) -> List[str]:
+    mapping = {
+        'mon': ['mon'],
+        'mgr': ['mgr'],
+        'mds': ['mds'],
+        'rgw': ['rgw'],
+        'osd': ['osd'],
+        'ingress': ['haproxy', 'keepalived'],
+        'iscsi': ['iscsi'],
+        'rbd-mirror': ['rbd-mirror'],
+        'cephfs-mirror': ['cephfs-mirror'],
+        'nfs': ['nfs'],
+        'grafana': ['grafana'],
+        'alertmanager': ['alertmanager'],
+        'prometheus': ['prometheus'],
+        'loki': ['loki'],
+        'promtail': ['promtail'],
+        'node-exporter': ['node-exporter'],
+        'crash': ['crash'],
+        'container': ['container'],
+        'agent': ['agent'],
+        'snmp-gateway': ['snmp-gateway'],
+    }
+    return mapping[stype]
+
+
+KNOWN_DAEMON_TYPES: List[str] = list(
+    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
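
The two lookup tables are near-inverses of each other; a quick illustration (plain asserts, no cluster needed): most daemon types map one-to-one to a service, while the ingress service is backed by both haproxy and keepalived daemons.

    from orchestrator import daemon_type_to_service, service_to_daemon_types

    assert daemon_type_to_service('haproxy') == 'ingress'
    assert daemon_type_to_service('keepalived') == 'ingress'
    assert service_to_daemon_types('ingress') == ['haproxy', 'keepalived']
    assert service_to_daemon_types('mon') == ['mon']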
+
+
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
-    def __init__(self):
+    def __init__(self) -> None:
         self.in_progress = False  # Is an upgrade underway?
-        self.target_image = None
-        self.services_complete = []  # Which daemon types are fully updated?
+        self.target_image: Optional[str] = None
+        self.services_complete: List[str] = []  # Which daemon types are fully updated?
+        self.which: str = '<unknown>'  # which daemon types, services or hosts the user limited the upgrade to, if any
+        self.progress: Optional[str] = None  # how many of the daemons have been upgraded so far
         self.message = ""  # Freeform description
 
 
-def handle_type_error(method):
+def handle_type_error(method: FuncT) -> FuncT:
     @wraps(method)
-    def inner(cls, *args, **kwargs):
+    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
         try:
             return method(cls, *args, **kwargs)
         except TypeError as e:
             error_msg = '{}: {}'.format(cls.__name__, e)
         raise OrchestratorValidationError(error_msg)
-    return inner
+    return cast(FuncT, inner)
+
+
+class DaemonDescriptionStatus(enum.IntEnum):
+    unknown = -2
+    error = -1
+    stopped = 0
+    running = 1
+    starting = 2  #: Daemon is deployed, but not yet running
+
+    @staticmethod
+    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+        if status is None:
+            status = DaemonDescriptionStatus.unknown
+        return {
+            DaemonDescriptionStatus.unknown: 'unknown',
+            DaemonDescriptionStatus.error: 'error',
+            DaemonDescriptionStatus.stopped: 'stopped',
+            DaemonDescriptionStatus.running: 'running',
+            DaemonDescriptionStatus.starting: 'starting',
+        }.get(status, '<unknown>')
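
A short sketch of how the new status enum is meant to be consumed: the IntEnum values are what get serialized, and to_str() tolerates a missing status by mapping None to 'unknown'.

    from orchestrator import DaemonDescriptionStatus

    status = DaemonDescriptionStatus.running
    assert int(status) == 1
    assert DaemonDescriptionStatus.to_str(status) == 'running'
    assert DaemonDescriptionStatus.to_str(None) == 'unknown'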
 
 
 class DaemonDescription(object):
@@ -1261,73 +830,144 @@ class DaemonDescription(object):
     """
 
     def __init__(self,
-                 daemon_type=None,
-                 daemon_id=None,
-                 hostname=None,
-                 container_id=None,
-                 container_image_id=None,
-                 container_image_name=None,
-                 version=None,
-                 status=None,
-                 status_desc=None,
-                 last_refresh=None,
-                 created=None,
-                 started=None,
-                 last_configured=None,
-                 osdspec_affinity=None,
-                 last_deployed=None):
-        # Host is at the same granularity as InventoryHost
-        self.hostname = hostname
+                 daemon_type: Optional[str] = None,
+                 daemon_id: Optional[str] = None,
+                 hostname: Optional[str] = None,
+                 container_id: Optional[str] = None,
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 container_image_digests: Optional[List[str]] = None,
+                 version: Optional[str] = None,
+                 status: Optional[DaemonDescriptionStatus] = None,
+                 status_desc: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 started: Optional[datetime.datetime] = None,
+                 last_configured: Optional[datetime.datetime] = None,
+                 osdspec_affinity: Optional[str] = None,
+                 last_deployed: Optional[datetime.datetime] = None,
+                 events: Optional[List['OrchestratorEvent']] = None,
+                 is_active: bool = False,
+                 memory_usage: Optional[int] = None,
+                 memory_request: Optional[int] = None,
+                 memory_limit: Optional[int] = None,
+                 cpu_percentage: Optional[str] = None,
+                 service_name: Optional[str] = None,
+                 ports: Optional[List[int]] = None,
+                 ip: Optional[str] = None,
+                 deployed_by: Optional[List[str]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None,
+                 ) -> None:
+
+        #: Host is at the same granularity as InventoryHost
+        self.hostname: Optional[str] = hostname
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_id (runtime id) and container_image
         # (image name)
         self.container_id = container_id                  # runtime id
-        self.container_image_id = container_image_id      # image hash
+        self.container_image_id = container_image_id      # image id locally
         self.container_image_name = container_image_name  # image friendly name
+        self.container_image_digests = container_image_digests  # registry digests
 
-        # The type of service (osd, mon, mgr, etc.)
+        #: The type of service (osd, mon, mgr, etc.)
         self.daemon_type = daemon_type
 
-        # The orchestrator will have picked some names for daemons,
-        # typically either based on hostnames or on pod names.
-        # This is the <foo> in mds.<foo>, the ID that will appear
-        # in the FSMap/ServiceMap.
-        self.daemon_id = daemon_id
+        #: The orchestrator will have picked some names for daemons,
+        #: typically either based on hostnames or on pod names.
+        #: This is the <foo> in mds.<foo>, the ID that will appear
+        #: in the FSMap/ServiceMap.
+        self.daemon_id: Optional[str] = daemon_id
+        self.daemon_name = self.name()
 
-        # Service version that was deployed
+        #: Some daemon types have a numeric rank assigned
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
+
+        self._service_name: Optional[str] = service_name
+
+        #: Service version that was deployed
         self.version = version
 
-        # Service status: -1 error, 0 stopped, 1 running
-        self.status = status
+        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+        self._status = status
 
-        # Service status description when status == -1.
+        #: Service status description when status == error.
         self.status_desc = status_desc
 
-        # datetime when this info was last refreshed
-        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
+        #: datetime when this info was last refreshed
+        self.last_refresh: Optional[datetime.datetime] = last_refresh
+
+        self.created: Optional[datetime.datetime] = created
+        self.started: Optional[datetime.datetime] = started
+        self.last_configured: Optional[datetime.datetime] = last_configured
+        self.last_deployed: Optional[datetime.datetime] = last_deployed
+
+        #: Affinity to a certain OSDSpec
+        self.osdspec_affinity: Optional[str] = osdspec_affinity
+
+        self.events: List[OrchestratorEvent] = events or []
+
+        self.memory_usage: Optional[int] = memory_usage
+        self.memory_request: Optional[int] = memory_request
+        self.memory_limit: Optional[int] = memory_limit
+
+        self.cpu_percentage: Optional[str] = cpu_percentage
+
+        self.ports: Optional[List[int]] = ports
+        self.ip: Optional[str] = ip
 
-        self.created = created    # type: Optional[datetime.datetime]
-        self.started = started    # type: Optional[datetime.datetime]
-        self.last_configured = last_configured # type: Optional[datetime.datetime]
-        self.last_deployed = last_deployed    # type: Optional[datetime.datetime]
+        self.deployed_by = deployed_by
 
-        # Affinity to a certain OSDSpec
-        self.osdspec_affinity = osdspec_affinity  # type: Optional[str]
+        self.is_active = is_active
 
-    def name(self):
+        self.extra_container_args = extra_container_args
+
+    @property
+    def status(self) -> Optional[DaemonDescriptionStatus]:
+        return self._status
+
+    @status.setter
+    def status(self, new: DaemonDescriptionStatus) -> None:
+        self._status = new
+        self.status_desc = DaemonDescriptionStatus.to_str(new)
+
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
+
+    def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
-    def matches_service(self, service_name):
-        # type: (Optional[str]) -> bool
+    def matches_service(self, service_name: Optional[str]) -> bool:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
         if service_name:
-            return self.name().startswith(service_name + '.')
+            return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
         return False
 
-    def service_id(self):
-        def _match():
-            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \
-                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
+    def service_id(self) -> str:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
+
+        if self._service_name:
+            if '.' in self._service_name:
+                return self._service_name.split('.', 1)[1]
+            else:
+                return ''
+
+        if self.daemon_type == 'osd':
+            if self.osdspec_affinity and self.osdspec_affinity != 'None':
+                return self.osdspec_affinity
+            return ''
+
+        def _match() -> str:
+            assert self.daemon_id is not None
+            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
+                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
 
             if not self.hostname:
                 # TODO: can a DaemonDescription exist without a hostname?
@@ -1358,55 +998,145 @@ class DaemonDescription(object):
                 if len(v) in [3, 4]:
                     return '.'.join(v[0:2])
 
+            if self.daemon_type == 'iscsi':
+                v = self.daemon_id.split('.')
+                return '.'.join(v[0:-1])
+
             # daemon_id == "service_id"
             return self.daemon_id
 
-        if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']:
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
             return _match()
 
         return self.daemon_id
 
-    def service_name(self):
-        if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
-            return f'{self.daemon_type}.{self.service_id()}'
-        return self.daemon_type
+    def service_name(self) -> str:
+        if self._service_name:
+            return self._service_name
+        assert self.daemon_type is not None
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+            return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
+        return daemon_type_to_service(self.daemon_type)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
-    def to_json(self):
-        out = {
-            'hostname': self.hostname,
-            'container_id': self.container_id,
-            'container_image_id': self.container_image_id,
-            'container_image_name': self.container_image_name,
-            'daemon_id': self.daemon_id,
-            'daemon_type': self.daemon_type,
-            'version': self.version,
-            'status': self.status,
-            'status_desc': self.status_desc,
-        }
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
+    def to_json(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['service_name'] = self._service_name
+        out['daemon_name'] = self.name()
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
+        out['version'] = self.version
+        out['status'] = self.status.value if self.status is not None else None
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+        out['rank'] = self.rank
+        out['rank_generation'] = self.rank_generation
+
+        for k in ['last_refresh', 'created', 'started', 'last_deployed',
+                  'last_configured']:
+            if getattr(self, k):
+                out[k] = datetime_to_str(getattr(self, k))
+
+        if self.events:
+            out['events'] = [e.to_json() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
+
+    def to_dict(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['daemon_name'] = self.name()
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
+        out['version'] = self.version
+        out['status'] = self.status.value if self.status is not None else None
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if getattr(self, k):
-                out[k] = getattr(self, k).strftime(DATEFMT)
-        return {k: v for (k, v) in out.items() if v is not None}
+                out[k] = datetime_to_str(getattr(self, k))
+
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
 
     @classmethod
     @handle_type_error
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'DaemonDescription':
         c = data.copy()
+        event_strs = c.pop('events', [])
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if k in c:
-                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
-        return cls(**c)
+                c[k] = str_to_datetime(c[k])
+        events = [OrchestratorEvent.from_json(e) for e in event_strs]
+        status_int = c.pop('status', None)
+        if 'daemon_name' in c:
+            del c['daemon_name']
+        if 'service_name' in c and c['service_name'].startswith('osd.'):
+            # if the service_name is an osd.NNN (numeric osd id) then
+            # ignore it -- it is not a valid service_name and
+            # (presumably) came from an older version of cephadm.
+            try:
+                int(c['service_name'][4:])
+                del c['service_name']
+            except ValueError:
+                pass
+        status = DaemonDescriptionStatus(status_int) if status_int is not None else None
+        return cls(events=events, status=status, **c)
 
-    def __copy__(self):
+    def __copy__(self) -> 'DaemonDescription':
         # feel free to change this:
         return DaemonDescription.from_json(self.to_json())
 
+    @staticmethod
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
+
+
+yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
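# A minimal round-trip sketch for DaemonDescription (field values below are
# hypothetical; assumes the names are importable from the `orchestrator`
# package and that `yaml` is available as imported above):
#
#     >>> d = DaemonDescription(daemon_type='mgr', daemon_id='host1.abcdef',
#     ...                       hostname='host1', ports=[8443], ip='10.0.0.1')
#     >>> d.name()
#     'mgr.host1.abcdef'
#     >>> d.service_name()
#     'mgr'
#     >>> d.status = DaemonDescriptionStatus.running  # setter also syncs status_desc
#     >>> d.status_desc
#     'running'
#     >>> d.get_port_summary()
#     '10.0.0.1:8443'
#     >>> DaemonDescription.from_json(d.to_json()).name()  # JSON round trip
#     'mgr.host1.abcdef'
#     >>> text = yaml.dump(d)  # rendered via the representer registered above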
+
+
 class ServiceDescription(object):
     """
     For responding to queries about the status of a particular service,
@@ -1422,24 +1152,23 @@ class ServiceDescription(object):
 
     def __init__(self,
                  spec: ServiceSpec,
-                 container_image_id=None,
-                 container_image_name=None,
-                 rados_config_location=None,
-                 service_url=None,
-                 last_refresh=None,
-                 created=None,
-                 size=0,
-                 running=0):
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 service_url: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 deleted: Optional[datetime.datetime] = None,
+                 size: int = 0,
+                 running: int = 0,
+                 events: Optional[List['OrchestratorEvent']] = None,
+                 virtual_ip: Optional[str] = None,
+                 ports: List[int] = []) -> None:
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
         # (image name)
         self.container_image_id = container_image_id      # image hash
         self.container_image_name = container_image_name  # image friendly name
 
-        # Location of the service configuration when stored in rados
-        # object. Format: "rados://<pool>/[<namespace/>]<object>"
-        self.rados_config_location = rados_config_location
-
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
         self.service_url = service_url
@@ -1451,48 +1180,93 @@ class ServiceDescription(object):
         self.running = running
 
         # datetime when this info was last refreshed
-        self.last_refresh = last_refresh   # type: Optional[datetime.datetime]
-        self.created = created   # type: Optional[datetime.datetime]
+        self.last_refresh: Optional[datetime.datetime] = last_refresh
+        self.created: Optional[datetime.datetime] = created
+        self.deleted: Optional[datetime.datetime] = deleted
 
         self.spec: ServiceSpec = spec
 
-    def service_type(self):
+        self.events: List[OrchestratorEvent] = events or []
+
+        self.virtual_ip = virtual_ip
+        self.ports = ports
+
+    def service_type(self) -> str:
         return self.spec.service_type
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return f"<ServiceDescription of {self.spec.one_line_str()}>"
 
-    def to_json(self):
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+
+    def to_json(self) -> OrderedDict:
         out = self.spec.to_json()
         status = {
             'container_image_id': self.container_image_id,
             'container_image_name': self.container_image_name,
-            'rados_config_location': self.rados_config_location,
             'service_url': self.service_url,
             'size': self.size,
             'running': self.running,
             'last_refresh': self.last_refresh,
-            'created': self.created
+            'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
         }
         for k in ['last_refresh', 'created']:
             if getattr(self, k):
-                status[k] = getattr(self, k).strftime(DATEFMT)
+                status[k] = datetime_to_str(getattr(self, k))
         status = {k: v for (k, v) in status.items() if v is not None}
         out['status'] = status
+        if self.events:
+            out['events'] = [e.to_json() for e in self.events]
+        return out
+
+    def to_dict(self) -> OrderedDict:
+        out = self.spec.to_json()
+        status = {
+            'container_image_id': self.container_image_id,
+            'container_image_name': self.container_image_name,
+            'service_url': self.service_url,
+            'size': self.size,
+            'running': self.running,
+            'last_refresh': self.last_refresh,
+            'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
+        }
+        for k in ['last_refresh', 'created']:
+            if getattr(self, k):
+                status[k] = datetime_to_str(getattr(self, k))
+        status = {k: v for (k, v) in status.items() if v is not None}
+        out['status'] = status
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
         return out
 
     @classmethod
     @handle_type_error
-    def from_json(cls, data: dict):
+    def from_json(cls, data: dict) -> 'ServiceDescription':
         c = data.copy()
         status = c.pop('status', {})
+        event_strs = c.pop('events', [])
         spec = ServiceSpec.from_json(c)
 
         c_status = status.copy()
         for k in ['last_refresh', 'created']:
             if k in c_status:
-                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
-        return cls(spec=spec, **c_status)
+                c_status[k] = str_to_datetime(c_status[k])
+        events = [OrchestratorEvent.from_json(e) for e in event_strs]
+        return cls(spec=spec, events=events, **c_status)
+
+    @staticmethod
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
+
+
+yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
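# A matching sketch for ServiceDescription (the 'crash' spec is an arbitrary
# example; assumes ServiceSpec and ServiceDescription are importable as in
# this module):
#
#     >>> sd = ServiceDescription(spec=ServiceSpec('crash'), size=3, running=3)
#     >>> sd.service_type()
#     'crash'
#     >>> sd.to_json()['status']['running']
#     3
#     >>> text = yaml.dump(sd)  # rendered via the representer registered above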
 
 
 class InventoryFilter(object):
@@ -1500,16 +1274,17 @@ class InventoryFilter(object):
     When fetching inventory, use this filter to avoid unnecessarily
     scanning the whole estate.
 
-    Typical use: filter by host when presenting UI workflow for configuring
-                 a particular server.
-                 filter by label when not all of estate is Ceph servers,
-                 and we want to only learn about the Ceph servers.
-                 filter by label when we are interested particularly
-                 in e.g. OSD servers.
+    Typical use:
 
+      filter by host when presenting UI workflow for configuring
+      a particular server.
+      filter by label when not all of the estate is Ceph servers,
+      and we want to only learn about the Ceph servers.
+      filter by label when we are interested particularly
+      in e.g. OSD servers.
     """
-    def __init__(self, labels=None, hosts=None):
-        # type: (Optional[List[str]], Optional[List[str]]) -> None
+
+    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
 
         #: Optional: get info about hosts matching labels
         self.labels = labels
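# For illustration (label and host names below are hypothetical):
#
#     >>> f = InventoryFilter(labels=['osd'], hosts=['ceph-node-1'])
#     >>> f.labels
#     ['osd']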
@@ -1523,8 +1298,8 @@ class InventoryHost(object):
     When fetching inventory, all Devices are grouped inside of an
     InventoryHost.
     """
-    def __init__(self, name, devices=None, labels=None, addr=None):
-        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
+
+    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
         if devices is None:
             devices = inventory.Devices([])
         if labels is None:
@@ -1536,7 +1311,7 @@ class InventoryHost(object):
         self.devices = devices
         self.labels = labels
 
-    def to_json(self):
+    def to_json(self) -> dict:
         return {
             'name': self.name,
             'addr': self.addr,
@@ -1545,7 +1320,7 @@ class InventoryHost(object):
         }
 
     @classmethod
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'InventoryHost':
         try:
             _data = copy.deepcopy(data)
             name = _data.pop('name')
@@ -1562,21 +1337,19 @@ class InventoryHost(object):
         except TypeError as e:
             raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
 
-
     @classmethod
-    def from_nested_items(cls, hosts):
+    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
         devs = inventory.Devices.from_json
         return [cls(item[0], devs(item[1].data)) for item in hosts]
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<InventoryHost>({name})".format(name=self.name)
 
     @staticmethod
-    def get_host_names(hosts):
-        # type: (List[InventoryHost]) -> List[str]
+    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
         return [host.name for host in hosts]
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         return self.name == other.name and self.devices == other.devices
 
 
@@ -1593,18 +1366,92 @@ class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
     __slots__ = ()
 
 
-def _mk_orch_methods(cls):
+class OrchestratorEvent:
+    """
+    Similar to K8s Events.
+
+    Some form of "important" log message attached to something.
+    """
+    INFO = 'INFO'
+    ERROR = 'ERROR'
+    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
+
+    def __init__(self, created: Union[str, datetime.datetime], kind: str,
+                 subject: str, level: str, message: str) -> None:
+        if isinstance(created, str):
+            created = str_to_datetime(created)
+        self.created: datetime.datetime = created
+
+        assert kind in "service daemon".split()
+        self.kind: str = kind
+
+        # service name, or daemon name, or similar
+        self.subject: str = subject
+
+        # Events are not meant for debugging. Debug messages should go to the log.
+        assert level in "INFO ERROR".split()
+        self.level = level
+
+        self.message: str = message
+
+    __slots__ = ('created', 'kind', 'subject', 'level', 'message')
+
+    def kind_subject(self) -> str:
+        return f'{self.kind}:{self.subject}'
+
+    def to_json(self) -> str:
+        # Make a long list of events readable.
+        created = datetime_to_str(self.created)
+        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+
+    def to_dict(self) -> dict:
+        # Convert events data to dict.
+        return {
+            'created': datetime_to_str(self.created),
+            'subject': self.kind_subject(),
+            'level': self.level,
+            'message': self.message
+        }
+
+    @classmethod
+    @handle_type_error
+    def from_json(cls, data: str) -> "OrchestratorEvent":
+        """
+        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
+        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+
+        :param data:
+        :return:
+        """
+        match = cls.regex_v1.match(data)
+        if match:
+            return cls(*match.groups())
+        raise ValueError(f'Unable to match: "{data}"')
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, OrchestratorEvent):
+            return False
+
+        return self.created == other.created and self.kind == other.kind \
+            and self.subject == other.subject and self.message == other.message
+
+    def __repr__(self) -> str:
+        return f'OrchestratorEvent.from_json({self.to_json()!r})'
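# A small round-trip sketch (timestamp and subject are made up; see also the
# doctest in from_json above):
#
#     >>> ev = OrchestratorEvent('2021-03-01T10:00:00.000000Z', 'service',
#     ...                        'rgw.foo', OrchestratorEvent.INFO, 'scaled to 2 daemons')
#     >>> ev.kind_subject()
#     'service:rgw.foo'
#     >>> OrchestratorEvent.from_json(ev.to_json()) == ev
#     True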
+
+
+def _mk_orch_methods(cls: Any) -> Any:
     # Needs to be defined outside of for.
     # Otherwise meth is always bound to last key
-    def shim(method_name):
-        def inner(self, *args, **kwargs):
+    def shim(method_name: str) -> Callable:
+        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
             completion = self._oremote(method_name, args, kwargs)
             return completion
         return inner
 
-    for meth in Orchestrator.__dict__:
-        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
-            setattr(cls, meth, shim(meth))
+    for name, method in Orchestrator.__dict__.items():
+        if not name.startswith('_') and name not in ['is_orchestrator_module']:
+            remote_call = update_wrapper(shim(name), method)
+            setattr(cls, name, remote_call)
     return cls
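# The indirection through `shim` above avoids Python's late-binding closure
# pitfall: a wrapper defined directly in the `for` loop would see only the
# final value of the loop variable. A generic sketch (names are arbitrary):
#
#     >>> fns = []
#     >>> for meth in ['host_ls', 'device_ls']:
#     ...     fns.append(lambda: meth)              # late binding: one shared cell
#     >>> [f() for f in fns]
#     ['device_ls', 'device_ls']
#     >>> fns = [(lambda m: (lambda: m))(meth) for meth in ['host_ls', 'device_ls']]
#     >>> [f() for f in fns]                        # factory binds per item
#     ['host_ls', 'device_ls']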
 
 
@@ -1620,7 +1467,6 @@ class OrchestratorClientMixin(Orchestrator):
     >>> class MyModule(OrchestratorClientMixin):
     ...    def func(self):
     ...        completion = self.add_host('somehost')  # calls `_oremote()`
-    ...        self._orchestrator_wait([completion])
     ...        self.log.debug(completion.result)
 
     .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
@@ -1636,21 +1482,20 @@ class OrchestratorClientMixin(Orchestrator):
     ...         self.orch_client.set_mgr(self.mgr))
     """
 
-    def set_mgr(self, mgr):
-        # type: (MgrModule) -> None
+    def set_mgr(self, mgr: MgrModule) -> None:
         """
         Usable in the Dashboard, which uses a global ``mgr``
         """
 
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
-    def __get_mgr(self):
+    def __get_mgr(self) -> Any:
         try:
             return self.__mgr
         except AttributeError:
             return self
 
-    def _oremote(self, meth, args, kwargs):
+    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
 
@@ -1678,25 +1523,3 @@ class OrchestratorClientMixin(Orchestrator):
             if meth not in f_set or not f_set[meth]['available']:
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
-
-    def _orchestrator_wait(self, completions):
-        # type: (List[Completion]) -> None
-        """
-        Wait for completions to complete (reads) or
-        become persistent (writes).
-
-        Waits for writes to be *persistent* but not *effective*.
-
-        :param completions: List of Completions
-        :raises NoOrchestrator:
-        :raises RuntimeError: something went wrong while calling the process method.
-        :raises ImportError: no `orchestrator` module or backend not found.
-        """
-        while any(not c.has_result for c in completions):
-            self.process(completions)
-            self.__get_mgr().log.info("Operations pending: %s",
-                                      sum(1 for c in completions if not c.has_result))
-            if any(c.needs_result for c in completions):
-                time.sleep(1)
-            else:
-                break