]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/rook.py
import ceph pacific 16.2.5
[ceph.git] / ceph / qa / tasks / rook.py
diff --git a/ceph/qa/tasks/rook.py b/ceph/qa/tasks/rook.py
new file mode 100644 (file)
index 0000000..bdd9e58
--- /dev/null
@@ -0,0 +1,638 @@
+"""
+Rook cluster task
+"""
+import argparse
+import configobj
+import contextlib
+import json
+import logging
+import os
+import yaml
+from io import BytesIO
+
+from tarfile import ReadError
+from tasks.ceph_manager import CephManager
+from teuthology import misc as teuthology
+from teuthology.config import config as teuth_config
+from teuthology.contextutil import safe_while
+from teuthology.orchestra import run
+from teuthology import contextutil
+from tasks.ceph import healthy
+from tasks.cephadm import update_archive_setting
+
+log = logging.getLogger(__name__)
+
+
+def _kubectl(ctx, config, args, **kwargs):
+    cluster_name = config.get('cluster', 'ceph')
+    return ctx.rook[cluster_name].remote.run(
+        args=['kubectl'] + args,
+        **kwargs
+    )
+
+
+def shell(ctx, config):
+    """
+    Run command(s) inside the rook tools container.
+
+      tasks:
+      - kubeadm:
+      - rook:
+      - rook.shell:
+          - ceph -s
+
+    or
+
+      tasks:
+      - kubeadm:
+      - rook:
+      - rook.shell:
+          commands:
+          - ceph -s
+
+    """
+    if isinstance(config, list):
+        config = {'commands': config}
+    for cmd in config.get('commands', []):
+        if isinstance(cmd, str):
+            _shell(ctx, config, cmd.split(' '))
+        else:
+            _shell(ctx, config, cmd)
+
+
+def _shell(ctx, config, args, **kwargs):
+    cluster_name = config.get('cluster', 'ceph')
+    return _kubectl(
+        ctx, config,
+        [
+            '-n', 'rook-ceph',
+            'exec',
+            ctx.rook[cluster_name].toolbox, '--'
+        ] + args,
+        **kwargs
+    )
+
+
+@contextlib.contextmanager
+def rook_operator(ctx, config):
+    cluster_name = config['cluster']
+    rook_branch = config.get('rook_branch', 'master')
+    rook_git_url = config.get('rook_git_url', 'https://github.com/rook/rook')
+
+    log.info(f'Cloning {rook_git_url} branch {rook_branch}')
+    ctx.rook[cluster_name].remote.run(
+        args=[
+            'rm', '-rf', 'rook',
+            run.Raw('&&'),
+            'git',
+            'clone',
+            '--single-branch',
+            '--branch', rook_branch,
+            rook_git_url,
+            'rook',
+        ]
+    )
+
+    # operator.yaml
+    operator_yaml = ctx.rook[cluster_name].remote.read_file(
+        'rook/cluster/examples/kubernetes/ceph/operator.yaml'
+    )
+    rook_image = config.get('rook_image')
+    if rook_image:
+        log.info(f'Patching operator to use image {rook_image}')
+        crs = list(yaml.load_all(operator_yaml, Loader=yaml.FullLoader))
+        assert len(crs) == 2
+        crs[1]['spec']['template']['spec']['containers'][0]['image'] = rook_image
+        operator_yaml = yaml.dump_all(crs)
+    ctx.rook[cluster_name].remote.write_file('operator.yaml', operator_yaml)
+
+    op_job = None
+    try:
+        log.info('Deploying operator')
+        _kubectl(ctx, config, [
+            'create',
+            '-f', 'rook/cluster/examples/kubernetes/ceph/crds.yaml',
+            '-f', 'rook/cluster/examples/kubernetes/ceph/common.yaml',
+            '-f', 'operator.yaml',
+        ])
+
+        # on centos:
+        if teuthology.get_distro(ctx) == 'centos':
+            _kubectl(ctx, config, [
+                '-n', 'rook-ceph',
+                'set', 'env', 'deploy/rook-ceph-operator',
+                'ROOK_HOSTPATH_REQUIRES_PRIVILEGED=true'
+            ])
+
+        # wait for operator
+        op_name = None
+        with safe_while(sleep=10, tries=90, action="wait for operator") as proceed:
+            while not op_name and proceed():
+                p = _kubectl(
+                    ctx, config,
+                    ['-n', 'rook-ceph', 'get', 'pods', '-l', 'app=rook-ceph-operator'],
+                    stdout=BytesIO(),
+                )
+                for line in p.stdout.getvalue().decode('utf-8').strip().splitlines():
+                    name, ready, status, _ = line.split(None, 3)
+                    if status == 'Running':
+                        op_name = name
+                        break
+
+        # log operator output
+        op_job = _kubectl(
+            ctx,
+            config,
+            ['-n', 'rook-ceph', 'logs', '-f', op_name],
+            wait=False,
+            logger=log.getChild('operator'),
+        )
+
+        yield
+
+    except Exception as e:
+        log.exception(e)
+        raise
+
+    finally:
+        log.info('Cleaning up rook operator')
+        _kubectl(ctx, config, [
+            'delete',
+            '-f', 'operator.yaml',
+        ])
+        if False:
+            # don't bother since we'll tear down k8s anyway (and this mysteriously
+            # fails sometimes when deleting some of the CRDs... not sure why!)
+            _kubectl(ctx, config, [
+                'delete',
+                '-f', 'rook/cluster/examples/kubernetes/ceph/common.yaml',
+            ])
+            _kubectl(ctx, config, [
+                'delete',
+                '-f', 'rook/cluster/examples/kubernetes/ceph/crds.yaml',
+            ])
+        ctx.rook[cluster_name].remote.run(args=['rm', '-rf', 'rook', 'operator.yaml'])
+        if op_job:
+            op_job.wait()
+        run.wait(
+            ctx.cluster.run(
+                args=[
+                    'sudo', 'rm', '-rf', '/var/lib/rook'
+                ]
+            )
+        )
+
+
+@contextlib.contextmanager
+def ceph_log(ctx, config):
+    cluster_name = config['cluster']
+
+    log_dir = '/var/lib/rook/rook-ceph/log'
+    update_archive_setting(ctx, 'log', log_dir)
+
+    try:
+        yield
+
+    except Exception:
+        # we need to know this below
+        ctx.summary['success'] = False
+        raise
+
+    finally:
+        log.info('Checking cluster log for badness...')
+        def first_in_ceph_log(pattern, excludes):
+            """
+            Find the first occurrence of the pattern specified in the Ceph log,
+            Returns None if none found.
+
+            :param pattern: Pattern scanned for.
+            :param excludes: Patterns to ignore.
+            :return: First line of text (or None if not found)
+            """
+            args = [
+                'sudo',
+                'egrep', pattern,
+                f'{log_dir}/ceph.log',
+            ]
+            if excludes:
+                for exclude in excludes:
+                    args.extend([run.Raw('|'), 'egrep', '-v', exclude])
+            args.extend([
+                run.Raw('|'), 'head', '-n', '1',
+            ])
+            r = ctx.rook[cluster_name].remote.run(
+                stdout=BytesIO(),
+                args=args,
+            )
+            stdout = r.stdout.getvalue().decode()
+            if stdout:
+                return stdout
+            return None
+
+        if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]',
+                             config.get('log-ignorelist')) is not None:
+            log.warning('Found errors (ERR|WRN|SEC) in cluster log')
+            ctx.summary['success'] = False
+            # use the most severe problem as the failure reason
+            if 'failure_reason' not in ctx.summary:
+                for pattern in ['\[SEC\]', '\[ERR\]', '\[WRN\]']:
+                    match = first_in_ceph_log(pattern, config['log-ignorelist'])
+                    if match is not None:
+                        ctx.summary['failure_reason'] = \
+                            '"{match}" in cluster log'.format(
+                                match=match.rstrip('\n'),
+                            )
+                        break
+
+        if ctx.archive is not None and \
+                not (ctx.config.get('archive-on-error') and ctx.summary['success']):
+            # and logs
+            log.info('Compressing logs...')
+            run.wait(
+                ctx.cluster.run(
+                    args=[
+                        'sudo',
+                        'find',
+                        log_dir,
+                        '-name',
+                        '*.log',
+                        '-print0',
+                        run.Raw('|'),
+                        'sudo',
+                        'xargs',
+                        '-0',
+                        '--no-run-if-empty',
+                        '--',
+                        'gzip',
+                        '--',
+                    ],
+                    wait=False,
+                ),
+            )
+
+            log.info('Archiving logs...')
+            path = os.path.join(ctx.archive, 'remote')
+            try:
+                os.makedirs(path)
+            except OSError:
+                pass
+            for remote in ctx.cluster.remotes.keys():
+                sub = os.path.join(path, remote.name)
+                try:
+                    os.makedirs(sub)
+                except OSError:
+                    pass
+                try:
+                    teuthology.pull_directory(remote, log_dir,
+                                              os.path.join(sub, 'log'))
+                except ReadError:
+                    pass
+
+
+def build_initial_config(ctx, config):
+    path = os.path.join(os.path.dirname(__file__), 'rook-ceph.conf')
+    conf = configobj.ConfigObj(path, file_error=True)
+
+    # overrides
+    for section, keys in config.get('conf',{}).items():
+        for key, value in keys.items():
+            log.info(" override: [%s] %s = %s" % (section, key, value))
+            if section not in conf:
+                conf[section] = {}
+            conf[section][key] = value
+
+    return conf
+
+
+@contextlib.contextmanager
+def rook_cluster(ctx, config):
+    cluster_name = config['cluster']
+
+    # count how many OSDs we'll create
+    num_devs = 0
+    num_hosts = 0
+    for remote in ctx.cluster.remotes.keys():
+        ls = remote.read_file('/scratch_devs').decode('utf-8').strip().splitlines()
+        num_devs += len(ls)
+        num_hosts += 1
+    ctx.rook[cluster_name].num_osds = num_devs
+
+    # config
+    config = build_initial_config(ctx, config)
+    config_fp = BytesIO()
+    config.write(config_fp)
+    log.info(f'Config:\n{config_fp.getvalue()}')
+    _kubectl(ctx, config, ['create', '-f', '-'], stdin=yaml.dump({
+        'apiVersion': 'v1',
+        'kind': 'ConfigMap',
+        'metadata': {
+            'name': 'rook-config-override',
+            'namespace': 'rook-ceph'},
+        'data': {
+            'config': config_fp.getvalue()
+        }
+    }))
+
+    # cluster
+    cluster = {
+        'apiVersion': 'ceph.rook.io/v1',
+        'kind': 'CephCluster',
+        'metadata': {'name': 'rook-ceph', 'namespace': 'rook-ceph'},
+        'spec': {
+            'cephVersion': {
+                'image': ctx.rook[cluster_name].image,
+                'allowUnsupported': True,
+            },
+            'dataDirHostPath': '/var/lib/rook',
+            'skipUpgradeChecks': True,
+            'mgr': {
+                'count': 1,
+                'modules': [
+                    { 'name': 'rook', 'enabled': True },
+                ],
+            },
+            'mon': {
+                'count': num_hosts,
+                'allowMultiplePerNode': True,
+            },
+            'storage': {
+                'storageClassDeviceSets': [
+                    {
+                        'name': 'scratch',
+                        'count': num_devs,
+                        'portable': False,
+                        'volumeClaimTemplates': [
+                            {
+                                'metadata': {'name': 'data'},
+                                'spec': {
+                                    'resources': {
+                                        'requests': {
+                                            'storage': '10Gi'  # <= (lte) the actual PV size
+                                        }
+                                    },
+                                    'storageClassName': 'scratch',
+                                    'volumeMode': 'Block',
+                                    'accessModes': ['ReadWriteOnce'],
+                                },
+                            },
+                        ],
+                    }
+                ],
+            },
+        }
+    }
+    teuthology.deep_merge(cluster['spec'], config.get('spec', {}))
+    
+    cluster_yaml = yaml.dump(cluster)
+    log.info(f'Cluster:\n{cluster_yaml}')
+    try:
+        ctx.rook[cluster_name].remote.write_file('cluster.yaml', cluster_yaml)
+        _kubectl(ctx, config, ['create', '-f', 'cluster.yaml'])
+        yield
+
+    except Exception as e:
+        log.exception(e)
+        raise
+
+    finally:
+        _kubectl(ctx, config, ['delete', '-f', 'cluster.yaml'], check_status=False)
+
+        # wait for cluster to shut down
+        log.info('Waiting for cluster to stop')
+        running = True
+        with safe_while(sleep=5, tries=100, action="wait for teardown") as proceed:
+            while running and proceed():
+                p = _kubectl(
+                    ctx, config,
+                    ['-n', 'rook-ceph', 'get', 'pods'],
+                    stdout=BytesIO(),
+                )
+                running = False
+                for line in p.stdout.getvalue().decode('utf-8').strip().splitlines():
+                    name, ready, status, _ = line.split(None, 3)
+                    if (
+                            name != 'NAME'
+                            and not name.startswith('csi-')
+                            and not name.startswith('rook-ceph-operator-')
+                            and not name.startswith('rook-ceph-tools-')
+                    ):
+                        running = True
+                        break
+
+        _kubectl(
+            ctx, config,
+            ['-n', 'rook-ceph', 'delete', 'configmap', 'rook-config-override'],
+            check_status=False,
+        )
+        ctx.rook[cluster_name].remote.run(args=['rm', '-f', 'cluster.yaml'])
+
+
+@contextlib.contextmanager
+def rook_toolbox(ctx, config):
+    cluster_name = config['cluster']
+    try:
+        _kubectl(ctx, config, [
+            'create',
+            '-f', 'rook/cluster/examples/kubernetes/ceph/toolbox.yaml',
+        ])
+
+        log.info('Waiting for tools container to start')
+        toolbox = None
+        with safe_while(sleep=5, tries=100, action="wait for toolbox") as proceed:
+            while not toolbox and proceed():
+                p = _kubectl(
+                    ctx, config,
+                    ['-n', 'rook-ceph', 'get', 'pods', '-l', 'app=rook-ceph-tools'],
+                    stdout=BytesIO(),
+                )
+                for line in p.stdout.getvalue().decode('utf-8').strip().splitlines():
+                    name, ready, status, _ = line.split(None, 3)
+                    if status == 'Running':
+                        toolbox = name
+                        break
+        ctx.rook[cluster_name].toolbox = toolbox
+        yield
+
+    except Exception as e:
+        log.exception(e)
+        raise
+
+    finally:
+        _kubectl(ctx, config, [
+            'delete',
+            '-f', 'rook/cluster/examples/kubernetes/ceph/toolbox.yaml',
+        ], check_status=False)
+
+
+@contextlib.contextmanager
+def wait_for_osds(ctx, config):
+    cluster_name = config.get('cluster', 'ceph')
+
+    want = ctx.rook[cluster_name].num_osds
+    log.info(f'Waiting for {want} OSDs')
+    with safe_while(sleep=10, tries=90, action="check osd count") as proceed:
+        while proceed():
+            p = _shell(ctx, config, ['ceph', 'osd', 'stat', '-f', 'json'],
+                       stdout=BytesIO(),
+                       check_status=False)
+            if p.exitstatus == 0:
+                r = json.loads(p.stdout.getvalue().decode('utf-8'))
+                have = r.get('num_up_osds', 0)
+                if have == want:
+                    break
+                log.info(f' have {have}/{want} OSDs')
+
+    yield
+
+
+@contextlib.contextmanager
+def ceph_config_keyring(ctx, config):
+    # get config and push to hosts
+    log.info('Distributing ceph config and client.admin keyring')
+    p = _shell(ctx, config, ['cat', '/etc/ceph/ceph.conf'], stdout=BytesIO())
+    conf = p.stdout.getvalue()
+    p = _shell(ctx, config, ['cat', '/etc/ceph/keyring'], stdout=BytesIO())
+    keyring = p.stdout.getvalue()
+    ctx.cluster.run(args=['sudo', 'mkdir', '-p', '/etc/ceph'])
+    for remote in ctx.cluster.remotes.keys():
+        remote.write_file(
+            '/etc/ceph/ceph.conf',
+            conf,
+            sudo=True,
+        )
+        remote.write_file(
+            '/etc/ceph/keyring',
+            keyring,
+            sudo=True,
+        )
+
+    try:
+        yield
+
+    except Exception as e:
+        log.exception(e)
+        raise
+
+    finally:
+        log.info('Cleaning up config and client.admin keyring')
+        ctx.cluster.run(args=[
+            'sudo', 'rm', '-f',
+            '/etc/ceph/ceph.conf',
+            '/etc/ceph/ceph.client.admin.keyring'
+        ])
+
+
+@contextlib.contextmanager
+def ceph_clients(ctx, config):
+    cluster_name = config['cluster']
+
+    log.info('Setting up client nodes...')
+    clients = ctx.cluster.only(teuthology.is_type('client', cluster_name))
+    for remote, roles_for_host in clients.remotes.items():
+        for role in teuthology.cluster_roles_of_type(roles_for_host, 'client',
+                                                     cluster_name):
+            name = teuthology.ceph_role(role)
+            client_keyring = '/etc/ceph/{0}.{1}.keyring'.format(cluster_name,
+                                                                name)
+            r = _shell(ctx, config,
+                args=[
+                    'ceph', 'auth',
+                    'get-or-create', name,
+                    'mon', 'allow *',
+                    'osd', 'allow *',
+                    'mds', 'allow *',
+                    'mgr', 'allow *',
+                ],
+                stdout=BytesIO(),
+            )
+            keyring = r.stdout.getvalue()
+            remote.write_file(client_keyring, keyring, sudo=True, mode='0644')
+    yield
+
+
+@contextlib.contextmanager
+def task(ctx, config):
+    """
+    Deploy rook-ceph cluster
+
+      tasks:
+      - kubeadm:
+      - rook:
+          branch: wip-foo
+          spec:
+            mon:
+              count: 1
+
+    The spec item is deep-merged against the cluster.yaml.  The branch, sha1, or
+    image items are used to determine the Ceph container image.
+    """
+    if not config:
+        config = {}
+    assert isinstance(config, dict), \
+        "task only supports a dictionary for configuration"
+
+    log.info('Rook start')
+
+    overrides = ctx.config.get('overrides', {})
+    teuthology.deep_merge(config, overrides.get('ceph', {}))
+    teuthology.deep_merge(config, overrides.get('rook', {}))
+    log.info('Config: ' + str(config))
+
+    # set up cluster context
+    if not hasattr(ctx, 'rook'):
+        ctx.rook = {}
+    if 'cluster' not in config:
+        config['cluster'] = 'ceph'
+    cluster_name = config['cluster']
+    if cluster_name not in ctx.rook:
+        ctx.rook[cluster_name] = argparse.Namespace()
+
+    ctx.rook[cluster_name].remote = list(ctx.cluster.remotes.keys())[0]
+
+    # image
+    teuth_defaults = teuth_config.get('defaults', {})
+    cephadm_defaults = teuth_defaults.get('cephadm', {})
+    containers_defaults = cephadm_defaults.get('containers', {})
+    container_image_name = containers_defaults.get('image', None)
+    if 'image' in config:
+        ctx.rook[cluster_name].image = config.get('image')
+    else:
+        sha1 = config.get('sha1')
+        flavor = config.get('flavor', 'default')
+        if sha1:
+            if flavor == "crimson":
+                ctx.rook[cluster_name].image = container_image_name + ':' + sha1 + '-' + flavor
+            else:
+                ctx.rook[cluster_name].image = container_image_name + ':' + sha1
+        else:
+            # hmm, fall back to branch?
+            branch = config.get('branch', 'master')
+            ctx.rook[cluster_name].image = container_image_name + ':' + branch
+    log.info('Ceph image is %s' % ctx.rook[cluster_name].image)
+    
+    with contextutil.nested(
+            lambda: rook_operator(ctx, config),
+            lambda: ceph_log(ctx, config),
+            lambda: rook_cluster(ctx, config),
+            lambda: rook_toolbox(ctx, config),
+            lambda: wait_for_osds(ctx, config),
+            lambda: ceph_config_keyring(ctx, config),
+            lambda: ceph_clients(ctx, config),
+    ):
+        if not hasattr(ctx, 'managers'):
+            ctx.managers = {}
+        ctx.managers[cluster_name] = CephManager(
+            ctx.rook[cluster_name].remote,
+            ctx=ctx,
+            logger=log.getChild('ceph_manager.' + cluster_name),
+            cluster=cluster_name,
+            rook=True,
+        )
+        try:
+            if config.get('wait-for-healthy', True):
+                healthy(ctx=ctx, config=config)
+            log.info('Rook complete, yielding')
+            yield
+
+        finally:
+            log.info('Tearing down rook')