git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/orchestrator/_interface.py
commit: import ceph quincy 17.2.1
index c22ac4c2d3fefe038cfb9d03b25de13665875ca1..b0ccf73570b7772f4d2957efea31d50e69c6886d 100644
@@ -7,37 +7,42 @@ Please see the ceph-mgr module developer's guide for more information.
 
 import copy
 import datetime
+import enum
 import errno
 import logging
 import pickle
 import re
-import time
-import uuid
 
 from collections import namedtuple, OrderedDict
 from contextlib import contextmanager
-from functools import wraps
+from functools import wraps, reduce, update_wrapper
+
+from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
+    Sequence, Dict, cast, Mapping
+
+try:
+    from typing import Protocol  # Protocol was added in Python 3.8
+except ImportError:
+    class Protocol:  # type: ignore
+        pass
+
 
 import yaml
 
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
-    ServiceSpecValidationError, IscsiServiceSpec
+    IscsiServiceSpec, IngressSpec, SNMPGatewaySpec, MDSSpec
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.hostspec import HostSpec
+from ceph.deployment.hostspec import HostSpec, SpecValidationError
 from ceph.utils import datetime_to_str, str_to_datetime
 
 from mgr_module import MgrModule, CLICommand, HandleCommandResult
 
-try:
-    from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
-        Type, Sequence, Dict, cast
-except ImportError:
-    pass
 
 logger = logging.getLogger(__name__)
 
 T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
 
 
 class OrchestratorError(Exception):
@@ -52,7 +57,7 @@ class OrchestratorError(Exception):
     def __init__(self,
                  msg: str,
                  errno: int = -errno.EINVAL,
-                 event_kind_subject: Optional[Tuple[str, str]] = None):
+                 event_kind_subject: Optional[Tuple[str, str]] = None) -> None:
         super(Exception, self).__init__(msg)
         self.errno = errno
         # See OrchestratorEvent.subject
@@ -64,7 +69,7 @@ class NoOrchestrator(OrchestratorError):
     No orchestrator is configured.
     """
 
-    def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
+    def __init__(self, msg: str = "No orchestrator configured (try `ceph orch set backend`)") -> None:
         super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
 
 
@@ -75,7 +80,7 @@ class OrchestratorValidationError(OrchestratorError):
 
 
 @contextmanager
-def set_exception_subject(kind, subject, overwrite=False):
+def set_exception_subject(kind: str, subject: str, overwrite: bool = False) -> Iterator[None]:
     try:
         yield
     except OrchestratorError as e:
@@ -84,12 +89,12 @@ def set_exception_subject(kind, subject, overwrite=False):
         raise
 
 
-def handle_exception(prefix, cmd_args, desc, perm, func):
+def handle_exception(prefix: str, perm: str, func: FuncT) -> FuncT:
     @wraps(func)
-    def wrapper(*args, **kwargs):
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
         try:
             return func(*args, **kwargs)
-        except (OrchestratorError, ServiceSpecValidationError) as e:
+        except (OrchestratorError, SpecValidationError) as e:
             # Do not print Traceback for expected errors.
             return HandleCommandResult(e.errno, stderr=str(e))
         except ImportError as e:
@@ -98,18 +103,44 @@ def handle_exception(prefix, cmd_args, desc, perm, func):
             msg = 'This Orchestrator does not support `{}`'.format(prefix)
             return HandleCommandResult(-errno.ENOENT, stderr=msg)
 
-    # misuse partial to copy `wrapper`
-    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)
+    # misuse lambda to copy `wrapper`
+    wrapper_copy = lambda *l_args, **l_kwargs: wrapper(*l_args, **l_kwargs)  # noqa: E731
     wrapper_copy._prefix = prefix  # type: ignore
-    wrapper_copy._cli_command = CLICommand(prefix, cmd_args, desc, perm)  # type: ignore
+    wrapper_copy._cli_command = CLICommand(prefix, perm)  # type: ignore
+    wrapper_copy._cli_command.store_func_metadata(func)  # type: ignore
     wrapper_copy._cli_command.func = wrapper_copy  # type: ignore
 
-    return wrapper_copy
+    return cast(FuncT, wrapper_copy)
+
+
+def handle_orch_error(f: Callable[..., T]) -> Callable[..., 'OrchResult[T]']:
+    """
+    Decorator to make Orchestrator methods return
+    an OrchResult.
+    """
+
+    @wraps(f)
+    def wrapper(*args: Any, **kwargs: Any) -> OrchResult[T]:
+        try:
+            return OrchResult(f(*args, **kwargs))
+        except Exception as e:
+            logger.exception(e)
+            import os
+            if 'UNITTEST' in os.environ:
+                raise  # This makes debugging of Tracebacks from unittests a bit easier
+            return OrchResult(None, exception=e)
+
+    return cast(Callable[..., OrchResult[T]], wrapper)
+
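
A minimal usage sketch (hypothetical backend method, not from this patch): ``handle_orch_error`` turns a plain return value into an ``OrchResult`` and records any raised exception instead of propagating it (it only re-raises when ``UNITTEST`` is set)::

    class MyOrchestrator(Orchestrator):
        @handle_orch_error
        def get_hosts(self) -> List[HostSpec]:
            # a real backend would query its inventory here
            return [HostSpec('host1'), HostSpec('host2')]

    res = MyOrchestrator().get_hosts()  # OrchResult[List[HostSpec]]
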
 
+class InnerCliCommandCallable(Protocol):
+    def __call__(self, prefix: str) -> Callable[[FuncT], FuncT]:
+        ...
 
-def _cli_command(perm):
-    def inner_cli_command(prefix, cmd_args="", desc=""):
-        return lambda func: handle_exception(prefix, cmd_args, desc, perm, func)
+
+def _cli_command(perm: str) -> InnerCliCommandCallable:
+    def inner_cli_command(prefix: str) -> Callable[[FuncT], FuncT]:
+        return lambda func: handle_exception(prefix, perm, func)
     return inner_cli_command
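
For context, a sketch of how these factories are consumed; the two aliases are assumed to live in the unchanged lines just below this hunk, and the command prefix is a made-up example::

    _cli_read_command = _cli_command('r')
    _cli_write_command = _cli_command('rw')

    @_cli_read_command('orch example status')
    def _example_status(self) -> HandleCommandResult:
        return HandleCommandResult(stdout='ok')
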
 
 
@@ -124,7 +155,7 @@ class CLICommandMeta(type):
 
     We make use of CLICommand, except for the use of the global variable.
     """
-    def __init__(cls, name, bases, dct):
+    def __init__(cls, name: str, bases: Any, dct: Any) -> None:
         super(CLICommandMeta, cls).__init__(name, bases, dct)
         dispatch: Dict[str, CLICommand] = {}
         for v in dct.values():
@@ -133,7 +164,7 @@ class CLICommandMeta(type):
             except AttributeError:
                 pass
 
-        def handle_command(self, inbuf, cmd):
+        def handle_command(self: Any, inbuf: Optional[str], cmd: dict) -> Any:
             if cmd['prefix'] not in dispatch:
                 return self.handle_command(inbuf, cmd)
 
@@ -143,57 +174,30 @@ class CLICommandMeta(type):
         cls.handle_command = handle_command
 
 
-def _no_result():
-    return object()
-
-
-class _Promise(object):
+class OrchResult(Generic[T]):
     """
-    A completion may need multiple promises to be fulfilled. `_Promise` is one
-    step.
-
-    Typically ``Orchestrator`` implementations inherit from this class to
-    build their own way of finishing a step to fulfil a future.
-
-    They are not exposed in the orchestrator interface and can be seen as a
-    helper to build orchestrator modules.
+    Stores a result and an exception. Mainly used to work around
+    MgrModule.remote() hiding all exceptions, and to handle different
+    sub-interpreters.
     """
-    INITIALIZED = 1  # We have a parent completion and a next completion
-    RUNNING = 2
-    FINISHED = 3  # we have a final result
-
-    NO_RESULT: None = _no_result()
-    ASYNC_RESULT = object()
-
-    def __init__(self,
-                 _first_promise: Optional["_Promise"] = None,
-                 value: Optional[Any] = NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ):
-        self._on_complete_ = on_complete
-        self._name = name
-        self._next_promise: Optional[_Promise] = None
 
-        self._state = self.INITIALIZED
-        self._exception: Optional[Exception] = None
+    def __init__(self, result: Optional[T], exception: Optional[Exception] = None) -> None:
+        self.result = result
+        self.serialized_exception: Optional[bytes] = None
+        self.exception_str: str = ''
+        self.set_exception(exception)
 
-        # Value of this _Promise. may be an intermediate result.
-        self._value = value
+    __slots__ = 'result', 'serialized_exception', 'exception_str'
 
-        # _Promise is not a continuation monad, as `_result` is of type
-        # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
-        self._first_promise: '_Promise' = _first_promise or self
+    def set_exception(self, e: Optional[Exception]) -> None:
+        if e is None:
+            self.serialized_exception = None
+            self.exception_str = ''
+            return
 
-    @property
-    def _exception(self) -> Optional[Exception]:
-        return getattr(self, '_exception_', None)
-
-    @_exception.setter
-    def _exception(self, e):
-        self._exception_ = e
+        self.exception_str = f'{type(e)}: {str(e)}'
         try:
-            self._serialized_exception_ = pickle.dumps(e) if e is not None else None
+            self.serialized_exception = pickle.dumps(e)
         except pickle.PicklingError:
             logger.error(f"failed to pickle {e}")
             if isinstance(e, Exception):
@@ -201,365 +205,7 @@ class _Promise(object):
             else:
                 e = Exception(str(e))
             # degenerate to a plain Exception
-            self._serialized_exception_ = pickle.dumps(e)
-
-    @property
-    def _serialized_exception(self) -> Optional[bytes]:
-        return getattr(self, '_serialized_exception_', None)
-
-    @property
-    def _on_complete(self) -> Optional[Callable]:
-        # https://github.com/python/mypy/issues/4125
-        return self._on_complete_
-
-    @_on_complete.setter
-    def _on_complete(self, val: Optional[Callable]) -> None:
-        self._on_complete_ = val
-
-    def __repr__(self):
-        name = self._name or getattr(self._on_complete, '__name__',
-                                     '??') if self._on_complete else 'None'
-        val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
-        return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
-            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
-                next, '_progress_reference', 'NA'), repr(self._next_promise)
-        )
-
-    def pretty_print_1(self):
-        if self._name:
-            name = self._name
-        elif self._on_complete is None:
-            name = 'lambda x: x'
-        elif hasattr(self._on_complete, '__name__'):
-            name = getattr(self._on_complete, '__name__')
-        else:
-            name = self._on_complete.__class__.__name__
-        val = repr(self._value) if self._value not in (self.NO_RESULT, self.ASYNC_RESULT) else '...'
-        prefix = {
-            self.INITIALIZED: '      ',
-            self.RUNNING:     '   >>>',
-            self.FINISHED:    '(done)'
-        }[self._state]
-        return '{} {}({}),'.format(prefix, name, val)
-
-    def then(self: Any, on_complete: Callable) -> Any:
-        """
-        Call ``on_complete`` as soon as this promise is finalized.
-        """
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        if self._next_promise is not None:
-            return self._next_promise.then(on_complete)
-
-        if self._on_complete is not None:
-            self._set_next_promise(self.__class__(
-                _first_promise=self._first_promise,
-                on_complete=on_complete
-            ))
-            return self._next_promise
-
-        else:
-            self._on_complete = on_complete
-            self._set_next_promise(self.__class__(_first_promise=self._first_promise))
-            return self._next_promise
-
-    def _set_next_promise(self, next: '_Promise') -> None:
-        assert self is not next
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-
-        self._next_promise = next
-        assert self._next_promise is not None
-        for p in iter(self._next_promise):
-            p._first_promise = self._first_promise
-
-    def _finalize(self, value=NO_RESULT):
-        """
-        Sets this promise to complete.
-
-        Orchestrators may choose to use this helper function.
-
-        :param value: new value.
-        """
-        if self._state not in (self.INITIALIZED, self.RUNNING):
-            raise ValueError('finalize: {} already finished. {}'.format(repr(self), value))
-
-        self._state = self.RUNNING
-
-        if value is not self.NO_RESULT:
-            self._value = value
-        assert self._value is not self.NO_RESULT, repr(self)
-
-        if self._on_complete:
-            try:
-                next_result = self._on_complete(self._value)
-            except Exception as e:
-                self.fail(e)
-                return
-        else:
-            next_result = self._value
-
-        if isinstance(next_result, _Promise):
-            # hack: _Promise is not a continuation monad.
-            next_result = next_result._first_promise  # type: ignore
-            assert next_result not in self, repr(self._first_promise) + repr(next_result)
-            assert self not in next_result
-            next_result._append_promise(self._next_promise)
-            self._set_next_promise(next_result)
-            assert self._next_promise
-            if self._next_promise._value is self.NO_RESULT:
-                self._next_promise._value = self._value
-            self.propagate_to_next()
-        elif next_result is not self.ASYNC_RESULT:
-            # simple map. simply forward
-            if self._next_promise:
-                self._next_promise._value = next_result
-            else:
-                # Hack: next_result is of type U, _value is of type T
-                self._value = next_result  # type: ignore
-            self.propagate_to_next()
-        else:
-            # asynchronous promise
-            pass
-
-    def propagate_to_next(self):
-        self._state = self.FINISHED
-        logger.debug('finalized {}'.format(repr(self)))
-        if self._next_promise:
-            self._next_promise._finalize()
-
-    def fail(self, e: Exception) -> None:
-        """
-        Sets the whole completion to be faild with this exception and end the
-        evaluation.
-        """
-        if self._state == self.FINISHED:
-            raise ValueError(
-                'Invalid State: called fail, but Completion is already finished: {}'.format(str(e)))
-        assert self._state in (self.INITIALIZED, self.RUNNING)
-        logger.exception('_Promise failed')
-        self._exception = e
-        self._value = f'_exception: {e}'
-        if self._next_promise:
-            self._next_promise.fail(e)
-        self._state = self.FINISHED
-
-    def __contains__(self, item):
-        return any(item is p for p in iter(self._first_promise))
-
-    def __iter__(self):
-        yield self
-        elem = self._next_promise
-        while elem is not None:
-            yield elem
-            elem = elem._next_promise
-
-    def _append_promise(self, other):
-        if other is not None:
-            assert self not in other
-            assert other not in self
-            self._last_promise()._set_next_promise(other)
-
-    def _last_promise(self) -> '_Promise':
-        return list(iter(self))[-1]
-
-
-class ProgressReference(object):
-    def __init__(self,
-                 message: str,
-                 mgr,
-                 completion: Optional[Callable[[], 'Completion']] = None
-                 ):
-        """
-        ProgressReference can be used within Completions::
-
-            +---------------+      +---------------------------------+
-            |               | then |                                 |
-            | My Completion | +--> | on_complete=ProgressReference() |
-            |               |      |                                 |
-            +---------------+      +---------------------------------+
-
-        See :func:`Completion.with_progress` for an easy way to create
-        a progress reference
-
-        """
-        super(ProgressReference, self).__init__()
-        self.progress_id = str(uuid.uuid4())
-        self.message = message
-        self.mgr = mgr
-
-        #: The completion can already have a result, before the write
-        #: operation is effective. progress == 1 means, the services are
-        #: created / removed.
-        self.completion: Optional[Callable[[], Completion]] = completion
-
-        #: if a orchestrator module can provide a more detailed
-        #: progress information, it needs to also call ``progress.update()``.
-        self.progress = 0.0
-
-        self._completion_has_result = False
-        self.mgr.all_progress_references.append(self)
-
-    def __str__(self):
-        """
-        ``__str__()`` is used for determining the message for progress events.
-        """
-        return self.message or super(ProgressReference, self).__str__()
-
-    def __call__(self, arg):
-        self._completion_has_result = True
-        self.progress = 1.0
-        return arg
-
-    @property
-    def progress(self):
-        return self._progress
-
-    @progress.setter
-    def progress(self, progress):
-        assert progress <= 1.0
-        self._progress = progress
-        try:
-            if self.effective:
-                self.mgr.remote("progress", "complete", self.progress_id)
-                self.mgr.all_progress_references = [
-                    p for p in self.mgr.all_progress_references if p is not self]
-            else:
-                self.mgr.remote("progress", "update", self.progress_id, self.message,
-                                progress,
-                                [("origin", "orchestrator")])
-        except ImportError:
-            # If the progress module is disabled that's fine,
-            # they just won't see the output.
-            pass
-
-    @property
-    def effective(self):
-        return self.progress == 1 and self._completion_has_result
-
-    def update(self):
-        def progress_run(progress):
-            self.progress = progress
-        if self.completion:
-            c = self.completion().then(progress_run)
-            self.mgr.process([c._first_promise])
-        else:
-            self.progress = 1
-
-    def fail(self):
-        self._completion_has_result = True
-        self.progress = 1
-
-
-class Completion(_Promise, Generic[T]):
-    """
-    Combines multiple promises into one overall operation.
-
-    Completions are composable by being able to
-    call one completion from another completion. I.e. making them re-usable
-    using Promises E.g.::
-
-        >>> #doctest: +SKIP
-        ... return Orchestrator().get_hosts().then(self._create_osd)
-
-    where ``get_hosts`` returns a Completion of list of hosts and
-    ``_create_osd`` takes a list of hosts.
-
-    The concept behind this is to store the computation steps
-    explicit and then explicitly evaluate the chain:
-
-        >>> #doctest: +SKIP
-        ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
-        ... p.finalize(2)
-        ... assert p.result = "4"
-
-    or graphically::
-
-        +---------------+      +-----------------+
-        |               | then |                 |
-        | lambda x: x*x | +--> | lambda x: str(x)|
-        |               |      |                 |
-        +---------------+      +-----------------+
-
-    """
-
-    def __init__(self,
-                 _first_promise: Optional["Completion"] = None,
-                 value: Any = _Promise.NO_RESULT,
-                 on_complete: Optional[Callable] = None,
-                 name: Optional[str] = None,
-                 ):
-        super(Completion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self) -> Optional[ProgressReference]:
-        if hasattr(self._on_complete, 'progress_id'):
-            return self._on_complete  # type: ignore
-        return None
-
-    @property
-    def progress_reference(self) -> Optional[ProgressReference]:
-        """
-        ProgressReference. Marks this completion
-        as a write completeion.
-        """
-
-        references = [c._progress_reference for c in iter(
-            self) if c._progress_reference is not None]
-        if references:
-            assert len(references) == 1
-            return references[0]
-        return None
-
-    @classmethod
-    def with_progress(cls: Any,
-                      message: str,
-                      mgr,
-                      _first_promise: Optional["Completion"] = None,
-                      value: Any = _Promise.NO_RESULT,
-                      on_complete: Optional[Callable] = None,
-                      calc_percent: Optional[Callable[[], Any]] = None
-                      ) -> Any:
-
-        c = cls(
-            _first_promise=_first_promise,
-            value=value,
-            on_complete=on_complete
-        ).add_progress(message, mgr, calc_percent)
-
-        return c._first_promise
-
-    def add_progress(self,
-                     message: str,
-                     mgr,
-                     calc_percent: Optional[Callable[[], Any]] = None
-                     ):
-        return self.then(
-            on_complete=ProgressReference(
-                message=message,
-                mgr=mgr,
-                completion=calc_percent
-            )
-        )
-
-    def fail(self, e: Exception):
-        super(Completion, self).fail(e)
-        if self._progress_reference:
-            self._progress_reference.fail()
-
-    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
-        if self._first_promise._state == self.INITIALIZED:
-            self._first_promise._finalize(result)
-
-    @property
-    def result(self) -> T:
-        """
-        The result of the operation that we were waited
-        for.  Only valid after calling Orchestrator.process() on this
-        completion.
-        """
-        last = self._last_promise()
-        assert last._state == _Promise.FINISHED
-        return cast(T, last._value)
+            self.serialized_exception = pickle.dumps(e)
 
     def result_str(self) -> str:
         """Force a string."""
@@ -569,92 +215,23 @@ class Completion(_Promise, Generic[T]):
             return '\n'.join(str(x) for x in self.result)
         return str(self.result)
 
-    @property
-    def exception(self) -> Optional[Exception]:
-        return self._last_promise()._exception
-
-    @property
-    def serialized_exception(self) -> Optional[bytes]:
-        return self._last_promise()._serialized_exception
 
-    @property
-    def has_result(self) -> bool:
-        """
-        Has the operation already a result?
-
-        For Write operations, it can already have a
-        result, if the orchestrator's configuration is
-        persistently written. Typically this would
-        indicate that an update had been written to
-        a manifest, but that the update had not
-        necessarily been pushed out to the cluster.
-
-        :return:
-        """
-        return self._last_promise()._state == _Promise.FINISHED
-
-    @property
-    def is_errored(self) -> bool:
-        """
-        Has the completion failed. Default implementation looks for
-        self.exception. Can be overwritten.
-        """
-        return self.exception is not None
-
-    @property
-    def needs_result(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return not self.is_errored and not self.has_result
-
-    @property
-    def is_finished(self) -> bool:
-        """
-        Could the external operation be deemed as complete,
-        or should we wait?
-        We must wait for a read operation only if it is not complete.
-        """
-        return self.is_errored or (self.has_result)
-
-    def pretty_print(self):
-
-        reprs = '\n'.join(p.pretty_print_1() for p in iter(self._first_promise))
-        return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
-
-
-def pretty_print(completions: Sequence[Completion]) -> str:
-    return ', '.join(c.pretty_print() for c in completions)
-
-
-def raise_if_exception(c: Completion) -> None:
+def raise_if_exception(c: OrchResult[T]) -> T:
     """
-    :raises OrchestratorError: Some user error or a config error.
-    :raises Exception: Some internal error
+    Due to different sub-interpreters, this MUST not be in the `OrchResult` class.
     """
     if c.serialized_exception is not None:
         try:
             e = pickle.loads(c.serialized_exception)
         except (KeyError, AttributeError):
-            raise Exception('{}: {}'.format(type(c.exception), c.exception))
+            raise Exception(c.exception_str)
         raise e
+    assert c.result is not None, 'OrchResult should either have an exception or a result'
+    return c.result
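
A minimal consumer-side sketch (hypothetical call site): the result travels across ``MgrModule.remote()`` as an ``OrchResult`` and is unwrapped here, re-raising the unpickled exception if one was recorded::

    completion = self.get_hosts()           # OrchResult[List[HostSpec]]
    hosts = raise_if_exception(completion)  # List[HostSpec], or raises
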
 
 
-class TrivialReadCompletion(Completion[T]):
-    """
-    This is the trivial completion simply wrapping a result.
-    """
-
-    def __init__(self, result: T):
-        super(TrivialReadCompletion, self).__init__()
-        if result:
-            self.finalize(result)
-
-
-def _hide_in_features(f):
-    f._hide_in_features = True
+def _hide_in_features(f: FuncT) -> FuncT:
+    f._hide_in_features = True  # type: ignore
     return f
 
 
@@ -682,7 +259,7 @@ class Orchestrator(object):
     """
 
     @_hide_in_features
-    def is_orchestrator_module(self):
+    def is_orchestrator_module(self) -> bool:
         """
         Enable other modules to interrogate this module to discover
         whether it's usable as an orchestrator module.
@@ -692,7 +269,7 @@ class Orchestrator(object):
         return True
 
     @_hide_in_features
-    def available(self) -> Tuple[bool, str]:
+    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
         """
         Report whether we can talk to the orchestrator.  This is the
         place to give the user a meaningful message if the orchestrator
@@ -713,27 +290,14 @@ class Orchestrator(object):
                 ... if OrchestratorClientMixin().available()[0]:  # wrong.
                 ...     OrchestratorClientMixin().get_hosts()
 
-        :return: two-tuple of boolean, string
+        :return: boolean representing whether the module is available/usable
+        :return: string describing any error
+        :return: dict containing any module-specific information
         """
         raise NotImplementedError()
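
As a sketch, a healthy backend might satisfy the new three-element contract like this (the dict payload is a made-up example)::

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        return True, '', {'workers': 4}
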
 
     @_hide_in_features
-    def process(self, completions: List[Completion]) -> None:
-        """
-        Given a list of Completion instances, process any which are
-        incomplete.
-
-        Callers should inspect the detail of each completion to identify
-        partial completion/progress information, and present that information
-        to the user.
-
-        This method should not block, as this would make it slow to query
-        a status, while other long running operations are in progress.
-        """
-        raise NotImplementedError()
-
-    @_hide_in_features
-    def get_feature_set(self):
+    def get_feature_set(self) -> Dict[str, dict]:
         """Describes which methods this orchestrator implements
 
         .. note::
@@ -775,7 +339,7 @@ class Orchestrator(object):
     def resume(self) -> None:
         raise NotImplementedError()
 
-    def add_host(self, host_spec: HostSpec) -> Completion[str]:
+    def add_host(self, host_spec: HostSpec) -> OrchResult[str]:
         """
         Add a host to the orchestrator inventory.
 
@@ -783,7 +347,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_host(self, host: str) -> Completion[str]:
+    def remove_host(self, host: str, force: bool, offline: bool) -> OrchResult[str]:
         """
         Remove a host from the orchestrator inventory.
 
@@ -791,7 +355,15 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
+    def drain_host(self, hostname: str, force: bool = False) -> OrchResult[str]:
+        """
+        Drain all daemons from a host.
+
+        :param hostname: hostname
+        """
+        raise NotImplementedError()
+
+    def update_host_addr(self, host: str, addr: str) -> OrchResult[str]:
         """
         Update a host's address
 
@@ -800,7 +372,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def get_hosts(self) -> Completion[List[HostSpec]]:
+    def get_hosts(self) -> OrchResult[List[HostSpec]]:
         """
         Report the hosts in the cluster.
 
@@ -808,19 +380,25 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def add_host_label(self, host: str, label: str) -> Completion[str]:
+    def get_facts(self, hostname: Optional[str] = None) -> OrchResult[List[Dict[str, Any]]]:
+        """
+        Return host metadata (gather_facts).
+        """
+        raise NotImplementedError()
+
+    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host: str, label: str) -> Completion[str]:
+    def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a host label
         """
         raise NotImplementedError()
 
-    def host_ok_to_stop(self, hostname: str) -> Completion:
+    def host_ok_to_stop(self, hostname: str) -> OrchResult:
         """
         Check if the specified host can be safely stopped without reducing availability
 
@@ -828,7 +406,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
+    def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+        """
+        Place a host in maintenance, stopping daemons and disabling its systemd target
+        """
+        raise NotImplementedError()
+
+    def exit_host_maintenance(self, hostname: str) -> OrchResult:
+        """
+        Return a host from maintenance, restarting the cluster's systemd target
+        """
+        raise NotImplementedError()
+
+    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> OrchResult[List['InventoryHost']]:
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -836,7 +426,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -850,7 +440,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
+    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
@@ -859,11 +449,12 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
+    @handle_orch_error
+    def apply(self, specs: Sequence["GenericSpec"], no_overwrite: bool = False) -> List[str]:
         """
         Applies any spec
         """
-        fns: Dict[str, function] = {
+        fns: Dict[str, Callable[..., OrchResult[str]]] = {
             'alertmanager': self.apply_alertmanager,
             'crash': self.apply_crash,
             'grafana': self.apply_grafana,
@@ -873,36 +464,31 @@ class Orchestrator(object):
             'mon': self.apply_mon,
             'nfs': self.apply_nfs,
             'node-exporter': self.apply_node_exporter,
-            'osd': lambda dg: self.apply_drivegroups([dg]),
+            'osd': lambda dg: self.apply_drivegroups([dg]),  # type: ignore
             'prometheus': self.apply_prometheus,
+            'loki': self.apply_loki,
+            'promtail': self.apply_promtail,
             'rbd-mirror': self.apply_rbd_mirror,
             'rgw': self.apply_rgw,
+            'ingress': self.apply_ingress,
+            'snmp-gateway': self.apply_snmp_gateway,
             'host': self.add_host,
         }
 
-        def merge(ls, r):
-            if isinstance(ls, list):
-                return ls + [r]
-            return [ls, r]
-
-        spec, *specs = specs
-
-        fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-        completion = fn(spec)
-        for s in specs:
-            def next(ls):
-                fn = cast(Callable[["GenericSpec"], Completion], fns[spec.service_type])
-                return fn(s).then(lambda r: merge(ls, r))
-            completion = completion.then(next)
-        return completion
+        def merge(l: OrchResult[List[str]], r: OrchResult[str]) -> OrchResult[List[str]]:  # noqa: E741
+            l_res = raise_if_exception(l)
+            r_res = raise_if_exception(r)
+            l_res.append(r_res)
+            return OrchResult(l_res)
+        return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
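
The ``reduce()`` above just appends each per-spec message to one list and stops at the first wrapped exception, so a hypothetical caller sees a flat ``List[str]``::

    specs = [ServiceSpec('mgr'), ServiceSpec('crash')]  # example specs
    messages = raise_if_exception(orch.apply(specs))    # one status string per spec
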
 
-    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
+    def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
         """
         Plan (Dry-run, Preview) a List of Specs.
         """
         raise NotImplementedError()
 
-    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
+    def remove_daemons(self, names: List[str]) -> OrchResult[List[str]]:
         """
         Remove specific daemon(s).
 
@@ -910,7 +496,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name: str) -> Completion[str]:
+    def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]:
         """
         Remove a service (a collection of daemons).
 
@@ -918,7 +504,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
+    def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]:
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
@@ -926,24 +512,24 @@ class Orchestrator(object):
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param service_name: service_type + '.' + service_id
                             (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> Completion[str]:
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> OrchResult[str]:
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
         :param daemon_name: name of daemon
         :param image: Container image when redeploying that daemon
-        :rtype: Completion
+        :rtype: OrchResult
         """
         # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
+    def create_osds(self, drive_group: DriveGroupSpec) -> OrchResult[str]:
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -954,49 +540,53 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
         """ Update OSD cluster """
         raise NotImplementedError()
 
     def set_unmanaged_flag(self,
                            unmanaged_flag: bool,
                            service_type: str = 'osd',
-                           service_name=None
+                           service_name: Optional[str] = None
                            ) -> HandleCommandResult:
         raise NotImplementedError()
 
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ) -> Completion[str]:
+                         ) -> OrchResult[str]:
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> Completion[str]:
+                    force: bool = False,
+                    zap: bool = False) -> OrchResult[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
         :param force: Forces the OSD removal process without waiting for the data to be drained first.
-        Note that this can only remove OSDs that were successfully
-        created (i.e. got an OSD ID).
+        :param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+
+
+        .. note:: this can only remove OSDs that were successfully
+            created (i.e. got an OSD ID).
         """
         raise NotImplementedError()
 
-    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+    def stop_remove_osds(self, osd_ids: List[str]) -> OrchResult:
         """
         TODO
         """
         raise NotImplementedError()
 
-    def remove_osds_status(self) -> Completion:
+    def remove_osds_status(self) -> OrchResult:
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
-    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
+    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> OrchResult[List[str]]:
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
@@ -1006,122 +596,98 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def zap_device(self, host: str, path: str) -> Completion[str]:
+    def zap_device(self, host: str, path: str) -> OrchResult[str]:
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
-    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create mon daemon(s)"""
+    def add_daemon(self, spec: ServiceSpec) -> OrchResult[List[str]]:
+        """Create daemons daemon(s) for unmanaged services"""
         raise NotImplementedError()
 
-    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mon cluster"""
         raise NotImplementedError()
 
-    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create mgr daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create MDS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_mds(self, spec: MDSSpec) -> OrchResult[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
-    def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
-        """Create RGW daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
+    def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
         """Update RGW cluster"""
         raise NotImplementedError()
 
-    def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create rbd-mirror daemon(s)"""
+    def apply_ingress(self, spec: IngressSpec) -> OrchResult[str]:
+        """Update ingress daemons"""
         raise NotImplementedError()
 
-    def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
-    def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
-        """Create NFS daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
+    def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]:
         """Update NFS cluster"""
         raise NotImplementedError()
 
-    def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
-        """Create iscsi daemon(s)"""
-        raise NotImplementedError()
-
-    def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
+    def apply_iscsi(self, spec: IscsiServiceSpec) -> OrchResult[str]:
         """Update iscsi cluster"""
         raise NotImplementedError()
 
-    def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create new prometheus daemon"""
-        raise NotImplementedError()
-
-    def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_prometheus(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
 
-    def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new Node-Exporter service"""
+    def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
-    def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_loki(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Loki daemon(s)"""
         raise NotImplementedError()
 
-    def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new crash service"""
+    def apply_promtail(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a Promtail daemon(s)"""
         raise NotImplementedError()
 
-    def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
+    def apply_crash(self, spec: ServiceSpec) -> OrchResult[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
-    def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new Node-Exporter service"""
+    def apply_grafana(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update existing a grafana service"""
         raise NotImplementedError()
 
-    def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
-        """Update existing a Node-Exporter daemon(s)"""
+    def apply_alertmanager(self, spec: ServiceSpec) -> OrchResult[str]:
+        """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
-    def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
-        """Create a new AlertManager service"""
+    def apply_snmp_gateway(self, spec: SNMPGatewaySpec) -> OrchResult[str]:
+        """Update an existing snmp gateway service"""
         raise NotImplementedError()
 
-    def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
-        """Update an existing AlertManager daemon(s)"""
+    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool] = False) -> OrchResult[Dict[Any, Any]]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
+    def upgrade_start(self, image: Optional[str], version: Optional[str], daemon_types: Optional[List[str]],
+                      hosts: Optional[str], services: Optional[List[str]], limit: Optional[int]) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_pause(self) -> Completion[str]:
+    def upgrade_pause(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_resume(self) -> Completion[str]:
+    def upgrade_resume(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_stop(self) -> Completion[str]:
+    def upgrade_stop(self) -> OrchResult[str]:
         raise NotImplementedError()
 
-    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
+    def upgrade_status(self) -> OrchResult['UpgradeStatusSpec']:
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -1131,7 +697,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def upgrade_available(self) -> Completion:
+    def upgrade_available(self) -> OrchResult:
         """
         Report on what versions are available to upgrade to
 
@@ -1150,24 +716,104 @@ def json_to_generic_spec(spec: dict) -> GenericSpec:
         return ServiceSpec.from_json(spec)
 
 
+def daemon_type_to_service(dtype: str) -> str:
+    mapping = {
+        'mon': 'mon',
+        'mgr': 'mgr',
+        'mds': 'mds',
+        'rgw': 'rgw',
+        'osd': 'osd',
+        'haproxy': 'ingress',
+        'keepalived': 'ingress',
+        'iscsi': 'iscsi',
+        'rbd-mirror': 'rbd-mirror',
+        'cephfs-mirror': 'cephfs-mirror',
+        'nfs': 'nfs',
+        'grafana': 'grafana',
+        'alertmanager': 'alertmanager',
+        'prometheus': 'prometheus',
+        'node-exporter': 'node-exporter',
+        'loki': 'loki',
+        'promtail': 'promtail',
+        'crash': 'crash',
+        'crashcollector': 'crash',  # Specific Rook Daemon
+        'container': 'container',
+        'agent': 'agent',
+        'snmp-gateway': 'snmp-gateway',
+    }
+    return mapping[dtype]
+
+
+def service_to_daemon_types(stype: str) -> List[str]:
+    mapping = {
+        'mon': ['mon'],
+        'mgr': ['mgr'],
+        'mds': ['mds'],
+        'rgw': ['rgw'],
+        'osd': ['osd'],
+        'ingress': ['haproxy', 'keepalived'],
+        'iscsi': ['iscsi'],
+        'rbd-mirror': ['rbd-mirror'],
+        'cephfs-mirror': ['cephfs-mirror'],
+        'nfs': ['nfs'],
+        'grafana': ['grafana'],
+        'alertmanager': ['alertmanager'],
+        'prometheus': ['prometheus'],
+        'loki': ['loki'],
+        'promtail': ['promtail'],
+        'node-exporter': ['node-exporter'],
+        'crash': ['crash'],
+        'container': ['container'],
+        'agent': ['agent'],
+        'snmp-gateway': ['snmp-gateway'],
+    }
+    return mapping[stype]
+
+
+KNOWN_DAEMON_TYPES: List[str] = list(
+    sum((service_to_daemon_types(t) for t in ServiceSpec.KNOWN_SERVICE_TYPES), []))
+
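
The two tables above are inverses of each other at the service level; for example (values taken directly from the mappings)::

    daemon_type_to_service('haproxy')     # -> 'ingress'
    daemon_type_to_service('keepalived')  # -> 'ingress'
    service_to_daemon_types('ingress')    # -> ['haproxy', 'keepalived']
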
+
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
-    def __init__(self):
+    def __init__(self) -> None:
         self.in_progress = False  # Is an upgrade underway?
-        self.target_image = None
-        self.services_complete = []  # Which daemon types are fully updated?
+        self.target_image: Optional[str] = None
+        self.services_complete: List[str] = []  # Which daemon types are fully updated?
+        self.which: str = '<unknown>'  # set if the user specified daemon types, services or hosts
+        self.progress: Optional[str] = None  # How many of the daemons have we upgraded
         self.message = ""  # Freeform description
 
 
-def handle_type_error(method):
+def handle_type_error(method: FuncT) -> FuncT:
     @wraps(method)
-    def inner(cls, *args, **kwargs):
+    def inner(cls: Any, *args: Any, **kwargs: Any) -> Any:
         try:
             return method(cls, *args, **kwargs)
         except TypeError as e:
             error_msg = '{}: {}'.format(cls.__name__, e)
         raise OrchestratorValidationError(error_msg)
-    return inner
+    return cast(FuncT, inner)
+
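
A sketch of the intended use (class and field names are made up): wrapping a ``from_json``-style constructor so that a malformed payload surfaces as an ``OrchestratorValidationError`` rather than a bare ``TypeError``::

    class ExampleDescription:
        def __init__(self, hostname: str) -> None:
            self.hostname = hostname

        @classmethod
        @handle_type_error
        def from_json(cls, data: dict) -> 'ExampleDescription':
            return cls(**data)

    ExampleDescription.from_json({'bad_key': 'x'})  # raises OrchestratorValidationError
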
+
+class DaemonDescriptionStatus(enum.IntEnum):
+    unknown = -2
+    error = -1
+    stopped = 0
+    running = 1
+    starting = 2  #: Daemon is deployed, but not yet running
+
+    @staticmethod
+    def to_str(status: Optional['DaemonDescriptionStatus']) -> str:
+        if status is None:
+            status = DaemonDescriptionStatus.unknown
+        return {
+            DaemonDescriptionStatus.unknown: 'unknown',
+            DaemonDescriptionStatus.error: 'error',
+            DaemonDescriptionStatus.stopped: 'stopped',
+            DaemonDescriptionStatus.running: 'running',
+            DaemonDescriptionStatus.starting: 'starting',
+        }.get(status, '<unknown>')
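
Quick examples of the helper and the ``IntEnum`` ordering (values taken from the enum above)::

    DaemonDescriptionStatus.to_str(DaemonDescriptionStatus.running)     # 'running'
    DaemonDescriptionStatus.to_str(None)                                # 'unknown'
    DaemonDescriptionStatus.running > DaemonDescriptionStatus.stopped   # True
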
 
 
 class DaemonDescription(object):
@@ -1184,53 +830,74 @@ class DaemonDescription(object):
     """
 
     def __init__(self,
-                 daemon_type=None,
-                 daemon_id=None,
-                 hostname=None,
-                 container_id=None,
-                 container_image_id=None,
-                 container_image_name=None,
-                 version=None,
-                 status=None,
-                 status_desc=None,
-                 last_refresh=None,
-                 created=None,
-                 started=None,
-                 last_configured=None,
-                 osdspec_affinity=None,
-                 last_deployed=None,
+                 daemon_type: Optional[str] = None,
+                 daemon_id: Optional[str] = None,
+                 hostname: Optional[str] = None,
+                 container_id: Optional[str] = None,
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 container_image_digests: Optional[List[str]] = None,
+                 version: Optional[str] = None,
+                 status: Optional[DaemonDescriptionStatus] = None,
+                 status_desc: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 started: Optional[datetime.datetime] = None,
+                 last_configured: Optional[datetime.datetime] = None,
+                 osdspec_affinity: Optional[str] = None,
+                 last_deployed: Optional[datetime.datetime] = None,
                  events: Optional[List['OrchestratorEvent']] = None,
-                 is_active: bool = False):
-
-        # Host is at the same granularity as InventoryHost
-        self.hostname: str = hostname
+                 is_active: bool = False,
+                 memory_usage: Optional[int] = None,
+                 memory_request: Optional[int] = None,
+                 memory_limit: Optional[int] = None,
+                 cpu_percentage: Optional[str] = None,
+                 service_name: Optional[str] = None,
+                 ports: Optional[List[int]] = None,
+                 ip: Optional[str] = None,
+                 deployed_by: Optional[List[str]] = None,
+                 rank: Optional[int] = None,
+                 rank_generation: Optional[int] = None,
+                 extra_container_args: Optional[List[str]] = None,
+                 ) -> None:
+
+        #: Host is at the same granularity as InventoryHost
+        self.hostname: Optional[str] = hostname
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_id (runtime id) and container_image
         # (image name)
         self.container_id = container_id                  # runtime id
-        self.container_image_id = container_image_id      # image hash
+        self.container_image_id = container_image_id      # image id locally
         self.container_image_name = container_image_name  # image friendly name
+        self.container_image_digests = container_image_digests  # registry digests
 
-        # The type of service (osd, mon, mgr, etc.)
+        #: The type of service (osd, mon, mgr, etc.)
         self.daemon_type = daemon_type
 
-        # The orchestrator will have picked some names for daemons,
-        # typically either based on hostnames or on pod names.
-        # This is the <foo> in mds.<foo>, the ID that will appear
-        # in the FSMap/ServiceMap.
-        self.daemon_id: str = daemon_id
+        #: The orchestrator will have picked some names for daemons,
+        #: typically either based on hostnames or on pod names.
+        #: This is the <foo> in mds.<foo>, the ID that will appear
+        #: in the FSMap/ServiceMap.
+        self.daemon_id: Optional[str] = daemon_id
+        self.daemon_name = self.name()
+
+        #: Some daemon types have a numeric rank assigned
+        self.rank: Optional[int] = rank
+        self.rank_generation: Optional[int] = rank_generation
 
-        # Service version that was deployed
+        self._service_name: Optional[str] = service_name
+
+        #: Service version that was deployed
         self.version = version
 
-        # Service status: -1 error, 0 stopped, 1 running
-        self.status = status
+        # Service status: -2 unknown, -1 error, 0 stopped, 1 running, 2 starting
+        self._status = status
 
-        # Service status description when status == -1.
+        #: Service status description when status == error.
         self.status_desc = status_desc
 
-        # datetime when this info was last refreshed
+        #: datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
 
         self.created: Optional[datetime.datetime] = created
@@ -1238,26 +905,67 @@ class DaemonDescription(object):
         self.last_configured: Optional[datetime.datetime] = last_configured
         self.last_deployed: Optional[datetime.datetime] = last_deployed
 
-        # Affinity to a certain OSDSpec
+        #: Affinity to a certain OSDSpec
         self.osdspec_affinity: Optional[str] = osdspec_affinity
 
         self.events: List[OrchestratorEvent] = events or []
 
+        self.memory_usage: Optional[int] = memory_usage
+        self.memory_request: Optional[int] = memory_request
+        self.memory_limit: Optional[int] = memory_limit
+
+        self.cpu_percentage: Optional[str] = cpu_percentage
+
+        self.ports: Optional[List[int]] = ports
+        self.ip: Optional[str] = ip
+
+        self.deployed_by = deployed_by
+
         self.is_active = is_active
 
-    def name(self):
+        self.extra_container_args = extra_container_args
+
+    @property
+    def status(self) -> Optional[DaemonDescriptionStatus]:
+        return self._status
+
+    @status.setter
+    def status(self, new: DaemonDescriptionStatus) -> None:
+        self._status = new
+        self.status_desc = DaemonDescriptionStatus.to_str(new)
+
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{self.ip or '*'}:{','.join(map(str, self.ports or []))}"
+
+    def name(self) -> str:
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
     def matches_service(self, service_name: Optional[str]) -> bool:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
         if service_name:
-            return self.name().startswith(service_name + '.')
+            return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
         return False
 
-    def service_id(self):
-        if self.daemon_type == 'osd' and self.osdspec_affinity:
-            return self.osdspec_affinity
+    def service_id(self) -> str:
+        assert self.daemon_id is not None
+        assert self.daemon_type is not None
+
+        if self._service_name:
+            if '.' in self._service_name:
+                return self._service_name.split('.', 1)[1]
+            else:
+                return ''
+
+        if self.daemon_type == 'osd':
+            if self.osdspec_affinity and self.osdspec_affinity != 'None':
+                return self.osdspec_affinity
+            return ''
 
-        def _match():
+        def _match() -> str:
+            assert self.daemon_id is not None
             err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
                                     f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
 
@@ -1297,34 +1005,51 @@ class DaemonDescription(object):
             # daemon_id == "service_id"
             return self.daemon_id
 
-        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
             return _match()
 
         return self.daemon_id
 
-    def service_name(self):
-        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
-            return f'{self.daemon_type}.{self.service_id()}'
-        return self.daemon_type
+    def service_name(self) -> str:
+        if self._service_name:
+            return self._service_name
+        assert self.daemon_type is not None
+        if daemon_type_to_service(self.daemon_type) in ServiceSpec.REQUIRES_SERVICE_ID:
+            return f'{daemon_type_to_service(self.daemon_type)}.{self.service_id()}'
+        return daemon_type_to_service(self.daemon_type)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<DaemonDescription>({type}.{id})".format(type=self.daemon_type,
                                                          id=self.daemon_id)
 
-    def to_json(self):
-        out = OrderedDict()
+    def __str__(self) -> str:
+        return f"{self.name()} in status {self.status_desc} on {self.hostname}"
+
+    def to_json(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
         out['daemon_type'] = self.daemon_type
         out['daemon_id'] = self.daemon_id
+        out['service_name'] = self._service_name
+        out['daemon_name'] = self.name()
         out['hostname'] = self.hostname
         out['container_id'] = self.container_id
         out['container_image_id'] = self.container_image_id
         out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
         out['version'] = self.version
-        out['status'] = self.status
+        out['status'] = self.status.value if self.status is not None else None
         out['status_desc'] = self.status_desc
         if self.daemon_type == 'osd':
             out['osdspec_affinity'] = self.osdspec_affinity
         out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+        out['rank'] = self.rank
+        out['rank_generation'] = self.rank_generation
 
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
@@ -1339,9 +1064,45 @@ class DaemonDescription(object):
             del out[e]
         return out
 
+    def to_dict(self) -> dict:
+        out: Dict[str, Any] = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['daemon_name'] = self.name()
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['container_image_digests'] = self.container_image_digests
+        out['memory_usage'] = self.memory_usage
+        out['memory_request'] = self.memory_request
+        out['memory_limit'] = self.memory_limit
+        out['cpu_percentage'] = self.cpu_percentage
+        out['version'] = self.version
+        out['status'] = self.status.value if self.status is not None else None
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+        out['ports'] = self.ports
+        out['ip'] = self.ip
+
+        for k in ['last_refresh', 'created', 'started', 'last_deployed',
+                  'last_configured']:
+            if getattr(self, k):
+                out[k] = datetime_to_str(getattr(self, k))
+
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
+
     @classmethod
     @handle_type_error
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'DaemonDescription':
         c = data.copy()
         event_strs = c.pop('events', [])
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
@@ -1349,15 +1110,28 @@ class DaemonDescription(object):
             if k in c:
                 c[k] = str_to_datetime(c[k])
         events = [OrchestratorEvent.from_json(e) for e in event_strs]
-        return cls(events=events, **c)
+        status_int = c.pop('status', None)
+        if 'daemon_name' in c:
+            del c['daemon_name']
+        if 'service_name' in c and c['service_name'].startswith('osd.'):
+            # if the service_name is an osd.NNN (numeric osd id) then
+            # ignore it -- it is not a valid service_name and
+            # (presumably) came from an older version of cephadm.
+            try:
+                int(c['service_name'][4:])
+                del c['service_name']
+            except ValueError:
+                pass
+        status = DaemonDescriptionStatus(status_int) if status_int is not None else None
+        return cls(events=events, status=status, **c)
 
-    def __copy__(self):
+    def __copy__(self) -> 'DaemonDescription':
         # feel free to change this:
         return DaemonDescription.from_json(self.to_json())
 
     @staticmethod
-    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
-        return dumper.represent_dict(data.to_json().items())
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
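For orientation, a minimal sketch of how the reworked DaemonDescription behaves, assuming the keyword arguments visible in the hunks above and the DaemonDescriptionStatus enum defined earlier in the file; the import path, daemon and host names are hypothetical:

    from orchestrator import DaemonDescription, DaemonDescriptionStatus  # assumed import path

    dd = DaemonDescription(daemon_type='mds', daemon_id='cephfs.host1.abcdef',
                           hostname='host1', service_name='mds.cephfs',
                           ports=[6800], ip='10.0.0.1')
    dd.status = DaemonDescriptionStatus.running  # the setter also refreshes status_desc
    dd.get_port_summary()    # '10.0.0.1:6800'
    dd.service_name()        # 'mds.cephfs' (an explicit service_name takes precedence)
    dd.service_id()          # 'cephfs'
    # Round-trip: from_json() drops the derived 'daemon_name' key and maps the
    # integer 'status' back onto the enum.
    dd2 = DaemonDescription.from_json(dd.to_json())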
@@ -1378,25 +1152,23 @@ class ServiceDescription(object):
 
     def __init__(self,
                  spec: ServiceSpec,
-                 container_image_id=None,
-                 container_image_name=None,
-                 rados_config_location=None,
-                 service_url=None,
-                 last_refresh=None,
-                 created=None,
-                 size=0,
-                 running=0,
-                 events: Optional[List['OrchestratorEvent']] = None):
+                 container_image_id: Optional[str] = None,
+                 container_image_name: Optional[str] = None,
+                 service_url: Optional[str] = None,
+                 last_refresh: Optional[datetime.datetime] = None,
+                 created: Optional[datetime.datetime] = None,
+                 deleted: Optional[datetime.datetime] = None,
+                 size: int = 0,
+                 running: int = 0,
+                 events: Optional[List['OrchestratorEvent']] = None,
+                 virtual_ip: Optional[str] = None,
+                 ports: List[int] = []) -> None:
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
         # (image name)
         self.container_image_id = container_image_id      # image hash
         self.container_image_name = container_image_name  # image friendly name
 
-        # Location of the service configuration when stored in rados
-        # object. Format: "rados://<pool>/[<namespace/>]<object>"
-        self.rados_config_location = rados_config_location
-
         # If the service exposes REST-like API, this attribute should hold
         # the URL.
         self.service_url = service_url
@@ -1410,28 +1182,38 @@ class ServiceDescription(object):
         # datetime when this info was last refreshed
         self.last_refresh: Optional[datetime.datetime] = last_refresh
         self.created: Optional[datetime.datetime] = created
+        self.deleted: Optional[datetime.datetime] = deleted
 
         self.spec: ServiceSpec = spec
 
         self.events: List[OrchestratorEvent] = events or []
 
-    def service_type(self):
+        self.virtual_ip = virtual_ip
+        self.ports = ports
+
+    def service_type(self) -> str:
         return self.spec.service_type
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return f"<ServiceDescription of {self.spec.one_line_str()}>"
 
+    def get_port_summary(self) -> str:
+        if not self.ports:
+            return ''
+        return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+
     def to_json(self) -> OrderedDict:
         out = self.spec.to_json()
         status = {
             'container_image_id': self.container_image_id,
             'container_image_name': self.container_image_name,
-            'rados_config_location': self.rados_config_location,
             'service_url': self.service_url,
             'size': self.size,
             'running': self.running,
             'last_refresh': self.last_refresh,
             'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
         }
         for k in ['last_refresh', 'created']:
             if getattr(self, k):
@@ -1442,9 +1224,31 @@ class ServiceDescription(object):
             out['events'] = [e.to_json() for e in self.events]
         return out
 
+    def to_dict(self) -> OrderedDict:
+        out = self.spec.to_json()
+        status = {
+            'container_image_id': self.container_image_id,
+            'container_image_name': self.container_image_name,
+            'service_url': self.service_url,
+            'size': self.size,
+            'running': self.running,
+            'last_refresh': self.last_refresh,
+            'created': self.created,
+            'virtual_ip': self.virtual_ip,
+            'ports': self.ports if self.ports else None,
+        }
+        for k in ['last_refresh', 'created']:
+            if getattr(self, k):
+                status[k] = datetime_to_str(getattr(self, k))
+        status = {k: v for (k, v) in status.items() if v is not None}
+        out['status'] = status
+        if self.events:
+            out['events'] = [e.to_dict() for e in self.events]
+        return out
+
     @classmethod
     @handle_type_error
-    def from_json(cls, data: dict):
+    def from_json(cls, data: dict) -> 'ServiceDescription':
         c = data.copy()
         status = c.pop('status', {})
         event_strs = c.pop('events', [])
@@ -1458,8 +1262,8 @@ class ServiceDescription(object):
         return cls(spec=spec, events=events, **c_status)
 
     @staticmethod
-    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
-        return dumper.represent_dict(data.to_json().items())
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'ServiceDescription') -> Any:
+        return dumper.represent_dict(cast(Mapping, data.to_json().items()))
 
 
 yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
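Likewise, a small sketch of the updated ServiceDescription with the new virtual_ip and ports fields; the ServiceSpec call and all values are illustrative assumptions:

    from ceph.deployment.service_spec import ServiceSpec

    sd = ServiceDescription(spec=ServiceSpec('mgr'), size=3, running=3,
                            virtual_ip='10.0.0.10/24', ports=[8443])
    sd.service_type()        # 'mgr'
    sd.get_port_summary()    # '10.0.0.10:8443' -- the CIDR suffix is stripped
    sd.to_json()['status']   # includes size, running, virtual_ip and ports;
                             # None-valued fields are filtered out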
@@ -1470,13 +1274,14 @@ class InventoryFilter(object):
     When fetching inventory, use this filter to avoid unnecessarily
     scanning the whole estate.
 
-    Typical use: filter by host when presenting UI workflow for configuring
-                 a particular server.
-                 filter by label when not all of estate is Ceph servers,
-                 and we want to only learn about the Ceph servers.
-                 filter by label when we are interested particularly
-                 in e.g. OSD servers.
+    Typical use:
 
+      filter by host when presenting a UI workflow for configuring
+      a particular server.
+      filter by label when not all of the estate is Ceph servers,
+      and we want to only learn about the Ceph servers.
+      filter by label when we are particularly interested
+      in e.g. OSD servers.
     """
 
     def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
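Both filtering styles mentioned in the docstring boil down to a single constructor call; the host and label names below are hypothetical:

    InventoryFilter(hosts=['node-1'])   # only scan one particular server
    InventoryFilter(labels=['osd'])     # only scan hosts carrying the 'osd' label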
@@ -1506,7 +1311,7 @@ class InventoryHost(object):
         self.devices = devices
         self.labels = labels
 
-    def to_json(self):
+    def to_json(self) -> dict:
         return {
             'name': self.name,
             'addr': self.addr,
@@ -1515,7 +1320,7 @@ class InventoryHost(object):
         }
 
     @classmethod
-    def from_json(cls, data):
+    def from_json(cls, data: dict) -> 'InventoryHost':
         try:
             _data = copy.deepcopy(data)
             name = _data.pop('name')
@@ -1533,18 +1338,18 @@ class InventoryHost(object):
             raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
 
     @classmethod
-    def from_nested_items(cls, hosts):
+    def from_nested_items(cls, hosts: List[dict]) -> List['InventoryHost']:
         devs = inventory.Devices.from_json
         return [cls(item[0], devs(item[1].data)) for item in hosts]
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "<InventoryHost>({name})".format(name=self.name)
 
     @staticmethod
     def get_host_names(hosts: List['InventoryHost']) -> List[str]:
         return [host.name for host in hosts]
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         return self.name == other.name and self.devices == other.devices
 
 
@@ -1571,7 +1376,8 @@ class OrchestratorEvent:
     ERROR = 'ERROR'
     regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
 
-    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
+    def __init__(self, created: Union[str, datetime.datetime], kind: str,
+                 subject: str, level: str, message: str) -> None:
         if isinstance(created, str):
             created = str_to_datetime(created)
         self.created: datetime.datetime = created
@@ -1598,9 +1404,18 @@ class OrchestratorEvent:
         created = datetime_to_str(self.created)
         return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
 
+    def to_dict(self) -> dict:
+        # Convert events data to dict.
+        return {
+            'created': datetime_to_str(self.created),
+            'subject': self.kind_subject(),
+            'level': self.level,
+            'message': self.message
+        }
+
     @classmethod
     @handle_type_error
-    def from_json(cls, data) -> "OrchestratorEvent":
+    def from_json(cls, data: str) -> "OrchestratorEvent":
         """
         >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
         '2020-06-10T10:20:25.691255Z daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
@@ -1613,26 +1428,30 @@ class OrchestratorEvent:
             return cls(*match.groups())
         raise ValueError(f'Unable to match: "{data}"')
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         if not isinstance(other, OrchestratorEvent):
             return False
 
         return self.created == other.created and self.kind == other.kind \
             and self.subject == other.subject and self.message == other.message
 
+    def __repr__(self) -> str:
+        return f'OrchestratorEvent.from_json({self.to_json()!r})'
+
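For completeness, the new to_dict() and __repr__ additions in action, reusing the values from the from_json() doctest above:

    ev = OrchestratorEvent('2020-06-10T10:20:25.691255', 'daemon', 'crash.ubuntu',
                           'INFO', "Deployed crash.ubuntu on host 'ubuntu'")
    ev.to_dict()
    # {'created': '2020-06-10T10:20:25.691255Z', 'subject': 'daemon:crash.ubuntu',
    #  'level': 'INFO', 'message': "Deployed crash.ubuntu on host 'ubuntu'"}
    repr(ev)   # "OrchestratorEvent.from_json('...')" -- round-trips via from_json()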
 
-def _mk_orch_methods(cls):
+def _mk_orch_methods(cls: Any) -> Any:
     # shim() needs to be defined outside of the for loop, otherwise the
     # closure would always be bound to the last method name.
-    def shim(method_name):
-        def inner(self, *args, **kwargs):
+    def shim(method_name: str) -> Callable:
+        def inner(self: Any, *args: Any, **kwargs: Any) -> Any:
             completion = self._oremote(method_name, args, kwargs)
             return completion
         return inner
 
-    for meth in Orchestrator.__dict__:
-        if not meth.startswith('_') and meth not in ['is_orchestrator_module']:
-            setattr(cls, meth, shim(meth))
+    for name, method in Orchestrator.__dict__.items():
+        if not name.startswith('_') and name not in ['is_orchestrator_module']:
+            remote_call = update_wrapper(shim(name), method)
+            setattr(cls, name, remote_call)
     return cls
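The comment about defining shim() outside of the for loop refers to Python's late-binding closures; a generic illustration (not ceph code) of the pitfall the factory function avoids:

    # Without a factory, every closure sees the loop variable's final value:
    fns = []
    for n in ('a', 'b'):
        fns.append(lambda: n)
    [f() for f in fns]                             # ['b', 'b']

    # Passing the value through a factory, as shim(name) does, pins it per iteration:
    def make(n):
        return lambda: n
    [f() for f in [make(n) for n in ('a', 'b')]]   # ['a', 'b']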
 
 
@@ -1648,7 +1467,6 @@ class OrchestratorClientMixin(Orchestrator):
     >>> class MyModule(OrchestratorClientMixin):
     ...    def func(self):
     ...        completion = self.add_host('somehost')  # calls `_oremote()`
-    ...        self._orchestrator_wait([completion])
     ...        self.log.debug(completion.result)
 
     .. note:: Orchestrator implementations should not inherit from `OrchestratorClientMixin`.
@@ -1671,13 +1489,13 @@ class OrchestratorClientMixin(Orchestrator):
 
         self.__mgr = mgr  # Make sure we're not overwriting any other `mgr` properties
 
-    def __get_mgr(self):
+    def __get_mgr(self) -> Any:
         try:
             return self.__mgr
         except AttributeError:
             return self
 
-    def _oremote(self, meth, args, kwargs):
+    def _oremote(self, meth: Any, args: Any, kwargs: Any) -> Any:
         """
         Helper for invoking `remote` on whichever orchestrator is enabled
 
@@ -1705,24 +1523,3 @@ class OrchestratorClientMixin(Orchestrator):
             if meth not in f_set or not f_set[meth]['available']:
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
-
-    def _orchestrator_wait(self, completions: List[Completion]) -> None:
-        """
-        Wait for completions to complete (reads) or
-        become persistent (writes).
-
-        Waits for writes to be *persistent* but not *effective*.
-
-        :param completions: List of Completions
-        :raises NoOrchestrator:
-        :raises RuntimeError: something went wrong while calling the process method.
-        :raises ImportError: no `orchestrator` module or backend not found.
-        """
-        while any(not c.has_result for c in completions):
-            self.process(completions)
-            self.__get_mgr().log.info("Operations pending: %s",
-                                      sum(1 for c in completions if not c.has_result))
-            if any(c.needs_result for c in completions):
-                time.sleep(1)
-            else:
-                break