]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cephadm/cephadm
import 15.2.5
[ceph.git] / ceph / src / cephadm / cephadm
index e27bc73a6365c81c831f1ac43ba074c5bb160318..d6f85f1ad8811a0ef2120cde2f6f8e4be8c1b79c 100755 (executable)
@@ -2,20 +2,20 @@
 
 DEFAULT_IMAGE='docker.io/ceph/ceph:v15'
 DEFAULT_IMAGE_IS_MASTER=False
-LATEST_STABLE_RELEASE='octopus'
-DATA_DIR='/var/lib/ceph'
-LOG_DIR='/var/log/ceph'
-LOCK_DIR='/run/cephadm'
-LOGROTATE_DIR='/etc/logrotate.d'
-UNIT_DIR='/etc/systemd/system'
-LOG_DIR_MODE=0o770
-DATA_DIR_MODE=0o700
+LATEST_STABLE_RELEASE = 'octopus'
+DATA_DIR = '/var/lib/ceph'
+LOG_DIR = '/var/log/ceph'
+LOCK_DIR = '/run/cephadm'
+LOGROTATE_DIR = '/etc/logrotate.d'
+UNIT_DIR = '/etc/systemd/system'
+LOG_DIR_MODE = 0o770
+DATA_DIR_MODE = 0o700
 CONTAINER_PREFERENCE = ['podman', 'docker']  # prefer podman to docker
-CUSTOM_PS1=r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT=None # in seconds
-DEFAULT_RETRY=10
-SHELL_DEFAULT_CONF='/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING='/etc/ceph/ceph.client.admin.keyring'
+CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
+DEFAULT_TIMEOUT = None  # in seconds
+DEFAULT_RETRY = 10
+SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
+SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
 
 """
 You can invoke cephadm in two ways:
@@ -41,10 +41,12 @@ You can invoke cephadm in two ways:
 import argparse
 import datetime
 import fcntl
+import ipaddress
 import json
 import logging
 import os
 import platform
+import pwd
 import random
 import re
 import select
@@ -57,7 +59,7 @@ import tempfile
 import time
 import errno
 try:
-    from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable
+    from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
 except ImportError:
     pass
 import uuid
@@ -82,6 +84,9 @@ if sys.version_info >= (3, 0):
 else:
     from urllib2 import urlopen, HTTPError
 
+if sys.version_info > (3, 0):
+    unicode = str
+
 container_path = ''
 cached_stdin = None
 
@@ -93,20 +98,24 @@ class termcolor:
     red = '\033[31m'
     end = '\033[0m'
 
+
 class Error(Exception):
     pass
 
+
 class TimeoutExpired(Error):
     pass
 
 ##################################
 
+
 class Ceph(object):
     daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
                'crash')
 
 ##################################
 
+
 class Monitoring(object):
     """Define the configs for the monitoring containers"""
 
@@ -167,6 +176,7 @@ class Monitoring(object):
 
 ##################################
 
+
 class NFSGanesha(object):
     """Defines a NFS-Ganesha container"""
 
@@ -210,14 +220,6 @@ class NFSGanesha(object):
         # type: (str, Union[int, str]) -> NFSGanesha
         return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
 
-    @staticmethod
-    def port_in_use():
-        # type () -> None
-        for (srv, port) in NFSGanesha.port_map.items():
-            if port_in_use(port):
-                msg = 'TCP port {} required for {} is already in use'.format(port, srv)
-                raise Error(msg)
-
     @staticmethod
     def get_container_mounts(data_dir):
         # type: (str) -> Dict[str, str]
@@ -325,19 +327,20 @@ class NFSGanesha(object):
         volume_mounts = self.get_container_mounts(data_dir)
         envs = self.get_container_envs()
 
-        logger.info('Creating RADOS grace for action: %s' % (action))
+        logger.info('Creating RADOS grace for action: %s' % action)
         c = CephContainer(
             image=self.image,
             entrypoint=entrypoint,
             args=args,
             volume_mounts=volume_mounts,
-            cname=self.get_container_name(desc='grace-%s' % (action)),
+            cname=self.get_container_name(desc='grace-%s' % action),
             envs=envs
         )
         return c
 
 ##################################
 
+
 class CephIscsi(object):
     """Defines a Ceph-Iscsi container"""
 
@@ -384,6 +387,17 @@ class CephIscsi(object):
         mounts['/dev/log'] = '/dev/log:z'
         return mounts
 
+    @staticmethod
+    def get_container_binds():
+        # type: () -> List[List[str]]
+        binds = []
+        lib_modules = ['type=bind',
+                       'source=/lib/modules',
+                       'destination=/lib/modules',
+                       'ro=true']
+        binds.append(lib_modules)
+        return binds
+
     @staticmethod
     def get_version(container_id):
         # type: (str) -> Optional[str]
@@ -392,7 +406,7 @@ class CephIscsi(object):
             [container_path, 'exec', container_id,
              '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"])
         if code == 0:
-            version = out
+            version = out.strip()
         return version
 
     def validate(self):
@@ -461,8 +475,18 @@ class CephIscsi(object):
                   "umount {0}; fi".format(mount_path)
         return cmd.split()
 
+    def get_tcmu_runner_container(self):
+        # type: () -> CephContainer
+        tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id)
+        tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
+        tcmu_container.volume_mounts.pop("/dev/log")
+        tcmu_container.volume_mounts["/dev"] = "/dev:z"
+        tcmu_container.cname = self.get_container_name(desc='tcmu')
+        return tcmu_container
+
 ##################################
 
+
 def get_supported_daemons():
     # type: () -> List[str]
     supported_daemons = list(Ceph.daemons)
@@ -474,6 +498,7 @@ def get_supported_daemons():
 
 ##################################
 
+
 def attempt_bind(s, address, port):
     # type: (socket.socket, str, int) -> None
     try:
@@ -489,6 +514,7 @@ def attempt_bind(s, address, port):
     finally:
         s.close()
 
+
 def port_in_use(port_num):
     # type: (int) -> bool
     """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
@@ -504,6 +530,7 @@ def port_in_use(port_num):
     else:
         return False
 
+
 def check_ip_port(ip, port):
     # type: (str, int) -> None
     if not args.skip_ping_check:
@@ -530,6 +557,7 @@ try:
 except NameError:
     TimeoutError = OSError
 
+
 class Timeout(TimeoutError):
     """
     Raised when the lock could not be acquired in *timeout*
@@ -563,7 +591,7 @@ class _Acquire_ReturnProxy(object):
 
 
 class FileLock(object):
-    def __init__(self, name, timeout = -1):
+    def __init__(self, name, timeout=-1):
         if not os.path.exists(LOCK_DIR):
             os.mkdir(LOCK_DIR, 0o700)
         self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
@@ -643,14 +671,14 @@ class FileLock(object):
                         lock_id, lock_filename, poll_intervall
                     )
                     time.sleep(poll_intervall)
-        except:
+        except:  # noqa
             # Something did go wrong, so decrement the counter.
             self._lock_counter = max(0, self._lock_counter - 1)
 
             raise
         return _Acquire_ReturnProxy(lock = self)
 
-    def release(self, force = False):
+    def release(self, force=False):
         """
         Releases the file lock.
         Please note, that the lock is only completly released, if the lock
@@ -683,10 +711,9 @@ class FileLock(object):
         return None
 
     def __del__(self):
-        self.release(force = True)
+        self.release(force=True)
         return None
 
-
     def _acquire(self):
         open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
         fd = os.open(self._lock_file, open_mode)
@@ -706,8 +733,8 @@ class FileLock(object):
         #   https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
         fd = self._lock_file_fd
         self._lock_file_fd = None
-        fcntl.flock(fd, fcntl.LOCK_UN)
-        os.close(fd)
+        fcntl.flock(fd, fcntl.LOCK_UN)  # type: ignore
+        os.close(fd)  # type: ignore
         return None
 
 
@@ -765,9 +792,10 @@ def call(command,  # type: List[str]
         end_time = start_time + timeout
     while not stop:
         if end_time and (time.time() >= end_time):
-            logger.info(desc + ':timeout after %s seconds' % timeout)
             stop = True
-            process.kill()
+            if process.poll() is None:
+                logger.info(desc + ':timeout after %s seconds' % timeout)
+                process.kill()
         if reads and process.poll() is not None:
             # we want to stop, but first read off anything remaining
             # on stdout/stderr
@@ -814,6 +842,8 @@ def call(command,  # type: List[str]
                     assert False
             except (IOError, OSError):
                 pass
+        logger.debug(desc + ':profile rt=%s, stop=%s, exit=%s, reads=%s'
+                % (time.time()-start_time, stop, process.poll(), reads))
 
     returncode = process.wait()
 
@@ -888,6 +918,7 @@ def call_timeout(command, timeout):
 
 ##################################
 
+
 def is_available(what, func):
     # type: (str, Callable[[], bool]) -> None
     """
@@ -897,12 +928,12 @@ def is_available(what, func):
     :param func: the callable object that determines availability
     """
     retry = args.retry
-    logger.info('Waiting for %s...' % (what))
+    logger.info('Waiting for %s...' % what)
     num = 1
     while True:
         if func():
             logger.info('%s is available'
-                    % (what))
+                        % what)
             break
         elif num > retry:
             raise Error('%s not available after %s tries'
@@ -937,11 +968,13 @@ def read_config(fn):
 
     return cp
 
+
 def pathify(p):
     # type: (str) -> str
     p = os.path.expanduser(p)
     return os.path.abspath(p)
 
+
 def get_file_timestamp(fn):
     # type: (str) -> Optional[str]
     try:
@@ -952,6 +985,7 @@ def get_file_timestamp(fn):
     except Exception as e:
         return None
 
+
 def try_convert_datetime(s):
     # type: (str) -> Optional[str]
     # This is super irritating because
@@ -992,6 +1026,7 @@ def try_convert_datetime(s):
             pass
     return None
 
+
 def get_podman_version():
     # type: () -> Tuple[int, ...]
     if 'podman' not in container_path:
@@ -999,6 +1034,7 @@ def get_podman_version():
     out, _, _ = call_throws([container_path, '--version'])
     return _parse_podman_version(out)
 
+
 def _parse_podman_version(out):
     # type: (str) -> Tuple[int, ...]
     _, _, version_str = out.strip().split()
@@ -1018,24 +1054,29 @@ def get_hostname():
     # type: () -> str
     return socket.gethostname()
 
+
 def get_fqdn():
     # type: () -> str
     return socket.getfqdn() or socket.gethostname()
 
+
 def get_arch():
     # type: () -> str
     return platform.uname().machine
 
+
 def generate_service_id():
     # type: () -> str
     return get_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase)
                                           for _ in range(6))
 
+
 def generate_password():
     # type: () -> str
     return ''.join(random.choice(string.ascii_lowercase + string.digits)
                    for i in range(10))
 
+
 def normalize_container_id(i):
     # type: (str) -> str
     # docker adds the sha256: prefix, but AFAICS both
@@ -1047,10 +1088,12 @@ def normalize_container_id(i):
         i = i[len(prefix):]
     return i
 
+
 def make_fsid():
     # type: () -> str
     return str(uuid.uuid1())
 
+
 def is_fsid(s):
     # type: (str) -> bool
     try:
@@ -1059,6 +1102,7 @@ def is_fsid(s):
         return False
     return True
 
+
 def infer_fsid(func):
     """
     If we only find a single fsid in /var/lib/ceph/*, use that
@@ -1069,14 +1113,19 @@ def infer_fsid(func):
             logger.debug('Using specified fsid: %s' % args.fsid)
             return func()
 
-        fsids = set()
+        fsids_set = set()
         daemon_list = list_daemons(detail=False)
         for daemon in daemon_list:
-            if 'name' not in args or not args.name:
-                fsids.add(daemon['fsid'])
+            if not is_fsid(daemon['fsid']):
+                # 'unknown' fsid
+                continue
+            elif 'name' not in args or not args.name:
+                # args.name not specified
+                fsids_set.add(daemon['fsid'])
             elif daemon['name'] == args.name:
-                fsids.add(daemon['fsid'])
-        fsids = list(fsids)
+                # args.name is a match
+                fsids_set.add(daemon['fsid'])
+        fsids = sorted(fsids_set)
 
         if not fsids:
             # some commands do not always require an fsid
@@ -1090,6 +1139,7 @@ def infer_fsid(func):
 
     return _infer_fsid
 
+
 def infer_config(func):
     """
     If we find a MON daemon, use the config from that container
@@ -1120,6 +1170,7 @@ def infer_config(func):
 
     return _infer_config
 
+
 def _get_default_image():
     if DEFAULT_IMAGE_IS_MASTER:
         warn = '''This is a development version of cephadm.
@@ -1130,6 +1181,7 @@ For information regarding the latest stable release:
             logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
     return DEFAULT_IMAGE
 
+
 def infer_image(func):
     """
     Use the most recent ceph image
@@ -1146,6 +1198,7 @@ def infer_image(func):
 
     return _infer_image
 
+
 def default_image(func):
     @wraps(func)
     def _default_image():
@@ -1163,6 +1216,7 @@ def default_image(func):
 
     return _default_image
 
+
 def get_last_local_ceph_image():
     """
     :return: The most recent local ceph image (already pulled)
@@ -1179,6 +1233,7 @@ def get_last_local_ceph_image():
         return r
     return None
 
+
 def write_tmp(s, uid, gid):
     # type: (str, int, int) -> Any
     tmp_f = tempfile.NamedTemporaryFile(mode='w',
@@ -1189,6 +1244,7 @@ def write_tmp(s, uid, gid):
 
     return tmp_f
 
+
 def makedirs(dir, uid, gid, mode):
     # type: (str, int, int, int) -> None
     if not os.path.exists(dir):
@@ -1198,14 +1254,17 @@ def makedirs(dir, uid, gid, mode):
     os.chown(dir, uid, gid)
     os.chmod(dir, mode)   # the above is masked by umask...
 
+
 def get_data_dir(fsid, t, n):
     # type: (str, str, Union[int, str]) -> str
     return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
 
+
 def get_log_dir(fsid):
     # type: (str) -> str
     return os.path.join(args.log_dir, fsid)
 
+
 def make_data_dir_base(fsid, uid, gid):
     # type: (str, int, int) -> str
     data_dir_base = os.path.join(args.data_dir, fsid)
@@ -1215,30 +1274,34 @@ def make_data_dir_base(fsid, uid, gid):
              DATA_DIR_MODE)
     return data_dir_base
 
+
 def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
-    # type: (str, str, Union[int, str], int, int) -> str
-    if not uid or not gid:
-        (uid, gid) = extract_uid_gid()
+    # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str
+    if uid is None or gid is None:
+        uid, gid = extract_uid_gid()
     make_data_dir_base(fsid, uid, gid)
     data_dir = get_data_dir(fsid, daemon_type, daemon_id)
     makedirs(data_dir, uid, gid, DATA_DIR_MODE)
     return data_dir
 
+
 def make_log_dir(fsid, uid=None, gid=None):
-    # type: (str, int, int) -> str
-    if not uid or not gid:
-        (uid, gid) = extract_uid_gid()
+    # type: (str, Optional[int], Optional[int]) -> str
+    if uid is None or gid is None:
+        uid, gid = extract_uid_gid()
     log_dir = get_log_dir(fsid)
     makedirs(log_dir, uid, gid, LOG_DIR_MODE)
     return log_dir
 
+
 def make_var_run(fsid, uid, gid):
     # type: (str, int, int) -> None
     call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
                  '/var/run/ceph/%s' % fsid])
 
+
 def copy_tree(src, dst, uid=None, gid=None):
-    # type: (List[str], str, int, int) -> None
+    # type: (List[str], str, Optional[int], Optional[int]) -> None
     """
     Copy a directory tree from src to dst
     """
@@ -1263,7 +1326,7 @@ def copy_tree(src, dst, uid=None, gid=None):
 
 
 def copy_files(src, dst, uid=None, gid=None):
-    # type: (List[str], str, int, int) -> None
+    # type: (List[str], str, Optional[int], Optional[int]) -> None
     """
     Copy a files from src to dst
     """
@@ -1281,8 +1344,9 @@ def copy_files(src, dst, uid=None, gid=None):
         logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
         os.chown(dst_file, uid, gid)
 
+
 def move_files(src, dst, uid=None, gid=None):
-    # type: (List[str], str, int, int) -> None
+    # type: (List[str], str, Optional[int], Optional[int]) -> None
     """
     Move files from src to dst
     """
@@ -1306,6 +1370,7 @@ def move_files(src, dst, uid=None, gid=None):
             logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
             os.chown(dst_file, uid, gid)
 
+
 ## copied from distutils ##
 def find_executable(executable, path=None):
     """Tries to find 'executable' in the directories listed in 'path'.
@@ -1342,6 +1407,7 @@ def find_executable(executable, path=None):
             return f
     return None
 
+
 def find_program(filename):
     # type: (str) -> str
     name = find_executable(filename)
@@ -1349,6 +1415,7 @@ def find_program(filename):
         raise ValueError('%s not found' % filename)
     return name
 
+
 def get_unit_name(fsid, daemon_type, daemon_id=None):
     # type: (str, str, Optional[Union[int, str]]) -> str
     # accept either name or type + id
@@ -1357,6 +1424,7 @@ def get_unit_name(fsid, daemon_type, daemon_id=None):
     else:
         return 'ceph-%s@%s' % (fsid, daemon_type)
 
+
 def get_unit_name_by_daemon_name(fsid, name):
     daemon = get_daemon_description(fsid, name)
     try:
@@ -1364,6 +1432,7 @@ def get_unit_name_by_daemon_name(fsid, name):
     except KeyError:
         raise Error('Failed to get unit name for {}'.format(daemon))
 
+
 def check_unit(unit_name):
     # type: (str) -> Tuple[bool, str, bool]
     # NOTE: we ignore the exit code here because systemctl outputs
@@ -1402,6 +1471,7 @@ def check_unit(unit_name):
         state = 'unknown'
     return (enabled, state, installed)
 
+
 def check_units(units, enabler=None):
     # type: (List[str], Optional[Packager]) -> bool
     for u in units:
@@ -1415,8 +1485,9 @@ def check_units(units, enabler=None):
                 enabler.enable_service(u)
     return False
 
+
 def get_legacy_config_fsid(cluster, legacy_dir=None):
-    # type: (str, str) -> Optional[str]
+    # type: (str, Optional[str]) -> Optional[str]
     config_file = '/etc/ceph/%s.conf' % cluster
     if legacy_dir is not None:
         config_file = os.path.abspath(legacy_dir + config_file)
@@ -1427,8 +1498,9 @@ def get_legacy_config_fsid(cluster, legacy_dir=None):
             return config.get('global', 'fsid')
     return None
 
+
 def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
-    # type: (str, str, Union[int, str], str) -> Optional[str]
+    # type: (str, str, Union[int, str], Optional[str]) -> Optional[str]
     fsid = None
     if daemon_type == 'osd':
         try:
@@ -1446,6 +1518,7 @@ def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
         fsid = get_legacy_config_fsid(cluster, legacy_dir=legacy_dir)
     return fsid
 
+
 def get_daemon_args(fsid, daemon_type, daemon_id):
     # type: (str, str, Union[int, str]) -> List[str]
     r = list()  # type: List[str]
@@ -1471,12 +1544,15 @@ def get_daemon_args(fsid, daemon_type, daemon_id):
             peers = config.get('peers', list())  # type: ignore
             for peer in peers:
                 r += ["--cluster.peer={}".format(peer)]
+            # some alertmanager, by default, look elsewhere for a config
+            r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
     elif daemon_type == NFSGanesha.daemon_type:
         nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
         r += nfs_ganesha.get_daemon_args()
 
     return r
 
+
 def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
                        config=None, keyring=None):
     # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) ->  None
@@ -1521,7 +1597,6 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
             makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
             makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
 
-
         # populate the config directory for the component from the config-json
         for fname in required_files:
             if 'files' in config: # type: ignore
@@ -1543,6 +1618,7 @@ def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
         ceph_iscsi = CephIscsi.init(fsid, daemon_id)
         ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
 
+
 def get_parm(option):
     # type: (str) -> Dict[str, str]
 
@@ -1577,6 +1653,7 @@ def get_parm(option):
     else:
         return js
 
+
 def get_config_and_keyring():
     # type: () -> Tuple[Optional[str], Optional[str]]
     config = None
@@ -1597,7 +1674,19 @@ def get_config_and_keyring():
         with open(args.keyring, 'r') as f:
             keyring = f.read()
 
-    return (config, keyring)
+    return config, keyring
+
+
+def get_container_binds(fsid, daemon_type, daemon_id):
+    # type: (str, str, Union[int, str, None]) -> List[List[str]]
+    binds = list()
+
+    if daemon_type == CephIscsi.daemon_type:
+        assert daemon_id
+        binds.extend(CephIscsi.get_container_binds())
+
+    return binds
+
 
 def get_container_mounts(fsid, daemon_type, daemon_id,
                          no_config=False):
@@ -1667,7 +1756,7 @@ def get_container_mounts(fsid, daemon_type, daemon_id,
             mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
             mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
         elif daemon_type == 'alertmanager':
-            mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/alertmanager:Z'
+            mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/etc/alertmanager:Z'
 
     if daemon_type == NFSGanesha.daemon_type:
         assert daemon_id
@@ -1682,6 +1771,7 @@ def get_container_mounts(fsid, daemon_type, daemon_id,
 
     return mounts
 
+
 def get_container(fsid, daemon_type, daemon_id,
                   privileged=False,
                   ptrace=False,
@@ -1718,20 +1808,14 @@ def get_container(fsid, daemon_type, daemon_id,
         entrypoint = ''
         name = ''
 
-    ceph_args = [] # type: List[str]
+    ceph_args = []  # type: List[str]
     if daemon_type in Monitoring.components:
         uid, gid = extract_uid_gid_monitoring(daemon_type)
-        m = Monitoring.components[daemon_type]  # type: ignore
-        metadata = m.get('image', dict())  # type: ignore
         monitoring_args = [
             '--user',
             str(uid),
             # FIXME: disable cpu/memory limits for the time being (not supported
             # by ubuntu 18.04 kernel!)
-            #'--cpus',
-            #metadata.get('cpus', '2'),
-            #'--memory',
-            #metadata.get('memory', '4GB')
         ]
         container_args.extend(monitoring_args)
     elif daemon_type == 'crash':
@@ -1739,7 +1823,7 @@ def get_container(fsid, daemon_type, daemon_id,
     elif daemon_type in Ceph.daemons:
         ceph_args = ['-n', name, '-f']
 
-    envs=[] # type: List[str]
+    envs = []  # type: List[str]
     if daemon_type == NFSGanesha.daemon_type:
         envs.extend(NFSGanesha.get_container_envs())
 
@@ -1749,31 +1833,50 @@ def get_container(fsid, daemon_type, daemon_id,
         args=ceph_args + get_daemon_args(fsid, daemon_type, daemon_id),
         container_args=container_args,
         volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+        bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
         cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
         envs=envs,
         privileged=privileged,
         ptrace=ptrace,
     )
 
+
 def extract_uid_gid(img='', file_path='/var/lib/ceph'):
-    # type: (str, str) -> Tuple[int, int]
+    # type: (str, Union[str, List[str]]) -> Tuple[int, int]
 
     if not img:
         img = args.image
 
-    out = CephContainer(
-        image=img,
-        entrypoint='stat',
-        args=['-c', '%u %g', file_path]
-    ).run()
-    (uid, gid) = out.split(' ')
-    return (int(uid), int(gid))
+    if isinstance(file_path, str):
+        paths = [file_path]
+    else:
+        paths = file_path
+
+    for fp in paths:
+        try:
+            out = CephContainer(
+                image=img,
+                entrypoint='stat',
+                args=['-c', '%u %g', fp]
+            ).run()
+            uid, gid = out.split(' ')
+            return int(uid), int(gid)
+        except RuntimeError:
+            pass
+    raise RuntimeError('uid/gid not found')
+
 
 def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
                   config=None, keyring=None,
                   osd_fsid=None,
-                  reconfig=False):
-    # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool]) -> None
+                  reconfig=False,
+                  ports=None):
+    # type: (str, str, Union[int, str], CephContainer, int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
+
+    ports = ports or []
+    if any([port_in_use(port) for port in ports]):
+        raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
+
     data_dir = get_data_dir(fsid, daemon_type, daemon_id)
     if reconfig and not os.path.exists(data_dir):
         raise Error('cannot reconfig, data path %s does not exist' % data_dir)
@@ -1836,6 +1939,12 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
 
     update_firewalld(daemon_type)
 
+    # Open ports explicitly required for the daemon
+    if ports:
+        fw = Firewalld()
+        fw.open_ports(ports)
+        fw.apply_rules()
+
     if reconfig and daemon_type not in Ceph.daemons:
         # ceph daemons do not need a restart; others (presumably) do to pick
         # up the new config
@@ -1844,6 +1953,21 @@ def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
         call_throws(['systemctl', 'restart',
                      get_unit_name(fsid, daemon_type, daemon_id)])
 
+def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
+    # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
+    if comment:
+        # Sometimes adding a comment, espectially if there are multiple containers in one
+        # unit file, makes it easier to read and grok.
+        file_obj.write('# ' + comment + '\n')
+    # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
+    file_obj.write('! '+ ' '.join(container.rm_cmd()) + '\n')
+    # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
+    if 'podman' in container_path:
+        file_obj.write('! '+ ' '.join(container.rm_cmd(storage=True)) + '\n')
+
+    # container run command
+    file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
+
 def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
                         enable=True, start=True,
                         osd_fsid=None):
@@ -1851,28 +1975,34 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
     # cmd
     data_dir = get_data_dir(fsid, daemon_type, daemon_id)
     with open(data_dir + '/unit.run.new', 'w') as f:
+        f.write('set -e\n')
         # pre-start cmd(s)
         if daemon_type == 'osd':
             # osds have a pre-start step
             assert osd_fsid
-            f.write('# Simple OSDs need chown on startup:\n')
-            for n in ['block', 'block.db', 'block.wal']:
-                p = os.path.join(data_dir, n)
-                f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
-            f.write('# LVM OSDs use ceph-volume lvm activate:\n')
-            prestart = CephContainer(
-                image=args.image,
-                entrypoint='/usr/sbin/ceph-volume',
-                args=[
-                    'lvm', 'activate',
-                    str(daemon_id), osd_fsid,
-                    '--no-systemd'
-                ],
-                privileged=True,
-                volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
-                cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
-            )
-            f.write(' '.join(prestart.run_cmd()) + '\n')
+            simple_fn = os.path.join('/etc/ceph/osd',
+                                     '%s-%s.json.adopted-by-cephadm' % (daemon_id, osd_fsid))
+            if os.path.exists(simple_fn):
+                f.write('# Simple OSDs need chown on startup:\n')
+                for n in ['block', 'block.db', 'block.wal']:
+                    p = os.path.join(data_dir, n)
+                    f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
+            else:
+                f.write('# LVM OSDs use ceph-volume lvm activate:\n')
+                prestart = CephContainer(
+                    image=args.image,
+                    entrypoint='/usr/sbin/ceph-volume',
+                    args=[
+                        'lvm', 'activate',
+                        str(daemon_id), osd_fsid,
+                        '--no-systemd'
+                    ],
+                    privileged=True,
+                    volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+                    bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+                    cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
+                )
+                f.write(' '.join(prestart.run_cmd()) + '\n')
         elif daemon_type == NFSGanesha.daemon_type:
             # add nfs to the rados grace db
             nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
@@ -1880,13 +2010,15 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
             f.write(' '.join(prestart.run_cmd()) + '\n')
         elif daemon_type == CephIscsi.daemon_type:
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
+            ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+            tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+            _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
 
         if daemon_type in Ceph.daemons:
             install_path = find_program('install')
             f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
 
-        # container run command
-        f.write(' '.join(c.run_cmd()) + '\n')
+        _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id)))
         os.fchmod(f.fileno(), 0o600)
         os.rename(data_dir + '/unit.run.new',
                   data_dir + '/unit.run')
@@ -1904,6 +2036,7 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
                 ],
                 privileged=True,
                 volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
+                bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
                 cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type,
                                                     daemon_id),
             )
@@ -1914,6 +2047,10 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
             poststop = nfs_ganesha.get_rados_grace_container('remove')
             f.write(' '.join(poststop.run_cmd()) + '\n')
         elif daemon_type == CephIscsi.daemon_type:
+            # make sure we also stop the tcmu container
+            ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+            tcmu_container = ceph_iscsi.get_tcmu_runner_container()
+            f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n')
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
         os.fchmod(f.fileno(), 0o600)
         os.rename(data_dir + '/unit.poststop.new',
@@ -1945,56 +2082,94 @@ def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
     if start:
         call_throws(['systemctl', 'start', unit_name])
 
-def update_firewalld(daemon_type):
-    # type: (str) -> None
-    if args.skip_firewalld:
-        return
-    cmd = find_executable('firewall-cmd')
-    if not cmd:
-        logger.debug('firewalld does not appear to be present')
-        return
-    (enabled, state, _) = check_unit('firewalld.service')
-    if not enabled:
-        logger.debug('firewalld.service is not enabled')
-        return
-
-    fw_services = []
-    fw_ports = []
-    if daemon_type == 'mon':
-        fw_services.append('ceph-mon')
-    elif daemon_type in ['mgr', 'mds', 'osd']:
-        fw_services.append('ceph')
-    if daemon_type == 'mgr':
-        fw_ports.append(8080)  # dashboard
-        fw_ports.append(8443)  # dashboard
-        fw_ports.append(9283)  # mgr/prometheus exporter
-    elif daemon_type in Monitoring.port_map.keys():
-        fw_ports.extend(Monitoring.port_map[daemon_type])  # prometheus etc
-    elif daemon_type == NFSGanesha.daemon_type:
-        fw_services.append('nfs')
 
-    for svc in fw_services:
-        out, err, ret = call([cmd, '--permanent', '--query-service', svc])
+
+class Firewalld(object):
+    def __init__(self):
+        # type: () -> None
+        self.available = self.check()
+
+    def check(self):
+        # type: () -> bool
+        self.cmd = find_executable('firewall-cmd')
+        if not self.cmd:
+            logger.debug('firewalld does not appear to be present')
+            return False
+        (enabled, state, _) = check_unit('firewalld.service')
+        if not enabled:
+            logger.debug('firewalld.service is not enabled')
+            return False
+        if state != "running":
+            logger.debug('firewalld.service is not running')
+            return False
+
+        logger.info("firewalld ready")
+        return True
+
+    def enable_service_for(self, daemon_type):
+        # type: (str) -> None
+        if not self.available:
+            logger.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type)
+            return
+
+        if daemon_type == 'mon':
+            svc = 'ceph-mon'
+        elif daemon_type in ['mgr', 'mds', 'osd']:
+            svc = 'ceph'
+        elif daemon_type == NFSGanesha.daemon_type:
+            svc = 'nfs'
+        else:
+            return
+
+        out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbose_on_failure=False)
         if ret:
             logger.info('Enabling firewalld service %s in current zone...' % svc)
-            out, err, ret = call([cmd, '--permanent', '--add-service', svc])
+            out, err, ret = call([self.cmd, '--permanent', '--add-service', svc])
             if ret:
                 raise RuntimeError(
                     'unable to add service %s to current zone: %s' % (svc, err))
         else:
             logger.debug('firewalld service %s is enabled in current zone' % svc)
-    for port in fw_ports:
-        tcp_port = str(port) + '/tcp'
-        out, err, ret = call([cmd, '--permanent', '--query-port', tcp_port])
-        if ret:
-            logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
-            out, err, ret = call([cmd, '--permanent', '--add-port', tcp_port])
+
+    def open_ports(self, fw_ports):
+        # type: (List[int]) -> None
+        if not self.available:
+            logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports)
+            return
+
+        for port in fw_ports:
+            tcp_port = str(port) + '/tcp'
+            out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
             if ret:
-                raise RuntimeError('unable to add port %s to current zone: %s' %
-                                   (tcp_port, err))
-        else:
-            logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
-    call_throws([cmd, '--reload'])
+                logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
+                out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port])
+                if ret:
+                    raise RuntimeError('unable to add port %s to current zone: %s' %
+                                    (tcp_port, err))
+            else:
+                logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
+
+    def apply_rules(self):
+        # type: () -> None
+        if not self.available:
+            return
+
+        call_throws([self.cmd, '--reload'])
+
+
+def update_firewalld(daemon_type):
+    # type: (str) -> None
+    firewall = Firewalld()
+
+    firewall.enable_service_for(daemon_type)
+
+    fw_ports = []
+
+    if daemon_type in Monitoring.port_map.keys():
+        fw_ports.extend(Monitoring.port_map[daemon_type])  # prometheus etc
+
+    firewall.open_ports(fw_ports)
+    firewall.apply_rules()
 
 def install_base_units(fsid):
     # type: (str) -> None
@@ -2064,6 +2239,7 @@ def install_base_units(fsid):
 }
 """ % fsid)
 
+
 def get_unit_file(fsid):
     # type: (str) -> str
     u = """# generated by cephadm
@@ -2106,6 +2282,7 @@ WantedBy=ceph-{fsid}.target
 
 ##################################
 
+
 class CephContainer:
     def __init__(self,
                  image,
@@ -2116,8 +2293,9 @@ class CephContainer:
                  container_args=[],
                  envs=None,
                  privileged=False,
-                 ptrace=False):
-        # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool) -> None
+                 ptrace=False,
+                 bind_mounts=None):
+        # type: (str, str, List[str], Dict[str, str], str, List[str], Optional[List[str]], bool, bool, Optional[List[List[str]]]) -> None
         self.image = image
         self.entrypoint = entrypoint
         self.args = args
@@ -2127,17 +2305,19 @@ class CephContainer:
         self.envs = envs
         self.privileged = privileged
         self.ptrace = ptrace
+        self.bind_mounts = bind_mounts if bind_mounts else []
 
     def run_cmd(self):
         # type: () -> List[str]
-        vols = [] # type: List[str]
-        envs = [] # type: List[str]
-        cname = [] # type: List[str]
-        entrypoint = [] # type: List[str]
+        vols = []  # type: List[str]
+        envs = []  # type: List[str]
+        cname = []  # type: List[str]
+        binds = []  # type: List[str]
+        entrypoint = []  # type: List[str]
         if self.entrypoint:
             entrypoint = ['--entrypoint', self.entrypoint]
 
-        priv = [] # type: List[str]
+        priv = []  # type: List[str]
         if self.privileged:
             priv = ['--privileged',
                     # let OSD etc read block devs that haven't been chowned
@@ -2147,6 +2327,8 @@ class CephContainer:
         vols = sum(
             [['-v', '%s:%s' % (host_dir, container_dir)]
              for host_dir, container_dir in self.volume_mounts.items()], [])
+        binds = sum([['--mount', '{}'.format(','.join(bind))]
+                     for bind in self.bind_mounts],[])
         envs = [
             '-e', 'CONTAINER_IMAGE=%s' % self.image,
             '-e', 'NODE_NAME=%s' % get_hostname(),
@@ -2163,22 +2345,25 @@ class CephContainer:
             '--ipc=host',
         ] + self.container_args + priv + \
         cname + envs + \
-        vols + entrypoint + \
+        vols + binds + entrypoint + \
         [
             self.image
         ] + self.args # type: ignore
 
     def shell_cmd(self, cmd):
         # type: (List[str]) -> List[str]
-        priv = [] # type: List[str]
+        priv = []  # type: List[str]
         if self.privileged:
             priv = ['--privileged',
                     # let OSD etc read block devs that haven't been chowned
                     '--group-add=disk']
-        vols = [] # type: List[str]
+        vols = []  # type: List[str]
         vols = sum(
             [['-v', '%s:%s' % (host_dir, container_dir)]
              for host_dir, container_dir in self.volume_mounts.items()], [])
+        binds = [] # type: List[str]
+        binds = sum([['--mount', '{}'.format(','.join(bind))]
+                     for bind in self.bind_mounts], [])
         envs = [
             '-e', 'CONTAINER_IMAGE=%s' % self.image,
             '-e', 'NODE_NAME=%s' % get_hostname(),
@@ -2195,7 +2380,7 @@ class CephContainer:
             '--rm',
             '--net=host',
             '--ipc=host',
-        ] + self.container_args + priv + envs + vols + [
+        ] + self.container_args + priv + envs + vols + binds + [
             '--entrypoint', cmd[0],
             self.image
         ] + cmd[1:]
@@ -2209,6 +2394,25 @@ class CephContainer:
             self.cname,
         ] + cmd
 
+    def rm_cmd(self, storage=False):
+        # type: (bool) -> List[str]
+        ret = [
+            str(container_path),
+            'rm', '-f',
+        ]
+        if storage:
+            ret.append('--storage')
+        ret.append(self.cname)
+        return ret
+
+    def stop_cmd(self):
+        # type () -> List[str]
+        ret = [
+            str(container_path),
+            'stop', self.cname,
+        ]
+        return ret
+
     def run(self, timeout=DEFAULT_TIMEOUT):
         # type: (Optional[int]) -> str
         logger.debug(self.run_cmd())
@@ -2218,6 +2422,7 @@ class CephContainer:
 
 ##################################
 
+
 @infer_image
 def command_version():
     # type: () -> int
@@ -2227,15 +2432,43 @@ def command_version():
 
 ##################################
 
+
 @infer_image
 def command_pull():
     # type: () -> int
-    logger.info('Pulling latest %s...' % args.image)
-    call_throws([container_path, 'pull', args.image])
+
+    _pull_image(args.image)
     return command_inspect_image()
 
+
+def _pull_image(image):
+    # type: (str) -> None
+    logger.info('Pulling container image %s...' % image)
+
+    ignorelist = [
+        "error creating read-write layer with ID",
+        "net/http: TLS handshake timeout",
+        "Digest did not match, expected",
+    ]
+
+    cmd = [container_path, 'pull', image]
+    cmd_str = ' '.join(cmd)
+
+    for sleep_secs in [1, 4, 25]:
+        out, err, ret = call(cmd)
+        if not ret:
+            return
+
+        if not any(pattern in err for pattern in ignorelist):
+            raise RuntimeError('Failed command: %s' % cmd_str)
+
+        logger.info('"%s failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
+        time.sleep(sleep_secs)
+
+    raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
 ##################################
 
+
 @infer_image
 def command_inspect_image():
     # type: () -> int
@@ -2256,6 +2489,23 @@ def command_inspect_image():
 
 ##################################
 
+def unwrap_ipv6(address):
+    # type: (str) -> str
+    if address.startswith('[') and address.endswith(']'):
+        return address[1:-1]
+    return address
+
+
+def is_ipv6(address):
+    # type: (str) -> bool
+    address = unwrap_ipv6(address)
+    try:
+        return ipaddress.ip_address(unicode(address)).version == 6
+    except ValueError:
+        logger.warning("Address: {} isn't a valid IP address".format(address))
+        return False
+
+
 @default_image
 def command_bootstrap():
     # type: () -> int
@@ -2291,14 +2541,16 @@ def command_bootstrap():
     mon_id = args.mon_id or hostname
     mgr_id = args.mgr_id or generate_service_id()
     logging.info('Cluster fsid: %s' % fsid)
+    ipv6 = False
 
     l = FileLock(fsid)
     l.acquire()
 
     # ip
     r = re.compile(r':(\d+)$')
-    base_ip = None
+    base_ip = ''
     if args.mon_ip:
+        ipv6 = is_ipv6(args.mon_ip)
         hasport = r.findall(args.mon_ip)
         if hasport:
             port = int(hasport[0])
@@ -2322,6 +2574,7 @@ def command_bootstrap():
         if addr_arg[0] != '[' or addr_arg[-1] != ']':
             raise Error('--mon-addrv value %s must use square backets' %
                         addr_arg)
+        ipv6 = addr_arg.count('[') > 1
         for addr in addr_arg[1:-1].split(','):
             hasport = r.findall(addr)
             if not hasport:
@@ -2341,7 +2594,8 @@ def command_bootstrap():
         # make sure IP is configured locally, and then figure out the
         # CIDR network
         for net, ips in list_networks().items():
-            if base_ip in ips:
+            if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \
+                    [ipaddress.ip_address(unicode(ip)) for ip in ips]:
                 mon_network = net
                 logger.info('Mon IP %s is in CIDR network %s' % (base_ip,
                                                                  mon_network))
@@ -2361,9 +2615,11 @@ def command_bootstrap():
     cp.write(cpf)
     config = cpf.getvalue()
 
+    if args.registry_json or args.registry_url:
+        command_registry_login()
+
     if not args.skip_pull:
-        logger.info('Pulling latest %s container...' % args.image)
-        call_throws([container_path, 'pull', args.image])
+        _pull_image(args.image)
 
     logger.info('Extracting ceph user uid/gid from container image...')
     (uid, gid) = extract_uid_gid()
@@ -2498,7 +2754,7 @@ def command_bootstrap():
     # wait for the service to become available
     def is_mon_available():
         # type: () -> bool
-        timeout=args.timeout if args.timeout else 30 # seconds
+        timeout=args.timeout if args.timeout else 60 # seconds
         out, err, ret = call(c.run_cmd(),
                              desc=c.entrypoint,
                              timeout=timeout)
@@ -2535,12 +2791,17 @@ def command_bootstrap():
         logger.info('Setting mon public_network...')
         cli(['config', 'set', 'mon', 'public_network', mon_network])
 
+    if ipv6:
+        logger.info('Enabling IPv6 (ms_bind_ipv6)')
+        cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true'])
+
     # create mgr
     logger.info('Creating mgr...')
     mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
     mgr_c = get_container(fsid, 'mgr', mgr_id)
+    # Note:the default port used by the Prometheus node exporter is opened in fw
     deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid,
-                  config=config, keyring=mgr_keyring)
+                  config=config, keyring=mgr_keyring, ports=[9283])
 
     # output files
     with open(args.output_keyring, 'w') as f:
@@ -2557,7 +2818,7 @@ def command_bootstrap():
     logger.info('Waiting for mgr to start...')
     def is_mgr_available():
         # type: () -> bool
-        timeout=args.timeout if args.timeout else 30 # seconds
+        timeout=args.timeout if args.timeout else 60 # seconds
         try:
             out = cli(['status', '-f', 'json-pretty'], timeout=timeout)
             j = json.loads(out)
@@ -2588,6 +2849,8 @@ def command_bootstrap():
 
     # ssh
     if not args.skip_ssh:
+        cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user])
+
         logger.info('Enabling cephadm module...')
         cli(['mgr', 'module', 'enable', 'cephadm'])
         wait_for_mgr_restart()
@@ -2619,11 +2882,21 @@ def command_bootstrap():
                 f.write(ssh_pub)
             logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key)
 
-            logger.info('Adding key to root@localhost\'s authorized_keys...')
-            if not os.path.exists('/root/.ssh'):
-                os.mkdir('/root/.ssh', 0o700)
-            auth_keys_file = '/root/.ssh/authorized_keys'
+            logger.info('Adding key to %s@localhost\'s authorized_keys...' % args.ssh_user)
+            try:
+                s_pwd = pwd.getpwnam(args.ssh_user)
+            except KeyError as e:
+                raise Error('Cannot find uid/gid for ssh-user: %s' % (args.ssh_user))
+            ssh_uid = s_pwd.pw_uid
+            ssh_gid = s_pwd.pw_gid
+            ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
+
+            if not os.path.exists(ssh_dir):
+                makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
+
+            auth_keys_file = '%s/authorized_keys' % ssh_dir
             add_newline = False
+
             if os.path.exists(auth_keys_file):
                 with open(auth_keys_file, 'r') as f:
                     f.seek(0, os.SEEK_END)
@@ -2631,7 +2904,9 @@ def command_bootstrap():
                         f.seek(f.tell()-1, os.SEEK_SET) # go to last char
                         if f.read() != '\n':
                             add_newline = True
+
             with open(auth_keys_file, 'a') as f:
+                os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
                 os.fchmod(f.fileno(), 0o600)  # just in case we created it
                 if add_newline:
                     f.write('\n')
@@ -2639,7 +2914,10 @@ def command_bootstrap():
 
         host = get_hostname()
         logger.info('Adding host %s...' % host)
-        cli(['orch', 'host', 'add', host])
+        try:
+            cli(['orch', 'host', 'add', host])
+        except RuntimeError as e:
+            raise Error('Failed to add host <%s>: %s' % (host, e))
 
         if not args.orphan_initial_daemons:
             for t in ['mon', 'mgr', 'crash']:
@@ -2653,7 +2931,17 @@ def command_bootstrap():
                 logger.info('Deploying %s service with default placement...' % t)
                 cli(['orch', 'apply', t])
 
+    if args.registry_url and args.registry_username and args.registry_password:
+        cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', args.registry_url, '--force'])
+        cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force'])
+        cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force'])
+
     if not args.skip_dashboard:
+        # Configure SSL port (cephadm only allows to configure dashboard SSL port)
+        # if the user does not want to use SSL he can change this setting once the cluster is up
+        cli(["config", "set",  "mgr", "mgr/dashboard/ssl_server_port" , str(args.ssl_dashboard_port)])
+
+        # configuring dashboard parameters
         logger.info('Enabling the dashboard module...')
         cli(['mgr', 'module', 'enable', 'dashboard'])
         wait_for_mgr_restart()
@@ -2681,6 +2969,11 @@ def command_bootstrap():
         out = cli(['config', 'get', 'mgr', 'mgr/dashboard/ssl_server_port'])
         port = int(out)
 
+        # Open dashboard port
+        fw = Firewalld()
+        fw.open_ports([port])
+        fw.apply_rules()
+
         logger.info('Ceph Dashboard is now available at:\n\n'
                     '\t     URL: https://%s:%s/\n'
                     '\t    User: %s\n'
@@ -2688,7 +2981,7 @@ def command_bootstrap():
                         get_fqdn(), port,
                         args.initial_dashboard_user,
                         password))
-    
+
     if args.apply_spec:
         logger.info('Applying %s to cluster' % args.apply_spec)
 
@@ -2703,7 +2996,7 @@ def command_bootstrap():
                         ssh_key = '/etc/ceph/ceph.pub'
                         if args.ssh_public_key:
                             ssh_key = args.ssh_public_key.name
-                        out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, 'root@%s' % split[1]])
+                        out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])])
 
         mounts = {}
         mounts[pathify(args.apply_spec)] = '/tmp/spec.yml:z'
@@ -2726,6 +3019,44 @@ def command_bootstrap():
 
 ##################################
 
+def command_registry_login():
+    if args.registry_json:
+        logger.info("Pulling custom registry login info from %s." % args.registry_json)
+        d = get_parm(args.registry_json)
+        if d.get('url') and d.get('username') and d.get('password'):
+            args.registry_url = d.get('url')
+            args.registry_username = d.get('username')
+            args.registry_password = d.get('password')
+            registry_login(args.registry_url, args.registry_username, args.registry_password)
+        else:
+            raise Error("json provided for custom registry login did not include all necessary fields. "
+                            "Please setup json file as\n"
+                            "{\n"
+                              " \"url\": \"REGISTRY_URL\",\n"
+                              " \"username\": \"REGISTRY_USERNAME\",\n"
+                              " \"password\": \"REGISTRY_PASSWORD\"\n"
+                            "}\n")
+    elif args.registry_url and args.registry_username and args.registry_password:
+        registry_login(args.registry_url, args.registry_username, args.registry_password)
+    else:
+        raise Error("Invalid custom registry arguments received. To login to a custom registry include "
+                        "--registry-url, --registry-username and --registry-password "
+                        "options or --registry-json option")
+    return 0
+
+def registry_login(url, username, password):
+    logger.info("Logging into custom registry.")
+    try:
+        out, _, _ = call_throws([container_path, 'login',
+                                   '-u', username,
+                                   '-p', password,
+                                   url])
+    except:
+        raise Error("Failed to login to custom registry @ %s as %s with given password" % (args.registry_url, args.registry_username))
+
+##################################
+
+
 def extract_uid_gid_monitoring(daemon_type):
     # type: (str) -> Tuple[int, int]
 
@@ -2736,7 +3067,7 @@ def extract_uid_gid_monitoring(daemon_type):
     elif daemon_type == 'grafana':
         uid, gid = extract_uid_gid(file_path='/var/lib/grafana')
     elif daemon_type == 'alertmanager':
-        uid, gid = extract_uid_gid(file_path='/etc/alertmanager')
+        uid, gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus'])
     else:
         raise Error("{} not implemented yet".format(daemon_type))
     return uid, gid
@@ -2766,24 +3097,29 @@ def command_deploy():
     else:
         logger.info('%s daemon %s ...' % ('Deploy', args.name))
 
+    # Get and check ports explicitly required to be opened
+    daemon_ports = [] # type: List[int]
+    if args.tcp_ports:
+        daemon_ports = list(map(int, args.tcp_ports.split()))
+
     if daemon_type in Ceph.daemons:
         config, keyring = get_config_and_keyring()
         uid, gid = extract_uid_gid()
         make_var_run(args.fsid, uid, gid)
+
         c = get_container(args.fsid, daemon_type, daemon_id,
                           ptrace=args.allow_ptrace)
         deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
                       osd_fsid=args.osd_fsid,
-                      reconfig=args.reconfig)
+                      reconfig=args.reconfig,
+                      ports=daemon_ports)
 
     elif daemon_type in Monitoring.components:
         # monitoring daemon - prometheus, grafana, alertmanager, node-exporter
         # Default Checks
         if not args.reconfig and not redeploy:
-            daemon_ports = Monitoring.port_map[daemon_type]  # type: List[int]
-            if any([port_in_use(port) for port in daemon_ports]):
-                raise Error("TCP Port(s) '{}' required for {} is already in use".format(",".join(map(str, daemon_ports)), daemon_type))
+            daemon_ports.extend(Monitoring.port_map[daemon_type])
 
         # make sure provided config-json is sufficient
         config = get_parm(args.config_json) # type: ignore
@@ -2801,18 +3137,21 @@ def command_deploy():
         uid, gid = extract_uid_gid_monitoring(daemon_type)
         c = get_container(args.fsid, daemon_type, daemon_id)
         deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
-                      reconfig=args.reconfig)
+                      reconfig=args.reconfig,
+                      ports=daemon_ports)
 
     elif daemon_type == NFSGanesha.daemon_type:
         if not args.reconfig and not redeploy:
-            NFSGanesha.port_in_use()
+            daemon_ports.extend(NFSGanesha.port_map.values())
+
         config, keyring = get_config_and_keyring()
         # TODO: extract ganesha uid/gid (997, 994) ?
         uid, gid = extract_uid_gid()
         c = get_container(args.fsid, daemon_type, daemon_id)
         deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
-                      reconfig=args.reconfig)
+                      reconfig=args.reconfig,
+                      ports=daemon_ports)
 
     elif daemon_type == CephIscsi.daemon_type:
         config, keyring = get_config_and_keyring()
@@ -2820,12 +3159,14 @@ def command_deploy():
         c = get_container(args.fsid, daemon_type, daemon_id)
         deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
                       config=config, keyring=keyring,
-                      reconfig=args.reconfig)
+                      reconfig=args.reconfig,
+                      ports=daemon_ports)
     else:
         raise Error("{} not implemented in command_deploy function".format(daemon_type))
 
 ##################################
 
+
 @infer_image
 def command_run():
     # type: () -> int
@@ -2836,6 +3177,7 @@ def command_run():
 
 ##################################
 
+
 @infer_fsid
 @infer_config
 @infer_image
@@ -2865,6 +3207,7 @@ def command_shell():
     container_args = [] # type: List[str]
     mounts = get_container_mounts(args.fsid, daemon_type, daemon_id,
                                   no_config=True if args.config else False)
+    binds = get_container_binds(args.fsid, daemon_type, daemon_id)
     if args.config:
         mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z'
     if args.keyring:
@@ -2900,6 +3243,7 @@ def command_shell():
         args=[],
         container_args=container_args,
         volume_mounts=mounts,
+        bind_mounts=binds,
         envs=args.env,
         privileged=True)
     command = c.shell_cmd(command)
@@ -2908,6 +3252,7 @@ def command_shell():
 
 ##################################
 
+
 @infer_fsid
 def command_enter():
     # type: () -> int
@@ -2935,6 +3280,7 @@ def command_enter():
 
 ##################################
 
+
 @infer_fsid
 @infer_image
 def command_ceph_volume():
@@ -2977,6 +3323,7 @@ def command_ceph_volume():
 
 ##################################
 
+
 @infer_fsid
 def command_unit():
     # type: () -> None
@@ -2992,6 +3339,7 @@ def command_unit():
 
 ##################################
 
+
 @infer_fsid
 def command_logs():
     # type: () -> None
@@ -3012,6 +3360,7 @@ def command_logs():
 
 ##################################
 
+
 def list_networks():
     # type: () -> Dict[str,List[str]]
 
@@ -3021,10 +3370,17 @@ def list_networks():
     #j = json.loads(out)
     #for x in j:
 
+    res = _list_ipv4_networks()
+    res.update(_list_ipv6_networks())
+    return res
+
+
+def _list_ipv4_networks():
     out, _, _ = call_throws([find_executable('ip'), 'route', 'ls'])
-    return _parse_ip_route(out)
+    return _parse_ipv4_route(out)
 
-def _parse_ip_route(out):
+
+def _parse_ipv4_route(out):
     r = {}  # type: Dict[str,List[str]]
     p = re.compile(r'^(\S+) (.*)scope link (.*)src (\S+)')
     for line in out.splitlines():
@@ -3038,6 +3394,39 @@ def _parse_ip_route(out):
         r[net].append(ip)
     return r
 
+
+def _list_ipv6_networks():
+    routes, _, _ = call_throws([find_executable('ip'), '-6', 'route', 'ls'])
+    ips, _, _ = call_throws([find_executable('ip'), '-6', 'addr', 'ls'])
+    return _parse_ipv6_route(routes, ips)
+
+
+def _parse_ipv6_route(routes, ips):
+    r = {}  # type: Dict[str,List[str]]
+    route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
+    ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
+    for line in routes.splitlines():
+        m = route_p.findall(line)
+        if not m or m[0][0].lower() == 'default':
+            continue
+        net = m[0][0]
+        if net not in r:
+            r[net] = []
+
+    for line in ips.splitlines():
+        m = ip_p.findall(line)
+        if not m:
+            continue
+        ip = m[0][0]
+        # find the network it belongs to
+        net = [n for n in r.keys()
+               if ipaddress.ip_address(unicode(ip)) in ipaddress.ip_network(unicode(n))]
+        if net:
+            r[net[0]].append(ip)
+
+    return r
+
+
 def command_list_networks():
     # type: () -> None
     r = list_networks()
@@ -3045,12 +3434,14 @@ def command_list_networks():
 
 ##################################
 
+
 def command_ls():
     # type: () -> None
     ls = list_daemons(detail=not args.no_detail,
                       legacy_dir=args.legacy_dir)
     print(json.dumps(ls, indent=4))
 
+
 def list_daemons(detail=True, legacy_dir=None):
     # type: (bool, Optional[str]) -> List[Dict[str, str]]
     host_version = None
@@ -3217,8 +3608,7 @@ def command_adopt():
     # type: () -> None
 
     if not args.skip_pull:
-        logger.info('Pulling latest %s container...' % args.image)
-        call_throws([container_path, 'pull', args.image])
+        _pull_image(args.image)
 
     (daemon_type, daemon_id) = args.name.split('.', 1)
 
@@ -3479,6 +3869,7 @@ def command_adopt_prometheus(daemon_id, fsid):
     deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
     update_firewalld(daemon_type)
 
+
 def command_adopt_grafana(daemon_id, fsid):
     # type: (str, str) -> None
 
@@ -3521,7 +3912,6 @@ def command_adopt_grafana(daemon_id, fsid):
     else:
         logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key))
 
-
     # data - possible custom dashboards/plugins
     data_src = '/var/lib/grafana/'
     data_src = os.path.abspath(args.legacy_dir + data_src)
@@ -3533,6 +3923,7 @@ def command_adopt_grafana(daemon_id, fsid):
     deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
     update_firewalld(daemon_type)
 
+
 def command_adopt_alertmanager(daemon_id, fsid):
     # type: (str, str) -> None
 
@@ -3562,6 +3953,7 @@ def command_adopt_alertmanager(daemon_id, fsid):
     deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
     update_firewalld(daemon_type)
 
+
 def _adjust_grafana_ini(filename):
     # type: (str) -> None
 
@@ -3637,6 +4029,7 @@ def command_rm_daemon():
 
 ##################################
 
+
 def command_rm_cluster():
     # type: () -> None
     if not args.force:
@@ -3720,13 +4113,16 @@ def check_time_sync(enabler=None):
         return False
     return True
 
+
 def command_check_host():
     # type: () -> None
+    global container_path
+
     errors = []
     commands = ['systemctl', 'lvcreate']
 
     if args.docker:
-            container_path = find_program('docker')
+        container_path = find_program('docker')
     else:
         for i in CONTAINER_PREFERENCE:
             try:
@@ -3764,6 +4160,7 @@ def command_check_host():
 
 ##################################
 
+
 def command_prepare_host():
     # type: () -> None
     logger.info('Verifying podman|docker is present...')
@@ -3799,6 +4196,7 @@ def command_prepare_host():
 
 ##################################
 
+
 class CustomValidation(argparse.Action):
 
     def _check_name(self, values):
@@ -3821,6 +4219,7 @@ class CustomValidation(argparse.Action):
 
 ##################################
 
+
 def get_distro():
     # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
     distro = None
@@ -3842,6 +4241,7 @@ def get_distro():
                 distro_codename = val.lower()
     return distro, distro_version, distro_codename
 
+
 class Packager(object):
     def __init__(self, stable=None, version=None, branch=None, commit=None):
         assert \
@@ -3969,6 +4369,7 @@ class Apt(Packager):
             logging.info('Podman did not work.  Falling back to docker...')
             self.install(['docker.io'])
 
+
 class YumDnf(Packager):
     DISTRO_NAMES = {
         'centos': ('centos', 'el'),
@@ -4086,17 +4487,10 @@ class YumDnf(Packager):
         if self.distro_code.startswith('el'):
             logger.info('Enabling EPEL...')
             call_throws([self.tool, 'install', '-y', 'epel-release'])
-        if self.distro_code == 'el8':
-            # we also need Ken's copr repo, at least for now
-            logger.info('Enabling supplementary copr repo ktdreyer/ceph-el8...')
-            call_throws(['dnf', 'copr', 'enable', '-y', 'ktdreyer/ceph-el8'])
 
     def rm_repo(self):
         if os.path.exists(self.repo_path()):
             os.unlink(self.repo_path())
-        if self.distro_code == 'el8':
-            logger.info('Disabling supplementary copr repo ktdreyer/ceph-el8...')
-            call_throws(['dnf', 'copr', 'disable', '-y', 'ktdreyer/ceph-el8'])
 
     def install(self, ls):
         logger.info('Installing packages %s...' % ls)
@@ -4234,16 +4628,19 @@ def command_add_repo():
                           commit=args.dev_commit)
     pkg.add_repo()
 
+
 def command_rm_repo():
     pkg = create_packager()
     pkg.rm_repo()
 
+
 def command_install():
     pkg = create_packager()
     pkg.install(args.packages)
 
 ##################################
 
+
 def _get_parser():
     # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(
@@ -4420,7 +4817,7 @@ def _get_parser():
         help='ceph.keyring to pass through to the container')
     parser_shell.add_argument(
         '--mount', '-m',
-        help='file or directory path that will be mounted in container /mnt')
+        help='mount a file or directory under /mnt in the container')
     parser_shell.add_argument(
         '--env', '-e',
         action='append',
@@ -4538,7 +4935,11 @@ def _get_parser():
     parser_bootstrap.add_argument(
         '--initial-dashboard-password',
         help='Initial password for the initial dashboard user')
-
+    parser_bootstrap.add_argument(
+        '--ssl-dashboard-port',
+        type=int,
+        default = 8443,
+        help='Port number used to connect with dashboard using SSL')
     parser_bootstrap.add_argument(
         '--dashboard-key',
         type=argparse.FileType('r'),
@@ -4560,6 +4961,10 @@ def _get_parser():
         '--ssh-public-key',
         type=argparse.FileType('r'),
         help='SSH public key')
+    parser_bootstrap.add_argument(
+        '--ssh-user',
+        default='root',
+        help='set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
 
     parser_bootstrap.add_argument(
         '--skip-mon-network',
@@ -4613,12 +5018,24 @@ def _get_parser():
         '--apply-spec',
         help='Apply cluster spec after bootstrap (copy ssh key, add hosts and apply services)')
 
-
     parser_bootstrap.add_argument(
         '--shared_ceph_folder',
         metavar='CEPH_SOURCE_FOLDER',
         help='Development mode. Several folders in containers are volumes mapped to different sub-folders in the ceph source folder')
 
+    parser_bootstrap.add_argument(
+        '--registry-url',
+        help='url for custom registry')
+    parser_bootstrap.add_argument(
+        '--registry-username',
+        help='username for custom registry')
+    parser_bootstrap.add_argument(
+        '--registry-password',
+        help='password for custom registry')
+    parser_bootstrap.add_argument(
+        '--registry-json',
+        help='json file with custom registry login info (URL, Username, Password)')
+
     parser_deploy = subparsers.add_parser(
         'deploy', help='deploy a daemon')
     parser_deploy.set_defaults(func=command_deploy)
@@ -4650,6 +5067,9 @@ def _get_parser():
         '--skip-firewalld',
         action='store_true',
         help='Do not configure firewalld')
+    parser_deploy.add_argument(
+        '--tcp-ports',
+        help='List of tcp ports to open in the host firewall')
     parser_deploy.add_argument(
         '--reconfig',
         action='store_true',
@@ -4709,8 +5129,28 @@ def _get_parser():
         default=['cephadm'],
         help='packages')
 
+    parser_registry_login = subparsers.add_parser(
+        'registry-login', help='log host into authenticated registry')
+    parser_registry_login.set_defaults(func=command_registry_login)
+    parser_registry_login.add_argument(
+        '--registry-url',
+        help='url for custom registry')
+    parser_registry_login.add_argument(
+        '--registry-username',
+        help='username for custom registry')
+    parser_registry_login.add_argument(
+        '--registry-password',
+        help='password for custom registry')
+    parser_registry_login.add_argument(
+        '--registry-json',
+        help='json file with custom registry login info (URL, Username, Password)')
+    parser_registry_login.add_argument(
+        '--fsid',
+        help='cluster FSID')
+
     return parser
 
+
 def _parse_args(av):
     parser = _get_parser()
     args = parser.parse_args(av)
@@ -4718,10 +5158,11 @@ def _parse_args(av):
         args.command.pop(0)
     return args
 
+
 if __name__ == "__main__":
     # allow argv to be injected
     try:
-        av = injected_argv # type: ignore
+        av = injected_argv  # type: ignore
     except NameError:
         av = sys.argv[1:]
     args = _parse_args(av)