]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cephadm/cephadm
import ceph quincy 17.2.1
[ceph.git] / ceph / src / cephadm / cephadm
index ca64ae472e37d0acbb16216579763157545b065a..2353e7b1e74b28607f8b4f75e2bb8af48d81a77d 100755 (executable)
@@ -26,7 +26,6 @@ import errno
 import struct
 import ssl
 from enum import Enum
-
 from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable
 
 import re
@@ -49,6 +48,8 @@ DEFAULT_IMAGE = 'quay.io/ceph/ceph:v17'
 DEFAULT_IMAGE_IS_MASTER = False
 DEFAULT_IMAGE_RELEASE = 'quincy'
 DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.33.4'
+DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
+DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
 DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.3.1'
 DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.23.0'
 DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:8.3.5'
@@ -63,8 +64,15 @@ DATA_DIR = '/var/lib/ceph'
 LOG_DIR = '/var/log/ceph'
 LOCK_DIR = '/run/cephadm'
 LOGROTATE_DIR = '/etc/logrotate.d'
-SYSCTL_DIR = '/usr/lib/sysctl.d'
+SYSCTL_DIR = '/etc/sysctl.d'
 UNIT_DIR = '/etc/systemd/system'
+CEPH_CONF_DIR = 'config'
+CEPH_CONF = 'ceph.conf'
+CEPH_PUBKEY = 'ceph.pub'
+CEPH_KEYRING = 'ceph.client.admin.keyring'
+CEPH_DEFAULT_CONF = f'/etc/ceph/{CEPH_CONF}'
+CEPH_DEFAULT_KEYRING = f'/etc/ceph/{CEPH_KEYRING}'
+CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}'
 LOG_DIR_MODE = 0o770
 DATA_DIR_MODE = 0o700
 CONTAINER_INIT = True
@@ -73,8 +81,6 @@ CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0)
 CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
 DEFAULT_TIMEOUT = None  # in seconds
 DEFAULT_RETRY = 15
-SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
 logger: logging.Logger = None  # type: ignore
@@ -104,6 +110,42 @@ cached_stdin = None
 ##################################
 
 
+class EndPoint:
+    """EndPoint representing an ip:port format"""
+
+    def __init__(self, ip: str, port: int) -> None:
+        self.ip = ip
+        self.port = port
+
+    def __str__(self) -> str:
+        return f'{self.ip}:{self.port}'
+
+    def __repr__(self) -> str:
+        return f'{self.ip}:{self.port}'
+
+
+class ContainerInfo:
+    def __init__(self, container_id: str,
+                 image_name: str,
+                 image_id: str,
+                 start: str,
+                 version: str) -> None:
+        self.container_id = container_id
+        self.image_name = image_name
+        self.image_id = image_id
+        self.start = start
+        self.version = version
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, ContainerInfo):
+            return NotImplemented
+        return (self.container_id == other.container_id
+                and self.image_name == other.image_name
+                and self.image_id == other.image_id
+                and self.start == other.start
+                and self.version == other.version)
+
+
 class BaseConfig:
 
     def __init__(self) -> None:
@@ -207,7 +249,9 @@ class Docker(ContainerEngine):
 CONTAINER_PREFERENCE = (Podman, Docker)  # prefer podman to docker
 
 
-# Log and console output config
+# During normal cephadm operations (cephadm ls, gather-facts, etc ) we use:
+# stdout: for JSON output only
+# stderr: for error, debug, info, etc
 logging_config = {
     'version': 1,
     'disable_existing_loggers': True,
@@ -237,6 +281,56 @@ logging_config = {
 }
 
 
+class ExcludeErrorsFilter(logging.Filter):
+    def filter(self, record: logging.LogRecord) -> bool:
+        """Only lets through log messages with log level below WARNING ."""
+        return record.levelno < logging.WARNING
+
+
+# When cephadm is used as standard binary (bootstrap, rm-cluster, etc) we use:
+# stdout: for debug and info
+# stderr: for errors and warnings
+interactive_logging_config = {
+    'version': 1,
+    'filters': {
+        'exclude_errors': {
+            '()': ExcludeErrorsFilter
+        }
+    },
+    'disable_existing_loggers': True,
+    'formatters': {
+        'cephadm': {
+            'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
+        },
+    },
+    'handlers': {
+        'console_stdout': {
+            'level': 'INFO',
+            'class': 'logging.StreamHandler',
+            'filters': ['exclude_errors'],
+            'stream': sys.stdout
+        },
+        'console_stderr': {
+            'level': 'WARNING',
+            'class': 'logging.StreamHandler',
+            'stream': sys.stderr
+        },
+        'log_file': {
+            'level': 'DEBUG',
+            'class': 'logging.handlers.WatchedFileHandler',
+            'formatter': 'cephadm',
+            'filename': '%s/cephadm.log' % LOG_DIR,
+        }
+    },
+    'loggers': {
+        '': {
+            'level': 'DEBUG',
+            'handlers': ['console_stdout', 'console_stderr', 'log_file'],
+        }
+    }
+}
+
+
 class termcolor:
     yellow = '\033[93m'
     red = '\033[31m'
@@ -250,11 +344,15 @@ class Error(Exception):
 class TimeoutExpired(Error):
     pass
 
+
+class UnauthorizedRegistryError(Error):
+    pass
+
 ##################################
 
 
 class Ceph(object):
-    daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
+    daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
                'crash', 'cephfs-mirror')
 
 ##################################
@@ -426,6 +524,8 @@ class Monitoring(object):
         'node-exporter': [9100],
         'grafana': [3000],
         'alertmanager': [9093, 9094],
+        'loki': [3100],
+        'promtail': [9080]
     }
 
     components = {
@@ -441,6 +541,28 @@ class Monitoring(object):
                 'prometheus.yml',
             ],
         },
+        'loki': {
+            'image': DEFAULT_LOKI_IMAGE,
+            'cpus': '1',
+            'memory': '1GB',
+            'args': [
+                '--config.file=/etc/loki/loki.yml',
+            ],
+            'config-json-files': [
+                'loki.yml'
+            ],
+        },
+        'promtail': {
+            'image': DEFAULT_PROMTAIL_IMAGE,
+            'cpus': '1',
+            'memory': '1GB',
+            'args': [
+                '--config.file=/etc/promtail/promtail.yml',
+            ],
+            'config-json-files': [
+                'promtail.yml',
+            ],
+        },
         'node-exporter': {
             'image': DEFAULT_NODE_EXPORTER_IMAGE,
             'cpus': '1',
@@ -481,9 +603,9 @@ class Monitoring(object):
     def get_version(ctx, container_id, daemon_type):
         # type: (CephadmContext, str, str) -> str
         """
-        :param: daemon_type Either "prometheus", "alertmanager" or "node-exporter"
+        :param: daemon_type Either "prometheus", "alertmanager", "loki", "promtail" or "node-exporter"
         """
-        assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter')
+        assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter', 'loki', 'promtail')
         cmd = daemon_type.replace('-', '_')
         code = -1
         err = ''
@@ -580,7 +702,7 @@ class NFSGanesha(object):
     def get_container_envs():
         # type: () -> List[str]
         envs = [
-            'CEPH_CONF=%s' % ('/etc/ceph/ceph.conf')
+            'CEPH_CONF=%s' % (CEPH_DEFAULT_CONF)
         ]
         return envs
 
@@ -1204,16 +1326,17 @@ def port_in_use(ctx, port_num):
     ))
 
 
-def check_ip_port(ctx, ip, port):
-    # type: (CephadmContext, str, int) -> None
+def check_ip_port(ctx, ep):
+    # type: (CephadmContext, EndPoint) -> None
     if not ctx.skip_ping_check:
-        logger.info('Verifying IP %s port %d ...' % (ip, port))
-        if is_ipv6(ip):
+        logger.info(f'Verifying IP {ep.ip} port {ep.port} ...')
+        if is_ipv6(ep.ip):
             s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-            ip = unwrap_ipv6(ip)
+            ip = unwrap_ipv6(ep.ip)
         else:
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        attempt_bind(ctx, s, ip, port)
+            ip = ep.ip
+        attempt_bind(ctx, s, ip, ep.port)
 
 ##################################
 
@@ -1852,7 +1975,7 @@ def infer_fsid(func: FuncT) -> FuncT:
             logger.info('Inferring fsid %s' % fsids[0])
             ctx.fsid = fsids[0]
         else:
-            raise Error('Cannot infer an fsid, one must be specified: %s' % fsids)
+            raise Error('Cannot infer an fsid, one must be specified (using --fsid): %s' % fsids)
         return func(ctx)
 
     return cast(FuncT, _infer_fsid)
@@ -1860,29 +1983,54 @@ def infer_fsid(func: FuncT) -> FuncT:
 
 def infer_config(func: FuncT) -> FuncT:
     """
-    If we find a MON daemon, use the config from that container
+    Infer the clusater configuration using the followign priority order:
+     1- if the user has provided custom conf file (-c option) use it
+     2- otherwise if daemon --name has been provided use daemon conf
+     3- otherwise find the mon daemon conf file and use it (if v1)
+     4- otherwise if {ctx.data_dir}/{fsid}/{CEPH_CONF_DIR} dir exists use it
+     5- finally: fallback to the default file /etc/ceph/ceph.conf
     """
     @wraps(func)
     def _infer_config(ctx: CephadmContext) -> Any:
+
+        def config_path(daemon_type: str, daemon_name: str) -> str:
+            data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_name)
+            return os.path.join(data_dir, 'config')
+
+        def get_mon_daemon_name(fsid: str) -> Optional[str]:
+            daemon_list = list_daemons(ctx, detail=False)
+            for daemon in daemon_list:
+                if (
+                    daemon.get('name', '').startswith('mon.')
+                    and daemon.get('fsid', '') == fsid
+                    and daemon.get('style', '') == 'cephadm:v1'
+                    and os.path.exists(config_path('mon', daemon['name'].split('.', 1)[1]))
+                ):
+                    return daemon['name']
+            return None
+
         ctx.config = ctx.config if 'config' in ctx else None
-        if ctx.config:
-            logger.debug('Using specified config: %s' % ctx.config)
+        #  check if user has provided conf by using -c option
+        if ctx.config and (ctx.config != CEPH_DEFAULT_CONF):
+            logger.debug(f'Using specified config: {ctx.config}')
             return func(ctx)
+
         if 'fsid' in ctx and ctx.fsid:
-            name = ctx.name if 'name' in ctx else None
-            if not name:
-                daemon_list = list_daemons(ctx, detail=False)
-                for daemon in daemon_list:
-                    if daemon.get('name', '').startswith('mon.') and daemon.get('fsid', '') == ctx.fsid:
-                        name = daemon['name']
-                        break
-            if name:
-                ctx.config = f'/var/lib/ceph/{ctx.fsid}/{name}/config'
+            name = ctx.name if ('name' in ctx and ctx.name) else get_mon_daemon_name(ctx.fsid)
+            if name is not None:
+                # daemon name has been specified (or inffered from mon), let's use its conf
+                ctx.config = config_path(name.split('.', 1)[0], name.split('.', 1)[1])
+            else:
+                # no daemon, in case the cluster has a config dir then use it
+                ceph_conf = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_CONF}'
+                if os.path.exists(ceph_conf):
+                    ctx.config = ceph_conf
+
         if ctx.config:
-            logger.info('Inferring config %s' % ctx.config)
-        elif os.path.exists(SHELL_DEFAULT_CONF):
-            logger.debug('Using default config: %s' % SHELL_DEFAULT_CONF)
-            ctx.config = SHELL_DEFAULT_CONF
+            logger.info(f'Inferring config {ctx.config}')
+        elif os.path.exists(CEPH_DEFAULT_CONF):
+            logger.debug(f'Using default config {CEPH_DEFAULT_CONF}')
+            ctx.config = CEPH_DEFAULT_CONF
         return func(ctx)
 
     return cast(FuncT, _infer_config)
@@ -1908,7 +2056,7 @@ def infer_image(func: FuncT) -> FuncT:
         if not ctx.image:
             ctx.image = os.environ.get('CEPHADM_IMAGE')
         if not ctx.image:
-            ctx.image = get_last_local_ceph_image(ctx, ctx.container_engine.path)
+            ctx.image = infer_local_ceph_image(ctx, ctx.container_engine.path)
         if not ctx.image:
             ctx.image = _get_default_image(ctx)
         return func(ctx)
@@ -1940,24 +2088,70 @@ def default_image(func: FuncT) -> FuncT:
     return cast(FuncT, _default_image)
 
 
-def get_last_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
+def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]:
+    """
+    :param ctx: Cephadm context
+    :param daemon_filter: daemon name or type
+    :param by_name: must be set to True if daemon name is provided
+    :return: Container information or None
     """
+    def daemon_name_or_type(daemon: Dict[str, str]) -> str:
+        return daemon['name'] if by_name else daemon['name'].split('.', 1)[0]
+
+    if by_name and '.' not in daemon_filter:
+        logger.warning(f'Trying to get container info using invalid daemon name {daemon_filter}')
+        return None
+    daemons = list_daemons(ctx, detail=False)
+    matching_daemons = [d for d in daemons if daemon_name_or_type(d) == daemon_filter and d['fsid'] == ctx.fsid]
+    if matching_daemons:
+        d_type, d_id = matching_daemons[0]['name'].split('.', 1)
+        out, _, code = get_container_stats(ctx, ctx.container_engine.path, ctx.fsid, d_type, d_id)
+        if not code:
+            (container_id, image_name, image_id, start, version) = out.strip().split(',')
+            return ContainerInfo(container_id, image_name, image_id, start, version)
+    return None
+
+
+def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional[str]:
+    """
+     Infer the local ceph image based on the following priority criteria:
+       1- the image specified by --image arg (if provided).
+       2- the same image as the daemon container specified by --name arg (if provided).
+       3- image used by any ceph container running on the host. In this case we use daemon types.
+       4- if no container is found then we use the most ceph recent image on the host.
+
+     Note: any selected container must have the same fsid inferred previously.
+
     :return: The most recent local ceph image (already pulled)
     """
+    # '|' special character is used to separate the output fields into:
+    #  - Repository@digest
+    #  - Image Id
+    #  - Image Tag
+    #  - Image creation date
     out, _, _ = call_throws(ctx,
                             [container_path, 'images',
                              '--filter', 'label=ceph=True',
                              '--filter', 'dangling=false',
-                             '--format', '{{.Repository}}@{{.Digest}}'])
-    return _filter_last_local_ceph_image(out)
-
+                             '--format', '{{.Repository}}@{{.Digest}}|{{.ID}}|{{.Tag}}|{{.CreatedAt}}'])
+
+    container_info = None
+    daemon_name = ctx.name if ('name' in ctx and ctx.name and '.' in ctx.name) else None
+    daemons_ls = [daemon_name] if daemon_name is not None else Ceph.daemons  # daemon types: 'mon', 'mgr', etc
+    for daemon in daemons_ls:
+        container_info = get_container_info(ctx, daemon, daemon_name is not None)
+        if container_info is not None:
+            logger.debug(f"Using container info for daemon '{daemon}'")
+            break
 
-def _filter_last_local_ceph_image(out):
-    # type: (str) -> Optional[str]
     for image in out.splitlines():
-        if image and not image.endswith('@'):
-            logger.info('Using recent ceph image %s' % image)
-            return image
+        if image and not image.isspace():
+            (digest, image_id, tag, created_date) = image.lstrip().split('|')
+            if container_info is not None and image_id not in container_info.image_id:
+                continue
+            if digest and not digest.endswith('@'):
+                logger.info(f"Using ceph image with id '{image_id}' and tag '{tag}' created on {created_date}\n{digest}")
+                return digest
     return None
 
 
@@ -2098,6 +2292,13 @@ def move_files(ctx, src, dst, uid=None, gid=None):
             os.chown(dst_file, uid, gid)
 
 
+def recursive_chown(path: str, uid: int, gid: int) -> None:
+    for dirpath, dirnames, filenames in os.walk(path):
+        os.chown(dirpath, uid, gid)
+        for filename in filenames:
+            os.chown(os.path.join(dirpath, filename), uid, gid)
+
+
 # copied from distutils
 def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]:
     """Tries to find 'executable' in the directories listed in 'path'.
@@ -2333,7 +2534,7 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
         metadata = Monitoring.components[daemon_type]
         r += metadata.get('args', list())
         # set ip and port to bind to for nodeexporter,alertmanager,prometheus
-        if daemon_type != 'grafana':
+        if daemon_type not in ['grafana', 'loki', 'promtail']:
             ip = ''
             port = Monitoring.port_map[daemon_type][0]
             if 'meta_json' in ctx and ctx.meta_json:
@@ -2343,6 +2544,10 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
                 if 'ports' in meta and meta['ports']:
                     port = meta['ports'][0]
             r += [f'--web.listen-address={ip}:{port}']
+            if daemon_type == 'prometheus':
+                scheme = 'http'
+                host = get_fqdn()
+                r += [f'--web.external-url={scheme}://{host}:{port}']
         if daemon_type == 'alertmanager':
             config = get_parm(ctx.config_json)
             peers = config.get('peers', list())  # type: ignore
@@ -2350,6 +2555,14 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
                 r += ['--cluster.peer={}'.format(peer)]
             # some alertmanager, by default, look elsewhere for a config
             r += ['--config.file=/etc/alertmanager/alertmanager.yml']
+        if daemon_type == 'loki':
+            r += ['--config.file=/etc/loki/loki.yml']
+        if daemon_type == 'promtail':
+            r += ['--config.file=/etc/promtail/promtail.yml']
+        if daemon_type == 'node-exporter':
+            r += ['--path.procfs=/host/proc',
+                  '--path.sysfs=/host/sys',
+                  '--path.rootfs=/rootfs']
     elif daemon_type == NFSGanesha.daemon_type:
         nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
         r += nfs_ganesha.get_daemon_args()
@@ -2403,6 +2616,8 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
             makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
             makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755)
             makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
+            recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid)
+            recursive_chown(os.path.join(data_dir_root, 'data'), uid, gid)
         elif daemon_type == 'grafana':
             data_dir_root = get_data_dir(fsid, ctx.data_dir,
                                          daemon_type, daemon_id)
@@ -2418,6 +2633,18 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
             config_dir = 'etc/alertmanager'
             makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
             makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
+        elif daemon_type == 'promtail':
+            data_dir_root = get_data_dir(fsid, ctx.data_dir,
+                                         daemon_type, daemon_id)
+            config_dir = 'etc/promtail'
+            makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
+            makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
+        elif daemon_type == 'loki':
+            data_dir_root = get_data_dir(fsid, ctx.data_dir,
+                                         daemon_type, daemon_id)
+            config_dir = 'etc/loki'
+            makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
+            makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
 
         # populate the config directory for the component from the config-json
         if 'files' in config_json:
@@ -2603,9 +2830,17 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
 
     if daemon_type in Monitoring.components and daemon_id:
         data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+        log_dir = get_log_dir(fsid, ctx.log_dir)
         if daemon_type == 'prometheus':
             mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z'
             mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z'
+        elif daemon_type == 'loki':
+            mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z'
+            mounts[os.path.join(data_dir, 'data')] = '/loki:Z'
+        elif daemon_type == 'promtail':
+            mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z'
+            mounts[log_dir] = '/var/log/ceph:z'
+            mounts[os.path.join(data_dir, 'data')] = '/promtail:Z'
         elif daemon_type == 'node-exporter':
             mounts['/proc'] = '/host/proc:ro'
             mounts['/sys'] = '/host/sys:ro'
@@ -2744,6 +2979,11 @@ def get_container(ctx: CephadmContext,
             # by ubuntu 18.04 kernel!)
         ]
         container_args.extend(monitoring_args)
+        if daemon_type == 'node-exporter':
+            # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys',
+            # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation
+            # between the node-exporter container and the host to avoid selinux denials
+            container_args.extend(['--security-opt', 'label=disable'])
     elif daemon_type == 'crash':
         ceph_args = ['-n', name]
     elif daemon_type in Ceph.daemons:
@@ -3048,6 +3288,16 @@ def deploy_daemon_units(
                     bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
                     cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
                 )
+                if 'cluster' in ctx and ctx.cluster:
+                    # ctx.cluster is only set during adoption of a daemon from a cluster
+                    # with a custom name (not "ceph"). The initial activate command the first
+                    # time we start the new cephadm based systemd unit for this osd must account
+                    # for this by mounting to the correct data dir in the container. Otherwise
+                    # necessary files from the old data dir of the daemon won't be copied over
+                    # to the new data dir on the host. After the first start (e.g. on any redeploys)
+                    # this is no longer necessary as we will have these files in the data dir on the host
+                    if data_dir in prestart.volume_mounts:
+                        prestart.volume_mounts[data_dir] = f'/var/lib/ceph/osd/{ctx.cluster}-{daemon_id}'
                 _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
         elif daemon_type == CephIscsi.daemon_type:
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
@@ -3104,8 +3354,12 @@ def deploy_daemon_units(
 
     # post-stop command(s)
     with open(data_dir + '/unit.stop.new', 'w') as f:
-        f.write('! ' + ' '.join(c.stop_cmd()) + '\n')
-        f.write('! ' + ' '.join(c.stop_cmd(old_cname=True)) + '\n')
+        # following generated script basically checks if the container exists
+        # before stopping it. Exit code will be success either if it doesn't
+        # exist or if it exists and is stopped successfully.
+        container_exists = f'{ctx.container_engine.path} inspect %s &>/dev/null'
+        f.write(f'! {container_exists % c.old_cname} || {" ".join(c.stop_cmd(old_cname=True))} \n')
+        f.write(f'! {container_exists % c.cname} || {" ".join(c.stop_cmd())} \n')
 
         os.fchmod(f.fileno(), 0o600)
         os.rename(data_dir + '/unit.stop.new',
@@ -3251,9 +3505,10 @@ class Firewalld(object):
 
 def update_firewalld(ctx, daemon_type):
     # type: (CephadmContext, str) -> None
-    firewall = Firewalld(ctx)
-    firewall.enable_service_for(daemon_type)
-    firewall.apply_rules()
+    if not ('skip_firewalld' in ctx and ctx.skip_firewalld):
+        firewall = Firewalld(ctx)
+        firewall.enable_service_for(daemon_type)
+        firewall.apply_rules()
 
 
 def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
@@ -3287,6 +3542,47 @@ def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
         call_throws(ctx, ['sysctl', '--system'])
 
 
+def migrate_sysctl_dir(ctx: CephadmContext, fsid: str) -> None:
+    """
+    Cephadm once used '/usr/lib/sysctl.d' for storing sysctl configuration.
+    This moves it to '/etc/sysctl.d'.
+    """
+    deprecated_location: str = '/usr/lib/sysctl.d'
+    deprecated_confs: List[str] = glob(f'{deprecated_location}/90-ceph-{fsid}-*.conf')
+    if not deprecated_confs:
+        return
+
+    file_count: int = len(deprecated_confs)
+    logger.info(f'Found sysctl {file_count} files in deprecated location {deprecated_location}. Starting Migration.')
+    for conf in deprecated_confs:
+        try:
+            shutil.move(conf, ctx.sysctl_dir)
+            file_count -= 1
+        except shutil.Error as err:
+            if str(err).endswith('already exists'):
+                logger.warning(f'Destination file already exists. Deleting {conf}.')
+                try:
+                    os.unlink(conf)
+                    file_count -= 1
+                except OSError as del_err:
+                    logger.warning(f'Could not remove {conf}: {del_err}.')
+            else:
+                logger.warning(f'Could not move {conf} from {deprecated_location} to {ctx.sysctl_dir}: {err}')
+
+    # Log successful migration
+    if file_count == 0:
+        logger.info(f'Successfully migrated sysctl config to {ctx.sysctl_dir}.')
+        return
+
+    # Log partially successful / unsuccessful migration
+    files_processed: int = len(deprecated_confs)
+    if file_count < files_processed:
+        status: str = f'partially successful (failed {file_count}/{files_processed})'
+    elif file_count == files_processed:
+        status = 'unsuccessful'
+    logger.warning(f'Migration of sysctl configuration {status}. You may want to perform a migration manually.')
+
+
 def install_base_units(ctx, fsid):
     # type: (CephadmContext, str) -> None
     """
@@ -3389,7 +3685,7 @@ LimitNOFILE=1048576
 LimitNPROC=1048576
 EnvironmentFile=-/etc/environment
 ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run
-ExecStop=-/bin/bash -c '{container_path} stop ceph-{fsid}-%i ; bash {data_dir}/{fsid}/%i/unit.stop'
+ExecStop=-/bin/bash -c 'bash {data_dir}/{fsid}/%i/unit.stop'
 ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop
 KillMode=none
 Restart=on-failure
@@ -3401,8 +3697,7 @@ StartLimitBurst=5
 {extra_args}
 [Install]
 WantedBy=ceph-{fsid}.target
-""".format(container_path=ctx.container_engine.path,
-           fsid=fsid,
+""".format(fsid=fsid,
            data_dir=ctx.data_dir,
            extra_args=extra_args,
            # if docker, we depend on docker.service
@@ -4169,11 +4464,16 @@ def command_version(ctx):
 ##################################
 
 
-@infer_image
+@default_image
 def command_pull(ctx):
     # type: (CephadmContext) -> int
 
-    _pull_image(ctx, ctx.image, ctx.insecure)
+    try:
+        _pull_image(ctx, ctx.image, ctx.insecure)
+    except UnauthorizedRegistryError:
+        err_str = 'Failed to pull container image. Check that host(s) are logged into the registry'
+        logger.debug(f'Pulling image for `command_pull` failed: {err_str}')
+        raise Error(err_str)
     return command_inspect_image(ctx)
 
 
@@ -4201,6 +4501,9 @@ def _pull_image(ctx, image, insecure=False):
         if not ret:
             return
 
+        if 'unauthorized' in err:
+            raise UnauthorizedRegistryError()
+
         if not any(pattern in err for pattern in ignorelist):
             raise Error('Failed command: %s' % cmd_str)
 
@@ -4285,6 +4588,7 @@ def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
     subnet_list = subnets.split(',')
     for subnet in subnet_list:
         # ensure the format of the string is as expected address/netmask
+        subnet = subnet.strip()
         if not re.search(r'\/\d+$', subnet):
             rc = 1
             errors.append(f'{subnet} is not in CIDR format (address/netmask)')
@@ -4331,93 +4635,190 @@ def is_ipv6(address):
         return False
 
 
-def prepare_mon_addresses(
-    ctx: CephadmContext
-) -> Tuple[str, bool, Optional[str]]:
+def ip_in_subnets(ip_addr: str, subnets: str) -> bool:
+    """Determine if the ip_addr belongs to any of the subnets list."""
+    subnet_list = [x.strip() for x in subnets.split(',')]
+    for subnet in subnet_list:
+        ip_address = unwrap_ipv6(ip_addr) if is_ipv6(ip_addr) else ip_addr
+        if ipaddress.ip_address(ip_address) in ipaddress.ip_network(subnet):
+            return True
+    return False
+
+
+def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]:
+    """Parse mon-addrv param into a list of mon end points."""
     r = re.compile(r':(\d+)$')
-    base_ip = ''
+    addrv_args = []
+    addr_arg = addrv_arg
+    if addr_arg[0] != '[' or addr_arg[-1] != ']':
+        raise Error(f'--mon-addrv value {addr_arg} must use square backets')
+
+    for addr in addr_arg[1: -1].split(','):
+        hasport = r.findall(addr)
+        if not hasport:
+            raise Error(f'--mon-addrv value {addr_arg} must include port number')
+        port_str = hasport[0]
+        addr = re.sub(r'^v\d+:', '', addr)  # strip off v1: or v2: prefix
+        base_ip = addr[0:-(len(port_str)) - 1]
+        addrv_args.append(EndPoint(base_ip, int(port_str)))
+
+    return addrv_args
+
+
+def parse_mon_ip(mon_ip: str) -> List[EndPoint]:
+    """Parse mon-ip param into a list of mon end points."""
+    r = re.compile(r':(\d+)$')
+    addrv_args = []
+    hasport = r.findall(mon_ip)
+    if hasport:
+        port_str = hasport[0]
+        base_ip = mon_ip[0:-(len(port_str)) - 1]
+        addrv_args.append(EndPoint(base_ip, int(port_str)))
+    else:
+        # No port provided: use fixed ports for ceph monitor
+        addrv_args.append(EndPoint(mon_ip, 3300))
+        addrv_args.append(EndPoint(mon_ip, 6789))
+
+    return addrv_args
+
+
+def build_addrv_params(addrv: List[EndPoint]) -> str:
+    """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]"""
+    if len(addrv) > 2:
+        raise Error('Detected a local mon-addrv list with more than 2 entries.')
+    port_to_ver: Dict[int, str] = {6789: 'v1', 3300: 'v2'}
+    addr_arg_list: List[str] = []
+    for ep in addrv:
+        if ep.port in port_to_ver:
+            ver = port_to_ver[ep.port]
+        else:
+            ver = 'v2'  # default mon protocol version if port is not provided
+            logger.warning(f'Using msgr2 protocol for unrecognized port {ep}')
+        addr_arg_list.append(f'{ver}:{ep.ip}:{ep.port}')
+
+    addr_arg = '[{0}]'.format(','.join(addr_arg_list))
+    return addr_arg
+
+
+def get_public_net_from_cfg(ctx: CephadmContext) -> Optional[str]:
+    """Get mon public network from configuration file."""
+    cp = read_config(ctx.config)
+    if not cp.has_option('global', 'public_network'):
+        return None
+
+    # Ensure all public CIDR networks are valid
+    public_network = cp.get('global', 'public_network')
+    rc, _, err_msg = check_subnet(public_network)
+    if rc:
+        raise Error(f'Invalid public_network {public_network} parameter: {err_msg}')
+
+    # Ensure all public CIDR networks are configured locally
+    configured_subnets = set([x.strip() for x in public_network.split(',')])
+    local_subnets = set([x[0] for x in list_networks(ctx).items()])
+    valid_public_net = False
+    for net in configured_subnets:
+        if net in local_subnets:
+            valid_public_net = True
+        else:
+            logger.warning(f'The public CIDR network {net} (from -c conf file) is not configured locally.')
+    if not valid_public_net:
+        raise Error(f'None of the public CIDR network(s) {configured_subnets} (from -c conf file) is configured locally.')
+
+    # Ensure public_network is compatible with the provided mon-ip (or mon-addrv)
+    if ctx.mon_ip:
+        if not ip_in_subnets(ctx.mon_ip, public_network):
+            raise Error(f'The provided --mon-ip {ctx.mon_ip} does not belong to any public_network(s) {public_network}')
+    elif ctx.mon_addrv:
+        addrv_args = parse_mon_addrv(ctx.mon_addrv)
+        for addrv in addrv_args:
+            if not ip_in_subnets(addrv.ip, public_network):
+                raise Error(f'The provided --mon-addrv {addrv.ip} ip does not belong to any public_network(s) {public_network}')
+
+    logger.debug(f'Using mon public network from configuration file {public_network}')
+    return public_network
+
+
+def infer_mon_network(ctx: CephadmContext, mon_eps: List[EndPoint]) -> Optional[str]:
+    """Infer mon public network from local network."""
+    # Make sure IP is configured locally, and then figure out the CIDR network
+    mon_networks = []
+    for net, ifaces in list_networks(ctx).items():
+        # build local_ips list for the specified network
+        local_ips: List[str] = []
+        for _, ls in ifaces.items():
+            local_ips.extend([ipaddress.ip_address(ip) for ip in ls])
+
+        # check if any of mon ips belong to this net
+        for mon_ep in mon_eps:
+            try:
+                if ipaddress.ip_address(unwrap_ipv6(mon_ep.ip)) in local_ips:
+                    mon_networks.append(net)
+                    logger.info(f'Mon IP `{mon_ep.ip}` is in CIDR network `{net}`')
+            except ValueError as e:
+                logger.warning(f'Cannot infer CIDR network for mon IP `{mon_ep.ip}` : {e}')
+
+    if not mon_networks:
+        raise Error('Cannot infer CIDR network. Pass --skip-mon-network to configure it later')
+    else:
+        logger.debug(f'Inferred mon public CIDR from local network configuration {mon_networks}')
+
+    mon_networks = list(set(mon_networks))  # remove duplicates
+    return ','.join(mon_networks)
+
+
+def prepare_mon_addresses(ctx: CephadmContext) -> Tuple[str, bool, Optional[str]]:
+    """Get mon public network configuration."""
     ipv6 = False
+    addrv_args: List[EndPoint] = []
+    mon_addrv: str = ''  # i.e: [v2:192.168.100.1:3300,v1:192.168.100.1:6789]
 
     if ctx.mon_ip:
         ipv6 = is_ipv6(ctx.mon_ip)
         if ipv6:
             ctx.mon_ip = wrap_ipv6(ctx.mon_ip)
-        hasport = r.findall(ctx.mon_ip)
-        if hasport:
-            port_str = hasport[0]
-            port = int(port_str)
-            if port == 6789:
-                addr_arg = '[v1:%s]' % ctx.mon_ip
-            elif port == 3300:
-                addr_arg = '[v2:%s]' % ctx.mon_ip
-            else:
-                logger.warning('Using msgr2 protocol for unrecognized port %d' %
-                               port)
-                addr_arg = '[v2:%s]' % ctx.mon_ip
-            base_ip = ctx.mon_ip[0:-(len(port_str)) - 1]
-            check_ip_port(ctx, base_ip, port)
-        else:
-            base_ip = ctx.mon_ip
-            addr_arg = '[v2:%s:3300,v1:%s:6789]' % (ctx.mon_ip, ctx.mon_ip)
-            check_ip_port(ctx, ctx.mon_ip, 3300)
-            check_ip_port(ctx, ctx.mon_ip, 6789)
+        addrv_args = parse_mon_ip(ctx.mon_ip)
+        mon_addrv = build_addrv_params(addrv_args)
     elif ctx.mon_addrv:
-        addr_arg = ctx.mon_addrv
-        if addr_arg[0] != '[' or addr_arg[-1] != ']':
-            raise Error('--mon-addrv value %s must use square backets' %
-                        addr_arg)
-        ipv6 = addr_arg.count('[') > 1
-        for addr in addr_arg[1: -1].split(','):
-            hasport = r.findall(addr)
-            if not hasport:
-                raise Error('--mon-addrv value %s must include port number' %
-                            addr_arg)
-            port_str = hasport[0]
-            port = int(port_str)
-            # strip off v1: or v2: prefix
-            addr = re.sub(r'^v\d+:', '', addr)
-            base_ip = addr[0:-(len(port_str)) - 1]
-            check_ip_port(ctx, base_ip, port)
+        ipv6 = ctx.mon_addrv.count('[') > 1
+        addrv_args = parse_mon_addrv(ctx.mon_addrv)
+        mon_addrv = ctx.mon_addrv
     else:
         raise Error('must specify --mon-ip or --mon-addrv')
-    logger.debug('Base mon IP is %s, final addrv is %s' % (base_ip, addr_arg))
 
+    if addrv_args:
+        for end_point in addrv_args:
+            check_ip_port(ctx, end_point)
+
+    logger.debug(f'Base mon IP(s) is {addrv_args}, mon addrv is {mon_addrv}')
     mon_network = None
     if not ctx.skip_mon_network:
-        # make sure IP is configured locally, and then figure out the
-        # CIDR network
-        errmsg = f'Cannot infer CIDR network for mon IP `{base_ip}`'
-        for net, ifaces in list_networks(ctx).items():
-            ips: List[str] = []
-            for iface, ls in ifaces.items():
-                ips.extend(ls)
-            try:
-                if ipaddress.ip_address(unwrap_ipv6(base_ip)) in \
-                        [ipaddress.ip_address(ip) for ip in ips]:
-                    mon_network = net
-                    logger.info(f'Mon IP `{base_ip}` is in CIDR network `{mon_network}`')
-                    break
-            except ValueError as e:
-                logger.warning(f'{errmsg}: {e}')
-        if not mon_network:
-            raise Error(f'{errmsg}: pass --skip-mon-network to configure it later')
+        mon_network = get_public_net_from_cfg(ctx) or infer_mon_network(ctx, addrv_args)
 
-    return (addr_arg, ipv6, mon_network)
+    return (mon_addrv, ipv6, mon_network)
 
 
 def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]:
-    cluster_network = ''
-    ipv6_cluster_network = False
     # the cluster network may not exist on this node, so all we can do is
     # validate that the address given is valid ipv4 or ipv6 subnet
-    if ctx.cluster_network:
-        rc, versions, err_msg = check_subnet(ctx.cluster_network)
+    ipv6_cluster_network = False
+    cp = read_config(ctx.config)
+    cluster_network = ctx.cluster_network
+    if cluster_network is None and cp.has_option('global', 'cluster_network'):
+        cluster_network = cp.get('global', 'cluster_network')
+
+    if cluster_network:
+        cluser_nets = set([x.strip() for x in cluster_network.split(',')])
+        local_subnets = set([x[0] for x in list_networks(ctx).items()])
+        for net in cluser_nets:
+            if net not in local_subnets:
+                logger.warning(f'The cluster CIDR network {net} is not configured locally.')
+
+        rc, versions, err_msg = check_subnet(cluster_network)
         if rc:
             raise Error(f'Invalid --cluster-network parameter: {err_msg}')
-        cluster_network = ctx.cluster_network
         ipv6_cluster_network = True if 6 in versions else False
     else:
-        logger.info('- internal network (--cluster-network) has not '
+        logger.info('Internal network (--cluster-network) has not '
                     'been provided, OSD replication will default to '
                     'the public_network')
 
@@ -4637,44 +5038,16 @@ def prepare_ssh(
         }
         cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
         cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
+        ssh_pub = cli(['cephadm', 'get-pub-key'])
     else:
         logger.info('Generating ssh key...')
         cli(['cephadm', 'generate-key'])
         ssh_pub = cli(['cephadm', 'get-pub-key'])
-
         with open(ctx.output_pub_ssh_key, 'w') as f:
             f.write(ssh_pub)
         logger.info('Wrote public SSH key to %s' % ctx.output_pub_ssh_key)
 
-        logger.info('Adding key to %s@localhost authorized_keys...' % ctx.ssh_user)
-        try:
-            s_pwd = pwd.getpwnam(ctx.ssh_user)
-        except KeyError:
-            raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx.ssh_user))
-        ssh_uid = s_pwd.pw_uid
-        ssh_gid = s_pwd.pw_gid
-        ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
-
-        if not os.path.exists(ssh_dir):
-            makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
-
-        auth_keys_file = '%s/authorized_keys' % ssh_dir
-        add_newline = False
-
-        if os.path.exists(auth_keys_file):
-            with open(auth_keys_file, 'r') as f:
-                f.seek(0, os.SEEK_END)
-                if f.tell() > 0:
-                    f.seek(f.tell() - 1, os.SEEK_SET)  # go to last char
-                    if f.read() != '\n':
-                        add_newline = True
-
-        with open(auth_keys_file, 'a') as f:
-            os.fchown(f.fileno(), ssh_uid, ssh_gid)  # just in case we created it
-            os.fchmod(f.fileno(), 0o600)  # just in case we created it
-            if add_newline:
-                f.write('\n')
-            f.write(ssh_pub.strip() + '\n')
+    authorize_ssh_key(ssh_pub, ctx.ssh_user)
 
     host = get_hostname()
     logger.info('Adding host %s...' % host)
@@ -4682,6 +5055,9 @@ def prepare_ssh(
         args = ['orch', 'host', 'add', host]
         if ctx.mon_ip:
             args.append(unwrap_ipv6(ctx.mon_ip))
+        elif ctx.mon_addrv:
+            addrv_args = parse_mon_addrv(ctx.mon_addrv)
+            args.append(unwrap_ipv6(addrv_args[0].ip))
         cli(args)
     except RuntimeError as e:
         raise Error('Failed to add host <%s>: %s' % (host, e))
@@ -4755,9 +5131,10 @@ def prepare_dashboard(
     port = int(out)
 
     # Open dashboard port
-    fw = Firewalld(ctx)
-    fw.open_ports([port])
-    fw.apply_rules()
+    if not ('skip_firewalld' in ctx and ctx.skip_firewalld):
+        fw = Firewalld(ctx)
+        fw.open_ports([port])
+        fw.apply_rules()
 
     logger.info('Ceph Dashboard is now available at:\n\n'
                 '\t     URL: https://%s:%s/\n'
@@ -4863,6 +5240,9 @@ def finish_bootstrap_config(
             'restart',
             get_unit_name(fsid, 'mon', mon_id)
         ])
+    elif 'image' in ctx and ctx.image:
+        # we still want to assimilate the given container image if provided
+        cli(['config', 'set', 'global', 'container_image', f'{ctx.image}'])
 
     if mon_network:
         logger.info(f'Setting mon public_network to {mon_network}')
@@ -4887,7 +5267,7 @@ def _parse_yaml_docs(f: Iterable[str]) -> List[List[str]]:
     docs = []
     current_doc = []  # type: List[str]
     for line in f:
-        if '---' in line:
+        if re.search(r'^---\s+', line):
             if current_doc:
                 docs.append(current_doc)
             current_doc = []
@@ -4923,7 +5303,7 @@ def parse_yaml_objs(f: Iterable[str]) -> List[Dict[str, str]]:
 
 def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstrap_hostname: str) -> int:
     # copy ssh key to hosts in host spec (used for apply spec)
-    ssh_key = '/etc/ceph/ceph.pub'
+    ssh_key = CEPH_DEFAULT_PUBKEY
     if ctx.ssh_public_key:
         ssh_key = ctx.ssh_public_key.name
 
@@ -4941,17 +5321,45 @@ def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstr
     return 0
 
 
+def save_cluster_config(ctx: CephadmContext, uid: int, gid: int, fsid: str) -> None:
+    """Save cluster configuration to the per fsid directory """
+    def copy_file(src: str, dst: str) -> None:
+        if src:
+            shutil.copyfile(src, dst)
+
+    conf_dir = f'{ctx.data_dir}/{fsid}/{CEPH_CONF_DIR}'
+    makedirs(conf_dir, uid, gid, DATA_DIR_MODE)
+    if os.path.exists(conf_dir):
+        logger.info(f'Saving cluster configuration to {conf_dir} directory')
+        copy_file(ctx.output_config, os.path.join(conf_dir, CEPH_CONF))
+        copy_file(ctx.output_keyring, os.path.join(conf_dir, CEPH_KEYRING))
+        # ctx.output_pub_ssh_key may not exist if user has provided custom ssh keys
+        if (os.path.exists(ctx.output_pub_ssh_key)):
+            copy_file(ctx.output_pub_ssh_key, os.path.join(conf_dir, CEPH_PUBKEY))
+    else:
+        logger.warning(f'Cannot create cluster configuration directory {conf_dir}')
+
+
 @default_image
 def command_bootstrap(ctx):
     # type: (CephadmContext) -> int
 
     if not ctx.output_config:
-        ctx.output_config = os.path.join(ctx.output_dir, 'ceph.conf')
+        ctx.output_config = os.path.join(ctx.output_dir, CEPH_CONF)
     if not ctx.output_keyring:
-        ctx.output_keyring = os.path.join(ctx.output_dir,
-                                          'ceph.client.admin.keyring')
+        ctx.output_keyring = os.path.join(ctx.output_dir, CEPH_KEYRING)
     if not ctx.output_pub_ssh_key:
-        ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, 'ceph.pub')
+        ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, CEPH_PUBKEY)
+
+    if bool(ctx.ssh_private_key) is not bool(ctx.ssh_public_key):
+        raise Error('--ssh-private-key and --ssh-public-key must be provided together or not at all.')
+
+    if ctx.fsid:
+        data_dir_base = os.path.join(ctx.data_dir, ctx.fsid)
+        if os.path.exists(data_dir_base):
+            raise Error(f"A cluster with the same fsid '{ctx.fsid}' already exists.")
+        else:
+            logger.warning('Specifying an fsid for your cluster offers no advantages and may increase the likelihood of fsid conflicts.')
 
     # verify output files
     for f in [ctx.output_config, ctx.output_keyring,
@@ -4972,6 +5380,9 @@ def command_bootstrap(ctx):
 
     (user_conf, _) = get_config_and_keyring(ctx)
 
+    if ctx.ssh_user != 'root':
+        check_ssh_connectivity(ctx)
+
     if not ctx.skip_prepare_host:
         command_prepare_host(ctx)
     else:
@@ -4998,7 +5409,12 @@ def command_bootstrap(ctx):
     config = prepare_bootstrap_config(ctx, fsid, addr_arg, ctx.image)
 
     if not ctx.skip_pull:
-        _pull_image(ctx, ctx.image)
+        try:
+            _pull_image(ctx, ctx.image)
+        except UnauthorizedRegistryError:
+            err_str = 'Failed to pull container image. Check that correct registry credentials are provided in bootstrap by --registry-url, --registry-username, --registry-password, or supply --registry-json with credentials'
+            logger.debug(f'Pulling image for bootstrap on {hostname} failed: {err_str}')
+            raise Error(err_str)
 
     image_ver = CephContainer(ctx, ctx.image, 'ceph', ['--version']).run().strip()
     logger.info(f'Ceph version: {image_ver}')
@@ -5121,7 +5537,7 @@ def command_bootstrap(ctx):
     if not ctx.skip_dashboard:
         prepare_dashboard(ctx, uid, gid, cli, wait_for_mgr_restart)
 
-    if ctx.output_config == '/etc/ceph/ceph.conf' and not ctx.skip_admin_label:
+    if ctx.output_config == CEPH_DEFAULT_CONF and not ctx.skip_admin_label and not ctx.no_minimize_config:
         logger.info('Enabling client.admin keyring and conf on hosts with "admin" label')
         try:
             cli(['orch', 'client-keyring', 'set', 'client.admin', 'label:_admin'])
@@ -5148,6 +5564,8 @@ def command_bootstrap(ctx):
         except Exception:
             logger.info('\nApplying %s to cluster failed!\n' % ctx.apply_spec)
 
+    save_cluster_config(ctx, uid, gid, fsid)
+
     # enable autotune for osd_memory_target
     logger.info('Enabling autotune for osd_memory_target')
     cli(['config', 'set', 'osd', 'osd_memory_target_autotune', 'true'])
@@ -5155,12 +5573,15 @@ def command_bootstrap(ctx):
     # Notify the Dashboard to show the 'Expand cluster' page on first log in.
     cli(['config-key', 'set', 'mgr/dashboard/cluster/status', 'INSTALLED'])
 
-    logger.info('You can access the Ceph CLI with:\n\n'
+    logger.info('You can access the Ceph CLI as following in case of multi-cluster or non-default config:\n\n'
                 '\tsudo %s shell --fsid %s -c %s -k %s\n' % (
                     sys.argv[0],
                     fsid,
                     ctx.output_config,
                     ctx.output_keyring))
+
+    logger.info('Or, if you are only running a single cluster on this host:\n\n\tsudo %s shell \n' % (sys.argv[0]))
+
     logger.info('Please consider enabling telemetry to help improve Ceph:\n\n'
                 '\tceph telemetry on\n\n'
                 'For more information see:\n\n'
@@ -5224,6 +5645,10 @@ def extract_uid_gid_monitoring(ctx, daemon_type):
         uid, gid = 65534, 65534
     elif daemon_type == 'grafana':
         uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana')
+    elif daemon_type == 'loki':
+        uid, gid = extract_uid_gid(ctx, file_path='/etc/loki')
+    elif daemon_type == 'promtail':
+        uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail')
     elif daemon_type == 'alertmanager':
         uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus'])
     else:
@@ -5268,6 +5693,9 @@ def command_deploy(ctx):
     else:
         logger.info('%s daemon %s ...' % ('Deploy', ctx.name))
 
+    # Migrate sysctl conf files from /usr/lib to /etc
+    migrate_sysctl_dir(ctx, ctx.fsid)
+
     # Get and check ports explicitly required to be opened
     daemon_ports = []  # type: List[int]
 
@@ -5421,11 +5849,16 @@ def command_shell(ctx):
     if daemon_id and not ctx.fsid:
         raise Error('must pass --fsid to specify cluster')
 
-    # use /etc/ceph files by default, if present.  we do this instead of
+    # in case a dedicated keyring for the specified fsid is found we us it.
+    # Otherwise, use /etc/ceph files by default, if present.  We do this instead of
     # making these defaults in the arg parser because we don't want an error
     # if they don't exist.
-    if not ctx.keyring and os.path.exists(SHELL_DEFAULT_KEYRING):
-        ctx.keyring = SHELL_DEFAULT_KEYRING
+    if not ctx.keyring:
+        keyring_file = f'{ctx.data_dir}/{ctx.fsid}/{CEPH_CONF_DIR}/{CEPH_KEYRING}'
+        if os.path.exists(keyring_file):
+            ctx.keyring = keyring_file
+        elif os.path.exists(CEPH_DEFAULT_KEYRING):
+            ctx.keyring = CEPH_DEFAULT_KEYRING
 
     container_args: List[str] = ['-i']
     mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id,
@@ -5568,19 +6001,19 @@ def command_ceph_volume(ctx):
 
 @infer_fsid
 def command_unit(ctx):
-    # type: (CephadmContext) -> None
+    # type: (CephadmContext) -> int
     if not ctx.fsid:
         raise Error('must pass --fsid to specify cluster')
 
     unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name)
 
-    call_throws(ctx, [
-        'systemctl',
-        ctx.command,
-        unit_name],
+    _, _, code = call(
+        ctx,
+        ['systemctl', ctx.command, unit_name],
         verbosity=CallVerbosity.VERBOSE,
         desc=''
     )
+    return code
 
 ##################################
 
@@ -5630,12 +6063,14 @@ def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
 
 def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
     r = {}  # type: Dict[str, Dict[str, Set[str]]]
-    p = re.compile(r'^(\S+) dev (\S+) (.*)scope link (.*)src (\S+)')
+    p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)')
     for line in out.splitlines():
         m = p.findall(line)
         if not m:
             continue
         net = m[0][0]
+        if '/' not in net:  # aggregate /32 mask for single host sub-networks
+            net += '/32'
         iface = m[0][1]
         ip = m[0][4]
         if net not in r:
@@ -5665,9 +6100,11 @@ def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
         if not m or m[0][0].lower() == 'default':
             continue
         net = m[0][0]
-        if '/' not in net:  # only consider networks with a mask
-            continue
+        if '/' not in net:  # aggregate /128 mask for single host sub-networks
+            net += '/128'
         iface = m[0][1]
+        if iface == 'lo':  # skip loopback devices
+            continue
         if net not in r:
             r[net] = {}
         if iface not in r[net]:
@@ -5749,8 +6186,9 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
     # keep track of image digests
     seen_digests = {}   # type: Dict[str, List[str]]
 
-    # keep track of memory usage we've seen
+    # keep track of memory and cpu usage we've seen
     seen_memusage = {}  # type: Dict[str, int]
+    seen_cpuperc = {}  # type: Dict[str, str]
     out, err, code = call(
         ctx,
         [container_path, 'stats', '--format', '{{.ID}},{{.MemUsage}}', '--no-stream'],
@@ -5758,6 +6196,13 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
     )
     seen_memusage_cid_len, seen_memusage = _parse_mem_usage(code, out)
 
+    out, err, code = call(
+        ctx,
+        [container_path, 'stats', '--format', '{{.ID}},{{.CPUPerc}}', '--no-stream'],
+        verbosity=CallVerbosity.DEBUG
+    )
+    seen_cpuperc_cid_len, seen_cpuperc = _parse_cpu_perc(code, out)
+
     # /var/lib/ceph
     if os.path.exists(data_dir):
         for i in os.listdir(data_dir):
@@ -5869,7 +6314,9 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                                         seen_versions[image_id] = version
                                 elif daemon_type in ['prometheus',
                                                      'alertmanager',
-                                                     'node-exporter']:
+                                                     'node-exporter',
+                                                     'loki',
+                                                     'promtail']:
                                     version = Monitoring.get_version(ctx, container_id, daemon_type)
                                     seen_versions[image_id] = version
                                 elif daemon_type == 'haproxy':
@@ -5925,6 +6372,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                         val['container_image_digests'] = image_digests
                         if container_id:
                             val['memory_usage'] = seen_memusage.get(container_id[0:seen_memusage_cid_len])
+                            val['cpu_percentage'] = seen_cpuperc.get(container_id[0:seen_cpuperc_cid_len])
                         val['version'] = version
                         val['started'] = start_stamp
                         val['created'] = get_file_timestamp(
@@ -5934,7 +6382,6 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
                             os.path.join(data_dir, fsid, j, 'unit.image'))
                         val['configured'] = get_file_timestamp(
                             os.path.join(data_dir, fsid, j, 'unit.configured'))
-
                     ls.append(val)
 
     return ls
@@ -5958,6 +6405,22 @@ def _parse_mem_usage(code: int, out: str) -> Tuple[int, Dict[str, int]]:
     return seen_memusage_cid_len, seen_memusage
 
 
+def _parse_cpu_perc(code: int, out: str) -> Tuple[int, Dict[str, str]]:
+    seen_cpuperc = {}
+    seen_cpuperc_cid_len = 0
+    if not code:
+        for line in out.splitlines():
+            (cid, cpuperc) = line.split(',')
+            try:
+                seen_cpuperc[cid] = cpuperc
+                if not seen_cpuperc_cid_len:
+                    seen_cpuperc_cid_len = len(cid)
+            except ValueError:
+                logger.info('unable to parse cpu percentage line\n>{}'.format(line))
+                pass
+    return seen_cpuperc_cid_len, seen_cpuperc
+
+
 def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
     # type: (CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str]
 
@@ -5992,7 +6455,12 @@ def command_adopt(ctx):
     # type: (CephadmContext) -> None
 
     if not ctx.skip_pull:
-        _pull_image(ctx, ctx.image)
+        try:
+            _pull_image(ctx, ctx.image)
+        except UnauthorizedRegistryError:
+            err_str = 'Failed to pull container image. Host may not be logged into container registry. Try `cephadm registry-login --registry-url <url> --registry-username <username> --registry-password <password>` or supply login info via a json file with `cephadm registry-login --registry-json <file>`'
+            logger.debug(f'Pulling image for `command_adopt` failed: {err_str}')
+            raise Error(err_str)
 
     (daemon_type, daemon_id) = ctx.name.split('.', 1)
 
@@ -6407,6 +6875,18 @@ def command_rm_daemon(ctx):
     else:
         call_throws(ctx, ['rm', '-rf', data_dir])
 
+    if 'tcp_ports' in ctx and ctx.tcp_ports is not None:
+        ports: List[int] = [int(p) for p in ctx.tcp_ports.split()]
+        try:
+            fw = Firewalld(ctx)
+            fw.close_ports(ports)
+            fw.apply_rules()
+        except RuntimeError as e:
+            # in case we cannot close the ports we will remove
+            # the daemon but keep them open.
+            logger.warning(f' Error when trying to close ports: {e}')
+
+
 ##################################
 
 
@@ -6462,6 +6942,10 @@ def command_zap_osds(ctx: CephadmContext) -> None:
 ##################################
 
 
+def get_ceph_cluster_count(ctx: CephadmContext) -> int:
+    return len([c for c in os.listdir(ctx.data_dir) if is_fsid(c)])
+
+
 def command_rm_cluster(ctx):
     # type: (CephadmContext) -> None
     if not ctx.force:
@@ -6471,13 +6955,7 @@ def command_rm_cluster(ctx):
     lock = FileLock(ctx, ctx.fsid)
     lock.acquire()
 
-    # stop + disable individual daemon units
-    for d in list_daemons(ctx, detail=False):
-        if d['fsid'] != ctx.fsid:
-            continue
-        if d['style'] != 'cephadm:v1':
-            continue
-        unit_name = get_unit_name(ctx.fsid, d['name'])
+    def disable_systemd_service(unit_name: str) -> None:
         call(ctx, ['systemctl', 'stop', unit_name],
              verbosity=CallVerbosity.DEBUG)
         call(ctx, ['systemctl', 'reset-failed', unit_name],
@@ -6485,14 +6963,17 @@ def command_rm_cluster(ctx):
         call(ctx, ['systemctl', 'disable', unit_name],
              verbosity=CallVerbosity.DEBUG)
 
+    # stop + disable individual daemon units
+    for d in list_daemons(ctx, detail=False):
+        if d['fsid'] != ctx.fsid:
+            continue
+        if d['style'] != 'cephadm:v1':
+            continue
+        disable_systemd_service(get_unit_name(ctx.fsid, d['name']))
+
     # cluster units
     for unit_name in ['ceph-%s.target' % ctx.fsid]:
-        call(ctx, ['systemctl', 'stop', unit_name],
-             verbosity=CallVerbosity.DEBUG)
-        call(ctx, ['systemctl', 'reset-failed', unit_name],
-             verbosity=CallVerbosity.DEBUG)
-        call(ctx, ['systemctl', 'disable', unit_name],
-             verbosity=CallVerbosity.DEBUG)
+        disable_systemd_service(unit_name)
 
     slice_name = 'system-ceph\\x2d{}.slice'.format(ctx.fsid.replace('-', '\\x2d'))
     call(ctx, ['systemctl', 'stop', slice_name],
@@ -6521,29 +7002,47 @@ def command_rm_cluster(ctx):
     # rm logrotate config
     call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/ceph-%s' % ctx.fsid])
 
-    # rm cephadm logrotate config if last cluster on host
-    if not os.listdir(ctx.data_dir):
+    # if last cluster on host remove shared files
+    if get_ceph_cluster_count(ctx) == 0:
+        disable_systemd_service('ceph.target')
+
+        # rm shared ceph target files
+        call_throws(ctx, ['rm', '-f', ctx.unit_dir + '/multi-user.target.wants/ceph.target'])
+        call_throws(ctx, ['rm', '-f', ctx.unit_dir + '/ceph.target'])
+
+        # rm cephadm logrotate config
         call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/cephadm'])
 
+        if not ctx.keep_logs:
+            # remove all cephadm logs
+            for fname in glob(f'{ctx.log_dir}/cephadm.log*'):
+                os.remove(fname)
+
     # rm sysctl settings
-    sysctl_dir = Path(ctx.sysctl_dir)
-    for p in sysctl_dir.glob(f'90-ceph-{ctx.fsid}-*.conf'):
-        p.unlink()
+    sysctl_dirs: List[Path] = [Path(ctx.sysctl_dir), Path('/usr/lib/sysctl.d')]
 
-    # clean up config, keyring, and pub key files
-    files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring']
+    for sysctl_dir in sysctl_dirs:
+        for p in sysctl_dir.glob(f'90-ceph-{ctx.fsid}-*.conf'):
+            p.unlink()
 
+    # cleanup remaining ceph directories
+    ceph_dirs = [f'/run/ceph/{ctx.fsid}', f'/tmp/var/lib/ceph/{ctx.fsid}', f'/var/run/ceph/{ctx.fsid}']
+    for dd in ceph_dirs:
+        shutil.rmtree(dd, ignore_errors=True)
+
+    # clean up config, keyring, and pub key files
+    files = [CEPH_DEFAULT_CONF, CEPH_DEFAULT_PUBKEY, CEPH_DEFAULT_KEYRING]
     if os.path.exists(files[0]):
         valid_fsid = False
         with open(files[0]) as f:
             if ctx.fsid in f.read():
                 valid_fsid = True
         if valid_fsid:
+            # rm configuration files on /etc/ceph
             for n in range(0, len(files)):
                 if os.path.exists(files[n]):
                     os.remove(files[n])
 
-
 ##################################
 
 
@@ -6601,6 +7100,142 @@ def command_check_host(ctx: CephadmContext) -> None:
 ##################################
 
 
+def get_ssh_vars(ssh_user: str) -> Tuple[int, int, str]:
+    try:
+        s_pwd = pwd.getpwnam(ssh_user)
+    except KeyError:
+        raise Error('Cannot find uid/gid for ssh-user: %s' % (ssh_user))
+
+    ssh_uid = s_pwd.pw_uid
+    ssh_gid = s_pwd.pw_gid
+    ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
+    return ssh_uid, ssh_gid, ssh_dir
+
+
+def authorize_ssh_key(ssh_pub_key: str, ssh_user: str) -> bool:
+    """Authorize the public key for the provided ssh user"""
+
+    def key_in_file(path: str, key: str) -> bool:
+        if not os.path.exists(path):
+            return False
+        with open(path) as f:
+            lines = f.readlines()
+            for line in lines:
+                if line.strip() == key.strip():
+                    return True
+        return False
+
+    logger.info(f'Adding key to {ssh_user}@localhost authorized_keys...')
+    if ssh_pub_key is None or ssh_pub_key.isspace():
+        raise Error('Trying to authorize an empty ssh key')
+
+    ssh_pub_key = ssh_pub_key.strip()
+    ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
+    if not os.path.exists(ssh_dir):
+        makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
+
+    auth_keys_file = '%s/authorized_keys' % ssh_dir
+    if key_in_file(auth_keys_file, ssh_pub_key):
+        logger.info(f'key already in {ssh_user}@localhost authorized_keys...')
+        return False
+
+    add_newline = False
+    if os.path.exists(auth_keys_file):
+        with open(auth_keys_file, 'r') as f:
+            f.seek(0, os.SEEK_END)
+            if f.tell() > 0:
+                f.seek(f.tell() - 1, os.SEEK_SET)  # go to last char
+                if f.read() != '\n':
+                    add_newline = True
+
+    with open(auth_keys_file, 'a') as f:
+        os.fchown(f.fileno(), ssh_uid, ssh_gid)  # just in case we created it
+        os.fchmod(f.fileno(), 0o600)  # just in case we created it
+        if add_newline:
+            f.write('\n')
+        f.write(ssh_pub_key + '\n')
+
+    return True
+
+
+def revoke_ssh_key(key: str, ssh_user: str) -> None:
+    """Revoke the public key authorization for the ssh user"""
+    ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
+    auth_keys_file = '%s/authorized_keys' % ssh_dir
+    deleted = False
+    if os.path.exists(auth_keys_file):
+        with open(auth_keys_file, 'r') as f:
+            lines = f.readlines()
+        _, filename = tempfile.mkstemp()
+        with open(filename, 'w') as f:
+            os.fchown(f.fileno(), ssh_uid, ssh_gid)
+            os.fchmod(f.fileno(), 0o600)  # secure access to the keys file
+            for line in lines:
+                if line.strip() == key.strip():
+                    deleted = True
+                else:
+                    f.write(line)
+
+    if deleted:
+        shutil.move(filename, auth_keys_file)
+    else:
+        logger.warning('Cannot find the ssh key to be deleted')
+
+
+def check_ssh_connectivity(ctx: CephadmContext) -> None:
+
+    def cmd_is_available(cmd: str) -> bool:
+        if shutil.which(cmd) is None:
+            logger.warning(f'Command not found: {cmd}')
+            return False
+        return True
+
+    if not cmd_is_available('ssh') or not cmd_is_available('ssh-keygen'):
+        logger.warning('Cannot check ssh connectivity. Skipping...')
+        return
+
+    logger.info('Verifying ssh connectivity ...')
+    if ctx.ssh_private_key and ctx.ssh_public_key:
+        # let's use the keys provided by the user
+        ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
+        ssh_pub_key_path = pathify(ctx.ssh_public_key.name)
+    else:
+        # no custom keys, let's generate some random keys just for this check
+        ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}'
+        ssh_pub_key_path = f'{ssh_priv_key_path}.pub'
+        ssh_key_gen_cmd = ['ssh-keygen', '-q', '-t', 'rsa', '-N', '', '-C', '', '-f', ssh_priv_key_path]
+        _, _, code = call(ctx, ssh_key_gen_cmd)
+        if code != 0:
+            logger.warning('Cannot generate keys to check ssh connectivity.')
+            return
+
+    with open(ssh_pub_key_path, 'r') as f:
+        key = f.read().strip()
+    new_key = authorize_ssh_key(key, ctx.ssh_user)
+    ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
+    _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
+                            *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
+                            '-o PasswordAuthentication=no',
+                            f'{ctx.ssh_user}@{get_hostname()}',
+                            'sudo echo'])
+
+    # we only remove the key if it's a new one. In case the user has provided
+    # some already existing key then we don't alter authorized_keys file
+    if new_key:
+        revoke_ssh_key(key, ctx.ssh_user)
+
+    pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
+    prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
+    ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
+    err_msg = f"""
+** Please verify your user's ssh configuration and make sure:
+- User {ctx.ssh_user} must have passwordless sudo access
+{pub_key_msg}{prv_key_msg}{ssh_cfg_msg}
+"""
+    if code != 0:
+        raise Error(err_msg)
+
+
 def command_prepare_host(ctx: CephadmContext) -> None:
     logger.info('Verifying podman|docker is present...')
     pkg = None
@@ -7518,6 +8153,16 @@ class HostFacts():
                 continue
             for iface in os.listdir(nic_path):
 
+                if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
+                    nic_type = 'bridge'
+                elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
+                    nic_type = 'bonding'
+                else:
+                    nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
+
+                if nic_type == 'loopback':  # skip loopback devices
+                    continue
+
                 lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
                 upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
 
@@ -7536,13 +8181,6 @@ class HostFacts():
                     # Either way, we show a -1 when speed isn't available
                     speed = -1
 
-                if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
-                    nic_type = 'bridge'
-                elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
-                    nic_type = 'bonding'
-                else:
-                    nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
-
                 dev_link = os.path.join(nic_path, iface, 'device')
                 if os.path.exists(dev_link):
                     iftype = 'physical'
@@ -7955,7 +8593,7 @@ def _get_parser():
     parser_version.set_defaults(func=command_version)
 
     parser_pull = subparsers.add_parser(
-        'pull', help='pull latest image version')
+        'pull', help='pull the default container image')
     parser_pull.set_defaults(func=command_pull)
     parser_pull.add_argument(
         '--insecure',
@@ -8012,7 +8650,7 @@ def _get_parser():
     parser_adopt.add_argument(
         '--skip-pull',
         action='store_true',
-        help='do not pull the latest image before adopting')
+        help='do not pull the default image before adopting')
     parser_adopt.add_argument(
         '--force-start',
         action='store_true',
@@ -8031,6 +8669,9 @@ def _get_parser():
         required=True,
         action=CustomValidation,
         help='daemon name (type.id)')
+    parser_rm_daemon.add_argument(
+        '--tcp-ports',
+        help='List of tcp ports to close in the host firewall')
     parser_rm_daemon.add_argument(
         '--fsid',
         required=True,
@@ -8207,10 +8848,11 @@ def _get_parser():
         '--mon-id',
         required=False,
         help='mon id (default: local hostname)')
-    parser_bootstrap.add_argument(
+    group = parser_bootstrap.add_mutually_exclusive_group()
+    group.add_argument(
         '--mon-addrv',
         help='mon IPs (e.g., [v2:localipaddr:3300,v1:localipaddr:6789])')
-    parser_bootstrap.add_argument(
+    group.add_argument(
         '--mon-ip',
         help='mon IP')
     parser_bootstrap.add_argument(
@@ -8301,7 +8943,7 @@ def _get_parser():
     parser_bootstrap.add_argument(
         '--skip-pull',
         action='store_true',
-        help='do not pull the latest image before bootstrapping')
+        help='do not pull the default image before bootstrapping')
     parser_bootstrap.add_argument(
         '--skip-firewalld',
         action='store_true',
@@ -8566,7 +9208,12 @@ def cephadm_init_logging(ctx: CephadmContext, args: List[str]) -> None:
     global logger
     if not os.path.exists(LOG_DIR):
         os.makedirs(LOG_DIR)
-    dictConfig(logging_config)
+    operations = ['bootstrap', 'rm-cluster']
+    if any(op in args for op in operations):
+        dictConfig(interactive_logging_config)
+    else:
+        dictConfig(logging_config)
+
     logger = logging.getLogger()
 
     if not os.path.exists(ctx.logrotate_dir + '/cephadm'):