import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / cephadm / serve.py
index 35092793a3339f6999d174cd7e3c883925a85edd..ba2d3c65dd2c9d5a666a13240b50095f3c4be248 100644
@@ -3,21 +3,13 @@ import json
 import logging
 import uuid
 from collections import defaultdict
-from contextlib import contextmanager
-from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
-
-from cephadm import remotes
-
-try:
-    import remoto
-    import execnet.gateway_bootstrap
-except ImportError:
-    remoto = None
+from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
+    DefaultDict
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
-from ceph.utils import str_to_datetime, datetime_now
+from ceph.utils import datetime_now
 
 import orchestrator
 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
@@ -34,7 +26,6 @@ from . import utils
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
-    from remoto.backends import BaseConnection
 
 logger = logging.getLogger(__name__)
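
Editor's note: the remoto and execnet imports removed above disappear because this series moves cephadm's SSH transport to asyncio; synchronous call sites now go through self.mgr.wait_async(...). A minimal sketch of how a synchronous loop can drive coroutines that way (the thread-and-loop plumbing below is an illustrative assumption, not the module's actual implementation):

import asyncio
import threading
from typing import Any, Coroutine, TypeVar

T = TypeVar('T')

class AsyncBridge:
    """Run a private event loop on a daemon thread so synchronous code,
    like the serve loop, can block on coroutines."""
    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

    def wait_async(self, coro: Coroutine[Any, Any, T]) -> T:
        # hand the coroutine to the loop thread and block on its result
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()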
 
@@ -68,6 +59,7 @@ class CephadmServe:
         self.mgr.config_checker.load_network_config()
 
         while self.mgr.run:
+            self.log.debug("serve loop start")
 
             try:
 
@@ -101,6 +93,11 @@ class CephadmServe:
 
                     self._purge_deleted_services()
 
+                    self._check_for_moved_osds()
+
+                    if self.mgr.agent_helpers._handle_use_agent_setting():
+                        continue
+
                     if self.mgr.upgrade.continue_upgrade():
                         continue
 
@@ -108,7 +105,9 @@ class CephadmServe:
                 if e.event_subject:
                     self.mgr.events.from_orch_error(e)
 
+            self.log.debug("serve loop sleep")
             self._serve_sleep()
+            self.log.debug("serve loop wake")
         self.log.debug("serve exit")
 
     def _serve_sleep(self) -> None:
@@ -126,8 +125,10 @@ class CephadmServe:
         self.mgr.event.clear()
 
     def _update_paused_health(self) -> None:
+        self.log.debug('_update_paused_health')
         if self.mgr.paused:
-            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, ["'ceph orch resume' to resume"])
+            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
+                                        "'ceph orch resume' to resume"])
         else:
             self.mgr.remove_health_warning('CEPHADM_PAUSED')
 
@@ -177,72 +178,16 @@ class CephadmServe:
         self.mgr.cache.update_autotune(host)
 
     def _refresh_hosts_and_daemons(self) -> None:
+        self.log.debug('_refresh_hosts_and_daemons')
         bad_hosts = []
         failures = []
 
-        # host -> path -> (mode, uid, gid, content, digest)
-        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
-
-        # ceph.conf
         if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
-            config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
-            config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
+            client_files = self._calc_client_files()
+        else:
+            client_files = {}
 
-        if self.mgr.manage_etc_ceph_ceph_conf:
-            try:
-                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
-                ha = HostAssignment(
-                    spec=ServiceSpec('mon', placement=pspec),
-                    hosts=self.mgr._schedulable_hosts(),
-                    unreachable_hosts=self.mgr._unreachable_hosts(),
-                    daemons=[],
-                    networks=self.mgr.cache.networks,
-                )
-                all_slots, _, _ = ha.place()
-                for host in {s.hostname for s in all_slots}:
-                    if host not in client_files:
-                        client_files[host] = {}
-                    client_files[host]['/etc/ceph/ceph.conf'] = (
-                        0o644, 0, 0, bytes(config), str(config_digest)
-                    )
-            except Exception as e:
-                self.mgr.log.warning(
-                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
-
-        # client keyrings
-        for ks in self.mgr.keys.keys.values():
-            assert config
-            assert config_digest
-            try:
-                ret, keyring, err = self.mgr.mon_command({
-                    'prefix': 'auth get',
-                    'entity': ks.entity,
-                })
-                if ret:
-                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
-                    continue
-                digest = ''.join('%02x' % c for c in hashlib.sha256(
-                    keyring.encode('utf-8')).digest())
-                ha = HostAssignment(
-                    spec=ServiceSpec('mon', placement=ks.placement),
-                    hosts=self.mgr._schedulable_hosts(),
-                    unreachable_hosts=self.mgr._unreachable_hosts(),
-                    daemons=[],
-                    networks=self.mgr.cache.networks,
-                )
-                all_slots, _, _ = ha.place()
-                for host in {s.hostname for s in all_slots}:
-                    if host not in client_files:
-                        client_files[host] = {}
-                    client_files[host]['/etc/ceph/ceph.conf'] = (
-                        0o644, 0, 0, bytes(config), str(config_digest)
-                    )
-                    client_files[host][ks.path] = (
-                        ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
-                    )
-            except Exception as e:
-                self.log.warning(
-                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
+        agents_down: List[str] = []
 
         @forall_hosts
         def refresh(host: str) -> None:
@@ -251,35 +196,59 @@ class CephadmServe:
             if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                 return
 
+            if self.mgr.use_agent:
+                if self.mgr.agent_helpers._check_agent(host):
+                    agents_down.append(host)
+
             if self.mgr.cache.host_needs_check(host):
                 r = self._check_host(host)
                 if r is not None:
                     bad_hosts.append(r)
-            if self.mgr.cache.host_needs_daemon_refresh(host):
-                self.log.debug('refreshing %s daemons' % host)
-                r = self._refresh_host_daemons(host)
-                if r:
-                    failures.append(r)
 
-            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
+            if (
+                not self.mgr.use_agent
+                or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
+                or host in agents_down
+            ):
+                if self.mgr.cache.host_needs_daemon_refresh(host):
+                    self.log.debug('refreshing %s daemons' % host)
+                    r = self._refresh_host_daemons(host)
+                    if r:
+                        failures.append(r)
+
+                if self.mgr.cache.host_needs_facts_refresh(host):
+                    self.log.debug('Refreshing %s facts' % host)
+                    r = self._refresh_facts(host)
+                    if r:
+                        failures.append(r)
+
+                if self.mgr.cache.host_needs_network_refresh(host):
+                    self.log.debug('Refreshing %s networks' % host)
+                    r = self._refresh_host_networks(host)
+                    if r:
+                        failures.append(r)
+
+                if self.mgr.cache.host_needs_device_refresh(host):
+                    self.log.debug('refreshing %s devices' % host)
+                    r = self._refresh_host_devices(host)
+                    if r:
+                        failures.append(r)
+                self.mgr.cache.metadata_up_to_date[host] = True
+            elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
+                if self.mgr.cache.host_needs_daemon_refresh(host):
+                    self.log.debug('refreshing %s daemons' % host)
+                    r = self._refresh_host_daemons(host)
+                    if r:
+                        failures.append(r)
+                self.mgr.cache.metadata_up_to_date[host] = True
+
+            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                 self.log.debug(f"Logging `{host}` into custom registry")
-                r = self._registry_login(host, self.mgr.registry_url,
-                                         self.mgr.registry_username, self.mgr.registry_password)
+                r = self.mgr.wait_async(self._registry_login(
+                    host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                 if r:
                     bad_hosts.append(r)
 
-            if self.mgr.cache.host_needs_device_refresh(host):
-                self.log.debug('refreshing %s devices' % host)
-                r = self._refresh_host_devices(host)
-                if r:
-                    failures.append(r)
-
-            if self.mgr.cache.host_needs_facts_refresh(host):
-                self.log.debug(('Refreshing %s facts' % host))
-                r = self._refresh_facts(host)
-                if r:
-                    failures.append(r)
-
             if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                 self.log.debug(f"refreshing OSDSpec previews for {host}")
                 r = self._refresh_host_osdspec_previews(host)
@@ -293,54 +262,26 @@ class CephadmServe:
                 self.log.debug(f"autotuning memory for {host}")
                 self._autotune_host_memory(host)
 
-            # client files
-            updated_files = False
-            old_files = self.mgr.cache.get_host_client_files(host).copy()
-            for path, m in client_files.get(host, {}).items():
-                mode, uid, gid, content, digest = m
-                if path in old_files:
-                    match = old_files[path] == (digest, mode, uid, gid)
-                    del old_files[path]
-                    if match:
-                        continue
-                self.log.info(f'Updating {host}:{path}')
-                self._write_remote_file(host, path, content, mode, uid, gid)
-                self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
-                updated_files = True
-            for path in old_files.keys():
-                self.log.info(f'Removing {host}:{path}')
-                with self._remote_connection(host) as tpl:
-                    conn, connr = tpl
-                    out, err, code = remoto.process.check(
-                        conn,
-                        ['rm', '-f', path])
-                updated_files = True
-                self.mgr.cache.removed_client_file(host, path)
-            if updated_files:
-                self.mgr.cache.save_host(host)
+            self._write_client_files(client_files, host)
 
         refresh(self.mgr.cache.get_hosts())
 
+        self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
+
         self.mgr.config_checker.run_checks()
 
         for k in [
                 'CEPHADM_HOST_CHECK_FAILED',
-                'CEPHADM_FAILED_DAEMON',
                 'CEPHADM_REFRESH_FAILED',
         ]:
             self.mgr.remove_health_warning(k)
         if bad_hosts:
-            self.mgr.set_health_warning('CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
+            self.mgr.set_health_warning(
+                'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
         if failures:
-            self.mgr.set_health_warning('CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
-        failed_daemons = []
-        for dd in self.mgr.cache.get_daemons():
-            if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
-                failed_daemons.append('daemon %s on %s is in %s state' % (
-                    dd.name(), dd.hostname, dd.status_desc
-                ))
-        if failed_daemons:
-            self.mgr.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(failed_daemons), failed_daemons)
+            self.mgr.set_health_warning(
+                'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
+        self.mgr.update_failed_daemon_health_check()
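
Editor's note: the reworked refresh() above only runs the inline daemon/facts/network/device refreshes when the agent cannot supply metadata for the host. A condensed sketch of that gating, with illustrative names standing in for the mgr caches:

from typing import List

def needs_inline_refresh(host: str, use_agent: bool,
                         non_draining_hosts: List[str],
                         agents_down: List[str]) -> bool:
    # mirrors the condition in refresh(): do the full inline refresh when
    # agents are disabled, the host is draining, or its agent is down
    return (not use_agent
            or host not in non_draining_hosts
            or host in agents_down)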
 
     def _check_host(self, host: str) -> Optional[str]:
         if host not in self.mgr.inventory:
@@ -348,9 +289,9 @@ class CephadmServe:
         self.log.debug(' checking %s' % host)
         try:
             addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
-            out, err, code = self._run_cephadm(
+            out, err, code = self.mgr.wait_async(self._run_cephadm(
                 host, cephadmNoImage, 'check-host', [],
-                error_ok=True, no_fsid=True)
+                error_ok=True, no_fsid=True))
             self.mgr.cache.update_last_host_check(host)
             self.mgr.cache.save_host(host)
             if code:
@@ -366,70 +307,16 @@ class CephadmServe:
 
     def _refresh_host_daemons(self, host: str) -> Optional[str]:
         try:
-            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
+            ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
         except OrchestratorError as e:
             return str(e)
-        dm = {}
-        for d in ls:
-            if not d['style'].startswith('cephadm'):
-                continue
-            if d['fsid'] != self.mgr._cluster_fsid:
-                continue
-            if '.' not in d['name']:
-                continue
-            sd = orchestrator.DaemonDescription()
-            sd.last_refresh = datetime_now()
-            for k in ['created', 'started', 'last_configured', 'last_deployed']:
-                v = d.get(k, None)
-                if v:
-                    setattr(sd, k, str_to_datetime(d[k]))
-            sd.daemon_type = d['name'].split('.')[0]
-            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
-                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
-                continue
-
-            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
-            sd.hostname = host
-            sd.container_id = d.get('container_id')
-            if sd.container_id:
-                # shorten the hash
-                sd.container_id = sd.container_id[0:12]
-            sd.container_image_name = d.get('container_image_name')
-            sd.container_image_id = d.get('container_image_id')
-            sd.container_image_digests = d.get('container_image_digests')
-            sd.memory_usage = d.get('memory_usage')
-            sd.memory_request = d.get('memory_request')
-            sd.memory_limit = d.get('memory_limit')
-            sd._service_name = d.get('service_name')
-            sd.deployed_by = d.get('deployed_by')
-            sd.version = d.get('version')
-            sd.ports = d.get('ports')
-            sd.ip = d.get('ip')
-            sd.rank = int(d['rank']) if d.get('rank') is not None else None
-            sd.rank_generation = int(d['rank_generation']) if d.get(
-                'rank_generation') is not None else None
-            if sd.daemon_type == 'osd':
-                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
-            if 'state' in d:
-                sd.status_desc = d['state']
-                sd.status = {
-                    'running': DaemonDescriptionStatus.running,
-                    'stopped': DaemonDescriptionStatus.stopped,
-                    'error': DaemonDescriptionStatus.error,
-                    'unknown': DaemonDescriptionStatus.error,
-                }[d['state']]
-            else:
-                sd.status_desc = 'unknown'
-                sd.status = None
-            dm[sd.name()] = sd
-        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
-        self.mgr.cache.update_host_daemons(host, dm)
-        self.mgr.cache.save_host(host)
+        self.mgr._process_ls_output(host, ls)
         return None
 
     def _refresh_facts(self, host: str) -> Optional[str]:
         try:
-            val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
+            val = self.mgr.wait_async(self._run_cephadm_json(
+                host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
         except OrchestratorError as e:
             return str(e)
 
@@ -438,8 +325,7 @@ class CephadmServe:
         return None
 
     def _refresh_host_devices(self, host: str) -> Optional[str]:
-
-        with_lsm = self.mgr.get_module_option('device_enhanced_scan')
+        with_lsm = self.mgr.device_enhanced_scan
         inventory_args = ['--', 'inventory',
                           '--format=json-pretty',
                           '--filter-for-batch']
@@ -448,29 +334,41 @@ class CephadmServe:
 
         try:
             try:
-                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                 inventory_args)
+                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+                                                                     inventory_args))
             except OrchestratorError as e:
                 if 'unrecognized arguments: --filter-for-batch' in str(e):
                     rerun_args = inventory_args.copy()
                     rerun_args.remove('--filter-for-batch')
-                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
-                                                     rerun_args)
+                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
+                                                                         rerun_args))
                 else:
                     raise
 
-            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
         except OrchestratorError as e:
             return str(e)
 
-        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
-            host, len(devices), len(networks)))
+        self.log.debug('Refreshed host %s devices (%d)' % (
+            host, len(devices)))
         ret = inventory.Devices.from_json(devices)
-        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
+        self.mgr.cache.update_host_devices(host, ret.devices)
         self.update_osdspec_previews(host)
         self.mgr.cache.save_host(host)
         return None
 
+    def _refresh_host_networks(self, host: str) -> Optional[str]:
+        try:
+            networks = self.mgr.wait_async(self._run_cephadm_json(
+                host, 'mon', 'list-networks', [], no_fsid=True))
+        except OrchestratorError as e:
+            return str(e)
+
+        self.log.debug('Refreshed host %s networks (%s)' % (
+            host, len(networks)))
+        self.mgr.cache.update_host_networks(host, networks)
+        self.mgr.cache.save_host(host)
+        return None
+
     def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
         self.update_osdspec_previews(host)
         self.mgr.cache.save_host(host)
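
Editor's note: _refresh_host_devices above retries without --filter-for-batch when the remote cephadm is too old to recognize it. The general pattern, as a minimal sketch (run_inventory is a hypothetical stand-in for the _run_cephadm_json call):

from typing import Any, Callable, List

def run_with_flag_fallback(run_inventory: Callable[[List[str]], Any],
                           args: List[str], optional_flag: str) -> Any:
    try:
        return run_inventory(args)
    except RuntimeError as e:  # OrchestratorError in the real module
        if f'unrecognized arguments: {optional_flag}' in str(e):
            # older remote binary: drop the flag and try again
            return run_inventory([a for a in args if a != optional_flag])
        raise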
@@ -527,7 +425,11 @@ class CephadmServe:
                                     s.get('type'), s.get('id')
                                 )
                             )
-
+                    if s.get('type') == 'tcmu-runner':
+                        # because we don't track tcmu-runner daemons in the host cache
+                        # and have no way to check whether the daemon is part of an iscsi service,
+                        # we assume that all tcmu-runner daemons are managed by cephadm
+                        managed.append(name)
                     if host not in self.mgr.inventory:
                         missing_names.append(name)
                         host_num_daemons += 1
@@ -539,15 +441,59 @@ class CephadmServe:
                         'stray host %s has %d stray daemons: %s' % (
                             host, len(missing_names), missing_names))
             if self.mgr.warn_on_stray_hosts and host_detail:
-                self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
+                self.mgr.set_health_warning(
+                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
             if self.mgr.warn_on_stray_daemons and daemon_detail:
-                self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+                self.mgr.set_health_warning(
+                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
+
+    def _check_for_moved_osds(self) -> None:
+        self.log.debug('_check_for_moved_osds')
+        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
+        for dd in self.mgr.cache.get_daemons_by_type('osd'):
+            assert dd.daemon_id
+            all_osds[int(dd.daemon_id)].append(dd)
+        for osd_id, dds in all_osds.items():
+            if len(dds) <= 1:
+                continue
+            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
+            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
+            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
+            logger.info(msg)
+            if len(running) != 1:
+                continue
+            osd = self.mgr.get_osd_by_id(osd_id)
+            if not osd or not osd['up']:
+                continue
+            for e in error:
+                assert e.hostname
+                try:
+                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
+                    self.mgr.events.for_daemon(
+                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
+                except OrchestratorError as ex:
+                    self.mgr.events.from_orch_error(ex)
+                    logger.exception(f'failed to remove duplicated daemon {e}')
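
Editor's note: the core of _check_for_moved_osds is grouping daemon records by integer OSD id and flagging ids claimed by more than one record. A self-contained sketch, with DaemonRecord as a hypothetical stand-in for orchestrator.DaemonDescription:

from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, Dict, List

@dataclass
class DaemonRecord:  # stand-in for orchestrator.DaemonDescription
    daemon_id: str
    hostname: str
    status: str  # 'running' or 'error'

def find_duplicate_osds(daemons: List[DaemonRecord]) -> Dict[int, List[DaemonRecord]]:
    by_id: DefaultDict[int, List[DaemonRecord]] = defaultdict(list)
    for d in daemons:
        by_id[int(d.daemon_id)].append(d)
    # an OSD id claimed by more than one record suggests the OSD moved hosts
    return {osd_id: dds for osd_id, dds in by_id.items() if len(dds) > 1}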
 
     def _apply_all_services(self) -> bool:
+        self.log.debug('_apply_all_services')
         r = False
         specs = []  # type: List[ServiceSpec]
-        for sn, spec in self.mgr.spec_store.active_specs.items():
-            specs.append(spec)
+        # if metadata is not up to date, we still need to apply the agent spec,
+        # since the agent is the one that gathers the metadata. If we don't, we
+        # end up stuck between wanting metadata to be up to date in order to
+        # apply specs and needing to apply the agent spec to get up-to-date metadata
+        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
+            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
+            try:
+                specs.append(self.mgr.spec_store['agent'].spec)
+            except Exception as e:
+                self.log.debug(f'Failed to find agent spec: {e}')
+                self.mgr.agent_helpers._apply_agent()
+                return r
+        else:
+            for sn, spec in self.mgr.spec_store.active_specs.items():
+                specs.append(spec)
         for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
             self.mgr.remove_health_warning(name)
         self.mgr.apply_spec_fails = []
@@ -603,9 +549,11 @@ class CephadmServe:
                         options_failed_to_set.append(msg)
 
             if invalid_config_options:
-                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(invalid_config_options), invalid_config_options)
+                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
+                    invalid_config_options), invalid_config_options)
             if options_failed_to_set:
-                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(options_failed_to_set), options_failed_to_set)
+                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
+                    options_failed_to_set), options_failed_to_set)
 
     def _apply_service(self, spec: ServiceSpec) -> bool:
         """
@@ -624,6 +572,15 @@ class CephadmServe:
             return False
         self.log.debug('Applying service %s spec' % service_name)
 
+        if service_type == 'agent':
+            try:
+                assert self.mgr.cherrypy_thread
+                assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+            except Exception:
+                self.log.info(
+                    'Delaying applying agent spec until cephadm endpoint root cert created')
+                return False
+
         self._apply_service_config(spec)
 
         if service_type == 'osd':
@@ -661,8 +618,9 @@ class CephadmServe:
             rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
         ha = HostAssignment(
             spec=spec,
-            hosts=self.mgr._schedulable_hosts(),
-            unreachable_hosts=self.mgr._unreachable_hosts(),
+            hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
+            ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
+            unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
             daemons=daemons,
             networks=self.mgr.cache.networks,
             filter_new_host=(
@@ -730,6 +688,8 @@ class CephadmServe:
         self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
         self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
 
+        hosts_altered: Set[str] = set()
+
         try:
             # assign names
             for i in range(len(slots_to_add)):
@@ -784,6 +744,7 @@ class CephadmServe:
                         self._remove_daemon(d.name(), d.hostname)
                         daemons_to_remove.remove(d)
                         progress_done += 1
+                        hosts_altered.add(d.hostname)
                         break
 
                 # deploy new daemon
@@ -805,10 +766,11 @@ class CephadmServe:
 
                 try:
                     daemon_spec = svc.prepare_create(daemon_spec)
-                    self._create_daemon(daemon_spec)
+                    self.mgr.wait_async(self._create_daemon(daemon_spec))
                     r = True
                     progress_done += 1
                     update_progress()
+                    hosts_altered.add(daemon_spec.host)
                 except (RuntimeError, OrchestratorError) as e:
                     msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                            f"on {slot.hostname}: {e}")
@@ -832,7 +794,8 @@ class CephadmServe:
                 daemons.append(sd)
 
             if daemon_place_fails:
-                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(daemon_place_fails), daemon_place_fails)
+                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
+                    daemon_place_fails), daemon_place_fails)
 
             # remove any?
             def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
@@ -852,18 +815,25 @@ class CephadmServe:
 
                 progress_done += 1
                 update_progress()
+                hosts_altered.add(d.hostname)
 
             self.mgr.remote('progress', 'complete', progress_id)
         except Exception as e:
             self.mgr.remote('progress', 'fail', progress_id, str(e))
             raise
+        finally:
+            if self.mgr.use_agent:
+            # we can only send an ack to agents when we know for sure which port they bound to
+                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
+                                    h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
+                self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
 
         if r is None:
             r = False
         return r
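
Editor's note: the finally: block above only acks agents whose port is known and whose host is not draining. A small sketch of that filter (agent_ports and non_draining stand in for self.mgr.agent_cache.agent_ports and the non-draining host list):

from typing import Dict, List, Set

def ackable_hosts(hosts_altered: Set[str],
                  agent_ports: Dict[str, int],
                  non_draining: List[str]) -> Set[str]:
    # an agent can be acked only if we know which port it bound to and
    # its host is still scheduling daemons
    return {h for h in hosts_altered
            if h in agent_ports and h in non_draining}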
 
     def _check_daemons(self) -> None:
-
+        self.log.debug('_check_daemons')
         daemons = self.mgr.cache.get_daemons()
         daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
         for dd in daemons:
@@ -886,6 +856,14 @@ class CephadmServe:
             if dd.service_name() in self.mgr.spec_store.spec_deleted:
                 continue
 
+            if dd.daemon_type == 'agent':
+                try:
+                    self.mgr.agent_helpers._check_agent(dd.hostname)
+                except Exception as e:
+                    self.log.debug(
+                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
+                continue
+
             # These daemon types require additional configs after creation
             if dd.daemon_type in REQUIRES_POST_ACTIONS:
                 daemons_post[dd.daemon_type].append(dd)
@@ -912,6 +890,12 @@ class CephadmServe:
                 self.log.info('Reconfiguring %s (dependencies changed)...' % (
                     dd.name()))
                 action = 'reconfig'
+            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
+                self.log.debug(
+                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
+                self.log.info(f'Redeploying {dd.name()} (container cli args changed)...')
+                dd.extra_container_args = spec.extra_container_args
+                action = 'redeploy'
             elif self.mgr.last_monmap and \
                     self.mgr.last_monmap > last_config and \
                     dd.daemon_type in CEPH_TYPES:
@@ -928,7 +912,8 @@ class CephadmServe:
                 try:
                     daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                     self.mgr._daemon_action(daemon_spec, action=action)
-                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
+                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
+                        self.mgr.cache.save_host(dd.hostname)
                 except OrchestratorError as e:
                     self.mgr.events.from_orch_error(e)
                     if dd.daemon_type in daemons_post:
@@ -952,6 +937,7 @@ class CephadmServe:
                     daemon_type)).daemon_check_post(daemon_descs)
 
     def _purge_deleted_services(self) -> None:
+        self.log.debug('_purge_deleted_services')
         existing_services = self.mgr.spec_store.all_specs.items()
         for service_name, spec in list(existing_services):
             if service_name not in self.mgr.spec_store.spec_deleted:
@@ -973,7 +959,8 @@ class CephadmServe:
         digests: Dict[str, ContainerInspectInfo] = {}
         for container_image_ref in set(settings.values()):
             if not is_repo_digest(container_image_ref):
-                image_info = self._get_container_image_info(container_image_ref)
+                image_info = self.mgr.wait_async(
+                    self._get_container_image_info(container_image_ref))
                 if image_info.repo_digests:
                     # FIXME: we assume the first digest here is the best
                     assert is_repo_digest(image_info.repo_digests[0]), image_info
@@ -986,11 +973,99 @@ class CephadmServe:
                     # FIXME: we assume the first digest here is the best
                     self.mgr.set_container_image(entity, image_info.repo_digests[0])
 
-    def _create_daemon(self,
-                       daemon_spec: CephadmDaemonDeploySpec,
-                       reconfig: bool = False,
-                       osd_uuid_map: Optional[Dict[str, Any]] = None,
-                       ) -> str:
+    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
+        # host -> path -> (mode, uid, gid, content, digest)
+        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
+
+        # ceph.conf
+        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
+        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
+
+        if self.mgr.manage_etc_ceph_ceph_conf:
+            try:
+                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
+                ha = HostAssignment(
+                    spec=ServiceSpec('mon', placement=pspec),
+                    hosts=self.mgr.cache.get_schedulable_hosts(),
+                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+                    daemons=[],
+                    networks=self.mgr.cache.networks,
+                )
+                all_slots, _, _ = ha.place()
+                for host in {s.hostname for s in all_slots}:
+                    if host not in client_files:
+                        client_files[host] = {}
+                    client_files[host]['/etc/ceph/ceph.conf'] = (
+                        0o644, 0, 0, bytes(config), str(config_digest)
+                    )
+            except Exception as e:
+                self.mgr.log.warning(
+                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
+
+        # client keyrings
+        for ks in self.mgr.keys.keys.values():
+            try:
+                ret, keyring, err = self.mgr.mon_command({
+                    'prefix': 'auth get',
+                    'entity': ks.entity,
+                })
+                if ret:
+                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
+                    continue
+                digest = ''.join('%02x' % c for c in hashlib.sha256(
+                    keyring.encode('utf-8')).digest())
+                ha = HostAssignment(
+                    spec=ServiceSpec('mon', placement=ks.placement),
+                    hosts=self.mgr.cache.get_schedulable_hosts(),
+                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
+                    daemons=[],
+                    networks=self.mgr.cache.networks,
+                )
+                all_slots, _, _ = ha.place()
+                for host in {s.hostname for s in all_slots}:
+                    if host not in client_files:
+                        client_files[host] = {}
+                    client_files[host]['/etc/ceph/ceph.conf'] = (
+                        0o644, 0, 0, bytes(config), str(config_digest)
+                    )
+                    client_files[host][ks.path] = (
+                        ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
+                    )
+            except Exception as e:
+                self.log.warning(
+                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
+        return client_files
+
+    def _write_client_files(self,
+                            client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
+                            host: str) -> None:
+        updated_files = False
+        old_files = self.mgr.cache.get_host_client_files(host).copy()
+        for path, m in client_files.get(host, {}).items():
+            mode, uid, gid, content, digest = m
+            if path in old_files:
+                match = old_files[path] == (digest, mode, uid, gid)
+                del old_files[path]
+                if match:
+                    continue
+            self.log.info(f'Updating {host}:{path}')
+            self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
+            self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
+            updated_files = True
+        for path in old_files.keys():
+            self.log.info(f'Removing {host}:{path}')
+            cmd = ['rm', '-f', path]
+            self.mgr.ssh.check_execute_command(host, cmd)
+            updated_files = True
+            self.mgr.cache.removed_client_file(host, path)
+        if updated_files:
+            self.mgr.cache.save_host(host)
+
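
Editor's note: _write_client_files above rewrites a file only when its cached (digest, mode, uid, gid) tuple no longer matches. A minimal sketch of that check; hexdigest() is equivalent to the ''.join('%02x' % c ...) form used in _calc_client_files:

import hashlib
from typing import Dict, Tuple

CachedMeta = Tuple[str, int, int, int]  # digest, mode, uid, gid

def needs_update(path: str, content: bytes, mode: int, uid: int, gid: int,
                 cache: Dict[str, CachedMeta]) -> bool:
    digest = hashlib.sha256(content).hexdigest()
    # an unknown path, or any changed attribute, forces a rewrite
    return cache.get(path) != (digest, mode, uid, gid)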
+    async def _create_daemon(self,
+                             daemon_spec: CephadmDaemonDeploySpec,
+                             reconfig: bool = False,
+                             osd_uuid_map: Optional[Dict[str, Any]] = None,
+                             ) -> str:
 
         with set_exception_subject('service', orchestrator.DaemonDescription(
                 daemon_type=daemon_spec.daemon_type,
@@ -1010,11 +1085,6 @@ class CephadmServe:
                     if spec.ports:
                         ports.extend(spec.ports)
 
-                if daemon_spec.daemon_type == 'cephadm-exporter':
-                    if not reconfig:
-                        assert daemon_spec.host
-                        self._deploy_cephadm_binary(daemon_spec.host)
-
                 # TCP port to open in the host firewall
                 if len(ports) > 0:
                     daemon_spec.extra_args.extend([
@@ -1035,15 +1105,22 @@ class CephadmServe:
                 if self.mgr.allow_ptrace:
                     daemon_spec.extra_args.append('--allow-ptrace')
 
+                try:
+                    eca = daemon_spec.extra_container_args
+                    if eca:
+                        for a in eca:
+                            daemon_spec.extra_args.append(f'--extra-container-args={a}')
+                except AttributeError:
+                    eca = None
+
                 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
-                    self._registry_login(daemon_spec.host, self.mgr.registry_url,
-                                         self.mgr.registry_username, self.mgr.registry_password)
+                    await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
 
                 self.log.info('%s daemon %s on %s' % (
                     'Reconfiguring' if reconfig else 'Deploying',
                     daemon_spec.name(), daemon_spec.host))
 
-                out, err, code = self._run_cephadm(
+                out, err, code = await self._run_cephadm(
                     daemon_spec.host, daemon_spec.name(), 'deploy',
                     [
                         '--name', daemon_spec.name(),
@@ -1054,11 +1131,17 @@ class CephadmServe:
                             'deployed_by': self.mgr.get_active_mgr_digests(),
                             'rank': daemon_spec.rank,
                             'rank_generation': daemon_spec.rank_generation,
+                            'extra_container_args': eca
                         }),
                         '--config-json', '-',
                     ] + daemon_spec.extra_args,
                     stdin=json.dumps(daemon_spec.final_config),
-                    image=image)
+                    image=image,
+                )
+
+                if daemon_spec.daemon_type == 'agent':
+                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
+                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
 
                 # refresh daemon state?  (ceph daemon reconfig does not need it)
                 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
@@ -1066,15 +1149,20 @@ class CephadmServe:
                         # prime cached service state with what we (should have)
                         # just created
                         sd = daemon_spec.to_daemon_description(
-                            DaemonDescriptionStatus.running, 'starting')
+                            DaemonDescriptionStatus.starting, 'starting')
                         self.mgr.cache.add_daemon(daemon_spec.host, sd)
                         if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                             self.mgr.requires_post_actions.add(daemon_spec.name())
                     self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
 
-                self.mgr.cache.update_daemon_config_deps(
-                    daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
-                self.mgr.cache.save_host(daemon_spec.host)
+                if daemon_spec.daemon_type != 'agent':
+                    self.mgr.cache.update_daemon_config_deps(
+                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
+                    self.mgr.cache.save_host(daemon_spec.host)
+                else:
+                    self.mgr.agent_cache.update_agent_config_deps(
+                        daemon_spec.host, daemon_spec.deps, start_time)
+                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                 msg = "{} {} on host '{}'".format(
                     'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                 if not code:
@@ -1093,7 +1181,7 @@ class CephadmServe:
                     self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
                 raise
 
-    def _remove_daemon(self, name: str, host: str) -> str:
+    def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
         """
         Remove a daemon
         """
@@ -1111,28 +1199,29 @@ class CephadmServe:
             # we can delete a mon instances data.
             args = ['--name', name, '--force']
             self.log.info('Removing daemon %s from %s' % (name, host))
-            out, err, code = self._run_cephadm(
-                host, name, 'rm-daemon', args)
+            out, err, code = self.mgr.wait_async(self._run_cephadm(
+                host, name, 'rm-daemon', args))
             if not code:
                 # remove item from cache
                 self.mgr.cache.rm_daemon(host, name)
             self.mgr.cache.invalidate_host_daemons(host)
 
-            self.mgr.cephadm_services[daemon_type_to_service(
-                daemon_type)].post_remove(daemon, is_failed_deploy=False)
+            if not no_post_remove:
+                self.mgr.cephadm_services[daemon_type_to_service(
+                    daemon_type)].post_remove(daemon, is_failed_deploy=False)
 
             return "Removed {} from host '{}'".format(name, host)
 
-    def _run_cephadm_json(self,
-                          host: str,
-                          entity: Union[CephadmNoImage, str],
-                          command: str,
-                          args: List[str],
-                          no_fsid: Optional[bool] = False,
-                          image: Optional[str] = "",
-                          ) -> Any:
+    async def _run_cephadm_json(self,
+                                host: str,
+                                entity: Union[CephadmNoImage, str],
+                                command: str,
+                                args: List[str],
+                                no_fsid: Optional[bool] = False,
+                                image: Optional[str] = "",
+                                ) -> Any:
         try:
-            out, err, code = self._run_cephadm(
+            out, err, code = await self._run_cephadm(
                 host, entity, command, args, no_fsid=no_fsid, image=image)
             if code:
                 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
@@ -1145,18 +1234,18 @@ class CephadmServe:
             self.log.exception(f'{msg}: {"".join(out)}')
             raise OrchestratorError(msg)
 
-    def _run_cephadm(self,
-                     host: str,
-                     entity: Union[CephadmNoImage, str],
-                     command: str,
-                     args: List[str],
-                     addr: Optional[str] = "",
-                     stdin: Optional[str] = "",
-                     no_fsid: Optional[bool] = False,
-                     error_ok: Optional[bool] = False,
-                     image: Optional[str] = "",
-                     env_vars: Optional[List[str]] = None,
-                     ) -> Tuple[List[str], List[str], int]:
+    async def _run_cephadm(self,
+                           host: str,
+                           entity: Union[CephadmNoImage, str],
+                           command: str,
+                           args: List[str],
+                           addr: Optional[str] = "",
+                           stdin: Optional[str] = "",
+                           no_fsid: Optional[bool] = False,
+                           error_ok: Optional[bool] = False,
+                           image: Optional[str] = "",
+                           env_vars: Optional[List[str]] = None,
+                           ) -> Tuple[List[str], List[str], int]:
         """
         Run cephadm on the remote host with the given command + args
 
@@ -1164,99 +1253,100 @@ class CephadmServe:
 
         :env_vars: in format -> [KEY=VALUE, ..]
         """
+
+        await self.mgr.ssh._remote_connection(host, addr)
+
         self.log.debug(f"_run_cephadm : command = {command}")
         self.log.debug(f"_run_cephadm : args = {args}")
 
-        bypass_image = ('cephadm-exporter',)
+        bypass_image = ('agent')
 
-        with self._remote_connection(host, addr) as tpl:
-            conn, connr = tpl
-            assert image or entity
-            # Skip the image check for daemons deployed that are not ceph containers
-            if not str(entity).startswith(bypass_image):
-                if not image and entity is not cephadmNoImage:
-                    image = self.mgr._get_container_image(entity)
+        assert image or entity
+        # Skip the image check for daemons deployed that are not ceph containers
+        if not str(entity).startswith(bypass_image):
+            if not image and entity is not cephadmNoImage:
+                image = self.mgr._get_container_image(entity)
 
-            final_args = []
+        final_args = []
 
-            # global args
-            if env_vars:
-                for env_var_pair in env_vars:
-                    final_args.extend(['--env', env_var_pair])
+        # global args
+        if env_vars:
+            for env_var_pair in env_vars:
+                final_args.extend(['--env', env_var_pair])
 
-            if image:
-                final_args.extend(['--image', image])
+        if image:
+            final_args.extend(['--image', image])
 
-            if not self.mgr.container_init:
-                final_args += ['--no-container-init']
+        if not self.mgr.container_init:
+            final_args += ['--no-container-init']
 
-            # subcommand
-            final_args.append(command)
+        # subcommand
+        final_args.append(command)
 
-            # subcommand args
-            if not no_fsid:
-                final_args += ['--fsid', self.mgr._cluster_fsid]
+        # subcommand args
+        if not no_fsid:
+            final_args += ['--fsid', self.mgr._cluster_fsid]
 
-            final_args += args
+        final_args += args
 
-            # exec
-            self.log.debug('args: %s' % (' '.join(final_args)))
-            if self.mgr.mode == 'root':
-                if stdin:
-                    self.log.debug('stdin: %s' % stdin)
+        # exec
+        self.log.debug('args: %s' % (' '.join(final_args)))
+        if self.mgr.mode == 'root':
+            # the agent carries the cephadm binary as an extra file, which is
+            # therefore passed over stdin; even for debug logs that's too much
+            if stdin and 'agent' not in str(entity):
+                self.log.debug('stdin: %s' % stdin)
 
-                python = connr.choose_python()
-                if not python:
-                    raise RuntimeError(
-                        'unable to find python on %s (tried %s in %s)' % (
-                            host, remotes.PYTHONS, remotes.PATH))
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        [python, self.mgr.cephadm_binary_path] + final_args,
-                        stdin=stdin.encode('utf-8') if stdin is not None else None)
-                    if code == 2:
-                        out_ls, err_ls, code_ls = remoto.process.check(
-                            conn, ['ls', self.mgr.cephadm_binary_path])
-                        if code_ls == 2:
-                            self._deploy_cephadm_binary_conn(conn, host)
-                            out, err, code = remoto.process.check(
-                                conn,
-                                [python, self.mgr.cephadm_binary_path] + final_args,
-                                stdin=stdin.encode('utf-8') if stdin is not None else None)
-
-                except RuntimeError as e:
-                    self.mgr._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
+            cmd = ['which', 'python3']
+            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
+            cmd = [python, self.mgr.cephadm_binary_path] + final_args
 
-            elif self.mgr.mode == 'cephadm-package':
-                try:
-                    out, err, code = remoto.process.check(
-                        conn,
-                        ['sudo', '/usr/bin/cephadm'] + final_args,
-                        stdin=stdin)
-                except RuntimeError as e:
-                    self.mgr._reset_con(host)
-                    if error_ok:
-                        return [], [str(e)], 1
-                    raise
-            else:
-                assert False, 'unsupported mode'
-
-            self.log.debug('code: %d' % code)
-            if out:
-                self.log.debug('out: %s' % '\n'.join(out))
-            if err:
-                self.log.debug('err: %s' % '\n'.join(err))
-            if code and not error_ok:
-                raise OrchestratorError(
-                    'cephadm exited with an error code: %d, stderr:%s' % (
-                        code, '\n'.join(err)))
-            return out, err, code
-
-    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
+            try:
+                out, err, code = await self.mgr.ssh._execute_command(
+                    host, cmd, stdin=stdin, addr=addr)
+                if code == 2:
+                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
+                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
+                    if code_ls == 2:
+                        await self._deploy_cephadm_binary(host, addr)
+                        out, err, code = await self.mgr.ssh._execute_command(
+                            host, cmd, stdin=stdin, addr=addr)
+                        # if there is an agent on this host, make sure it is using the most recent
+                        # version of the cephadm binary
+                        if host in self.mgr.inventory:
+                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
+                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
+
+            except Exception as e:
+                await self.mgr.ssh._reset_con(host)
+                if error_ok:
+                    return [], [str(e)], 1
+                raise
+
+        elif self.mgr.mode == 'cephadm-package':
+            try:
+                cmd = ['/usr/bin/cephadm'] + final_args
+                out, err, code = await self.mgr.ssh._execute_command(
+                    host, cmd, stdin=stdin, addr=addr)
+            except Exception as e:
+                await self.mgr.ssh._reset_con(host)
+                if error_ok:
+                    return [], [str(e)], 1
+                raise
+        else:
+            assert False, 'unsupported mode'
+
+        self.log.debug(f'code: {code}')
+        if out:
+            self.log.debug(f'out: {out}')
+        if err:
+            self.log.debug(f'err: {err}')
+        if code and not error_ok:
+            raise OrchestratorError(
+                f'cephadm exited with an error code: {code}, stderr: {err}')
+        return [out], [err], code
+
+    async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
         # pick a random host...
         host = None
         for host_name in self.mgr.inventory.keys():
@@ -1265,14 +1355,13 @@ class CephadmServe:
         if not host:
             raise OrchestratorError('no hosts defined')
         if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
-            self._registry_login(host, self.mgr.registry_url,
-                                 self.mgr.registry_username, self.mgr.registry_password)
+            await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
 
         pullargs: List[str] = []
         if self.mgr.registry_insecure:
             pullargs.append("--insecure")
 
-        j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
+        j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
 
         r = ContainerInspectInfo(
             j['image_id'],
@@ -1283,112 +1372,19 @@ class CephadmServe:
         return r
 
     # function responsible for logging single host into custom registry
-    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
-        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
+    async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
+        self.log.debug(
+            f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
         # want to pass info over stdin rather than through normal list of args
-        args_str = json.dumps({
-            'url': url,
-            'username': username,
-            'password': password,
-        })
-        out, err, code = self._run_cephadm(
+        out, err, code = await self._run_cephadm(
             host, 'mon', 'registry-login',
-            ['--registry-json', '-'], stdin=args_str, error_ok=True)
+            ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
         if code:
-            return f"Host {host} failed to login to {url} as {username} with given password"
+            return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
         return None
 
-    def _deploy_cephadm_binary(self, host: str) -> None:
+    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
         # Use tee (from coreutils) to create a copy of cephadm on the target machine
         self.log.info(f"Deploying cephadm binary to {host}")
-        with self._remote_connection(host) as tpl:
-            conn, _connr = tpl
-            return self._deploy_cephadm_binary_conn(conn, host)
-
-    def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
-        _out, _err, code = remoto.process.check(
-            conn,
-            ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
-        if code:
-            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
-            self.log.warning(msg)
-            raise OrchestratorError(msg)
-        _out, _err, code = remoto.process.check(
-            conn,
-            ['tee', '-', self.mgr.cephadm_binary_path],
-            stdin=self.mgr._cephadm.encode('utf-8'))
-        if code:
-            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
-            self.log.warning(msg)
-            raise OrchestratorError(msg)
-
-    def _write_remote_file(self,
-                           host: str,
-                           path: str,
-                           content: bytes,
-                           mode: int,
-                           uid: int,
-                           gid: int) -> None:
-        with self._remote_connection(host) as tpl:
-            conn, connr = tpl
-            try:
-                errmsg = connr.write_file(path, content, mode, uid, gid)
-                if errmsg is not None:
-                    raise OrchestratorError(errmsg)
-            except Exception as e:
-                msg = f"Unable to write {host}:{path}: {e}"
-                self.log.warning(msg)
-                raise OrchestratorError(msg)
-
-    @contextmanager
-    def _remote_connection(self,
-                           host: str,
-                           addr: Optional[str] = None,
-                           ) -> Iterator[Tuple["BaseConnection", Any]]:
-        if not addr and host in self.mgr.inventory:
-            addr = self.mgr.inventory.get_addr(host)
-
-        self.mgr.offline_hosts_remove(host)
-
-        try:
-            try:
-                if not addr:
-                    raise OrchestratorError("host address is empty")
-                conn, connr = self.mgr._get_connection(addr)
-            except OSError as e:
-                self.mgr._reset_con(host)
-                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
-                raise execnet.gateway_bootstrap.HostNotFound(msg)
-
-            yield (conn, connr)
-
-        except execnet.gateway_bootstrap.HostNotFound as e:
-            # this is a misleading exception as it seems to be thrown for
-            # any sort of connection failure, even those having nothing to
-            # do with "host not found" (e.g., ssh key permission denied).
-            self.mgr.offline_hosts.add(host)
-            self.mgr._reset_con(host)
-
-            user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
-            if str(e).startswith("Can't communicate"):
-                msg = str(e)
-            else:
-                msg = f'''Failed to connect to {host} ({addr}).
-Please make sure that the host is reachable and accepts connections using the cephadm SSH key
-
-To add the cephadm SSH key to the host:
-> ceph cephadm get-pub-key > ~/ceph.pub
-> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
-
-To check that the host is reachable open a new shell with the --no-hosts flag:
-> cephadm shell --no-hosts
-
-Then run the following:
-> ceph cephadm get-ssh-config > ssh_config
-> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
-> chmod 0600 ~/cephadm_private_key
-> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
-            raise OrchestratorError(msg) from e
-        except Exception as ex:
-            self.log.exception(ex)
-            raise
+        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
+                                              self.mgr._cephadm.encode('utf-8'), addr=addr)
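
Editor's note: the self.mgr.ssh helpers used throughout this diff sit on top of an asyncio SSH transport. A hedged sketch of what a write_remote_file helper could look like with asyncssh; this is an assumption for illustration, as the real helper lives in cephadm's ssh module, reuses cached connections, and also sets uid/gid:

import asyncssh  # assumed transport library

async def write_remote_file(addr: str, path: str, content: bytes,
                            mode: int = 0o644) -> None:
    async with asyncssh.connect(addr) as conn:
        async with conn.start_sftp_client() as sftp:
            async with sftp.open(path, 'wb') as f:
                await f.write(content)
            await sftp.chmod(path, mode)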