]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/orchestrator/_interface.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
index 117d3d0b3d6d07ea2db164ff8486e43dc52469b0..b0ccf73570b7772f4d2957efea31d50e69c6886d 100644 (file)
@@ -7,38 +7,42 @@ Please see the ceph-mgr module developer's guide for more information.
 
 import copy
 import datetime
 
 import copy
 import datetime
+import enum
 import errno
 import logging
 import pickle
 import re
 import errno
 import logging
 import pickle
 import re
-import time
-import uuid
 
 from collections import namedtuple, OrderedDict
 from contextlib import contextmanager
 
 from collections import namedtuple, OrderedDict
 from contextlib import contextmanager
-from functools import wraps
+from functools import wraps, reduce, update_wrapper
+
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+    Sequence, Dict, cast, Mapping
+
+try:
+    from typing import Protocol  # Protocol was added in Python 3.8
+except ImportError:
+    class Protocol:  # type: ignore
+        pass
+
 
 import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
 
 import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    ServiceSpecValidationError, IscsiServiceSpec
+    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.hostspec import HostSpec
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
+from ceph.utils import datetime_to_str, str_to_datetime
 
 from mgr_module import MgrModule, CLICommand, HandleCommandResult
 
 
 from mgr_module import MgrModule, CLICommand, HandleCommandResult
 
-try:
-    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
-        Type, Sequence, Dict, cast
-except ImportError:
-    pass
 
 logger = logging.getLogger(__name__)
 
 
 logger = logging.getLogger(__name__)
 
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
-
 T = TypeVar('T')
 T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
 
 
 class OrchestratorError(Exception):
 
 
 class OrchestratorError(Exception):
@@ -53,7 +57,7 @@ class OrchestratorError(Exception):
     def __init__(self,
                  msg: str,
                  errno: int = -errno.EINVAL,
     def __init__(self,
                  msg: str,
                  errno: int = -errno.EINVAL,
-                 event_kind_subject: Optional[Tuple[str, str]] = None):
+                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
         super(Exception, self).__init__(msg)
         self.errno = errno
         # See OrchestratorEvent.subject
         super(Exception, self).__init__(msg)
         self.errno = errno
         # See OrchestratorEvent.subject
@@ -65,7 +69,7 @@ class NoOrchestrator(OrchestratorError):
     No orchestrator in configured.
     """
 
     No orchestrator in configured.
     """
 
-    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
+    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
         super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
 
 
         super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
 
 
@@ -76,7 +80,7 @@ class OrchestratorValidationError(OrchestratorError):
 
 
 @contextmanager
 
 
 @contextmanager
-def set_exception_subject(kind, subject, overwrite=False):
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
     try:
         yield
     except OrchestratorError as e:
     try:
         yield
     except OrchestratorError as e:
@@ -85,12 +89,12 @@ def set_exception_subject(kind, subject, overwrite=False):
         raise
 
 
         raise
 
 
-def handle_exception(prefix, cmd_args, desc, perm, func):
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
     @wraps(func)
     @wraps(func)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
         try:
             return func(*args, **kwargs)
         try:
             return func(*args, **kwargs)
-        except (OrchestratorError, ServiceSpecValidationError) as e:
+        except (OrchestratorError, SpecValidationError) as e:
             # Do not print Traceback for expected errors.
             return HandleCommandResult(e.errno, stderr=str(e))
         except ImportError as e:
             # Do not print Traceback for expected errors.
             return HandleCommandResult(e.errno, stderr=str(e))
         except ImportError as e:
@@ -99,18 +103,44 @@ def handle_exception(prefix, cmd_args, desc, perm, func):
             msg = 'This Orchestrator does not support `{}`'.format(prefix)
             return HandleCommandResult(-errno.ENOENT, stderr=msg)
 
             msg = 'This Orchestrator does not support `{}`'.format(prefix)
             return HandleCommandResult(-errno.ENOENT, stderr=msg)
 
-    # misuse partial to copy `wrapper`
-    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
+    # misuse lambda to copy `wrapper`
+    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
     wrapper_copy._prefix = prefix  # type: ignore
     wrapper_copy._prefix = prefix  # type: ignore
-    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
+    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
+    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
     wrapper_copy._cli_command.func = wrapper_copy  # type: ignore
 
     wrapper_copy._cli_command.func = wrapper_copy  # type: ignore
 
-    return wrapper_copy
+    return cast(FuncT, wrapper_copy)
+
+
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+    """
+    Decorator to make Orchestrator methods return
+    an OrchResult.
+    """
+
+    @wraps(f)
+    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+        try:
+            return OrchResult(f(*args, **kwargs))
+        except Exception as e:
+            logger.exception(e)
+            import os
+            if 'UNITTEST' in os.environ:
+                raise  # This makes debugging of Tracebacks from unittests a bit easier
+            return OrchResult(None, exception=e)
+
+    return cast(Callable[..., OrchResult[T]], wrapper)
 
 
 
 
-def _cli_command(perm):
-    def inner_cli_command(prefix, cmd_args="", desc=""):
-        return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
+class InnerCliCommandCallable(Protocol):
+    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
+        ...
+
+
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
+        return lambda func: handle_exception(prefix, perm, func)
     return inner_cli_command
 
 
     return inner_cli_command
 
 
@@ -125,7 +155,7 @@ class CLICommandMeta(type):
 
     We make use of CLICommand, except for the use of the global variable.
     """
 
     We make use of CLICommand, except for the use of the global variable.
     """
-    def __init__(cls, name, bases, dct):
+    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
         super(CLICommandMeta, cls).__init__(name, bases, dct)
         dispatch: Dict[str, CLICommand] = {}
         for v in dct.values():
         super(CLICommandMeta, cls).__init__(name, bases, dct)
         dispatch: Dict[str, CLICommand] = {}
         for v in dct.values():
@@ -134,7 +164,7 @@ class CLICommandMeta(type):
             except AttributeError:
                 pass
 
             except AttributeError:
                 pass
 
-        def handle_command(self, inbuf, cmd):
+        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
             if cmd['prefix'] not in dispatch:
                 return self.handle_command(inbuf, cmd)
 
             if cmd['prefix'] not in dispatch:
                 return self.handle_command(inbuf, cmd)
 
@@ -144,57 +174,30 @@ class CLICommandMeta(type):
         cls.handle_command = handle_command
 
 
         cls.handle_command = handle_command
 
 
-def _no_result():
-    return object()
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
     """
     """
-    A completion may need multiple promises to be fulfilled. `_Promise` is one
-    step.
-
-    Typically ``Orchestrator`` implementations inherit from this class to
-    build their own way of finishing a step to fulfil a future.
-
-    They are not exposed in the orchestrator interface and can be seen as a
-    helper to build orchestrator modules.
+    Stores a result and an exception. Mainly to circumvent the
+    MgrModule.remote() method that hides all exceptions and for
+    handling different sub-interpreters.
     """
     """
-    INITIALIZED = 1  # We have a parent completion and a next completion
-    RUNNING = 2
-    FINISHED = 3  # we have a final result
 
 
-    NO_RESULT: None = _no_result()
-    ASYNC_RESULT = object()
-
-    def __init__(self,
-                 _first_promise: Optional["_Promise"] = None,
-                 value: Optional[Any] = NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ):
-        self._on_complete_ = on_complete
-        self._name = name
-        self._next_promise: Optional[_Promise] = None
+    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+        self.result = result
+        self.serialized_exception: Optional[bytes] = None
+        self.exception_str: str = ''
+        self.set_exception(exception)
 
 
-        self._state = self.INITIALIZED
-        self._exception: Optional[Exception] = None
+    __slots__ = 'result', 'serialized_exception', 'exception_str'
 
 
-        # Value of this _Promise. may be an intermediate result.
-        self._value = value
+    def set_exception(self, e: Optional[Exception]) -> None:
+        if e is None:
+            self.serialized_exception = None
+            self.exception_str = ''
+            return
 
 
-        # _Promise is not a continuation monad, as `_result` is of type
-        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
-        self._first_promise: '_Promise' = _first_promise or self
-
-    @property
-    def _exception(self) -> Optional[Exception]:
-        return getattr(self, '_exception_', None)
-
-    @_exception.setter
-    def _exception(self, e):
-        self._exception_ = e
+        self.exception_str = f'{type(e)}: {str(e)}'
         try:
         try:
-            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+            self.serialized_exception = pickle.dumps(e)
         except pickle.PicklingError:
             logger.error(f"failed to pickle {e}")
             if isinstance(e, Exception):
         except pickle.PicklingError:
             logger.error(f"failed to pickle {e}")
             if isinstance(e, Exception):
@@ -202,365 +205,7 @@ class _Promise(object):
             else:
                 e = Exception(str(e))
             # degenerate to a plain Exception
             else:
                 e = Exception(str(e))
             # degenerate to a plain Exception
-            self._serialized_exception_ = pickle.dumps(e)
-
-    @property
-    def _serialized_exception(self) -> Optional[bytes]:
-        return getattr(self, '_serialized_exception_', None)
-
-    @property
-    def _on_complete(self) -> Optional[Callable]:
-        # https://github.com/python/mypy/issues/4125
-        return self._on_complete_
-
-    @_on_complete.setter
-    def _on_complete(self, val: Optional[Callable]) -> None:
-        self._on_complete_ = val
-
-    def __repr__(self):
-        name = self._name or getattr(self._on_complete, '__name__',
-                                     '??') if self._on_complete else 'None'
-        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
-        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
-            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
-                next, '_progress_reference', 'NA'), repr(self._next_promise)
-        )
-
-    def pretty_print_1(self):
-        if self._name:
-            name = self._name
-        elif self._on_complete is None:
-            name = 'lambda x: x'
-        elif hasattr(self._on_complete, '__name__'):
-            name = getattr(self._on_complete, '__name__')
-        else:
-            name = self._on_complete.__class__.__name__
-        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
-        prefix = {
-            self.INITIALIZED: '      ',
-            self.RUNNING:     '   >>>',
-            self.FINISHED:    '(done)'
-        }[self._state]
-        return '{} {}({}),'.format(prefix, name, val)
-
-    def then(self: Any, on_complete: Callable) -> Any:
-        """
-        Call ``on_complete`` as soon as this promise is finalized.
-        """
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        if self._next_promise is not None:
-            return self._next_promise.then(on_complete)
-
-        if self._on_complete is not None:
-            self._set_next_promise(self.__class__(
-                _first_promise=self._first_promise,
-                on_complete=on_complete
-            ))
-            return self._next_promise
-
-        else:
-            self._on_complete = on_complete
-            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
-            return self._next_promise
-
-    def _set_next_promise(self, next: '_Promise') -> None:
-        assert self is not next
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        self._next_promise = next
-        assert self._next_promise is not None
-        for p in iter(self._next_promise):
-            p._first_promise = self._first_promise
-
-    def _finalize(self, value=NO_RESULT):
-        """
-        Sets this promise to complete.
-
-        Orchestrators may choose to use this helper function.
-
-        :param value: new value.
-        """
-        if self._state not in (self.INITIALIZED, self.RUNNING):
-            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
-        self._state = self.RUNNING
-
-        if value is not self.NO_RESULT:
-            self._value = value
-        assert self._value is not self.NO_RESULT, repr(self)
-
-        if self._on_complete:
-            try:
-                next_result = self._on_complete(self._value)
-            except Exception as e:
-                self.fail(e)
-                return
-        else:
-            next_result = self._value
-
-        if isinstance(next_result, _Promise):
-            # hack: _Promise is not a continuation monad.
-            next_result = next_result._first_promise  # type: ignore
-            assert next_result not in self, repr(self._first_promise) + repr(next_result)
-            assert self not in next_result
-            next_result._append_promise(self._next_promise)
-            self._set_next_promise(next_result)
-            assert self._next_promise
-            if self._next_promise._value is self.NO_RESULT:
-                self._next_promise._value = self._value
-            self.propagate_to_next()
-        elif next_result is not self.ASYNC_RESULT:
-            # simple map. simply forward
-            if self._next_promise:
-                self._next_promise._value = next_result
-            else:
-                # Hack: next_result is of type U, _value is of type T
-                self._value = next_result  # type: ignore
-            self.propagate_to_next()
-        else:
-            # asynchronous promise
-            pass
-
-    def propagate_to_next(self):
-        self._state = self.FINISHED
-        logger.debug('finalized {}'.format(repr(self)))
-        if self._next_promise:
-            self._next_promise._finalize()
-
-    def fail(self, e: Exception) -> None:
-        """
-        Sets the whole completion to be faild with this exception and end the
-        evaluation.
-        """
-        if self._state == self.FINISHED:
-            raise ValueError(
-                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-        logger.exception('_Promise failed')
-        self._exception = e
-        self._value = f'_exception: {e}'
-        if self._next_promise:
-            self._next_promise.fail(e)
-        self._state = self.FINISHED
-
-    def __contains__(self, item):
-        return any(item is p for p in iter(self._first_promise))
-
-    def __iter__(self):
-        yield self
-        elem = self._next_promise
-        while elem is not None:
-            yield elem
-            elem = elem._next_promise
-
-    def _append_promise(self, other):
-        if other is not None:
-            assert self not in other
-            assert other not in self
-            self._last_promise()._set_next_promise(other)
-
-    def _last_promise(self) -> '_Promise':
-        return list(iter(self))[-1]
-
-
-class ProgressReference(object):
-    def __init__(self,
-                 message: str,
-                 mgr,
-                 completion: Optional[Callable[[], 'Completion']] = None
-                 ):
-        """
-        ProgressReference can be used within Completions::
-
-            +---------------+      +---------------------------------+
-            |               | then |                                 |
-            | My Completion | +--> | on_complete=ProgressReference() |
-            |               |      |                                 |
-            +---------------+      +---------------------------------+
-
-        See :func:`Completion.with_progress` for an easy way to create
-        a progress reference
-
-        """
-        super(ProgressReference, self).__init__()
-        self.progress_id = str(uuid.uuid4())
-        self.message = message
-        self.mgr = mgr
-
-        #: The completion can already have a result, before the write
-        #: operation is effective. progress == 1 means, the services are
-        #: created / removed.
-        self.completion: Optional[Callable[[], Completion]] = completion
-
-        #: if a orchestrator module can provide a more detailed
-        #: progress information, it needs to also call ``progress.update()``.
-        self.progress = 0.0
-
-        self._completion_has_result = False
-        self.mgr.all_progress_references.append(self)
-
-    def __str__(self):
-        """
-        ``__str__()`` is used for determining the message for progress events.
-        """
-        return self.message or super(ProgressReference, self).__str__()
-
-    def __call__(self, arg):
-        self._completion_has_result = True
-        self.progress = 1.0
-        return arg
-
-    @property
-    def progress(self):
-        return self._progress
-
-    @progress.setter
-    def progress(self, progress):
-        assert progress <= 1.0
-        self._progress = progress
-        try:
-            if self.effective:
-                self.mgr.remote("progress", "complete", self.progress_id)
-                self.mgr.all_progress_references = [
-                    p for p in self.mgr.all_progress_references if p is not self]
-            else:
-                self.mgr.remote("progress", "update", self.progress_id, self.message,
-                                progress,
-                                [("origin", "orchestrator")])
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
-
-    @property
-    def effective(self):
-        return self.progress == 1 and self._completion_has_result
-
-    def update(self):
-        def progress_run(progress):
-            self.progress = progress
-        if self.completion:
-            c = self.completion().then(progress_run)
-            self.mgr.process([c._first_promise])
-        else:
-            self.progress = 1
-
-    def fail(self):
-        self._completion_has_result = True
-        self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
-    """
-    Combines multiple promises into one overall operation.
-
-    Completions are composable by being able to
-    call one completion from another completion. I.e. making them re-usable
-    using Promises E.g.::
-
-        >>> #doctest: +SKIP
-        ... return Orchestrator().get_hosts().then(self._create_osd)
-
-    where ``get_hosts`` returns a Completion of list of hosts and
-    ``_create_osd`` takes a list of hosts.
-
-    The concept behind this is to store the computation steps
-    explicit and then explicitly evaluate the chain:
-
-        >>> #doctest: +SKIP
-        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
-        ... p.finalize(2)
-        ... assert p.result = "4"
-
-    or graphically::
-
-        +---------------+      +-----------------+
-        |               | then |                 |
-        | lambda x: x*x | +--> | lambda x: str(x)|
-        |               |      |                 |
-        +---------------+      +-----------------+
-
-    """
-
-    def __init__(self,
-                 _first_promise: Optional["Completion"] = None,
-                 value: Any = _Promise.NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ):
-        super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self) -> Optional[ProgressReference]:
-        if hasattr(self._on_complete, 'progress_id'):
-            return self._on_complete  # type: ignore
-        return None
-
-    @property
-    def progress_reference(self) -> Optional[ProgressReference]:
-        """
-        ProgressReference. Marks this completion
-        as a write completeion.
-        """
-
-        references = [c._progress_reference for c in iter(
-            self) if c._progress_reference is not None]
-        if references:
-            assert len(references) == 1
-            return references[0]
-        return None
-
-    @classmethod
-    def with_progress(cls: Any,
-                      message: str,
-                      mgr,
-                      _first_promise: Optional["Completion"] = None,
-                      value: Any = _Promise.NO_RESULT,
-                      on_complete: Optional[Callable] = None,
-                      calc_percent: Optional[Callable[[], Any]] = None
-                      ) -> Any:
-
-        c = cls(
-            _first_promise=_first_promise,
-            value=value,
-            on_complete=on_complete
-        ).add_progress(message, mgr, calc_percent)
-
-        return c._first_promise
-
-    def add_progress(self,
-                     message: str,
-                     mgr,
-                     calc_percent: Optional[Callable[[], Any]] = None
-                     ):
-        return self.then(
-            on_complete=ProgressReference(
-                message=message,
-                mgr=mgr,
-                completion=calc_percent
-            )
-        )
-
-    def fail(self, e: Exception):
-        super(Completion, self).fail(e)
-        if self._progress_reference:
-            self._progress_reference.fail()
-
-    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
-        if self._first_promise._state == self.INITIALIZED:
-            self._first_promise._finalize(result)
-
-    @property
-    def result(self) -> T:
-        """
-        The result of the operation that we were waited
-        for.  Only valid after calling Orchestrator.process() on this
-        completion.
-        """
-        last = self._last_promise()
-        assert last._state == _Promise.FINISHED
-        return cast(T, last._value)
+            self.serialized_exception = pickle.dumps(e)
 
     def result_str(self) -> str:
         """Force a string."""
 
     def result_str(self) -> str:
         """Force a string."""
@@ -570,92 +215,23 @@ class Completion(_Promise, Generic[T]):
             return '\n'.join(str(x) for x in self.result)
         return str(self.result)
 
             return '\n'.join(str(x) for x in self.result)
         return str(self.result)
 
-    @property
-    def exception(self) -> Optional[Exception]:
-        return self._last_promise()._exception
-
-    @property
-    def serialized_exception(self) -> Optional[bytes]:
-        return self._last_promise()._serialized_exception
 
 
-    @property
-    def has_result(self) -> bool:
-        """
-        Has the operation already a result?
-
-        For Write operations, it can already have a
-        result, if the orchestrator's configuration is
-        persistently written. Typically this would
-        indicate that an update had been written to
-        a manifest, but that the update had not
-        necessarily been pushed out to the cluster.
-
-        :return:
-        """
-        return self._last_promise()._state == _Promise.FINISHED
-
-    @property
-    def is_errored(self) -> bool:
-        """
-        Has the completion failed. Default implementation looks for
-        self.exception. Can be overwritten.
-        """
-        return self.exception is not None
-
-    @property
-    def needs_result(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return not self.is_errored and not self.has_result
-
-    @property
-    def is_finished(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return self.is_errored or (self.has_result)
-
-    def pretty_print(self):
-
-        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
-        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions: Sequence[Completion]) -> str:
-    return ', '.join(c.pretty_print() for c in completions)
-
-
-def raise_if_exception(c: Completion) -> None:
+def raise_if_exception(c: OrchResult[T]) -> T:
     """
     """
-    :raises OrchestratorError: Some user error or a config error.
-    :raises Exception: Some internal error
+    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
     """
     if c.serialized_exception is not None:
         try:
             e = pickle.loads(c.serialized_exception)
         except (KeyError, AttributeError):
     """
     if c.serialized_exception is not None:
         try:
             e = pickle.loads(c.serialized_exception)
         except (KeyError, AttributeError):
-            raise Exception('{}: {}'.format(type(c.exception), c.exception))
+            raise Exception(c.exception_str)
         raise e
         raise e
+    assert c.result is not None, 'OrchResult should either have an exception or a result'
+    return c.result
 
 
 
 
-class TrivialReadCompletion(Completion[T]):
-    """
-    This is the trivial completion simply wrapping a result.
-    """
-
-    def __init__(self, result: T):
-        super(TrivialReadCompletion, self).__init__()
-        if result:
-            self.finalize(result)
-
-
-def _hide_in_features(f):
-    f._hide_in_features = True
+def _hide_in_features(f: FuncT) -> FuncT:
+    f._hide_in_features = True  # type: ignore
     return f
 
 
     return f
 
 
@@ -683,7 +259,7 @@ class Orchestrator(object):
     """
 
     @_hide_in_features
     """
 
     @_hide_in_features
-    def is_orchestrator_module(self):
+    def is_orchestrator_module(self) -> bool:
         """
         Enable other modules to interrogate this module to discover
         whether it's usable as an orchestrator module.
         """
         Enable other modules to interrogate this module to discover
         whether it's usable as an orchestrator module.
@@ -693,7 +269,7 @@ class Orchestrator(object):
         return True
 
     @_hide_in_features
         return True
 
     @_hide_in_features
-    def available(self) -> Tuple[bool, str]:
+    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
         """
         Report whether we can talk to the orchestrator.  This is the
         place to give the user a meaningful message if the orchestrator
         """
         Report whether we can talk to the orchestrator.  This is the
         place to give the user a meaningful message if the orchestrator
@@ -714,27 +290,14 @@ class Orchestrator(object):
                 ... if OrchestratorClientMixin().available()[0]:  # wrong.
                 ...     OrchestratorClientMixin().get_hosts()
 
                 ... if OrchestratorClientMixin().available()[0]:  # wrong.
                 ...     OrchestratorClientMixin().get_hosts()
 
-        :return: two-tuple of boolean, string
+        :return: boolean representing whether the module is available/usable
+        :return: string describing any error
+        :return: dict containing any module specific information
         """
         raise NotImplementedError()
 
     @_hide_in_features
         """
         raise NotImplementedError()
 
     @_hide_in_features
-    def process(self, completions: List[Completion]) -> None:
-        """
-        Given a list of Completion instances, process any which are
-        incomplete.
-
-        Callers should inspect the detail of each completion to identify
-        partial completion/progress information, and present that information
-        to the user.
-
-        This method should not block, as this would make it slow to query
-        a status, while other long running operations are in progress.
-        """
-        raise NotImplementedError()
-
-    @_hide_in_features
-    def get_feature_set(self):
+    def get_feature_set(self) -> Dict[str, dict]:
         """Describes which methods this orchestrator implements
 
         .. note::
         """Describes which methods this orchestrator implements
 
         .. note::
@@ -776,7 +339,7 @@ class Orchestrator(object):
     def resume(self) -> None:
         raise NotImplementedError()
 
     def resume(self) -> None:
         raise NotImplementedError()
 
-    def add_host(self, host_spec: HostSpec) -> Completion[str]:
+    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
         """
         Add a host to the orchestrator inventory.
 
         """
         Add a host to the orchestrator inventory.
 
@@ -784,7 +347,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def remove_host(self, host: str) -> Completion[str]:
+    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
         """
         Remove a host from the orchestrator inventory.
 
         """
         Remove a host from the orchestrator inventory.
 
@@ -792,7 +355,15 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
+    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+        """
+        drain all daemons from a host
+
+        :param hostname: hostname
+        """
+        raise NotImplementedError()
+
+    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
         """
         Update a host's address
 
         """
         Update a host's address
 
@@ -801,7 +372,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def get_hosts(self) -> Completion[List[HostSpec]]:
+    def get_hosts(self) -> OrchResult[List[HostSpec]]:
         """
         Report the hosts in the cluster.
 
         """
         Report the hosts in the cluster.
 
@@ -809,19 +380,25 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def add_host_label(self, host: str, label: str) -> Completion[str]:
+    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+        """
+        Return hosts metadata(gather_facts).
+        """
+        raise NotImplementedError()
+
+    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host: str, label: str) -> Completion[str]:
+    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a host label
         """
         raise NotImplementedError()
 
         """
         Remove a host label
         """
         raise NotImplementedError()
 
-    def host_ok_to_stop(self, hostname: str) -> Completion:
+    def host_ok_to_stop(self, hostname: str) -> OrchResult:
         """
         Check if the specified host can be safely stopped without reducing availability
 
         """
         Check if the specified host can be safely stopped without reducing availability
 
@@ -829,7 +406,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
+    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+        """
+        Place a host in maintenance, stopping daemons and disabling it's systemd target
+        """
+        raise NotImplementedError()
+
+    def exit_host_maintenance(self, hostname: str) -> OrchResult:
+        """
+        Return a host from maintenance, restarting the clusters systemd target
+        """
+        raise NotImplementedError()
+
+    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
         """
         Returns something that was created by `ceph-volume inventory`.
 
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -837,7 +426,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -851,7 +440,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
+    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
@@ -860,11 +449,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
+    @handle_orch_error
+    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
         """
         Applies any spec
         """
         """
         Applies any spec
         """
-        fns: Dict[str, function] = {
+        fns: Dict[str, Callable[..., OrchResult[str]]] = {
             'alertmanager': self.apply_alertmanager,
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
             'alertmanager': self.apply_alertmanager,
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
@@ -874,36 +464,31 @@ class Orchestrator(object):
             'mon': self.apply_mon,
             'nfs': self.apply_nfs,
             'node-exporter': self.apply_node_exporter,
             'mon': self.apply_mon,
             'nfs': self.apply_nfs,
             'node-exporter': self.apply_node_exporter,
-            'osd': lambda dg: self.apply_drivegroups([dg]),
+            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
             'prometheus': self.apply_prometheus,
             'prometheus': self.apply_prometheus,
+            'loki': self.apply_loki,
+            'promtail': self.apply_promtail,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
+            'ingress': self.apply_ingress,
+            'snmp-gateway': self.apply_snmp_gateway,
             'host': self.add_host,
         }
 
             'host': self.add_host,
         }
 
-        def merge(ls, r):
-            if isinstance(ls, list):
-                return ls + [r]
-            return [ls, r]
-
-        spec, *specs = specs
+        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
+            l_res = raise_if_exception(l)
+            r_res = raise_if_exception(r)
+            l_res.append(r_res)
+            return OrchResult(l_res)
+        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
 
 
-        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-        completion = fn(spec)
-        for s in specs:
-            def next(ls):
-                fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-                return fn(s).then(lambda r: merge(ls, r))
-            completion = completion.then(next)
-        return completion
-
-    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
+    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
         """
         Plan (Dry-run, Preview) a List of Specs.
         """
         raise NotImplementedError()
 
         """
         Plan (Dry-run, Preview) a List of Specs.
         """
         raise NotImplementedError()
 
-    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
+    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
         """
         Remove specific daemon(s).
 
         """
         Remove specific daemon(s).
 
@@ -911,7 +496,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name: str) -> Completion[str]:
+    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a service (a collection of daemons).
 
         """
         Remove a service (a collection of daemons).
 
@@ -919,7 +504,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
+    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
@@ -927,24 +512,24 @@ class Orchestrator(object):
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param daemon_name: name of daemon
         :param image: Container image when redeploying that daemon
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param daemon_name: name of daemon
         :param image: Container image when redeploying that daemon
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
+    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
         """
         Create one or more OSDs within a single Drive Group.
 
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -955,49 +540,53 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
         """ Update OSD cluster """
         raise NotImplementedError()
 
     def set_unmanaged_flag(self,
                            unmanaged_flag: bool,
                            service_type: str = 'osd',
         """ Update OSD cluster """
         raise NotImplementedError()
 
     def set_unmanaged_flag(self,
                            unmanaged_flag: bool,
                            service_type: str = 'osd',
-                           service_name=None
+                           service_name: Optional[str] = None
                            ) -> HandleCommandResult:
         raise NotImplementedError()
 
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
                            ) -> HandleCommandResult:
         raise NotImplementedError()
 
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ) -> Completion[str]:
+                         ) -> OrchResult[str]:
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> Completion[str]:
+                    force: bool = False,
+                    zap: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
-        Note that this can only remove OSDs that were successfully
-        created (i.e. got an OSD ID).
+        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+        .. note:: this can only remove OSDs that were successfully
+            created (i.e. got an OSD ID).
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
         """
         TODO
         """
         raise NotImplementedError()
 
         """
         TODO
         """
         raise NotImplementedError()
 
-    def remove_osds_status(self) -> Completion:
+    def remove_osds_status(self) -> OrchResult:
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
-    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
+    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
@@ -1007,122 +596,98 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
         """
         raise NotImplementedError()
 
-    def zap_device(self, host: str, path: str) -> Completion[str]:
+    def zap_device(self, host: str, path: str) -> OrchResult[str]:
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
-    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create mon daemon(s)"""
+    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
+        """Create daemons daemon(s) for unmanaged services"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mon cluster"""
         raise NotImplementedError()
 
         """Update mon cluster"""
         raise NotImplementedError()
 
-    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create mgr daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mgr cluster"""
         raise NotImplementedError()
 
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create MDS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
         """Update MDS cluster"""
         raise NotImplementedError()
 
-    def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
-        """Create RGW daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
+    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
         """Update RGW cluster"""
         raise NotImplementedError()
 
         """Update RGW cluster"""
         raise NotImplementedError()
 
-    def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create rbd-mirror daemon(s)"""
+    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
+        """Update ingress daemons"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
-    def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
-        """Create NFS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
+    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
         """Update NFS cluster"""
         raise NotImplementedError()
 
         """Update NFS cluster"""
         raise NotImplementedError()
 
-    def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
-        """Create iscsi daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
+    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
         """Update iscsi cluster"""
         raise NotImplementedError()
 
         """Update iscsi cluster"""
         raise NotImplementedError()
 
-    def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create new prometheus daemon"""
-        raise NotImplementedError()
-
-    def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
 
         """Update prometheus cluster"""
         raise NotImplementedError()
 
-    def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new Node-Exporter service"""
+    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Loki daemon(s)"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new crash service"""
+    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Promtail daemon(s)"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
-    def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new Node-Exporter service"""
+    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a grafana service"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new AlertManager service"""
+    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+        """Update an existing snmp gateway service"""
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
-        """Update an existing AlertManager daemon(s)"""
+    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_pause(self) -> Completion[str]:
+    def upgrade_pause(self) -> OrchResult[str]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_resume(self) -> Completion[str]:
+    def upgrade_resume(self) -> OrchResult[str]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_stop(self) -> Completion[str]:
+    def upgrade_stop(self) -> OrchResult[str]:
         raise NotImplementedError()
 
         raise NotImplementedError()
 
-    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
+    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -1132,7 +697,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
         raise NotImplementedError()
 
     @_hide_in_features
-    def upgrade_available(self) -> Completion:
+    def upgrade_available(self) -> OrchResult:
         """
         Report on what versions are available to upgrade to
 
         """
         Report on what versions are available to upgrade to
 
@@ -1151,24 +716,104 @@ def json_to_generic_spec(spec: dict) -> GenericSpec:
         return ServiceSpec.from_json(spec)
 
 
         return ServiceSpec.from_json(spec)
 
 
+def daemon_type_to_service(dtype: str) -> str:
+    mapping = {
+        'mon': 'mon',
+        'mgr': 'mgr',
+        'mds': 'mds',
+        'rgw': 'rgw',
+        'osd': 'osd',
+        'haproxy': 'ingress',
+        'keepalived': 'ingress',
+        'iscsi': 'iscsi',
+        'rbd-mirror': 'rbd-mirror',
+        'cephfs-mirror': 'cephfs-mirror',
+        'nfs': 'nfs',
+        'grafana': 'grafana',
+        'alertmanager': 'alertmanager',
+        'prometheus': 'prometheus',
+        'node-exporter': 'node-exporter',
+        'loki': 'loki',
+        'promtail': 'promtail',
+        'crash': 'crash',
+        'crashcollector': 'crash',  # Specific Rook Daemon
+        'container': 'container',
+        'agent': 'agent',
+        'snmp-gateway': 'snmp-gateway',
+    }
+    return mapping[dtype]
+
+
+def service_to_daemon_types(stype: str) -> List[str]:
+    mapping = {
+        'mon': ['mon'],
+        'mgr': ['mgr'],
+        'mds': ['mds'],
+        'rgw': ['rgw'],
+        'osd': ['osd'],
+        'ingress': ['haproxy', 'keepalived'],
+        'iscsi': ['iscsi'],
+        'rbd-mirror': ['rbd-mirror'],
+        'cephfs-mirror': ['cephfs-mirror'],
+        'nfs': ['nfs'],
+        'grafana': ['grafana'],
+        'alertmanager': ['alertmanager'],
+        'prometheus': ['prometheus'],
+        'loki': ['loki'],
+        'promtail': ['promtail'],
+        'node-exporter': ['node-exporter'],
+        'crash': ['crash'],
+        'container': ['container'],
+        'agent': ['agent'],
+        'snmp-gateway': ['snmp-gateway'],
+    }
+    return mapping[stype]
+
+
+KNOWN_DAEMON_TYPES: List[str] = list(
+    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
+
+
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
-    def __init__(self):
+    def __init__(self) -> None:
         self.in_progress = False  # Is an upgrade underway?
         self.in_progress = False  # Is an upgrade underway?
-        self.target_image = None
-        self.services_complete = []  # Which daemon types are fully updated?
+        self.target_image: Optional[str] = None
+        self.services_complete: List[str] = []  # Which daemon types are fully updated?
+        self.which: str = '<unknown>'  # for if user specified daemon types, services or hosts
+        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
         self.message = ""  # Freeform description
 
 
         self.message = ""  # Freeform description
 
 
-def handle_type_error(method):
+def handle_type_error(method: FuncT) -> FuncT:
     @wraps(method)
     @wraps(method)
-    def inner(cls, *args, **kwargs):
+    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
         try:
             return method(cls, *args, **kwargs)
         except TypeError as e:
             error_msg = '{}: {}'.format(cls.__name__, e)
         raise OrchestratorValidationError(error_msg)
         try:
             return method(cls, *args, **kwargs)
         except TypeError as e:
             error_msg = '{}: {}'.format(cls.__name__, e)
         raise OrchestratorValidationError(error_msg)
-    return inner
+    return cast(FuncT, inner)
+
+
+class DaemonDescriptionStatus(enum.IntEnum):
+    unknown = -2
+    error = -1
+    stopped = 0
+    running = 1
+    starting = 2  #: Daemon is deployed, but not yet running
+
+    @staticmethod
+    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+        if status is None:
+            status = DaemonDescriptionStatus.unknown
+        return {
+            DaemonDescriptionStatus.unknown: 'unknown',
+            DaemonDescriptionStatus.error: 'error',
+            DaemonDescriptionStatus.stopped: 'stopped',
+            DaemonDescriptionStatus.running: 'running',
+            DaemonDescriptionStatus.starting: 'starting',
+        }.get(status, '<unknown>')
 
 
 class DaemonDescription(object):
 
 
 class DaemonDescription(object):
@@ -1185,53 +830,74 @@ class DaemonDescription(object):
     """
 
     def __init__(self,
     """
 
     def __init__(self,
-                 daemon_type=None,
-                 daemon_id=None,
-                 hostname=None,
-                 container_id=None,
-                 container_image_id=None,
-                 container_image_name=None,
-                 version=None,
-                 status=None,
-                 status_desc=None,
-                 last_refresh=None,
-                 created=None,
-                 started=None,
-                 last_configured=None,
-                 osdspec_affinity=None,
-                 last_deployed=None,
+                 daemon_type: Optional[str] = None,
+                 daemon_id: Optional[str] = None,
+                 hostname: Optional[str] = None,
+                 container_id: Optional[str] = None,
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 container_image_digests: Optional[List[str]] = None,
+                 version: Optional[str] = None,
+                 status: Optional[DaemonDescriptionStatus] = None,
+                 status_desc: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 started: Optional[datetime.datetime] = None,
+                 last_configured: Optional[datetime.datetime] = None,
+                 osdspec_affinity: Optional[str] = None,
+                 last_deployed: Optional[datetime.datetime] = None,
                  events: Optional[List['OrchestratorEvent']] = None,
                  events: Optional[List['OrchestratorEvent']] = None,
-                 is_active: bool = False):
-
-        # Host is at the same granularity as InventoryHost
-        self.hostname: str = hostname
+                 is_active: bool = False,
+                 memory_usage: Optional[int] = None,
+                 memory_request: Optional[int] = None,
+                 memory_limit: Optional[int] = None,
+                 cpu_percentage: Optional[str] = None,
+                 service_name: Optional[str] = None,
+                 ports: Optional[List[int]] = None,
+                 ip: Optional[str] = None,
+                 deployed_by: Optional[List[str]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None,
+                 ) -> None:
+
+        #: Host is at the same granularity as InventoryHost
+        self.hostname: Optional[str] = hostname
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_id (runtime id) and container_image
         # (image name)
         self.container_id = container_id                  # runtime id
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_id (runtime id) and container_image
         # (image name)
         self.container_id = container_id                  # runtime id
-        self.container_image_id = container_image_id      # image hash
+        self.container_image_id = container_image_id      # image id locally
         self.container_image_name = container_image_name  # image friendly name
         self.container_image_name = container_image_name  # image friendly name
+        self.container_image_digests = container_image_digests  # reg hashes
 
 
-        # The type of service (osd, mon, mgr, etc.)
+        #: The type of service (osd, mon, mgr, etc.)
         self.daemon_type = daemon_type
 
         self.daemon_type = daemon_type
 
-        # The orchestrator will have picked some names for daemons,
-        # typically either based on hostnames or on pod names.
-        # This is the <foo> in mds.<foo>, the ID that will appear
-        # in the FSMap/ServiceMap.
-        self.daemon_id: str = daemon_id
+        #: The orchestrator will have picked some names for daemons,
+        #: typically either based on hostnames or on pod names.
+        #: This is the <foo> in mds.<foo>, the ID that will appear
+        #: in the FSMap/ServiceMap.
+        self.daemon_id: Optional[str] = daemon_id
+        self.daemon_name = self.name()
+
+        #: Some daemon types have a numeric rank assigned
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
 
 
-        # Service version that was deployed
+        self._service_name: Optional[str] = service_name
+
+        #: Service version that was deployed
         self.version = version
 
         self.version = version
 
-        # Service status: -1 error, 0 stopped, 1 running
-        self.status = status
+        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+        self._status = status
 
 
-        # Service status description when status == -1.
+        #: Service status description when status == error.
         self.status_desc = status_desc
 
         self.status_desc = status_desc
 
-        # datetime when this info was last refreshed
+        #: datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
 
         self.created: Optional[datetime.datetime] = created
         self.last_refresh: Optional[datetime.datetime] = last_refresh
 
         self.created: Optional[datetime.datetime] = created
@@ -1239,26 +905,67 @@ class DaemonDescription(object):
         self.last_configured: Optional[datetime.datetime] = last_configured
         self.last_deployed: Optional[datetime.datetime] = last_deployed
 
         self.last_configured: Optional[datetime.datetime] = last_configured
         self.last_deployed: Optional[datetime.datetime] = last_deployed
 
-        # Affinity to a certain OSDSpec
+        #: Affinity to a certain OSDSpec
         self.osdspec_affinity: Optional[str] = osdspec_affinity
 
         self.events: List[OrchestratorEvent] = events or []
 
         self.osdspec_affinity: Optional[str] = osdspec_affinity
 
         self.events: List[OrchestratorEvent] = events or []
 
+        self.memory_usage: Optional[int] = memory_usage
+        self.memory_request: Optional[int] = memory_request
+        self.memory_limit: Optional[int] = memory_limit
+
+        self.cpu_percentage: Optional[str] = cpu_percentage
+
+        self.ports: Optional[List[int]] = ports
+        self.ip: Optional[str] = ip
+
+        self.deployed_by = deployed_by
+
         self.is_active = is_active
 
         self.is_active = is_active
 
-    def name(self):
+        self.extra_container_args = extra_container_args
+
+    @property
+    def status(self) -> Optional[DaemonDescriptionStatus]:
+        return self._status
+
+    @status.setter
+    def status(self, new: DaemonDescriptionStatus) -> None:
+        self._status = new
+        self.status_desc = DaemonDescriptionStatus.to_str(new)
+
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
+
+    def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
     def matches_service(self, service_name: Optional[str]) -> bool:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
     def matches_service(self, service_name: Optional[str]) -> bool:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
         if service_name:
         if service_name:
-            return self.name().startswith(service_name + '.')
+            return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
         return False
 
         return False
 
-    def service_id(self):
-        if self.daemon_type == 'osd' and self.osdspec_affinity:
-            return self.osdspec_affinity
+    def service_id(self) -> str:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
+
+        if self._service_name:
+            if '.' in self._service_name:
+                return self._service_name.split('.', 1)[1]
+            else:
+                return ''
+
+        if self.daemon_type == 'osd':
+            if self.osdspec_affinity and self.osdspec_affinity != 'None':
+                return self.osdspec_affinity
+            return ''
 
 
-        def _match():
+        def _match() -> str:
+            assert self.daemon_id is not None
             err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                     f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
 
             err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                     f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
 
@@ -1298,39 +1005,56 @@ class DaemonDescription(object):
             # daemon_id == "service_id"
             return self.daemon_id
 
             # daemon_id == "service_id"
             return self.daemon_id
 
-        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
             return _match()
 
         return self.daemon_id
 
             return _match()
 
         return self.daemon_id
 
-    def service_name(self):
-        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
-            return f'{self.daemon_type}.{self.service_id()}'
-        return self.daemon_type
+    def service_name(self) -> str:
+        if self._service_name:
+            return self._service_name
+        assert self.daemon_type is not None
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+            return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
+        return daemon_type_to_service(self.daemon_type)
 
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
-    def to_json(self):
-        out = OrderedDict()
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
+    def to_json(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
         out['daemon_type'] = self.daemon_type
         out['daemon_id'] = self.daemon_id
         out['daemon_type'] = self.daemon_type
         out['daemon_id'] = self.daemon_id
+        out['service_name'] = self._service_name
+        out['daemon_name'] = self.name()
         out['hostname'] = self.hostname
         out['container_id'] = self.container_id
         out['container_image_id'] = self.container_image_id
         out['container_image_name'] = self.container_image_name
         out['hostname'] = self.hostname
         out['container_id'] = self.container_id
         out['container_image_id'] = self.container_image_id
         out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
         out['version'] = self.version
         out['version'] = self.version
-        out['status'] = self.status
+        out['status'] = self.status.value if self.status is not None else None
         out['status_desc'] = self.status_desc
         if self.daemon_type == 'osd':
             out['osdspec_affinity'] = self.osdspec_affinity
         out['is_active'] = self.is_active
         out['status_desc'] = self.status_desc
         if self.daemon_type == 'osd':
             out['osdspec_affinity'] = self.osdspec_affinity
         out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+        out['rank'] = self.rank
+        out['rank_generation'] = self.rank_generation
 
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if getattr(self, k):
 
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if getattr(self, k):
-                out[k] = getattr(self, k).strftime(DATEFMT)
+                out[k] = datetime_to_str(getattr(self, k))
 
         if self.events:
             out['events'] = [e.to_json() for e in self.events]
 
         if self.events:
             out['events'] = [e.to_json() for e in self.events]
@@ -1340,25 +1064,74 @@ class DaemonDescription(object):
             del out[e]
         return out
 
             del out[e]
         return out
 
+    def to_dict(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['daemon_name'] = self.name()
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
+        out['version'] = self.version
+        out['status'] = self.status.value if self.status is not None else None
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+
+        for k in ['last_refresh', 'created', 'started', 'last_deployed',
+                  'last_configured']:
+            if getattr(self, k):
+                out[k] = datetime_to_str(getattr(self, k))
+
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
+
     @classmethod
     @handle_type_error
     @classmethod
     @handle_type_error
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'DaemonDescription':
         c = data.copy()
         event_strs = c.pop('events', [])
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if k in c:
         c = data.copy()
         event_strs = c.pop('events', [])
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if k in c:
-                c[k] = datetime.datetime.strptime(c[k], DATEFMT)
+                c[k] = str_to_datetime(c[k])
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
-        return cls(events=events, **c)
+        status_int = c.pop('status', None)
+        if 'daemon_name' in c:
+            del c['daemon_name']
+        if 'service_name' in c and c['service_name'].startswith('osd.'):
+            # if the service_name is a osd.NNN (numeric osd id) then
+            # ignore it -- it is not a valid service_name and
+            # (presumably) came from an older version of cephadm.
+            try:
+                int(c['service_name'][4:])
+                del c['service_name']
+            except ValueError:
+                pass
+        status = DaemonDescriptionStatus(status_int) if status_int is not None else None
+        return cls(events=events, status=status, **c)
 
 
-    def __copy__(self):
+    def __copy__(self) -> 'DaemonDescription':
         # feel free to change this:
         return DaemonDescription.from_json(self.to_json())
 
     @staticmethod
         # feel free to change this:
         return DaemonDescription.from_json(self.to_json())
 
     @staticmethod
-    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
-        return dumper.represent_dict(data.to_json().items())
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
 
 
 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
@@ -1379,25 +1152,23 @@ class ServiceDescription(object):
 
     def __init__(self,
                  spec: ServiceSpec,
 
     def __init__(self,
                  spec: ServiceSpec,
-                 container_image_id=None,
-                 container_image_name=None,
-                 rados_config_location=None,
-                 service_url=None,
-                 last_refresh=None,
-                 created=None,
-                 size=0,
-                 running=0,
-                 events: Optional[List['OrchestratorEvent']] = None):
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 service_url: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 deleted: Optional[datetime.datetime] = None,
+                 size: int = 0,
+                 running: int = 0,
+                 events: Optional[List['OrchestratorEvent']] = None,
+                 virtual_ip: Optional[str] = None,
+                 ports: List[int] = []) -> None:
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
         # (image name)
         self.container_image_id = container_image_id      # image hash
         self.container_image_name = container_image_name  # image friendly name
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
         # (image name)
         self.container_image_id = container_image_id      # image hash
         self.container_image_name = container_image_name  # image friendly name
 
-        # Location of the service configuration when stored in rados
-        # object. Format: "rados://<pool>/[<namespace/>]<object>"
-        self.rados_config_location = rados_config_location
-
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
         self.service_url = service_url
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
         self.service_url = service_url
@@ -1411,41 +1182,73 @@ class ServiceDescription(object):
         # datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
         self.created: Optional[datetime.datetime] = created
         # datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
         self.created: Optional[datetime.datetime] = created
+        self.deleted: Optional[datetime.datetime] = deleted
 
         self.spec: ServiceSpec = spec
 
         self.events: List[OrchestratorEvent] = events or []
 
 
         self.spec: ServiceSpec = spec
 
         self.events: List[OrchestratorEvent] = events or []
 
-    def service_type(self):
+        self.virtual_ip = virtual_ip
+        self.ports = ports
+
+    def service_type(self) -> str:
         return self.spec.service_type
 
         return self.spec.service_type
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return f"<ServiceDescription of {self.spec.one_line_str()}>"
 
         return f"<ServiceDescription of {self.spec.one_line_str()}>"
 
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+
     def to_json(self) -> OrderedDict:
         out = self.spec.to_json()
         status = {
             'container_image_id': self.container_image_id,
             'container_image_name': self.container_image_name,
     def to_json(self) -> OrderedDict:
         out = self.spec.to_json()
         status = {
             'container_image_id': self.container_image_id,
             'container_image_name': self.container_image_name,
-            'rados_config_location': self.rados_config_location,
             'service_url': self.service_url,
             'size': self.size,
             'running': self.running,
             'last_refresh': self.last_refresh,
             'created': self.created,
             'service_url': self.service_url,
             'size': self.size,
             'running': self.running,
             'last_refresh': self.last_refresh,
             'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
         }
         for k in ['last_refresh', 'created']:
             if getattr(self, k):
         }
         for k in ['last_refresh', 'created']:
             if getattr(self, k):
-                status[k] = getattr(self, k).strftime(DATEFMT)
+                status[k] = datetime_to_str(getattr(self, k))
         status = {k: v for (k, v) in status.items() if v is not None}
         out['status'] = status
         if self.events:
             out['events'] = [e.to_json() for e in self.events]
         return out
 
         status = {k: v for (k, v) in status.items() if v is not None}
         out['status'] = status
         if self.events:
             out['events'] = [e.to_json() for e in self.events]
         return out
 
+    def to_dict(self) -> OrderedDict:
+        out = self.spec.to_json()
+        status = {
+            'container_image_id': self.container_image_id,
+            'container_image_name': self.container_image_name,
+            'service_url': self.service_url,
+            'size': self.size,
+            'running': self.running,
+            'last_refresh': self.last_refresh,
+            'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
+        }
+        for k in ['last_refresh', 'created']:
+            if getattr(self, k):
+                status[k] = datetime_to_str(getattr(self, k))
+        status = {k: v for (k, v) in status.items() if v is not None}
+        out['status'] = status
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+        return out
+
     @classmethod
     @handle_type_error
     @classmethod
     @handle_type_error
-    def from_json(cls, data: dict):
+    def from_json(cls, data: dict) -> 'ServiceDescription':
         c = data.copy()
         status = c.pop('status', {})
         event_strs = c.pop('events', [])
         c = data.copy()
         status = c.pop('status', {})
         event_strs = c.pop('events', [])
@@ -1454,13 +1257,13 @@ class ServiceDescription(object):
         c_status = status.copy()
         for k in ['last_refresh', 'created']:
             if k in c_status:
         c_status = status.copy()
         for k in ['last_refresh', 'created']:
             if k in c_status:
-                c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
+                c_status[k] = str_to_datetime(c_status[k])
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
         return cls(spec=spec, events=events, **c_status)
 
     @staticmethod
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
         return cls(spec=spec, events=events, **c_status)
 
     @staticmethod
-    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
-        return dumper.represent_dict(data.to_json().items())
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
 
 
 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
@@ -1471,13 +1274,14 @@ class InventoryFilter(object):
     When fetching inventory, use this filter to avoid unnecessarily
     scanning the whole estate.
 
     When fetching inventory, use this filter to avoid unnecessarily
     scanning the whole estate.
 
-    Typical use: filter by host when presenting UI workflow for configuring
-                 a particular server.
-                 filter by label when not all of estate is Ceph servers,
-                 and we want to only learn about the Ceph servers.
-                 filter by label when we are interested particularly
-                 in e.g. OSD servers.
+    Typical use:
 
 
+      filter by host when presentig UI workflow for configuring
+      a particular server.
+      filter by label when not all of estate is Ceph servers,
+      and we want to only learn about the Ceph servers.
+      filter by label when we are interested particularly
+      in e.g. OSD servers.
     """
 
     def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
     """
 
     def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
@@ -1507,7 +1311,7 @@ class InventoryHost(object):
         self.devices = devices
         self.labels = labels
 
         self.devices = devices
         self.labels = labels
 
-    def to_json(self):
+    def to_json(self) -> dict:
         return {
             'name': self.name,
             'addr': self.addr,
         return {
             'name': self.name,
             'addr': self.addr,
@@ -1516,7 +1320,7 @@ class InventoryHost(object):
         }
 
     @classmethod
         }
 
     @classmethod
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'InventoryHost':
         try:
             _data = copy.deepcopy(data)
             name = _data.pop('name')
         try:
             _data = copy.deepcopy(data)
             name = _data.pop('name')
@@ -1534,18 +1338,18 @@ class InventoryHost(object):
             raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
 
     @classmethod
             raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
 
     @classmethod
-    def from_nested_items(cls, hosts):
+    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
         devs = inventory.Devices.from_json
         return [cls(item[0], devs(item[1].data)) for item in hosts]
 
         devs = inventory.Devices.from_json
         return [cls(item[0], devs(item[1].data)) for item in hosts]
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<InventoryHost>({name})".format(name=self.name)
 
     @staticmethod
     def get_host_names(hosts: List['InventoryHost']) -> List[str]:
         return [host.name for host in hosts]
 
         return "<InventoryHost>({name})".format(name=self.name)
 
     @staticmethod
     def get_host_names(hosts: List['InventoryHost']) -> List[str]:
         return [host.name for host in hosts]
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         return self.name == other.name and self.devices == other.devices
 
 
         return self.name == other.name and self.devices == other.devices
 
 
@@ -1572,9 +1376,10 @@ class OrchestratorEvent:
     ERROR = 'ERROR'
     regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
 
     ERROR = 'ERROR'
     regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
 
-    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
+    def __init__(self, created: Union[str, datetime.datetime], kind: str,
+                 subject: str, level: str, message: str) -> None:
         if isinstance(created, str):
         if isinstance(created, str):
-            created = datetime.datetime.strptime(created, DATEFMT)
+            created = str_to_datetime(created)
         self.created: datetime.datetime = created
 
         assert kind in "service daemon".split()
         self.created: datetime.datetime = created
 
         assert kind in "service daemon".split()
@@ -1596,15 +1401,24 @@ class OrchestratorEvent:
 
     def to_json(self) -> str:
         # Make a long list of events readable.
 
     def to_json(self) -> str:
         # Make a long list of events readable.
-        created = self.created.strftime(DATEFMT)
+        created = datetime_to_str(self.created)
         return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
 
         return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
 
+    def to_dict(self) -> dict:
+        # Convert events data to dict.
+        return {
+            'created': datetime_to_str(self.created),
+            'subject': self.kind_subject(),
+            'level': self.level,
+            'message': self.message
+        }
+
     @classmethod
     @handle_type_error
     @classmethod
     @handle_type_error
-    def from_json(cls, data) -> "OrchestratorEvent":
+    def from_json(cls, data: str) -> "OrchestratorEvent":
         """
         >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
         """
         >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
-        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+        '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
 
         :param data:
         :return:
 
         :param data:
         :return:
@@ -1614,26 +1428,30 @@ class OrchestratorEvent:
             return cls(*match.groups())
         raise ValueError(f'Unable to match: "{data}"')
 
             return cls(*match.groups())
         raise ValueError(f'Unable to match: "{data}"')
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OrchestratorEvent):
             return False
 
         return self.created == other.created and self.kind == other.kind \
             and self.subject == other.subject and self.message == other.message
 
         if not isinstance(other, OrchestratorEvent):
             return False
 
         return self.created == other.created and self.kind == other.kind \
             and self.subject == other.subject and self.message == other.message
 
+    def __repr__(self) -> str:
+        return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
 
 
-def _mk_orch_methods(cls):
+def _mk_orch_methods(cls: Any) -> Any:
     # Needs to be defined outside of for.
     # Otherwise meth is always bound to last key
     # Needs to be defined outside of for.
     # Otherwise meth is always bound to last key
-    def shim(method_name):
-        def inner(self, *args, **kwargs):
+    def shim(method_name: str) -> Callable:
+        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
             completion = self._oremote(method_name, args, kwargs)
             return completion
         return inner
 
             completion = self._oremote(method_name, args, kwargs)
             return completion
         return inner
 
-    for meth in Orchestrator.__dict__:
-        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
-            setattr(cls, meth, shim(meth))
+    for name, method in Orchestrator.__dict__.items():
+        if not name.startswith('_') and name not in ['is_orchestrator_module']:
+            remote_call = update_wrapper(shim(name), method)
+            setattr(cls, name, remote_call)
     return cls
 
 
     return cls
 
 
@@ -1649,7 +1467,6 @@ class OrchestratorClientMixin(Orchestrator):
     >>> class MyModule(OrchestratorClientMixin):
     ...    def func(self):
     ...        completion = self.add_host('somehost')  # calls `_oremote()`
     >>> class MyModule(OrchestratorClientMixin):
     ...    def func(self):
     ...        completion = self.add_host('somehost')  # calls `_oremote()`
-    ...        self._orchestrator_wait([completion])
     ...        self.log.debug(completion.result)
 
     .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
     ...        self.log.debug(completion.result)
 
     .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
@@ -1672,13 +1489,13 @@ class OrchestratorClientMixin(Orchestrator):
 
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
 
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
-    def __get_mgr(self):
+    def __get_mgr(self) -> Any:
         try:
             return self.__mgr
         except AttributeError:
             return self
 
         try:
             return self.__mgr
         except AttributeError:
             return self
 
-    def _oremote(self, meth, args, kwargs):
+    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
 
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
 
@@ -1706,24 +1523,3 @@ class OrchestratorClientMixin(Orchestrator):
             if meth not in f_set or not f_set[meth]['available']:
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
             if meth not in f_set or not f_set[meth]['available']:
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
-
-    def _orchestrator_wait(self, completions: List[Completion]) -> None:
-        """
-        Wait for completions to complete (reads) or
-        become persistent (writes).
-
-        Waits for writes to be *persistent* but not *effective*.
-
-        :param completions: List of Completions
-        :raises NoOrchestrator:
-        :raises RuntimeError: something went wrong while calling the process method.
-        :raises ImportError: no `orchestrator` module or backend not found.
-        """
-        while any(not c.has_result for c in completions):
-            self.process(completions)
-            self.__get_mgr().log.info("Operations pending: %s",
-                                      sum(1 for c in completions if not c.has_result))
-            if any(c.needs_result for c in completions):
-                time.sleep(1)
-            else:
-                break