import threading
import traceback
import os
-import six
+import shlex
from io import BytesIO, StringIO
+from subprocess import DEVNULL
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from tasks.util.rados import cmd_erasure_code_profile
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.thrasher import Thrasher
-from six import StringIO
-try:
- from subprocess import DEVNULL # py3k
-except ImportError:
- DEVNULL = open(os.devnull, 'r+') # type: ignore
DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
- testdir = teuthology.get_testdir(ctx)
extra_args = []
if name:
extra_args = ['-n', name]
teuthology.feed_many_stdins_and_close(conf_fp, writes)
run.wait(writes)
+def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
+ """
+ Build a command line for running valgrind.
+
+ testdir - test results directory
+ name - name of daemon (for naming the log file)
+ preamble - stuff we should run before valgrind
+ v - valgrind arguments
+ """
+ if v is None:
+ return preamble
+ if not isinstance(v, list):
+ v = [v]
+
+ # https://tracker.ceph.com/issues/44362
+ preamble.extend([
+ 'env', 'OPENSSL_ia32cap=~0x1000000000000000',
+ ])
+
+ val_path = '/var/log/ceph/valgrind'
+ if '--tool=memcheck' in v or '--tool=helgrind' in v:
+ extra_args = [
+ 'valgrind',
+ '--trace-children=no',
+ '--child-silent-after-fork=yes',
+ '--soname-synonyms=somalloc=*tcmalloc*',
+ '--num-callers=50',
+ '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
+ '--xml=yes',
+ '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
+ '--time-stamp=yes',
+ '--vgdb=yes',
+ ]
+ else:
+ extra_args = [
+ 'valgrind',
+ '--trace-children=no',
+ '--child-silent-after-fork=yes',
+ '--soname-synonyms=somalloc=*tcmalloc*',
+ '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
+ '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
+ '--time-stamp=yes',
+ '--vgdb=yes',
+ ]
+ if exit_on_first_error:
+ extra_args.extend([
+ # at least Valgrind 3.14 is required
+ '--exit-on-first-error=yes',
+ '--error-exitcode=42',
+ ])
+ args = []
+ if cd:
+ args += ['cd', testdir, run.Raw('&&')]
+ args += preamble + extra_args + v
+ log.debug('running %s under valgrind with args %s', name, args)
+ return args
+
def mount_osd_data(ctx, remote, cluster, osd):
"""
stdout=StringIO(),
stderr=StringIO())
+ def run_ceph_bluestore_tool(self, remote, osd, cmd):
+ if self.ceph_manager.cephadm:
+ return shell(
+ self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
+ args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
+ name=osd,
+ wait=True, check_status=False,
+ stdout=StringIO(),
+ stderr=StringIO())
+ else:
+ return remote.run(
+ args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
+ wait=True, check_status=False,
+ stdout=StringIO(),
+ stderr=StringIO())
+
def kill_osd(self, osd=None, mark_down=False, mark_out=False):
"""
:param osd: Osd to be killed.
self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
check_status=True, timeout=30, stdout=DEVNULL)
+
+ def generate_random_sharding(self):
+ prefixes = [
+ 'm','O','P','L'
+ ]
+ new_sharding = ''
+ for prefix in prefixes:
+ choose = random.choice([False, True])
+ if not choose:
+ continue
+ if new_sharding != '':
+ new_sharding = new_sharding + ' '
+ columns = random.randint(1, 5)
+ do_hash = random.choice([False, True])
+ if do_hash:
+ low_hash = random.choice([0, 5, 8])
+ do_high_hash = random.choice([False, True])
+ if do_high_hash:
+ high_hash = random.choice([8, 16, 30]) + low_hash
+ new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
+ else:
+ new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
+ else:
+ if columns == 1:
+ new_sharding = new_sharding + prefix
+ else:
+ new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
+ return new_sharding
+
+ def test_bluestore_reshard_action(self):
+ """
+ Test if resharding of bluestore works properly.
+ If bluestore is not used, or the bluestore version does not
+ support sharding, skip.
+ """
+
+ osd = random.choice(self.dead_osds)
+ remote = self.ceph_manager.find_remote('osd', osd)
+ FSPATH = self.ceph_manager.get_filepath()
+
+ prefix = [
+ '--no-mon-config',
+ '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
+ '--log-level=10',
+ '--path', FSPATH.format(id=osd)
+ ]
+
+ # sanity check if bluestore-tool accessible
+ self.log('checking if target objectstore is bluestore on osd.%s' % osd)
+ cmd = prefix + [
+ 'show-label'
+ ]
+ proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+ if proc.exitstatus != 0:
+ raise Exception("ceph-bluestore-tool access failed.")
+
+ # check if sharding is possible
+ self.log('checking if target bluestore supports sharding on osd.%s' % osd)
+ cmd = prefix + [
+ 'show-sharding'
+ ]
+ proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+ if proc.exitstatus != 0:
+ self.log("Unable to test resharding, "
+ "ceph-bluestore-tool does not support it.")
+ return
+
+ # now go for reshard to something else
+ self.log('applying new sharding to bluestore on osd.%s' % osd)
+ new_sharding = self.config.get('bluestore_new_sharding','random')
+
+ if new_sharding == 'random':
+ self.log('generate random sharding')
+ new_sharding = self.generate_random_sharding()
+
+ self.log("applying new sharding: " + new_sharding)
+ cmd = prefix + [
+ '--sharding', new_sharding,
+ 'reshard'
+ ]
+ proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+ if proc.exitstatus != 0:
+ raise Exception("ceph-bluestore-tool resharding failed.")
+
+ # now do fsck to verify that the new sharding left the store consistent
+ self.log('running fsck to verify new sharding on osd.%s' % osd)
+ cmd = prefix + [
+ 'fsck'
+ ]
+ proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
+ if proc.exitstatus != 0:
+ raise Exception("ceph-bluestore-tool fsck failed.")
+ self.log('resharding successfully completed')
+
+ def test_bluestore_reshard(self):
+ """
+ 1) kills an osd
+ 2) reshards bluestore on killed osd
+ 3) revives the osd
+ """
+ self.log('test_bluestore_reshard started')
+ self.kill_osd(mark_down=True, mark_out=True)
+ self.test_bluestore_reshard_action()
+ self.revive_osd()
+ self.log('test_bluestore_reshard completed')
+
+
def test_map_discontinuity(self):
"""
1) Allows the osds to recover
self.config.get('chance_inject_pause_long', 0),)]:
actions.append(scenario)
+ # only consider resharding if objectstore is bluestore
+ cluster_name = self.ceph_manager.cluster
+ cluster = self.ceph_manager.ctx.ceph[cluster_name]
+ if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
+ actions.append((self.test_bluestore_reshard,
+ self.config.get('chance_bluestore_reshard', 0),))
+
total = sum([y for (x, y) in actions])
val = random.uniform(0, total)
for (action, prob) in actions:
"""
def __init__(self, controller, ctx=None, config=None, logger=None,
- cluster='ceph', cephadm=False):
+ cluster='ceph', cephadm=False) -> None:
self.lock = threading.RLock()
self.ctx = ctx
self.config = config
except CommandFailedError:
self.log('Failed to get pg_num from pool %s, ignoring' % pool)
- def raw_cluster_cmd(self, *args):
+ def ceph(self, cmd, **kwargs):
"""
- Start ceph on a raw cluster. Return count
+ Simple Ceph admin command wrapper around run_cluster_cmd.
+ """
+
+ kwargs.pop('args', None)
+ args = shlex.split(cmd)
+ stdout = kwargs.pop('stdout', StringIO())
+ stderr = kwargs.pop('stderr', StringIO())
+ return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)
+
+ def run_cluster_cmd(self, **kwargs):
+ """
+ Run a Ceph command and return the object representing the process
+ for the command.
+
+ Accepts arguments same as that of teuthology.orchestra.run.run()
"""
if self.cephadm:
- proc = shell(self.ctx, self.cluster, self.controller,
- args=['ceph'] + list(args),
- stdout=BytesIO())
- else:
- testdir = teuthology.get_testdir(self.ctx)
- ceph_args = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'timeout',
- '120',
- 'ceph',
- '--cluster',
- self.cluster,
- '--log-early',
- ]
- ceph_args.extend(args)
- proc = self.controller.run(
- args=ceph_args,
- stdout=BytesIO(),
- )
- return six.ensure_str(proc.stdout.getvalue())
+ return shell(self.ctx, self.cluster, self.controller,
+ args=['ceph'] + list(kwargs['args']),
+ stdout=StringIO(),
+ check_status=kwargs.get('check_status', True))
+
+ testdir = teuthology.get_testdir(self.ctx)
+ prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
+ f'{testdir}/archive/coverage', 'timeout', '120', 'ceph',
+ '--cluster', self.cluster]
+ kwargs['args'] = prefix + list(kwargs['args'])
+ return self.controller.run(**kwargs)
+
+ def raw_cluster_cmd(self, *args, **kwargs) -> str:
+ """
+ Start ceph on a raw cluster. Return count
+ """
+ stdout = kwargs.pop('stdout', StringIO())
+ p = self.run_cluster_cmd(args=args, stdout=stdout, **kwargs)
+ return p.stdout.getvalue()
def raw_cluster_cmd_result(self, *args, **kwargs):
"""
Start ceph on a cluster. Return success or failure information.
"""
- if self.cephadm:
- proc = shell(self.ctx, self.cluster, self.controller,
- args=['ceph'] + list(args),
- check_status=False)
- else:
- testdir = teuthology.get_testdir(self.ctx)
- ceph_args = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'timeout',
- '120',
- 'ceph',
- '--cluster',
- self.cluster,
- ]
- ceph_args.extend(args)
- kwargs['args'] = ceph_args
- kwargs['check_status'] = False
- proc = self.controller.run(**kwargs)
- return proc.exitstatus
+ kwargs['args'], kwargs['check_status'] = args, False
+ return self.run_cluster_cmd(**kwargs).exitstatus
def run_ceph_w(self, watch_channel=None):
"""
wait for all specified osds, but some of them could be
moved out of osdmap, so we cannot get their updated
stat seq from monitor anymore. in that case, you need
- to pass a blacklist.
+ to pass a blocklist.
:param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
it. (5 min by default)
"""
def flush_all_pg_stats(self):
self.flush_pg_stats(range(len(self.get_osd_dump())))
- def do_rados(self, remote, cmd, check_status=True):
+ def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
"""
Execute a remote rados command.
"""
+ if remote is None:
+ remote = self.controller
+
testdir = teuthology.get_testdir(self.ctx)
pre = [
'adjust-ulimits',
'--cluster',
self.cluster,
]
+ if pool is not None:
+ pre += ['--pool', pool]
+ if namespace is not None:
+ pre += ['--namespace', namespace]
pre.extend(cmd)
proc = remote.run(
args=pre,
wait=True,
- check_status=check_status
+ **kwargs
)
return proc
Threads not used yet.
"""
args = [
- '-p', pool,
'--num-objects', num_objects,
'-b', size,
'bench', timelimit,
]
if not cleanup:
args.append('--no-cleanup')
- return self.do_rados(self.controller, map(str, args))
+ return self.do_rados(map(str, args), pool=pool)
def do_put(self, pool, obj, fname, namespace=None):
"""
Implement rados put operation
"""
- args = ['-p', pool]
- if namespace is not None:
- args += ['-N', namespace]
- args += [
- 'put',
- obj,
- fname
- ]
+ args = ['put', obj, fname]
return self.do_rados(
- self.controller,
args,
- check_status=False
+ check_status=False,
+ pool=pool,
+ namespace=namespace
).exitstatus
def do_get(self, pool, obj, fname='/dev/null', namespace=None):
"""
Implement rados get operation
"""
- args = ['-p', pool]
- if namespace is not None:
- args += ['-N', namespace]
- args += [
- 'get',
- obj,
- fname
- ]
+ args = ['get', obj, fname]
return self.do_rados(
- self.controller,
args,
- check_status=False
+ check_status=False,
+ pool=pool,
+ namespace=namespace,
).exitstatus
def do_rm(self, pool, obj, namespace=None):
"""
Implement rados rm operation
"""
- args = ['-p', pool]
- if namespace is not None:
- args += ['-N', namespace]
- args += [
- 'rm',
- obj
- ]
+ args = ['rm', obj]
return self.do_rados(
- self.controller,
args,
- check_status=False
+ check_status=False,
+ pool=pool,
+ namespace=namespace
).exitstatus
def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
"""
out = self.raw_cluster_cmd('quorum_status')
j = json.loads(out)
- self.log('quorum_status is %s' % out)
return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):