diff --git a/ceph/src/pybind/mgr/cephadm/module.py b/ceph/src/pybind/mgr/cephadm/module.py
index 537c4d4102fa6ddb76e63512202dd25db0c8fd7a..de9673aa039320f7540a3117a78b999721349969 100644
--- a/ceph/src/pybind/mgr/cephadm/module.py
+++ b/ceph/src/pybind/mgr/cephadm/module.py
@@ -1,21 +1,14 @@
 import json
 import errno
 import logging
-import time
-from copy import copy
-from threading import Event
+from collections import defaultdict
 from functools import wraps
-
-from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
+from tempfile import TemporaryDirectory
+from threading import Event
 
 import string
-try:
-    from typing import List, Dict, Optional, Callable, Tuple, TypeVar, Type, \
-        Any, NamedTuple, Iterator, Set, Sequence
-    from typing import TYPE_CHECKING, cast
-except ImportError:
-    TYPE_CHECKING = False  # just for type checking
-
+from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
+    Any, Set, TYPE_CHECKING, cast
 
 import datetime
 import six
@@ -23,30 +16,45 @@ import os
 import random
 import tempfile
 import multiprocessing.pool
-import re
 import shutil
 import subprocess
-import uuid
 
-from ceph.deployment import inventory, translate
+from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.drive_selection.selector import DriveSelection
 from ceph.deployment.service_spec import \
-    HostPlacementSpec, NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
+    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host
 
 from mgr_module import MgrModule, HandleCommandResult
 import orchestrator
 from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
     CLICommandMeta
+from orchestrator._interface import GenericSpec
 
 from . import remotes
 from . import utils
-from .nfs import NFSGanesha
-from .osd import RemoveUtil, OSDRemoval
-
+from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
+    RbdMirrorService, CrashService, CephadmService
+from .services.iscsi import IscsiService
+from .services.nfs import NFSService
+from .services.osd import RemoveUtil, OSDRemoval, OSDService
+from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
+    NodeExporterService
+from .schedule import HostAssignment
+from .inventory import Inventory, SpecStore, HostCache
+from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
+from .template import TemplateMgr
 
 try:
     import remoto
+    # NOTE(mattoliverau) Patch remoto until remoto PR
+    # (https://github.com/alfredodeza/remoto/pull/56) lands
+    from distutils.version import StrictVersion
+    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
+        def remoto_has_connection(self):
+            return self.gateway.hasreceiver()
+
+        from remoto.backends import BaseConnection
+        BaseConnection.has_connection = remoto_has_connection
     import remoto.process
     import execnet.gateway_bootstrap
 except ImportError as e:
@@ -60,6 +68,8 @@ except ImportError:
 
 logger = logging.getLogger(__name__)
 
+T = TypeVar('T')
+
 DEFAULT_SSH_CONFIG = """
 Host *
   User root
@@ -71,494 +81,54 @@ Host *
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
 CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
-HOST_CACHE_PREFIX = "host."
-SPEC_STORE_PREFIX = "spec."
-
-# ceph daemon types that use the ceph container image.
-# NOTE: listed in upgrade order!
-CEPH_UPGRADE_ORDER = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror']
 CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
 
 
-# for py2 compat
-try:
-    from tempfile import TemporaryDirectory # py3
-except ImportError:
-    # define a minimal (but sufficient) equivalent for <= py 3.2
-    class TemporaryDirectory(object): # type: ignore
-        def __init__(self):
-            self.name = tempfile.mkdtemp()
-
-        def __enter__(self):
-            if not self.name:
-                self.name = tempfile.mkdtemp()
-            return self.name
-
-        def cleanup(self):
-            shutil.rmtree(self.name)
-
-        def __exit__(self, exc_type, exc_value, traceback):
-            self.cleanup()
-
-
-class SpecStore():
-    def __init__(self, mgr):
-        # type: (CephadmOrchestrator) -> None
-        self.mgr = mgr
-        self.specs = {} # type: Dict[str, ServiceSpec]
-        self.spec_created = {} # type: Dict[str, datetime.datetime]
+def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
+    @wraps(f)
+    def forall_hosts_wrapper(*args) -> List[T]:
+
+        # Some weird logic to make calling functions with multiple arguments work.
+        if len(args) == 1:
+            vals = args[0]
+            self = None
+        elif len(args) == 2:
+            self, vals = args
+        else:
+            assert False, 'either f([...]) or self.f([...])'
 
-    def load(self):
-        # type: () -> None
-        for k, v in six.iteritems(self.mgr.get_store_prefix(SPEC_STORE_PREFIX)):
-            service_name = k[len(SPEC_STORE_PREFIX):]
+        def do_work(arg):
+            if not isinstance(arg, tuple):
+                arg = (arg, )
             try:
-                v = json.loads(v)
-                spec = ServiceSpec.from_json(v['spec'])
-                created = datetime.datetime.strptime(v['created'], DATEFMT)
-                self.specs[service_name] = spec
-                self.spec_created[service_name] = created
-                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
-                    service_name))
+                if self:
+                    return f(self, *arg)
+                return f(*arg)
             except Exception as e:
-                self.mgr.log.warning('unable to load spec for %s: %s' % (
-                    service_name, e))
-                pass
-
-    def save(self, spec):
-        # type: (ServiceSpec) -> None
-        self.specs[spec.service_name()] = spec
-        self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
-        self.mgr.set_store(
-            SPEC_STORE_PREFIX + spec.service_name(),
-            json.dumps({
-                'spec': spec.to_json(),
-                'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
-            }, sort_keys=True),
-        )
-
-    def rm(self, service_name):
-        # type: (str) -> bool
-        found = service_name in self.specs
-        if found:
-            del self.specs[service_name]
-            del self.spec_created[service_name]
-            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
-        return found
-
-    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
-        specs = []
-        for sn, spec in self.specs.items():
-            if not service_name or \
-                    sn == service_name or \
-                    sn.startswith(service_name + '.'):
-                specs.append(spec)
-        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
-            service_name, specs))
-        return specs
-
-class HostCache():
-    def __init__(self, mgr):
-        # type: (CephadmOrchestrator) -> None
-        self.mgr: CephadmOrchestrator = mgr
-        self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
-        self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
-        self.devices = {}              # type: Dict[str, List[inventory.Device]]
-        self.networks = {}             # type: Dict[str, Dict[str, List[str]]]
-        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
-        self.daemon_refresh_queue = [] # type: List[str]
-        self.device_refresh_queue = [] # type: List[str]
-        self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
-        self.last_host_check = {}      # type: Dict[str, datetime.datetime]
-
-    def load(self):
-        # type: () -> None
-        for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
-            host = k[len(HOST_CACHE_PREFIX):]
-            if host not in self.mgr.inventory:
-                self.mgr.log.warning('removing stray HostCache host record %s' % (
-                    host))
-                self.mgr.set_store(k, None)
-            try:
-                j = json.loads(v)
-                if 'last_device_update' in j:
-                    self.last_device_update[host] = datetime.datetime.strptime(
-                        j['last_device_update'], DATEFMT)
-                else:
-                    self.device_refresh_queue.append(host)
-                # for services, we ignore the persisted last_*_update
-                # and always trigger a new scrape on mgr restart.
-                self.daemon_refresh_queue.append(host)
-                self.daemons[host] = {}
-                self.devices[host] = []
-                self.networks[host] = {}
-                self.daemon_config_deps[host] = {}
-                for name, d in j.get('daemons', {}).items():
-                    self.daemons[host][name] = \
-                        orchestrator.DaemonDescription.from_json(d)
-                for d in j.get('devices', []):
-                    self.devices[host].append(inventory.Device.from_json(d))
-                self.networks[host] = j.get('networks', {})
-                for name, d in j.get('daemon_config_deps', {}).items():
-                    self.daemon_config_deps[host][name] = {
-                        'deps': d.get('deps', []),
-                        'last_config': datetime.datetime.strptime(
-                            d['last_config'], DATEFMT),
-                    }
-                if 'last_host_check' in j:
-                    self.last_host_check[host] = datetime.datetime.strptime(
-                        j['last_host_check'], DATEFMT)
-                self.mgr.log.debug(
-                    'HostCache.load: host %s has %d daemons, '
-                    '%d devices, %d networks' % (
-                        host, len(self.daemons[host]), len(self.devices[host]),
-                        len(self.networks[host])))
-            except Exception as e:
-                self.mgr.log.warning('unable to load cached state for %s: %s' % (
-                    host, e))
-                pass
-
-    def update_host_daemons(self, host, dm):
-        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
-        self.daemons[host] = dm
-        self.last_daemon_update[host] = datetime.datetime.utcnow()
-
-    def update_host_devices_networks(self, host, dls, nets):
-        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
-        self.devices[host] = dls
-        self.networks[host] = nets
-        self.last_device_update[host] = datetime.datetime.utcnow()
-
-    def update_daemon_config_deps(self, host, name, deps, stamp):
-        self.daemon_config_deps[host][name] = {
-            'deps': deps,
-            'last_config': stamp,
-        }
-
-    def update_last_host_check(self, host):
-        # type: (str) -> None
-        self.last_host_check[host] = datetime.datetime.utcnow()
-
-    def prime_empty_host(self, host):
-        # type: (str) -> None
-        """
-        Install an empty entry for a host
-        """
-        self.daemons[host] = {}
-        self.devices[host] = []
-        self.networks[host] = {}
-        self.daemon_config_deps[host] = {}
-        self.daemon_refresh_queue.append(host)
-        self.device_refresh_queue.append(host)
-
-    def invalidate_host_daemons(self, host):
-        # type: (str) -> None
-        self.daemon_refresh_queue.append(host)
-        if host in self.last_daemon_update:
-            del self.last_daemon_update[host]
-        self.mgr.event.set()
-
-    def invalidate_host_devices(self, host):
-        # type: (str) -> None
-        self.device_refresh_queue.append(host)
-        if host in self.last_device_update:
-            del self.last_device_update[host]
-        self.mgr.event.set()
-
-    def save_host(self, host):
-        # type: (str) -> None
-        j = {   # type: ignore
-            'daemons': {},
-            'devices': [],
-            'daemon_config_deps': {},
-        }
-        if host in self.last_daemon_update:
-            j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT) # type: ignore
-        if host in self.last_device_update:
-            j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT) # type: ignore
-        for name, dd in self.daemons[host].items():
-            j['daemons'][name] = dd.to_json()  # type: ignore
-        for d in self.devices[host]:
-            j['devices'].append(d.to_json())  # type: ignore
-        j['networks'] = self.networks[host]
-        for name, depi in self.daemon_config_deps[host].items():
-            j['daemon_config_deps'][name] = {   # type: ignore
-                'deps': depi.get('deps', []),
-                'last_config': depi['last_config'].strftime(DATEFMT),
-            }
-        if host in self.last_host_check:
-            j['last_host_check']= self.last_host_check[host].strftime(DATEFMT)
-        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
-
-    def rm_host(self, host):
-        # type: (str) -> None
-        if host in self.daemons:
-            del self.daemons[host]
-        if host in self.devices:
-            del self.devices[host]
-        if host in self.networks:
-            del self.networks[host]
-        if host in self.last_daemon_update:
-            del self.last_daemon_update[host]
-        if host in self.last_device_update:
-            del self.last_device_update[host]
-        if host in self.daemon_config_deps:
-            del self.daemon_config_deps[host]
-        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
-
-    def get_hosts(self):
-        # type: () -> List[str]
-        r = []
-        for host, di in self.daemons.items():
-            r.append(host)
-        return r
-
-    def get_daemons(self):
-        # type: () -> List[orchestrator.DaemonDescription]
-        r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
-                r.append(dd)
-        return r
-
-    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
-        for host, dm in self.daemons.items():
-            if host in self.mgr.offline_hosts:
-                def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
-                    ret = copy(dd)
-                    ret.status = -1
-                    ret.status_desc = 'host is offline'
-                    return ret
-                yield host, {name: set_offline(d) for name, d in dm.items()}
-            else:
-                yield host, dm
-
-    def get_daemons_by_service(self, service_name):
-        # type: (str) -> List[orchestrator.DaemonDescription]
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, dm in self.daemons.items():
-            for name, d in dm.items():
-                if name.startswith(service_name + '.'):
-                    result.append(d)
-        return result
-
-    def get_daemon_names(self):
-        # type: () -> List[str]
-        r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
-                r.append(name)
-        return r
-
-    def get_daemon_last_config_deps(self, host, name):
-        if host in self.daemon_config_deps:
-            if name in self.daemon_config_deps[host]:
-                return self.daemon_config_deps[host][name].get('deps', []), \
-                    self.daemon_config_deps[host][name].get('last_config', None)
-        return None, None
-
-    def host_needs_daemon_refresh(self, host):
-        # type: (str) -> bool
-        if host in self.mgr.offline_hosts:
-            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
-            return False
-        if host in self.daemon_refresh_queue:
-            self.daemon_refresh_queue.remove(host)
-            return True
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
-            seconds=self.mgr.daemon_cache_timeout)
-        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
-            return True
-        return False
-
-    def host_needs_device_refresh(self, host):
-        # type: (str) -> bool
-        if host in self.mgr.offline_hosts:
-            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
-            return False
-        if host in self.device_refresh_queue:
-            self.device_refresh_queue.remove(host)
-            return True
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
-            seconds=self.mgr.device_cache_timeout)
-        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
-            return True
-        return False
-
-    def host_needs_check(self, host):
-        # type: (str) -> bool
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
-            seconds=self.mgr.host_check_interval)
-        return host not in self.last_host_check or self.last_host_check[host] < cutoff
-
-    def add_daemon(self, host, dd):
-        # type: (str, orchestrator.DaemonDescription) -> None
-        assert host in self.daemons
-        self.daemons[host][dd.name()] = dd
-
-    def rm_daemon(self, host, name):
-        if host in self.daemons:
-            if name in self.daemons[host]:
-                del self.daemons[host][name]
-
-
-class AsyncCompletion(orchestrator.Completion):
-    def __init__(self,
-                 _first_promise=None,  # type: Optional[orchestrator.Completion]
-                 value=orchestrator._Promise.NO_RESULT,  # type: Any
-                 on_complete=None,  # type: Optional[Callable]
-                 name=None,  # type: Optional[str]
-                 many=False, # type: bool
-                 update_progress=False,  # type: bool
-                 ):
+                logger.exception(f'executing {f.__name__}({args}) failed.')
+                raise
 
         assert CephadmOrchestrator.instance is not None
-        self.many = many
-        self.update_progress = update_progress
-        if name is None and on_complete is not None:
-            name = getattr(on_complete, '__name__', None)
-        super(AsyncCompletion, self).__init__(_first_promise, value, on_complete, name)
-
-    @property
-    def _progress_reference(self):
-        # type: () -> Optional[orchestrator.ProgressReference]
-        if hasattr(self._on_complete_, 'progress_id'):  # type: ignore
-            return self._on_complete_  # type: ignore
-        return None
-
-    @property
-    def _on_complete(self):
-        # type: () -> Optional[Callable]
-        if self._on_complete_ is None:
-            return None
-
-        def callback(result):
-            try:
-                if self.update_progress:
-                    assert self.progress_reference
-                    self.progress_reference.progress = 1.0
-                self._on_complete_ = None
-                self._finalize(result)
-            except Exception as e:
-                try:
-                    self.fail(e)
-                except Exception:
-                    logger.exception(f'failed to fail AsyncCompletion: >{repr(self)}<')
-                    if 'UNITTEST' in os.environ:
-                        assert False
-
-        def error_callback(e):
-            pass
-
-        def run(value):
-            def do_work(*args, **kwargs):
-                assert self._on_complete_ is not None
-                try:
-                    res = self._on_complete_(*args, **kwargs)
-                    if self.update_progress and self.many:
-                        assert self.progress_reference
-                        self.progress_reference.progress += 1.0 / len(value)
-                    return res
-                except Exception as e:
-                    self.fail(e)
-                    raise
-
-            assert CephadmOrchestrator.instance
-            if self.many:
-                if not value:
-                    logger.info('calling map_async without values')
-                    callback([])
-                if six.PY3:
-                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
-                                                                    callback=callback,
-                                                                    error_callback=error_callback)
-                else:
-                    CephadmOrchestrator.instance._worker_pool.map_async(do_work, value,
-                                                                    callback=callback)
-            else:
-                if six.PY3:
-                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
-                                                                      callback=callback, error_callback=error_callback)
-                else:
-                    CephadmOrchestrator.instance._worker_pool.apply_async(do_work, (value,),
-                                                                      callback=callback)
-            return self.ASYNC_RESULT
-
-        return run
-
-    @_on_complete.setter
-    def _on_complete(self, inner):
-        # type: (Callable) -> None
-        self._on_complete_ = inner
-
-
-def ssh_completion(cls=AsyncCompletion, **c_kwargs):
-    # type: (Type[orchestrator.Completion], Any) -> Callable
-    """
-    See ./HACKING.rst for a how-to
-    """
-    def decorator(f):
-        @wraps(f)
-        def wrapper(*args):
-
-            name = f.__name__
-            many = c_kwargs.get('many', False)
-
-            # Some weired logic to make calling functions with multiple arguments work.
-            if len(args) == 1:
-                [value] = args
-                if many and value and isinstance(value[0], tuple):
-                    return cls(on_complete=lambda x: f(*x), value=value, name=name, **c_kwargs)
-                else:
-                    return cls(on_complete=f, value=value, name=name, **c_kwargs)
-            else:
-                if many:
-                    self, value = args
+        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)
 
-                    def call_self(inner_args):
-                        if not isinstance(inner_args, tuple):
-                            inner_args = (inner_args, )
-                        return f(self, *inner_args)
 
-                    return cls(on_complete=call_self, value=value, name=name, **c_kwargs)
-                else:
-                    return cls(on_complete=lambda x: f(*x), value=args, name=name, **c_kwargs)
+    return forall_hosts_wrapper
 
-        return wrapper
-    return decorator
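
For illustration, a minimal usage sketch of the new forall_hosts decorator (the method name and host list are hypothetical; the wrapper dispatches to CephadmOrchestrator.instance._worker_pool, so it only runs inside a live module):

    @forall_hosts
    def _check_host_ok(self, host: str) -> str:
        # invoked once per element of the list passed to the wrapper,
        # in parallel on the orchestrator's worker pool
        return f'{host}: ok'

    # self._check_host_ok(['host1', 'host2'])  ->  ['host1: ok', 'host2: ok']
    # tuple elements are unpacked into positional arguments, so
    # self._check_host_ok([('host1',), ('host2',)]) is equivalent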
 
+class CephadmCompletion(orchestrator.Completion):
+    def evaluate(self):
+        self.finalize(None)
 
-def async_completion(f):
-    # type: (Callable) -> Callable[..., AsyncCompletion]
+def trivial_completion(f: Callable) -> Callable[..., CephadmCompletion]:
     """
-    See ./HACKING.rst for a how-to
-
-    :param f: wrapped function
-    """
-    return ssh_completion()(f)
-
-
-def async_map_completion(f):
-    # type: (Callable) -> Callable[..., AsyncCompletion]
-    """
-    See ./HACKING.rst for a how-to
-
-    :param f: wrapped function
-
-    kind of similar to
-
-    >>> def sync_map(f):
-    ...     return lambda x: map(f, x)
-
+    Decorator to make CephadmOrchestrator methods return
+    a CephadmCompletion that runs the wrapped call when it is evaluated.
     """
-    return ssh_completion(many=True)(f)
 
-
-def trivial_completion(f):
-    # type: (Callable) -> Callable[..., orchestrator.Completion]
     @wraps(f)
     def wrapper(*args, **kwargs):
-        return AsyncCompletion(value=f(*args, **kwargs), name=f.__name__)
+        return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs))
+
     return wrapper
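
For illustration, a minimal sketch of the new lazy completion flow (the method below is hypothetical, and the finalize()/result behaviour is assumed from orchestrator.Completion). Unlike the old decorator, which evaluated f eagerly and wrapped the value in an AsyncCompletion, the wrapped call now runs only when the completion is evaluated:

    @trivial_completion
    def get_example_hosts(self) -> List[str]:   # hypothetical orchestrator method
        return ['host1', 'host2']

    c = self.get_example_hosts()   # returns a CephadmCompletion; nothing has run yet
    c.evaluate()                   # finalize(None) invokes the wrapped function
    # c.result is assumed to now hold ['host1', 'host2']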
 
 
@@ -607,6 +177,26 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             'desc': 'Container image name, without the tag',
             'runtime': True,
         },
+        {
+            'name': 'container_image_prometheus',
+            'default': 'prom/prometheus:v2.18.1',
+            'desc': 'Prometheus container image',
+        },
+        {
+            'name': 'container_image_grafana',
+            'default': 'ceph/ceph-grafana:latest',
+            'desc': 'Grafana container image',
+        },
+        {
+            'name': 'container_image_alertmanager',
+            'default': 'prom/alertmanager:v0.20.0',
+            'desc': 'Alertmanager container image',
+        },
+        {
+            'name': 'container_image_node_exporter',
+            'default': 'prom/node-exporter:v0.18.1',
+            'desc': 'Node exporter container image',
+        },
         {
             'name': 'warn_on_stray_hosts',
             'type': 'bool',
@@ -672,6 +262,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.host_check_interval = 0
             self.mode = ''
             self.container_image_base = ''
+            self.container_image_prometheus = ''
+            self.container_image_grafana = ''
+            self.container_image_alertmanager = ''
+            self.container_image_node_exporter = ''
             self.warn_on_stray_hosts = True
             self.warn_on_stray_daemons = True
             self.warn_on_failed_host_check = True
@@ -696,23 +290,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         CephadmOrchestrator.instance = self
 
-        t = self.get_store('upgrade_state')
-        if t:
-            self.upgrade_state = json.loads(t)
-        else:
-            self.upgrade_state = None
+        self.upgrade = CephadmUpgrade(self)
 
         self.health_checks = {}
 
         self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
-        # load inventory
-        i = self.get_store('inventory')
-        if i:
-            self.inventory: Dict[str, dict] = json.loads(i)
-        else:
-            self.inventory = dict()
-        self.log.debug('Loaded inventory %s' % self.inventory)
+        self.inventory = Inventory(self)
 
         self.cache = HostCache(self)
         self.cache.load()
@@ -732,6 +316,38 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # in-memory only.
         self.offline_hosts: Set[str] = set()
 
+        # services:
+        self.osd_service = OSDService(self)
+        self.nfs_service = NFSService(self)
+        self.mon_service = MonService(self)
+        self.mgr_service = MgrService(self)
+        self.mds_service = MdsService(self)
+        self.rgw_service = RgwService(self)
+        self.rbd_mirror_service = RbdMirrorService(self)
+        self.grafana_service = GrafanaService(self)
+        self.alertmanager_service = AlertmanagerService(self)
+        self.prometheus_service = PrometheusService(self)
+        self.node_exporter_service = NodeExporterService(self)
+        self.crash_service = CrashService(self)
+        self.iscsi_service = IscsiService(self)
+        self.cephadm_services = {
+            'mon': self.mon_service,
+            'mgr': self.mgr_service,
+            'osd': self.osd_service,
+            'mds': self.mds_service,
+            'rgw': self.rgw_service,
+            'rbd-mirror': self.rbd_mirror_service,
+            'nfs': self.nfs_service,
+            'grafana': self.grafana_service,
+            'alertmanager': self.alertmanager_service,
+            'prometheus': self.prometheus_service,
+            'node-exporter': self.node_exporter_service,
+            'crash': self.crash_service,
+            'iscsi': self.iscsi_service,
+        }
+
+        self.template = TemplateMgr()
+
     def shutdown(self):
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -739,17 +355,19 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.run = False
         self.event.set()
 
+    def _get_cephadm_service(self, service_type: str) -> CephadmService:
+        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
+        return self.cephadm_services[service_type]
+
     def _kick_serve_loop(self):
         self.log.debug('_kick_serve_loop')
         self.event.set()
 
     def _check_safe_to_destroy_mon(self, mon_id):
         # type: (str) -> None
-        ret, out, err = self.mon_command({
+        ret, out, err = self.check_mon_command({
             'prefix': 'quorum_status',
         })
-        if ret:
-            raise OrchestratorError('failed to check mon quorum status')
         try:
             j = json.loads(out)
         except Exception as e:
@@ -767,261 +385,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             return
         raise OrchestratorError('Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
 
-    def _wait_for_ok_to_stop(self, s):
-        # only wait a little bit; the service might go away for something
-        tries = 4
-        while tries > 0:
-            if s.daemon_type not in ['mon', 'osd', 'mds']:
-                self.log.info('Upgrade: It is presumed safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                return True
-            ret, out, err = self.mon_command({
-                'prefix': '%s ok-to-stop' % s.daemon_type,
-                'ids': [s.daemon_id],
-            })
-            if not self.upgrade_state or self.upgrade_state.get('paused'):
-                return False
-            if ret:
-                self.log.info('Upgrade: It is NOT safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                time.sleep(15)
-                tries -= 1
-            else:
-                self.log.info('Upgrade: It is safe to stop %s.%s' %
-                              (s.daemon_type, s.daemon_id))
-                return True
-        return False
-
-    def _clear_upgrade_health_checks(self):
-        for k in ['UPGRADE_NO_STANDBY_MGR',
-                  'UPGRADE_FAILED_PULL']:
-            if k in self.health_checks:
-                del self.health_checks[k]
-        self.set_health_checks(self.health_checks)
-
-    def _fail_upgrade(self, alert_id, alert):
-        self.log.error('Upgrade: Paused due to %s: %s' % (alert_id,
-                                                          alert['summary']))
-        self.upgrade_state['error'] = alert_id + ': ' + alert['summary']
-        self.upgrade_state['paused'] = True
-        self._save_upgrade_state()
-        self.health_checks[alert_id] = alert
-        self.set_health_checks(self.health_checks)
-
-    def _update_upgrade_progress(self, progress):
-        if 'progress_id' not in self.upgrade_state:
-            self.upgrade_state['progress_id'] = str(uuid.uuid4())
-            self._save_upgrade_state()
-        self.remote('progress', 'update', self.upgrade_state['progress_id'],
-                    ev_msg='Upgrade to %s' % self.upgrade_state['target_name'],
-                    ev_progress=progress)
-
-    def _do_upgrade(self):
-        # type: () -> None
-        if not self.upgrade_state:
-            self.log.debug('_do_upgrade no state, exiting')
-            return
-
-        target_name = self.upgrade_state.get('target_name')
-        target_id = self.upgrade_state.get('target_id', None)
-        if not target_id:
-            # need to learn the container hash
-            self.log.info('Upgrade: First pull of %s' % target_name)
-            try:
-                target_id, target_version = self._get_container_image_id(target_name)
-            except OrchestratorError as e:
-                self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                    'severity': 'warning',
-                    'summary': 'Upgrade: failed to pull target image',
-                    'count': 1,
-                    'detail': [str(e)],
-                })
-                return
-            self.upgrade_state['target_id'] = target_id
-            self.upgrade_state['target_version'] = target_version
-            self._save_upgrade_state()
-        target_version = self.upgrade_state.get('target_version')
-        self.log.info('Upgrade: Target is %s with id %s' % (target_name,
-                                                            target_id))
-
-        # get all distinct container_image settings
-        image_settings = {}
-        ret, out, err = self.mon_command({
-            'prefix': 'config dump',
-            'format': 'json',
-        })
-        config = json.loads(out)
-        for opt in config:
-            if opt['name'] == 'container_image':
-                image_settings[opt['section']] = opt['value']
-
-        daemons = self.cache.get_daemons()
-        done = 0
-        for daemon_type in CEPH_UPGRADE_ORDER:
-            self.log.info('Upgrade: Checking %s daemons...' % daemon_type)
-            need_upgrade_self = False
-            for d in daemons:
-                if d.daemon_type != daemon_type:
-                    continue
-                if d.container_image_id == target_id:
-                    self.log.debug('daemon %s.%s version correct' % (
-                        daemon_type, d.daemon_id))
-                    done += 1
-                    continue
-                self.log.debug('daemon %s.%s not correct (%s, %s, %s)' % (
-                    daemon_type, d.daemon_id,
-                    d.container_image_name, d.container_image_id, d.version))
-
-                if daemon_type == 'mgr' and \
-                   d.daemon_id == self.get_mgr_id():
-                    self.log.info('Upgrade: Need to upgrade myself (mgr.%s)' %
-                                  self.get_mgr_id())
-                    need_upgrade_self = True
-                    continue
-
-                # make sure host has latest container image
-                out, err, code = self._run_cephadm(
-                    d.hostname, None, 'inspect-image', [],
-                    image=target_name, no_fsid=True, error_ok=True)
-                if code or json.loads(''.join(out)).get('image_id') != target_id:
-                    self.log.info('Upgrade: Pulling %s on %s' % (target_name,
-                                                                 d.hostname))
-                    out, err, code = self._run_cephadm(
-                        d.hostname, None, 'pull', [],
-                        image=target_name, no_fsid=True, error_ok=True)
-                    if code:
-                        self._fail_upgrade('UPGRADE_FAILED_PULL', {
-                            'severity': 'warning',
-                            'summary': 'Upgrade: failed to pull target image',
-                            'count': 1,
-                            'detail': [
-                                'failed to pull %s on host %s' % (target_name,
-                                                                  d.hostname)],
-                        })
-                        return
-                    r = json.loads(''.join(out))
-                    if r.get('image_id') != target_id:
-                        self.log.info('Upgrade: image %s pull on %s got new image %s (not %s), restarting' % (target_name, d.hostname, r['image_id'], target_id))
-                        self.upgrade_state['target_id'] = r['image_id']
-                        self._save_upgrade_state()
-                        return
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                if not d.container_image_id:
-                    if d.container_image_name == target_name:
-                        self.log.debug('daemon %s has unknown container_image_id but has correct image name' % (d.name()))
-                        continue
-                if not self._wait_for_ok_to_stop(d):
-                    return
-                self.log.info('Upgrade: Redeploying %s.%s' %
-                              (d.daemon_type, d.daemon_id))
-                ret, out, err = self.mon_command({
-                    'prefix': 'config set',
-                    'name': 'container_image',
-                    'value': target_name,
-                    'who': utils.name_to_config_section(daemon_type + '.' + d.daemon_id),
-                })
-                self._daemon_action(
-                    d.daemon_type,
-                    d.daemon_id,
-                    d.hostname,
-                    'redeploy'
-                )
-                return
-
-            if need_upgrade_self:
-                mgr_map = self.get('mgr_map')
-                num = len(mgr_map.get('standbys'))
-                if not num:
-                    self._fail_upgrade('UPGRADE_NO_STANDBY_MGR', {
-                        'severity': 'warning',
-                        'summary': 'Upgrade: Need standby mgr daemon',
-                        'count': 1,
-                        'detail': [
-                            'The upgrade process needs to upgrade the mgr, '
-                            'but it needs at least one standby to proceed.',
-                        ],
-                    })
-                    return
-
-                self.log.info('Upgrade: there are %d other already-upgraded '
-                              'standby mgrs, failing over' % num)
-
-                self._update_upgrade_progress(done / len(daemons))
-
-                # fail over
-                ret, out, err = self.mon_command({
-                    'prefix': 'mgr fail',
-                    'who': self.get_mgr_id(),
-                })
-                return
-            elif daemon_type == 'mgr':
-                if 'UPGRADE_NO_STANDBY_MGR' in self.health_checks:
-                    del self.health_checks['UPGRADE_NO_STANDBY_MGR']
-                    self.set_health_checks(self.health_checks)
-
-            # make sure 'ceph versions' agrees
-            ret, out, err = self.mon_command({
-                'prefix': 'versions',
-            })
-            j = json.loads(out)
-            for version, count in j.get(daemon_type, {}).items():
-                if version != target_version:
-                    self.log.warning(
-                        'Upgrade: %d %s daemon(s) are %s != target %s' %
-                        (count, daemon_type, version, target_version))
-
-            # push down configs
-            if image_settings.get(daemon_type) != target_name:
-                self.log.info('Upgrade: Setting container_image for all %s...' %
-                              daemon_type)
-                ret, out, err = self.mon_command({
-                    'prefix': 'config set',
-                    'name': 'container_image',
-                    'value': target_name,
-                    'who': daemon_type,
-                })
-            to_clean = []
-            for section in image_settings.keys():
-                if section.startswith(utils.name_to_config_section(daemon_type) + '.'):
-                    to_clean.append(section)
-            if to_clean:
-                self.log.debug('Upgrade: Cleaning up container_image for %s...' %
-                               to_clean)
-                for section in to_clean:
-                    ret, image, err = self.mon_command({
-                        'prefix': 'config rm',
-                        'name': 'container_image',
-                        'who': section,
-                    })
-
-            self.log.info('Upgrade: All %s daemons are up to date.' %
-                          daemon_type)
-
-        # clean up
-        self.log.info('Upgrade: Finalizing container_image settings')
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'name': 'container_image',
-            'value': target_name,
-            'who': 'global',
-        })
-        for daemon_type in CEPH_UPGRADE_ORDER:
-            ret, image, err = self.mon_command({
-                'prefix': 'config rm',
-                'name': 'container_image',
-                'who': utils.name_to_config_section(daemon_type),
-            })
-
-        self.log.info('Upgrade: Complete!')
-        if 'progress_id' in self.upgrade_state:
-            self.remote('progress', 'complete',
-                        self.upgrade_state['progress_id'])
-        self.upgrade_state = None
-        self._save_upgrade_state()
-        return
-
     def _check_host(self, host):
         if host not in self.inventory:
             return
@@ -1120,6 +483,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     if r:
                         failures.append(r)
 
+                if self.cache.host_needs_osdspec_preview_refresh(host):
+                    self.log.debug(f"refreshing OSDSpec previews for {host}")
+                    r = self._refresh_host_osdspec_previews(host)
+                    if r:
+                        failures.append(r)
+
             health_changed = False
             if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
                 del self.health_checks['CEPHADM_HOST_CHECK_FAILED']
@@ -1146,8 +515,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if health_changed:
                 self.set_health_checks(self.health_checks)
 
-
-
             self._check_for_strays()
 
             if self.paused:
@@ -1170,8 +537,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
                 self._check_daemons()
 
-                if self.upgrade_state and not self.upgrade_state.get('paused'):
-                    self._do_upgrade()
+                if self.upgrade.continue_upgrade():
                     continue
 
             self._serve_sleep()
@@ -1248,20 +614,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 continue
             return name
 
-    def get_service_name(self, daemon_type, daemon_id, host):
-        # type: (str, str, str) -> (str)
-        """
-        Returns the generic service name
-        """
-        p = re.compile(r'(.*)\.%s.*' % (host))
-        return '%s.%s' % (daemon_type, p.sub(r'\1', daemon_id))
-
-    def _save_inventory(self):
-        self.set_store('inventory', json.dumps(self.inventory))
-
-    def _save_upgrade_state(self):
-        self.set_store('upgrade_state', json.dumps(self.upgrade_state))
-
     def _reconfig_ssh(self):
         temp_files = []  # type: list
         ssh_options = []  # type: List[str]
@@ -1358,21 +710,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.log.debug("process: completions={0}".format(orchestrator.pretty_print(completions)))
 
             for p in completions:
-                p.finalize()
-
-    def _require_hosts(self, hosts):
-        """
-        Raise an error if any of the given hosts are unregistered.
-        """
-        if isinstance(hosts, six.string_types):
-            hosts = [hosts]
-        keys = self.inventory.keys()
-        unregistered_hosts = set(hosts) - keys
-        if unregistered_hosts:
-            logger.warning('keys = {}'.format(keys))
-            raise RuntimeError("Host(s) {} not registered".format(
-                ", ".join(map(lambda h: "'{}'".format(h),
-                    unregistered_hosts))))
+                p.evaluate()
 
     @orchestrator._cli_write_command(
         prefix='cephadm set-ssh-config',
@@ -1445,6 +783,28 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self._reconfig_ssh()
         return 0, '', ''
 
+    @orchestrator._cli_write_command(
+        'cephadm set-priv-key',
+        desc='Set cluster SSH private key (use -i <private_key>)')
+    def _set_priv_key(self, inbuf=None):
+        if inbuf is None or len(inbuf) == 0:
+            return -errno.EINVAL, "", "empty private ssh key provided"
+        self.set_store("ssh_identity_key", inbuf)
+        self.log.info('Set ssh private key')
+        self._reconfig_ssh()
+        return 0, "", ""
+
+    @orchestrator._cli_write_command(
+        'cephadm set-pub-key',
+        desc='Set cluster SSH public key (use -i <public_key>)')
+    def _set_pub_key(self, inbuf=None):
+        if inbuf is None or len(inbuf) == 0:
+            return -errno.EINVAL, "", "empty public ssh key provided"
+        self.set_store("ssh_identity_pub", inbuf)
+        self.log.info('Set ssh public key')
+        self._reconfig_ssh()
+        return 0, "", ""
+
     @orchestrator._cli_write_command(
         'cephadm clear-key',
         desc='Clear cluster SSH key')
@@ -1514,10 +874,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         """
         Setup a connection for running commands on remote host.
         """
-        conn_and_r = self._cons.get(host)
-        if conn_and_r:
-            self.log.debug('Have connection to %s' % host)
-            return conn_and_r
+        conn, r = self._cons.get(host, (None, None))
+        if conn:
+            if conn.has_connection():
+                self.log.debug('Have connection to %s' % host)
+                return conn, r
+            else:
+                self._reset_con(host)
         n = self.ssh_user + '@' + host
         self.log.debug("Opening connection to {} with ssh options '{}'".format(
             n, self._ssh_options))
@@ -1549,45 +912,67 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             executable_path))
         return executable_path
 
-    def _run_cephadm(self, host, entity, command, args,
-                     addr=None,
-                     stdin=None,
+    def _run_cephadm(self,
+                     host: str,
+                     entity: Optional[str],
+                     command: str,
+                     args: List[str],
+                     addr: Optional[str] = None,
+                     stdin: Optional[str] = None,
                      no_fsid=False,
                      error_ok=False,
-                     image=None):
-        # type: (str, Optional[str], str, List[str], Optional[str], Optional[str], bool, bool, Optional[str]) -> Tuple[List[str], List[str], int]
+                     image: Optional[str] = None,
+                     env_vars: Optional[List[str]] = None,
+                     ) -> Tuple[List[str], List[str], int]:
         """
         Run cephadm on the remote host with the given command + args
+
+        :param env_vars: environment variables to pass to cephadm, each in "KEY=VALUE" form
         """
         if not addr and host in self.inventory:
-            addr = self.inventory[host].get('addr', host)
+            addr = self.inventory.get_addr(host)
 
         self.offline_hosts_remove(host)
 
         try:
             try:
                 conn, connr = self._get_connection(addr)
-            except IOError as e:
+            except OSError as e:
                 if error_ok:
                     self.log.exception('failed to establish ssh connection')
                     return [], [str("Can't communicate with remote host, possibly because python3 is not installed there")], 1
-                raise
+                raise execnet.gateway_bootstrap.HostNotFound(str(e)) from e
 
             assert image or entity
             if not image:
-                daemon_type = entity.split('.', 1)[0] # type: ignore
+                daemon_type = entity.split('.', 1)[0]  # type: ignore
                 if daemon_type in CEPH_TYPES or \
-                        daemon_type == 'nfs':
+                        daemon_type == 'nfs' or \
+                        daemon_type == 'iscsi':
                     # get container image
-                    ret, image, err = self.mon_command({
+                    ret, image, err = self.check_mon_command({
                         'prefix': 'config get',
                         'who': utils.name_to_config_section(entity),
                         'key': 'container_image',
                     })
-                    image = image.strip() # type: ignore
+                    image = image.strip()  # type: ignore
+                elif daemon_type == 'prometheus':
+                    image = self.container_image_prometheus
+                elif daemon_type == 'grafana':
+                    image = self.container_image_grafana
+                elif daemon_type == 'alertmanager':
+                    image = self.container_image_alertmanager
+                elif daemon_type == 'node-exporter':
+                    image = self.container_image_node_exporter
+
             self.log.debug('%s container image %s' % (entity, image))
 
             final_args = []
+
+            if env_vars:
+                for env_var_pair in env_vars:
+                    final_args.extend(['--env', env_var_pair])
+
             if image:
                 final_args.extend(['--image', image])
             final_args.append(command)
@@ -1596,8 +981,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 final_args += ['--fsid', self._cluster_fsid]
             final_args += args
 
+            self.log.debug('args: %s' % (' '.join(final_args)))
             if self.mode == 'root':
-                self.log.debug('args: %s' % (' '.join(final_args)))
                 if stdin:
                     self.log.debug('stdin: %s' % stdin)
                 script = 'injected_argv = ' + json.dumps(final_args) + '\n'
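
For illustration, a hedged sketch of the new env_vars parameter (the host, daemon name, subcommand and variable are illustrative only); each "KEY=VALUE" entry is expanded into a --env argument ahead of the cephadm subcommand:

    out, err, code = self._run_cephadm(
        'host1', 'mon.host1', 'ls', [],
        env_vars=['CEPHADM_EXAMPLE_FLAG=1'],   # rendered as: --env CEPHADM_EXAMPLE_FLAG=1
    )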
@@ -1650,25 +1035,22 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             # do with "host not found" (e.g., ssh key permission denied).
             self.offline_hosts.add(host)
             user = 'root' if self.mode == 'root' else 'cephadm'
-            msg = f'Failed to connect to {host} ({addr}).  ' \
-                  f'Check that the host is reachable and accepts connections using the cephadm SSH key\n' \
-                  f'you may want to run: \n' \
-                  f'> ssh -F =(ceph cephadm get-ssh-config) -i =(ceph config-key get mgr/cephadm/ssh_identity_key) {user}@{host}'
+            msg = f'''Failed to connect to {host} ({addr}).
+Check that the host is reachable and accepts connections using the cephadm SSH key
+
+you may want to run:
+> ceph cephadm get-ssh-config > ssh_config
+> ceph config-key get mgr/cephadm/ssh_identity_key > key
+> ssh -F ssh_config -i key {user}@{host}'''
             raise OrchestratorError(msg) from e
         except Exception as ex:
             self.log.exception(ex)
             raise
 
-    def _get_hosts(self, label=None):
-        # type: (Optional[str]) -> List[str]
-        r = []
-        for h, hostspec in self.inventory.items():
-            if not label or label in hostspec.get('labels', []):
-                r.append(h)
-        return r
+    def _get_hosts(self, label: Optional[str] = '', as_hostspec: bool = False) -> List:
+        return list(self.inventory.filter_by_label(label=label, as_hostspec=as_hostspec))
 
-    @async_completion
-    def add_host(self, spec):
+    def _add_host(self, spec):
         # type: (HostSpec) -> str
         """
         Add a host to be managed by the orchestrator.
@@ -1684,15 +1066,18 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             raise OrchestratorError('New host %s (%s) failed check: %s' % (
                 spec.hostname, spec.addr, err))
 
-        self.inventory[spec.hostname] = spec.to_json()
-        self._save_inventory()
+        self.inventory.add_host(spec)
         self.cache.prime_empty_host(spec.hostname)
         self.offline_hosts_remove(spec.hostname)
         self.event.set()  # refresh stray health check
         self.log.info('Added host %s' % spec.hostname)
         return "Added host '{}'".format(spec.hostname)
 
-    @async_completion
+    @trivial_completion
+    def add_host(self, spec: HostSpec) -> str:
+        return self._add_host(spec)
+
+    @trivial_completion
     def remove_host(self, host):
         # type: (str) -> str
         """
@@ -1700,20 +1085,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         :param host: host name
         """
-        del self.inventory[host]
-        self._save_inventory()
+        self.inventory.rm_host(host)
         self.cache.rm_host(host)
         self._reset_con(host)
         self.event.set()  # refresh stray health check
         self.log.info('Removed host %s' % host)
         return "Removed host '{}'".format(host)
 
-    @async_completion
+    @trivial_completion
     def update_host_addr(self, host, addr):
-        if host not in self.inventory:
-            raise OrchestratorError('host %s not registered' % host)
-        self.inventory[host]['addr'] = addr
-        self._save_inventory()
+        self.inventory.set_addr(host, addr)
         self._reset_con(host)
         self.event.set()  # refresh stray health check
         self.log.info('Set host %s addr to %s' % (host, addr))
@@ -1728,42 +1109,38 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         Notes:
           - skip async: manager reads from cache.
         """
-        r = []
-        for hostname, info in self.inventory.items():
-            r.append(orchestrator.HostSpec(
-                hostname,
-                addr=info.get('addr', hostname),
-                labels=info.get('labels', []),
-                status='Offline' if hostname in self.offline_hosts else info.get('status', ''),
-            ))
-        return r
+        return list(self.inventory.all_specs())
 
-    @async_completion
+    @trivial_completion
     def add_host_label(self, host, label):
-        if host not in self.inventory:
-            raise OrchestratorError('host %s does not exist' % host)
-
-        if 'labels' not in self.inventory[host]:
-            self.inventory[host]['labels'] = list()
-        if label not in self.inventory[host]['labels']:
-            self.inventory[host]['labels'].append(label)
-        self._save_inventory()
+        self.inventory.add_label(host, label)
         self.log.info('Added label %s to host %s' % (label, host))
         return 'Added label %s to host %s' % (label, host)
 
-    @async_completion
+    @trivial_completion
     def remove_host_label(self, host, label):
-        if host not in self.inventory:
-            raise OrchestratorError('host %s does not exist' % host)
-
-        if 'labels' not in self.inventory[host]:
-            self.inventory[host]['labels'] = list()
-        if label in self.inventory[host]['labels']:
-            self.inventory[host]['labels'].remove(label)
-        self._save_inventory()
+        self.inventory.rm_label(host, label)
         self.log.info('Removed label %s to host %s' % (label, host))
         return 'Removed label %s from host %s' % (label, host)
 
+    def update_osdspec_previews(self, search_host: str = ''):
+        # Set global 'pending' flag for host
+        self.cache.loading_osdspec_preview.add(search_host)
+        previews = []
+        # query OSDSpecs for host <search host> and generate/get the preview
+        # There can be multiple previews for one host due to multiple OSDSpecs.
+        previews.extend(self.osd_service.get_previews(search_host))
+        self.log.debug(f"Loading OSDSpec previews to HostCache")
+        self.cache.osdspec_previews[search_host] = previews
+        # Unset global 'pending' flag for host
+        self.cache.loading_osdspec_preview.remove(search_host)
+
+    def _refresh_host_osdspec_previews(self, host) -> bool:
+        self.update_osdspec_previews(host)
+        self.cache.save_host(host)
+        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
+        return True
+
     def _refresh_host_daemons(self, host):
         try:
             out, err, code = self._run_cephadm(
@@ -1798,6 +1175,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             sd.container_image_name = d.get('container_image_name')
             sd.container_image_id = d.get('container_image_id')
             sd.version = d.get('version')
+            if sd.daemon_type == 'osd':
+                sd.osdspec_affinity = self.osd_service.get_osdspec_affinity(sd.daemon_id)
             if 'state' in d:
                 sd.status_desc = d['state']
                 sd.status = {
@@ -1843,30 +1222,20 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             host, len(devices), len(networks)))
         devices = inventory.Devices.from_json(devices)
         self.cache.update_host_devices_networks(host, devices.devices, networks)
+        self.update_osdspec_previews(host)
         self.cache.save_host(host)
         return None
 
-    def _get_spec_size(self, spec):
-        if spec.placement.count:
-            return spec.placement.count
-        elif spec.placement.host_pattern:
-            return len(spec.placement.pattern_matches_hosts(self.inventory.keys()))
-        elif spec.placement.label:
-            return len(self._get_hosts(spec.placement.label))
-        elif spec.placement.hosts:
-            return len(spec.placement.hosts)
-        # hmm!
-        return 0
-
     @trivial_completion
     def describe_service(self, service_type=None, service_name=None,
                          refresh=False):
         if refresh:
             # ugly sync path, FIXME someday perhaps?
-            for host, hi in self.inventory.items():
+            for host in self.inventory.keys():
                 self._refresh_host_daemons(host)
         # <service_map>
         sm = {}  # type: Dict[str, orchestrator.ServiceDescription]
+        osd_count = 0
         for h, dm in self.cache.get_daemons_with_volatile_status():
             for name, dd in dm.items():
                 if service_type and service_type != dd.daemon_type:
@@ -1875,9 +1244,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 if service_name and service_name != n:
                     continue
                 if dd.daemon_type == 'osd':
-                    continue                # ignore OSDs for now
-                if dd.service_name() in self.spec_store.specs:
-                    spec = self.spec_store.specs[dd.service_name()]
+                    """
+                    OSDs do not know the affinity to their spec out of the box.
+                    """
+                    n = f"osd.{dd.osdspec_affinity}"
+                if n in self.spec_store.specs:
+                    spec = self.spec_store.specs[n]
                 else:
                     spec = ServiceSpec(
                         unmanaged=True,
@@ -1894,9 +1266,19 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                         container_image_name=dd.container_image_name,
                         spec=spec,
                     )
-                if dd.service_name() in self.spec_store.specs:
-                    sm[n].size = self._get_spec_size(spec)
-                    sm[n].created = self.spec_store.spec_created[dd.service_name()]
+                if n in self.spec_store.specs:
+                    if dd.daemon_type == 'osd':
+                        """
+                        The osd count can't be determined by the Placement spec.
+                        It's rather pointless to show a actual/expected representation 
+                        here. So we're setting running = size for now.
+                        """
+                        osd_count += 1
+                        sm[n].size = osd_count
+                    else:
+                        sm[n].size = spec.placement.get_host_selection_size(self._get_hosts)
+
+                    sm[n].created = self.spec_store.spec_created[n]
                     if service_type == 'nfs':
                         spec = cast(NFSServiceSpec, spec)
                         sm[n].rados_config_location = spec.rados_config_location()
@@ -1919,7 +1301,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 continue
             sm[n] = orchestrator.ServiceDescription(
                 spec=spec,
-                size=self._get_spec_size(spec),
+                size=spec.placement.get_host_selection_size(self._get_hosts),
                 running=0,
             )
             if service_type == 'nfs':
@@ -1935,7 +1317,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             if host:
                 self._refresh_host_daemons(host)
             else:
-                for hostname, hi in self.inventory.items():
+                for hostname in self.inventory.keys():
                     self._refresh_host_daemons(hostname)
         result = []
         for h, dm in self.cache.get_daemons_with_volatile_status():
@@ -1951,6 +1333,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 result.append(dd)
         return result
 
+    @trivial_completion
     def service_action(self, action, service_name):
         args = []
         for host, dm in self.cache.daemons.items():
@@ -1961,7 +1344,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.log.info('%s service %s' % (action.capitalize(), service_name))
         return self._daemon_actions(args)
 
-    @async_map_completion
+    @forall_hosts
     def _daemon_actions(self, daemon_type, daemon_id, host, action):
         return self._daemon_action(daemon_type, daemon_id, host, action)
 
@@ -1987,6 +1370,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "{} {} from host '{}'".format(action, name, host)
 
+    @trivial_completion
     def daemon_action(self, action, daemon_type, daemon_id):
         args = []
         for host, dm in self.cache.daemons.items():
@@ -2003,8 +1387,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             ','.join(['%s.%s' % (a[0], a[1]) for a in args])))
         return self._daemon_actions(args)
 
+    @trivial_completion
     def remove_daemons(self, names):
-        # type: (List[str]) -> orchestrator.Completion
+        # type: (List[str]) -> List[str]
         args = []
         for host, dm in self.cache.daemons.items():
             for name in names:
@@ -2018,6 +1403,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     @trivial_completion
     def remove_service(self, service_name):
         self.log.info('Remove service %s' % service_name)
+        self._trigger_preview_refresh(service_name=service_name)
         found = self.spec_store.rm(service_name)
         if found:
             self._kick_serve_loop()
@@ -2042,7 +1428,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 for host in host_filter.hosts:
                     self._refresh_host_devices(host)
             else:
-                for host, hi in self.inventory.items():
+                for host in self.inventory.keys():
                     self._refresh_host_devices(host)
 
         result = []
@@ -2065,8 +1451,9 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
         return '\n'.join(out + err)
 
+    @trivial_completion
     def blink_device_light(self, ident_fault, on, locs):
-        @async_map_completion
+        @forall_hosts
         def blink(host, dev, path):
             cmd = [
                 'lsmcli',
@@ -2104,196 +1491,70 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 r[str(osd_id)] = o.get('uuid', '')
         return r
 
+    def resolve_hosts_for_osdspecs(self,
+                                   specs: Optional[List[DriveGroupSpec]] = None,
+                                   service_name: Optional[str] = None
+                                   ) -> List[str]:
+        osdspecs = []
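+        # explicitly passed specs take precedence over a service_name lookup;
+        # with neither given, all stored 'osd' specs are used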
+        if service_name:
+            self.log.debug(f"Looking for OSDSpec with service_name: {service_name}")
+            osdspecs = self.spec_store.find(service_name=service_name)
+            self.log.debug(f"Found OSDSpecs: {osdspecs}")
+        if specs:
+            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
+        if not service_name and not specs:
+            # if neither parameter was provided, fall back to all available OSDSpecs
+            osdspecs = self.spec_store.find(service_name='osd')
+            self.log.debug(f"Found OSDSpecs: {osdspecs}")
+        if not osdspecs:
+            self.log.debug("No OSDSpecs found")
+            return []
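+        # flatten the per-spec host lists into a single list of matching hostnames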
+        return sum([spec.placement.filter_matching_hosts(self._get_hosts) for spec in osdspecs], [])
+
+    def resolve_osdspecs_for_host(self, host):
+        matching_specs = []
+        self.log.debug(f"Finding OSDSpecs for host: <{host}>")
+        for spec in self.spec_store.find('osd'):
+            if host in spec.placement.filter_matching_hosts(self._get_hosts):
+                self.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
+                matching_specs.append(spec)
+        return matching_specs
+
+    def _trigger_preview_refresh(self,
+                                 specs: Optional[List[DriveGroupSpec]] = None,
+                                 service_name: Optional[str] = None):
+        refresh_hosts = self.resolve_hosts_for_osdspecs(specs=specs, service_name=service_name)
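+        # hosts added to the refresh queue get their previews regenerated later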
+        for host in refresh_hosts:
+            self.log.info(f"Marking host: {host} for OSDSpec preview refresh.")
+            self.cache.osdspec_previews_refresh_queue.append(host)
+
     @trivial_completion
     def apply_drivegroups(self, specs: List[DriveGroupSpec]):
+        self._trigger_preview_refresh(specs=specs)
         return [self._apply(spec) for spec in specs]
 
-    def find_destroyed_osds(self) -> Dict[str, List[str]]:
-        osd_host_map: Dict[str, List[str]] = dict()
-        ret, out, err = self.mon_command({
-            'prefix': 'osd tree',
-            'states': ['destroyed'],
-            'format': 'json'
-        })
-        if ret != 0:
-            raise OrchestratorError(f"Caught error on calling 'osd tree destroyed' -> {err}")
-        try:
-            tree = json.loads(out)
-        except json.decoder.JSONDecodeError:
-            self.log.error(f"Could not decode json -> {out}")
-            return osd_host_map
-
-        nodes = tree.get('nodes', {})
-        for node in nodes:
-            if node.get('type') == 'host':
-                osd_host_map.update(
-                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
-                )
-        return osd_host_map
-
     @trivial_completion
     def create_osds(self, drive_group: DriveGroupSpec):
-        self.log.debug(f"Processing DriveGroup {drive_group}")
-        ret = []
-        drive_group.osd_id_claims = self.find_destroyed_osds()
-        self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-        for host, drive_selection in self.prepare_drivegroup(drive_group):
-            self.log.info('Applying %s on host %s...' % (drive_group.service_id, host))
-            cmd = self.driveselection_to_ceph_volume(drive_group, drive_selection,
-                                                     drive_group.osd_id_claims.get(host, []))
-            if not cmd:
-                self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
-                continue
-            ret_msg = self._create_osd(host, cmd,
-                                       replace_osd_ids=drive_group.osd_id_claims.get(host, []))
-            ret.append(ret_msg)
-        return ", ".join(ret)
-
-    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
-        # 1) use fn_filter to determine matching_hosts
-        matching_hosts = drive_group.placement.pattern_matches_hosts([x for x in self.cache.get_hosts()])
-        # 2) Map the inventory to the InventoryHost object
-        host_ds_map = []
-
-        # set osd_id_claims
-
-        def _find_inv_for_host(hostname: str, inventory_dict: dict):
-            # This is stupid and needs to be loaded with the host
-            for _host, _inventory in inventory_dict.items():
-                if _host == hostname:
-                    return _inventory
-            raise OrchestratorError("No inventory found for host: {}".format(hostname))
-
-        # 3) iterate over matching_host and call DriveSelection
-        self.log.debug(f"Checking matching hosts -> {matching_hosts}")
-        for host in matching_hosts:
-            inventory_for_host = _find_inv_for_host(host, self.cache.devices)
-            self.log.debug(f"Found inventory for host {inventory_for_host}")
-            drive_selection = DriveSelection(drive_group, inventory_for_host)
-            self.log.debug(f"Found drive selection {drive_selection}")
-            host_ds_map.append((host, drive_selection))
-        return host_ds_map
-
-    def driveselection_to_ceph_volume(self, drive_group: DriveGroupSpec,
-                                      drive_selection: DriveSelection,
-                                      osd_id_claims: Optional[List[str]] = None,
-                                      preview: bool = False) -> Optional[str]:
-        self.log.debug(f"Translating DriveGroup <{drive_group}> to ceph-volume command")
-        cmd: Optional[str] = translate.to_ceph_volume(drive_group, drive_selection, osd_id_claims, preview=preview).run()
-        self.log.debug(f"Resulting ceph-volume cmd: {cmd}")
-        return cmd
-
-    def preview_drivegroups(self, drive_group_name: Optional[str] = None,
-                            dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
-        # find drivegroups
-        if drive_group_name:
-            drive_groups = cast(List[DriveGroupSpec],
-                                self.spec_store.find(service_name=drive_group_name))
-        elif dg_specs:
-            drive_groups = dg_specs
-        else:
-            drive_groups = []
-        ret_all = []
-        for drive_group in drive_groups:
-            drive_group.osd_id_claims = self.find_destroyed_osds()
-            self.log.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {drive_group.osd_id_claims}")
-            # prepare driveselection
-            for host, ds in self.prepare_drivegroup(drive_group):
-                cmd = self.driveselection_to_ceph_volume(drive_group, ds,
-                                                         drive_group.osd_id_claims.get(host, []), preview=True)
-                if not cmd:
-                    self.log.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_name()))
-                    continue
-                out, err, code = self._run_ceph_volume_command(host, cmd)
-                if out:
-                    concat_out = json.loads(" ".join(out))
-                    ret_all.append({'data': concat_out, 'drivegroup': drive_group.service_id, 'host': host})
-        return ret_all
-
-    def _run_ceph_volume_command(self, host: str, cmd: str) -> Tuple[List[str], List[str], int]:
-        self._require_hosts(host)
-
-        # get bootstrap key
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get',
-            'entity': 'client.bootstrap-osd',
-        })
-
-        # generate config
-        ret, config, err = self.mon_command({
-            "prefix": "config generate-minimal-conf",
-        })
-
-        j = json.dumps({
-            'config': config,
-            'keyring': keyring,
-        })
-
-        split_cmd = cmd.split(' ')
-        _cmd = ['--config-json', '-', '--']
-        _cmd.extend(split_cmd)
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            _cmd,
-            stdin=j,
-            error_ok=True)
-        return out, err, code
-
-    def _create_osd(self, host, cmd, replace_osd_ids=None):
-        out, err, code = self._run_ceph_volume_command(host, cmd)
-
-        if code == 1 and ', it is already prepared' in '\n'.join(err):
-            # HACK: when we create against an existing LV, ceph-volume
-            # returns an error and the above message.  To make this
-            # command idempotent, tolerate this "error" and continue.
-            self.log.debug('the device was already prepared; continuing')
-            code = 0
-        if code:
-            raise RuntimeError(
-                'cephadm exited with an error code: %d, stderr:%s' % (
-                    code, '\n'.join(err)))
-
-        # check result
-        out, err, code = self._run_cephadm(
-            host, 'osd', 'ceph-volume',
-            [
-                '--',
-                'lvm', 'list',
-                '--format', 'json',
-            ])
-        before_osd_uuid_map = self.get_osd_uuid_map(only_up=True)
-        osds_elems = json.loads('\n'.join(out))
-        fsid = self._cluster_fsid
-        osd_uuid_map = self.get_osd_uuid_map()
-        created = []
-        for osd_id, osds in osds_elems.items():
-            for osd in osds:
-                if osd['tags']['ceph.cluster_fsid'] != fsid:
-                    self.log.debug('mismatched fsid, skipping %s' % osd)
-                    continue
-                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
-                    # if it exists but is part of the replacement operation, don't skip
-                    continue
-                if osd_id not in osd_uuid_map:
-                    self.log.debug('osd id {} does not exist in cluster'.format(osd_id))
-                    continue
-                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
-                    self.log.debug('mismatched osd uuid (cluster has %s, osd '
-                                   'has %s)' % (
-                                       osd_uuid_map.get(osd_id),
-                                       osd['tags']['ceph.osd_fsid']))
-                    continue
-
-                created.append(osd_id)
-                self._create_daemon(
-                    'osd', osd_id, host,
-                    osd_uuid_map=osd_uuid_map)
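+        # the drive selection / ceph-volume logic removed above now lives in self.osd_service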
+        return self.osd_service.create(drive_group)
 
-        if created:
-            self.cache.invalidate_host_devices(host)
-            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
-        else:
-            return "Created no osd(s) on host %s; already created?" % host
+    @trivial_completion
+    def preview_osdspecs(self,
+                         osdspec_name: Optional[str] = None,
+                         osdspecs: Optional[List[DriveGroupSpec]] = None
+                         ):
+        matching_hosts = self.resolve_hosts_for_osdspecs(specs=osdspecs, service_name=osdspec_name)
+        if not matching_hosts:
+            return {'n/a': [{'error': True,
+                             'message': 'No OSDSpec or matching hosts found.'}]}
+        # Is any matching host still loading previews?
+        pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts}
+        if pending_hosts:
+            # Report 'pending' while any of the matching hosts is still loading previews
+            return {'n/a': [{'error': True,
+                             'message': 'Preview data is being generated... '
+                                        'Please try again shortly.'}]}
+        # drop all hosts that are not in matching_hosts and return the preview structure
+        return {k: v for (k, v) in self.cache.osdspec_previews.items() if k in matching_hosts}
 
     def _calc_daemon_deps(self, daemon_type, daemon_id):
         need = {
@@ -2313,17 +1574,14 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         # type: (str, str, Optional[str], Optional[str]) -> Dict[str, Any]
         # keyring
         if not keyring:
-            if daemon_type == 'mon':
-                ename = 'mon.'
-            else:
-                ename = utils.name_to_config_section(daemon_type + '.' + daemon_id)
-            ret, keyring, err = self.mon_command({
+            ename = utils.name_to_auth_entity(daemon_type + '.' + daemon_id)
+            ret, keyring, err = self.check_mon_command({
                 'prefix': 'auth get',
                 'entity': ename,
             })
 
         # generate config
-        ret, config, err = self.mon_command({
+        ret, config, err = self.check_mon_command({
             "prefix": "config generate-minimal-conf",
         })
         if extra_ceph_config:
@@ -2334,11 +1592,18 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             'keyring': keyring,
         }
 
-    def _create_daemon(self, daemon_type, daemon_id, host,
-                       keyring=None,
-                       extra_args=None, extra_config=None,
+    def _create_daemon(self,
+                       daemon_type: str,
+                       daemon_id: str,
+                       host: str,
+                       keyring: Optional[str] = None,
+                       extra_args: Optional[List[str]] = None,
+                       extra_config: Optional[Dict[str, Any]] = None,
                        reconfig=False,
-                       osd_uuid_map=None):
+                       osd_uuid_map: Optional[Dict[str, Any]] = None,
+                       redeploy=False,
+                       ) -> str:
+
         if not extra_args:
             extra_args = []
         if not extra_config:
@@ -2347,19 +1612,22 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         start_time = datetime.datetime.utcnow()
         deps = []  # type: List[str]
-        cephadm_config = {} # type: Dict[str, Any]
+        cephadm_config = {}  # type: Dict[str, Any]
         if daemon_type == 'prometheus':
-            cephadm_config, deps = self._generate_prometheus_config()
+            cephadm_config, deps = self.prometheus_service.generate_config()
             extra_args.extend(['--config-json', '-'])
         elif daemon_type == 'grafana':
-            cephadm_config, deps = self._generate_grafana_config()
+            cephadm_config, deps = self.grafana_service.generate_config()
             extra_args.extend(['--config-json', '-'])
         elif daemon_type == 'nfs':
             cephadm_config, deps = \
-                    self._generate_nfs_config(daemon_type, daemon_id, host)
+                    self.nfs_service._generate_nfs_config(daemon_type, daemon_id, host)
             extra_args.extend(['--config-json', '-'])
         elif daemon_type == 'alertmanager':
-            cephadm_config, deps = self._generate_alertmanager_config()
+            cephadm_config, deps = self.alertmanager_service.generate_config()
+            extra_args.extend(['--config-json', '-'])
+        elif daemon_type == 'node-exporter':
+            cephadm_config, deps = self.node_exporter_service.generate_config()
             extra_args.extend(['--config-json', '-'])
         else:
             # Ceph.daemons (mon, mgr, mds, osd, etc)
@@ -2377,7 +1645,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     osd_uuid_map = self.get_osd_uuid_map()
                 osd_uuid = osd_uuid_map.get(daemon_id)
                 if not osd_uuid:
-                    raise OrchestratorError('osd.%d not in osdmap' % daemon_id)
+                    raise OrchestratorError('osd.%s not in osdmap' % daemon_id)
                 extra_args.extend(['--osd-fsid', osd_uuid])
 
         if reconfig:
@@ -2411,11 +1679,11 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return "{} {} on host '{}'".format(
             'Reconfigured' if reconfig else 'Deployed', name, host)
 
-    @async_map_completion
-    def _remove_daemons(self, name, host):
+    @forall_hosts
+    def _remove_daemons(self, name, host) -> str:
         return self._remove_daemon(name, host)
 
-    def _remove_daemon(self, name, host):
+    def _remove_daemon(self, name, host) -> str:
         """
         Remove a daemon
         """
@@ -2425,13 +1693,10 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
             # remove mon from quorum before we destroy the daemon
             self.log.info('Removing monitor %s from monmap...' % name)
-            ret, out, err = self.mon_command({
+            ret, out, err = self.check_mon_command({
                 'prefix': 'mon rm',
                 'name': daemon_id,
             })
-            if ret:
-                raise OrchestratorError('failed to remove mon %s from monmap' % (
-                    name))
 
         args = ['--name', name, '--force']
         self.log.info('Removing daemon %s from %s' % (name, host))
@@ -2443,7 +1708,37 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         self.cache.invalidate_host_daemons(host)
         return "Removed {} from host '{}'".format(name, host)
 
-    def _apply_service(self, spec):
+    def _create_fn(self, service_type: str) -> Callable[..., str]:
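+        # map each service type to the create() method of its CephadmService helper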
+        try:
+            d: Dict[str, Callable[..., str]] = {
+                'mon': self.mon_service.create,
+                'mgr': self.mgr_service.create,
+                'osd': self.osd_service.create,
+                'mds': self.mds_service.create,
+                'rgw': self.rgw_service.create,
+                'rbd-mirror': self.rbd_mirror_service.create,
+                'nfs': self.nfs_service.create,
+                'grafana': self.grafana_service.create,
+                'alertmanager': self.alertmanager_service.create,
+                'prometheus': self.prometheus_service.create,
+                'node-exporter': self.node_exporter_service.create,
+                'crash': self.crash_service.create,
+                'iscsi': self.iscsi_service.create,
+            }
+            return d[service_type]  # type: ignore
+        except KeyError as e:
+            self.log.exception(f'unknown service type {service_type}')
+            raise OrchestratorError(f'unknown service type {service_type}') from e
+
+    def _config_fn(self, service_type) -> Optional[Callable[[ServiceSpec], None]]:
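+        # only service types that need extra configuration before daemon creation are listed;
+        # all other types map to None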
+        return {
+            'mds': self.mds_service.config,
+            'rgw': self.rgw_service.config,
+            'nfs': self.nfs_service.config,
+            'iscsi': self.iscsi_service.config,
+        }.get(service_type)
+
+    def _apply_service(self, spec: ServiceSpec) -> bool:
         """
         Schedule a service.  Deploy new daemons or remove old ones, depending
         on the target label and count specified in the placement.
@@ -2454,38 +1749,20 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             self.log.debug('Skipping unmanaged service %s spec' % service_name)
             return False
         self.log.debug('Applying service %s spec' % service_name)
-        create_fns = {
-            'mon': self._create_mon,
-            'mgr': self._create_mgr,
-            'osd': self.create_osds,
-            'mds': self._create_mds,
-            'rgw': self._create_rgw,
-            'rbd-mirror': self._create_rbd_mirror,
-            'nfs': self._create_nfs,
-            'grafana': self._create_grafana,
-            'alertmanager': self._create_alertmanager,
-            'prometheus': self._create_prometheus,
-            'node-exporter': self._create_node_exporter,
-            'crash': self._create_crash,
-            'iscsi': self._create_iscsi,
-        }
-        config_fns = {
-            'mds': self._config_mds,
-            'rgw': self._config_rgw,
-            'nfs': self._config_nfs,
-            'iscsi': self._config_iscsi,
-        }
-        create_func = create_fns.get(daemon_type, None)
-        if not create_func:
-            self.log.debug('unrecognized service type %s' % daemon_type)
+
+        create_func = self._create_fn(daemon_type)
+        config_func = self._config_fn(daemon_type)
+
+        if daemon_type == 'osd':
+            create_func(spec)
+            # TODO: returning True here would result in a busy loop
             return False
-        config_func = config_fns.get(daemon_type, None)
 
         daemons = self.cache.get_daemons_by_service(service_name)
 
         public_network = None
         if daemon_type == 'mon':
-            ret, out, err = self.mon_command({
+            ret, out, err = self.check_mon_command({
                 'prefix': 'config get',
                 'who': 'mon',
                 'key': 'public_network',
@@ -2511,9 +1788,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
         r = False
 
-        if daemon_type == 'osd':
-            return False if create_func(spec) else True # type: ignore
-
         # sanity check
         if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
             self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
@@ -2529,15 +1803,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                     config_func(spec)
                     did_config = True
                 daemon_id = self.get_unique_name(daemon_type, host, daemons,
-                                                 spec.service_id, name)
+                                                 prefix=spec.service_id,
+                                                 forcename=name)
                 self.log.debug('Placing %s.%s on host %s' % (
                     daemon_type, daemon_id, host))
                 if daemon_type == 'mon':
                     create_func(daemon_id, host, network)  # type: ignore
-                elif daemon_type == 'nfs':
+                elif daemon_type in ['nfs', 'iscsi']:
                     create_func(daemon_id, host, spec)  # type: ignore
                 else:
-                    create_func(daemon_id, host)           # type: ignore
+                    create_func(daemon_id, host)  # type: ignore
 
                 # add to daemon list so next name(s) will also be unique
                 sd = orchestrator.DaemonDescription(
@@ -2569,10 +1844,16 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 if self._apply_service(spec):
                     r = True
             except Exception as e:
-                self.log.warning('Failed to apply %s spec %s: %s' % (
+                self.log.exception('Failed to apply %s spec %s: %s' % (
                     spec.service_name(), spec, e))
         return r
 
+    def _check_pool_exists(self, pool, service_name):
+        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
+        if not self.rados.pool_exists(pool):
+            raise OrchestratorError(f'Cannot find pool "{pool}" for '
+                                    f'service {service_name}')
+
     def _check_daemons(self):
         # get monmap mtime so we can refresh configs when mons change
         monmap = self.get('mon_map')
@@ -2582,7 +1863,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             last_monmap = None   # just in case clocks are skewed
 
         daemons = self.cache.get_daemons()
-        grafanas = []  # type: List[orchestrator.DaemonDescription]
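+        # daemons that need additional checks after (re)deployment, grouped by daemon type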
+        daemons_post = defaultdict(list)
         for dd in daemons:
             # orphan?
             spec = self.spec_store.specs.get(dd.service_name(), None)
@@ -2593,13 +1874,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 self._remove_daemon(dd.name(), dd.hostname)
 
             # ignore unmanaged services
-            if not spec or spec.unmanaged:
+            if spec and spec.unmanaged:
                 continue
 
-            # dependencies?
-            if dd.daemon_type == 'grafana':
-                # put running instances at the front of the list
-                grafanas.insert(0, dd)
+            # These daemon types require additional configs after creation
+            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
+                daemons_post[dd.daemon_type].append(dd)
+
             deps = self._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
             last_deps, last_config = self.cache.get_daemon_last_config_deps(
                 dd.hostname, dd.name())
@@ -2625,25 +1906,12 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
                 self._create_daemon(dd.daemon_type, dd.daemon_id,
                                     dd.hostname, reconfig=True)
 
-        # make sure the dashboard [does not] references grafana
-        try:
-            current_url = self.get_module_option_ex('dashboard',
-                                                    'GRAFANA_API_URL')
-            if grafanas:
-                host = grafanas[0].hostname
-                url = 'https://%s:3000' % (self.inventory[host].get('addr',
-                                                                    host))
-                if current_url != url:
-                    self.log.info('Setting dashboard grafana config to %s' % url)
-                    self.set_module_option_ex('dashboard', 'GRAFANA_API_URL',
-                                              url)
-                    # FIXME: is it a signed cert??
-        except Exception as e:
-            self.log.debug('got exception fetching dashboard grafana state: %s',
-                           e)
+        # do daemon post actions
+        for daemon_type, daemon_descs in daemons_post.items():
+            self._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
 
     def _add_daemon(self, daemon_type, spec,
-                    create_func, config_func=None):
+                    create_func: Callable[..., T], config_func=None) -> List[T]:
         """
         Add (and place) a daemon. Require explicit host placement.  Do not
         schedule, and do not apply the related scheduling limitations.
@@ -2659,7 +1927,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
 
     def _create_daemons(self, daemon_type, spec, daemons,
                         hosts, count,
-                        create_func, config_func=None):
+                        create_func: Callable[..., T], config_func=None) -> List[T]:
         if count > len(hosts):
             raise OrchestratorError('too few hosts: want %d, have %s' % (
                 count, hosts))
@@ -2670,14 +1938,13 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         args = []  # type: List[tuple]
         for host, network, name in hosts:
             daemon_id = self.get_unique_name(daemon_type, host, daemons,
-                                             spec.service_id, name)
+                                             prefix=spec.service_id,
+                                             forcename=name)
             self.log.debug('Placing %s.%s on host %s' % (
                 daemon_type, daemon_id, host))
             if daemon_type == 'mon':
                 args.append((daemon_id, host, network))  # type: ignore
-            elif daemon_type == 'nfs':
-                args.append((daemon_id, host, spec)) # type: ignore
-            elif daemon_type == 'iscsi':
+            elif daemon_type in ['nfs', 'iscsi']:
                 args.append((daemon_id, host, spec))  # type: ignore
             else:
                 args.append((daemon_id, host))  # type: ignore
@@ -2690,7 +1957,7 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
             )
             daemons.append(sd)
 
-        @async_map_completion
+        @forall_hosts
         def create_func_map(*args):
             return create_func(*args)
 
@@ -2700,71 +1967,23 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
     def apply_mon(self, spec):
         return self._apply(spec)
 
-    def _create_mon(self, name, host, network):
-        """
-        Create a new monitor on the given host.
-        """
-        # get mon. key
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get',
-            'entity': 'mon.',
-        })
-
-        extra_config = '[mon.%s]\n' % name
-        if network:
-            # infer whether this is a CIDR network, addrvec, or plain IP
-            if '/' in network:
-                extra_config += 'public network = %s\n' % network
-            elif network.startswith('[v') and network.endswith(']'):
-                extra_config += 'public addrv = %s\n' % network
-            elif ':' not in network:
-                extra_config += 'public addr = %s\n' % network
-            else:
-                raise OrchestratorError('Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
-        else:
-            # try to get the public_network from the config
-            ret, network, err = self.mon_command({
-                'prefix': 'config get',
-                'who': 'mon',
-                'key': 'public_network',
-            })
-            network = network.strip() # type: ignore
-            if ret:
-                raise RuntimeError('Unable to fetch cluster_network config option')
-            if not network:
-                raise OrchestratorError('Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
-            if '/' not in network:
-                raise OrchestratorError('public_network is set but does not look like a CIDR network: \'%s\'' % network)
-            extra_config += 'public network = %s\n' % network
-
-        return self._create_daemon('mon', name, host,
-                                   keyring=keyring,
-                                   extra_config={'config': extra_config})
-
+    @trivial_completion
     def add_mon(self, spec):
-        # type: (ServiceSpec) -> orchestrator.Completion
-        return self._add_daemon('mon', spec, self._create_mon)
+        # type: (ServiceSpec) -> List[str]
+        return self._add_daemon('mon', spec, self.mon_service.create)
 
-    def _create_mgr(self, mgr_id, host):
-        """
-        Create a new manager instance on a host.
-        """
-        # get mgr. key
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': 'mgr.%s' % mgr_id,
-            'caps': ['mon', 'profile mgr',
-                     'osd', 'allow *',
-                     'mds', 'allow *'],
-        })
+    @trivial_completion
+    def add_mgr(self, spec):
+        # type: (ServiceSpec) -> List[str]
+        return self._add_daemon('mgr', spec, self.mgr_service.create)
 
-        return self._create_daemon('mgr', mgr_id, host, keyring=keyring)
+    def _apply(self, spec: GenericSpec) -> str:
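+        # HostSpecs are added directly; ServiceSpecs are scheduled via _apply_service_spec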
+        if spec.service_type == 'host':
+            return self._add_host(cast(HostSpec, spec))
 
-    def add_mgr(self, spec):
-        # type: (ServiceSpec) -> orchestrator.Completion
-        return self._add_daemon('mgr', spec, self._create_mgr)
+        return self._apply_service_spec(cast(ServiceSpec, spec))
 
-    def _apply(self, spec: ServiceSpec) -> str:
+    def _apply_service_spec(self, spec: ServiceSpec) -> str:
         if spec.placement.is_empty():
             # fill in default placement
             defaults = {
@@ -2801,537 +2020,111 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
         return "Scheduled %s update..." % spec.service_name()
 
     @trivial_completion
-    def apply(self, specs: List[ServiceSpec]):
-        return [self._apply(spec) for spec in specs]
+    def apply(self, specs: List[GenericSpec]):
+        results = []
+        for spec in specs:
+            results.append(self._apply(spec))
+        return results
 
     @trivial_completion
     def apply_mgr(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_mds(self, spec: ServiceSpec):
-        return self._add_daemon('mds', spec, self._create_mds, self._config_mds)
+        return self._add_daemon('mds', spec, self.mds_service.create, self.mds_service.config)
 
     @trivial_completion
     def apply_mds(self, spec: ServiceSpec):
         return self._apply(spec)
 
-    def _config_mds(self, spec):
-        # ensure mds_join_fs is set for these daemons
-        assert spec.service_id
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'who': 'mds.' + spec.service_id,
-            'name': 'mds_join_fs',
-            'value': spec.service_id,
-        })
-
-    def _create_mds(self, mds_id, host):
-        # get mgr. key
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': 'mds.' + mds_id,
-            'caps': ['mon', 'profile mds',
-                     'osd', 'allow rwx',
-                     'mds', 'allow'],
-        })
-        return self._create_daemon('mds', mds_id, host, keyring=keyring)
-
+    @trivial_completion
     def add_rgw(self, spec):
-        return self._add_daemon('rgw', spec, self._create_rgw, self._config_rgw)
-
-    def _config_rgw(self, spec):
-        # ensure rgw_realm and rgw_zone is set for these daemons
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
-            'name': 'rgw_zone',
-            'value': spec.rgw_zone,
-        })
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.rgw_realm}",
-            'name': 'rgw_realm',
-            'value': spec.rgw_realm,
-        })
-        ret, out, err = self.mon_command({
-            'prefix': 'config set',
-            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
-            'name': 'rgw_frontends',
-            'value': spec.rgw_frontends_config_value(),
-        })
-
-        if spec.rgw_frontend_ssl_certificate:
-            if isinstance(spec.rgw_frontend_ssl_certificate, list):
-                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
-            else:
-                cert_data = spec.rgw_frontend_ssl_certificate
-            ret, out, err = self.mon_command({
-                'prefix': 'config-key set',
-                'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.crt',
-                'val': cert_data,
-            })
-
-        if spec.rgw_frontend_ssl_key:
-            if isinstance(spec.rgw_frontend_ssl_key, list):
-                key_data = '\n'.join(spec.rgw_frontend_ssl_key)
-            else:
-                key_data = spec.rgw_frontend_ssl_key
-            ret, out, err = self.mon_command({
-                'prefix': 'config-key set',
-                'key': f'rgw/cert/{spec.rgw_realm}/{spec.rgw_zone}.key',
-                'val': key_data,
-            })
-
-        logger.info('Saving service %s spec with placement %s' % (
-            spec.service_name(), spec.placement.pretty_str()))
-        self.spec_store.save(spec)
-
-    def _create_rgw(self, rgw_id, host):
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': f"{utils.name_to_config_section('rgw')}.{rgw_id}",
-            'caps': ['mon', 'allow *',
-                     'mgr', 'allow rw',
-                     'osd', 'allow rwx'],
-        })
-        return self._create_daemon('rgw', rgw_id, host, keyring=keyring)
+        return self._add_daemon('rgw', spec, self.rgw_service.create, self.rgw_service.config)
 
     @trivial_completion
     def apply_rgw(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_iscsi(self, spec):
-        # type: (ServiceSpec) -> orchestrator.Completion
-        return self._add_daemon('iscsi', spec, self._create_iscsi, self._config_iscsi)
-
-    def _config_iscsi(self, spec):
-        logger.info('Saving service %s spec with placement %s' % (
-            spec.service_name(), spec.placement.pretty_str()))
-        self.spec_store.save(spec)
-
-    def _create_iscsi(self, igw_id, host, spec):
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': utils.name_to_config_section('iscsi') + '.' + igw_id,
-            'caps': ['mon', 'allow rw',
-                     'osd', f'allow rwx pool={spec.pool}'],
-        })
-
-        api_secure = 'false' if spec.api_secure is None else spec.api_secure
-        igw_conf = f"""
-# generated by cephadm
-[config]
-cluster_client_name = {utils.name_to_config_section('iscsi')}.{igw_id}
-pool = {spec.pool}
-trusted_ip_list = {spec.trusted_ip_list or ''}
-minimum_gateways = 1
-fqdn_enabled = {spec.fqdn_enabled or ''}
-api_port = {spec.api_port or ''}
-api_user = {spec.api_user or ''}
-api_password = {spec.api_password or ''}
-api_secure = {api_secure}
-"""
-        extra_config = {'iscsi-gateway.cfg': igw_conf}
-        return self._create_daemon('iscsi', igw_id, host, keyring=keyring,
-                                   extra_config=extra_config)
+        # type: (ServiceSpec) -> List[str]
+        return self._add_daemon('iscsi', spec, self.iscsi_service.create, self.iscsi_service.config)
 
     @trivial_completion
     def apply_iscsi(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_rbd_mirror(self, spec):
-        return self._add_daemon('rbd-mirror', spec, self._create_rbd_mirror)
-
-    def _create_rbd_mirror(self, daemon_id, host):
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': 'client.rbd-mirror.' + daemon_id,
-            'caps': ['mon', 'profile rbd-mirror',
-                     'osd', 'profile rbd'],
-        })
-        return self._create_daemon('rbd-mirror', daemon_id, host,
-                                   keyring=keyring)
+        return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.create)
 
     @trivial_completion
     def apply_rbd_mirror(self, spec):
         return self._apply(spec)
 
-    def _generate_nfs_config(self, daemon_type, daemon_id, host):
-        # type: (str, str, str) -> Tuple[Dict[str, Any], List[str]]
-        deps = [] # type: List[str]
-
-        # find the matching NFSServiceSpec
-        # TODO: find the spec and pass via _create_daemon instead ??
-        service_name = self.get_service_name(daemon_type, daemon_id, host)
-        specs = self.spec_store.find(service_name)
-        if not specs:
-            raise OrchestratorError('Cannot find service spec %s' % (service_name))
-        elif len(specs) > 1:
-            raise OrchestratorError('Found multiple service specs for %s' % (service_name))
-        else:
-            # cast to keep mypy happy
-            spec = cast(NFSServiceSpec, specs[0])
-
-        nfs = NFSGanesha(self, daemon_id, spec)
-
-        # create the keyring
-        entity = nfs.get_keyring_entity()
-        keyring = nfs.get_or_create_keyring(entity=entity)
-
-        # update the caps after get-or-create, the keyring might already exist!
-        nfs.update_keyring_caps(entity=entity)
-
-        # create the rados config object
-        nfs.create_rados_config_obj()
-
-        # generate the cephadm config
-        cephadm_config = nfs.get_cephadm_config()
-        cephadm_config.update(
-                self._get_config_and_keyring(
-                    daemon_type, daemon_id,
-                    keyring=keyring))
-
-        return cephadm_config, deps
-
+    @trivial_completion
     def add_nfs(self, spec):
-        return self._add_daemon('nfs', spec, self._create_nfs, self._config_nfs)
-
-    def _config_nfs(self, spec):
-        logger.info('Saving service %s spec with placement %s' % (
-            spec.service_name(), spec.placement.pretty_str()))
-        self.spec_store.save(spec)
-
-    def _create_nfs(self, daemon_id, host, spec):
-        return self._create_daemon('nfs', daemon_id, host)
+        return self._add_daemon('nfs', spec, self.nfs_service.create, self.nfs_service.config)
 
     @trivial_completion
     def apply_nfs(self, spec):
         return self._apply(spec)
 
-    def _generate_prometheus_config(self):
-        # type: () -> Tuple[Dict[str, Any], List[str]]
-        deps = []  # type: List[str]
-
-        # scrape mgrs
-        mgr_scrape_list = []
-        mgr_map = self.get('mgr_map')
-        port = None
-        t = mgr_map.get('services', {}).get('prometheus', None)
-        if t:
-            t = t.split('/')[2]
-            mgr_scrape_list.append(t)
-            port = '9283'
-            if ':' in t:
-                port = t.split(':')[1]
-        # scan all mgrs to generate deps and to get standbys too.
-        # assume that they are all on the same port as the active mgr.
-        for dd in self.cache.get_daemons_by_service('mgr'):
-            # we consider the mgr a dep even if the prometheus module is
-            # disabled in order to be consistent with _calc_daemon_deps().
-            deps.append(dd.name())
-            if not port:
-                continue
-            if dd.daemon_id == self.get_mgr_id():
-                continue
-            hi = self.inventory.get(dd.hostname, {})
-            addr = hi.get('addr', dd.hostname)
-            mgr_scrape_list.append(addr.split(':')[0] + ':' + port)
-
-        # scrape node exporters
-        node_configs = ''
-        for dd in self.cache.get_daemons_by_service('node-exporter'):
-            deps.append(dd.name())
-            hi = self.inventory.get(dd.hostname, {})
-            addr = hi.get('addr', dd.hostname)
-            if not node_configs:
-                node_configs = """
-  - job_name: 'node'
-    static_configs:
-"""
-            node_configs += """    - targets: {}
-      labels:
-        instance: '{}'
-""".format([addr.split(':')[0] + ':9100'],
-           dd.hostname)
-
-        # scrape alert managers
-        alertmgr_configs = ""
-        alertmgr_targets = []
-        for dd in self.cache.get_daemons_by_service('alertmanager'):
-            deps.append(dd.name())
-            hi = self.inventory.get(dd.hostname, {})
-            addr = hi.get('addr', dd.hostname)
-            alertmgr_targets.append("'{}:9093'".format(addr.split(':')[0]))
-        if alertmgr_targets:
-            alertmgr_configs = """alerting:
-  alertmanagers:
-    - scheme: http
-      path_prefix: /alertmanager
-      static_configs:
-        - targets: [{}]
-""".format(", ".join(alertmgr_targets))
-
-        # generate the prometheus configuration
-        r = {
-            'files': {
-                'prometheus.yml': """# generated by cephadm
-global:
-  scrape_interval: 5s
-  evaluation_interval: 10s
-rule_files:
-  - /etc/prometheus/alerting/*
-{alertmgr_configs}
-scrape_configs:
-  - job_name: 'ceph'
-    static_configs:
-    - targets: {mgr_scrape_list}
-      labels:
-        instance: 'ceph_cluster'
-{node_configs}
-""".format(
-    mgr_scrape_list=str(mgr_scrape_list),
-    node_configs=str(node_configs),
-    alertmgr_configs=str(alertmgr_configs)
-    ),
-            },
-        }
-
-        # include alerts, if present in the container
-        if os.path.exists(self.prometheus_alerts_path):
-            with open(self.prometheus_alerts_path, "r") as f:
-                alerts = f.read()
-            r['files']['/etc/prometheus/alerting/ceph_alerts.yml'] = alerts
-
-        return r, sorted(deps)
-
-    def _generate_grafana_config(self):
-        # type: () -> Tuple[Dict[str, Any], List[str]]
-        deps = []  # type: List[str]
-        def generate_grafana_ds_config(hosts: List[str]) -> str:
-            config = '''# generated by cephadm
-deleteDatasources:
-{delete_data_sources}
-
-datasources:
-{data_sources}
-'''
-            delete_ds_template = '''
-  - name: '{name}'
-    orgId: 1\n'''.lstrip('\n')
-            ds_template = '''
-  - name: '{name}'
-    type: 'prometheus'
-    access: 'proxy'
-    orgId: 1
-    url: 'http://{host}:9095'
-    basicAuth: false
-    isDefault: {is_default}
-    editable: false\n'''.lstrip('\n')
-
-            delete_data_sources = ''
-            data_sources = ''
-            for i, host in enumerate(hosts):
-                name = "Dashboard %d" % (i + 1)
-                data_sources += ds_template.format(
-                    name=name,
-                    host=host,
-                    is_default=str(i == 0).lower()
-                )
-                delete_data_sources += delete_ds_template.format(
-                    name=name
-                )
-            return config.format(
-                delete_data_sources=delete_data_sources,
-                data_sources=data_sources,
-            )
-
-        prom_services = []  # type: List[str]
-        for dd in self.cache.get_daemons_by_service('prometheus'):
-            prom_services.append(dd.hostname)
-            deps.append(dd.name())
-
-        cert = self.get_store('grafana_crt')
-        pkey = self.get_store('grafana_key')
-        if cert and pkey:
-            try:
-                verify_tls(cert, pkey)
-            except ServerConfigException as e:
-                logger.warning('Provided grafana TLS certificates invalid: %s', str(e))
-                cert, pkey = None, None
-        if not (cert and pkey):
-            cert, pkey = create_self_signed_cert('Ceph', 'cephadm')
-            self.set_store('grafana_crt', cert)
-            self.set_store('grafana_key', pkey)
-            self.mon_command({
-                'prefix': 'dashboard set-grafana-api-ssl-verify',
-                'value': 'false',
-            })
-
-
-
-        config_file = {
-            'files': {
-                "grafana.ini": """# generated by cephadm
-[users]
-  default_theme = light
-[auth.anonymous]
-  enabled = true
-  org_name = 'Main Org.'
-  org_role = 'Viewer'
-[server]
-  domain = 'bootstrap.storage.lab'
-  protocol = https
-  cert_file = /etc/grafana/certs/cert_file
-  cert_key = /etc/grafana/certs/cert_key
-  http_port = 3000
-[security]
-  admin_user = admin
-  admin_password = admin
-  allow_embedding = true
-""",
-                'provisioning/datasources/ceph-dashboard.yml': generate_grafana_ds_config(prom_services),
-                'certs/cert_file': '# generated by cephadm\n%s' % cert,
-                'certs/cert_key': '# generated by cephadm\n%s' % pkey,
-            }
-        }
-        return config_file, sorted(deps)
-
     def _get_dashboard_url(self):
         # type: () -> str
         return self.get('mgr_map').get('services', {}).get('dashboard', '')
 
-    def _generate_alertmanager_config(self):
-        # type: () -> Tuple[Dict[str, Any], List[str]]
-        deps = [] # type: List[str]
-
-        # dashboard(s)
-        dashboard_urls = []
-        mgr_map = self.get('mgr_map')
-        port = None
-        proto = None  # http: or https:
-        url = mgr_map.get('services', {}).get('dashboard', None)
-        if url:
-            dashboard_urls.append(url)
-            proto = url.split('/')[0]
-            port = url.split('/')[2].split(':')[1]
-        # scan all mgrs to generate deps and to get standbys too.
-        # assume that they are all on the same port as the active mgr.
-        for dd in self.cache.get_daemons_by_service('mgr'):
-            # we consider mgr a dep even if the dashboard is disabled
-            # in order to be consistent with _calc_daemon_deps().
-            deps.append(dd.name())
-            if not port:
-                continue
-            if dd.daemon_id == self.get_mgr_id():
-                continue
-            hi = self.inventory.get(dd.hostname, {})
-            addr = hi.get('addr', dd.hostname)
-            dashboard_urls.append('%s//%s:%s/' % (proto, addr.split(':')[0],
-                                                 port))
-
-        yml = """# generated by cephadm
-# See https://prometheus.io/docs/alerting/configuration/ for documentation.
-
-global:
-  resolve_timeout: 5m
-
-route:
-  group_by: ['alertname']
-  group_wait: 10s
-  group_interval: 10s
-  repeat_interval: 1h
-  receiver: 'ceph-dashboard'
-receivers:
-- name: 'ceph-dashboard'
-  webhook_configs:
-{urls}
-""".format(
-    urls='\n'.join(
-        ["  - url: '{}api/prometheus_receiver'".format(u)
-         for u in dashboard_urls]
-    ))
-        peers = []
-        port = '9094'
-        for dd in self.cache.get_daemons_by_service('alertmanager'):
-            deps.append(dd.name())
-            hi = self.inventory.get(dd.hostname, {})
-            addr = hi.get('addr', dd.hostname)
-            peers.append(addr.split(':')[0] + ':' + port)
-        return {
-            "files": {
-                "alertmanager.yml": yml
-            },
-            "peers": peers
-        }, sorted(deps)
-
+    @trivial_completion
     def add_prometheus(self, spec):
-        return self._add_daemon('prometheus', spec, self._create_prometheus)
-
-    def _create_prometheus(self, daemon_id, host):
-        return self._create_daemon('prometheus', daemon_id, host)
+        return self._add_daemon('prometheus', spec, self.prometheus_service.create)
 
     @trivial_completion
     def apply_prometheus(self, spec):
         return self._apply(spec)
 
+    @trivial_completion
     def add_node_exporter(self, spec):
-        # type: (ServiceSpec) -> AsyncCompletion
+        # type: (ServiceSpec) -> List[str]
         return self._add_daemon('node-exporter', spec,
-                                self._create_node_exporter)
+                                self.node_exporter_service.create)
 
     @trivial_completion
     def apply_node_exporter(self, spec):
         return self._apply(spec)
 
-    def _create_node_exporter(self, daemon_id, host):
-        return self._create_daemon('node-exporter', daemon_id, host)
-
+    @trivial_completion
     def add_crash(self, spec):
-        # type: (ServiceSpec) -> AsyncCompletion
+        # type: (ServiceSpec) -> List[str]
         return self._add_daemon('crash', spec,
-                                self._create_crash)
+                                self.crash_service.create)
 
     @trivial_completion
     def apply_crash(self, spec):
         return self._apply(spec)
 
-    def _create_crash(self, daemon_id, host):
-        ret, keyring, err = self.mon_command({
-            'prefix': 'auth get-or-create',
-            'entity': 'client.crash.' + host,
-            'caps': ['mon', 'profile crash',
-                     'mgr', 'profile crash'],
-        })
-        return self._create_daemon('crash', daemon_id, host, keyring=keyring)
-
+    @trivial_completion
     def add_grafana(self, spec):
-        # type: (ServiceSpec) -> AsyncCompletion
-        return self._add_daemon('grafana', spec, self._create_grafana)
+        # type: (ServiceSpec) -> List[str]
+        return self._add_daemon('grafana', spec, self.grafana_service.create)
 
     @trivial_completion
     def apply_grafana(self, spec: ServiceSpec):
         return self._apply(spec)
 
-    def _create_grafana(self, daemon_id, host):
-        # type: (str, str) -> str
-        return self._create_daemon('grafana', daemon_id, host)
-
+    @trivial_completion
     def add_alertmanager(self, spec):
-        # type: (ServiceSpec) -> AsyncCompletion
-        return self._add_daemon('alertmanager', spec, self._create_alertmanager)
+        # type: (ServiceSpec) -> List[str]
+        return self._add_daemon('alertmanager', spec, self.alertmanager_service.create)
 
     @trivial_completion
     def apply_alertmanager(self, spec: ServiceSpec):
         return self._apply(spec)
 
-    def _create_alertmanager(self, daemon_id, host):
-        return self._create_daemon('alertmanager', daemon_id, host)
-
-
     def _get_container_image_id(self, image_name):
         # pick a random host...
         host = None
-        for host_name, hi in self.inventory.items():
+        for host_name in self.inventory.keys():
             host = host_name
             break
         if not host:
@@ -3384,89 +2177,23 @@ receivers:
 
     @trivial_completion
     def upgrade_status(self):
-        r = orchestrator.UpgradeStatusSpec()
-        if self.upgrade_state:
-            r.target_image = self.upgrade_state.get('target_name')
-            r.in_progress = True
-            if self.upgrade_state.get('error'):
-                r.message = 'Error: ' + self.upgrade_state.get('error')
-            elif self.upgrade_state.get('paused'):
-                r.message = 'Upgrade paused'
-        return r
+        return self.upgrade.upgrade_status()
 
     @trivial_completion
     def upgrade_start(self, image, version):
-        if self.mode != 'root':
-            raise OrchestratorError('upgrade is not supported in %s mode' % (
-                self.mode))
-        if version:
-            try:
-                (major, minor, patch) = version.split('.')
-                assert int(minor) >= 0
-                assert int(patch) >= 0
-            except (ValueError, AssertionError):
-                raise OrchestratorError('version must be in the form X.Y.Z (e.g., 15.2.3)')
-            if int(major) < 15 or (int(major) == 15 and int(minor) < 2):
-                raise OrchestratorError('cephadm only supports octopus (15.2.0) or later')
-            target_name = self.container_image_base + ':v' + version
-        elif image:
-            target_name = image
-        else:
-            raise OrchestratorError('must specify either image or version')
-        if self.upgrade_state:
-            if self.upgrade_state.get('target_name') != target_name:
-                raise OrchestratorError(
-                    'Upgrade to %s (not %s) already in progress' %
-                    (self.upgrade_state.get('target_name'), target_name))
-            if self.upgrade_state.get('paused'):
-                del self.upgrade_state['paused']
-                self._save_upgrade_state()
-                return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
-            return 'Upgrade to %s in progress' % self.upgrade_state.get('target_name')
-        self.upgrade_state = {
-            'target_name': target_name,
-            'progress_id': str(uuid.uuid4()),
-        }
-        self._update_upgrade_progress(0.0)
-        self._save_upgrade_state()
-        self._clear_upgrade_health_checks()
-        self.event.set()
-        return 'Initiating upgrade to %s' % (target_name)
+        return self.upgrade.upgrade_start(image, version)
 
     @trivial_completion
     def upgrade_pause(self):
-        if not self.upgrade_state:
-            raise OrchestratorError('No upgrade in progress')
-        if self.upgrade_state.get('paused'):
-            return 'Upgrade to %s already paused' % self.upgrade_state.get('target_name')
-        self.upgrade_state['paused'] = True
-        self._save_upgrade_state()
-        return 'Paused upgrade to %s' % self.upgrade_state.get('target_name')
+        return self.upgrade.upgrade_pause()
 
     @trivial_completion
     def upgrade_resume(self):
-        if not self.upgrade_state:
-            raise OrchestratorError('No upgrade in progress')
-        if not self.upgrade_state.get('paused'):
-            return 'Upgrade to %s not paused' % self.upgrade_state.get('target_name')
-        del self.upgrade_state['paused']
-        self._save_upgrade_state()
-        self.event.set()
-        return 'Resumed upgrade to %s' % self.upgrade_state.get('target_name')
+        return self.upgrade.upgrade_resume()
 
     @trivial_completion
     def upgrade_stop(self):
-        if not self.upgrade_state:
-            return 'No upgrade in progress'
-        target_name = self.upgrade_state.get('target_name')
-        if 'progress_id' in self.upgrade_state:
-            self.remote('progress', 'complete',
-                        self.upgrade_state['progress_id'])
-        self.upgrade_state = None
-        self._save_upgrade_state()
-        self._clear_upgrade_health_checks()
-        self.event.set()
-        return 'Stopped upgrade to %s' % target_name
+        return self.upgrade.upgrade_stop()
 
     @trivial_completion
     def remove_osds(self, osd_ids: List[str],
@@ -3503,188 +2230,3 @@ receivers:
         The CLI call to retrieve an osd removal report
         """
         return self.rm_util.report
-
-
-class BaseScheduler(object):
-    """
-    Base Scheduler Interface
-
-    * requires a placement_spec
-
-    `place(host_pool)` must return a List[HostPlacementSpec]
-    """
-
-    def __init__(self, placement_spec):
-        # type: (PlacementSpec) -> None
-        self.placement_spec = placement_spec
-
-    def place(self, host_pool, count=None):
-        # type: (List, Optional[int]) -> List[HostPlacementSpec]
-        raise NotImplementedError
-
-
-class SimpleScheduler(BaseScheduler):
-    """
-    The simplest way to pick/schedule a set of hosts:
-    1) Shuffle the provided host_pool
-    2) Select up to `count` hosts from the shuffled list
-    """
-    def __init__(self, placement_spec):
-        super(SimpleScheduler, self).__init__(placement_spec)
-
-    def place(self, host_pool, count=None):
-        # type: (List, Optional[int]) -> List[HostPlacementSpec]
-        if not host_pool:
-            return []
-        host_pool = [x for x in host_pool]
-        # shuffle for pseudo random selection
-        random.shuffle(host_pool)
-        return host_pool[:count]
-
-
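To make the scheduler contract concrete, here is a small self-contained sketch (not part of the module) of the SimpleScheduler.place behaviour: shuffle a copy of the candidate pool and return at most `count` entries. It uses plain host-name strings instead of HostPlacementSpec, and the host names are hypothetical.

import random
from typing import List, Optional

def simple_place(host_pool: List[str], count: Optional[int] = None) -> List[str]:
    # Shuffle a copy of the pool for a pseudo-random selection, then take up
    # to `count` hosts; count=None returns the whole shuffled pool.
    if not host_pool:
        return []
    pool = list(host_pool)
    random.shuffle(pool)
    return pool[:count]

print(simple_place(['host1', 'host2', 'host3'], count=2))  # e.g. ['host3', 'host1']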
-class HostAssignment(object):
-    """
-    A class to detect whether hosts are specified imperatively or declaratively.
-    If the spec is populated via the `placement.hosts` field (imperative), no
-    further hosts are loaded into the list.
-    If the spec isn't populated, i.e. when only a count or label is present
-    (declarative), the provided `get_hosts_func` is used to load hosts from
-    the inventory.
-
-    Schedulers can be assigned to pick hosts from the pool.
-    """
-
-    def __init__(self,
-                 spec,  # type: ServiceSpec
-                 get_hosts_func,  # type: Callable[[Optional[str]],List[str]]
-                 get_daemons_func, # type: Callable[[str],List[orchestrator.DaemonDescription]]
-
-                 filter_new_host=None, # type: Optional[Callable[[str],bool]]
-                 scheduler=None,  # type: Optional[BaseScheduler]
-                 ):
-        assert spec and get_hosts_func and get_daemons_func
-        self.spec = spec  # type: ServiceSpec
-        self.scheduler = scheduler if scheduler else SimpleScheduler(self.spec.placement)
-        self.get_hosts_func = get_hosts_func
-        self.get_daemons_func = get_daemons_func
-        self.filter_new_host = filter_new_host
-        self.service_name = spec.service_name()
-
-
-    def validate(self):
-        self.spec.validate()
-
-        if self.spec.placement.hosts:
-            explicit_hostnames = {h.hostname for h in self.spec.placement.hosts}
-            unknown_hosts = explicit_hostnames.difference(set(self.get_hosts_func(None)))
-            if unknown_hosts:
-                raise OrchestratorValidationError(
-                    f'Cannot place {self.spec.one_line_str()} on {unknown_hosts}: Unknown hosts')
-
-        if self.spec.placement.host_pattern:
-            pattern_hostnames = self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
-            if not pattern_hostnames:
-                raise OrchestratorValidationError(
-                    f'Cannot place {self.spec.one_line_str()}: No matching hosts')
-
-        if self.spec.placement.label:
-            label_hostnames = self.get_hosts_func(self.spec.placement.label)
-            if not label_hostnames:
-                raise OrchestratorValidationError(
-                    f'Cannot place {self.spec.one_line_str()}: No matching '
-                    f'hosts for label {self.spec.placement.label}')
-
-    def place(self):
-        # type: () -> List[HostPlacementSpec]
-        """
-        Load hosts into the spec.placement.hosts container.
-        """
-
-        self.validate()
-
-        # count == 0
-        if self.spec.placement.count == 0:
-            return []
-
-        # respect any explicit host list
-        if self.spec.placement.hosts and not self.spec.placement.count:
-            logger.debug('Provided hosts: %s' % self.spec.placement.hosts)
-            return self.spec.placement.hosts
-
-        # respect host_pattern
-        if self.spec.placement.host_pattern:
-            candidates = [
-                HostPlacementSpec(x, '', '')
-                for x in self.spec.placement.pattern_matches_hosts(self.get_hosts_func(None))
-            ]
-            logger.debug('All hosts: {}'.format(candidates))
-            return candidates
-
-        count = 0
-        if self.spec.placement.hosts and \
-           self.spec.placement.count and \
-           len(self.spec.placement.hosts) >= self.spec.placement.count:
-            hosts = self.spec.placement.hosts
-            count = self.spec.placement.count
-            logger.debug('place %d over provided host list: %s' % (
-                count, hosts))
-        elif self.spec.placement.label:
-            hosts = [
-                HostPlacementSpec(x, '', '')
-                for x in self.get_hosts_func(self.spec.placement.label)
-            ]
-            if not self.spec.placement.count:
-                logger.debug('Labeled hosts: {}'.format(hosts))
-                return hosts
-            count = self.spec.placement.count
-            logger.debug('place %d over label %s: %s' % (
-                count, self.spec.placement.label, hosts))
-        else:
-            hosts = [
-                HostPlacementSpec(x, '', '')
-                for x in self.get_hosts_func(None)
-            ]
-            if self.spec.placement.count:
-                count = self.spec.placement.count
-            else:
-                # this should be a totally empty spec given all of the
-                # alternative paths above.
-                assert self.spec.placement.count is None
-                assert not self.spec.placement.hosts
-                assert not self.spec.placement.label
-                count = 1
-            logger.debug('place %d over all hosts: %s' % (count, hosts))
-
-        # we need to select a subset of the candidates
-
-        # if a partial host list is provided, always start with that
-        if len(self.spec.placement.hosts) < count:
-            chosen = self.spec.placement.hosts
-        else:
-            chosen = []
-
-        # prefer hosts that already have services
-        daemons = self.get_daemons_func(self.service_name)
-        hosts_with_daemons = {d.hostname for d in daemons}
-        # calc existing daemons (that aren't already in chosen)
-        chosen_hosts = [hs.hostname for hs in chosen]
-        existing = [hs for hs in hosts
-                    if hs.hostname in hosts_with_daemons and \
-                    hs.hostname not in chosen_hosts]
-        if len(chosen + existing) >= count:
-            chosen = chosen + self.scheduler.place(
-                existing,
-                count - len(chosen))
-            logger.debug('Hosts with existing daemons: {}'.format(chosen))
-            return chosen
-
-        need = count - len(existing + chosen)
-        others = [hs for hs in hosts
-                  if hs.hostname not in hosts_with_daemons]
-        if self.filter_new_host:
-            old = others
-            others = [h for h in others if self.filter_new_host(h.hostname)]
-            logger.debug('filtered %s down to %s' % (old, others))
-        chosen = chosen + self.scheduler.place(others, need)
-        logger.debug('Combine hosts with existing daemons %s + new hosts %s' % (
-            existing, chosen))
-        return existing + chosen
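The placement preference that HostAssignment.place implements -- honour an explicit host list, otherwise favour hosts that already run a daemon of the service and only then spill over onto new hosts -- can be summarised with a simplified, self-contained sketch. It works on bare host-name strings rather than HostPlacementSpec/ServiceSpec objects, covers only the declarative count-based path, and the host names are hypothetical.

import random
from typing import List, Set

def place_simplified(all_hosts: List[str],
                     hosts_with_daemons: Set[str],
                     count: int) -> List[str]:
    # Prefer hosts that already run a daemon of this service, then fill the
    # remainder from the other candidates (pseudo-randomly, as SimpleScheduler does).
    existing = [h for h in all_hosts if h in hosts_with_daemons]
    if len(existing) >= count:
        random.shuffle(existing)
        return existing[:count]
    others = [h for h in all_hosts if h not in hosts_with_daemons]
    random.shuffle(others)
    return existing + others[:count - len(existing)]

# hypothetical cluster: daemons already on host1 and host2, three requested
print(place_simplified(['host1', 'host2', 'host3', 'host4'],
                       {'host1', 'host2'}, count=3))
# -> ['host1', 'host2'] plus one of host3/host4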