"""
ceph manager -- Thrasher and CephManager objects
"""
-from cStringIO import StringIO
from functools import wraps
import contextlib
import random
import threading
import traceback
import os
+import six
+
+from io import BytesIO
+from six import StringIO
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
-from util.rados import cmd_erasure_code_profile
-from util import get_remote
+from tasks.util.rados import cmd_erasure_code_profile
+from tasks.util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
+from tasks.thrasher import Thrasher
try:
from subprocess import DEVNULL # py3k
except ImportError:
- DEVNULL = open(os.devnull, 'r+')
+ DEVNULL = open(os.devnull, 'r+') # type: ignore
DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
log = logging.getLogger(__name__)
+# this is for cephadm clusters
+def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
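+    """
+    Run a command via 'cephadm shell' on the given remote; 'name', if given,
+    is passed through as 'cephadm shell -n <name>'.
+    """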
+ testdir = teuthology.get_testdir(ctx)
+ extra_args = []
+ if name:
+ extra_args = ['-n', name]
+ return remote.run(
+ args=[
+ 'sudo',
+ ctx.cephadm,
+ '--image', ctx.ceph[cluster_name].image,
+ 'shell',
+ ] + extra_args + [
+ '--fsid', ctx.ceph[cluster_name].fsid,
+ '--',
+ ] + args,
+ **kwargs
+ )
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
- conf_fp = StringIO()
+ conf_fp = BytesIO()
ctx.ceph[cluster].conf.write(conf_fp)
conf_fp.seek(0)
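+    # stream the rendered conf to every remote, writing it via 'sudo tee'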
writes = ctx.cluster.run(
args=[
'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
- 'sudo', 'python',
- '-c',
- ('import shutil, sys; '
- 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
- conf_path,
- run.Raw('&&'),
+ 'sudo', 'tee', conf_path, run.Raw('&&'),
'sudo', 'chmod', '0644', conf_path,
+ run.Raw('>'), '/dev/null',
+
],
stdin=run.PIPE,
wait=False)
)
-class Thrasher:
+def log_exc(func):
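+    """Decorator: log a traceback via self.log() before re-raising."""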
+ @wraps(func)
+ def wrapper(self):
+ try:
+ return func(self)
+ except:
+ self.log(traceback.format_exc())
+ raise
+ return wrapper
+
+
+class PoolType:
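+    """Pool type values, as reported in the 'type' field of 'ceph osd dump'."""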
+ REPLICATED = 1
+ ERASURE_CODED = 3
+
+
+class OSDThrasher(Thrasher):
"""
Object used to thrash Ceph
"""
- def __init__(self, manager, config, logger=None):
+ def __init__(self, manager, config, name, logger):
+ super(OSDThrasher, self).__init__()
+
self.ceph_manager = manager
self.cluster = manager.cluster
self.ceph_manager.wait_for_clean()
self.stopping = False
self.logger = logger
self.config = config
- self.revive_timeout = self.config.get("revive_timeout", 150)
+ self.name = name
+ self.revive_timeout = self.config.get("revive_timeout", 360)
self.pools_to_fix_pgp_num = set()
if self.config.get('powercycle'):
self.revive_timeout += 120
self.clean_wait = self.config.get('clean_wait', 0)
- self.minin = self.config.get("min_in", 3)
+ self.minin = self.config.get("min_in", 4)
self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
self.sighup_delay = self.config.get('sighup_delay')
self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
num_osds = self.in_osds + self.out_osds
- self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
- if self.logger is not None:
- self.log = lambda x: self.logger.info(x)
- else:
- def tmp(x):
- """
- Implement log behavior
- """
- print x
- self.log = tmp
+ self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
+ self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
if self.config is None:
self.config = dict()
# prevent monitor from auto-marking things out while thrasher runs
# another
first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
opts = [('mon', 'mon_osd_down_out_interval', 0)]
+        # why do we disable marking an OSD out automatically? :/
for service, opt, new_value in opts:
old_value = manager.get_config(first_mon[0],
first_mon[1],
opt)
self.saved_options.append((service, opt, old_value))
- self._set_config(service, '*', opt, new_value)
+ manager.inject_args(service, '*', opt, new_value)
# initialize ceph_objectstore_tool property - must be done before
# do_thrash is spawned - http://tracker.ceph.com/issues/18799
if (self.config.get('powercycle') or
not self.cmd_exists_on_osds("ceph-objectstore-tool") or
self.config.get('disable_objectstore_tool_tests', False)):
self.ceph_objectstore_tool = False
- self.test_rm_past_intervals = False
if self.config.get('powercycle'):
self.log("Unable to test ceph-objectstore-tool, "
"powercycle testing")
else:
self.ceph_objectstore_tool = \
self.config.get('ceph_objectstore_tool', True)
- self.test_rm_past_intervals = \
- self.config.get('test_rm_past_intervals', True)
# spawn do_thrash
self.thread = gevent.spawn(self.do_thrash)
if self.sighup_delay:
if self.noscrub_toggle_delay:
self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
- def _set_config(self, service_type, service_id, name, value):
- opt_arg = '--{name} {value}'.format(name=name, value=value)
- whom = '.'.join([service_type, service_id])
- self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
- 'injectargs', opt_arg)
-
+ def log(self, msg, *args, **kwargs):
+ self.logger.info(msg, *args, **kwargs)
def cmd_exists_on_osds(self, cmd):
+ if self.ceph_manager.cephadm:
+ return True
allremotes = self.ceph_manager.ctx.cluster.only(\
teuthology.is_type('osd', self.cluster)).remotes.keys()
allremotes = list(set(allremotes))
for remote in allremotes:
proc = remote.run(args=['type', cmd], wait=True,
- check_status=False, stdout=StringIO(),
- stderr=StringIO())
+ check_status=False, stdout=BytesIO(),
+ stderr=BytesIO())
if proc.exitstatus != 0:
                return False
        return True
+ def run_ceph_objectstore_tool(self, remote, osd, cmd):
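+        """Run ceph-objectstore-tool, via 'cephadm shell' when the cluster is containerized."""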
+ if self.ceph_manager.cephadm:
+ return shell(
+ self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
+ args=['ceph-objectstore-tool'] + cmd,
+ name=osd,
+ wait=True, check_status=False,
+ stdout=StringIO(),
+ stderr=StringIO())
+ else:
+ return remote.run(
+ args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
+ wait=True, check_status=False,
+ stdout=StringIO(),
+ stderr=StringIO())
+
def kill_osd(self, osd=None, mark_down=False, mark_out=False):
"""
:param osd: Osd to be killed.
if mark_out and osd in self.in_osds:
self.out_osd(osd)
if self.ceph_objectstore_tool:
- self.log("Testing ceph-objectstore-tool on down osd")
+ self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
remote = self.ceph_manager.find_remote('osd', osd)
FSPATH = self.ceph_manager.get_filepath()
JPATH = os.path.join(FSPATH, "journal")
exp_osd = imp_osd = osd
+ self.log('remote for osd %s is %s' % (osd, remote))
exp_remote = imp_remote = remote
# If an older osd is available we'll move a pg from there
if (len(self.dead_osds) > 1 and
random.random() < self.chance_move_pg):
exp_osd = random.choice(self.dead_osds[:-1])
exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
- if ('keyvaluestore_backend' in
- self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
- prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--type keyvaluestore "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
- else:
- prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
- cmd = (prefix + "--op list-pgs").format(id=exp_osd)
-
- # ceph-objectstore-tool might be temporarily absent during an
- # upgrade - see http://tracker.ceph.com/issues/18014
- with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
- while proceed():
- proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
- wait=True, check_status=False, stdout=StringIO(),
- stderr=StringIO())
- if proc.exitstatus == 0:
- break
- log.debug("ceph-objectstore-tool binary not present, trying again")
+ self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
+ prefix = [
+ '--no-mon-config',
+ '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
+ ]
+
+ if not self.ceph_manager.cephadm:
+ # ceph-objectstore-tool might be temporarily absent during an
+ # upgrade - see http://tracker.ceph.com/issues/18014
+ with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
+ while proceed():
+ proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
+ wait=True, check_status=False, stdout=BytesIO(),
+ stderr=BytesIO())
+ if proc.exitstatus == 0:
+ break
+ log.debug("ceph-objectstore-tool binary not present, trying again")
# ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
# see http://tracker.ceph.com/issues/19556
with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
while proceed():
- proc = exp_remote.run(args=cmd, wait=True,
- check_status=False,
- stdout=StringIO(), stderr=StringIO())
+ proc = self.run_ceph_objectstore_tool(
+ exp_remote, 'osd.%s' % exp_osd,
+ prefix + [
+ '--data-path', FSPATH.format(id=exp_osd),
+ '--journal-path', JPATH.format(id=exp_osd),
+ '--op', 'list-pgs',
+ ])
if proc.exitstatus == 0:
break
- elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
+ elif (proc.exitstatus == 1 and
+ proc.stderr.getvalue() == "OSD has the store locked"):
continue
else:
raise Exception("ceph-objectstore-tool: "
"exp list-pgs failure with status {ret}".
format(ret=proc.exitstatus))
- pgs = proc.stdout.getvalue().split('\n')[:-1]
+ pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
if len(pgs) == 0:
self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
return
pg = random.choice(pgs)
- exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
- exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
- exp_path = os.path.join(exp_path,
+ #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
+ #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
+ exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
"exp.{pg}.{id}".format(
pg=pg,
id=exp_osd))
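+            # with cephadm, the container's /var/log/ceph corresponds to
+            # /var/log/ceph/<fsid> on the host, so compute both paths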
+ if self.ceph_manager.cephadm:
+ exp_host_path = os.path.join(
+ '/var/log/ceph',
+ self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
+ "exp.{pg}.{id}".format(
+ pg=pg,
+ id=exp_osd))
+ else:
+ exp_host_path = exp_path
+
# export
- cmd = prefix + "--op export --pgid {pg} --file {file}"
- cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
- proc = exp_remote.run(args=cmd)
+ # Can't use new export-remove op since this is part of upgrade testing
+ proc = self.run_ceph_objectstore_tool(
+ exp_remote, 'osd.%s' % exp_osd,
+ prefix + [
+ '--data-path', FSPATH.format(id=exp_osd),
+ '--journal-path', JPATH.format(id=exp_osd),
+ '--op', 'export',
+ '--pgid', pg,
+ '--file', exp_path,
+ ])
if proc.exitstatus:
raise Exception("ceph-objectstore-tool: "
"export failure with status {ret}".
format(ret=proc.exitstatus))
# remove
- cmd = prefix + "--op remove --pgid {pg}"
- cmd = cmd.format(id=exp_osd, pg=pg)
- proc = exp_remote.run(args=cmd)
+ proc = self.run_ceph_objectstore_tool(
+ exp_remote, 'osd.%s' % exp_osd,
+ prefix + [
+ '--data-path', FSPATH.format(id=exp_osd),
+ '--journal-path', JPATH.format(id=exp_osd),
+ '--force',
+ '--op', 'remove',
+ '--pgid', pg,
+ ])
if proc.exitstatus:
raise Exception("ceph-objectstore-tool: "
"remove failure with status {ret}".
# If there are at least 2 dead osds we might move the pg
if exp_osd != imp_osd:
# If pg isn't already on this osd, then we will move it there
- cmd = (prefix + "--op list-pgs").format(id=imp_osd)
- proc = imp_remote.run(args=cmd, wait=True,
- check_status=False, stdout=StringIO())
+ proc = self.run_ceph_objectstore_tool(
+ imp_remote,
+ 'osd.%s' % imp_osd,
+ prefix + [
+ '--data-path', FSPATH.format(id=imp_osd),
+ '--journal-path', JPATH.format(id=imp_osd),
+ '--op', 'list-pgs',
+ ])
if proc.exitstatus:
raise Exception("ceph-objectstore-tool: "
"imp list-pgs failure with status {ret}".
format(ret=proc.exitstatus))
- pgs = proc.stdout.getvalue().split('\n')[:-1]
+ pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
if pg not in pgs:
self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
format(pg=pg, fosd=exp_osd, tosd=imp_osd))
# Copy export file to the other machine
self.log("Transfer export file from {srem} to {trem}".
format(srem=exp_remote, trem=imp_remote))
- tmpexport = Remote.get_file(exp_remote, exp_path)
- Remote.put_file(imp_remote, tmpexport, exp_path)
+                # just in case an upgrade makes /var/log/ceph unreadable by non-root
+ exp_remote.run(args=['sudo', 'chmod', '777',
+ '/var/log/ceph'])
+ imp_remote.run(args=['sudo', 'chmod', '777',
+ '/var/log/ceph'])
+ tmpexport = Remote.get_file(exp_remote, exp_host_path,
+ sudo=True)
+ if exp_host_path != exp_path:
+ # push to /var/log/ceph, then rename (we can't
+ # chmod 777 the /var/log/ceph/$fsid mountpoint)
+ Remote.put_file(imp_remote, tmpexport, exp_path)
+ imp_remote.run(args=[
+ 'sudo', 'mv', exp_path, exp_host_path])
+ else:
+ Remote.put_file(imp_remote, tmpexport, exp_host_path)
os.remove(tmpexport)
else:
# Can't move the pg after all
imp_osd = exp_osd
imp_remote = exp_remote
# import
- cmd = (prefix + "--op import --file {file}")
- cmd = cmd.format(id=imp_osd, file=exp_path)
- proc = imp_remote.run(args=cmd, wait=True, check_status=False,
- stderr=StringIO())
+ proc = self.run_ceph_objectstore_tool(
+ imp_remote, 'osd.%s' % imp_osd,
+ [
+ '--data-path', FSPATH.format(id=imp_osd),
+ '--journal-path', JPATH.format(id=imp_osd),
+ '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
+ '--op', 'import',
+ '--file', exp_path,
+ ])
if proc.exitstatus == 1:
bogosity = "The OSD you are using is older than the exported PG"
if bogosity in proc.stderr.getvalue():
elif proc.exitstatus == 11:
self.log("Attempt to import an incompatible export"
"...ignored")
+ elif proc.exitstatus == 12:
+ # this should be safe to ignore because we only ever move 1
+ # copy of the pg at a time, and merge is only initiated when
+ # all replicas are peered and happy. /me crosses fingers
+ self.log("PG merged on target"
+ "...ignored")
elif proc.exitstatus:
raise Exception("ceph-objectstore-tool: "
"import failure with status {ret}".
format(ret=proc.exitstatus))
- cmd = "rm -f {file}".format(file=exp_path)
+ cmd = "sudo rm -f {file}".format(file=exp_host_path)
exp_remote.run(args=cmd)
if imp_remote != exp_remote:
imp_remote.run(args=cmd)
# apply low split settings to each pool
- for pool in self.ceph_manager.list_pools():
- no_sudo_prefix = prefix[5:]
- cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
- "--filestore-split-multiple 1' sudo -E "
- + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
- proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
- output = proc.stderr.getvalue()
- if 'Couldn\'t find pool' in output:
- continue
- if proc.exitstatus:
- raise Exception("ceph-objectstore-tool apply-layout-settings"
- " failed with {status}".format(status=proc.exitstatus))
+ if not self.ceph_manager.cephadm:
+ for pool in self.ceph_manager.list_pools():
+ cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
+ "--filestore-split-multiple 1' sudo -E "
+ + 'ceph-objectstore-tool '
+ + ' '.join(prefix + [
+ '--data-path', FSPATH.format(id=imp_osd),
+ '--journal-path', JPATH.format(id=imp_osd),
+ ])
+ + " --op apply-layout-settings --pool " + pool).format(id=osd)
+ proc = imp_remote.run(args=cmd,
+ wait=True, check_status=False,
+ stderr=StringIO())
+ if 'Couldn\'t find pool' in proc.stderr.getvalue():
+ continue
+ if proc.exitstatus:
+ raise Exception("ceph-objectstore-tool apply-layout-settings"
+ " failed with {status}".format(status=proc.exitstatus))
- def rm_past_intervals(self, osd=None):
- """
- :param osd: Osd to find pg to remove past intervals
- """
- if self.test_rm_past_intervals:
- if osd is None:
- osd = random.choice(self.dead_osds)
- self.log("Use ceph_objectstore_tool to remove past intervals")
- remote = self.ceph_manager.find_remote('osd', osd)
- FSPATH = self.ceph_manager.get_filepath()
- JPATH = os.path.join(FSPATH, "journal")
- if ('keyvaluestore_backend' in
- self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
- prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--type keyvaluestore "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
- else:
- prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
- "--data-path {fpath} --journal-path {jpath} "
- "--log-file="
- "/var/log/ceph/objectstore_tool.\\$pid.log ".
- format(fpath=FSPATH, jpath=JPATH))
- cmd = (prefix + "--op list-pgs").format(id=osd)
- proc = remote.run(args=cmd, wait=True,
- check_status=False, stdout=StringIO())
- if proc.exitstatus:
- raise Exception("ceph_objectstore_tool: "
- "exp list-pgs failure with status {ret}".
- format(ret=proc.exitstatus))
- pgs = proc.stdout.getvalue().split('\n')[:-1]
- if len(pgs) == 0:
- self.log("No PGs found for osd.{osd}".format(osd=osd))
- return
- pg = random.choice(pgs)
- cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
- format(id=osd, pg=pg)
- proc = remote.run(args=cmd)
- if proc.exitstatus:
- raise Exception("ceph_objectstore_tool: "
- "rm-past-intervals failure with status {ret}".
- format(ret=proc.exitstatus))
def blackhole_kill_osd(self, osd=None):
"""
skip_admin_check=skip_admin_check)
self.dead_osds.remove(osd)
self.live_osds.append(osd)
- if self.random_eio > 0 and osd is self.rerrosd:
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+ if self.random_eio > 0 and osd == self.rerrosd:
+ self.ceph_manager.set_config(self.rerrosd,
+ filestore_debug_random_read_err = self.random_eio)
+ self.ceph_manager.set_config(self.rerrosd,
+ bluestore_debug_random_read_err = self.random_eio)
def out_osd(self, osd=None):
try:
if random.random() >= .3:
pgs = self.ceph_manager.get_pg_stats()
+ if not pgs:
+ return
pg = random.choice(pgs)
pgid = str(pg['pgid'])
poolid = int(pgid.split('.')[0])
try:
if random.random() >= .3:
pgs = self.ceph_manager.get_pg_stats()
+ if not pgs:
+ return
pg = random.choice(pgs)
pgid = str(pg['pgid'])
poolid = int(pgid.split('.')[0])
backfill = random.random() >= 0.5
j = self.ceph_manager.get_pgids_to_force(backfill)
if j:
- if backfill:
- self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
- else:
- self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
+ try:
+ if backfill:
+ self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
+ else:
+ self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
+ except CommandFailedError:
+ self.log('Failed to force backfill|recovery, ignoring')
+
def cancel_force_recovery(self):
"""
backfill = random.random() >= 0.5
j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
if j:
- if backfill:
- self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
- else:
- self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
+ try:
+ if backfill:
+ self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
+ else:
+ self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
+ except CommandFailedError:
+                self.log('Failed to cancel force backfill|recovery, ignoring')
def force_cancel_recovery(self):
"""
Increase the size of the pool
"""
pool = self.ceph_manager.get_pool()
- orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
+ if pool is None:
+ return
self.log("Growing pool %s" % (pool,))
if self.ceph_manager.expand_pool(pool,
self.config.get('pool_grow_by', 10),
self.max_pgs):
self.pools_to_fix_pgp_num.add(pool)
+ def shrink_pool(self):
+ """
+ Decrease the size of the pool
+ """
+ pool = self.ceph_manager.get_pool()
+ if pool is None:
+ return
+ _ = self.ceph_manager.get_pool_pg_num(pool)
+ self.log("Shrinking pool %s" % (pool,))
+ if self.ceph_manager.contract_pool(
+ pool,
+ self.config.get('pool_shrink_by', 10),
+ self.min_pgs):
+ self.pools_to_fix_pgp_num.add(pool)
+
def fix_pgp_num(self, pool=None):
"""
Fix number of pgs in pool.
"""
if pool is None:
pool = self.ceph_manager.get_pool()
+ if not pool:
+ return
force = False
else:
force = True
def test_pool_min_size(self):
"""
- Kill and revive all osds except one.
+ Loop to selectively push PGs below their min_size and test that recovery
+ still occurs.
"""
self.log("test_pool_min_size")
self.all_up()
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
- the_one = random.choice(self.in_osds)
- self.log("Killing everyone but %s", the_one)
- to_kill = filter(lambda x: x != the_one, self.in_osds)
- [self.kill_osd(i) for i in to_kill]
- [self.out_osd(i) for i in to_kill]
- time.sleep(self.config.get("test_pool_min_size_time", 10))
- self.log("Killing %s" % (the_one,))
- self.kill_osd(the_one)
- self.out_osd(the_one)
- self.log("Reviving everyone but %s" % (the_one,))
- [self.revive_osd(i) for i in to_kill]
- [self.in_osd(i) for i in to_kill]
- self.log("Revived everyone but %s" % (the_one,))
- self.log("Waiting for clean")
+
+ minout = int(self.config.get("min_out", 1))
+ minlive = int(self.config.get("min_live", 2))
+ mindead = int(self.config.get("min_dead", 1))
+ self.log("doing min_size thrashing")
+ self.ceph_manager.wait_for_clean(timeout=60)
+ assert self.ceph_manager.is_clean(), \
+ 'not clean before minsize thrashing starts'
+ while not self.stopping:
+ # look up k and m from all the pools on each loop, in case it
+ # changes as the cluster runs
+ k = 0
+ m = 99
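+                # track the largest k and smallest m across all EC pools so we
+                # never kill more OSDs than the most fragile pool can tolerate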
+ has_pools = False
+ pools_json = self.ceph_manager.get_osd_dump_json()['pools']
+
+ for pool_json in pools_json:
+ pool = pool_json['pool_name']
+ has_pools = True
+ pool_type = pool_json['type'] # 1 for rep, 3 for ec
+ min_size = pool_json['min_size']
+ self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
+ try:
+ ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
+ if pool_type != PoolType.ERASURE_CODED:
+ continue
+ ec_profile = pool_json['erasure_code_profile']
+ ec_profile_json = self.ceph_manager.raw_cluster_cmd(
+ 'osd',
+ 'erasure-code-profile',
+ 'get',
+ ec_profile,
+ '--format=json')
+ ec_json = json.loads(ec_profile_json)
+ local_k = int(ec_json['k'])
+ local_m = int(ec_json['m'])
+ self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
+ k=local_k, m=local_m))
+ if local_k > k:
+ self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
+ k = local_k
+ if local_m < m:
+ self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
+ m = local_m
+ except CommandFailedError:
+ self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+ continue
+
+            if has_pools:
+ self.log("using k={k}, m={m}".format(k=k,m=m))
+ else:
+ self.log("No pools yet, waiting")
+ time.sleep(5)
+ continue
+
+ if minout > len(self.out_osds): # kill OSDs and mark out
+ self.log("forced to out an osd")
+ self.kill_osd(mark_out=True)
+ continue
+ elif mindead > len(self.dead_osds): # kill OSDs but force timeout
+ self.log("forced to kill an osd")
+ self.kill_osd()
+ continue
+ else: # make mostly-random choice to kill or revive OSDs
+ minup = max(minlive, k)
+ rand_val = random.uniform(0, 1)
+ self.log("choosing based on number of live OSDs and rand val {rand}".\
+ format(rand=rand_val))
+ if len(self.live_osds) > minup+1 and rand_val < 0.5:
+ # chose to knock out as many OSDs as we can w/out downing PGs
+
+ most_killable = min(len(self.live_osds) - minup, m)
+ self.log("chose to kill {n} OSDs".format(n=most_killable))
+ for i in range(1, most_killable):
+ self.kill_osd(mark_out=True)
+ time.sleep(10)
+ # try a few times since there might be a concurrent pool
+ # creation or deletion
+ with safe_while(
+ sleep=5, tries=5,
+ action='check for active or peered') as proceed:
+ while proceed():
+ if self.ceph_manager.all_active_or_peered():
+ break
+ self.log('not all PGs are active or peered')
+ else: # chose to revive OSDs, bring up a random fraction of the dead ones
+ self.log("chose to revive osds")
+ for i in range(1, int(rand_val * len(self.dead_osds))):
+ self.revive_osd(i)
+
+ # let PGs repair themselves or our next knockout might kill one
+ self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
+
+ # / while not self.stopping
+ self.all_up_in()
+
self.ceph_manager.wait_for_recovery(
timeout=self.config.get('timeout')
)
osd_debug_skip_full_check_in_backfill_reservation to force
the more complicated check in do_scan to be exercised.
- Then, verify that all backfills stop.
+        Then, verify that all backfilling PGs stop.
"""
self.log("injecting backfill full")
for i in self.live_osds:
check_status=True, timeout=30, stdout=DEVNULL)
for i in range(30):
status = self.ceph_manager.compile_pg_status()
- if 'backfill' not in status.keys():
+ if 'backfilling' not in status.keys():
break
self.log(
- "waiting for {still_going} backfills".format(
- still_going=status.get('backfill')))
+ "waiting for {still_going} backfillings".format(
+ still_going=status.get('backfilling')))
time.sleep(1)
- assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
+ assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
for i in self.live_osds:
self.ceph_manager.set_config(
i,
Random action selector.
"""
chance_down = self.config.get('chance_down', 0.4)
- chance_test_min_size = self.config.get('chance_test_min_size', 0)
+ _ = self.config.get('chance_test_min_size', 0)
chance_test_backfill_full = \
self.config.get('chance_test_backfill_full', 0)
if isinstance(chance_down, int):
chance_down = float(chance_down) / 100
minin = self.minin
- minout = self.config.get("min_out", 0)
- minlive = self.config.get("min_live", 2)
- mindead = self.config.get("min_dead", 0)
+ minout = int(self.config.get("min_out", 0))
+ minlive = int(self.config.get("min_live", 2))
+ mindead = int(self.config.get("min_dead", 0))
self.log('choose_action: min_in %d min_out '
'%d min_live %d min_dead %d' %
actions.append((self.out_osd, 1.0,))
if len(self.live_osds) > minlive and chance_down > 0:
actions.append((self.kill_osd, chance_down,))
- if len(self.dead_osds) > 1:
- actions.append((self.rm_past_intervals, 1.0,))
if len(self.out_osds) > minout:
actions.append((self.in_osd, 1.7,))
if len(self.dead_osds) > mindead:
self.config.get('reweight_osd', .5),))
actions.append((self.grow_pool,
self.config.get('chance_pgnum_grow', 0),))
+ actions.append((self.shrink_pool,
+ self.config.get('chance_pgnum_shrink', 0),))
actions.append((self.fix_pgp_num,
self.config.get('chance_pgpnum_fix', 0),))
actions.append((self.test_pool_min_size,
- chance_test_min_size,))
+ self.config.get('chance_test_min_size', 0),))
actions.append((self.test_backfill_full,
chance_test_backfill_full,))
if self.chance_thrash_cluster_full > 0:
val -= prob
return None
- def log_exc(func):
- @wraps(func)
- def wrapper(self):
- try:
- return func(self)
- except:
- self.log(traceback.format_exc())
- raise
- return wrapper
+ def do_thrash(self):
+ """
+ _do_thrash() wrapper.
+ """
+ try:
+ self._do_thrash()
+ except Exception as e:
+ # See _run exception comment for MDSThrasher
+ self.set_thrasher_exception(e)
+ self.logger.exception("exception:")
+ # Allow successful completion so gevent doesn't see an exception.
+ # The DaemonWatchdog will observe the error and tear down the test.
@log_exc
def do_sighup(self):
osd_state = "false"
else:
osd_state = "true"
- self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
- 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
+ try:
+ self.ceph_manager.inject_args('osd', '*',
+ 'osd_enable_op_tracker',
+ osd_state)
+ except CommandFailedError:
+ self.log('Failed to tell all osds, ignoring')
gevent.sleep(delay)
@log_exc
self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
@log_exc
- def do_thrash(self):
+ def _do_thrash(self):
"""
Loop to select random actions to thrash ceph manager with.
"""
delay = self.config.get("op_delay", 5)
self.rerrosd = self.live_osds[0]
if self.random_eio > 0:
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+ self.ceph_manager.inject_args('osd', self.rerrosd,
+ 'filestore_debug_random_read_err',
+ self.random_eio)
+ self.ceph_manager.inject_args('osd', self.rerrosd,
+ 'bluestore_debug_random_read_err',
+ self.random_eio)
self.log("starting do_thrash")
while not self.stopping:
to_log = [str(x) for x in ["in_osds: ", self.in_osds,
self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
str(osd), str(1))
if random.uniform(0, 1) < float(
- self.config.get('chance_test_map_discontinuity', 0)):
+ self.config.get('chance_test_map_discontinuity', 0)) \
+ and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
self.test_map_discontinuity()
else:
self.ceph_manager.wait_for_recovery(
time.sleep(delay)
self.all_up()
if self.random_eio > 0:
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
- self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
- 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
+ self.ceph_manager.inject_args('osd', self.rerrosd,
+ 'filestore_debug_random_read_err', '0.0')
+ self.ceph_manager.inject_args('osd', self.rerrosd,
+ 'bluestore_debug_random_read_err', '0.0')
for pool in list(self.pools_to_fix_pgp_num):
if self.ceph_manager.get_pool_pg_num(pool) > 0:
self.fix_pgp_num(pool)
self.pools_to_fix_pgp_num.clear()
for service, opt, saved_value in self.saved_options:
- self._set_config(service, '*', opt, saved_value)
+ self.ceph_manager.inject_args(service, '*', opt, saved_value)
self.saved_options = []
self.all_up_in()
self.pgid = self.manager.get_object_pg_with_shard(self.pool,
self.object_name,
self.osd)
- self.remote = self.manager.ctx.\
- cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
+ self.remote = next(iter(self.manager.ctx.\
+ cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
path = self.manager.get_filepath().format(id=self.osd)
self.paths = ("--data-path {path} --journal-path {path}/journal".
format(path=path))
def run(self, options, args, stdin=None, stdout=None):
if stdout is None:
- stdout = StringIO()
+ stdout = BytesIO()
self.manager.kill_osd(self.osd)
cmd = self.build_cmd(options, args, stdin)
self.manager.log(cmd)
proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
check_status=False,
stdout=stdout,
- stderr=StringIO())
+ stderr=BytesIO())
proc.wait()
if proc.exitstatus != 0:
self.manager.log("failed with " + str(proc.exitstatus))
- error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
+ error = six.ensure_str(proc.stdout.getvalue()) + " " + \
+ six.ensure_str(proc.stderr.getvalue())
raise Exception(error)
finally:
if self.do_revive:
self.manager.wait_till_osd_is_up(self.osd, 300)
+# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
+# the same name.
class CephManager:
"""
Ceph manager object.
Contains several local functions that form a bulk of this module.
- Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
- the same name.
+ :param controller: the remote machine where the Ceph commands should be
+ executed
+ :param ctx: the cluster context
+    :param config: dict of configuration options
+ :param logger: for logging messages
+ :param cluster: name of the Ceph cluster
"""
- REPLICATED_POOL = 1
- ERASURE_CODED_POOL = 3
-
def __init__(self, controller, ctx=None, config=None, logger=None,
- cluster='ceph'):
+ cluster='ceph', cephadm=False):
self.lock = threading.RLock()
self.ctx = ctx
self.config = config
self.controller = controller
self.next_pool_id = 0
self.cluster = cluster
+ self.cephadm = cephadm
if (logger):
self.log = lambda x: logger.info(x)
else:
"""
implement log behavior.
"""
- print x
+ print(x)
self.log = tmp
if self.config is None:
self.config = dict()
for pool in pools:
# we may race with a pool deletion; ignore failures here
try:
- self.pools[pool] = self.get_pool_property(pool, 'pg_num')
+ self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
except CommandFailedError:
self.log('Failed to get pg_num from pool %s, ignoring' % pool)
"""
Start ceph on a raw cluster. Return count
"""
- testdir = teuthology.get_testdir(self.ctx)
- ceph_args = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'timeout',
- '120',
- 'ceph',
- '--cluster',
- self.cluster,
- ]
- ceph_args.extend(args)
- proc = self.controller.run(
- args=ceph_args,
- stdout=StringIO(),
+ if self.cephadm:
+ proc = shell(self.ctx, self.cluster, self.controller,
+ args=['ceph'] + list(args),
+ stdout=BytesIO())
+ else:
+ testdir = teuthology.get_testdir(self.ctx)
+ ceph_args = [
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'timeout',
+ '120',
+ 'ceph',
+ '--cluster',
+ self.cluster,
+ '--log-early',
+ ]
+ ceph_args.extend(args)
+ proc = self.controller.run(
+ args=ceph_args,
+ stdout=BytesIO(),
)
- return proc.stdout.getvalue()
+ return six.ensure_str(proc.stdout.getvalue())
- def raw_cluster_cmd_result(self, *args):
+ def raw_cluster_cmd_result(self, *args, **kwargs):
"""
Start ceph on a cluster. Return success or failure information.
"""
- testdir = teuthology.get_testdir(self.ctx)
- ceph_args = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=testdir),
- 'timeout',
- '120',
- 'ceph',
- '--cluster',
- self.cluster,
- ]
- ceph_args.extend(args)
- proc = self.controller.run(
- args=ceph_args,
- check_status=False,
- )
+ if self.cephadm:
+ proc = shell(self.ctx, self.cluster, self.controller,
+ args=['ceph'] + list(args),
+ check_status=False)
+ else:
+ testdir = teuthology.get_testdir(self.ctx)
+ ceph_args = [
+ 'sudo',
+ 'adjust-ulimits',
+ 'ceph-coverage',
+ '{tdir}/archive/coverage'.format(tdir=testdir),
+ 'timeout',
+ '120',
+ 'ceph',
+ '--cluster',
+ self.cluster,
+ ]
+ ceph_args.extend(args)
+ kwargs['args'] = ceph_args
+ kwargs['check_status'] = False
+ proc = self.controller.run(**kwargs)
return proc.exitstatus
- def run_ceph_w(self):
+ def run_ceph_w(self, watch_channel=None):
"""
- Execute "ceph -w" in the background with stdout connected to a StringIO,
+        Execute "ceph -w" in the background with stdout connected to a StringIO,
and return the RemoteProcess.
+
+ :param watch_channel: Specifies the channel to be watched. This can be
+ 'cluster', 'audit', ...
+ :type watch_channel: str
+ """
+ args = ["sudo",
+ "daemon-helper",
+ "kill",
+ "ceph",
+ '--cluster',
+ self.cluster,
+ "-w"]
+ if watch_channel is not None:
+ args.append("--watch-channel")
+ args.append(watch_channel)
+ return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
+
+ def get_mon_socks(self):
"""
- return self.controller.run(
- args=["sudo",
- "daemon-helper",
- "kill",
- "ceph",
- '--cluster',
- self.cluster,
- "-w"],
- wait=False, stdout=StringIO(), stdin=run.PIPE)
+ Get monitor sockets.
+
+ :return socks: tuple of strings; strings are individual sockets.
+ """
+ from json import loads
+
+        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
+
+ def get_msgrv1_mon_socks(self):
+ """
+ Get monitor sockets that use msgrv1 to operate.
+
+ :return socks: tuple of strings; strings are individual sockets.
+ """
+ from json import loads
+
+ output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ if addrvec_mem['type'] == 'v1':
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
+
+ def get_msgrv2_mon_socks(self):
+ """
+ Get monitor sockets that use msgrv2 to operate.
+
+ :return socks: tuple of strings; strings are individual sockets.
+ """
+ from json import loads
+
+ output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ if addrvec_mem['type'] == 'v2':
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
"""
:param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
it. (5 min by default)
"""
- seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
+ seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
for osd in osds}
if not wait_for_mon:
return
if no_wait is None:
no_wait = []
- for osd, need in seq.iteritems():
+ for osd, need in seq.items():
if osd in no_wait:
continue
got = 0
while wait_for_mon > 0:
- got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
+ got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
self.log('need seq {need} got {got} for osd.{osd}'.format(
need=need, got=got, osd=osd))
if got >= need:
"""
if stdout is None:
stdout = StringIO()
- testdir = teuthology.get_testdir(self.ctx)
+
remote = self.find_remote(service_type, service_id)
+
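+        # for cephadm, run 'ceph daemon <type>.<id> ...' inside the shell
+        # container on the daemon's host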
+ if self.cephadm:
+ return shell(
+ self.ctx, self.cluster, remote,
+ args=[
+ 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
+ ] + command,
+ stdout=stdout,
+ wait=True,
+ check_status=check_status,
+ )
+
+ testdir = teuthology.get_testdir(self.ctx)
args = [
'sudo',
'adjust-ulimits',
assert False
def wait_for_pg_stats(func):
- # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
+ # both osd_mon_report_interval and mgr_stats_period are 5 seconds
        # by default, and taking the fault injection in ms into consideration,
        # 12 seconds are more than enough
- delays = [1, 1, 2, 3, 5, 8, 13]
+ delays = [1, 1, 2, 3, 5, 8, 13, 0]
@wraps(func)
def wrapper(self, *args, **kwargs):
exc = None
def wait_run_admin_socket(self, service_type,
service_id, args=['version'], timeout=75, stdout=None):
"""
- If osd_admin_socket call suceeds, return. Otherwise wait
+ If osd_admin_socket call succeeds, return. Otherwise wait
five seconds and try again.
"""
if stdout is None:
while True:
proc = self.admin_socket(service_type, service_id,
args, check_status=False, stdout=stdout)
- if proc.exitstatus is 0:
+ if proc.exitstatus == 0:
return proc
else:
tries += 1
j = json.loads(proc.stdout.getvalue())
return j[name]
+ def inject_args(self, service_type, service_id, name, value):
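+        """Set a config option on running daemon(s) via 'ceph tell <who> injectargs'."""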
+ whom = '{0}.{1}'.format(service_type, service_id)
+ if isinstance(value, bool):
+ value = 'true' if value else 'false'
+ opt_arg = '--{name}={value}'.format(name=name, value=value)
+ self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
+
def set_config(self, osdnum, **argdict):
"""
:param osdnum: osd number
:param argdict: dictionary containing values to set.
"""
- for k, v in argdict.iteritems():
+ for k, v in argdict.items():
self.wait_run_admin_socket(
'osd', osdnum,
['config', 'set', str(k), str(v)])
"""
Get status from cluster
"""
- status = self.raw_cluster_cmd('status', '--format=json-pretty')
+ status = self.raw_cluster_cmd('status', '--format=json')
return json.loads(status)
def raw_osd_status(self):
"""
Get osd statuses sorted by states that the osds are in.
"""
- osd_lines = filter(
+ osd_lines = list(filter(
lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
- self.raw_osd_status().split('\n'))
+ self.raw_osd_status().split('\n')))
self.log(osd_lines)
in_osds = [int(i[4:].split()[0])
for i in filter(lambda x: " in " in x, osd_lines)]
:param erasure_code_use_overwrites: if true, allow overwrites
"""
with self.lock:
- assert isinstance(pool_name, basestring)
+ assert isinstance(pool_name, six.string_types)
assert isinstance(pg_num, int)
assert pool_name not in self.pools
self.log("creating pool_name %s" % (pool_name,))
:param pool_name: Pool to be removed
"""
with self.lock:
- assert isinstance(pool_name, basestring)
+ assert isinstance(pool_name, six.string_types)
assert pool_name in self.pools
self.log("removing pool_name %s" % (pool_name,))
del self.pools[pool_name]
- self.do_rados(self.controller,
- ['rmpool', pool_name, pool_name,
- "--yes-i-really-really-mean-it"])
+ self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
+ "--yes-i-really-really-mean-it")
def get_pool(self):
"""
Pick a random pool
"""
with self.lock:
- return random.choice(self.pools.keys())
+ if self.pools:
+ return random.sample(self.pools.keys(), 1)[0]
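+            # implicitly returns None when no pools exist yet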
def get_pool_pg_num(self, pool_name):
"""
Return the number of pgs in the pool specified.
"""
with self.lock:
- assert isinstance(pool_name, basestring)
+ assert isinstance(pool_name, six.string_types)
if pool_name in self.pools:
return self.pools[pool_name]
return 0
"""
:param pool_name: pool
:param prop: property to be checked.
- :returns: property as an int value.
+ :returns: property as string
"""
with self.lock:
- assert isinstance(pool_name, basestring)
- assert isinstance(prop, basestring)
+ assert isinstance(pool_name, six.string_types)
+ assert isinstance(prop, six.string_types)
output = self.raw_cluster_cmd(
'osd',
'pool',
'get',
pool_name,
prop)
- return int(output.split()[1])
+ return output.split()[1]
+
+ def get_pool_int_property(self, pool_name, prop):
+ return int(self.get_pool_property(pool_name, prop))
def set_pool_property(self, pool_name, prop, val):
"""
This routine retries if set operation fails.
"""
with self.lock:
- assert isinstance(pool_name, basestring)
- assert isinstance(prop, basestring)
+ assert isinstance(pool_name, six.string_types)
+ assert isinstance(prop, six.string_types)
assert isinstance(val, int)
tries = 0
while True:
Increase the number of pgs in a pool
"""
with self.lock:
- assert isinstance(pool_name, basestring)
+ assert isinstance(pool_name, six.string_types)
assert isinstance(by, int)
assert pool_name in self.pools
if self.get_num_creating() > 0:
self.pools[pool_name] = new_pg_num
return True
+ def contract_pool(self, pool_name, by, min_pgs):
+ """
+ Decrease the number of pgs in a pool
+ """
+ with self.lock:
+ self.log('contract_pool %s by %s min %s' % (
+ pool_name, str(by), str(min_pgs)))
+ assert isinstance(pool_name, six.string_types)
+ assert isinstance(by, int)
+ assert pool_name in self.pools
+ if self.get_num_creating() > 0:
+ self.log('too many creating')
+ return False
+ proj = self.pools[pool_name] - by
+ if proj < min_pgs:
+ self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
+ return False
+ self.log("decrease pool size by %d" % (by,))
+ new_pg_num = self.pools[pool_name] - by
+ self.set_pool_property(pool_name, "pg_num", new_pg_num)
+ self.pools[pool_name] = new_pg_num
+ return True
+
+ def stop_pg_num_changes(self):
+ """
+ Reset all pg_num_targets back to pg_num, canceling splits and merges
+ """
+ self.log('Canceling any pending splits or merges...')
+ osd_dump = self.get_osd_dump_json()
+ try:
+ for pool in osd_dump['pools']:
+ if pool['pg_num'] != pool['pg_num_target']:
+ self.log('Setting pool %s (%d) pg_num %d -> %d' %
+ (pool['pool_name'], pool['pool'],
+ pool['pg_num_target'],
+ pool['pg_num']))
+ self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
+ 'pg_num', str(pool['pg_num']))
+ except KeyError:
+ # we don't support pg_num_target before nautilus
+ pass
+
def set_pool_pgpnum(self, pool_name, force):
"""
Set pgpnum property of pool_name pool.
"""
with self.lock:
- assert isinstance(pool_name, basestring)
+ assert isinstance(pool_name, six.string_types)
assert pool_name in self.pools
if not force and self.get_num_creating() > 0:
return False
self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
return True
- def list_pg_missing(self, pgid):
+ def list_pg_unfound(self, pgid):
"""
- return list of missing pgs with the id specified
+ return list of unfound pgs with the id specified
"""
r = None
offset = {}
while True:
- out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
+ out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
json.dumps(offset))
j = json.loads(out)
if r is None:
"""
out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
j = json.loads('\n'.join(out.split('\n')[1:]))
- return j['pg_stats']
+ try:
+ return j['pg_map']['pg_stats']
+ except KeyError:
+ return j['pg_stats']
def get_pgids_to_force(self, backfill):
"""
ret[status] += 1
return ret
- @wait_for_pg_stats
+ @wait_for_pg_stats # type: ignore
def with_pg_state(self, pool, pgnum, check):
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
assert(check(stats['state']))
- @wait_for_pg_stats
+ @wait_for_pg_stats # type: ignore
def with_pg(self, pool, pgnum, check):
pgstr = self.get_pgid(pool, pgnum)
stats = self.get_single_pg_stats(pgstr)
"""
pool_dump = self.get_pool_dump(pool)
object_map = self.get_object_map(pool, name)
- if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
+ if pool_dump["type"] == PoolType.ERASURE_CODED:
shard = object_map['acting'].index(osdid)
return "{pgid}s{shard}".format(pgid=object_map['pgid'],
shard=shard)
"""
return self.get_osd_dump_json()['osds']
+ def get_osd_metadata(self):
+ """
+ osd metadata --format=json converted to a python object
+ :returns: the python object containing osd metadata information
+ """
+ out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
+ return json.loads('\n'.join(out.split('\n')[1:]))
+
def get_mgr_dump(self):
out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
return json.loads(out)
"""
out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
'--format=json')
- return json.loads(out)
+ return json.loads(out).get('stuck_pg_stats',[])
def get_num_unfound_objects(self):
"""
Find the number of active and clean pgs.
"""
pgs = self.get_pg_stats()
+ return self._get_num_active_clean(pgs)
+
+ def _get_num_active_clean(self, pgs):
num = 0
for pg in pgs:
if (pg['state'].count('active') and
Find the number of active and recovered pgs.
"""
pgs = self.get_pg_stats()
+ return self._get_num_active_recovered(pgs)
+
+ def _get_num_active_recovered(self, pgs):
num = 0
for pg in pgs:
if (pg['state'].count('active') and
not pg['state'].count('recover') and
- not pg['state'].count('backfill') and
+ not pg['state'].count('backfilling') and
not pg['state'].count('stale')):
num += 1
return num
Find the number of active pgs.
"""
pgs = self.get_pg_stats()
+ return self._get_num_active(pgs)
+
+ def _get_num_active(self, pgs):
num = 0
for pg in pgs:
if pg['state'].count('active') and not pg['state'].count('stale'):
Find the number of pgs that are either active or down.
"""
pgs = self.get_pg_stats()
+ return self._get_num_active_down(pgs)
+
+ def _get_num_active_down(self, pgs):
num = 0
for pg in pgs:
if ((pg['state'].count('active') and not
num += 1
return num
+ def get_num_peered(self):
+ """
+ Find the number of PGs that are peered
+ """
+ pgs = self.get_pg_stats()
+ return self._get_num_peered(pgs)
+
+ def _get_num_peered(self, pgs):
+ num = 0
+ for pg in pgs:
+ if pg['state'].count('peered') and not pg['state'].count('stale'):
+ num += 1
+ return num
+
def is_clean(self):
"""
True if all pgs are clean
"""
- return self.get_num_active_clean() == self.get_num_pgs()
+ pgs = self.get_pg_stats()
+ return self._get_num_active_clean(pgs) == len(pgs)
def is_recovered(self):
"""
True if all pgs have recovered
"""
- return self.get_num_active_recovered() == self.get_num_pgs()
+ pgs = self.get_pg_stats()
+ return self._get_num_active_recovered(pgs) == len(pgs)
def is_active_or_down(self):
"""
True if all pgs are active or down
"""
- return self.get_num_active_down() == self.get_num_pgs()
+ pgs = self.get_pg_stats()
+ return self._get_num_active_down(pgs) == len(pgs)
- def wait_for_clean(self, timeout=None):
+ def wait_for_clean(self, timeout=1200):
"""
Returns true when all pgs are clean.
"""
else:
self.log("no progress seen, keeping timeout for now")
if now - start >= timeout:
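+                    # if recovery has finished, accept that as good enough even
+                    # though the PGs are not yet clean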
+ if self.is_recovered():
+ break
self.log('dumping pgs')
out = self.raw_cluster_cmd('pg', 'dump')
self.log(out)
"""
return self.get_num_active() == self.get_num_pgs()
+ def all_active_or_peered(self):
+ """
+ Wrapper to check if all PGs are active or peered
+ """
+ pgs = self.get_pg_stats()
+ return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
+
def wait_till_active(self, timeout=None):
"""
Wait until all pgs are active.
time.sleep(3)
self.log("active!")
+ def wait_till_pg_convergence(self, timeout=None):
+ start = time.time()
+ old_stats = None
+ active_osds = [osd['osd'] for osd in self.get_osd_dump()
+ if osd['in'] and osd['up']]
+ while True:
+ # strictly speaking, no need to wait for mon. but due to the
+ # "ms inject socket failures" setting, the osdmap could be delayed,
+ # so mgr is likely to ignore the pg-stat messages with pgs serving
+ # newly created pools which is not yet known by mgr. so, to make sure
+ # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
+ # necessary.
+ self.flush_pg_stats(active_osds)
+ new_stats = dict((stat['pgid'], stat['state'])
+ for stat in self.get_pg_stats())
+ if old_stats == new_stats:
+ return old_stats
+ if timeout is not None:
+ assert time.time() - start < timeout, \
+ 'failed to reach convergence before %d secs' % timeout
+ old_stats = new_stats
+ # longer than mgr_stats_period
+ time.sleep(5 + 1)
+
def mark_out_osd(self, osd):
"""
Wrapper to mark osd out.
remote.console.power_off()
elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
- self.raw_cluster_cmd(
- '--', 'tell', 'osd.%d' % osd,
- 'injectargs',
- '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
- )
+ self.inject_args(
+ 'osd', osd,
+ 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
try:
self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
except:
"""
Stop osd if nothing else works.
"""
- self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
- 'injectargs',
- '--objectstore-blackhole')
+ self.inject_args('osd', osd,
+ 'objectstore-blackhole', True)
time.sleep(2)
self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
- def revive_osd(self, osd, timeout=150, skip_admin_check=False):
+ def revive_osd(self, osd, timeout=360, skip_admin_check=False):
"""
Revive osds by either power cycling (if indicated by the config)
or by restarting.
## monitors
def signal_mon(self, mon, sig, silent=False):
"""
- Wrapper to local get_deamon call
+ Wrapper to local get_daemon call
"""
self.ctx.daemons.get_daemon('mon', mon,
self.cluster).signal(sig, silent=silent)
"""
Extract all the monitor status information from the cluster
"""
- addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
- out = self.raw_cluster_cmd('-m', addr, 'mon_status')
+ out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
return json.loads(out)
def get_mon_quorum(self):
self.log('health:\n{h}'.format(h=out))
return json.loads(out)
- def get_mds_status(self, mds):
- """
- Run cluster commands for the mds in order to get mds information
- """
- out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
- j = json.loads(' '.join(out.splitlines()[1:]))
- # collate; for dup ids, larger gid wins.
- for info in j['info'].itervalues():
- if info['name'] == mds:
- return info
- return None
+ def wait_until_healthy(self, timeout=None):
+ self.log("wait_until_healthy")
+ start = time.time()
+ while self.get_mon_health()['status'] != 'HEALTH_OK':
+ if timeout is not None:
+ assert time.time() - start < timeout, \
+ 'timeout expired in wait_until_healthy'
+ time.sleep(3)
+ self.log("wait_until_healthy done")
def get_filepath(self):
"""
remote.run(args=['sudo',
'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
+ def get_service_task_status(self, service, status_key):
+ """
+ Return daemon task status for a given ceph service.
+
+ :param service: ceph service (mds, osd, etc...)
+ :param status_key: matching task status key
+ """
+ task_status = {}
+ status = self.raw_cluster_status()
+ try:
+ for k,v in status['servicemap']['services'][service]['daemons'].items():
+ ts = dict(v).get('task_status', None)
+ if ts:
+ task_status[k] = ts[status_key]
+ except KeyError: # catches missing service and status key
+ return {}
+ self.log(task_status)
+ return task_status
def utility_task(name):
"""