]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cephadm/cephadm
import ceph 16.2.7
[ceph.git] / ceph / src / cephadm / cephadm
index 13f0f6e6a59ae8147d997d148e73224f7c90bf63..9fe5fb3763b382935b509e31a04712884b56c338 100755 (executable)
@@ -31,7 +31,7 @@ from contextlib import redirect_stdout
 import ssl
 from enum import Enum
 
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set
+from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable
 
 import re
 import uuid
@@ -175,6 +175,9 @@ class ContainerEngine:
     def EXE(cls) -> str:
         raise NotImplementedError()
 
+    def __str__(self) -> str:
+        return f'{self.EXE} ({self.path})'
+
 
 class Podman(ContainerEngine):
     EXE = 'podman'
@@ -193,6 +196,10 @@ class Podman(ContainerEngine):
         out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'])
         self._version = _parse_podman_version(out)
 
+    def __str__(self) -> str:
+        version = '.'.join(map(str, self.version))
+        return f'{self.EXE} ({self.path}) version {version}'
+
 
 class Docker(ContainerEngine):
     EXE = 'docker'
@@ -207,7 +214,7 @@ logging_config = {
     'disable_existing_loggers': True,
     'formatters': {
         'cephadm': {
-            'format': '%(asctime)s %(levelname)s %(message)s'
+            'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
         },
     },
     'handlers': {
@@ -549,7 +556,7 @@ class CephIscsi(object):
         mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
         mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
         mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
-        mounts[log_dir] = '/var/log/rbd-target-api:z'
+        mounts[log_dir] = '/var/log:z'
         mounts['/dev'] = '/dev'
         return mounts
 
@@ -1286,28 +1293,28 @@ if sys.version_info < (3, 8):
 
         def __init__(self) -> None:
             self._pid_counter = itertools.count(0)
-            self._threads = {}
+            self._threads: Dict[Any, Any] = {}
 
-        def is_active(self):
+        def is_active(self) -> bool:
             return True
 
-        def close(self):
+        def close(self) -> None:
             self._join_threads()
 
-        def _join_threads(self):
+        def _join_threads(self) -> None:
             """Internal: Join all non-daemon threads"""
             threads = [thread for thread in list(self._threads.values())
                        if thread.is_alive() and not thread.daemon]
             for thread in threads:
                 thread.join()
 
-        def __enter__(self):
+        def __enter__(self) -> Any:
             return self
 
-        def __exit__(self, exc_type, exc_val, exc_tb):
+        def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
             pass
 
-        def __del__(self, _warn=warnings.warn):
+        def __del__(self, _warn: Any = warnings.warn) -> None:
             threads = [thread for thread in list(self._threads.values())
                        if thread.is_alive()]
             if threads:
@@ -1315,7 +1322,7 @@ if sys.version_info < (3, 8):
                       ResourceWarning,
                       source=self)
 
-        def add_child_handler(self, pid, callback, *args):
+        def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None:
             loop = events.get_event_loop()
             thread = threading.Thread(target=self._do_waitpid,
                                       name=f'waitpid-{next(self._pid_counter)}',
@@ -1324,16 +1331,16 @@ if sys.version_info < (3, 8):
             self._threads[pid] = thread
             thread.start()
 
-        def remove_child_handler(self, pid):
+        def remove_child_handler(self, pid: Any) -> bool:
             # asyncio never calls remove_child_handler() !!!
             # The method is no-op but is implemented because
             # abstract base classe requires it
             return True
 
-        def attach_loop(self, loop):
+        def attach_loop(self, loop: Any) -> None:
             pass
 
-        def _do_waitpid(self, loop, expected_pid, callback, args):
+        def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None:
             assert expected_pid > 0
 
             try:
@@ -1408,8 +1415,6 @@ def call(ctx: CephadmContext,
         prefix += ': '
     timeout = timeout or ctx.timeout
 
-    logger.debug('Running command: %s' % ' '.join(command))
-
     async def tee(reader: asyncio.StreamReader) -> str:
         collected = StringIO()
         async for line in reader:
@@ -1994,13 +1999,12 @@ def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]:
         for i in CONTAINER_PREFERENCE:
             try:
                 return i()
-            except Exception as e:
-                logger.debug('Could not locate %s: %s' % (i.EXE, e))
+            except Exception:
+                pass
     return None
 
 
-def check_container_engine(ctx):
-    # type: (CephadmContext) -> None
+def check_container_engine(ctx: CephadmContext) -> ContainerEngine:
     engine = ctx.container_engine
     if not isinstance(engine, CONTAINER_PREFERENCE):
         # See https://github.com/python/mypy/issues/8993
@@ -2010,6 +2014,7 @@ def check_container_engine(ctx):
         engine.get_version(ctx)
         if engine.version < MIN_PODMAN_VERSION:
             raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION)
+    return engine
 
 
 def get_unit_name(fsid, daemon_type, daemon_id=None):
@@ -2310,6 +2315,8 @@ def get_config_and_keyring(ctx):
         d = get_parm(ctx.config_json)
         config = d.get('config')
         keyring = d.get('keyring')
+        if config and keyring:
+            return config, keyring
 
     if 'config' in ctx and ctx.config:
         try:
@@ -2806,7 +2813,7 @@ def deploy_daemon_units(
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
             ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
             tcmu_container = ceph_iscsi.get_tcmu_runner_container()
-            _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
+            _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True)
 
         _write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
 
@@ -3433,12 +3440,12 @@ def command_version(ctx):
 def command_pull(ctx):
     # type: (CephadmContext) -> int
 
-    _pull_image(ctx, ctx.image)
+    _pull_image(ctx, ctx.image, ctx.insecure)
     return command_inspect_image(ctx)
 
 
-def _pull_image(ctx, image):
-    # type: (CephadmContext, str) -> None
+def _pull_image(ctx, image, insecure=False):
+    # type: (CephadmContext, str, bool) -> None
     logger.info('Pulling container image %s...' % image)
 
     ignorelist = [
@@ -3448,8 +3455,12 @@ def _pull_image(ctx, image):
     ]
 
     cmd = [ctx.container_engine.path, 'pull', image]
-    if isinstance(ctx.container_engine, Podman) and os.path.exists('/etc/ceph/podman-auth.json'):
-        cmd.append('--authfile=/etc/ceph/podman-auth.json')
+    if isinstance(ctx.container_engine, Podman):
+        if insecure:
+            cmd.append('--tls-verify=false')
+
+        if os.path.exists('/etc/ceph/podman-auth.json'):
+            cmd.append('--authfile=/etc/ceph/podman-auth.json')
     cmd_str = ' '.join(cmd)
 
     for sleep_secs in [1, 4, 25]:
@@ -3458,12 +3469,12 @@ def _pull_image(ctx, image):
             return
 
         if not any(pattern in err for pattern in ignorelist):
-            raise RuntimeError('Failed command: %s' % cmd_str)
+            raise Error('Failed command: %s' % cmd_str)
 
         logger.info('`%s` failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
         time.sleep(sleep_secs)
 
-    raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
+    raise Error('Failed command: %s: maximum retries reached' % cmd_str)
 
 ##################################
 
@@ -4036,10 +4047,10 @@ def prepare_bootstrap_config(
         logger.info('Adjusting default settings to suit single-host cluster...')
         # replicate across osds, not hosts
         if (
-                not cp.has_option('global', 'osd_crush_choose_leaf_type')
-                and not cp.has_option('global', 'osd crush choose leaf type')
+                not cp.has_option('global', 'osd_crush_chooseleaf_type')
+                and not cp.has_option('global', 'osd crush chooseleaf type')
         ):
-            cp.set('global', 'osd_crush_choose_leaf_type', '0')
+            cp.set('global', 'osd_crush_chooseleaf_type', '0')
         # replica 2x
         if (
                 not cp.has_option('global', 'osd_pool_default_size')
@@ -4125,6 +4136,65 @@ def finish_bootstrap_config(
     pass
 
 
+# funcs to process spec file for apply spec
+def _parse_yaml_docs(f: Iterable[str]) -> List[List[str]]:
+    docs = []
+    current_doc = []  # type: List[str]
+    for line in f:
+        if '---' in line:
+            if current_doc:
+                docs.append(current_doc)
+            current_doc = []
+        else:
+            current_doc.append(line.rstrip())
+    if current_doc:
+        docs.append(current_doc)
+    return docs
+
+
+def _parse_yaml_obj(doc: List[str]) -> Dict[str, str]:
+    # note: this only parses the first layer of yaml
+    obj = {}  # type: Dict[str, str]
+    current_key = ''
+    for line in doc:
+        if line.startswith(' '):
+            obj[current_key] += line.strip()
+        elif line.endswith(':'):
+            current_key = line.strip(':')
+            obj[current_key] = ''
+        else:
+            current_key, val = line.split(':')
+            obj[current_key] = val.strip()
+    return obj
+
+
+def parse_yaml_objs(f: Iterable[str]) -> List[Dict[str, str]]:
+    objs = []
+    for d in _parse_yaml_docs(f):
+        objs.append(_parse_yaml_obj(d))
+    return objs
+
+
+def _distribute_ssh_keys(ctx: CephadmContext, host_spec: Dict[str, str], bootstrap_hostname: str) -> int:
+    # copy ssh key to hosts in host spec (used for apply spec)
+    ssh_key = '/etc/ceph/ceph.pub'
+    if ctx.ssh_public_key:
+        ssh_key = ctx.ssh_public_key.name
+
+    if bootstrap_hostname != host_spec['hostname']:
+        if 'addr' in host_spec:
+            addr = host_spec['addr']
+        else:
+            addr = host_spec['hostname']
+        out, err, code = call(ctx, ['sudo', '-u', ctx.ssh_user, 'ssh-copy-id', '-f', '-i', ssh_key, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx.ssh_user, addr)])
+        if code:
+            logger.info('\nCopying ssh key to host %s at address %s failed!\n' % (host_spec['hostname'], addr))
+            return 1
+        else:
+            logger.info('Added ssh key to host %s at address %s\n' % (host_spec['hostname'], addr))
+    return 0
+
+
 @default_image
 def command_bootstrap(ctx):
     # type: (CephadmContext) -> int
@@ -4339,25 +4409,22 @@ def command_bootstrap(ctx):
 
     if ctx.apply_spec:
         logger.info('Applying %s to cluster' % ctx.apply_spec)
-
+        # copy ssh key to hosts in spec file
         with open(ctx.apply_spec) as f:
-            for line in f:
-                if 'hostname:' in line:
-                    line = line.replace('\n', '')
-                    split = line.split(': ')
-                    if split[1] != hostname:
-                        logger.info('Adding ssh key to %s' % split[1])
-
-                        ssh_key = '/etc/ceph/ceph.pub'
-                        if ctx.ssh_public_key:
-                            ssh_key = ctx.ssh_public_key.name
-                        out, err, code = call_throws(ctx, ['sudo', '-u', ctx.ssh_user, 'ssh-copy-id', '-f', '-i', ssh_key, '-o StrictHostKeyChecking=no', '%s@%s' % (ctx.ssh_user, split[1])])
+            try:
+                for spec in parse_yaml_objs(f):
+                    if spec.get('service_type') == 'host':
+                        _distribute_ssh_keys(ctx, spec, hostname)
+            except ValueError:
+                logger.info('Unable to parse %s succesfully' % ctx.apply_spec)
 
         mounts = {}
-        mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:z'
-
-        out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
-        logger.info(out)
+        mounts[pathify(ctx.apply_spec)] = '/tmp/spec.yml:ro'
+        try:
+            out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
+            logger.info(out)
+        except Exception:
+            logger.info('\nApplying %s to cluster failed!\n' % ctx.apply_spec)
 
     logger.info('You can access the Ceph CLI with:\n\n'
                 '\tsudo %s shell --fsid %s -c %s -k %s\n' % (
@@ -4631,10 +4698,12 @@ def command_shell(ctx):
             mount = pathify(split_src_dst[0])
             filename = os.path.basename(split_src_dst[0])
             if len(split_src_dst) > 1:
-                dst = split_src_dst[1] + ':z' if len(split_src_dst) == 3 else split_src_dst[1]
+                dst = split_src_dst[1]
+                if len(split_src_dst) == 3:
+                    dst = '{}:{}'.format(dst, split_src_dst[2])
                 mounts[mount] = dst
             else:
-                mounts[mount] = '/mnt/{}:z'.format(filename)
+                mounts[mount] = '/mnt/{}'.format(filename)
     if ctx.command:
         command = ctx.command
     else:
@@ -5770,14 +5839,12 @@ def check_time_sync(ctx, enabler=None):
 
 
 def command_check_host(ctx: CephadmContext) -> None:
-    container_path = ctx.container_engine.path
-
     errors = []
     commands = ['systemctl', 'lvcreate']
 
     try:
-        check_container_engine(ctx)
-        logger.info('podman|docker (%s) is present' % container_path)
+        engine = check_container_engine(ctx)
+        logger.info(f'{engine} is present')
     except Error as e:
         errors.append(str(e))
 
@@ -6104,6 +6171,7 @@ class YumDnf(Packager):
         'rocky': ('centos', 'el'),
         'almalinux': ('centos', 'el'),
         'fedora': ('fedora', 'fc'),
+        'mariner': ('mariner', 'cm'),
     }
 
     def __init__(self, ctx: CephadmContext,
@@ -6120,6 +6188,8 @@ class YumDnf(Packager):
         if (self.distro_code == 'fc' and self.major >= 30) or \
            (self.distro_code == 'el' and self.major >= 8):
             self.tool = 'dnf'
+        elif (self.distro_code == 'cm'):
+            self.tool = 'tdnf'
         else:
             self.tool = 'yum'
 
@@ -6494,6 +6564,7 @@ class HostFacts():
     _disk_vendor_workarounds = {
         '0x1af4': 'Virtio Block Device'
     }
+    _excluded_block_devices = ('sr', 'zram', 'dm-')
 
     def __init__(self, ctx: CephadmContext):
         self.ctx: CephadmContext = ctx
@@ -6533,7 +6604,7 @@ class HostFacts():
         # type: () -> List[str]
         """Determine the list of block devices by looking at /sys/block"""
         return [dev for dev in os.listdir('/sys/block')
-                if not dev.startswith('dm')]
+                if not dev.startswith(HostFacts._excluded_block_devices)]
 
     def _get_devs_by_type(self, rota='0'):
         # type: (str) -> List[str]
@@ -7680,27 +7751,31 @@ def command_exporter(ctx: CephadmContext) -> None:
 ##################################
 
 
-def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool:
+def systemd_target_state(ctx: CephadmContext, target_name: str, subsystem: str = 'ceph') -> bool:
     # TODO: UNITTEST
     return os.path.exists(
         os.path.join(
-            UNIT_DIR,
+            ctx.unit_dir,
             f'{subsystem}.target.wants',
             target_name
         )
     )
 
 
+def target_exists(ctx: CephadmContext) -> bool:
+    return os.path.exists(ctx.unit_dir + '/ceph.target')
+
+
 @infer_fsid
 def command_maintenance(ctx: CephadmContext) -> str:
     if not ctx.fsid:
-        raise Error('must pass --fsid to specify cluster')
+        raise Error('failed - must pass --fsid to specify cluster')
 
     target = f'ceph-{ctx.fsid}.target'
 
     if ctx.maintenance_action.lower() == 'enter':
         logger.info('Requested to place host into maintenance')
-        if systemd_target_state(target):
+        if systemd_target_state(ctx, target):
             _out, _err, code = call(ctx,
                                     ['systemctl', 'disable', target],
                                     verbosity=CallVerbosity.DEBUG)
@@ -7723,8 +7798,14 @@ def command_maintenance(ctx: CephadmContext) -> str:
 
     else:
         logger.info('Requested to exit maintenance state')
+        # if we've never deployed a daemon on this host there will be no systemd
+        # target to disable so attempting a disable will fail. We still need to
+        # return success here or host will be permanently stuck in maintenance mode
+        # as no daemons can be deployed so no systemd target will ever exist to disable.
+        if not target_exists(ctx):
+            return 'skipped - systemd target not present on this host. Host removed from maintenance mode.'
         # exit maintenance request
-        if not systemd_target_state(target):
+        if not systemd_target_state(ctx, target):
             _out, _err, code = call(ctx,
                                     ['systemctl', 'enable', target],
                                     verbosity=CallVerbosity.DEBUG)
@@ -7813,6 +7894,11 @@ def _get_parser():
     parser_pull = subparsers.add_parser(
         'pull', help='pull latest image version')
     parser_pull.set_defaults(func=command_pull)
+    parser_pull.add_argument(
+        '--insecure',
+        action='store_true',
+        help=argparse.SUPPRESS,
+    )
 
     parser_inspect_image = subparsers.add_parser(
         'inspect-image', help='inspect local container image')
@@ -8438,7 +8524,7 @@ def cephadm_init(args: List[str]) -> CephadmContext:
         for handler in logger.handlers:
             if handler.name == 'console':
                 handler.setLevel(logging.DEBUG)
-
+    logger.debug('%s\ncephadm %s' % ('-' * 80, args))
     return ctx
 
 
@@ -8461,7 +8547,13 @@ def main() -> None:
         # podman or docker?
         ctx.container_engine = find_container_engine(ctx)
         if ctx.func not in \
-                [command_check_host, command_prepare_host, command_add_repo, command_install]:
+                [
+                    command_check_host,
+                    command_prepare_host,
+                    command_add_repo,
+                    command_rm_repo,
+                    command_install
+                ]:
             check_container_engine(ctx)
         # command handler
         r = ctx.func(ctx)