git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/ceph_manager.py
update source to Ceph Pacific 16.2.2
index dc7ff02e50c66d85b85e5bdfa8617abdc508f855..e67ca06ddf3513df5d8879af9ab647d0ff9f727b 100644 (file)
@@ -13,9 +13,10 @@ import logging
 import threading
 import traceback
 import os
-import six
+import shlex
 
 from io import BytesIO, StringIO
+from subprocess import DEVNULL
 from teuthology import misc as teuthology
 from tasks.scrub import Scrubber
 from tasks.util.rados import cmd_erasure_code_profile
@@ -25,12 +26,7 @@ from teuthology.orchestra.remote import Remote
 from teuthology.orchestra import run
 from teuthology.exceptions import CommandFailedError
 from tasks.thrasher import Thrasher
-from six import StringIO
 
-try:
-    from subprocess import DEVNULL # py3k
-except ImportError:
-    DEVNULL = open(os.devnull, 'r+')  # type: ignore
 
 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
 
@@ -38,7 +34,6 @@ log = logging.getLogger(__name__)
 
 # this is for cephadm clusters
 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
-    testdir = teuthology.get_testdir(ctx)
     extra_args = []
     if name:
         extra_args = ['-n', name]
@@ -73,6 +68,63 @@ def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
     teuthology.feed_many_stdins_and_close(conf_fp, writes)
     run.wait(writes)
 
+def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
+    """
+    Build a command line for running valgrind.
+
+    testdir - test results directory
+    name - name of daemon (used to name the log file)
+    preamble - command prefix to run before valgrind
+    v - valgrind arguments
+    """
+    if v is None:
+        return preamble
+    if not isinstance(v, list):
+        v = [v]
+
+    # https://tracker.ceph.com/issues/44362
+    preamble.extend([
+        'env', 'OPENSSL_ia32cap=~0x1000000000000000',
+    ])
+
+    val_path = '/var/log/ceph/valgrind'
+    if '--tool=memcheck' in v or '--tool=helgrind' in v:
+        extra_args = [
+            'valgrind',
+            '--trace-children=no',
+            '--child-silent-after-fork=yes',
+            '--soname-synonyms=somalloc=*tcmalloc*',
+            '--num-callers=50',
+            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
+            '--xml=yes',
+            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
+            '--time-stamp=yes',
+            '--vgdb=yes',
+        ]
+    else:
+        extra_args = [
+            'valgrind',
+            '--trace-children=no',
+            '--child-silent-after-fork=yes',
+            '--soname-synonyms=somalloc=*tcmalloc*',
+            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
+            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
+            '--time-stamp=yes',
+            '--vgdb=yes',
+        ]
+    if exit_on_first_error:
+        extra_args.extend([
+            # at least Valgrind 3.14 is required
+            '--exit-on-first-error=yes',
+            '--error-exitcode=42',
+        ])
+    args = []
+    if cd:
+        args += ['cd', testdir, run.Raw('&&')]
+    args += preamble + extra_args + v
+    log.debug('running %s under valgrind with args %s', name, args)
+    return args
+
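
For orientation, a minimal sketch of how this helper might be called; the testdir path and daemon command below are hypothetical, not taken from the diff:

    # Hypothetical call: wrap an OSD daemon invocation in valgrind/memcheck.
    # Note that get_valgrind_args mutates `preamble` (extending it with the
    # OPENSSL_ia32cap workaround) before building the final argument list.
    preamble = ['sudo', 'adjust-ulimits']
    args = get_valgrind_args(
        testdir='/home/ubuntu/cephtest',   # directory holding valgrind.supp
        name='osd.0',                      # used to name the valgrind log file
        preamble=preamble,
        v='--tool=memcheck',               # a bare string is wrapped in a list
    )
    args += ['ceph-osd', '-f', '-i', '0']  # the daemon command itself
    # args now starts with: cd <testdir> && sudo adjust-ulimits env
    # OPENSSL_ia32cap=~0x1000000000000000 valgrind --trace-children=no ...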
 
 def mount_osd_data(ctx, remote, cluster, osd):
     """
@@ -244,6 +296,22 @@ class OSDThrasher(Thrasher):
                 stdout=StringIO(),
                 stderr=StringIO())
 
+    def run_ceph_bluestore_tool(self, remote, osd, cmd):
+        if self.ceph_manager.cephadm:
+            return shell(
+                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
+                args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
+                name=osd,
+                wait=True, check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO())
+        else:
+            return remote.run(
+                args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
+                wait=True, check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO())
+
     def kill_osd(self, osd=None, mark_down=False, mark_out=False):
         """
         :param osd: Osd to be killed.
@@ -958,6 +1026,113 @@ class OSDThrasher(Thrasher):
             self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                      check_status=True, timeout=30, stdout=DEVNULL)
 
+
+    def generate_random_sharding(self):
+        prefixes = [
+            'm','O','P','L'
+        ]
+        new_sharding = ''
+        for prefix in prefixes:
+            choose = random.choice([False, True])
+            if not choose:
+                continue
+            if new_sharding != '':
+                new_sharding = new_sharding + ' '
+            columns = random.randint(1, 5)
+            do_hash = random.choice([False, True])
+            if do_hash:
+                low_hash = random.choice([0, 5, 8])
+                do_high_hash = random.choice([False, True])
+                if do_high_hash:
+                    high_hash = random.choice([8, 16, 30]) + low_hash
+                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
+                else:
+                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
+            else:
+                if columns == 1:
+                    new_sharding = new_sharding + prefix
+                else:
+                    new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
+        return new_sharding
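
The descriptors this produces follow the grammar prefix, optionally followed by (columns), (columns,low-high), or (columns,low-), with prefixes emitted in the fixed order m, O, P, L. A couple of hypothetical draws, assuming a thrasher instance:

    # Illustrative outputs (hypothetical draws, not fixed values):
    #   'm(2,5-) P(4)'    - 'm' over 2 columns hashing keys from 5 up,
    #                       'P' over 4 columns with no hashing
    #   'O(3,0-8) L'      - 'O' over 3 columns for hashes in [0, 8),
    #                       'L' left as a single column
    new_sharding = thrasher.generate_random_sharding()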
+
+    def test_bluestore_reshard_action(self):
+        """
+        Test that resharding of bluestore works properly.
+        If bluestore is not used, or the bluestore version does
+        not support sharding, skip the test.
+        """
+
+        osd = random.choice(self.dead_osds)
+        remote = self.ceph_manager.find_remote('osd', osd)
+        FSPATH = self.ceph_manager.get_filepath()
+
+        prefix = [
+                '--no-mon-config',
+                '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
+                '--log-level=10',
+                '--path', FSPATH.format(id=osd)
+            ]
+
+        # sanity check that ceph-bluestore-tool can access the target
+        self.log('checking if target objectstore is bluestore on osd.%s' % osd)
+        cmd = prefix + [
+            'show-label'
+            ]
+        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+        if proc.exitstatus != 0:
+            raise Exception("ceph-bluestore-tool access failed.")
+
+        # check if sharding is possible
+        self.log('checking if target bluestore supports sharding on osd.%s' % osd)
+        cmd = prefix + [
+            'show-sharding'
+            ]
+        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+        if proc.exitstatus != 0:
+            self.log("Unable to test resharding, "
+                     "ceph-bluestore-tool does not support it.")
+            return
+
+        # now go for reshard to something else
+        self.log('applying new sharding to bluestore on osd.%s' % osd)
+        new_sharding = self.config.get('bluestore_new_sharding', 'random')
+
+        if new_sharding == 'random':
+            self.log('generate random sharding')
+            new_sharding = self.generate_random_sharding()
+
+        self.log("applying new sharding: " + new_sharding)
+        cmd = prefix + [
+            '--sharding', new_sharding,
+            'reshard'
+            ]
+        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+        if proc.exitstatus != 0:
+            raise Exception("ceph-bluestore-tool resharding failed.")
+
+        # now run fsck to verify the new sharding
+        self.log('running fsck to verify new sharding on osd.%s' % osd)
+        cmd = prefix + [
+            'fsck'
+            ]
+        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+        if proc.exitstatus != 0:
+            raise Exception("ceph-bluestore-tool fsck failed.")
+        self.log('resharding successfully completed')
+
+    def test_bluestore_reshard(self):
+        """
+        1) kills an osd
+        2) reshards bluestore on killed osd
+        3) revives the osd
+        """
+        self.log('test_bluestore_reshard started')
+        self.kill_osd(mark_down=True, mark_out=True)
+        self.test_bluestore_reshard_action()
+        self.revive_osd()
+        self.log('test_bluestore_reshard completed')
+
+
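
Condensed, the action above amounts to four ceph-bluestore-tool invocations against the stopped OSD; a sketch with a hypothetical osd.0, where `thrasher`, `remote`, the data path, and the sharding spec stand in for values the test derives at runtime:

    # Sketch of the reshard sequence (hypothetical osd.0 and sharding spec).
    prefix = ['--no-mon-config', '--path', '/var/lib/ceph/osd/ceph-0']
    steps = [
        ['show-label'],                           # is this OSD bluestore?
        ['show-sharding'],                        # does the tool support it?
        ['--sharding', 'm(3) P(3,0-16)', 'reshard'],
        ['fsck'],                                 # verify the new layout
    ]
    for step in steps:
        proc = thrasher.run_ceph_bluestore_tool(remote, 'osd.0', prefix + step)
        if proc.exitstatus != 0:
            break  # the real test raises, or skips if show-sharding fails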
     def test_map_discontinuity(self):
         """
         1) Allows the osds to recover
@@ -1059,6 +1234,13 @@ class OSDThrasher(Thrasher):
                  self.config.get('chance_inject_pause_long', 0),)]:
                 actions.append(scenario)
 
+        # only consider resharding if objectstore is bluestore
+        cluster_name = self.ceph_manager.cluster
+        cluster = self.ceph_manager.ctx.ceph[cluster_name]
+        if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
+            actions.append((self.test_bluestore_reshard,
+                            self.config.get('chance_bluestore_reshard', 0),))
+
         total = sum([y for (x, y) in actions])
         val = random.uniform(0, total)
         for (action, prob) in actions:
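
The selection loop above implements weighted random choice over the thrash actions; its body (outside this hunk) walks the cumulative weights. A standalone equivalent with hypothetical weights:

    # Weighted choice: draw once in [0, total), then scan the weights.
    import random
    actions = [('kill_osd', 1.0), ('reshard', 0.3), ('noop', 0.5)]
    val = random.uniform(0, sum(w for _, w in actions))
    for name, weight in actions:
        if val < weight:
            chosen = name      # this action fires
            break
        val -= weight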
@@ -1304,7 +1486,7 @@ class CephManager:
     """
 
     def __init__(self, controller, ctx=None, config=None, logger=None,
-                 cluster='ceph', cephadm=False):
+                 cluster='ceph', cephadm=False) -> None:
         self.lock = threading.RLock()
         self.ctx = ctx
         self.config = config
@@ -1332,61 +1514,51 @@ class CephManager:
             except CommandFailedError:
                 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
 
-    def raw_cluster_cmd(self, *args):
+    def ceph(self, cmd, **kwargs):
         """
-        Start ceph on a raw cluster.  Return count
+        Simple Ceph admin command wrapper around run_cluster_cmd.
+        """
+
+        kwargs.pop('args', None)
+        args = shlex.split(cmd)
+        stdout = kwargs.pop('stdout', StringIO())
+        stderr = kwargs.pop('stderr', StringIO())
+        return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)
+
+    def run_cluster_cmd(self, **kwargs):
+        """
+        Run a Ceph command and return the object representing the process
+        for the command.
+
+        Accepts the same arguments as teuthology.orchestra.run.run()
         """
         if self.cephadm:
-            proc = shell(self.ctx, self.cluster, self.controller,
-                         args=['ceph'] + list(args),
-                         stdout=BytesIO())
-        else:
-            testdir = teuthology.get_testdir(self.ctx)
-            ceph_args = [
-                'sudo',
-                'adjust-ulimits',
-                'ceph-coverage',
-                '{tdir}/archive/coverage'.format(tdir=testdir),
-                'timeout',
-                '120',
-                'ceph',
-                '--cluster',
-                self.cluster,
-                '--log-early',
-            ]
-            ceph_args.extend(args)
-            proc = self.controller.run(
-                args=ceph_args,
-                stdout=BytesIO(),
-            )
-        return six.ensure_str(proc.stdout.getvalue())
+            return shell(self.ctx, self.cluster, self.controller,
+                         args=['ceph'] + list(kwargs['args']),
+                         stdout=StringIO(),
+                         check_status=kwargs.get('check_status', True))
+
+        testdir = teuthology.get_testdir(self.ctx)
+        prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
+                  f'{testdir}/archive/coverage', 'timeout', '120', 'ceph',
+                  '--cluster', self.cluster]
+        kwargs['args'] = prefix + list(kwargs['args'])
+        return self.controller.run(**kwargs)
+
+    def raw_cluster_cmd(self, *args, **kwargs) -> str:
+        """
+        Run a 'ceph' command on the cluster and return its stdout.
+        """
+        stdout = kwargs.pop('stdout', StringIO())
+        p = self.run_cluster_cmd(args=args, stdout=stdout, **kwargs)
+        return p.stdout.getvalue()
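
Hypothetical usage of the reworked wrappers on a CephManager instance `m`; `ceph()` splits a single command string with shlex, while `raw_cluster_cmd()` keeps the older *args form and returns stdout:

    # String form, parsed via shlex.split:
    m.ceph('osd set noout')
    # Positional form; returns the command's stdout as a string:
    dump = m.raw_cluster_cmd('osd', 'dump', '--format=json')
    # Full control over the underlying run() call
    # (StringIO comes from io, as imported at the top of the module):
    proc = m.run_cluster_cmd(args=['health', 'detail'],
                             stdout=StringIO(), check_status=False)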
 
     def raw_cluster_cmd_result(self, *args, **kwargs):
         """
         Start ceph on a cluster.  Return success or failure information.
         """
-        if self.cephadm:
-            proc = shell(self.ctx, self.cluster, self.controller,
-                         args=['ceph'] + list(args),
-                         check_status=False)
-        else:
-            testdir = teuthology.get_testdir(self.ctx)
-            ceph_args = [
-                'sudo',
-                'adjust-ulimits',
-                'ceph-coverage',
-                '{tdir}/archive/coverage'.format(tdir=testdir),
-                'timeout',
-                '120',
-                'ceph',
-                '--cluster',
-                self.cluster,
-            ]
-            ceph_args.extend(args)
-            kwargs['args'] = ceph_args
-            kwargs['check_status'] = False
-            proc = self.controller.run(**kwargs)
-        return proc.exitstatus
+        kwargs['args'], kwargs['check_status'] = args, False
+        return self.run_cluster_cmd(**kwargs).exitstatus
 
     def run_ceph_w(self, watch_channel=None):
         """
@@ -1466,7 +1638,7 @@ class CephManager:
                         wait for all specified osds, but some of them could be
                         moved out of osdmap, so we cannot get their updated
                         stat seq from monitor anymore. in that case, you need
-                        to pass a blacklist.
+                        to pass a blocklist.
         :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                              it. (5 min by default)
         """
@@ -1497,10 +1669,13 @@ class CephManager:
     def flush_all_pg_stats(self):
         self.flush_pg_stats(range(len(self.get_osd_dump())))
 
-    def do_rados(self, remote, cmd, check_status=True):
+    def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
         """
         Execute a remote rados command.
         """
+        if remote is None:
+            remote = self.controller
+
         testdir = teuthology.get_testdir(self.ctx)
         pre = [
             'adjust-ulimits',
@@ -1510,11 +1685,15 @@ class CephManager:
             '--cluster',
             self.cluster,
             ]
+        if pool is not None:
+            pre += ['--pool', pool]
+        if namespace is not None:
+            pre += ['--namespace', namespace]
         pre.extend(cmd)
         proc = remote.run(
             args=pre,
             wait=True,
-            check_status=check_status
+            **kwargs
             )
         return proc
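
With the new signature, pool and namespace become keyword arguments and the remote defaults to self.controller; a hypothetical call, where 'rbd' and 'ns1' are illustrative values:

    m.do_rados(['ls'], pool='rbd')
    m.do_rados(['put', 'obj1', '/tmp/payload'], pool='rbd', namespace='ns1',
               check_status=False)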
 
@@ -1525,7 +1704,6 @@ class CephManager:
         Threads not used yet.
         """
         args = [
-            '-p', pool,
             '--num-objects', num_objects,
             '-b', size,
             'bench', timelimit,
@@ -1533,59 +1711,42 @@ class CephManager:
             ]
         if not cleanup:
             args.append('--no-cleanup')
-        return self.do_rados(self.controller, map(str, args))
+        return self.do_rados(map(str, args), pool=pool)
 
     def do_put(self, pool, obj, fname, namespace=None):
         """
         Implement rados put operation
         """
-        args = ['-p', pool]
-        if namespace is not None:
-            args += ['-N', namespace]
-        args += [
-            'put',
-            obj,
-            fname
-        ]
+        args = ['put', obj, fname]
         return self.do_rados(
-            self.controller,
             args,
-            check_status=False
+            check_status=False,
+            pool=pool,
+            namespace=namespace
         ).exitstatus
 
     def do_get(self, pool, obj, fname='/dev/null', namespace=None):
         """
         Implement rados get operation
         """
-        args = ['-p', pool]
-        if namespace is not None:
-            args += ['-N', namespace]
-        args += [
-            'get',
-            obj,
-            fname
-        ]
+        args = ['get', obj, fname]
         return self.do_rados(
-            self.controller,
             args,
-            check_status=False
+            check_status=False,
+            pool=pool,
+            namespace=namespace,
         ).exitstatus
 
     def do_rm(self, pool, obj, namespace=None):
         """
         Implement rados rm operation
         """
-        args = ['-p', pool]
-        if namespace is not None:
-            args += ['-N', namespace]
-        args += [
-            'rm',
-            obj
-        ]
+        args = ['rm', obj]
         return self.do_rados(
-            self.controller,
             args,
-            check_status=False
+            check_status=False,
+            pool=pool,
+            namespace=namespace
         ).exitstatus
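
The three helpers now delegate pool/namespace handling to do_rados, and each returns the rados exit status; a hypothetical round trip:

    # 0 means success for each operation below.
    assert m.do_put('rbd', 'obj1', '/tmp/payload') == 0
    assert m.do_get('rbd', 'obj1', fname='/tmp/copy') == 0
    assert m.do_rm('rbd', 'obj1') == 0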
 
     def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
@@ -2881,7 +3042,6 @@ class CephManager:
         """
         out = self.raw_cluster_cmd('quorum_status')
         j = json.loads(out)
-        self.log('quorum_status is %s' % out)
         return j['quorum']
 
     def wait_for_mon_quorum_size(self, size, timeout=300):