]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/orchestrator/_interface.py
import 15.2.5
[ceph.git] / ceph / src / pybind / mgr / orchestrator / _interface.py
index a27c000f99b5b362c47c63f94d6100305943becd..55c512aee449984e766db5354ee2fbf8350c5a43 100644 (file)
@@ -14,19 +14,23 @@ import re
 import time
 import uuid
 
-from collections import namedtuple
+from collections import namedtuple, OrderedDict
+from contextlib import contextmanager
 from functools import wraps
 
+import yaml
+
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
     ServiceSpecValidationError, IscsiServiceSpec
 from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.hostspec import HostSpec
 
 from mgr_module import MgrModule, CLICommand, HandleCommandResult
 
 try:
     from typing import TypeVar, Generic, List, Optional, Union, Tuple, Iterator, Callable, Any, \
-    Type, Sequence, Dict, cast
+        Type, Sequence, Dict, cast
 except ImportError:
     pass
 
@@ -34,6 +38,8 @@ logger = logging.getLogger(__name__)
 
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 
+T = TypeVar('T')
+
 
 class OrchestratorError(Exception):
     """
@@ -44,13 +50,23 @@ class OrchestratorError(Exception):
     It's not intended for programming errors or orchestrator internal errors.
     """
 
+    def __init__(self,
+                 msg: str,
+                 errno: int = -errno.EINVAL,
+                 event_kind_subject: Optional[Tuple[str, str]] = None):
+        super(Exception, self).__init__(msg)
+        self.errno = errno
+        # See OrchestratorEvent.subject
+        self.event_subject = event_kind_subject
+
 
 class NoOrchestrator(OrchestratorError):
     """
     No orchestrator in configured.
     """
+
     def __init__(self, msg="No orchestrator configured (try `ceph orch set backend`)"):
-        super(NoOrchestrator, self).__init__(msg)
+        super(NoOrchestrator, self).__init__(msg, errno=-errno.ENOENT)
 
 
 class OrchestratorValidationError(OrchestratorError):
@@ -59,13 +75,25 @@ class OrchestratorValidationError(OrchestratorError):
     """
 
 
+@contextmanager
+def set_exception_subject(kind, subject, overwrite=False):
+    try:
+        yield
+    except OrchestratorError as e:
+        if overwrite or hasattr(e, 'event_subject'):
+            e.event_subject = (kind, subject)
+        raise
+
+
 def handle_exception(prefix, cmd_args, desc, perm, func):
     @wraps(func)
     def wrapper(*args, **kwargs):
         try:
             return func(*args, **kwargs)
-        except (OrchestratorError, ImportError, ServiceSpecValidationError) as e:
+        except (OrchestratorError, ServiceSpecValidationError) as e:
             # Do not print Traceback for expected errors.
+            return HandleCommandResult(e.errno, stderr=str(e))
+        except ImportError as e:
             return HandleCommandResult(-errno.ENOENT, stderr=str(e))
         except NotImplementedError:
             msg = 'This Orchestrator does not support `{}`'.format(prefix)
@@ -99,7 +127,7 @@ class CLICommandMeta(type):
     """
     def __init__(cls, name, bases, dct):
         super(CLICommandMeta, cls).__init__(name, bases, dct)
-        dispatch = {}  # type: Dict[str, CLICommand]
+        dispatch: Dict[str, CLICommand] = {}
         for v in dct.values():
             try:
                 dispatch[v._prefix] = v._cli_command
@@ -135,32 +163,31 @@ class _Promise(object):
     RUNNING = 2
     FINISHED = 3  # we have a final result
 
-    NO_RESULT = _no_result()  # type: None
+    NO_RESULT: None = _no_result()
     ASYNC_RESULT = object()
 
     def __init__(self,
-                 _first_promise=None,  # type: Optional["_Promise"]
-                 value=NO_RESULT,  # type: Optional[Any]
-                 on_complete=None,    # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
+                 _first_promise: Optional["_Promise"] = None,
+                 value: Optional[Any] = NO_RESULT,
+                 on_complete: Optional[Callable] = None,
+                 name: Optional[str] = None,
                  ):
         self._on_complete_ = on_complete
         self._name = name
-        self._next_promise = None  # type: Optional[_Promise]
+        self._next_promise: Optional[_Promise] = None
 
         self._state = self.INITIALIZED
-        self._exception = None  # type: Optional[Exception]
+        self._exception: Optional[Exception] = None
 
         # Value of this _Promise. may be an intermediate result.
         self._value = value
 
         # _Promise is not a continuation monad, as `_result` is of type
         # T instead of (T -> r) -> r. Therefore we need to store the first promise here.
-        self._first_promise = _first_promise or self  # type: '_Promise'
+        self._first_promise: '_Promise' = _first_promise or self
 
     @property
-    def _exception(self):
-        # type: () -> Optional[Exception]
+    def _exception(self) -> Optional[Exception]:
         return getattr(self, '_exception_', None)
 
     @_exception.setter
@@ -178,29 +205,25 @@ class _Promise(object):
             self._serialized_exception_ = pickle.dumps(e)
 
     @property
-    def _serialized_exception(self):
-        # type: () -> Optional[bytes]
+    def _serialized_exception(self) -> Optional[bytes]:
         return getattr(self, '_serialized_exception_', None)
 
-
-
     @property
-    def _on_complete(self):
-        # type: () -> Optional[Callable]
+    def _on_complete(self) -> Optional[Callable]:
         # https://github.com/python/mypy/issues/4125
         return self._on_complete_
 
     @_on_complete.setter
-    def _on_complete(self, val):
-        # type: (Optional[Callable]) -> None
+    def _on_complete(self, val: Optional[Callable]) -> None:
         self._on_complete_ = val
 
-
     def __repr__(self):
-        name = self._name or getattr(self._on_complete, '__name__', '??') if self._on_complete else 'None'
+        name = self._name or getattr(self._on_complete, '__name__',
+                                     '??') if self._on_complete else 'None'
         val = repr(self._value) if self._value is not self.NO_RESULT else 'NA'
         return '{}(_s={}, val={}, _on_c={}, id={}, name={}, pr={}, _next={})'.format(
-            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(next, '_progress_reference', 'NA'), repr(self._next_promise)
+            self.__class__, self._state, val, self._on_complete, id(self), name, getattr(
+                next, '_progress_reference', 'NA'), repr(self._next_promise)
         )
 
     def pretty_print_1(self):
@@ -220,8 +243,7 @@ class _Promise(object):
         }[self._state]
         return '{} {}({}),'.format(prefix, name, val)
 
-    def then(self, on_complete):
-        # type: (Any, Callable) -> Any
+    def then(self: Any, on_complete: Callable) -> Any:
         """
         Call ``on_complete`` as soon as this promise is finalized.
         """
@@ -242,8 +264,7 @@ class _Promise(object):
             self._set_next_promise(self.__class__(_first_promise=self._first_promise))
             return self._next_promise
 
-    def _set_next_promise(self, next):
-        # type: (_Promise) -> None
+    def _set_next_promise(self, next: '_Promise') -> None:
         assert self is not next
         assert self._state in (self.INITIALIZED, self.RUNNING)
 
@@ -301,15 +322,13 @@ class _Promise(object):
             # asynchronous promise
             pass
 
-
     def propagate_to_next(self):
         self._state = self.FINISHED
         logger.debug('finalized {}'.format(repr(self)))
         if self._next_promise:
             self._next_promise._finalize()
 
-    def fail(self, e):
-        # type: (Exception) -> None
+    def fail(self, e: Exception) -> None:
         """
         Sets the whole completion to be faild with this exception and end the
         evaluation.
@@ -341,17 +360,16 @@ class _Promise(object):
             assert other not in self
             self._last_promise()._set_next_promise(other)
 
-    def _last_promise(self):
-        # type: () -> _Promise
+    def _last_promise(self) -> '_Promise':
         return list(iter(self))[-1]
 
 
 class ProgressReference(object):
     def __init__(self,
-                 message,  # type: str
+                 message: str,
                  mgr,
-                 completion=None  # type: Optional[Callable[[], Completion]]
-                ):
+                 completion: Optional[Callable[[], 'Completion']] = None
+                 ):
         """
         ProgressReference can be used within Completions::
 
@@ -373,7 +391,7 @@ class ProgressReference(object):
         #: The completion can already have a result, before the write
         #: operation is effective. progress == 1 means, the services are
         #: created / removed.
-        self.completion = completion  # type: Optional[Callable[[], Completion]]
+        self.completion: Optional[Callable[[], Completion]] = completion
 
         #: if a orchestrator module can provide a more detailed
         #: progress information, it needs to also call ``progress.update()``.
@@ -404,7 +422,8 @@ class ProgressReference(object):
         try:
             if self.effective:
                 self.mgr.remote("progress", "complete", self.progress_id)
-                self.mgr.all_progress_references = [p for p in self.mgr.all_progress_references if p is not self]
+                self.mgr.all_progress_references = [
+                    p for p in self.mgr.all_progress_references if p is not self]
             else:
                 self.mgr.remote("progress", "update", self.progress_id, self.message,
                                 progress,
@@ -432,7 +451,7 @@ class ProgressReference(object):
         self.progress = 1
 
 
-class Completion(_Promise):
+class Completion(_Promise, Generic[T]):
     """
     Combines multiple promises into one overall operation.
 
@@ -463,45 +482,44 @@ class Completion(_Promise):
         +---------------+      +-----------------+
 
     """
+
     def __init__(self,
-                 _first_promise=None,  # type: Optional["Completion"]
-                 value=_Promise.NO_RESULT,  # type: Any
-                 on_complete=None,  # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
+                 _first_promise: Optional["Completion"] = None,
+                 value: Any = _Promise.NO_RESULT,
+                 on_complete: Optional[Callable] = None,
+                 name: Optional[str] = None,
                  ):
         super(Completion, self).__init__(_first_promise, value, on_complete, name)
 
     @property
-    def _progress_reference(self):
-        # type: () -> Optional[ProgressReference]
+    def _progress_reference(self) -> Optional[ProgressReference]:
         if hasattr(self._on_complete, 'progress_id'):
             return self._on_complete  # type: ignore
         return None
 
     @property
-    def progress_reference(self):
-        # type: () -> Optional[ProgressReference]
+    def progress_reference(self) -> Optional[ProgressReference]:
         """
         ProgressReference. Marks this completion
         as a write completeion.
         """
 
-        references = [c._progress_reference for c in iter(self) if c._progress_reference is not None]
+        references = [c._progress_reference for c in iter(
+            self) if c._progress_reference is not None]
         if references:
             assert len(references) == 1
             return references[0]
         return None
 
     @classmethod
-    def with_progress(cls,  # type: Any
-                      message,  # type: str
+    def with_progress(cls: Any,
+                      message: str,
                       mgr,
-                      _first_promise=None,  # type: Optional["Completion"]
-                      value=_Promise.NO_RESULT,  # type: Any
-                      on_complete=None,  # type: Optional[Callable]
-                      calc_percent=None  # type: Optional[Callable[[], Any]]
-                      ):
-        # type: (...) -> Any
+                      _first_promise: Optional["Completion"] = None,
+                      value: Any = _Promise.NO_RESULT,
+                      on_complete: Optional[Callable] = None,
+                      calc_percent: Optional[Callable[[], Any]] = None
+                      ) -> Any:
 
         c = cls(
             _first_promise=_first_promise,
@@ -512,9 +530,9 @@ class Completion(_Promise):
         return c._first_promise
 
     def add_progress(self,
-                     message,  # type: str
+                     message: str,
                      mgr,
-                     calc_percent=None  # type: Optional[Callable[[], Any]]
+                     calc_percent: Optional[Callable[[], Any]] = None
                      ):
         return self.then(
             on_complete=ProgressReference(
@@ -524,17 +542,17 @@ class Completion(_Promise):
             )
         )
 
-    def fail(self, e):
+    def fail(self, e: Exception):
         super(Completion, self).fail(e)
         if self._progress_reference:
             self._progress_reference.fail()
 
-    def finalize(self, result=_Promise.NO_RESULT):
+    def finalize(self, result: Union[None, object, T] = _Promise.NO_RESULT):
         if self._first_promise._state == self.INITIALIZED:
             self._first_promise._finalize(result)
 
     @property
-    def result(self):
+    def result(self) -> T:
         """
         The result of the operation that we were waited
         for.  Only valid after calling Orchestrator.process() on this
@@ -542,9 +560,9 @@ class Completion(_Promise):
         """
         last = self._last_promise()
         assert last._state == _Promise.FINISHED
-        return last._value
+        return cast(T, last._value)
 
-    def result_str(self):
+    def result_str(self) -> str:
         """Force a string."""
         if self.result is None:
             return ''
@@ -553,18 +571,15 @@ class Completion(_Promise):
         return str(self.result)
 
     @property
-    def exception(self):
-        # type: () -> Optional[Exception]
+    def exception(self) -> Optional[Exception]:
         return self._last_promise()._exception
 
     @property
-    def serialized_exception(self):
-        # type: () -> Optional[bytes]
+    def serialized_exception(self) -> Optional[bytes]:
         return self._last_promise()._serialized_exception
 
     @property
-    def has_result(self):
-        # type: () -> bool
+    def has_result(self) -> bool:
         """
         Has the operation already a result?
 
@@ -580,8 +595,7 @@ class Completion(_Promise):
         return self._last_promise()._state == _Promise.FINISHED
 
     @property
-    def is_errored(self):
-        # type: () -> bool
+    def is_errored(self) -> bool:
         """
         Has the completion failed. Default implementation looks for
         self.exception. Can be overwritten.
@@ -589,8 +603,7 @@ class Completion(_Promise):
         return self.exception is not None
 
     @property
-    def needs_result(self):
-        # type: () -> bool
+    def needs_result(self) -> bool:
         """
         Could the external operation be deemed as complete,
         or should we wait?
@@ -599,8 +612,7 @@ class Completion(_Promise):
         return not self.is_errored and not self.has_result
 
     @property
-    def is_finished(self):
-        # type: () -> bool
+    def is_finished(self) -> bool:
         """
         Could the external operation be deemed as complete,
         or should we wait?
@@ -614,13 +626,11 @@ class Completion(_Promise):
         return """<{}>[\n{}\n]""".format(self.__class__.__name__, reprs)
 
 
-def pretty_print(completions):
-    # type: (Sequence[Completion]) -> str
+def pretty_print(completions: Sequence[Completion]) -> str:
     return ', '.join(c.pretty_print() for c in completions)
 
 
-def raise_if_exception(c):
-    # type: (Completion) -> None
+def raise_if_exception(c: Completion) -> None:
     """
     :raises OrchestratorError: Some user error or a config error.
     :raises Exception: Some internal error
@@ -633,11 +643,12 @@ def raise_if_exception(c):
         raise e
 
 
-class TrivialReadCompletion(Completion):
+class TrivialReadCompletion(Completion[T]):
     """
     This is the trivial completion simply wrapping a result.
     """
-    def __init__(self, result):
+
+    def __init__(self, result: T):
         super(TrivialReadCompletion, self).__init__()
         if result:
             self.finalize(result)
@@ -682,8 +693,7 @@ class Orchestrator(object):
         return True
 
     @_hide_in_features
-    def available(self):
-        # type: () -> Tuple[bool, str]
+    def available(self) -> Tuple[bool, str]:
         """
         Report whether we can talk to the orchestrator.  This is the
         place to give the user a meaningful message if the orchestrator
@@ -709,8 +719,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def process(self, completions):
-        # type: (List[Completion]) -> None
+    def process(self, completions: List[Completion]) -> None:
         """
         Given a list of Completion instances, process any which are
         incomplete.
@@ -755,23 +764,19 @@ class Orchestrator(object):
                     }
         return features
 
-    def cancel_completions(self):
-        # type: () -> None
+    def cancel_completions(self) -> None:
         """
         Cancels ongoing completions. Unstuck the mgr.
         """
         raise NotImplementedError()
 
-    def pause(self):
-        # type: () -> None
+    def pause(self) -> None:
         raise NotImplementedError()
 
-    def resume(self):
-        # type: () -> None
+    def resume(self) -> None:
         raise NotImplementedError()
 
-    def add_host(self, host_spec):
-        # type: (HostSpec) -> Completion
+    def add_host(self, host_spec: HostSpec) -> Completion[str]:
         """
         Add a host to the orchestrator inventory.
 
@@ -779,8 +784,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_host(self, host):
-        # type: (str) -> Completion
+    def remove_host(self, host: str) -> Completion[str]:
         """
         Remove a host from the orchestrator inventory.
 
@@ -788,8 +792,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def update_host_addr(self, host, addr):
-        # type: (str, str) -> Completion
+    def update_host_addr(self, host: str, addr: str) -> Completion[str]:
         """
         Update a host's address
 
@@ -798,8 +801,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def get_hosts(self):
-        # type: () -> Completion
+    def get_hosts(self) -> Completion[List[HostSpec]]:
         """
         Report the hosts in the cluster.
 
@@ -807,22 +809,27 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def add_host_label(self, host, label):
-        # type: (str, str) -> Completion
+    def add_host_label(self, host: str, label: str) -> Completion[str]:
         """
         Add a host label
         """
         raise NotImplementedError()
 
-    def remove_host_label(self, host, label):
-        # type: (str, str) -> Completion
+    def remove_host_label(self, host: str, label: str) -> Completion[str]:
         """
         Remove a host label
         """
         raise NotImplementedError()
 
-    def get_inventory(self, host_filter=None, refresh=False):
-        # type: (Optional[InventoryFilter], bool) -> Completion
+    def host_ok_to_stop(self, hostname: str) -> Completion:
+        """
+        Check if the specified host can be safely stopped without reducing availability
+
+        :param host: hostname
+        """
+        raise NotImplementedError()
+
+    def get_inventory(self, host_filter: Optional['InventoryFilter'] = None, refresh: bool = False) -> Completion[List['InventoryHost']]:
         """
         Returns something that was created by `ceph-volume inventory`.
 
@@ -830,8 +837,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def describe_service(self, service_type=None, service_name=None, refresh=False):
-        # type: (Optional[str], Optional[str], bool) -> Completion
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> Completion[List['ServiceDescription']]:
         """
         Describe a service (of any kind) that is already configured in
         the orchestrator.  For example, when viewing an OSD in the dashboard
@@ -845,8 +851,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None, refresh=False):
-        # type: (Optional[str], Optional[str], Optional[str], Optional[str], bool) -> Completion
+    def list_daemons(self, service_name: Optional[str] = None, daemon_type: Optional[str] = None, daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> Completion[List['DaemonDescription']]:
         """
         Describe a daemon (of any kind) that is already configured in
         the orchestrator.
@@ -855,7 +860,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply(self, specs: List["GenericSpec"]) -> Completion:
+    def apply(self, specs: List["GenericSpec"]) -> Completion[List[str]]:
         """
         Applies any spec
         """
@@ -892,8 +897,13 @@ class Orchestrator(object):
             completion = completion.then(next)
         return completion
 
-    def remove_daemons(self, names):
-        # type: (List[str]) -> Completion
+    def plan(self, spec: List["GenericSpec"]) -> Completion[List]:
+        """
+        Plan (Dry-run, Preview) a List of Specs.
+        """
+        raise NotImplementedError()
+
+    def remove_daemons(self, names: List[str]) -> Completion[List[str]]:
         """
         Remove specific daemon(s).
 
@@ -901,8 +911,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_service(self, service_name):
-        # type: (str) -> Completion
+    def remove_service(self, service_name: str) -> Completion[str]:
         """
         Remove a service (a collection of daemons).
 
@@ -910,34 +919,32 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def service_action(self, action, service_name):
-        # type: (str, str) -> Completion
+    def service_action(self, action: str, service_name: str) -> Completion[List[str]]:
         """
         Perform an action (start/stop/reload) on a service (i.e., all daemons
         providing the logical service).
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
-        :param service_type: e.g. "mds", "rgw", ...
-        :param service_name: name of logical service ("cephfs", "us-east", ...)
+        :param service_name: service_type + '.' + service_id
+                            (e.g. "mon", "mgr", "mds.mycephfs", "rgw.realm.zone", ...)
         :rtype: Completion
         """
-        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def daemon_action(self, action, daemon_type, daemon_id):
-        # type: (str, str, str) -> Completion
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> Completion[str]:
         """
         Perform an action (start/stop/reload) on a daemon.
 
         :param action: one of "start", "stop", "restart", "redeploy", "reconfig"
-        :param name: name of daemon
+        :param daemon_name: name of daemon
+        :param image: Container image when redeploying that daemon
         :rtype: Completion
         """
-        #assert action in ["start", "stop", "reload, "restart", "redeploy"]
+        # assert action in ["start", "stop", "reload, "restart", "redeploy"]
         raise NotImplementedError()
 
-    def create_osds(self, drive_group):
-        # type: (DriveGroupSpec) -> Completion
+    def create_osds(self, drive_group: DriveGroupSpec) -> Completion[str]:
         """
         Create one or more OSDs within a single Drive Group.
 
@@ -948,7 +955,7 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion:
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> Completion[List[str]]:
         """ Update OSD cluster """
         raise NotImplementedError()
 
@@ -962,13 +969,13 @@ class Orchestrator(object):
     def preview_osdspecs(self,
                          osdspec_name: Optional[str] = 'osd',
                          osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ) -> Completion:
+                         ) -> Completion[str]:
         """ Get a preview for OSD deployments """
         raise NotImplementedError()
 
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False) -> Completion:
+                    force: bool = False) -> Completion[str]:
         """
         :param osd_ids: list of OSD IDs
         :param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
@@ -978,15 +985,19 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def remove_osds_status(self):
-        # type: () -> Completion
+    def stop_remove_osds(self, osd_ids: List[str]) -> Completion:
+        """
+        TODO
+        """
+        raise NotImplementedError()
+
+    def remove_osds_status(self) -> Completion:
         """
         Returns a status of the ongoing OSD removal operations.
         """
         raise NotImplementedError()
 
-    def blink_device_light(self, ident_fault, on, locations):
-        # type: (str, bool, List[DeviceLightLoc]) -> Completion
+    def blink_device_light(self, ident_fault: str, on: bool, locations: List['DeviceLightLoc']) -> Completion[List[str]]:
         """
         Instructs the orchestrator to enable or disable either the ident or the fault LED.
 
@@ -996,153 +1007,122 @@ class Orchestrator(object):
         """
         raise NotImplementedError()
 
-    def zap_device(self, host, path):
-        # type: (str, str) -> Completion
+    def zap_device(self, host: str, path: str) -> Completion[str]:
         """Zap/Erase a device (DESTROYS DATA)"""
         raise NotImplementedError()
 
-    def add_mon(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_mon(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create mon daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mon(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mon(self, spec: ServiceSpec) -> Completion[str]:
         """Update mon cluster"""
         raise NotImplementedError()
 
-    def add_mgr(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_mgr(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create mgr daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mgr(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mgr(self, spec: ServiceSpec) -> Completion[str]:
         """Update mgr cluster"""
         raise NotImplementedError()
 
-    def add_mds(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_mds(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create MDS daemon(s)"""
         raise NotImplementedError()
 
-    def apply_mds(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_mds(self, spec: ServiceSpec) -> Completion[str]:
         """Update MDS cluster"""
         raise NotImplementedError()
 
-    def add_rgw(self, spec):
-        # type: (RGWSpec) -> Completion
+    def add_rgw(self, spec: RGWSpec) -> Completion[List[str]]:
         """Create RGW daemon(s)"""
         raise NotImplementedError()
 
-    def apply_rgw(self, spec):
-        # type: (RGWSpec) -> Completion
+    def apply_rgw(self, spec: RGWSpec) -> Completion[str]:
         """Update RGW cluster"""
         raise NotImplementedError()
 
-    def add_rbd_mirror(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_rbd_mirror(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create rbd-mirror daemon(s)"""
         raise NotImplementedError()
 
-    def apply_rbd_mirror(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_rbd_mirror(self, spec: ServiceSpec) -> Completion[str]:
         """Update rbd-mirror cluster"""
         raise NotImplementedError()
 
-    def add_nfs(self, spec):
-        # type: (NFSServiceSpec) -> Completion
+    def add_nfs(self, spec: NFSServiceSpec) -> Completion[List[str]]:
         """Create NFS daemon(s)"""
         raise NotImplementedError()
 
-    def apply_nfs(self, spec):
-        # type: (NFSServiceSpec) -> Completion
+    def apply_nfs(self, spec: NFSServiceSpec) -> Completion[str]:
         """Update NFS cluster"""
         raise NotImplementedError()
 
-    def add_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> Completion
+    def add_iscsi(self, spec: IscsiServiceSpec) -> Completion[List[str]]:
         """Create iscsi daemon(s)"""
         raise NotImplementedError()
 
-    def apply_iscsi(self, spec):
-        # type: (IscsiServiceSpec) -> Completion
+    def apply_iscsi(self, spec: IscsiServiceSpec) -> Completion[str]:
         """Update iscsi cluster"""
         raise NotImplementedError()
 
-    def add_prometheus(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_prometheus(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create new prometheus daemon"""
         raise NotImplementedError()
 
-    def apply_prometheus(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_prometheus(self, spec: ServiceSpec) -> Completion[str]:
         """Update prometheus cluster"""
         raise NotImplementedError()
 
-    def add_node_exporter(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_node_exporter(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create a new Node-Exporter service"""
         raise NotImplementedError()
 
-    def apply_node_exporter(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_node_exporter(self, spec: ServiceSpec) -> Completion[str]:
         """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
-    def add_crash(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_crash(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create a new crash service"""
         raise NotImplementedError()
 
-    def apply_crash(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_crash(self, spec: ServiceSpec) -> Completion[str]:
         """Update existing a crash daemon(s)"""
         raise NotImplementedError()
 
-    def add_grafana(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_grafana(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create a new Node-Exporter service"""
         raise NotImplementedError()
 
-    def apply_grafana(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_grafana(self, spec: ServiceSpec) -> Completion[str]:
         """Update existing a Node-Exporter daemon(s)"""
         raise NotImplementedError()
 
-    def add_alertmanager(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def add_alertmanager(self, spec: ServiceSpec) -> Completion[List[str]]:
         """Create a new AlertManager service"""
         raise NotImplementedError()
 
-    def apply_alertmanager(self, spec):
-        # type: (ServiceSpec) -> Completion
+    def apply_alertmanager(self, spec: ServiceSpec) -> Completion[str]:
         """Update an existing AlertManager daemon(s)"""
         raise NotImplementedError()
 
-    def upgrade_check(self, image, version):
-        # type: (Optional[str], Optional[str]) -> Completion
+    def upgrade_check(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
         raise NotImplementedError()
 
-    def upgrade_start(self, image, version):
-        # type: (Optional[str], Optional[str]) -> Completion
+    def upgrade_start(self, image: Optional[str], version: Optional[str]) -> Completion[str]:
         raise NotImplementedError()
 
-    def upgrade_pause(self):
-        # type: () -> Completion
+    def upgrade_pause(self) -> Completion[str]:
         raise NotImplementedError()
 
-    def upgrade_resume(self):
-        # type: () -> Completion
+    def upgrade_resume(self) -> Completion[str]:
         raise NotImplementedError()
 
-    def upgrade_stop(self):
-        # type: () -> Completion
+    def upgrade_stop(self) -> Completion[str]:
         raise NotImplementedError()
 
-    def upgrade_status(self):
-        # type: () -> Completion
+    def upgrade_status(self) -> Completion['UpgradeStatusSpec']:
         """
         If an upgrade is currently underway, report on where
         we are in the process, or if some error has occurred.
@@ -1152,8 +1132,7 @@ class Orchestrator(object):
         raise NotImplementedError()
 
     @_hide_in_features
-    def upgrade_available(self):
-        # type: () -> Completion
+    def upgrade_available(self) -> Completion:
         """
         Report on what versions are available to upgrade to
 
@@ -1162,71 +1141,16 @@ class Orchestrator(object):
         raise NotImplementedError()
 
 
-class HostSpec(object):
-    """
-    Information about hosts. Like e.g. ``kubectl get nodes``
-    """
-    def __init__(self,
-                 hostname,  # type: str
-                 addr=None,  # type: Optional[str]
-                 labels=None,  # type: Optional[List[str]]
-                 status=None,  # type: Optional[str]
-                 ):
-        self.service_type = 'host'
-
-        #: the bare hostname on the host. Not the FQDN.
-        self.hostname = hostname  # type: str
-
-        #: DNS name or IP address to reach it
-        self.addr = addr or hostname  # type: str
-
-        #: label(s), if any
-        self.labels = labels or []  # type: List[str]
-
-        #: human readable status
-        self.status = status or ''  # type: str
-
-    def to_json(self):
-        return {
-            'hostname': self.hostname,
-            'addr': self.addr,
-            'labels': self.labels,
-            'status': self.status,
-        }
-
-    @classmethod
-    def from_json(cls, host_spec):
-        _cls = cls(host_spec['hostname'],
-                   host_spec['addr'] if 'addr' in host_spec else None,
-                   host_spec['labels'] if 'labels' in host_spec else None)
-        return _cls
-
-    def __repr__(self):
-        args = [self.hostname]  # type: List[Any]
-        if self.addr is not None:
-            args.append(self.addr)
-        if self.labels:
-            args.append(self.labels)
-        if self.status:
-            args.append(self.status)
-
-        return "<HostSpec>({})".format(', '.join(map(repr, args)))
-
-    def __eq__(self, other):
-        # Let's omit `status` for the moment, as it is still the very same host.
-        return self.hostname == other.hostname and \
-               self.addr == other.addr and \
-               self.labels == other.labels
-
 GenericSpec = Union[ServiceSpec, HostSpec]
 
-def json_to_generic_spec(spec):
-    # type: (dict) -> GenericSpec
+
+def json_to_generic_spec(spec: dict) -> GenericSpec:
     if 'service_type' in spec and spec['service_type'] == 'host':
         return HostSpec.from_json(spec)
     else:
         return ServiceSpec.from_json(spec)
 
+
 class UpgradeStatusSpec(object):
     # Orchestrator's report on what's going on with any ongoing upgrade
     def __init__(self):
@@ -1275,9 +1199,12 @@ class DaemonDescription(object):
                  started=None,
                  last_configured=None,
                  osdspec_affinity=None,
-                 last_deployed=None):
+                 last_deployed=None,
+                 events: Optional[List['OrchestratorEvent']] = None,
+                 is_active: bool=False):
+
         # Host is at the same granularity as InventoryHost
-        self.hostname = hostname
+        self.hostname: str = hostname
 
         # Not everyone runs in containers, but enough people do to
         # justify having the container_id (runtime id) and container_image
@@ -1293,7 +1220,7 @@ class DaemonDescription(object):
         # typically either based on hostnames or on pod names.
         # This is the <foo> in mds.<foo>, the ID that will appear
         # in the FSMap/ServiceMap.
-        self.daemon_id = daemon_id
+        self.daemon_id: str = daemon_id
 
         # Service version that was deployed
         self.version = version
@@ -1305,29 +1232,35 @@ class DaemonDescription(object):
         self.status_desc = status_desc
 
         # datetime when this info was last refreshed
-        self.last_refresh = last_refresh  # type: Optional[datetime.datetime]
+        self.last_refresh: Optional[datetime.datetime] = last_refresh
 
-        self.created = created    # type: Optional[datetime.datetime]
-        self.started = started    # type: Optional[datetime.datetime]
-        self.last_configured = last_configured # type: Optional[datetime.datetime]
-        self.last_deployed = last_deployed    # type: Optional[datetime.datetime]
+        self.created: Optional[datetime.datetime] = created
+        self.started: Optional[datetime.datetime] = started
+        self.last_configured: Optional[datetime.datetime] = last_configured
+        self.last_deployed: Optional[datetime.datetime] = last_deployed
 
         # Affinity to a certain OSDSpec
-        self.osdspec_affinity = osdspec_affinity  # type: Optional[str]
+        self.osdspec_affinity: Optional[str] = osdspec_affinity
+
+        self.events: List[OrchestratorEvent] = events or []
+        
+        self.is_active = is_active
 
     def name(self):
         return '%s.%s' % (self.daemon_type, self.daemon_id)
 
-    def matches_service(self, service_name):
-        # type: (Optional[str]) -> bool
+    def matches_service(self, service_name: Optional[str]) -> bool:
         if service_name:
             return self.name().startswith(service_name + '.')
         return False
 
     def service_id(self):
+        if self.daemon_type == 'osd' and self.osdspec_affinity:
+            return self.osdspec_affinity
+
         def _match():
-            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: " \
-                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
+            err = OrchestratorError("DaemonDescription: Cannot calculate service_id: "
+                                    f"daemon_id='{self.daemon_id}' hostname='{self.hostname}'")
 
             if not self.hostname:
                 # TODO: can a DaemonDescription exist without a hostname?
@@ -1361,13 +1294,13 @@ class DaemonDescription(object):
             # daemon_id == "service_id"
             return self.daemon_id
 
-        if self.daemon_type in ['mds', 'nfs', 'iscsi', 'rgw']:
+        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
             return _match()
 
         return self.daemon_id
 
     def service_name(self):
-        if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
+        if self.daemon_type in ServiceSpec.REQUIRES_SERVICE_ID:
             return f'{self.daemon_type}.{self.service_id()}'
         return self.daemon_type
 
@@ -1376,37 +1309,57 @@ class DaemonDescription(object):
                                                          id=self.daemon_id)
 
     def to_json(self):
-        out = {
-            'hostname': self.hostname,
-            'container_id': self.container_id,
-            'container_image_id': self.container_image_id,
-            'container_image_name': self.container_image_name,
-            'daemon_id': self.daemon_id,
-            'daemon_type': self.daemon_type,
-            'version': self.version,
-            'status': self.status,
-            'status_desc': self.status_desc,
-        }
+        out = OrderedDict()
+        out['daemon_type'] = self.daemon_type
+        out['daemon_id'] = self.daemon_id
+        out['hostname'] = self.hostname
+        out['container_id'] = self.container_id
+        out['container_image_id'] = self.container_image_id
+        out['container_image_name'] = self.container_image_name
+        out['version'] = self.version
+        out['status'] = self.status
+        out['status_desc'] = self.status_desc
+        if self.daemon_type == 'osd':
+            out['osdspec_affinity'] = self.osdspec_affinity
+        out['is_active'] = self.is_active
+
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if getattr(self, k):
                 out[k] = getattr(self, k).strftime(DATEFMT)
-        return {k: v for (k, v) in out.items() if v is not None}
+
+        if self.events:
+            out['events'] = [e.to_json() for e in self.events]
+
+        empty = [k for k, v in out.items() if v is None]
+        for e in empty:
+            del out[e]
+        return out
 
     @classmethod
     @handle_type_error
     def from_json(cls, data):
         c = data.copy()
+        event_strs = c.pop('events', [])
         for k in ['last_refresh', 'created', 'started', 'last_deployed',
                   'last_configured']:
             if k in c:
                 c[k] = datetime.datetime.strptime(c[k], DATEFMT)
-        return cls(**c)
+        events = [OrchestratorEvent.from_json(e) for e in event_strs]
+        return cls(events=events, **c)
 
     def __copy__(self):
         # feel free to change this:
         return DaemonDescription.from_json(self.to_json())
 
+    @staticmethod
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
+        return dumper.represent_dict(data.to_json().items())
+
+
+yaml.add_representer(DaemonDescription, DaemonDescription.yaml_representer)
+
+
 class ServiceDescription(object):
     """
     For responding to queries about the status of a particular service,
@@ -1429,7 +1382,8 @@ class ServiceDescription(object):
                  last_refresh=None,
                  created=None,
                  size=0,
-                 running=0):
+                 running=0,
+                 events: Optional[List['OrchestratorEvent']] = None):
         # Not everyone runs in containers, but enough people do to
         # justify having the container_image_id (image hash) and container_image
         # (image name)
@@ -1451,18 +1405,20 @@ class ServiceDescription(object):
         self.running = running
 
         # datetime when this info was last refreshed
-        self.last_refresh = last_refresh   # type: Optional[datetime.datetime]
-        self.created = created   # type: Optional[datetime.datetime]
+        self.last_refresh: Optional[datetime.datetime] = last_refresh
+        self.created: Optional[datetime.datetime] = created
 
         self.spec: ServiceSpec = spec
 
+        self.events: List[OrchestratorEvent] = events or []
+
     def service_type(self):
         return self.spec.service_type
 
     def __repr__(self):
         return f"<ServiceDescription of {self.spec.one_line_str()}>"
 
-    def to_json(self):
+    def to_json(self) -> OrderedDict:
         out = self.spec.to_json()
         status = {
             'container_image_id': self.container_image_id,
@@ -1472,13 +1428,15 @@ class ServiceDescription(object):
             'size': self.size,
             'running': self.running,
             'last_refresh': self.last_refresh,
-            'created': self.created
+            'created': self.created,
         }
         for k in ['last_refresh', 'created']:
             if getattr(self, k):
                 status[k] = getattr(self, k).strftime(DATEFMT)
         status = {k: v for (k, v) in status.items() if v is not None}
         out['status'] = status
+        if self.events:
+            out['events'] = [e.to_json() for e in self.events]
         return out
 
     @classmethod
@@ -1486,13 +1444,22 @@ class ServiceDescription(object):
     def from_json(cls, data: dict):
         c = data.copy()
         status = c.pop('status', {})
+        event_strs = c.pop('events', [])
         spec = ServiceSpec.from_json(c)
 
         c_status = status.copy()
         for k in ['last_refresh', 'created']:
             if k in c_status:
                 c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
-        return cls(spec=spec, **c_status)
+        events = [OrchestratorEvent.from_json(e) for e in event_strs]
+        return cls(spec=spec, events=events, **c_status)
+
+    @staticmethod
+    def yaml_representer(dumper: 'yaml.SafeDumper', data: 'DaemonDescription'):
+        return dumper.represent_dict(data.to_json().items())
+
+
+yaml.add_representer(ServiceDescription, ServiceDescription.yaml_representer)
 
 
 class InventoryFilter(object):
@@ -1508,8 +1475,8 @@ class InventoryFilter(object):
                  in e.g. OSD servers.
 
     """
-    def __init__(self, labels=None, hosts=None):
-        # type: (Optional[List[str]], Optional[List[str]]) -> None
+
+    def __init__(self, labels: Optional[List[str]] = None, hosts: Optional[List[str]] = None) -> None:
 
         #: Optional: get info about hosts matching labels
         self.labels = labels
@@ -1523,8 +1490,8 @@ class InventoryHost(object):
     When fetching inventory, all Devices are groups inside of an
     InventoryHost.
     """
-    def __init__(self, name, devices=None, labels=None, addr=None):
-        # type: (str, Optional[inventory.Devices], Optional[List[str]], Optional[str]) -> None
+
+    def __init__(self, name: str, devices: Optional[inventory.Devices] = None, labels: Optional[List[str]] = None, addr: Optional[str] = None) -> None:
         if devices is None:
             devices = inventory.Devices([])
         if labels is None:
@@ -1562,7 +1529,6 @@ class InventoryHost(object):
         except TypeError as e:
             raise OrchestratorValidationError('Failed to read inventory: {}'.format(e))
 
-
     @classmethod
     def from_nested_items(cls, hosts):
         devs = inventory.Devices.from_json
@@ -1572,8 +1538,7 @@ class InventoryHost(object):
         return "<InventoryHost>({name})".format(name=self.name)
 
     @staticmethod
-    def get_host_names(hosts):
-        # type: (List[InventoryHost]) -> List[str]
+    def get_host_names(hosts: List['InventoryHost']) -> List[str]:
         return [host.name for host in hosts]
 
     def __eq__(self, other):
@@ -1593,6 +1558,66 @@ class DeviceLightLoc(namedtuple('DeviceLightLoc', ['host', 'dev', 'path'])):
     __slots__ = ()
 
 
+class OrchestratorEvent:
+    """
+    Similar to K8s Events.
+
+    Some form of "important" log message attached to something.
+    """
+    INFO = 'INFO'
+    ERROR = 'ERROR'
+    regex_v1 = re.compile(r'^([^ ]+) ([^:]+):([^ ]+) \[([^\]]+)\] "((?:.|\n)*)"$', re.MULTILINE)
+
+    def __init__(self, created: Union[str, datetime.datetime], kind, subject, level, message):
+        if isinstance(created, str):
+            created = datetime.datetime.strptime(created, DATEFMT)
+        self.created: datetime.datetime = created
+
+        assert kind in "service daemon".split()
+        self.kind: str = kind
+
+        # service name, or daemon danem or something
+        self.subject: str = subject
+
+        # Events are not meant for debugging. debugs should end in the log.
+        assert level in "INFO ERROR".split()
+        self.level = level
+
+        self.message: str = message
+
+    __slots__ = ('created', 'kind', 'subject', 'level', 'message')
+
+    def kind_subject(self) -> str:
+        return f'{self.kind}:{self.subject}'
+
+    def to_json(self) -> str:
+        # Make a long list of events readable.
+        created = self.created.strftime(DATEFMT)
+        return f'{created} {self.kind_subject()} [{self.level}] "{self.message}"'
+
+    @classmethod
+    @handle_type_error
+    def from_json(cls, data) -> "OrchestratorEvent":
+        """
+        >>> OrchestratorEvent.from_json('''2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host 'ubuntu'"''').to_json()
+        '2020-06-10T10:20:25.691255 daemon:crash.ubuntu [INFO] "Deployed crash.ubuntu on host \\'ubuntu\\'"'
+
+        :param data:
+        :return:
+        """
+        match = cls.regex_v1.match(data)
+        if match:
+            return cls(*match.groups())
+        raise ValueError(f'Unable to match: "{data}"')
+
+    def __eq__(self, other):
+        if not isinstance(other, OrchestratorEvent):
+            return False
+
+        return self.created == other.created and self.kind == other.kind \
+            and self.subject == other.subject and self.message == other.message
+
+
 def _mk_orch_methods(cls):
     # Needs to be defined outside of for.
     # Otherwise meth is always bound to last key
@@ -1636,8 +1661,7 @@ class OrchestratorClientMixin(Orchestrator):
     ...         self.orch_client.set_mgr(self.mgr))
     """
 
-    def set_mgr(self, mgr):
-        # type: (MgrModule) -> None
+    def set_mgr(self, mgr: MgrModule) -> None:
         """
         Useable in the Dashbord that uses a global ``mgr``
         """
@@ -1679,8 +1703,7 @@ class OrchestratorClientMixin(Orchestrator):
                 raise NotImplementedError(f'{o} does not implement {meth}') from e
             raise
 
-    def _orchestrator_wait(self, completions):
-        # type: (List[Completion]) -> None
+    def _orchestrator_wait(self, completions: List[Completion]) -> None:
         """
         Wait for completions to complete (reads) or
         become persistent (writes).