]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cephadm/cephadm.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / cephadm / cephadm.py
index 593a08f009e9a5bd9a709505333598e4346f010b..bcb82c4c4d4bf065063b21d764750aa76372c616 100755 (executable)
@@ -26,13 +26,13 @@ import errno
 import struct
 import ssl
 from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator
 
 import re
 import uuid
 
 from configparser import ConfigParser
-from contextlib import redirect_stdout
+from contextlib import redirect_stdout, contextmanager
 from functools import wraps
 from glob import glob
 from io import StringIO
@@ -55,6 +55,7 @@ DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
 DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
 DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
+DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
 DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
 DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
 DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
@@ -79,6 +80,7 @@ CEPH_DEFAULT_KEYRING = f'/etc/ceph/{CEPH_KEYRING}'
 CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}'
 LOG_DIR_MODE = 0o770
 DATA_DIR_MODE = 0o700
+DEFAULT_MODE = 0o600
 CONTAINER_INIT = True
 MIN_PODMAN_VERSION = (2, 0, 2)
 CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0)
@@ -88,6 +90,7 @@ DEFAULT_TIMEOUT = None  # in seconds
 DEFAULT_RETRY = 15
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 QUIET_LOG_LEVEL = 9  # DEBUG is 10, so using 9 to be lower level than DEBUG
+NO_DEPRECATED = False
 
 logger: logging.Logger = None  # type: ignore
 
@@ -169,6 +172,17 @@ class ContainerInfo:
                 and self.version == other.version)
 
 
+class DeploymentType(Enum):
+    # Fresh deployment of a daemon.
+    DEFAULT = 'Deploy'
+    # Redeploying a daemon. Works the same as fresh
+    # deployment minus port checking.
+    REDEPLOY = 'Redeploy'
+    # Reconfiguring a daemon. Rewrites config
+    # files and potentially restarts daemon.
+    RECONFIG = 'Reconfig'
+
+
 class BaseConfig:
 
     def __init__(self) -> None:
@@ -363,6 +377,10 @@ class Error(Exception):
     pass
 
 
+class ClusterAlreadyExists(Exception):
+    pass
+
+
 class TimeoutExpired(Error):
     pass
 
@@ -376,7 +394,7 @@ class UnauthorizedRegistryError(Error):
 class Ceph(object):
     daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
                'crash', 'cephfs-mirror', 'ceph-exporter')
-    gateways = ('iscsi', 'nfs')
+    gateways = ('iscsi', 'nfs', 'nvmeof')
 
 ##################################
 
@@ -432,9 +450,9 @@ class SNMPGateway:
     @classmethod
     def init(cls, ctx: CephadmContext, fsid: str,
              daemon_id: Union[int, str]) -> 'SNMPGateway':
-        assert ctx.config_json
-        return cls(ctx, fsid, daemon_id,
-                   get_parm(ctx.config_json), ctx.image)
+        cfgs = fetch_configs(ctx)
+        assert cfgs  # assert some config data was found
+        return cls(ctx, fsid, daemon_id, cfgs, ctx.image)
 
     @staticmethod
     def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]:
@@ -467,13 +485,10 @@ class SNMPGateway:
 
     @property
     def port(self) -> int:
-        if not self.ctx.tcp_ports:
+        endpoints = fetch_tcp_ports(self.ctx)
+        if not endpoints:
             return self.DEFAULT_PORT
-        else:
-            if len(self.ctx.tcp_ports) > 0:
-                return int(self.ctx.tcp_ports.split()[0])
-            else:
-                return self.DEFAULT_PORT
+        return endpoints[0].port
 
     def get_daemon_args(self) -> List[str]:
         v3_args = []
@@ -511,7 +526,7 @@ class SNMPGateway:
 
     def create_daemon_conf(self) -> None:
         """Creates the environment file holding 'secrets' passed to the snmp-notifier daemon"""
-        with open(os.open(self.conf_file_path, os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+        with write_new(self.conf_file_path) as f:
             if self.snmp_version == 'V2c':
                 f.write(f'SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n')
             else:
@@ -657,6 +672,42 @@ class Monitoring(object):
 ##################################
 
 
+@contextmanager
+def write_new(
+    destination: Union[str, Path],
+    *,
+    owner: Optional[Tuple[int, int]] = None,
+    perms: Optional[int] = DEFAULT_MODE,
+    encoding: Optional[str] = None,
+) -> Generator[IO, None, None]:
+    """Write a new file in a robust manner, optionally specifying the owner,
+    permissions, or encoding. This function takes care to never leave a file in
+    a partially-written state due to a crash or power outage by writing to
+    temporary file and then renaming that temp file over to the final
+    destination once all data is written.  Note that the temporary files can be
+    leaked but only for a "crash" or power outage - regular exceptions will
+    clean up the temporary file.
+    """
+    destination = os.path.abspath(destination)
+    tempname = f'{destination}.new'
+    open_kwargs: Dict[str, Any] = {}
+    if encoding:
+        open_kwargs['encoding'] = encoding
+    try:
+        with open(tempname, 'w', **open_kwargs) as fh:
+            yield fh
+            fh.flush()
+            os.fsync(fh.fileno())
+            if owner is not None:
+                os.fchown(fh.fileno(), *owner)
+            if perms is not None:
+                os.fchmod(fh.fileno(), perms)
+    except Exception:
+        os.unlink(tempname)
+        raise
+    os.rename(tempname, destination)
+
+
 def populate_files(config_dir, config_files, uid, gid):
     # type: (str, Dict, int, int) -> None
     """create config files for different services"""
@@ -664,9 +715,7 @@ def populate_files(config_dir, config_files, uid, gid):
         config_file = os.path.join(config_dir, fname)
         config_content = dict_get_join(config_files, fname)
         logger.info('Write file: %s' % (config_file))
-        with open(config_file, 'w', encoding='utf-8') as f:
-            os.fchown(f.fileno(), uid, gid)
-            os.fchmod(f.fileno(), 0o600)
+        with write_new(config_file, owner=(uid, gid), encoding='utf-8') as f:
             f.write(config_content)
 
 
@@ -709,7 +758,7 @@ class NFSGanesha(object):
     @classmethod
     def init(cls, ctx, fsid, daemon_id):
         # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
-        return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json), ctx.image)
+        return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
 
     def get_container_mounts(self, data_dir):
         # type: (str) -> Dict[str, str]
@@ -801,9 +850,7 @@ class NFSGanesha(object):
         # write the RGW keyring
         if self.rgw:
             keyring_path = os.path.join(data_dir, 'keyring.rgw')
-            with open(keyring_path, 'w') as f:
-                os.fchmod(f.fileno(), 0o600)
-                os.fchown(f.fileno(), uid, gid)
+            with write_new(keyring_path, owner=(uid, gid)) as f:
                 f.write(self.rgw.get('keyring', ''))
 
 ##################################
@@ -839,7 +886,7 @@ class CephIscsi(object):
     def init(cls, ctx, fsid, daemon_id):
         # type: (CephadmContext, str, Union[int, str]) -> CephIscsi
         return cls(ctx, fsid, daemon_id,
-                   get_parm(ctx.config_json), ctx.image)
+                   fetch_configs(ctx), ctx.image)
 
     @staticmethod
     def get_container_mounts(data_dir, log_dir):
@@ -849,6 +896,7 @@ class CephIscsi(object):
         mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
         mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
         mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+        mounts[os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')] = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
         mounts[log_dir] = '/var/log:z'
         mounts['/dev'] = '/dev'
         return mounts
@@ -870,7 +918,8 @@ class CephIscsi(object):
         version = None
         out, err, code = call(ctx,
                               [ctx.container_engine.path, 'exec', container_id,
-                               '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
+                               '/usr/bin/python3', '-c',
+                               "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
                               verbosity=CallVerbosity.QUIET)
         if code == 0:
             version = out.strip()
@@ -912,9 +961,19 @@ class CephIscsi(object):
         configfs_dir = os.path.join(data_dir, 'configfs')
         makedirs(configfs_dir, uid, gid, 0o755)
 
+        # set up the tcmu-runner entrypoint script
+        # to be mounted into the container. For more info
+        # on why we need this script, see the
+        # tcmu_runner_entrypoint_script function
+        self.files['tcmu-runner-entrypoint.sh'] = self.tcmu_runner_entrypoint_script()
+
         # populate files from the config-json
         populate_files(data_dir, self.files, uid, gid)
 
+        # we want the tcmu runner entrypoint script to be executable
+        # populate_files will give it 0o600 by default
+        os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700)
+
     @staticmethod
     def configfs_mount_umount(data_dir, mount=True):
         # type: (str, bool) -> List[str]
@@ -927,16 +986,182 @@ class CephIscsi(object):
                   'umount {0}; fi'.format(mount_path)
         return cmd.split()
 
+    @staticmethod
+    def tcmu_runner_entrypoint_script() -> str:
+        # since we are having tcmu-runner be a background
+        # process in its systemd unit (rbd-target-api being
+        # the main process) systemd will not restart it when
+        # it fails. in order to try and get around that for now
+        # we can have a script mounted in the container that
+        # that attempts to do the restarting for us. This script
+        # can then become the entrypoint for the tcmu-runner
+        # container
+
+        # This is intended to be dropped for a better solution
+        # for at least the squid release onward
+        return """#!/bin/bash
+RUN_DIR=/var/run/tcmu-runner
+
+if [ ! -d "${RUN_DIR}" ] ; then
+    mkdir -p "${RUN_DIR}"
+fi
+
+rm -rf "${RUN_DIR}"/*
+
+while true
+do
+    touch "${RUN_DIR}"/start-up-$(date -Ins)
+    /usr/bin/tcmu-runner
+
+    # If we got around 3 kills/segfaults in the last minute,
+    # don't start anymore
+    if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then
+        exit 0
+    fi
+
+    sleep 1
+done
+"""
+
     def get_tcmu_runner_container(self):
         # type: () -> CephContainer
         # daemon_id, is used to generated the cid and pid files used by podman but as both tcmu-runner
         # and rbd-target-api have the same daemon_id, it conflits and prevent the second container from
         # starting. .tcmu runner is appended to the daemon_id to fix that.
-        tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu')
-        tcmu_container.entrypoint = '/usr/bin/tcmu-runner'
+        tcmu_container = get_deployment_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu')
+        # TODO: Eventually we don't want to run tcmu-runner through this script.
+        # This is intended to be a workaround backported to older releases
+        # and should eventually be removed in at least squid onward
+        tcmu_container.entrypoint = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
         tcmu_container.cname = self.get_container_name(desc='tcmu')
         return tcmu_container
 
+
+##################################
+
+
+class CephNvmeof(object):
+    """Defines a Ceph-Nvmeof container"""
+
+    daemon_type = 'nvmeof'
+    required_files = ['ceph-nvmeof.conf']
+    default_image = DEFAULT_NVMEOF_IMAGE
+
+    def __init__(self,
+                 ctx,
+                 fsid,
+                 daemon_id,
+                 config_json,
+                 image=DEFAULT_NVMEOF_IMAGE):
+        # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
+        self.ctx = ctx
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # config-json options
+        self.files = dict_get(config_json, 'files', {})
+
+        # validate the supplied args
+        self.validate()
+
+    @classmethod
+    def init(cls, ctx, fsid, daemon_id):
+        # type: (CephadmContext, str, Union[int, str]) -> CephNvmeof
+        return cls(ctx, fsid, daemon_id,
+                   fetch_configs(ctx), ctx.image)
+
+    @staticmethod
+    def get_container_mounts(data_dir: str) -> Dict[str, str]:
+        mounts = dict()
+        mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
+        mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
+        mounts[os.path.join(data_dir, 'ceph-nvmeof.conf')] = '/src/ceph-nvmeof.conf:z'
+        mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+        mounts['/dev/hugepages'] = '/dev/hugepages'
+        mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio'
+        return mounts
+
+    @staticmethod
+    def get_container_binds():
+        # type: () -> List[List[str]]
+        binds = []
+        lib_modules = ['type=bind',
+                       'source=/lib/modules',
+                       'destination=/lib/modules',
+                       'ro=true']
+        binds.append(lib_modules)
+        return binds
+
+    @staticmethod
+    def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
+        out, err, ret = call(ctx,
+                             [ctx.container_engine.path, 'inspect',
+                              '--format', '{{index .Config.Labels "io.ceph.version"}}',
+                              ctx.image])
+        version = None
+        if ret == 0:
+            version = out.strip()
+        return version
+
+    def validate(self):
+        # type: () -> None
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error('required file missing from config-json: %s' % fname)
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        cname = '%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    def create_daemon_dirs(self, data_dir, uid, gid):
+        # type: (str, int, int) -> None
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Creating ceph-nvmeof config...')
+        configfs_dir = os.path.join(data_dir, 'configfs')
+        makedirs(configfs_dir, uid, gid, 0o755)
+
+        # populate files from the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+    @staticmethod
+    def configfs_mount_umount(data_dir, mount=True):
+        # type: (str, bool) -> List[str]
+        mount_path = os.path.join(data_dir, 'configfs')
+        if mount:
+            cmd = 'if ! grep -qs {0} /proc/mounts; then ' \
+                  'mount -t configfs none {0}; fi'.format(mount_path)
+        else:
+            cmd = 'if grep -qs {0} /proc/mounts; then ' \
+                  'umount {0}; fi'.format(mount_path)
+        return cmd.split()
+
+    @staticmethod
+    def get_sysctl_settings() -> List[str]:
+        return [
+            'vm.nr_hugepages = 4096',
+        ]
+
+
 ##################################
 
 
@@ -974,7 +1199,7 @@ class CephExporter(object):
     def init(cls, ctx: CephadmContext, fsid: str,
              daemon_id: Union[int, str]) -> 'CephExporter':
         return cls(ctx, fsid, daemon_id,
-                   get_parm(ctx.config_json), ctx.image)
+                   fetch_configs(ctx), ctx.image)
 
     @staticmethod
     def get_container_mounts() -> Dict[str, str]:
@@ -1023,7 +1248,7 @@ class HAproxy(object):
     @classmethod
     def init(cls, ctx: CephadmContext,
              fsid: str, daemon_id: Union[int, str]) -> 'HAproxy':
-        return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json),
+        return cls(ctx, fsid, daemon_id, fetch_configs(ctx),
                    ctx.image)
 
     def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
@@ -1112,7 +1337,7 @@ class Keepalived(object):
     def init(cls, ctx: CephadmContext, fsid: str,
              daemon_id: Union[int, str]) -> 'Keepalived':
         return cls(ctx, fsid, daemon_id,
-                   get_parm(ctx.config_json), ctx.image)
+                   fetch_configs(ctx), ctx.image)
 
     def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
         """Create files under the container data dir"""
@@ -1248,7 +1473,7 @@ class CustomContainer(object):
     def init(cls, ctx: CephadmContext,
              fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
         return cls(fsid, daemon_id,
-                   get_parm(ctx.config_json), ctx.image)
+                   fetch_configs(ctx), ctx.image)
 
     def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
         """
@@ -1269,9 +1494,7 @@ class CustomContainer(object):
             logger.info('Creating file: {}'.format(file_path))
             content = dict_get_join(self.files, file_path)
             file_path = os.path.join(data_dir, file_path.strip('/'))
-            with open(file_path, 'w', encoding='utf-8') as f:
-                os.fchown(f.fileno(), uid, gid)
-                os.fchmod(f.fileno(), 0o600)
+            with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f:
                 f.write(content)
 
     def get_daemon_args(self) -> List[str]:
@@ -1366,7 +1589,7 @@ def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> A
 ##################################
 
 
-def dict_get_join(d: Dict, key: str) -> Any:
+def dict_get_join(d: Dict[str, Any], key: str) -> Any:
     """
     Helper function to get the value of a given key from a dictionary.
     `List` values will be converted to a string by joining them with a
@@ -1390,6 +1613,7 @@ def get_supported_daemons():
     supported_daemons.extend(Monitoring.components)
     supported_daemons.append(NFSGanesha.daemon_type)
     supported_daemons.append(CephIscsi.daemon_type)
+    supported_daemons.append(CephNvmeof.daemon_type)
     supported_daemons.append(CustomContainer.daemon_type)
     supported_daemons.append(HAproxy.daemon_type)
     supported_daemons.append(Keepalived.daemon_type)
@@ -1417,22 +1641,21 @@ def attempt_bind(ctx, s, address, port):
             logger.warning(msg)
             raise PortOccupiedError(msg)
         else:
-            raise Error(e)
+            raise e
     except Exception as e:
         raise Error(e)
     finally:
         s.close()
 
 
-def port_in_use(ctx, port_num):
-    # type: (CephadmContext, int) -> bool
+def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool:
     """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
-    logger.info('Verifying port %d ...' % port_num)
+    logger.info('Verifying port %s ...' % str(endpoint))
 
     def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
         try:
             s = socket.socket(af, socket.SOCK_STREAM)
-            attempt_bind(ctx, s, address, port_num)
+            attempt_bind(ctx, s, address, endpoint.port)
         except PortOccupiedError:
             return True
         except OSError as e:
@@ -1444,6 +1667,13 @@ def port_in_use(ctx, port_num):
             else:
                 raise e
         return False
+
+    if endpoint.ip != '0.0.0.0' and endpoint.ip != '::':
+        if is_ipv6(endpoint.ip):
+            return _port_in_use(socket.AF_INET6, endpoint.ip)
+        else:
+            return _port_in_use(socket.AF_INET, endpoint.ip)
+
     return any(_port_in_use(af, address) for af, address in (
         (socket.AF_INET, '0.0.0.0'),
         (socket.AF_INET6, '::')
@@ -2261,29 +2491,58 @@ def require_image(func: FuncT) -> FuncT:
 def default_image(func: FuncT) -> FuncT:
     @wraps(func)
     def _default_image(ctx: CephadmContext) -> Any:
-        if not ctx.image:
-            if 'name' in ctx and ctx.name:
-                type_ = ctx.name.split('.', 1)[0]
-                if type_ in Monitoring.components:
-                    ctx.image = Monitoring.components[type_]['image']
-                if type_ == 'haproxy':
-                    ctx.image = HAproxy.default_image
-                if type_ == 'keepalived':
-                    ctx.image = Keepalived.default_image
-                if type_ == SNMPGateway.daemon_type:
-                    ctx.image = SNMPGateway.default_image
-                if type_ in Tracing.components:
-                    ctx.image = Tracing.components[type_]['image']
-            if not ctx.image:
-                ctx.image = os.environ.get('CEPHADM_IMAGE')
-            if not ctx.image:
-                ctx.image = _get_default_image(ctx)
-
+        update_default_image(ctx)
         return func(ctx)
 
     return cast(FuncT, _default_image)
 
 
+def update_default_image(ctx: CephadmContext) -> None:
+    if getattr(ctx, 'image', None):
+        return
+    ctx.image = None  # ensure ctx.image exists to avoid repeated `getattr`s
+    name = getattr(ctx, 'name', None)
+    if name:
+        type_ = name.split('.', 1)[0]
+        if type_ in Monitoring.components:
+            ctx.image = Monitoring.components[type_]['image']
+        if type_ == 'haproxy':
+            ctx.image = HAproxy.default_image
+        if type_ == 'keepalived':
+            ctx.image = Keepalived.default_image
+        if type_ == SNMPGateway.daemon_type:
+            ctx.image = SNMPGateway.default_image
+        if type_ == CephNvmeof.daemon_type:
+            ctx.image = CephNvmeof.default_image
+        if type_ in Tracing.components:
+            ctx.image = Tracing.components[type_]['image']
+    if not ctx.image:
+        ctx.image = os.environ.get('CEPHADM_IMAGE')
+    if not ctx.image:
+        ctx.image = _get_default_image(ctx)
+
+
+def executes_early(func: FuncT) -> FuncT:
+    """Decorator that indicates the command function is meant to have no
+    dependencies and no environmental requirements and can therefore be
+    executed as non-root and with no logging, etc. Commands that have this
+    decorator applied must be simple and self-contained.
+    """
+    cast(Any, func)._execute_early = True
+    return func
+
+
+def deprecated_command(func: FuncT) -> FuncT:
+    @wraps(func)
+    def _deprecated_command(ctx: CephadmContext) -> Any:
+        logger.warning(f'Deprecated command used: {func}')
+        if NO_DEPRECATED:
+            raise Error('running deprecated commands disabled')
+        return func(ctx)
+
+    return cast(FuncT, _deprecated_command)
+
+
 def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]:
     """
     :param ctx: Cephadm context
@@ -2733,15 +2992,15 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
         if daemon_type not in ['grafana', 'loki', 'promtail']:
             ip = ''
             port = Monitoring.port_map[daemon_type][0]
-            if 'meta_json' in ctx and ctx.meta_json:
-                meta = json.loads(ctx.meta_json) or {}
+            meta = fetch_meta(ctx)
+            if meta:
                 if 'ip' in meta and meta['ip']:
                     ip = meta['ip']
                 if 'ports' in meta and meta['ports']:
                     port = meta['ports'][0]
             r += [f'--web.listen-address={ip}:{port}']
             if daemon_type == 'prometheus':
-                config = get_parm(ctx.config_json)
+                config = fetch_configs(ctx)
                 retention_time = config.get('retention_time', '15d')
                 retention_size = config.get('retention_size', '0')  # default to disabled
                 r += [f'--storage.tsdb.retention.time={retention_time}']
@@ -2757,7 +3016,7 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
                     host = wrap_ipv6(addr) if addr else host
                 r += [f'--web.external-url={scheme}://{host}:{port}']
         if daemon_type == 'alertmanager':
-            config = get_parm(ctx.config_json)
+            config = fetch_configs(ctx)
             peers = config.get('peers', list())  # type: ignore
             for peer in peers:
                 r += ['--cluster.peer={}'.format(peer)]
@@ -2770,15 +3029,15 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
         if daemon_type == 'promtail':
             r += ['--config.expand-env']
         if daemon_type == 'prometheus':
-            config = get_parm(ctx.config_json)
+            config = fetch_configs(ctx)
             try:
                 r += [f'--web.config.file={config["web_config"]}']
             except KeyError:
                 pass
         if daemon_type == 'node-exporter':
-            config = get_parm(ctx.config_json)
+            config = fetch_configs(ctx)
             try:
-                r += [f'--web.config={config["web_config"]}']
+                r += [f'--web.config.file={config["web_config"]}']
             except KeyError:
                 pass
             r += ['--path.procfs=/host/proc',
@@ -2815,22 +3074,16 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
 
     if config:
         config_path = os.path.join(data_dir, 'config')
-        with open(config_path, 'w') as f:
-            os.fchown(f.fileno(), uid, gid)
-            os.fchmod(f.fileno(), 0o600)
+        with write_new(config_path, owner=(uid, gid)) as f:
             f.write(config)
 
     if keyring:
         keyring_path = os.path.join(data_dir, 'keyring')
-        with open(keyring_path, 'w') as f:
-            os.fchmod(f.fileno(), 0o600)
-            os.fchown(f.fileno(), uid, gid)
+        with write_new(keyring_path, owner=(uid, gid)) as f:
             f.write(keyring)
 
     if daemon_type in Monitoring.components.keys():
-        config_json: Dict[str, Any] = dict()
-        if 'config_json' in ctx:
-            config_json = get_parm(ctx.config_json)
+        config_json = fetch_configs(ctx)
 
         # Set up directories specific to the monitoring component
         config_dir = ''
@@ -2881,14 +3134,15 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
         # populate the config directory for the component from the config-json
         if 'files' in config_json:
             for fname in config_json['files']:
-                content = dict_get_join(config_json['files'], fname)
+                # work around mypy wierdness where it thinks `str`s aren't Anys
+                # when used for dictionary values! feels like possibly a mypy bug?!
+                cfg = cast(Dict[str, Any], config_json['files'])
+                content = dict_get_join(cfg, fname)
                 if os.path.isabs(fname):
                     fpath = os.path.join(data_dir_root, fname.lstrip(os.path.sep))
                 else:
                     fpath = os.path.join(data_dir_root, config_dir, fname)
-                with open(fpath, 'w', encoding='utf-8') as f:
-                    os.fchown(f.fileno(), uid, gid)
-                    os.fchmod(f.fileno(), 0o600)
+                with write_new(fpath, owner=(uid, gid), encoding='utf-8') as f:
                     f.write(content)
 
     elif daemon_type == NFSGanesha.daemon_type:
@@ -2899,6 +3153,10 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
         ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
         ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
 
+    elif daemon_type == CephNvmeof.daemon_type:
+        ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id)
+        ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid)
+
     elif daemon_type == HAproxy.daemon_type:
         haproxy = HAproxy.init(ctx, fsid, daemon_id)
         haproxy.create_daemon_dirs(data_dir, uid, gid)
@@ -2920,40 +3178,39 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
 
 def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: str, fsid: str, uid: int, gid: int) -> None:
     # mostly making this its own function to make unit testing easier
-    if 'config_json' not in ctx or not ctx.config_json:
+    ccfiles = fetch_custom_config_files(ctx)
+    if not ccfiles:
         return
-    config_json = get_custom_config_files(ctx.config_json)
     custom_config_dir = os.path.join(ctx.data_dir, fsid, 'custom_config_files', f'{daemon_type}.{daemon_id}')
     if not os.path.exists(custom_config_dir):
         makedirs(custom_config_dir, uid, gid, 0o755)
     mandatory_keys = ['mount_path', 'content']
-    for ccf in config_json['custom_config_files']:
+    for ccf in ccfiles:
         if all(k in ccf for k in mandatory_keys):
             file_path = os.path.join(custom_config_dir, os.path.basename(ccf['mount_path']))
-            with open(file_path, 'w+', encoding='utf-8') as f:
-                os.fchown(f.fileno(), uid, gid)
-                os.fchmod(f.fileno(), 0o600)
+            with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f:
                 f.write(ccf['content'])
+            # temporary workaround to make custom config files work for tcmu-runner
+            # container we deploy with iscsi until iscsi is refactored
+            if daemon_type == 'iscsi':
+                tcmu_config_dir = custom_config_dir + '.tcmu'
+                if not os.path.exists(tcmu_config_dir):
+                    makedirs(tcmu_config_dir, uid, gid, 0o755)
+                tcmu_file_path = os.path.join(tcmu_config_dir, os.path.basename(ccf['mount_path']))
+                with write_new(tcmu_file_path, owner=(uid, gid), encoding='utf-8') as f:
+                    f.write(ccf['content'])
 
 
 def get_parm(option: str) -> Dict[str, str]:
     js = _get_config_json(option)
     # custom_config_files is a special field that may be in the config
     # dict. It is used for mounting custom config files into daemon's containers
-    # and should be accessed through the "get_custom_config_files" function.
+    # and should be accessed through the "fetch_custom_config_files" function.
     # For get_parm we need to discard it.
     js.pop('custom_config_files', None)
     return js
 
 
-def get_custom_config_files(option: str) -> Dict[str, List[Dict[str, str]]]:
-    js = _get_config_json(option)
-    res: Dict[str, List[Dict[str, str]]] = {'custom_config_files': []}
-    if 'custom_config_files' in js:
-        res['custom_config_files'] = js['custom_config_files']
-    return res
-
-
 def _get_config_json(option: str) -> Dict[str, Any]:
     if not option:
         return dict()
@@ -2984,13 +3241,94 @@ def _get_config_json(option: str) -> Dict[str, Any]:
         return js
 
 
+def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
+    """Return a dict containing metadata about a deployment.
+    """
+    meta = getattr(ctx, 'meta_properties', None)
+    if meta is not None:
+        return meta
+    mjson = getattr(ctx, 'meta_json', None)
+    if mjson is not None:
+        meta = json.loads(mjson) or {}
+        ctx.meta_properties = meta
+        return meta
+    return {}
+
+
+def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
+    """Return a dict containing arbitrary configuration parameters.
+    This function filters out the key 'custom_config_files' which
+    must not be part of a deployment's configuration key-value pairs.
+    To access custom configuration file data, use `fetch_custom_config_files`.
+    """
+    # ctx.config_blobs is *always* a dict. it is created once when
+    # a command is parsed/processed and stored "forever"
+    cfg_blobs = getattr(ctx, 'config_blobs', None)
+    if cfg_blobs:
+        cfg_blobs = dict(cfg_blobs)
+        cfg_blobs.pop('custom_config_files', None)
+        return cfg_blobs
+    # ctx.config_json is the legacy equivalent of config_blobs. it is a
+    # string that either contains json or refers to a file name where
+    # the file contains json.
+    cfg_json = getattr(ctx, 'config_json', None)
+    if cfg_json:
+        jdata = _get_config_json(cfg_json) or {}
+        jdata.pop('custom_config_files', None)
+        return jdata
+    return {}
+
+
+def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
+    """Return a list containing dicts that can be used to populate
+    custom configuration files for containers.
+    """
+    # NOTE: this function works like the opposite of fetch_configs.
+    # instead of filtering out custom_config_files, it returns only
+    # the content in that key.
+    cfg_blobs = getattr(ctx, 'config_blobs', None)
+    if cfg_blobs:
+        return cfg_blobs.get('custom_config_files', [])
+    cfg_json = getattr(ctx, 'config_json', None)
+    if cfg_json:
+        jdata = _get_config_json(cfg_json)
+        return jdata.get('custom_config_files', [])
+    return []
+
+
+def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
+    """Return a list of Endpoints, which have a port and ip attribute
+    """
+    ports = getattr(ctx, 'tcp_ports', None)
+    if ports is None:
+        ports = []
+    if isinstance(ports, str):
+        ports = list(map(int, ports.split()))
+    port_ips: Dict[str, str] = {}
+    port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
+    if isinstance(port_ips_attr, str):
+        port_ips = json.loads(port_ips_attr)
+    elif port_ips_attr is not None:
+        # if it's not None or a str, assume it's already the dict we want
+        port_ips = port_ips_attr
+
+    endpoints: List[EndPoint] = []
+    for port in ports:
+        if str(port) in port_ips:
+            endpoints.append(EndPoint(port_ips[str(port)], port))
+        else:
+            endpoints.append(EndPoint('0.0.0.0', port))
+
+    return endpoints
+
+
 def get_config_and_keyring(ctx):
     # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
     config = None
     keyring = None
 
-    if 'config_json' in ctx and ctx.config_json:
-        d = get_parm(ctx.config_json)
+    d = fetch_configs(ctx)
+    if d:
         config = d.get('config')
         keyring = d.get('keyring')
         if config and keyring:
@@ -3021,6 +3359,8 @@ def get_container_binds(ctx, fsid, daemon_type, daemon_id):
 
     if daemon_type == CephIscsi.daemon_type:
         binds.extend(CephIscsi.get_container_binds())
+    if daemon_type == CephNvmeof.daemon_type:
+        binds.extend(CephNvmeof.get_container_binds())
     elif daemon_type == CustomContainer.daemon_type:
         assert daemon_id
         cc = CustomContainer.init(ctx, fsid, daemon_id)
@@ -3073,10 +3413,14 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
     if daemon_type == 'osd':
         # selinux-policy in the container may not match the host.
         if HostFacts(ctx).selinux_enabled:
-            selinux_folder = '/var/lib/ceph/%s/selinux' % fsid
-            if not os.path.exists(selinux_folder):
-                os.makedirs(selinux_folder, mode=0o755)
-            mounts[selinux_folder] = '/sys/fs/selinux:ro'
+            cluster_dir = f'{ctx.data_dir}/{fsid}'
+            selinux_folder = f'{cluster_dir}/selinux'
+            if os.path.exists(cluster_dir):
+                if not os.path.exists(selinux_folder):
+                    os.makedirs(selinux_folder, mode=0o755)
+                mounts[selinux_folder] = '/sys/fs/selinux:ro'
+            else:
+                logger.error(f'Cluster direcotry {cluster_dir} does not exist.')
         mounts['/'] = '/rootfs'
 
     try:
@@ -3133,6 +3477,11 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
         data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
         mounts.update(HAproxy.get_container_mounts(data_dir))
 
+    if daemon_type == CephNvmeof.daemon_type:
+        assert daemon_id
+        data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+        mounts.update(CephNvmeof.get_container_mounts(data_dir))
+
     if daemon_type == CephIscsi.daemon_type:
         assert daemon_id
         data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
@@ -3250,7 +3599,7 @@ def get_container(ctx: CephadmContext,
     elif daemon_type in Tracing.components:
         entrypoint = ''
         name = '%s.%s' % (daemon_type, daemon_id)
-        config = get_parm(ctx.config_json)
+        config = fetch_configs(ctx)
         Tracing.set_configuration(config, daemon_type)
         envs.extend(Tracing.components[daemon_type].get('envs', []))
     elif daemon_type == NFSGanesha.daemon_type:
@@ -3267,6 +3616,11 @@ def get_container(ctx: CephadmContext,
         name = '%s.%s' % (daemon_type, daemon_id)
         envs.extend(Keepalived.get_container_envs())
         container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+    elif daemon_type == CephNvmeof.daemon_type:
+        name = '%s.%s' % (daemon_type, daemon_id)
+        container_args.extend(['--ulimit', 'memlock=-1:-1'])
+        container_args.extend(['--ulimit', 'nofile=10240'])
+        container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
     elif daemon_type == CephIscsi.daemon_type:
         entrypoint = CephIscsi.entrypoint
         name = '%s.%s' % (daemon_type, daemon_id)
@@ -3374,26 +3728,29 @@ def extract_uid_gid(ctx, img='', file_path='/var/lib/ceph'):
     raise RuntimeError('uid/gid not found')
 
 
-def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
-                  config=None, keyring=None,
-                  osd_fsid=None,
-                  reconfig=False,
-                  ports=None):
-    # type: (CephadmContext, str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
-
-    ports = ports or []
-    if any([port_in_use(ctx, port) for port in ports]):
-        if daemon_type == 'mgr':
-            # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
-            # tell whether that is the case here.
-            logger.warning(
-                f"ceph-mgr TCP port(s) {','.join(map(str, ports))} already in use"
-            )
-        else:
-            raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
+def deploy_daemon(ctx: CephadmContext, fsid: str, daemon_type: str,
+                  daemon_id: Union[int, str], c: Optional['CephContainer'],
+                  uid: int, gid: int, config: Optional[str] = None,
+                  keyring: Optional[str] = None, osd_fsid: Optional[str] = None,
+                  deployment_type: DeploymentType = DeploymentType.DEFAULT,
+                  endpoints: Optional[List[EndPoint]] = None) -> None:
+
+    endpoints = endpoints or []
+    # only check port in use if fresh deployment since service
+    # we are redeploying/reconfiguring will already be using the port
+    if deployment_type == DeploymentType.DEFAULT:
+        if any([port_in_use(ctx, e) for e in endpoints]):
+            if daemon_type == 'mgr':
+                # non-fatal for mgr when we are in mgr_standby_modules=false, but we can't
+                # tell whether that is the case here.
+                logger.warning(
+                    f"ceph-mgr TCP port(s) {','.join(map(str, endpoints))} already in use"
+                )
+            else:
+                raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type))
 
     data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
-    if reconfig and not os.path.exists(data_dir):
+    if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir):
         raise Error('cannot reconfig, data path %s does not exist' % data_dir)
     if daemon_type == 'mon' and not os.path.exists(data_dir):
         assert config
@@ -3428,9 +3785,7 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
         ).run()
 
         # write conf
-        with open(mon_dir + '/config', 'w') as f:
-            os.fchown(f.fileno(), uid, gid)
-            os.fchmod(f.fileno(), 0o600)
+        with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
             f.write(config)
     else:
         # dirs, conf, keyring
@@ -3440,12 +3795,11 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
             uid, gid,
             config, keyring)
 
-    if not reconfig:
+    # only write out unit files and start daemon
+    # with systemd if this is not a reconfig
+    if deployment_type != DeploymentType.RECONFIG:
         if daemon_type == CephadmAgent.daemon_type:
-            if ctx.config_json == '-':
-                config_js = get_parm('-')
-            else:
-                config_js = get_parm(ctx.config_json)
+            config_js = fetch_configs(ctx)
             assert isinstance(config_js, dict)
 
             cephadm_agent = CephadmAgent(ctx, fsid, daemon_id)
@@ -3453,30 +3807,28 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
         else:
             if c:
                 deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
-                                    c, osd_fsid=osd_fsid, ports=ports)
+                                    c, osd_fsid=osd_fsid, endpoints=endpoints)
             else:
                 raise RuntimeError('attempting to deploy a daemon without a container image')
 
     if not os.path.exists(data_dir + '/unit.created'):
-        with open(data_dir + '/unit.created', 'w') as f:
-            os.fchmod(f.fileno(), 0o600)
-            os.fchown(f.fileno(), uid, gid)
+        with write_new(data_dir + '/unit.created', owner=(uid, gid)) as f:
             f.write('mtime is time the daemon deployment was created\n')
 
-    with open(data_dir + '/unit.configured', 'w') as f:
+    with write_new(data_dir + '/unit.configured', owner=(uid, gid)) as f:
         f.write('mtime is time we were last configured\n')
-        os.fchmod(f.fileno(), 0o600)
-        os.fchown(f.fileno(), uid, gid)
 
     update_firewalld(ctx, daemon_type)
 
     # Open ports explicitly required for the daemon
-    if ports:
+    if endpoints:
         fw = Firewalld(ctx)
-        fw.open_ports(ports + fw.external_ports.get(daemon_type, []))
+        fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
         fw.apply_rules()
 
-    if reconfig and daemon_type not in Ceph.daemons:
+    # If this was a reconfig and the daemon is not a Ceph daemon, restart it
+    # so it can pick up potential changes to its configuration files
+    if deployment_type == DeploymentType.RECONFIG and daemon_type not in Ceph.daemons:
         # ceph daemons do not need a restart; others (presumably) do to pick
         # up the new config
         call_throws(ctx, ['systemctl', 'reset-failed',
@@ -3547,7 +3899,7 @@ def deploy_daemon_units(
     enable: bool = True,
     start: bool = True,
     osd_fsid: Optional[str] = None,
-    ports: Optional[List[int]] = None,
+    endpoints: Optional[List[EndPoint]] = None,
 ) -> None:
     # cmd
 
@@ -3560,8 +3912,10 @@ def deploy_daemon_units(
         f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd(timeout=timeout))} \n')
 
     data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
-    with open(data_dir + '/unit.run.new', 'w') as f, \
-            open(data_dir + '/unit.meta.new', 'w') as metaf:
+    run_file_path = data_dir + '/unit.run'
+    meta_file_path = data_dir + '/unit.meta'
+    with write_new(run_file_path) as f, write_new(meta_file_path) as metaf:
+
         f.write('set -e\n')
 
         if daemon_type in Ceph.daemons:
@@ -3625,30 +3979,24 @@ def deploy_daemon_units(
         _write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
 
         # some metadata about the deploy
-        meta: Dict[str, Any] = {}
-        if 'meta_json' in ctx and ctx.meta_json:
-            meta = json.loads(ctx.meta_json) or {}
+        meta: Dict[str, Any] = fetch_meta(ctx)
         meta.update({
             'memory_request': int(ctx.memory_request) if ctx.memory_request else None,
             'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
         })
         if not meta.get('ports'):
-            meta['ports'] = ports
+            if endpoints:
+                meta['ports'] = [e.port for e in endpoints]
+            else:
+                meta['ports'] = []
         metaf.write(json.dumps(meta, indent=4) + '\n')
 
-        os.fchmod(f.fileno(), 0o600)
-        os.fchmod(metaf.fileno(), 0o600)
-        os.rename(data_dir + '/unit.run.new',
-                  data_dir + '/unit.run')
-        os.rename(data_dir + '/unit.meta.new',
-                  data_dir + '/unit.meta')
-
     timeout = 30 if daemon_type == 'osd' else None
     # post-stop command(s)
-    with open(data_dir + '/unit.poststop.new', 'w') as f:
+    with write_new(data_dir + '/unit.poststop') as f:
         # this is a fallback to eventually stop any underlying container that was not stopped properly by unit.stop,
         # this could happen in very slow setups as described in the issue https://tracker.ceph.com/issues/58242.
-        add_stop_actions(f, timeout)
+        add_stop_actions(cast(TextIO, f), timeout)
         if daemon_type == 'osd':
             assert osd_fsid
             poststop = get_ceph_volume_container(
@@ -3672,23 +4020,14 @@ def deploy_daemon_units(
             f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
             f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
-        os.fchmod(f.fileno(), 0o600)
-        os.rename(data_dir + '/unit.poststop.new',
-                  data_dir + '/unit.poststop')
 
     # post-stop command(s)
-    with open(data_dir + '/unit.stop.new', 'w') as f:
-        add_stop_actions(f, timeout)
-        os.fchmod(f.fileno(), 0o600)
-        os.rename(data_dir + '/unit.stop.new',
-                  data_dir + '/unit.stop')
+    with write_new(data_dir + '/unit.stop') as f:
+        add_stop_actions(cast(TextIO, f), timeout)
 
     if c:
-        with open(data_dir + '/unit.image.new', 'w') as f:
+        with write_new(data_dir + '/unit.image') as f:
             f.write(c.image + '\n')
-            os.fchmod(f.fileno(), 0o600)
-            os.rename(data_dir + '/unit.image.new',
-                      data_dir + '/unit.image')
 
     # sysctl
     install_sysctl(ctx, fsid, daemon_type)
@@ -3697,10 +4036,8 @@ def deploy_daemon_units(
     install_base_units(ctx, fsid)
     unit = get_unit_file(ctx, fsid)
     unit_file = 'ceph-%s@.service' % (fsid)
-    with open(ctx.unit_dir + '/' + unit_file + '.new', 'w') as f:
+    with write_new(ctx.unit_dir + '/' + unit_file, perms=None) as f:
         f.write(unit)
-        os.rename(ctx.unit_dir + '/' + unit_file + '.new',
-                  ctx.unit_dir + '/' + unit_file)
     call_throws(ctx, ['systemctl', 'daemon-reload'])
 
     unit_name = get_unit_name(fsid, daemon_type, daemon_id)
@@ -3852,7 +4189,7 @@ def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
             *lines,
             '',
         ]
-        with open(conf, 'w') as f:
+        with write_new(conf, owner=None, perms=None) as f:
             f.write('\n'.join(lines))
 
     conf = Path(ctx.sysctl_dir).joinpath(f'90-ceph-{fsid}-{daemon_type}.conf')
@@ -3864,6 +4201,8 @@ def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
         lines = HAproxy.get_sysctl_settings()
     elif daemon_type == 'keepalived':
         lines = Keepalived.get_sysctl_settings()
+    elif daemon_type == CephNvmeof.daemon_type:
+        lines = CephNvmeof.get_sysctl_settings()
     lines = filter_sysctl_settings(ctx, lines)
 
     # apply the sysctl settings
@@ -3950,14 +4289,12 @@ def install_base_units(ctx, fsid):
     """
     # global unit
     existed = os.path.exists(ctx.unit_dir + '/ceph.target')
-    with open(ctx.unit_dir + '/ceph.target.new', 'w') as f:
+    with write_new(ctx.unit_dir + '/ceph.target', perms=None) as f:
         f.write('[Unit]\n'
                 'Description=All Ceph clusters and services\n'
                 '\n'
                 '[Install]\n'
                 'WantedBy=multi-user.target\n')
-        os.rename(ctx.unit_dir + '/ceph.target.new',
-                  ctx.unit_dir + '/ceph.target')
     if not existed:
         # we disable before enable in case a different ceph.target
         # (from the traditional package) is present; while newer
@@ -3970,7 +4307,7 @@ def install_base_units(ctx, fsid):
 
     # cluster unit
     existed = os.path.exists(ctx.unit_dir + '/ceph-%s.target' % fsid)
-    with open(ctx.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
+    with write_new(ctx.unit_dir + f'/ceph-{fsid}.target', perms=None) as f:
         f.write(
             '[Unit]\n'
             'Description=Ceph cluster {fsid}\n'
@@ -3981,8 +4318,6 @@ def install_base_units(ctx, fsid):
             'WantedBy=multi-user.target ceph.target\n'.format(
                 fsid=fsid)
         )
-        os.rename(ctx.unit_dir + '/ceph-%s.target.new' % fsid,
-                  ctx.unit_dir + '/ceph-%s.target' % fsid)
     if not existed:
         call_throws(ctx, ['systemctl', 'enable', 'ceph-%s.target' % fsid])
         call_throws(ctx, ['systemctl', 'start', 'ceph-%s.target' % fsid])
@@ -3992,7 +4327,7 @@ def install_base_units(ctx, fsid):
         return
 
     # logrotate for the cluster
-    with open(ctx.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
+    with write_new(ctx.logrotate_dir + f'/ceph-{fsid}', perms=None) as f:
         """
         This is a bit sloppy in that the killall/pkill will touch all ceph daemons
         in all containers, but I don't see an elegant way to send SIGHUP *just* to
@@ -4001,6 +4336,18 @@ def install_base_units(ctx, fsid):
         first child (bash), but that isn't the ceph daemon.  This is simpler and
         should be harmless.
         """
+        targets: List[str] = [
+            'ceph-mon',
+            'ceph-mgr',
+            'ceph-mds',
+            'ceph-osd',
+            'ceph-fuse',
+            'radosgw',
+            'rbd-mirror',
+            'cephfs-mirror',
+            'tcmu-runner'
+        ]
+
         f.write("""# created by cephadm
 /var/log/ceph/%s/*.log {
     rotate 7
@@ -4008,13 +4355,13 @@ def install_base_units(ctx, fsid):
     compress
     sharedscripts
     postrotate
-        killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror cephfs-mirror || pkill -1 -x 'ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror|cephfs-mirror' || true
+        killall -q -1 %s || pkill -1 -x '%s' || true
     endscript
     missingok
     notifempty
     su root root
 }
-""" % fsid)
+""" % (fsid, ' '.join(targets), '|'.join(targets)))
 
 
 def get_unit_file(ctx, fsid):
@@ -4391,9 +4738,8 @@ class MgrListener(Thread):
             for filename in config:
                 if filename in self.agent.required_files:
                     file_path = os.path.join(self.agent.daemon_dir, filename)
-                    with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+                    with write_new(file_path) as f:
                         f.write(config[filename])
-                        os.rename(file_path + '.new', file_path)
             self.agent.pull_conf_settings()
             self.agent.wakeup()
 
@@ -4454,27 +4800,21 @@ class CephadmAgent():
         for filename in config:
             if filename in self.required_files:
                 file_path = os.path.join(self.daemon_dir, filename)
-                with open(os.open(file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+                with write_new(file_path) as f:
                     f.write(config[filename])
-                    os.rename(file_path + '.new', file_path)
 
         unit_run_path = os.path.join(self.daemon_dir, 'unit.run')
-        with open(os.open(unit_run_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+        with write_new(unit_run_path) as f:
             f.write(self.unit_run())
-            os.rename(unit_run_path + '.new', unit_run_path)
 
-        meta: Dict[str, Any] = {}
+        meta: Dict[str, Any] = fetch_meta(self.ctx)
         meta_file_path = os.path.join(self.daemon_dir, 'unit.meta')
-        if 'meta_json' in self.ctx and self.ctx.meta_json:
-            meta = json.loads(self.ctx.meta_json) or {}
-        with open(os.open(meta_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+        with write_new(meta_file_path) as f:
             f.write(json.dumps(meta, indent=4) + '\n')
-            os.rename(meta_file_path + '.new', meta_file_path)
 
         unit_file_path = os.path.join(self.ctx.unit_dir, self.unit_name())
-        with open(os.open(unit_file_path + '.new', os.O_CREAT | os.O_WRONLY, 0o600), 'w') as f:
+        with write_new(unit_file_path) as f:
             f.write(self.unit_file())
-            os.rename(unit_file_path + '.new', unit_file_path)
 
         call_throws(self.ctx, ['systemctl', 'daemon-reload'])
         call(self.ctx, ['systemctl', 'stop', self.unit_name()],
@@ -4557,7 +4897,7 @@ WantedBy=ceph-{fsid}.target
 
         try:
             for _ in range(1001):
-                if not port_in_use(self.ctx, self.starting_port):
+                if not port_in_use(self.ctx, EndPoint('0.0.0.0', self.starting_port)):
                     self.listener_port = str(self.starting_port)
                     break
                 self.starting_port += 1
@@ -4833,15 +5173,24 @@ def command_agent(ctx: CephadmContext) -> None:
 
 ##################################
 
-
-@infer_image
+@executes_early
 def command_version(ctx):
     # type: (CephadmContext) -> int
-    c = CephContainer(ctx, ctx.image, 'ceph', ['--version'])
-    out, err, ret = call(ctx, c.run_cmd(), desc=c.entrypoint)
-    if not ret:
-        print(out.strip())
-    return ret
+    import importlib
+
+    try:
+        vmod = importlib.import_module('_version')
+    except ImportError:
+        print('cephadm version UNKNOWN')
+        return 1
+    _unset = '<UNSET>'
+    print('cephadm version {0} ({1}) {2} ({3})'.format(
+        getattr(vmod, 'CEPH_GIT_NICE_VER', _unset),
+        getattr(vmod, 'CEPH_GIT_VER', _unset),
+        getattr(vmod, 'CEPH_RELEASE_NAME', _unset),
+        getattr(vmod, 'CEPH_RELEASE_TYPE', _unset),
+    ))
+    return 0
 
 ##################################
 
@@ -5333,7 +5682,7 @@ def create_mon(
     fsid: str, mon_id: str
 ) -> None:
     mon_c = get_container(ctx, fsid, 'mon', mon_id)
-    ctx.meta_json = json.dumps({'service_name': 'mon'})
+    ctx.meta_properties = {'service_name': 'mon'}
     deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
                   config=None, keyring=None)
 
@@ -5380,12 +5729,12 @@ def create_mgr(
     mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
     mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
     # Note:the default port used by the Prometheus node exporter is opened in fw
-    ctx.meta_json = json.dumps({'service_name': 'mgr'})
-    ports = [9283, 8765]
+    ctx.meta_properties = {'service_name': 'mgr'}
+    endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)]
     if not ctx.skip_monitoring_stack:
-        ports.append(8443)
+        endpoints.append(EndPoint('0.0.0.0', 8443))
     deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
-                  config=config, keyring=mgr_keyring, ports=ports)
+                  config=config, keyring=mgr_keyring, endpoints=endpoints)
 
     # wait for the service to become available
     logger.info('Waiting for mgr to start...')
@@ -5402,6 +5751,7 @@ def create_mgr(
         except Exception as e:
             logger.debug('status failed: %s' % e)
             return False
+
     is_available(ctx, 'mgr', is_mgr_available)
 
 
@@ -5428,6 +5778,15 @@ def prepare_ssh(
         cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
         cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
         ssh_pub = cli(['cephadm', 'get-pub-key'])
+        authorize_ssh_key(ssh_pub, ctx.ssh_user)
+    elif ctx.ssh_private_key and ctx.ssh_signed_cert:
+        logger.info('Using provided ssh private key and signed cert ...')
+        mounts = {
+            pathify(ctx.ssh_private_key.name): '/tmp/cephadm-ssh-key:z',
+            pathify(ctx.ssh_signed_cert.name): '/tmp/cephadm-ssh-key-cert.pub:z'
+        }
+        cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
+        cli(['cephadm', 'set-signed-cert', '-i', '/tmp/cephadm-ssh-key-cert.pub'], extra_mounts=mounts)
     else:
         logger.info('Generating ssh key...')
         cli(['cephadm', 'generate-key'])
@@ -5435,8 +5794,7 @@ def prepare_ssh(
         with open(ctx.output_pub_ssh_key, 'w') as f:
             f.write(ssh_pub)
         logger.info('Wrote public SSH key to %s' % ctx.output_pub_ssh_key)
-
-    authorize_ssh_key(ssh_pub, ctx.ssh_user)
+        authorize_ssh_key(ssh_pub, ctx.ssh_user)
 
     host = get_hostname()
     logger.info('Adding host %s...' % host)
@@ -5649,8 +6007,10 @@ def finish_bootstrap_config(
         cli(['config', 'set', 'global', 'container_image', f'{ctx.image}'])
 
     if mon_network:
-        logger.info(f'Setting mon public_network to {mon_network}')
-        cli(['config', 'set', 'mon', 'public_network', mon_network])
+        cp = read_config(ctx.config)
+        cfg_section = 'global' if cp.has_option('global', 'public_network') else 'mon'
+        logger.info(f'Setting public_network to {mon_network} in {cfg_section} config section')
+        cli(['config', 'set', cfg_section, 'public_network', mon_network])
 
     if cluster_network:
         logger.info(f'Setting cluster_network to {cluster_network}')
@@ -5787,6 +6147,43 @@ def save_cluster_config(ctx: CephadmContext, uid: int, gid: int, fsid: str) -> N
         logger.warning(f'Cannot create cluster configuration directory {conf_dir}')
 
 
+def rollback(func: FuncT) -> FuncT:
+    """
+    """
+    @wraps(func)
+    def _rollback(ctx: CephadmContext) -> Any:
+        try:
+            return func(ctx)
+        except ClusterAlreadyExists:
+            # another cluster with the provided fsid already exists: don't remove.
+            raise
+        except (KeyboardInterrupt, Exception) as e:
+            logger.error(f'{type(e).__name__}: {e}')
+            if ctx.cleanup_on_failure:
+                logger.info('\n\n'
+                            '\t***************\n'
+                            '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically,\n'
+                            '\tto disable this behaviour do not pass the --cleanup-on-failure flag. In case of any previous\n'
+                            '\tbroken installation user must use the following command to completely delete the broken cluster:\n\n'
+                            '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
+                            '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
+                            '\t***************\n\n')
+                _rm_cluster(ctx, keep_logs=False, zap_osds=False)
+            else:
+                logger.info('\n\n'
+                            '\t***************\n'
+                            '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically to change\n'
+                            '\tthis behaviour you can pass the --cleanup-on-failure. To remove this broken cluster manually please run:\n\n'
+                            f'\t   > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n'
+                            '\tin case of any previous broken installation user must use the rm-cluster command to delete the broken cluster:\n\n'
+                            '\t   > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
+                            '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
+                            '\t***************\n\n')
+            raise
+    return cast(FuncT, _rollback)
+
+
+@rollback
 @default_image
 def command_bootstrap(ctx):
     # type: (CephadmContext) -> int
@@ -5800,23 +6197,38 @@ def command_bootstrap(ctx):
     if not ctx.output_pub_ssh_key:
         ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, CEPH_PUBKEY)
 
-    if bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key):
-        raise Error('--ssh-private-key and --ssh-public-key must be provided together or not at all.')
+    if (
+        (bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key))
+        and (bool(ctx.ssh_private_key) is not bool(ctx.ssh_signed_cert))
+    ):
+        raise Error('--ssh-private-key must be passed with either --ssh-public-key in the case of standard pubkey '
+                    'authentication or with --ssh-signed-cert in the case of CA signed signed keys or not provided at all.')
+
+    if (bool(ctx.ssh_public_key) and bool(ctx.ssh_signed_cert)):
+        raise Error('--ssh-public-key and --ssh-signed-cert are mututally exclusive. --ssh-public-key is intended '
+                    'for standard pubkey encryption where the public key is set as an authorized key on cluster hosts. '
+                    '--ssh-signed-cert is intended for the CA signed keys use case where cluster hosts are configured to trust '
+                    'a CA pub key and authentication during SSH is done by authenticating the signed cert, requiring no '
+                    'public key to be installed on the cluster hosts.')
 
     if ctx.fsid:
         data_dir_base = os.path.join(ctx.data_dir, ctx.fsid)
         if os.path.exists(data_dir_base):
-            raise Error(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
+            raise ClusterAlreadyExists(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
         else:
             logger.warning('Specifying an fsid for your cluster offers no advantages and may increase the likelihood of fsid conflicts.')
 
+    # initial vars
+    ctx.fsid = ctx.fsid or make_fsid()
+    fsid = ctx.fsid
+    if not is_fsid(fsid):
+        raise Error('not an fsid: %s' % fsid)
+
     # verify output files
-    for f in [ctx.output_config, ctx.output_keyring,
-              ctx.output_pub_ssh_key]:
+    for f in [ctx.output_config, ctx.output_keyring, ctx.output_pub_ssh_key]:
         if not ctx.allow_overwrite:
             if os.path.exists(f):
-                raise Error('%s already exists; delete or pass '
-                            '--allow-overwrite to overwrite' % f)
+                raise ClusterAlreadyExists('%s already exists; delete or pass --allow-overwrite to overwrite' % f)
         dirname = os.path.dirname(f)
         if dirname and not os.path.exists(dirname):
             fname = os.path.basename(f)
@@ -5837,12 +6249,7 @@ def command_bootstrap(ctx):
     else:
         logger.info('Skip prepare_host')
 
-    # initial vars
-    fsid = ctx.fsid or make_fsid()
-    if not is_fsid(fsid):
-        raise Error('not an fsid: %s' % fsid)
     logger.info('Cluster fsid: %s' % fsid)
-
     hostname = get_hostname()
     if '.' in hostname and not ctx.allow_fqdn_hostname:
         raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
@@ -5887,9 +6294,7 @@ def command_bootstrap(ctx):
     (mon_dir, log_dir) = prepare_create_mon(ctx, uid, gid, fsid, mon_id,
                                             bootstrap_keyring.name, monmap.name)
 
-    with open(mon_dir + '/config', 'w') as f:
-        os.fchown(f.fileno(), uid, gid)
-        os.fchmod(f.fileno(), 0o600)
+    with write_new(mon_dir + '/config', owner=(uid, gid)) as f:
         f.write(config)
 
     make_var_run(ctx, fsid, uid, gid)
@@ -5924,8 +6329,7 @@ def command_bootstrap(ctx):
                             cluster_network, ipv6_cluster_network)
 
     # output files
-    with open(ctx.output_keyring, 'w') as f:
-        os.fchmod(f.fileno(), 0o600)
+    with write_new(ctx.output_keyring) as f:
         f.write('[client.admin]\n'
                 '\tkey = ' + admin_key + '\n')
     logger.info('Wrote keyring to %s' % ctx.output_keyring)
@@ -6000,7 +6404,10 @@ def command_bootstrap(ctx):
         with open(ctx.apply_spec) as f:
             host_dicts = _extract_host_info_from_applied_spec(f)
             for h in host_dicts:
-                _distribute_ssh_keys(ctx, h, hostname)
+                if ctx.ssh_signed_cert:
+                    logger.info('Key distribution is not supported for signed CA key setups. Skipping ...')
+                else:
+                    _distribute_ssh_keys(ctx, h, hostname)
 
         mounts = {}
         mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:ro'
@@ -6076,7 +6483,7 @@ def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[s
             cmd.append('--authfile=/etc/ceph/podman-auth.json')
         out, _, _ = call_throws(ctx, cmd)
         if isinstance(engine, Podman):
-            os.chmod('/etc/ceph/podman-auth.json', 0o600)
+            os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE)
     except Exception:
         raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
 
@@ -6115,10 +6522,10 @@ def get_deployment_container(ctx: CephadmContext,
         c.container_args.extend(ctx.extra_container_args)
     if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args:
         c.args.extend(ctx.extra_entrypoint_args)
-    if 'config_json' in ctx and ctx.config_json:
-        conf_files = get_custom_config_files(ctx.config_json)
+    ccfiles = fetch_custom_config_files(ctx)
+    if ccfiles:
         mandatory_keys = ['mount_path', 'content']
-        for conf in conf_files['custom_config_files']:
+        for conf in ccfiles:
             if all(k in conf for k in mandatory_keys):
                 mount_path = conf['mount_path']
                 file_path = os.path.join(
@@ -6132,50 +6539,118 @@ def get_deployment_container(ctx: CephadmContext,
     return c
 
 
+def get_deployment_type(ctx: CephadmContext, daemon_type: str, daemon_id: str) -> DeploymentType:
+    deployment_type: DeploymentType = DeploymentType.DEFAULT
+    if ctx.reconfig:
+        deployment_type = DeploymentType.RECONFIG
+    unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id)
+    (_, state, _) = check_unit(ctx, unit_name)
+    if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash')):
+        # if reconfig was set, that takes priority over redeploy. If
+        # this is considered a fresh deployment at this stage,
+        # mark it as a redeploy to avoid port checking
+        if deployment_type == DeploymentType.DEFAULT:
+            deployment_type = DeploymentType.REDEPLOY
+
+    logger.info(f'{deployment_type.value} daemon {ctx.name} ...')
+
+    return deployment_type
+
+
 @default_image
+@deprecated_command
 def command_deploy(ctx):
     # type: (CephadmContext) -> None
-    daemon_type, daemon_id = ctx.name.split('.', 1)
+    _common_deploy(ctx)
 
-    lock = FileLock(ctx, ctx.fsid)
-    lock.acquire()
 
+def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]:
+    """Read a JSON configuration based on the `ctx.source` value."""
+    source = '-'
+    if 'source' in ctx and ctx.source:
+        source = ctx.source
+    if source == '-':
+        config_data = json.load(sys.stdin)
+    else:
+        with open(source, 'rb') as fh:
+            config_data = json.load(fh)
+    logger.debug('Loaded deploy configuration: %r', config_data)
+    return config_data
+
+
+def apply_deploy_config_to_ctx(
+    config_data: Dict[str, Any],
+    ctx: CephadmContext,
+) -> None:
+    """Bind properties taken from the config_data dictionary to our ctx,
+    similar to how cli options on `deploy` are bound to the context.
+    """
+    ctx.name = config_data['name']
+    image = config_data.get('image', '')
+    if image:
+        ctx.image = image
+    if 'fsid' in config_data:
+        ctx.fsid = config_data['fsid']
+    if 'meta' in config_data:
+        ctx.meta_properties = config_data['meta']
+    if 'config_blobs' in config_data:
+        ctx.config_blobs = config_data['config_blobs']
+
+    # many functions don't check that an attribute is set on the ctx
+    # (with getattr or the '__contains__' func on ctx).
+    # This reuses the defaults from the CLI options so we don't
+    # have to repeat things and they can stay in sync.
+    facade = ArgumentFacade()
+    _add_deploy_parser_args(facade)
+    facade.apply(ctx)
+    for key, value in config_data.get('params', {}).items():
+        if key not in facade.defaults:
+            logger.warning('unexpected parameter: %r=%r', key, value)
+        setattr(ctx, key, value)
+    update_default_image(ctx)
+    logger.debug('Determined image: %r', ctx.image)
+
+
+def command_deploy_from(ctx: CephadmContext) -> None:
+    """The deploy-from command is similar to deploy but sources nearly all
+    configuration parameters from an input JSON configuration file.
+    """
+    config_data = read_configuration_source(ctx)
+    apply_deploy_config_to_ctx(config_data, ctx)
+    _common_deploy(ctx)
+
+
+def _common_deploy(ctx: CephadmContext) -> None:
+    daemon_type, daemon_id = ctx.name.split('.', 1)
     if daemon_type not in get_supported_daemons():
         raise Error('daemon type %s not recognized' % daemon_type)
 
-    redeploy = False
-    unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id)
-    (_, state, _) = check_unit(ctx, unit_name)
-    if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash')):
-        redeploy = True
+    lock = FileLock(ctx, ctx.fsid)
+    lock.acquire()
 
-    if ctx.reconfig:
-        logger.info('%s daemon %s ...' % ('Reconfig', ctx.name))
-    elif redeploy:
-        logger.info('%s daemon %s ...' % ('Redeploy', ctx.name))
-    else:
-        logger.info('%s daemon %s ...' % ('Deploy', ctx.name))
+    deployment_type = get_deployment_type(ctx, daemon_type, daemon_id)
 
     # Migrate sysctl conf files from /usr/lib to /etc
     migrate_sysctl_dir(ctx, ctx.fsid)
 
     # Get and check ports explicitly required to be opened
-    daemon_ports = []  # type: List[int]
+    endpoints = fetch_tcp_ports(ctx)
+    _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type)
 
-    # only check port in use if not reconfig or redeploy since service
-    # we are redeploying/reconfiguring will already be using the port
-    if not ctx.reconfig and not redeploy:
-        if ctx.tcp_ports:
-            daemon_ports = list(map(int, ctx.tcp_ports.split()))
 
+def _dispatch_deploy(
+    ctx: CephadmContext,
+    daemon_type: str,
+    daemon_id: str,
+    daemon_endpoints: List[EndPoint],
+    deployment_type: DeploymentType,
+) -> None:
     if daemon_type in Ceph.daemons:
         config, keyring = get_config_and_keyring(ctx)
         uid, gid = extract_uid_gid(ctx)
         make_var_run(ctx, ctx.fsid, uid, gid)
 
-        config_json: Optional[Dict[str, str]] = None
-        if 'config_json' in ctx and ctx.config_json:
-            config_json = get_parm(ctx.config_json)
+        config_json = fetch_configs(ctx)
 
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
                                      ptrace=ctx.allow_ptrace)
@@ -6191,14 +6666,14 @@ def command_deploy(ctx):
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
                       osd_fsid=ctx.osd_fsid,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type in Monitoring.components:
         # monitoring daemon - prometheus, grafana, alertmanager, node-exporter
         # Default Checks
         # make sure provided config-json is sufficient
-        config = get_parm(ctx.config_json)  # type: ignore
+        config = fetch_configs(ctx)  # type: ignore
         required_files = Monitoring.components[daemon_type].get('config-json-files', list())
         required_args = Monitoring.components[daemon_type].get('config-json-args', list())
         if required_files:
@@ -6213,12 +6688,14 @@ def command_deploy(ctx):
         uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == NFSGanesha.daemon_type:
-        if not ctx.reconfig and not redeploy and not daemon_ports:
-            daemon_ports = list(NFSGanesha.port_map.values())
+        # only check ports if this is a fresh deployment
+        if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints:
+            nfs_ports = list(NFSGanesha.port_map.values())
+            daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports]
 
         config, keyring = get_config_and_keyring(ctx)
         # TODO: extract ganesha uid/gid (997, 994) ?
@@ -6226,8 +6703,8 @@ def command_deploy(ctx):
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CephIscsi.daemon_type:
         config, keyring = get_config_and_keyring(ctx)
@@ -6235,55 +6712,68 @@ def command_deploy(ctx):
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
+    elif daemon_type == CephNvmeof.daemon_type:
+        config, keyring = get_config_and_keyring(ctx)
+        uid, gid = 167, 167  # TODO: need to get properly the uid/gid
+        c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
+        deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
+                      config=config, keyring=keyring,
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
     elif daemon_type in Tracing.components:
         uid, gid = 65534, 65534
         c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
     elif daemon_type == HAproxy.daemon_type:
         haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id)
         uid, gid = haproxy.extract_uid_gid_haproxy()
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == Keepalived.daemon_type:
         keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id)
         uid, gid = keepalived.extract_uid_gid_keepalived()
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
-                      reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CustomContainer.daemon_type:
         cc = CustomContainer.init(ctx, ctx.fsid, daemon_id)
-        if not ctx.reconfig and not redeploy:
-            daemon_ports.extend(cc.ports)
+        # only check ports if this is a fresh deployment
+        if deployment_type == DeploymentType.DEFAULT:
+            daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports])
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
                                      privileged=cc.privileged,
                                      ptrace=ctx.allow_ptrace)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
                       uid=cc.uid, gid=cc.gid, config=None,
-                      keyring=None, reconfig=ctx.reconfig,
-                      ports=daemon_ports)
+                      keyring=None,
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == CephadmAgent.daemon_type:
         # get current user gid and uid
         uid = os.getuid()
         gid = os.getgid()
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
-                      uid, gid, ports=daemon_ports)
+                      uid, gid,
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     elif daemon_type == SNMPGateway.daemon_type:
         sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id)
         c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
         deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
                       sc.uid, sc.gid,
-                      ports=daemon_ports)
+                      deployment_type=deployment_type,
+                      endpoints=daemon_endpoints)
 
     else:
         raise Error('daemon type {} not implemented in command_deploy function'
@@ -6398,6 +6888,10 @@ def command_shell(ctx):
         privileged=True)
     command = c.shell_cmd(command)
 
+    if ctx.dry_run:
+        print(' '.join(shlex.quote(arg) for arg in command))
+        return 0
+
     return call_timeout(ctx, command, ctx.timeout)
 
 ##################################
@@ -6774,6 +7268,8 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                                 version = NFSGanesha.get_version(ctx, container_id)
                             if daemon_type == CephIscsi.daemon_type:
                                 version = CephIscsi.get_version(ctx, container_id)
+                            if daemon_type == CephNvmeof.daemon_type:
+                                version = CephNvmeof.get_version(ctx, container_id)
                             elif not version:
                                 if daemon_type in Ceph.daemons:
                                     out, err, code = call(ctx,
@@ -6806,7 +7302,8 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                                                            'haproxy', '-v'],
                                                           verbosity=CallVerbosity.QUIET)
                                     if not code and \
-                                       out.startswith('HA-Proxy version '):
+                                       out.startswith('HA-Proxy version ') or \
+                                       out.startswith('HAProxy version '):
                                         version = out.split(' ')[2]
                                         seen_versions[image_id] = version
                                 elif daemon_type == 'keepalived':
@@ -7208,6 +7705,10 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
     # type: (CephadmContext, str, str) -> None
     daemon_type = 'prometheus'
     (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+    # should try to set the ports we know cephadm defaults
+    # to for these services in the firewall.
+    ports = Monitoring.port_map['prometheus']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'prometheus')
 
@@ -7229,7 +7730,8 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
 
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
-    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7238,6 +7740,10 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
 
     daemon_type = 'grafana'
     (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+    # should try to set the ports we know cephadm defaults
+    # to for these services in the firewall.
+    ports = Monitoring.port_map['grafana']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'grafana-server')
 
@@ -7283,7 +7789,8 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
 
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
-    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7292,6 +7799,10 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
 
     daemon_type = 'alertmanager'
     (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+    # should try to set the ports we know cephadm defaults
+    # to for these services in the firewall.
+    ports = Monitoring.port_map['alertmanager']
+    endpoints = [EndPoint('0.0.0.0', p) for p in ports]
 
     _stop_and_disable(ctx, 'prometheus-alertmanager')
 
@@ -7313,7 +7824,8 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
 
     make_var_run(ctx, fsid, uid, gid)
     c = get_container(ctx, fsid, daemon_type, daemon_id)
-    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
+                  deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
     update_firewalld(ctx, daemon_type)
 
 
@@ -7325,7 +7837,7 @@ def _adjust_grafana_ini(filename):
     try:
         with open(filename, 'r') as grafana_ini:
             lines = grafana_ini.readlines()
-        with open('{}.new'.format(filename), 'w') as grafana_ini:
+        with write_new(filename, perms=None) as grafana_ini:
             server_section = False
             for line in lines:
                 if line.startswith('['):
@@ -7338,7 +7850,6 @@ def _adjust_grafana_ini(filename):
                     line = re.sub(r'^cert_key.*',
                                   'cert_key = /etc/grafana/certs/cert_key', line)
                 grafana_ini.write(line)
-        os.rename('{}.new'.format(filename), filename)
     except OSError as err:
         raise Error('Cannot update {}: {}'.format(filename, err))
 
@@ -7396,8 +7907,9 @@ def command_rm_daemon(ctx):
     else:
         call_throws(ctx, ['rm', '-rf', data_dir])
 
-    if 'tcp_ports' in ctx and ctx.tcp_ports is not None:
-        ports: List[int] = [int(p) for p in ctx.tcp_ports.split()]
+    endpoints = fetch_tcp_ports(ctx)
+    ports: List[int] = [e.port for e in endpoints]
+    if ports:
         try:
             fw = Firewalld(ctx)
             fw.close_ports(ports)
@@ -7467,14 +7979,20 @@ def get_ceph_cluster_count(ctx: CephadmContext) -> int:
     return len([c for c in os.listdir(ctx.data_dir) if is_fsid(c)])
 
 
-def command_rm_cluster(ctx):
-    # type: (CephadmContext) -> None
+def command_rm_cluster(ctx: CephadmContext) -> None:
     if not ctx.force:
         raise Error('must pass --force to proceed: '
                     'this command may destroy precious data!')
 
     lock = FileLock(ctx, ctx.fsid)
     lock.acquire()
+    _rm_cluster(ctx, ctx.keep_logs, ctx.zap_osds)
+
+
+def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None:
+
+    if not ctx.fsid:
+        raise Error('must select the cluster to delete by passing --fsid to proceed')
 
     def disable_systemd_service(unit_name: str) -> None:
         call(ctx, ['systemctl', 'stop', unit_name],
@@ -7484,6 +8002,8 @@ def command_rm_cluster(ctx):
         call(ctx, ['systemctl', 'disable', unit_name],
              verbosity=CallVerbosity.DEBUG)
 
+    logger.info(f'Deleting cluster with fsid: {ctx.fsid}')
+
     # stop + disable individual daemon units
     for d in list_daemons(ctx, detail=False):
         if d['fsid'] != ctx.fsid:
@@ -7501,7 +8021,7 @@ def command_rm_cluster(ctx):
          verbosity=CallVerbosity.DEBUG)
 
     # osds?
-    if ctx.zap_osds:
+    if zap_osds:
         _zap_osds(ctx)
 
     # rm units
@@ -7514,7 +8034,7 @@ def command_rm_cluster(ctx):
     # rm data
     call_throws(ctx, ['rm', '-rf', ctx.data_dir + '/' + ctx.fsid])
 
-    if not ctx.keep_logs:
+    if not keep_logs:
         # rm logs
         call_throws(ctx, ['rm', '-rf', ctx.log_dir + '/' + ctx.fsid])
         call_throws(ctx, ['rm', '-rf', ctx.log_dir
@@ -7534,7 +8054,7 @@ def command_rm_cluster(ctx):
         # rm cephadm logrotate config
         call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/cephadm'])
 
-        if not ctx.keep_logs:
+        if not keep_logs:
             # remove all cephadm logs
             for fname in glob(f'{ctx.log_dir}/cephadm.log*'):
                 os.remove(fname)
@@ -7547,7 +8067,7 @@ def command_rm_cluster(ctx):
             p.unlink()
 
     # cleanup remaining ceph directories
-    ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/var/lib/ceph/{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
+    ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/cephadm-{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
     for dd in ceph_dirs:
         shutil.rmtree(dd, ignore_errors=True)
 
@@ -7671,7 +8191,7 @@ def authorize_ssh_key(ssh_pub_key: str, ssh_user: str) -> bool:
 
     with open(auth_keys_file, 'a') as f:
         os.fchown(f.fileno(), ssh_uid, ssh_gid)  # just in case we created it
-        os.fchmod(f.fileno(), 0o600)  # just in case we created it
+        os.fchmod(f.fileno(), DEFAULT_MODE)  # just in case we created it
         if add_newline:
             f.write('\n')
         f.write(ssh_pub_key + '\n')
@@ -7690,7 +8210,7 @@ def revoke_ssh_key(key: str, ssh_user: str) -> None:
         _, filename = tempfile.mkstemp()
         with open(filename, 'w') as f:
             os.fchown(f.fileno(), ssh_uid, ssh_gid)
-            os.fchmod(f.fileno(), 0o600)  # secure access to the keys file
+            os.fchmod(f.fileno(), DEFAULT_MODE)  # secure access to the keys file
             for line in lines:
                 if line.strip() == key.strip():
                     deleted = True
@@ -7715,11 +8235,17 @@ def check_ssh_connectivity(ctx: CephadmContext) -> None:
         logger.warning('Cannot check ssh connectivity. Skipping...')
         return
 
-    logger.info('Verifying ssh connectivity ...')
+    ssh_priv_key_path = ''
+    ssh_pub_key_path = ''
+    ssh_signed_cert_path = ''
     if ctx.ssh_private_key and ctx.ssh_public_key:
         # let's use the keys provided by the user
         ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
         ssh_pub_key_path = pathify(ctx.ssh_public_key.name)
+    elif ctx.ssh_private_key and ctx.ssh_signed_cert:
+        # CA signed keys use case
+        ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
+        ssh_signed_cert_path = pathify(ctx.ssh_signed_cert.name)
     else:
         # no custom keys, let's generate some random keys just for this check
         ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}'
@@ -7730,31 +8256,35 @@ def check_ssh_connectivity(ctx: CephadmContext) -> None:
             logger.warning('Cannot generate keys to check ssh connectivity.')
             return
 
-    with open(ssh_pub_key_path, 'r') as f:
-        key = f.read().strip()
-    new_key = authorize_ssh_key(key, ctx.ssh_user)
-    ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
-    _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
-                            *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
-                            '-o PasswordAuthentication=no',
-                            f'{ctx.ssh_user}@{get_hostname()}',
-                            'sudo echo'])
-
-    # we only remove the key if it's a new one. In case the user has provided
-    # some already existing key then we don't alter authorized_keys file
-    if new_key:
-        revoke_ssh_key(key, ctx.ssh_user)
-
-    pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
-    prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
-    ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
-    err_msg = f"""
+    if ssh_signed_cert_path:
+        logger.info('Verification for CA signed keys authentication not implemented. Skipping ...')
+    elif ssh_pub_key_path:
+        logger.info('Verifying ssh connectivity using standard pubkey authentication ...')
+        with open(ssh_pub_key_path, 'r') as f:
+            key = f.read().strip()
+        new_key = authorize_ssh_key(key, ctx.ssh_user)
+        ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
+        _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
+                                *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
+                                '-o PasswordAuthentication=no',
+                                f'{ctx.ssh_user}@{get_hostname()}',
+                                'sudo echo'])
+
+        # we only remove the key if it's a new one. In case the user has provided
+        # some already existing key then we don't alter authorized_keys file
+        if new_key:
+            revoke_ssh_key(key, ctx.ssh_user)
+
+        pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
+        prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
+        ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
+        err_msg = f"""
 ** Please verify your user's ssh configuration and make sure:
 - User {ctx.ssh_user} must have passwordless sudo access
 {pub_key_msg}{prv_key_msg}{ssh_cfg_msg}
 """
-    if code != 0:
-        raise Error(err_msg)
+        if code != 0:
+            raise Error(err_msg)
 
 
 def command_prepare_host(ctx: CephadmContext) -> None:
@@ -9321,6 +9851,93 @@ def command_maintenance(ctx: CephadmContext) -> str:
 ##################################
 
 
+class ArgumentFacade:
+    def __init__(self) -> None:
+        self.defaults: Dict[str, Any] = {}
+
+    def add_argument(self, *args: Any, **kwargs: Any) -> None:
+        if not args:
+            raise ValueError('expected at least one argument')
+        name = args[0]
+        if not name.startswith('--'):
+            raise ValueError(f'expected long option, got: {name!r}')
+        name = name[2:].replace('-', '_')
+        value = kwargs.pop('default', None)
+        self.defaults[name] = value
+
+    def apply(self, ctx: CephadmContext) -> None:
+        for key, value in self.defaults.items():
+            setattr(ctx, key, value)
+
+
+def _add_deploy_parser_args(
+    parser_deploy: Union[argparse.ArgumentParser, ArgumentFacade],
+) -> None:
+    parser_deploy.add_argument(
+        '--config', '-c',
+        help='config file for new daemon')
+    parser_deploy.add_argument(
+        '--config-json',
+        help='Additional configuration information in JSON format')
+    parser_deploy.add_argument(
+        '--keyring',
+        help='keyring for new daemon')
+    parser_deploy.add_argument(
+        '--key',
+        help='key for new daemon')
+    parser_deploy.add_argument(
+        '--osd-fsid',
+        help='OSD uuid, if creating an OSD container')
+    parser_deploy.add_argument(
+        '--skip-firewalld',
+        action='store_true',
+        help='Do not configure firewalld')
+    parser_deploy.add_argument(
+        '--tcp-ports',
+        help='List of tcp ports to open in the host firewall')
+    parser_deploy.add_argument(
+        '--port-ips',
+        help='JSON dict mapping ports to IPs they need to be bound on'
+    )
+    parser_deploy.add_argument(
+        '--reconfig',
+        action='store_true',
+        help='Reconfigure a previously deployed daemon')
+    parser_deploy.add_argument(
+        '--allow-ptrace',
+        action='store_true',
+        help='Allow SYS_PTRACE on daemon container')
+    parser_deploy.add_argument(
+        '--container-init',
+        action='store_true',
+        default=CONTAINER_INIT,
+        help=argparse.SUPPRESS)
+    parser_deploy.add_argument(
+        '--memory-request',
+        help='Container memory request/target'
+    )
+    parser_deploy.add_argument(
+        '--memory-limit',
+        help='Container memory hard limit'
+    )
+    parser_deploy.add_argument(
+        '--meta-json',
+        help='JSON dict of additional metadata'
+    )
+    parser_deploy.add_argument(
+        '--extra-container-args',
+        action='append',
+        default=[],
+        help='Additional container arguments to apply to daemon'
+    )
+    parser_deploy.add_argument(
+        '--extra-entrypoint-args',
+        action='append',
+        default=[],
+        help='Additional entrypoint arguments to apply to deamon'
+    )
+
+
 def _get_parser():
     # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(
@@ -9387,7 +10004,7 @@ def _get_parser():
     subparsers = parser.add_subparsers(help='sub-command')
 
     parser_version = subparsers.add_parser(
-        'version', help='get ceph version from container')
+        'version', help='get cephadm version')
     parser_version.set_defaults(func=command_version)
 
     parser_pull = subparsers.add_parser(
@@ -9558,6 +10175,10 @@ def _get_parser():
         '--no-hosts',
         action='store_true',
         help='dont pass /etc/hosts through to the container')
+    parser_shell.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='print, but do not execute, the container command to start the shell')
 
     parser_enter = subparsers.add_parser(
         'enter', help='run an interactive shell inside a running daemon container')
@@ -9714,6 +10335,10 @@ def _get_parser():
         '--ssh-public-key',
         type=argparse.FileType('r'),
         help='SSH public key')
+    parser_bootstrap.add_argument(
+        '--ssh-signed-cert',
+        type=argparse.FileType('r'),
+        help='Signed cert for setups using CA signed SSH keys')
     parser_bootstrap.add_argument(
         '--ssh-user',
         default='root',
@@ -9750,6 +10375,11 @@ def _get_parser():
         '--allow-overwrite',
         action='store_true',
         help='allow overwrite of existing --output-* config/keyring/ssh files')
+    parser_bootstrap.add_argument(
+        '--cleanup-on-failure',
+        action='store_true',
+        default=False,
+        help='Delete cluster files in case of a failed installation')
     parser_bootstrap.add_argument(
         '--allow-fqdn-hostname',
         action='store_true',
@@ -9823,64 +10453,29 @@ def _get_parser():
         '--fsid',
         required=True,
         help='cluster FSID')
-    parser_deploy.add_argument(
-        '--config', '-c',
-        help='config file for new daemon')
-    parser_deploy.add_argument(
-        '--config-json',
-        help='Additional configuration information in JSON format')
-    parser_deploy.add_argument(
-        '--keyring',
-        help='keyring for new daemon')
-    parser_deploy.add_argument(
-        '--key',
-        help='key for new daemon')
-    parser_deploy.add_argument(
-        '--osd-fsid',
-        help='OSD uuid, if creating an OSD container')
-    parser_deploy.add_argument(
-        '--skip-firewalld',
-        action='store_true',
-        help='Do not configure firewalld')
-    parser_deploy.add_argument(
-        '--tcp-ports',
-        help='List of tcp ports to open in the host firewall')
-    parser_deploy.add_argument(
-        '--reconfig',
-        action='store_true',
-        help='Reconfigure a previously deployed daemon')
-    parser_deploy.add_argument(
-        '--allow-ptrace',
-        action='store_true',
-        help='Allow SYS_PTRACE on daemon container')
-    parser_deploy.add_argument(
-        '--container-init',
-        action='store_true',
-        default=CONTAINER_INIT,
-        help=argparse.SUPPRESS)
-    parser_deploy.add_argument(
-        '--memory-request',
-        help='Container memory request/target'
-    )
-    parser_deploy.add_argument(
-        '--memory-limit',
-        help='Container memory hard limit'
-    )
-    parser_deploy.add_argument(
-        '--meta-json',
-        help='JSON dict of additional metadata'
+    _add_deploy_parser_args(parser_deploy)
+
+    parser_orch = subparsers.add_parser(
+        '_orch',
     )
-    parser_deploy.add_argument(
-        '--extra-container-args',
-        action='append',
-        default=[],
-        help='Additional container arguments to apply to daemon'
+    subparsers_orch = parser_orch.add_subparsers(
+        title='Orchestrator Driven Commands',
+        description='Commands that are typically only run by cephadm mgr module',
     )
-    parser_deploy.add_argument(
-        '--extra-entrypoint-args',
-        action='append',
-        default=[],
-        help='Additional entrypoint arguments to apply to deamon'
+
+    parser_deploy_from = subparsers_orch.add_parser(
+        'deploy', help='deploy a daemon')
+    parser_deploy_from.set_defaults(func=command_deploy_from)
+    # currently cephadm mgr module passes an fsid option on the CLI too
+    # TODO: remove this and always source fsid from the JSON?
+    parser_deploy_from.add_argument(
+        '--fsid',
+        help='cluster FSID')
+    parser_deploy_from.add_argument(
+        'source',
+        default='-',
+        nargs='?',
+        help='Configuration input source file',
     )
 
     parser_check_host = subparsers.add_parser(
@@ -10066,6 +10661,15 @@ def main() -> None:
         sys.stderr.write('No command specified; pass -h or --help for usage\n')
         sys.exit(1)
 
+    if ctx.has_function() and getattr(ctx.func, '_execute_early', False):
+        try:
+            sys.exit(ctx.func(ctx))
+        except Error as e:
+            if ctx.verbose:
+                raise
+            logger.error('ERROR: %s' % e)
+            sys.exit(1)
+
     cephadm_require_root()
     cephadm_init_logging(ctx, av)
     try:
@@ -10082,7 +10686,7 @@ def main() -> None:
             check_container_engine(ctx)
         # command handler
         r = ctx.func(ctx)
-    except Error as e:
+    except (Error, ClusterAlreadyExists) as e:
         if ctx.verbose:
             raise
         logger.error('ERROR: %s' % e)