self.stopping = False
self.logger = logger
self.config = config
- self.revive_timeout = self.config.get("revive_timeout", 150)
+ self.revive_timeout = self.config.get("revive_timeout", 360)
self.pools_to_fix_pgp_num = set()
if self.config.get('powercycle'):
self.revive_timeout += 120
self.clean_wait = self.config.get('clean_wait', 0)
- self.minin = self.config.get("min_in", 3)
+ self.minin = self.config.get("min_in", 4)
self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
self.sighup_delay = self.config.get('sighup_delay')
self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
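+ # probability of injecting random read errors (EIO) into one OSD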
+ self.random_eio = self.config.get('random_eio', 0.0)
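+ # chance that a thrash action forces or cancels forced recovery/backfill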
+ self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
num_osds = self.in_osds + self.out_osds
self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
pg=pg,
id=exp_osd))
# export
+ # Can't use new export-remove op since this is part of upgrade testing
cmd = prefix + "--op export --pgid {pg} --file {file}"
cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
proc = exp_remote.run(args=cmd)
"export failure with status {ret}".
format(ret=proc.exitstatus))
# remove
- cmd = prefix + "--op remove --pgid {pg}"
+ cmd = prefix + "--force --op remove --pgid {pg}"
cmd = cmd.format(id=exp_osd, pg=pg)
proc = exp_remote.run(args=cmd)
if proc.exitstatus:
skip_admin_check=skip_admin_check)
self.dead_osds.remove(osd)
self.live_osds.append(osd)
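+ # re-enable random read error injection if the revived OSD is the designated one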
+ if self.random_eio > 0 and osd == self.rerrosd:
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+
def out_osd(self, osd=None):
"""
except CommandFailedError:
self.log('Failed to rm-pg-upmap-items, ignoring')
+ def force_recovery(self):
+ """
+ Force recovery on some PGs
+ """
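+ # coin flip: force backfill half the time, force recovery the other half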
+ backfill = random.random() >= 0.5
+ j = self.ceph_manager.get_pgids_to_force(backfill)
+ if j:
+ try:
+ if backfill:
+ self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
+ else:
+ self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
+ except CommandFailedError:
+ self.log('Failed to force backfill|recovery, ignoring')
+
+ def cancel_force_recovery(self):
+ """
+ Cancel forced recovery on some PGs
+ """
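+ # coin flip: cancel forced backfill half the time, forced recovery the other half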
+ backfill = random.random() >= 0.5
+ j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
+ if j:
+ try:
+ if backfill:
+ self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
+ else:
+ self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
+ except CommandFailedError:
+ self.log('Failed to cancel forced backfill|recovery, ignoring')
+
+ def force_cancel_recovery(self):
+ """
+ Randomly either force recovery or cancel forced recovery
+ """
+ if random.random() >= 0.4:
+ self.force_recovery()
+ else:
+ self.cancel_force_recovery()
+
def all_up(self):
"""
Make sure all osds are up and not out.
osd_debug_skip_full_check_in_backfill_reservation to force
the more complicated check in do_scan to be exercised.
- Then, verify that all backfills stop.
+ Then, verify that all backfilling operations stop.
"""
self.log("injecting backfill full")
for i in self.live_osds:
check_status=True, timeout=30, stdout=DEVNULL)
for i in range(30):
status = self.ceph_manager.compile_pg_status()
- if 'backfill' not in status.keys():
+ if 'backfilling' not in status.keys():
break
self.log(
- "waiting for {still_going} backfills".format(
- still_going=status.get('backfill')))
+ "waiting for {still_going} backfilling pgs".format(
+ still_going=status.get('backfilling')))
time.sleep(1)
- assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
+ assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
for i in self.live_osds:
self.ceph_manager.set_config(
i,
while len(self.in_osds) < (self.minin + 1):
self.in_osd()
self.log("Waiting for recovery")
- self.ceph_manager.wait_for_all_up(
+ self.ceph_manager.wait_for_all_osds_up(
timeout=self.config.get('timeout')
)
# now we wait 20s for the pg status to change, if it takes longer,
actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
if self.chance_thrash_pg_upmap_items > 0:
actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
+ if self.chance_force_recovery > 0:
+ actions.append((self.force_cancel_recovery, self.chance_force_recovery))
for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
for scenario in [
scrubint = self.config.get("scrub_interval", -1)
maxdead = self.config.get("max_dead", 0)
delay = self.config.get("op_delay", 5)
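+ # designate one live OSD to receive random read error injection for this run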
+ self.rerrosd = self.live_osds[0]
+ if self.random_eio > 0:
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
self.log("starting do_thrash")
while not self.stopping:
to_log = [str(x) for x in ["in_osds: ", self.in_osds,
Scrubber(self.ceph_manager, self.config)
self.choose_action()()
time.sleep(delay)
+ self.all_up()
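+ # thrashing is done; turn off any injected read errors before the final checks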
+ if self.random_eio > 0:
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
+ self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+ 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
for pool in list(self.pools_to_fix_pgp_num):
if self.ceph_manager.get_pool_pg_num(pool) > 0:
self.fix_pgp_num(pool)
finally:
if self.do_revive:
self.manager.revive_osd(self.osd)
+ self.manager.wait_till_osd_is_up(self.osd, 300)
class CephManager:
"-w"],
wait=False, stdout=StringIO(), stdin=run.PIPE)
- def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=3*5):
+ def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
"""
Flush pg stats from a list of OSD ids, ensuring they are reflected
all the way to the monitor. Luminous and later only.
stat seq from monitor anymore. In that case, you need
to pass a blacklist.
:param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
- it. (3 * mon_mgr_digest_period, by default)
+ it. (5 min by default)
"""
seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
for osd in osds}
# both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
# by default; taking the fault injection (in ms) into consideration,
# a dozen seconds should be more than enough
- delays = [1, 1, 2, 3, 5, 8, 13]
+ delays = [1, 1, 2, 3, 5, 8, 13, 0]
@wraps(func)
def wrapper(self, *args, **kwargs):
exc = None
'osd', 'pool', 'set', pool_name,
'allow_ec_overwrites',
'true')
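+ # tag the pool with the 'rados' application; the trailing '|| true' keeps
+ # the command from failing the test on clusters that reject it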
+ self.raw_cluster_cmd(
+ 'osd', 'pool', 'application', 'enable',
+ pool_name, 'rados', '--yes-i-really-mean-it',
+ run.Raw('||'), 'true')
self.pools[pool_name] = pg_num
time.sleep(1)
j = json.loads('\n'.join(out.split('\n')[1:]))
return j['pg_stats']
+ def get_pgids_to_force(self, backfill):
+ """
+ Return the randomized list of PGs that can have their recovery/backfill forced
+ """
+ j = self.get_pg_stats()
+ pgids = []
+ if backfill:
+ wanted = ['degraded', 'backfilling', 'backfill_wait']
+ else:
+ wanted = ['recovering', 'degraded', 'recovery_wait']
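+ # sample roughly half of the matching PGs, skipping those already forced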
+ for pg in j:
+ status = pg['state'].split('+')
+ for t in wanted:
+ if (random.random() > 0.5 and t in status and
+         not ('forced_backfill' in status or 'forced_recovery' in status)):
+ pgids.append(pg['pgid'])
+ break
+ return pgids
+
+ def get_pgids_to_cancel_force(self, backfill):
+ """
+ Return the randomized list of PGs whose recovery/backfill priority is forced
+ """
+ j = self.get_pg_stats()
+ pgids = []
+ if backfill:
+ wanted = 'forced_backfill'
+ else:
+ wanted = 'forced_recovery'
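+ # sample roughly half of the PGs that are already in a forced state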
+ for pg in j:
+ status = pg['state'].split('+')
+ if wanted in status and random.random() > 0.5:
+ pgids.append(pg['pgid'])
+ return pgids
+
def compile_pg_status(self):
"""
Return a histogram of pg state values
"""
return self.get_osd_dump_json()['osds']
+ def get_mgr_dump(self):
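+ """
+ Dump the mgr map ('ceph mgr dump') and return it as a dict.
+ """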
+ out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
+ return json.loads(out)
+
def get_stuck_pgs(self, type_, threshold):
"""
:returns: stuck pg information from the cluster
for pg in pgs:
if (pg['state'].count('active') and
not pg['state'].count('recover') and
- not pg['state'].count('backfill') and
+ not pg['state'].count('backfilling') and
not pg['state'].count('stale')):
num += 1
return num
x = self.get_osd_dump()
return (len(x) == sum([(y['up'] > 0) for y in x]))
- def wait_for_all_up(self, timeout=None):
+ def wait_for_all_osds_up(self, timeout=None):
"""
When this exits, either the timeout has expired, or all
osds are up.
while not self.are_all_osds_up():
if timeout is not None:
assert time.time() - start < timeout, \
- 'timeout expired in wait_for_all_up'
+ 'timeout expired in wait_for_all_osds_up'
time.sleep(3)
self.log("all up!")
+ def pool_exists(self, pool):
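+ """
+ Check whether a pool with the given name exists in the cluster.
+ """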
+ return pool in self.list_pools()
+
+ def wait_for_pool(self, pool, timeout=300):
+ """
+ Wait for a pool to exist
+ """
+ self.log('waiting for pool %s to exist' % pool)
+ start = time.time()
+ while not self.pool_exists(pool):
+ if timeout is not None:
+ assert time.time() - start < timeout, \
+ 'timeout expired in wait_for_pool'
+ time.sleep(3)
+
+ def wait_for_pools(self, pools):
+ for pool in pools:
+ self.wait_for_pool(pool)
+
+ def is_mgr_available(self):
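+ """
+ Check whether the mgr map reports an active (available) mgr.
+ """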
+ x = self.get_mgr_dump()
+ return x.get('available', False)
+
+ def wait_for_mgr_available(self, timeout=None):
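+ """
+ Wait until an active mgr is available, or the timeout expires.
+ """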
+ self.log("waiting for mgr available")
+ start = time.time()
+ while not self.is_mgr_available():
+ if timeout is not None:
+ assert time.time() - start < timeout, \
+ 'timeout expired in wait_for_mgr_available'
+ time.sleep(3)
+ self.log("mgr available!")
+
def wait_for_recovery(self, timeout=None):
"""
Check peering. When this exits, we have recovered.
else:
self.log("no progress seen, keeping timeout for now")
if now - start >= timeout:
+ if self.is_recovered():
+ break
self.log('dumping pgs')
out = self.raw_cluster_cmd('pg', 'dump')
self.log(out)
time.sleep(3)
self.log("active!")
+ def wait_till_pg_convergence(self, timeout=None):
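+ """
+ Wait until pg states reported by the mgr stop changing between samples,
+ then return the last sampled {pgid: state} map.
+ """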
+ start = time.time()
+ old_stats = None
+ active_osds = [osd['osd'] for osd in self.get_osd_dump()
+ if osd['in'] and osd['up']]
+ while True:
+ # strictly speaking, there is no need to wait for the mon. But due to
+ # the "ms inject socket failures" setting, the osdmap could be delayed,
+ # so the mgr is likely to ignore pg-stat messages for pgs serving newly
+ # created pools that it does not know about yet. To make sure the mgr
+ # is updated with the latest pg-stats, waiting for the mon/mgr is
+ # necessary.
+ self.flush_pg_stats(active_osds)
+ new_stats = dict((stat['pgid'], stat['state'])
+ for stat in self.get_pg_stats())
+ if old_stats == new_stats:
+ return old_stats
+ if timeout is not None:
+ assert time.time() - start < timeout, \
+ 'failed to reach convergence before %d secs' % timeout
+ old_stats = new_stats
+ # longer than mgr_stats_period
+ time.sleep(5 + 1)
+
def mark_out_osd(self, osd):
"""
Wrapper to mark osd out.
time.sleep(2)
self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
- def revive_osd(self, osd, timeout=150, skip_admin_check=False):
+ def revive_osd(self, osd, timeout=360, skip_admin_check=False):
"""
Revive osds by either power cycling (if indicated by the config)
or by restarting.
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
+flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
+wait_for_pool = utility_task("wait_for_pool")
+wait_for_pools = utility_task("wait_for_pools")