git.proxmox.com Git - ceph.git blobdiff (commit: import 15.2.5)
index de9673aa039320f7540a3117a78b999721349969..4de556f9c6cb90813e59f91eca3c2aac2e7d6df9 100644
--- a/ceph/src/pybind/mgr/cephadm/module.py
+++ b/ceph/src/pybind/mgr/cephadm/module.py
@@ -2,13 +2,14 @@ import json
 import errno
 import logging
 from collections import defaultdict
+from contextlib import contextmanager
 from functools import wraps
 from tempfile import TemporaryDirectory
 from threading import Event
 
 import string
 from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
-    Any, Set, TYPE_CHECKING, cast
+    Any, Set, TYPE_CHECKING, cast, Iterator, Union
 
 import datetime
 import six
@@ -22,27 +23,30 @@ import subprocess
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import \
-    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
+    NFSServiceSpec, RGWSpec, ServiceSpec, PlacementSpec, assert_valid_host
+from cephadm.services.cephadmservice import CephadmDaemonSpec
 
 from mgr_module import MgrModule, HandleCommandResult
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
-    CLICommandMeta
+    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
 from orchestrator._interface import GenericSpec
 
 from . import remotes
 from . import utils
+from .migrations import Migrations
 from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
     RbdMirrorService, CrashService, CephadmService
 from .services.iscsi import IscsiService
 from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDRemoval, OSDService
+from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
 from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
     NodeExporterService
-from .schedule import HostAssignment
-from .inventory import Inventory, SpecStore, HostCache
+from .schedule import HostAssignment, HostPlacementSpec
+from .inventory import Inventory, SpecStore, HostCache, EventStore
 from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
 from .template import TemplateMgr
+from .utils import forall_hosts, CephadmNoImage, cephadmNoImage
 
 try:
     import remoto
@@ -84,42 +88,11 @@ CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
 
 
-def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
-    @wraps(f)
-    def forall_hosts_wrapper(*args) -> List[T]:
-
-        # Some weired logic to make calling functions with multiple arguments work.
-        if len(args) == 1:
-            vals = args[0]
-            self = None
-        elif len(args) == 2:
-            self, vals = args
-        else:
-            assert 'either f([...]) or self.f([...])'
-
-        def do_work(arg):
-            if not isinstance(arg, tuple):
-                arg = (arg, )
-            try:
-                if self:
-                    return f(self, *arg)
-                return f(*arg)
-            except Exception as e:
-                logger.exception(f'executing {f.__name__}({args}) failed.')
-                raise
-
-        assert CephadmOrchestrator.instance is not None
-        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
-
-
-    return forall_hosts_wrapper
-
-
-class CephadmCompletion(orchestrator.Completion):
+class CephadmCompletion(orchestrator.Completion[T]):
     def evaluate(self):
         self.finalize(None)
 
-def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
+def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
     """
     Decorator to make CephadmCompletion methods return
     a completion object that executes themselves.
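
Annotation (not part of the patch): with Completion now generic over T, the decorator preserves the wrapped method's return type, so a method annotated `-> str` produces a CephadmCompletion[str] at the call site. A stripped-down, hypothetical model of that typing relationship:

    from typing import Callable, Generic, TypeVar

    T = TypeVar('T')

    class Completion(Generic[T]):
        def __init__(self, result: T) -> None:
            self.result = result

    def trivial_completion(f: Callable[..., T]) -> Callable[..., Completion[T]]:
        def wrapper(*args, **kwargs) -> Completion[T]:
            # the real decorator defers execution inside the completion instead
            return Completion(f(*args, **kwargs))
        return wrapper

    @trivial_completion
    def add_host_label(host: str, label: str) -> str:
        return 'Added label %s to host %s' % (label, host)

    c = add_host_label('host1', 'foo')  # inferred as Completion[str]
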
@@ -139,7 +112,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     instance = None
     NATIVE_OPTIONS = []  # type: List[Any]
-    MODULE_OPTIONS = [
+    MODULE_OPTIONS: List[dict] = [
         {
             'name': 'ssh_config_file',
             'type': 'str',
@@ -184,7 +157,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         },
         {
             'name': 'container_image_grafana',
-            'default': 'ceph/ceph-grafana:latest',
+            'default': 'ceph/ceph-grafana:6.6.2',
             'desc': 'Grafana container image',
         },
         {
@@ -239,11 +212,49 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
             'desc': 'location of alerts to include in prometheus deployments',
         },
+        {
+            'name': 'migration_current',
+            'type': 'int',
+            'default': None,
+            'desc': 'internal - do not modify',
+            # used to track spec and other data migrations.
+        },
+        {
+            'name': 'config_dashboard',
+            'type': 'bool',
+            'default': True,
+            'desc': 'manage configs like API endpoints in Dashboard.'
+        },
+        {
+            'name': 'manage_etc_ceph_ceph_conf',
+            'type': 'bool',
+            'default': False,
+            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
+        },
+        {
+            'name': 'registry_url',
+            'type': 'str',
+            'default': None,
+            'desc': 'Custom repository url'
+        },
+        {
+            'name': 'registry_username',
+            'type': 'str',
+            'default': None,
+            'desc': 'Custom repository username'
+        },
+        {
+            'name': 'registry_password',
+            'type': 'str',
+            'default': None,
+            'desc': 'Custom repository password'
+        },
     ]
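
Annotation (not part of the patch): config_notify() further down copies every MODULE_OPTIONS entry onto the module as an attribute, so the options added here surface as self.migration_current, self.registry_url, self.manage_etc_ceph_ceph_conf and friends. A minimal, hypothetical model of that setattr loop:

    class FakeModule:
        MODULE_OPTIONS = [
            {'name': 'manage_etc_ceph_ceph_conf', 'type': 'bool', 'default': False},
            {'name': 'registry_url', 'type': 'str', 'default': None},
        ]

        def get_module_option(self, name):
            # the real MgrModule returns the stored value, falling back to the
            # declared default; nothing is stored in this model
            return next(o['default'] for o in self.MODULE_OPTIONS if o['name'] == name)

        def config_notify(self):
            for opt in self.MODULE_OPTIONS:
                setattr(self, opt['name'], self.get_module_option(opt['name']))

    m = FakeModule()
    m.config_notify()
    assert m.manage_etc_ceph_ceph_conf is False and m.registry_url is None
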
 
     def __init__(self, *args, **kwargs):
         super(CephadmOrchestrator, self).__init__(*args, **kwargs)
         self._cluster_fsid = self.get('mon_map')['fsid']
+        self.last_monmap: Optional[datetime.datetime] = None
 
         # for serve()
         self.run = True
@@ -271,9 +282,17 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.warn_on_failed_host_check = True
             self.allow_ptrace = False
             self.prometheus_alerts_path = ''
+            self.migration_current = None
+            self.config_dashboard = True
+            self.manage_etc_ceph_ceph_conf = True
+            self.registry_url: Optional[str] = None
+            self.registry_username: Optional[str] = None
+            self.registry_password: Optional[str] = None
 
         self._cons = {}  # type: Dict[str, Tuple[remoto.backends.BaseConnection,remoto.backends.LegacyModuleExecute]]
 
+
+        self.notify('mon_map', None)
         self.config_notify()
 
         path = self.get_ceph_option('cephadm_path')
@@ -300,7 +319,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         self.cache = HostCache(self)
         self.cache.load()
+
         self.rm_util = RemoveUtil(self)
+        self.to_remove_osds = OSDQueue()
+        self.rm_util.load_from_store()
 
         self.spec_store = SpecStore(self)
         self.spec_store.load()
@@ -313,9 +335,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if h not in self.inventory:
                 self.cache.rm_host(h)
 
+
         # in-memory only.
+        self.events = EventStore(self)
         self.offline_hosts: Set[str] = set()
 
+        self.migration = Migrations(self)
+
         # services:
         self.osd_service = OSDService(self)
         self.nfs_service = NFSService(self)
@@ -348,6 +374,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         self.template = TemplateMgr()
 
+        self.requires_post_actions = set()
+
     def shutdown(self):
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -363,27 +391,19 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.debug('_kick_serve_loop')
         self.event.set()
 
-    def _check_safe_to_destroy_mon(self, mon_id):
-        # type: (str) -> None
-        ret, out, err = self.check_mon_command({
-            'prefix': 'quorum_status',
-        })
-        try:
-            j = json.loads(out)
-        except Exception as e:
-            raise OrchestratorError('failed to parse quorum status')
+    # function responsible for logging a single host into a custom registry
+    def _registry_login(self, host, url, username, password):
+        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
+        # want to pass info over stdin rather than through normal list of args
+        args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
+                    " \"password\": \"" + password + "\"}")
+        out, err, code = self._run_cephadm(
+            host, 'mon', 'registry-login',
+            ['--registry-json', '-'], stdin=args_str, error_ok=True)
+        if code:
+            return f"Host {host} failed to login to {url} as {username} with given password"
+        return
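
Annotation (not part of the patch): the stdin payload assembled by hand above is plain JSON, so json.dumps over a dict yields the same shape; the credentials below are placeholders:

    import json

    args_str = json.dumps({
        'url': 'registry.example.com',
        'username': 'myuser',
        'password': 'mypassword',
    })
    # '{"url": "registry.example.com", "username": "myuser", "password": "mypassword"}'
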
 
-        mons = [m['name'] for m in j['monmap']['mons']]
-        if mon_id not in mons:
-            self.log.info('Safe to remove mon.%s: not in monmap (%s)' % (
-                mon_id, mons))
-            return
-        new_mons = [m for m in mons if m != mon_id]
-        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
-        if len(new_quorum) > len(new_mons) / 2:
-            self.log.info('Safe to remove mon.%s: new quorum should be %s (from %s)' % (mon_id, new_quorum, new_mons))
-            return
-        raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
 
     def _check_host(self, host):
         if host not in self.inventory:
@@ -391,7 +411,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.debug(' checking %s' % host)
         try:
             out, err, code = self._run_cephadm(
-                host, 'client', 'check-host', [],
+                host, cephadmNoImage, 'check-host', [],
                 error_ok=True, no_fsid=True)
             self.cache.update_last_host_check(host)
             self.cache.save_host(host)
@@ -458,94 +478,67 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         ret = self.event.wait(sleep_interval)
         self.event.clear()
 
-    def serve(self):
-        # type: () -> None
+    def serve(self) -> None:
+        """
+        The main loop of cephadm.
+
+        A command handler will typically change the declarative state
+        of cephadm. This loop will then attempt to apply this new state.
+        """
         self.log.debug("serve starting")
         while self.run:
 
-            # refresh daemons
-            self.log.debug('refreshing hosts')
-            bad_hosts = []
-            failures = []
-            for host in self.cache.get_hosts():
-                if self.cache.host_needs_check(host):
-                    r = self._check_host(host)
-                    if r is not None:
-                        bad_hosts.append(r)
-                if self.cache.host_needs_daemon_refresh(host):
-                    self.log.debug('refreshing %s daemons' % host)
-                    r = self._refresh_host_daemons(host)
-                    if r:
-                        failures.append(r)
-                if self.cache.host_needs_device_refresh(host):
-                    self.log.debug('refreshing %s devices' % host)
-                    r = self._refresh_host_devices(host)
-                    if r:
-                        failures.append(r)
-
-                if self.cache.host_needs_osdspec_preview_refresh(host):
-                    self.log.debug(f"refreshing OSDSpec previews for {host}")
-                    r = self._refresh_host_osdspec_previews(host)
-                    if r:
-                        failures.append(r)
-
-            health_changed = False
-            if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
-                del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
-                health_changed = True
-            if bad_hosts:
-                self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
-                    'severity': 'warning',
-                    'summary': '%d hosts fail cephadm check' % len(bad_hosts),
-                    'count': len(bad_hosts),
-                    'detail': bad_hosts,
-                }
-                health_changed = True
-            if failures:
-                self.health_checks['CEPHADM_REFRESH_FAILED'] = {
-                    'severity': 'warning',
-                    'summary': 'failed to probe daemons or devices',
-                    'count': len(failures),
-                    'detail': failures,
-                }
-                health_changed = True
-            elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
-                del self.health_checks['CEPHADM_REFRESH_FAILED']
-                health_changed = True
-            if health_changed:
-                self.set_health_checks(self.health_checks)
+            try:
 
-            self._check_for_strays()
+                # refresh daemons
+                self.log.debug('refreshing hosts and daemons')
+                self._refresh_hosts_and_daemons()
 
-            if self.paused:
-                self.health_checks['CEPHADM_PAUSED'] = {
-                    'severity': 'warning',
-                    'summary': 'cephadm background work is paused',
-                    'count': 1,
-                    'detail': ["'ceph orch resume' to resume"],
-                }
-                self.set_health_checks(self.health_checks)
-            else:
-                if 'CEPHADM_PAUSED' in self.health_checks:
-                    del self.health_checks['CEPHADM_PAUSED']
-                    self.set_health_checks(self.health_checks)
+                self._check_for_strays()
 
-                self.rm_util._remove_osds_bg()
+                self._update_paused_health()
 
-                if self._apply_all_services():
-                    continue  # did something, refresh
+                if not self.paused:
+                    self.rm_util.process_removal_queue()
 
-                self._check_daemons()
+                    self.migration.migrate()
+                    if self.migration.is_migration_ongoing():
+                        continue
 
-                if self.upgrade.continue_upgrade():
-                    continue
+                    if self._apply_all_services():
+                        continue  # did something, refresh
+
+                    self._check_daemons()
+
+                    if self.upgrade.continue_upgrade():
+                        continue
+
+            except OrchestratorError as e:
+                if e.event_subject:
+                    self.events.from_orch_error(e)
 
             self._serve_sleep()
         self.log.debug("serve exit")
 
+    def _update_paused_health(self):
+        if self.paused:
+            self.health_checks['CEPHADM_PAUSED'] = {
+                'severity': 'warning',
+                'summary': 'cephadm background work is paused',
+                'count': 1,
+                'detail': ["'ceph orch resume' to resume"],
+            }
+            self.set_health_checks(self.health_checks)
+        else:
+            if 'CEPHADM_PAUSED' in self.health_checks:
+                del self.health_checks['CEPHADM_PAUSED']
+                self.set_health_checks(self.health_checks)
+
     def config_notify(self):
         """
         This method is called whenever one of our config options is changed.
+
+        TODO: this method should be moved into mgr_module.py
         """
         for opt in self.MODULE_OPTIONS:
             setattr(self,
@@ -562,7 +555,27 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.event.set()
 
     def notify(self, notify_type, notify_id):
-        pass
+        if notify_type == "mon_map":
+            # get monmap mtime so we can refresh configs when mons change
+            monmap = self.get('mon_map')
+            self.last_monmap = datetime.datetime.strptime(
+                monmap['modified'], CEPH_DATEFMT)
+            if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
+                self.last_monmap = None  # just in case clocks are skewed
+        if notify_type == "pg_summary":
+            self._trigger_osd_removal()
+
+    def _trigger_osd_removal(self):
+        data = self.get("osd_stats")
+        for osd in data.get('osd_stats', []):
+            if osd.get('num_pgs') == 0:
+                # if _ANY_ osd that is currently in the removal queue appears to be
+                # empty, kick off the removal process
+                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
+                    self.log.debug("Found empty osd. Starting removal process")
+                    self.rm_util.process_removal_queue()
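
Annotation (not part of the patch): the monmap 'modified' stamp parsed in notify() above relies on CEPH_DATEFMT declared near the top of the file; for example:

    import datetime

    CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
    stamp = datetime.datetime.strptime('2020-09-16T12:34:56.123456Z', CEPH_DATEFMT)
    assert stamp.year == 2020 and stamp.microsecond == 123456
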
 
     def pause(self):
         if not self.paused:
@@ -593,7 +606,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         ]
         if forcename:
             if len([d for d in existing if d.daemon_id == forcename]):
-                raise orchestrator.OrchestratorValidationError('name %s already in use', forcename)
+                raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{forcename} already in use')
             return forcename
 
         if '.' in host:
@@ -609,7 +622,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                                       for _ in range(6))
             if len([d for d in existing if d.daemon_id == name]):
                 if not suffix:
-                    raise orchestrator.OrchestratorValidationError('name %s already in use', name)
+                    raise orchestrator.OrchestratorValidationError(f'name {daemon_type}.{name} already in use')
                 self.log.debug('name %s exists, trying again', name)
                 continue
             return name
@@ -633,6 +646,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         if ssh_config_fname:
             self.validate_ssh_config_fname(ssh_config_fname)
             ssh_options += ['-F', ssh_config_fname]
+        self.ssh_config = ssh_config
 
         # identity
         ssh_key = self.get_store("ssh_identity_key")
@@ -658,7 +672,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self._ssh_options = None
 
         if self.mode == 'root':
-            self.ssh_user = 'root'
+            self.ssh_user = self.get_store('ssh_user', default='root')
         elif self.mode == 'cephadm-package':
             self.ssh_user = 'cephadm'
 
@@ -700,7 +714,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         """
         The cephadm orchestrator is always available.
         """
-        return self.can_run()
+        ok, err = self.can_run()
+        if not ok:
+            return ok, err
+        if not self.ssh_key or not self.ssh_pub:
+            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
+        return True, ''
 
     def process(self, completions):
         """
@@ -724,8 +743,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         """
         if inbuf is None or len(inbuf) == 0:
             return -errno.EINVAL, "", "empty ssh config provided"
+        if inbuf == self.ssh_config:
+            return 0, "value unchanged", ""
         self.set_store("ssh_config", inbuf)
         self.log.info('Set ssh_config')
+        self._reconfig_ssh()
         return 0, "", ""
 
     @orchestrator._cli_write_command(
@@ -738,6 +760,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.set_store("ssh_config", None)
         self.ssh_config_tmp = None
         self.log.info('Cleared ssh_config')
+        self._reconfig_ssh()
         return 0, "", ""
 
     @orchestrator._cli_read_command(
@@ -789,6 +812,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def _set_priv_key(self, inbuf=None):
         if inbuf is None or len(inbuf) == 0:
             return -errno.EINVAL, "", "empty private ssh key provided"
+        if inbuf == self.ssh_key:
+            return 0, "value unchanged", ""
         self.set_store("ssh_identity_key", inbuf)
         self.log.info('Set ssh private key')
         self._reconfig_ssh()
@@ -800,6 +825,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def _set_pub_key(self, inbuf=None):
         if inbuf is None or len(inbuf) == 0:
             return -errno.EINVAL, "", "empty public ssh key provided"
+        if inbuf == self.ssh_pub:
+            return 0, "value unchanged", ""
         self.set_store("ssh_identity_pub", inbuf)
         self.log.info('Set ssh public key')
         self._reconfig_ssh()
@@ -830,18 +857,93 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def _get_user(self):
         return 0, self.ssh_user, ''
 
+    @orchestrator._cli_read_command(
+        'cephadm set-user',
+        'name=user,type=CephString',
+        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
+    def set_ssh_user(self, user):
+        current_user = self.ssh_user
+        if user == current_user:
+            return 0, "value unchanged", ""
+
+        self.set_store('ssh_user', user)
+        self._reconfig_ssh()
+
+        host = self.cache.get_hosts()[0]
+        r = self._check_host(host)
+        if r is not None:
+            # connection failed; reset user
+            self.set_store('ssh_user', current_user)
+            self._reconfig_ssh()
+            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)
+
+        msg = 'ssh user set to %s' % user
+        if user != 'root':
+            msg += ', sudo will be used'
+        self.log.info(msg)
+        return 0, msg, ''
+
+    @orchestrator._cli_read_command(
+        'cephadm registry-login',
+        "name=url,type=CephString,req=false "
+        "name=username,type=CephString,req=false "
+        "name=password,type=CephString,req=false",
+        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
+    def registry_login(self, url=None, username=None, password=None, inbuf=None):
+        # if url, username and password were not all given on the command line, expect the login info via -i (inbuf)
+        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
+            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
+                                        "or -i <login credentials json file>")
+        elif not (url and username and password):
+            login_info = json.loads(inbuf)
+            if "url" in login_info and "username" in login_info and "password" in login_info:
+                url = login_info["url"]
+                username = login_info["username"]
+                password = login_info["password"]
+            else:
+                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
+                                "Please setup json file as\n"
+                                "{\n"
+                                  " \"url\": \"REGISTRY_URL\",\n"
+                                  " \"username\": \"REGISTRY_USERNAME\",\n"
+                                  " \"password\": \"REGISTRY_PASSWORD\"\n"
+                                "}\n")
+        # verify the login info works by attempting a login on an arbitrary host
+        host = None
+        for host_name in self.inventory.keys():
+            host = host_name
+            break
+        if not host:
+            raise OrchestratorError('no hosts defined')
+        r = self._registry_login(host, url, username, password)
+        if r is not None:
+            return 1, '', r
+        # if logins succeeded, store info
+        self.log.debug("Host logins successful. Storing login info.")
+        self.set_module_option('registry_url', url)
+        self.set_module_option('registry_username', username)
+        self.set_module_option('registry_password', password)
+        # distribute new login info to all hosts
+        self.cache.distribute_new_registry_login_info()
+        return 0, "registry login scheduled", ''
+
     @orchestrator._cli_read_command(
         'cephadm check-host',
         'name=host,type=CephString '
         'name=addr,type=CephString,req=false',
         'Check whether we can access and manage a remote host')
     def check_host(self, host, addr=None):
-        out, err, code = self._run_cephadm(host, 'client', 'check-host',
-                                           ['--expect-hostname', host],
-                                           addr=addr,
-                                           error_ok=True, no_fsid=True)
-        if code:
-            return 1, '', ('check-host failed:\n' + '\n'.join(err))
+        try:
+            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
+                                               ['--expect-hostname', host],
+                                               addr=addr,
+                                               error_ok=True, no_fsid=True)
+            if code:
+                return 1, '', ('check-host failed:\n' + '\n'.join(err))
+        except OrchestratorError as e:
+            self.log.exception(f"check-host failed for '{host}'")
+            return 1, '', ('check-host failed:\n' +
+                f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
         # if we have an outstanding health alert for this host, give the
         # serve thread a kick
         if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
@@ -856,7 +958,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         'name=addr,type=CephString,req=false',
         'Prepare a remote host for use with cephadm')
     def _prepare_host(self, host, addr=None):
-        out, err, code = self._run_cephadm(host, 'client', 'prepare-host',
+        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                            ['--expect-hostname', host],
                                            addr=addr,
                                            error_ok=True, no_fsid=True)
@@ -870,7 +972,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     self.event.set()
         return 0, '%s (%s) ok' % (host, addr), err
 
-    def _get_connection(self, host):
+    def _get_connection(self, host: str):
         """
         Setup a connection for running commands on remote host.
         """
@@ -889,7 +991,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         conn = remoto.Connection(
             n,
             logger=child_logger,
-            ssh_options=self._ssh_options)
+            ssh_options=self._ssh_options,
+            sudo=True if self.ssh_user != 'root' else False)
 
         r = conn.import_module(remotes)
         self._cons[host] = conn, r
@@ -912,23 +1015,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             executable_path))
         return executable_path
 
-    def _run_cephadm(self,
-                     host: str,
-                     entity: Optional[str],
-                     command: str,
-                     args: List[str],
-                     addr: Optional[str] = None,
-                     stdin: Optional[str] = None,
-                     no_fsid=False,
-                     error_ok=False,
-                     image: Optional[str] = None,
-                     env_vars: Optional[List[str]] = None,
-                     ) -> Tuple[List[str], List[str], int]:
-        """
-        Run cephadm on the remote host with the given command + args
-
-        :env_vars: in format -> [KEY=VALUE, ..]
-        """
+    @contextmanager
+    def _remote_connection(self,
+                           host: str,
+                           addr: Optional[str]=None,
+                           ) -> Iterator[Tuple["BaseConnection", Any]]:
         if not addr and host in self.inventory:
             addr = self.inventory.get_addr(host)
 
@@ -936,36 +1027,85 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         try:
             try:
+                if not addr:
+                    raise OrchestratorError("host address is empty")
                 conn, connr = self._get_connection(addr)
             except OSError as e:
-                if error_ok:
-                    self.log.exception('failed to establish ssh connection')
-                    return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
-                raise execnet.gateway_bootstrap.HostNotFound(str(e)) from e
+                self._reset_con(host)
+                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
+                raise execnet.gateway_bootstrap.HostNotFound(msg)
 
+            yield (conn, connr)
+
+        except execnet.gateway_bootstrap.HostNotFound as e:
+            # this is a misleading exception as it seems to be thrown for
+            # any sort of connection failure, even those having nothing to
+            # do with "host not found" (e.g., ssh key permission denied).
+            self.offline_hosts.add(host)
+            self._reset_con(host)
+
+            user = self.ssh_user if self.mode == 'root' else 'cephadm'
+            msg = f'''Failed to connect to {host} ({addr}).
+Check that the host is reachable and accepts connections using the cephadm SSH key
+
+you may want to run:
+> ceph cephadm get-ssh-config > ssh_config
+> ceph config-key get mgr/cephadm/ssh_identity_key > key
+> ssh -F ssh_config -i key {user}@{host}'''
+            raise OrchestratorError(msg) from e
+        except Exception as ex:
+            self.log.exception(ex)
+            raise
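
Annotation (not part of the patch): _run_cephadm and _deploy_etc_ceph_ceph_conf below both consume this as `with self._remote_connection(host) as (conn, connr): ...`. A self-contained, hypothetical model of the @contextmanager error-translation pattern, with strings standing in for the remoto connection objects:

    from contextlib import contextmanager
    from typing import Iterator, Tuple

    class ConnectError(Exception):
        pass

    @contextmanager
    def remote_connection(host: str) -> Iterator[Tuple[str, str]]:
        conn, connr = 'conn-to-%s' % host, 'remote-module-for-%s' % host
        try:
            yield (conn, connr)
        except OSError as e:
            # translate low-level connection failures into a single, descriptive error
            raise ConnectError('Failed to connect to %s: %s' % (host, e)) from e

    with remote_connection('host1') as (conn, connr):
        print(conn, connr)  # conn-to-host1 remote-module-for-host1
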
+
+    def _get_container_image(self, daemon_name: str) -> str:
+        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
+        if daemon_type in CEPH_TYPES or \
+                daemon_type == 'nfs' or \
+                daemon_type == 'iscsi':
+            # get container image
+            ret, image, err = self.check_mon_command({
+                'prefix': 'config get',
+                'who': utils.name_to_config_section(daemon_name),
+                'key': 'container_image',
+            })
+            image = image.strip()  # type: ignore
+        elif daemon_type == 'prometheus':
+            image = self.container_image_prometheus
+        elif daemon_type == 'grafana':
+            image = self.container_image_grafana
+        elif daemon_type == 'alertmanager':
+            image = self.container_image_alertmanager
+        elif daemon_type == 'node-exporter':
+            image = self.container_image_node_exporter
+        else:
+            assert False, daemon_type
+
+        self.log.debug('%s container image %s' % (daemon_name, image))
+
+        return image
+
+    def _run_cephadm(self,
+                     host: str,
+                     entity: Union[CephadmNoImage, str],
+                     command: str,
+                     args: List[str],
+                     addr: Optional[str] = "",
+                     stdin: Optional[str] = "",
+                     no_fsid: Optional[bool] = False,
+                     error_ok: Optional[bool] = False,
+                     image: Optional[str] = "",
+                     env_vars: Optional[List[str]] = None,
+                     ) -> Tuple[List[str], List[str], int]:
+        """
+        Run cephadm on the remote host with the given command + args
+
+        :env_vars: in format -> [KEY=VALUE, ..]
+        """
+        with self._remote_connection(host, addr) as tpl:
+            conn, connr = tpl
             assert image or entity
-            if not image:
-                daemon_type = entity.split('.', 1)[0]  # type: ignore
-                if daemon_type in CEPH_TYPES or \
-                        daemon_type == 'nfs' or \
-                        daemon_type == 'iscsi':
-                    # get container image
-                    ret, image, err = self.check_mon_command({
-                        'prefix': 'config get',
-                        'who': utils.name_to_config_section(entity),
-                        'key': 'container_image',
-                    })
-                    image = image.strip()  # type: ignore
-                elif daemon_type == 'prometheus':
-                    image = self.container_image_prometheus
-                elif daemon_type == 'grafana':
-                    image = self.container_image_grafana
-                elif daemon_type == 'alertmanager':
-                    image = self.container_image_alertmanager
-                elif daemon_type == 'node-exporter':
-                    image = self.container_image_node_exporter
-
-            self.log.debug('%s container image %s' % (entity, image))
+            if not image and entity is not cephadmNoImage:
+                image = self._get_container_image(entity)
 
             final_args = []
 
@@ -996,9 +1136,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                             host, remotes.PYTHONS, remotes.PATH))
                 try:
                     out, err, code = remoto.process.check(
-                        conn,
-                        [python, '-u'],
-                        stdin=script.encode('utf-8'))
+                    conn,
+                    [python, '-u'],
+                    stdin=script.encode('utf-8'))
                 except RuntimeError as e:
                     self._reset_con(host)
                     if error_ok:
@@ -1024,28 +1164,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if err:
                 self.log.debug('err: %s' % '\n'.join(err))
             if code and not error_ok:
-                raise RuntimeError(
+                raise OrchestratorError(
                     'cephadm exited with an error code: %d, stderr:%s' % (
                         code, '\n'.join(err)))
             return out, err, code
 
-        except execnet.gateway_bootstrap.HostNotFound as e:
-            # this is a misleading exception as it seems to be thrown for
-            # any sort of connection failure, even those having nothing to
-            # do with "host not found" (e.g., ssh key permission denied).
-            self.offline_hosts.add(host)
-            user = 'root' if self.mode == 'root' else 'cephadm'
-            msg = f'''Failed to connect to {host} ({addr}).
-Check that the host is reachable and accepts connections using the cephadm SSH key
-
-you may want to run:
-> ceph cephadm get-ssh-config > ssh_config
-> ceph config-key get mgr/cephadm/ssh_identity_key > key
-> ssh -F ssh_config -i key {user}@{host}'''
-            raise OrchestratorError(msg) from e
-        except Exception as ex:
-            self.log.exception(ex)
-            raise
 
     def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
         return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
@@ -1058,7 +1181,7 @@ you may want to run:
         :param host: host name
         """
         assert_valid_host(spec.hostname)
-        out, err, code = self._run_cephadm(spec.hostname, 'client', 'check-host',
+        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                            ['--expect-hostname', spec.hostname],
                                            addr=spec.addr,
                                            error_ok=True, no_fsid=True)
@@ -1093,7 +1216,7 @@ you may want to run:
         return "Removed host '{}'".format(host)
 
     @trivial_completion
-    def update_host_addr(self, host, addr):
+    def update_host_addr(self, host, addr) -> str:
         self.inventory.set_addr(host, addr)
         self._reset_con(host)
         self.event.set()  # refresh stray health check
@@ -1112,17 +1235,40 @@ you may want to run:
         return list(self.inventory.all_specs())
 
     @trivial_completion
-    def add_host_label(self, host, label):
+    def add_host_label(self, host, label) -> str:
         self.inventory.add_label(host, label)
         self.log.info('Added label %s to host %s' % (label, host))
         return 'Added label %s to host %s' % (label, host)
 
     @trivial_completion
-    def remove_host_label(self, host, label):
+    def remove_host_label(self, host, label) -> str:
         self.inventory.rm_label(host, label)
         self.log.info('Removed label %s to host %s' % (label, host))
         return 'Removed label %s from host %s' % (label, host)
 
+    @trivial_completion
+    def host_ok_to_stop(self, hostname: str):
+        if hostname not in self.cache.get_hosts():
+            raise OrchestratorError(f'Cannot find host "{hostname}"')
+
+        daemons = self.cache.get_daemons()
+        daemon_map = defaultdict(lambda: [])
+        for dd in daemons:
+            if dd.hostname == hostname:
+                daemon_map[dd.daemon_type].append(dd.daemon_id)
+
+        for daemon_type, daemon_ids in daemon_map.items():
+            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+            if r.retval:
+                self.log.error(f'It is NOT safe to stop host {hostname}')
+                raise orchestrator.OrchestratorError(
+                        r.stderr,
+                        errno=r.retval)
+
+        msg = f'It is presumed safe to stop host {hostname}'
+        self.log.info(msg)
+        return msg
+
     def update_osdspec_previews(self, search_host: str = ''):
         # Set global 'pending' flag for host
         self.cache.loading_osdspec_preview.add(search_host)
@@ -1141,7 +1287,75 @@ you may want to run:
         self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
         return True
 
-    def _refresh_host_daemons(self, host):
+    def _refresh_hosts_and_daemons(self) -> None:
+        bad_hosts = []
+        failures = []
+
+        @forall_hosts
+        def refresh(host):
+            if self.cache.host_needs_check(host):
+                r = self._check_host(host)
+                if r is not None:
+                    bad_hosts.append(r)
+            if self.cache.host_needs_daemon_refresh(host):
+                self.log.debug('refreshing %s daemons' % host)
+                r = self._refresh_host_daemons(host)
+                if r:
+                    failures.append(r)
+
+            if self.cache.host_needs_registry_login(host) and self.registry_url:
+                self.log.debug(f"Logging `{host}` into custom registry")
+                r = self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
+                if r:
+                    bad_hosts.append(r)
+
+            if self.cache.host_needs_device_refresh(host):
+                self.log.debug('refreshing %s devices' % host)
+                r = self._refresh_host_devices(host)
+                if r:
+                    failures.append(r)
+
+            if self.cache.host_needs_osdspec_preview_refresh(host):
+                self.log.debug(f"refreshing OSDSpec previews for {host}")
+                r = self._refresh_host_osdspec_previews(host)
+                if r:
+                    failures.append(r)
+
+            if self.cache.host_needs_new_etc_ceph_ceph_conf(host):
+                self.log.debug(f"deploying new /etc/ceph/ceph.conf on `{host}`")
+                r = self._deploy_etc_ceph_ceph_conf(host)
+                if r:
+                    bad_hosts.append(r)
+
+        refresh(self.cache.get_hosts())
+
+        health_changed = False
+        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
+            del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
+            health_changed = True
+        if bad_hosts:
+            self.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
+                'severity': 'warning',
+                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
+                'count': len(bad_hosts),
+                'detail': bad_hosts,
+            }
+            health_changed = True
+        if failures:
+            self.health_checks['CEPHADM_REFRESH_FAILED'] = {
+                'severity': 'warning',
+                'summary': 'failed to probe daemons or devices',
+                'count': len(failures),
+                'detail': failures,
+            }
+            health_changed = True
+        elif 'CEPHADM_REFRESH_FAILED' in self.health_checks:
+            del self.health_checks['CEPHADM_REFRESH_FAILED']
+            health_changed = True
+        if health_changed:
+            self.set_health_checks(self.health_checks)
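
Annotation (not part of the patch): @forall_hosts, now imported from .utils, maps the wrapped function over the given list on the module's worker pool, which is how refresh(self.cache.get_hosts()) above fans out one call per host. A thread-pool model of the same pattern, with hypothetical host names:

    from multiprocessing.pool import ThreadPool

    def forall_hosts_model(f):
        def wrapper(vals):
            with ThreadPool(processes=4) as pool:
                return pool.map(f, vals)
        return wrapper

    @forall_hosts_model
    def refresh(host):
        return 'refreshed %s' % host

    print(refresh(['host1', 'host2']))  # ['refreshed host1', 'refreshed host2']
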
+
+    def _refresh_host_daemons(self, host) -> Optional[str]:
         try:
             out, err, code = self._run_cephadm(
                 host, 'mon', 'ls', [], no_fsid=True)
@@ -1194,7 +1408,7 @@ you may want to run:
         self.cache.save_host(host)
         return None
 
-    def _refresh_host_devices(self, host):
+    def _refresh_host_devices(self, host) -> Optional[str]:
         try:
             out, err, code = self._run_cephadm(
                 host, 'osd',
@@ -1226,15 +1440,51 @@ you may want to run:
         self.cache.save_host(host)
         return None
 
+    def _deploy_etc_ceph_ceph_conf(self, host: str) -> Optional[str]:
+        ret, config, err = self.check_mon_command({
+            "prefix": "config generate-minimal-conf",
+        })
+
+        try:
+            with self._remote_connection(host) as tpl:
+                conn, connr = tpl
+                out, err, code = remoto.process.check(
+                    conn,
+                    ['mkdir', '-p', '/etc/ceph'])
+                if code:
+                    return f'failed to create /etc/ceph on {host}: {err}'
+                out, err, code = remoto.process.check(
+                    conn,
+                    ['dd', 'of=/etc/ceph/ceph.conf'],
+                    stdin=config.encode('utf-8')
+                )
+                if code:
+                    return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
+                self.cache.update_last_etc_ceph_ceph_conf(host)
+                self.cache.save_host(host)
+        except OrchestratorError as e:
+            return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
+        return None
+
+    def _invalidate_daemons_and_kick_serve(self, filter_host=None):
+        if filter_host:
+            self.cache.invalidate_host_daemons(filter_host)
+        else:
+            for h in self.cache.get_hosts():
+                # Also discover daemons deployed manually
+                self.cache.invalidate_host_daemons(h)
+
+        self._kick_serve_loop()
+
     @trivial_completion
-    def describe_service(self, service_type=None, service_name=None,
-                         refresh=False):
+    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
+                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
         if refresh:
-            # ugly sync path, FIXME someday perhaps?
-            for host in self.inventory.keys():
-                self._refresh_host_daemons(host)
+            self._invalidate_daemons_and_kick_serve()
+            self.log.info('Kicked serve() loop to refresh all services')
+
         # <service_map>
-        sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
+        sm: Dict[str, orchestrator.ServiceDescription] = {}
         osd_count = 0
         for h, dm in self.cache.get_daemons_with_volatile_status():
             for name, dd in dm.items():
@@ -1248,6 +1498,9 @@ you may want to run:
                     OSDs do not know the affinity to their spec out of the box.
                     """
                     n = f"osd.{dd.osdspec_affinity}"
+                    if not dd.osdspec_affinity:
+                        # If there is no osdspec_affinity, the spec should suffice for displaying
+                        continue
                 if n in self.spec_store.specs:
                     spec = self.spec_store.specs[n]
                 else:
@@ -1265,18 +1518,20 @@ you may want to run:
                         container_image_id=dd.container_image_id,
                         container_image_name=dd.container_image_name,
                         spec=spec,
+                        events=self.events.get_for_service(spec.service_name()),
                     )
                 if n in self.spec_store.specs:
                     if dd.daemon_type == 'osd':
                         """
                         The osd count can't be determined by the Placement spec.
-                        It's rather pointless to show a actual/expected representation 
+                        An actual/expected representation cannot be determined
                         here. So we're setting running = size for now.
                         """
                         osd_count += 1
                         sm[n].size = osd_count
                     else:
-                        sm[n].size = spec.placement.get_host_selection_size(self._get_hosts)
+                        sm[n].size = spec.placement.get_host_selection_size(
+                            self.inventory.all_specs())
 
                     sm[n].created = self.spec_store.spec_created[n]
                     if service_type == 'nfs':
@@ -1301,8 +1556,9 @@ you may want to run:
                 continue
             sm[n] = orchestrator.ServiceDescription(
                 spec=spec,
-                size=spec.placement.get_host_selection_size(self._get_hosts),
+                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                 running=0,
+                events=self.events.get_for_service(spec.service_name()),
             )
             if service_type == 'nfs':
                 spec = cast(NFSServiceSpec, spec)
@@ -1310,15 +1566,16 @@ you may want to run:
         return list(sm.values())
 
     @trivial_completion
-    def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None,
-                     host=None, refresh=False):
+    def list_daemons(self,
+                     service_name: Optional[str] = None,
+                     daemon_type: Optional[str] = None,
+                     daemon_id: Optional[str] = None,
+                     host: Optional[str] = None,
+                     refresh: bool = False) -> List[orchestrator.DaemonDescription]:
         if refresh:
-            # ugly sync path, FIXME someday perhaps?
-            if host:
-                self._refresh_host_daemons(host)
-            else:
-                for hostname in self.inventory.keys():
-                    self._refresh_host_daemons(hostname)
+            self._invalidate_daemons_and_kick_serve(host)
+            self.log.info('Kicked serve() loop to refresh all daemons')
+
         result = []
         for h, dm in self.cache.get_daemons_with_volatile_status():
             if host and h != host:
@@ -1334,7 +1591,7 @@ you may want to run:
         return result
 
     @trivial_completion
-    def service_action(self, action, service_name):
+    def service_action(self, action, service_name) -> List[str]:
         args = []
         for host, dm in self.cache.daemons.items():
             for name, d in dm.items():
@@ -1345,47 +1602,67 @@ you may want to run:
         return self._daemon_actions(args)
 
     @forall_hosts
-    def _daemon_actions(self, daemon_type, daemon_id, host, action):
-        return self._daemon_action(daemon_type, daemon_id, host, action)
+    def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str:
+        with set_exception_subject('daemon', DaemonDescription(
+            daemon_type=daemon_type,
+            daemon_id=daemon_id
+        ).name()):
+            return self._daemon_action(daemon_type, daemon_id, host, action)
+
+    def _daemon_action(self, daemon_type, daemon_id, host, action, image=None):
+        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
+            host=host,
+            daemon_id=daemon_id,
+            daemon_type=daemon_type,
+        )
+
+        if image is not None:
+            if action != 'redeploy':
+                raise OrchestratorError(
+                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
+            if daemon_type not in CEPH_TYPES:
+                raise OrchestratorError(
+                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
+                    f'types are: {", ".join(CEPH_TYPES)}')
+
+            self.check_mon_command({
+                'prefix': 'config set',
+                'name': 'container_image',
+                'value': image,
+                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
+            })
 
-    def _daemon_action(self, daemon_type, daemon_id, host, action):
         if action == 'redeploy':
             # stop, recreate the container+unit, then restart
-            return self._create_daemon(daemon_type, daemon_id, host)
+            return self._create_daemon(daemon_spec)
         elif action == 'reconfig':
-            return self._create_daemon(daemon_type, daemon_id, host,
-                                       reconfig=True)
+            return self._create_daemon(daemon_spec, reconfig=True)
 
         actions = {
             'start': ['reset-failed', 'start'],
             'stop': ['stop'],
             'restart': ['reset-failed', 'restart'],
         }
-        name = '%s.%s' % (daemon_type, daemon_id)
+        name = daemon_spec.name()
         for a in actions[action]:
-            out, err, code = self._run_cephadm(
-                host, name, 'unit',
-                ['--name', name, a],
-                error_ok=True)
-        self.cache.invalidate_host_daemons(host)
-        return "{} {} from host '{}'".format(action, name, host)
+            try:
+                out, err, code = self._run_cephadm(
+                    host, name, 'unit',
+                    ['--name', name, a])
+            except Exception:
+                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
+        self.cache.invalidate_host_daemons(daemon_spec.host)
+        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
+        self.events.for_daemon(name, 'INFO', msg)
+        return msg
 
     @trivial_completion
-    def daemon_action(self, action, daemon_type, daemon_id):
-        args = []
-        for host, dm in self.cache.daemons.items():
-            for name, d in dm.items():
-                if d.daemon_type == daemon_type and d.daemon_id == daemon_id:
-                    args.append((d.daemon_type, d.daemon_id,
-                                 d.hostname, action))
-        if not args:
-            raise orchestrator.OrchestratorError(
-                'Unable to find %s.%s daemon(s)' % (
-                    daemon_type, daemon_id))
-        self.log.info('%s daemons %s' % (
-            action.capitalize(),
-            ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
-        return self._daemon_actions(args)
+    def daemon_action(self, action: str, daemon_name: str, image: Optional[str]=None) -> str:
+        d = self.cache.get_daemon(daemon_name)
+
+        self.log.info(f'{action} daemon {daemon_name}')
+        return self._daemon_action(d.daemon_type, d.daemon_id,
+                                 d.hostname, action, image=image)
 
     @trivial_completion
     def remove_daemons(self, names):
@@ -1401,19 +1678,19 @@ you may want to run:
         return self._remove_daemons(args)
 
     @trivial_completion
-    def remove_service(self, service_name):
+    def remove_service(self, service_name) -> str:
         self.log.info('Remove service %s' % service_name)
         self._trigger_preview_refresh(service_name=service_name)
         found = self.spec_store.rm(service_name)
         if found:
             self._kick_serve_loop()
-            return ['Removed service %s' % service_name]
+            return 'Removed service %s' % service_name
         else:
             # must be idempotent: still a success.
-            return [f'Failed to remove service. <{service_name}> was not found.']
+            return f'Failed to remove service. <{service_name}> was not found.'
 
     @trivial_completion
-    def get_inventory(self, host_filter=None, refresh=False):
+    def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]:
         """
         Return the storage inventory of hosts matching the given filter.
 
@@ -1423,24 +1700,26 @@ you may want to run:
           - add filtering by label
         """
         if refresh:
-            # ugly sync path, FIXME someday perhaps?
-            if host_filter:
-                for host in host_filter.hosts:
-                    self._refresh_host_devices(host)
+            if host_filter and host_filter.hosts:
+                for h in host_filter.hosts:
+                    self.cache.invalidate_host_devices(h)
             else:
-                for host in self.inventory.keys():
-                    self._refresh_host_devices(host)
+                for h in self.cache.get_hosts():
+                    self.cache.invalidate_host_devices(h)
+
+            self.event.set()
+            self.log.info('Kicked serve() loop to refresh devices')
 
         result = []
         for host, dls in self.cache.devices.items():
-            if host_filter and host not in host_filter.hosts:
+            if host_filter and host_filter.hosts and host not in host_filter.hosts:
                 continue
             result.append(orchestrator.InventoryHost(host,
                                                      inventory.Devices(dls)))
         return result
 
     @trivial_completion
-    def zap_device(self, host, path):
+    def zap_device(self, host, path) -> str:
         self.log.info('Zap device %s:%s' % (host, path))
         out, err, code = self._run_cephadm(
             host, 'osd', 'ceph-volume',
@@ -1452,7 +1731,7 @@ you may want to run:
         return '\n'.join(out + err)
 
     @trivial_completion
-    def blink_device_light(self, ident_fault, on, locs):
+    def blink_device_light(self, ident_fault, on, locs) -> List[str]:
         @forall_hosts
         def blink(host, dev, path):
             cmd = [
@@ -1466,7 +1745,7 @@ you may want to run:
                 host, 'osd', 'shell', ['--'] + cmd,
                 error_ok=True)
             if code:
-                raise RuntimeError(
+                raise OrchestratorError(
                     'Unable to affect %s light for %s:%s. Command: %s' % (
                         ident_fault, host, dev, ' '.join(cmd)))
             self.log.info('Set %s light for %s:%s %s' % (
@@ -1491,58 +1770,50 @@ you may want to run:
                 r[str(osd_id)] = o.get('uuid', '')
         return r
 
-    def resolve_hosts_for_osdspecs(self,
-                                   specs: Optional[List[DriveGroupSpec]] = None,
-                                   service_name: Optional[str] = None
-                                   ) -> List[str]:
-        osdspecs = []
-        if service_name:
-            self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
-            osdspecs = self.spec_store.find(service_name=service_name)
-            self.log.debug(f"Found OSDSpecs: {osdspecs}")
-        if specs:
-            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
-        if not service_name and not specs:
-            # if neither parameters are fulfilled, search for all available osdspecs
-            osdspecs = self.spec_store.find(service_name='osd')
-            self.log.debug(f"Found OSDSpecs: {osdspecs}")
-        if not osdspecs:
-            self.log.debug("No OSDSpecs found")
-            return []
-        return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])
-
-    def resolve_osdspecs_for_host(self, host):
-        matching_specs = []
-        self.log.debug(f"Finding OSDSpecs for host: <{host}>")
-        for spec in self.spec_store.find('osd'):
-            if host in spec.placement.filter_matching_hosts(self._get_hosts):
-                self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
-                matching_specs.append(spec)
-        return matching_specs
-
     def _trigger_preview_refresh(self,
                                  specs: Optional[List[DriveGroupSpec]] = None,
-                                 service_name: Optional[str] = None):
-        refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
+                                 service_name: Optional[str] = None,
+                                 ) -> None:
+        # Only trigger a refresh when a spec has changed
+        trigger_specs = []
+        if specs:
+            for spec in specs:
+                preview_spec = self.spec_store.spec_preview.get(spec.service_name())
+                # If the to-be-previewed spec differs from the stored preview spec,
+                # we need to trigger a refresh; if the spec has been removed (== None),
+                # we need to refresh as well.
+                if not preview_spec or spec != preview_spec:
+                    trigger_specs.append(spec)
+        if service_name:
+            trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))]
+        if not any(trigger_specs):
+            return None
+
+        refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs)
         for host in refresh_hosts:
             self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
             self.cache.osdspec_previews_refresh_queue.append(host)
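# A minimal sketch of the change detection above, using a stand-in MiniSpec type
# (illustrative only): a spec needs a preview refresh when no preview is stored
# under its service name or the stored preview differs from the incoming spec.
from dataclasses import dataclass
from typing import Dict, List

@dataclass(frozen=True)
class MiniSpec:
    service_name: str
    data_devices: str

def specs_needing_refresh(incoming: List[MiniSpec],
                          stored_previews: Dict[str, MiniSpec]) -> List[MiniSpec]:
    # .get() returns None for unknown services, and None != spec, so both the
    # "missing preview" and the "changed preview" cases trigger a refresh.
    return [s for s in incoming if stored_previews.get(s.service_name) != s]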
 
     @trivial_completion
-    def apply_drivegroups(self, specs: List[DriveGroupSpec]):
-        self._trigger_preview_refresh(specs=specs)
+    def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]:
+        """
+        Deprecated. Please use `apply()` instead.
+
+        Keeping this around to be compatible with mgr/dashboard
+        """
         return [self._apply(spec) for spec in specs]
 
     @trivial_completion
-    def create_osds(self, drive_group: DriveGroupSpec):
-        return self.osd_service.create(drive_group)
+    def create_osds(self, drive_group: DriveGroupSpec) -> str:
+        return self.osd_service.create_from_spec(drive_group)
 
-    @trivial_completion
-    def preview_osdspecs(self,
-                         osdspec_name: Optional[str] = None,
-                         osdspecs: Optional[List[DriveGroupSpec]] = None
-                         ):
-        matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+    def _preview_osdspecs(self,
+                          osdspecs: Optional[List[DriveGroupSpec]] = None
+                          ):
+        if not osdspecs:
+            return {'n/a': [{'error': True,
+                             'message': 'No OSDSpec or matching hosts found.'}]}
+        matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs)
         if not matching_hosts:
             return {'n/a': [{'error': True,
                              'message': 'No OSDSpec or matching hosts found.'}]}
@@ -1552,9 +1823,18 @@ you may want to run:
             # Report 'pending' when any of the matching hosts is still loading previews (flag is True)
             return {'n/a': [{'error': True,
                              'message': 'Preview data is being generated.. '
-                                        'Please try again in a bit.'}]}
-        # drop all keys that are not in search_hosts and return preview struct
-        return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
+                                        'Please re-run this command in a bit.'}]}
+        # drop all hosts that are not in matching_hosts and only select reports that match the requested osdspecs
+        previews_for_specs = {}
+        for host, raw_reports in self.cache.osdspec_previews.items():
+            if host not in matching_hosts:
+                continue
+            osd_reports = []
+            for osd_report in raw_reports:
+                if osd_report.get('osdspec') in [x.service_id for x in osdspecs]:
+                    osd_reports.append(osd_report)
+            previews_for_specs.update({host: osd_reports})
+        return previews_for_specs
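# A minimal, self-contained sketch (illustrative names) of the report filtering above:
# keep only hosts matched by the specs, and within each host keep only reports whose
# 'osdspec' field names one of the requested service ids.
from typing import Dict, List

def select_reports(previews: Dict[str, List[dict]],
                   matching_hosts: List[str],
                   wanted_ids: List[str]) -> Dict[str, List[dict]]:
    return {
        host: [r for r in reports if r.get('osdspec') in wanted_ids]
        for host, reports in previews.items()
        if host in matching_hosts
    }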
 
     def _calc_daemon_deps(self, daemon_type, daemon_id):
         need = {
@@ -1568,13 +1848,13 @@ you may want to run:
                 deps.append(dd.name())
         return sorted(deps)
 
-    def _get_config_and_keyring(self, daemon_type, daemon_id,
+    def _get_config_and_keyring(self, daemon_type, daemon_id, host,
                                 keyring=None,
                                 extra_ceph_config=None):
-        # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
+        # type: (str, str, str, Optional[str], Optional[str]) -> Dict[str, Any]
         # keyring
         if not keyring:
-            ename = utils.name_to_auth_entity(daemon_type + '.' + daemon_id)
+            ename = utils.name_to_auth_entity(daemon_type, daemon_id, host=host)
             ret, keyring, err = self.check_mon_command({
                 'prefix': 'auth get',
                 'entity': ename,
@@ -1593,91 +1873,78 @@ you may want to run:
         }
 
     def _create_daemon(self,
-                       daemon_type: str,
-                       daemon_id: str,
-                       host: str,
-                       keyring: Optional[str] = None,
-                       extra_args: Optional[List[str]] = None,
-                       extra_config: Optional[Dict[str, Any]] = None,
+                       daemon_spec: CephadmDaemonSpec,
                        reconfig=False,
                        osd_uuid_map: Optional[Dict[str, Any]] = None,
                        redeploy=False,
                        ) -> str:
 
-        if not extra_args:
-            extra_args = []
-        if not extra_config:
-            extra_config = {}
-        name = '%s.%s' % (daemon_type, daemon_id)
-
-        start_time = datetime.datetime.utcnow()
-        deps = []  # type: List[str]
-        cephadm_config = {}  # type: Dict[str, Any]
-        if daemon_type == 'prometheus':
-            cephadm_config, deps = self.prometheus_service.generate_config()
-            extra_args.extend(['--config-json', '-'])
-        elif daemon_type == 'grafana':
-            cephadm_config, deps = self.grafana_service.generate_config()
-            extra_args.extend(['--config-json', '-'])
-        elif daemon_type == 'nfs':
-            cephadm_config, deps = \
-                    self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
-            extra_args.extend(['--config-json', '-'])
-        elif daemon_type == 'alertmanager':
-            cephadm_config, deps = self.alertmanager_service.generate_config()
-            extra_args.extend(['--config-json', '-'])
-        elif daemon_type == 'node-exporter':
-            cephadm_config, deps = self.node_exporter_service.generate_config()
-            extra_args.extend(['--config-json', '-'])
-        else:
-            # Ceph.daemons (mon, mgr, mds, osd, etc)
-            cephadm_config = self._get_config_and_keyring(
-                    daemon_type, daemon_id,
-                    keyring=keyring,
-                    extra_ceph_config=extra_config.pop('config', ''))
-            if extra_config:
-                cephadm_config.update({'files': extra_config})
-            extra_args.extend(['--config-json', '-'])
+
+        with set_exception_subject('service', orchestrator.DaemonDescription(
+                daemon_type=daemon_spec.daemon_type,
+                daemon_id=daemon_spec.daemon_id,
+                hostname=daemon_spec.host,
+        ).service_id(), overwrite=True):
+
+            start_time = datetime.datetime.utcnow()
+            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(daemon_spec)
+
+            daemon_spec.extra_args.extend(['--config-json', '-'])
+
+            # TCP port to open in the host firewall
+            if daemon_spec.ports:
+                daemon_spec.extra_args.extend(['--tcp-ports', ' '.join(map(str,daemon_spec.ports))])
 
             # osd deployments need an --osd-fsid arg
-            if daemon_type == 'osd':
+            if daemon_spec.daemon_type == 'osd':
                 if not osd_uuid_map:
                     osd_uuid_map = self.get_osd_uuid_map()
-                osd_uuid = osd_uuid_map.get(daemon_id)
+                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                 if not osd_uuid:
-                    raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
-                extra_args.extend(['--osd-fsid', osd_uuid])
+                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
+                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
 
-        if reconfig:
-            extra_args.append('--reconfig')
-        if self.allow_ptrace:
-            extra_args.append('--allow-ptrace')
+            if reconfig:
+                daemon_spec.extra_args.append('--reconfig')
+            if self.allow_ptrace:
+                daemon_spec.extra_args.append('--allow-ptrace')
 
-        self.log.info('%s daemon %s on %s' % (
-            'Reconfiguring' if reconfig else 'Deploying',
-            name, host))
+            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
+                self._registry_login(daemon_spec.host, self.registry_url, self.registry_username, self.registry_password)
 
-        out, err, code = self._run_cephadm(
-            host, name, 'deploy',
-            [
-                '--name', name,
-            ] + extra_args,
-            stdin=json.dumps(cephadm_config))
-        if not code and host in self.cache.daemons:
-            # prime cached service state with what we (should have)
-            # just created
-            sd = orchestrator.DaemonDescription()
-            sd.daemon_type = daemon_type
-            sd.daemon_id = daemon_id
-            sd.hostname = host
-            sd.status = 1
-            sd.status_desc = 'starting'
-            self.cache.add_daemon(host, sd)
-        self.cache.invalidate_host_daemons(host)
-        self.cache.update_daemon_config_deps(host, name, deps, start_time)
-        self.cache.save_host(host)
-        return "{} {} on host '{}'".format(
-            'Reconfigured' if reconfig else 'Deployed', name, host)
+            self.log.info('%s daemon %s on %s' % (
+                'Reconfiguring' if reconfig else 'Deploying',
+                daemon_spec.name(), daemon_spec.host))
+
+            out, err, code = self._run_cephadm(
+                daemon_spec.host, daemon_spec.name(), 'deploy',
+                [
+                    '--name', daemon_spec.name(),
+                ] + daemon_spec.extra_args,
+                stdin=json.dumps(cephadm_config))
+            if not code and daemon_spec.host in self.cache.daemons:
+                # prime cached service state with what we (should have)
+                # just created
+                sd = orchestrator.DaemonDescription()
+                sd.daemon_type = daemon_spec.daemon_type
+                sd.daemon_id = daemon_spec.daemon_id
+                sd.hostname = daemon_spec.host
+                sd.status = 1
+                sd.status_desc = 'starting'
+                self.cache.add_daemon(daemon_spec.host, sd)
+                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
+                    self.requires_post_actions.add(daemon_spec.daemon_type)
+            self.cache.invalidate_host_daemons(daemon_spec.host)
+            self.cache.update_daemon_config_deps(daemon_spec.host, daemon_spec.name(), deps, start_time)
+            self.cache.save_host(daemon_spec.host)
+            msg = "{} {} on host '{}'".format(
+                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
+            if not code:
+                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
+            else:
+                what = 'reconfigure' if reconfig else 'deploy'
+                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
+            return msg
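# A minimal sketch, not the actual implementation, of how the deploy arguments are
# assembled above: every daemon gets '--config-json -', declared ports become
# '--tcp-ports', OSDs additionally need '--osd-fsid', and a reconfig adds '--reconfig'.
from typing import List, Optional

def build_deploy_args(daemon_type: str,
                      ports: Optional[List[int]] = None,
                      osd_fsid: Optional[str] = None,
                      reconfig: bool = False,
                      allow_ptrace: bool = False) -> List[str]:
    args = ['--config-json', '-']
    if ports:
        args.extend(['--tcp-ports', ' '.join(map(str, ports))])
    if daemon_type == 'osd':
        if not osd_fsid:
            raise ValueError('osd deployments require the osd fsid')
        args.extend(['--osd-fsid', osd_fsid])
    if reconfig:
        args.append('--reconfig')
    if allow_ptrace:
        args.append('--allow-ptrace')
    return args

# build_deploy_args('osd', osd_fsid='1234-abcd') ==
#     ['--config-json', '-', '--osd-fsid', '1234-abcd']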
 
     @forall_hosts
     def _remove_daemons(self, name, host) -> str:
@@ -1688,55 +1955,34 @@ you may want to run:
         Remove a daemon
         """
         (daemon_type, daemon_id) = name.split('.', 1)
-        if daemon_type == 'mon':
-            self._check_safe_to_destroy_mon(daemon_id)
 
-            # remove mon from quorum before we destroy the daemon
-            self.log.info('Removing monitor %s from monmap...' % name)
-            ret, out, err = self.check_mon_command({
-                'prefix': 'mon rm',
-                'name': daemon_id,
-            })
+        with set_exception_subject('service', orchestrator.DaemonDescription(
+                daemon_type=daemon_type,
+                daemon_id=daemon_id,
+                hostname=host,
+        ).service_id(), overwrite=True):
 
-        args = ['--name', name, '--force']
-        self.log.info('Removing daemon %s from %s' % (name, host))
-        out, err, code = self._run_cephadm(
-            host, name, 'rm-daemon', args)
-        if not code:
-            # remove item from cache
-            self.cache.rm_daemon(host, name)
-        self.cache.invalidate_host_daemons(host)
-        return "Removed {} from host '{}'".format(name, host)
-
-    def _create_fn(self, service_type: str) -> Callable[..., str]:
-        try:
-            d: Dict[str, function] = {
-                'mon': self.mon_service.create,
-                'mgr': self.mgr_service.create,
-                'osd': self.osd_service.create,
-                'mds': self.mds_service.create,
-                'rgw': self.rgw_service.create,
-                'rbd-mirror': self.rbd_mirror_service.create,
-                'nfs': self.nfs_service.create,
-                'grafana': self.grafana_service.create,
-                'alertmanager': self.alertmanager_service.create,
-                'prometheus': self.prometheus_service.create,
-                'node-exporter': self.node_exporter_service.create,
-                'crash': self.crash_service.create,
-                'iscsi': self.iscsi_service.create,
-            }
-            return d[service_type]  # type: ignore
-        except KeyError:
-            self.log.exception(f'unknown service type {service_type}')
-            raise OrchestratorError(f'unknown service type {service_type}') from e
+
+            self.cephadm_services[daemon_type].pre_remove(daemon_id)
+
+            args = ['--name', name, '--force']
+            self.log.info('Removing daemon %s from %s' % (name, host))
+            out, err, code = self._run_cephadm(
+                host, name, 'rm-daemon', args)
+            if not code:
+                # remove item from cache
+                self.cache.rm_daemon(host, name)
+            self.cache.invalidate_host_daemons(host)
+            return "Removed {} from host '{}'".format(name, host)
 
     def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
-        return {
+        fn = {
             'mds': self.mds_service.config,
             'rgw': self.rgw_service.config,
             'nfs': self.nfs_service.config,
             'iscsi': self.iscsi_service.config,
         }.get(service_type)
+        return cast(Callable[[ServiceSpec], None], fn)
 
     def _apply_service(self, spec: ServiceSpec) -> bool:
         """
@@ -1746,15 +1992,17 @@ you may want to run:
         daemon_type = spec.service_type
         service_name = spec.service_name()
         if spec.unmanaged:
-            self.log.debug('Skipping unmanaged service %s spec' % service_name)
+            self.log.debug('Skipping unmanaged service %s' % service_name)
+            return False
+        if spec.preview_only:
+            self.log.debug('Skipping preview_only service %s' % service_name)
             return False
         self.log.debug('Applying service %s spec' % service_name)
 
-        create_func = self._create_fn(daemon_type)
         config_func = self._config_fn(daemon_type)
 
         if daemon_type == 'osd':
-            create_func(spec)
+            self.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
             # TODO: return True would result in a busy loop
             return False
 
@@ -1779,12 +2027,15 @@ you may want to run:
             # host
             return len(self.cache.networks[host].get(public_network, [])) > 0
 
-        hosts = HostAssignment(
+        ha = HostAssignment(
             spec=spec,
             get_hosts_func=self._get_hosts,
             get_daemons_func=self.cache.get_daemons_by_service,
             filter_new_host=matches_network if daemon_type == 'mon' else None,
-        ).place()
+        )
+
+        hosts: List[HostPlacementSpec] = ha.place()
+        self.log.debug('Usable hosts: %s' % hosts)
 
         r = False
 
@@ -1795,42 +2046,55 @@ you may want to run:
 
         # add any?
         did_config = False
-        hosts_with_daemons = {d.hostname for d in daemons}
-        self.log.debug('hosts with daemons: %s' % hosts_with_daemons)
-        for host, network, name in hosts:
-            if host not in hosts_with_daemons:
-                if not did_config and config_func:
-                    config_func(spec)
-                    did_config = True
-                daemon_id = self.get_unique_name(daemon_type, host, daemons,
-                                                 prefix=spec.service_id,
-                                                 forcename=name)
-                self.log.debug('Placing %s.%s on host %s' % (
-                    daemon_type, daemon_id, host))
-                if daemon_type == 'mon':
-                    create_func(daemon_id, host, network)  # type: ignore
-                elif daemon_type in ['nfs', 'iscsi']:
-                    create_func(daemon_id, host, spec)  # type: ignore
+
+        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
+        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)
+
+        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
+        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)
+
+        for host, network, name in add_daemon_hosts:
+            daemon_id = self.get_unique_name(daemon_type, host, daemons,
+                                             prefix=spec.service_id,
+                                             forcename=name)
+
+            if not did_config and config_func:
+                if daemon_type == 'rgw':
+                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
+                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
                 else:
-                    create_func(daemon_id, host)  # type: ignore
+                    config_func(spec)
+                did_config = True
 
-                # add to daemon list so next name(s) will also be unique
-                sd = orchestrator.DaemonDescription(
-                    hostname=host,
-                    daemon_type=daemon_type,
-                    daemon_id=daemon_id,
-                )
-                daemons.append(sd)
-                r = True
+            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
+            self.log.debug('Placing %s.%s on host %s' % (
+                daemon_type, daemon_id, host))
+
+            self.cephadm_services[daemon_type].create(daemon_spec)
+
+            # add to daemon list so next name(s) will also be unique
+            sd = orchestrator.DaemonDescription(
+                hostname=host,
+                daemon_type=daemon_type,
+                daemon_id=daemon_id,
+            )
+            daemons.append(sd)
+            r = True
 
         # remove any?
-        target_hosts = [h.hostname for h in hosts]
-        for d in daemons:
-            if d.hostname not in target_hosts:
-                # NOTE: we are passing the 'force' flag here, which means
-                # we can delete a mon instances data.
-                self._remove_daemon(d.name(), d.hostname)
-                r = True
+        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
+            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
+            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
+            return not r.retval
+
+        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
+            # let's find a subset that is ok-to-stop
+            remove_daemon_hosts.pop()
+        for d in remove_daemon_hosts:
+            # NOTE: we are passing the 'force' flag here, which means
+            # we can delete a mon instance's data.
+            self._remove_daemon(d.name(), d.hostname)
+            r = True
 
         return r
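# A minimal sketch (with a hypothetical ok_to_stop callback) of the shrink-until-safe
# loop above: keep dropping candidates from the removal set until the remaining
# subset is reported safe to stop; only what is left gets removed.
from typing import Callable, Set

def safe_removal_subset(candidates: Set[str],
                        ok_to_stop: Callable[[Set[str]], bool]) -> Set[str]:
    remaining = set(candidates)
    while remaining and not ok_to_stop(remaining):
        remaining.pop()  # drop an arbitrary candidate and re-check
    return remaining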
 
@@ -1846,6 +2110,8 @@ you may want to run:
             except Exception as e:
                 self.log.exception('Failed to apply %s spec %s: %s' % (
                     spec.service_name(), spec, e))
+                self.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
+
         return r
 
     def _check_pool_exists(self, pool, service_name):
@@ -1855,15 +2121,9 @@ you may want to run:
                                     f'service {service_name}')
 
     def _check_daemons(self):
-        # get monmap mtime so we can refresh configs when mons change
-        monmap = self.get('mon_map')
-        last_monmap: Optional[datetime.datetime] = datetime.datetime.strptime(
-            monmap['modified'], CEPH_DATEFMT)
-        if last_monmap and last_monmap > datetime.datetime.utcnow():
-            last_monmap = None   # just in case clocks are skewed
 
         daemons = self.cache.get_daemons()
-        daemons_post = defaultdict(list)
+        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
         for dd in daemons:
             # orphan?
             spec = self.spec_store.specs.get(dd.service_name(), None)
@@ -1878,8 +2138,14 @@ you may want to run:
                 continue
 
             # These daemon types require additional configs after creation
-            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
                 daemons_post[dd.daemon_type].append(dd)
+
+            if self.cephadm_services[dd.daemon_type].get_active_daemon(
+               self.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
+                dd.is_active = True
+            else:
+                dd.is_active = False
 
             deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
             last_deps, last_config = self.cache.get_daemon_last_config_deps(
@@ -1897,18 +2163,35 @@ you may want to run:
                 self.log.info('Reconfiguring %s (dependencies changed)...' % (
                     dd.name()))
                 reconfig = True
-            elif last_monmap and \
-               last_monmap > last_config and \
+            elif self.last_monmap and \
+                    self.last_monmap > last_config and \
                dd.daemon_type in CEPH_TYPES:
                 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                 reconfig = True
             if reconfig:
-                self._create_daemon(dd.daemon_type, dd.daemon_id,
-                                    dd.hostname, reconfig=True)
+                try:
+                    self._create_daemon(
+                        CephadmDaemonSpec(
+                            host=dd.hostname,
+                            daemon_id=dd.daemon_id,
+                            daemon_type=dd.daemon_type),
+                        reconfig=True)
+                except OrchestratorError as e:
+                    self.events.from_orch_error(e)
+                    if dd.daemon_type in daemons_post:
+                        del daemons_post[dd.daemon_type]
+                    # continue...
+                except Exception as e:
+                    self.events.for_daemon_from_exception(dd.name(), e)
+                    if dd.daemon_type in daemons_post:
+                        del daemons_post[dd.daemon_type]
+                    # continue...
 
         # do daemon post actions
         for daemon_type, daemon_descs in daemons_post.items():
-            self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
+            if daemon_type in self.requires_post_actions:
+                self.requires_post_actions.remove(daemon_type)
+                self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
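# A minimal, illustrative sketch of the reconfig decision applied above: a daemon is
# reconfigured when it was never configured by us, when its computed dependencies
# changed, or (for ceph daemons) when the monmap is newer than its last config.
import datetime
from typing import List, Optional

def needs_reconfig(deps: List[str],
                   last_deps: Optional[List[str]],
                   last_config: Optional[datetime.datetime],
                   last_monmap: Optional[datetime.datetime],
                   is_ceph_daemon: bool) -> bool:
    if last_config is None:
        return True                      # never (re)configured by cephadm
    if last_deps != deps:
        return True                      # dependencies changed
    if is_ceph_daemon and last_monmap and last_monmap > last_config:
        return True                      # monmap changed since the last config
    return False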
 
     def _add_daemon(self, daemon_type, spec,
                     create_func: Callable[..., T], config_func=None) -> List[T]:
@@ -1932,22 +2215,25 @@ you may want to run:
             raise OrchestratorError('too few hosts: want %d, have %s' % (
                 count, hosts))
 
-        if config_func:
-            config_func(spec)
+        did_config = False
 
-        args = []  # type: List[tuple]
+        args = []  # type: List[CephadmDaemonSpec]
         for host, network, name in hosts:
             daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                              prefix=spec.service_id,
                                              forcename=name)
+
+            if not did_config and config_func:
+                if daemon_type == 'rgw':
+                    config_func(spec, daemon_id)
+                else:
+                    config_func(spec)
+                did_config = True
+
+            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(host, daemon_id, network, spec)
             self.log.debug('Placing %s.%s on host %s' % (
                 daemon_type, daemon_id, host))
-            if daemon_type == 'mon':
-                args.append((daemon_id, host, network))  # type: ignore
-            elif daemon_type in ['nfs', 'iscsi']:
-                args.append((daemon_id, host, spec))  # type: ignore
-            else:
-                args.append((daemon_id, host))  # type: ignore
+            args.append(daemon_spec)
 
             # add to daemon list so next name(s) will also be unique
             sd = orchestrator.DaemonDescription(
@@ -1964,7 +2250,7 @@ you may want to run:
         return create_func_map(args)
 
     @trivial_completion
-    def apply_mon(self, spec):
+    def apply_mon(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -1978,11 +2264,54 @@ you may want to run:
         return self._add_daemon('mgr', spec, self.mgr_service.create)
 
     def _apply(self, spec: GenericSpec) -> str:
+        self.migration.verify_no_migration()
+
         if spec.service_type == 'host':
             return self._add_host(cast(HostSpec, spec))
 
+        if spec.service_type == 'osd':
+            # _trigger_preview_refresh needs to be smart and should
+            # only refresh if a change has been detected
+            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])
+
         return self._apply_service_spec(cast(ServiceSpec, spec))
 
+    def _plan(self, spec: ServiceSpec):
+        if spec.service_type == 'osd':
+            return {'service_name': spec.service_name(),
+                    'service_type': spec.service_type,
+                    'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])}
+
+        ha = HostAssignment(
+            spec=spec,
+            get_hosts_func=self._get_hosts,
+            get_daemons_func=self.cache.get_daemons_by_service,
+        )
+        ha.validate()
+        hosts = ha.place()
+
+        add_daemon_hosts = ha.add_daemon_hosts(hosts)
+        remove_daemon_hosts = ha.remove_daemon_hosts(hosts)
+
+        return {
+            'service_name': spec.service_name(),
+            'service_type': spec.service_type,
+            'add': [hs.hostname for hs in add_daemon_hosts],
+            'remove': [d.hostname for d in remove_daemon_hosts]
+        }
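# An illustrative example (made-up hosts) of the structure _plan returns for a
# non-OSD spec: the service identity plus the hosts that would gain or lose daemons.
example_plan = {
    'service_name': 'mds.cephfs',
    'service_type': 'mds',
    'add': ['host2', 'host3'],   # hosts scheduled to receive new daemons
    'remove': ['host1'],         # hosts whose daemons would be removed
}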
+
+    @trivial_completion
+    def plan(self, specs: List[GenericSpec]) -> List:
+        results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n'
+                               'to the current inventory setup. If any of these conditions change, the \n'
+                               'preview will be invalid. Please keep the timeframe between planning \n'
+                               'and applying the specs as short as possible.'}]
+        if any([spec.service_type == 'host' for spec in specs]):
+            return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}]
+        for spec in specs:
+            results.append(self._plan(cast(ServiceSpec, spec)))
+        return results
+
     def _apply_service_spec(self, spec: ServiceSpec) -> str:
         if spec.placement.is_empty():
             # fill in default placement
@@ -2020,30 +2349,30 @@ you may want to run:
         return "Scheduled %s update..." % spec.service_name()
 
     @trivial_completion
-    def apply(self, specs: List[GenericSpec]):
+    def apply(self, specs: List[GenericSpec]) -> List[str]:
         results = []
         for spec in specs:
             results.append(self._apply(spec))
         return results
 
     @trivial_completion
-    def apply_mgr(self, spec):
+    def apply_mgr(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_mds(self, spec: ServiceSpec):
+    def add_mds(self, spec: ServiceSpec) -> List[str]:
         return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
 
     @trivial_completion
-    def apply_mds(self, spec: ServiceSpec):
+    def apply_mds(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_rgw(self, spec):
+    def add_rgw(self, spec) -> List[str]:
         return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
 
     @trivial_completion
-    def apply_rgw(self, spec):
+    def apply_rgw(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2052,23 +2381,23 @@ you may want to run:
         return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
 
     @trivial_completion
-    def apply_iscsi(self, spec):
+    def apply_iscsi(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_rbd_mirror(self, spec):
+    def add_rbd_mirror(self, spec) -> List[str]:
         return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
 
     @trivial_completion
-    def apply_rbd_mirror(self, spec):
+    def apply_rbd_mirror(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
-    def add_nfs(self, spec):
+    def add_nfs(self, spec) -> List[str]:
         return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
 
     @trivial_completion
-    def apply_nfs(self, spec):
+    def apply_nfs(self, spec) -> str:
         return self._apply(spec)
 
     def _get_dashboard_url(self):
@@ -2076,11 +2405,11 @@ you may want to run:
         return self.get('mgr_map').get('services', {}).get('dashboard', '')
 
     @trivial_completion
-    def add_prometheus(self, spec):
+    def add_prometheus(self, spec) -> List[str]:
         return self._add_daemon('prometheus', spec, self.prometheus_service.create)
 
     @trivial_completion
-    def apply_prometheus(self, spec):
+    def apply_prometheus(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2090,7 +2419,7 @@ you may want to run:
                                 self.node_exporter_service.create)
 
     @trivial_completion
-    def apply_node_exporter(self, spec):
+    def apply_node_exporter(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2100,7 +2429,7 @@ you may want to run:
                                 self.crash_service.create)
 
     @trivial_completion
-    def apply_crash(self, spec):
+    def apply_crash(self, spec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2109,7 +2438,7 @@ you may want to run:
         return self._add_daemon('grafana', spec, self.grafana_service.create)
 
     @trivial_completion
-    def apply_grafana(self, spec: ServiceSpec):
+    def apply_grafana(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     @trivial_completion
@@ -2118,7 +2447,7 @@ you may want to run:
         return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
 
     @trivial_completion
-    def apply_alertmanager(self, spec: ServiceSpec):
+    def apply_alertmanager(self, spec: ServiceSpec) -> str:
         return self._apply(spec)
 
     def _get_container_image_id(self, image_name):
@@ -2129,8 +2458,10 @@ you may want to run:
             break
         if not host:
             raise OrchestratorError('no hosts defined')
+        if self.cache.host_needs_registry_login(host) and self.registry_url:
+            self._registry_login(host, self.registry_url, self.registry_username, self.registry_password)
         out, err, code = self._run_cephadm(
-            host, None, 'pull', [],
+            host, '', 'pull', [],
             image=image_name,
             no_fsid=True,
             error_ok=True)
@@ -2145,7 +2476,7 @@ you may want to run:
         return image_id, ceph_version
 
     @trivial_completion
-    def upgrade_check(self, image, version):
+    def upgrade_check(self, image, version) -> str:
         if version:
             target_name = self.container_image_base + ':v' + version
         elif image:
@@ -2176,57 +2507,79 @@ you may want to run:
         return json.dumps(r, indent=4, sort_keys=True)
 
     @trivial_completion
-    def upgrade_status(self):
+    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
         return self.upgrade.upgrade_status()
 
     @trivial_completion
-    def upgrade_start(self, image, version):
+    def upgrade_start(self, image, version) -> str:
         return self.upgrade.upgrade_start(image, version)
 
     @trivial_completion
-    def upgrade_pause(self):
+    def upgrade_pause(self) -> str:
         return self.upgrade.upgrade_pause()
 
     @trivial_completion
-    def upgrade_resume(self):
+    def upgrade_resume(self) -> str:
         return self.upgrade.upgrade_resume()
 
     @trivial_completion
-    def upgrade_stop(self):
+    def upgrade_stop(self) -> str:
         return self.upgrade.upgrade_stop()
 
     @trivial_completion
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
-                    force: bool = False):
+                    force: bool = False) -> str:
         """
         Takes a list of OSDs and schedules them for removal.
         The function that takes care of the actual removal is
-        _remove_osds_bg().
+        process_removal_queue().
         """
 
-        daemons = self.cache.get_daemons_by_service('osd')
-        found: Set[OSDRemoval] = set()
+        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
+        to_remove_daemons = list()
         for daemon in daemons:
-            if daemon.daemon_id not in osd_ids:
-                continue
-            found.add(OSDRemoval(daemon.daemon_id, replace, force,
-                                 daemon.hostname, daemon.name(),
-                                 datetime.datetime.utcnow(), -1))
+            if daemon.daemon_id in osd_ids:
+                to_remove_daemons.append(daemon)
+        if not to_remove_daemons:
+            return f"Unable to find OSDs: {osd_ids}"
 
-        not_found = {osd_id for osd_id in osd_ids if osd_id not in [x.osd_id for x in found]}
-        if not_found:
-            raise OrchestratorError('Unable to find OSD: %s' % not_found)
-
-        self.rm_util.queue_osds_for_removal(found)
+        for daemon in to_remove_daemons:
+            try:
+                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
+                                                replace=replace,
+                                                force=force,
+                                                hostname=daemon.hostname,
+                                                fullname=daemon.name(),
+                                                process_started_at=datetime.datetime.utcnow(),
+                                                remove_util=self.rm_util))
+            except NotFoundError:
+                return f"Unable to find OSDs: {osd_ids}"
 
         # trigger the serve loop to initiate the removal
         self._kick_serve_loop()
         return "Scheduled OSD(s) for removal"
 
+    @trivial_completion
+    def stop_remove_osds(self, osd_ids: List[str]):
+        """
+        Stops a `removal` process for a list of OSDs.
+        This will revert their weight and remove them from the to_remove_osds queue.
+        """
+        for osd_id in osd_ids:
+            try:
+                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
+                                           remove_util=self.rm_util))
+            except (NotFoundError, KeyError):
+                return f'Unable to find OSD in the queue: {osd_id}'
+
+        # trigger the serve loop to halt the removal
+        self._kick_serve_loop()
+        return "Stopped OSD(s) removal"
+
     @trivial_completion
     def remove_osds_status(self):
         """
         The CLI call to retrieve an osd removal report
         """
-        return self.rm_util.report
+        return self.to_remove_osds.all_osds()