"""
import logging
import contextlib
-import ceph_manager
import itertools
import random
-import signal
import time
from gevent import sleep
from gevent.event import Event
from teuthology import misc as teuthology
-from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster, Filesystem, FSMissing
+from tasks.thrasher import Thrasher
log = logging.getLogger(__name__)
-class DaemonWatchdog(Greenlet):
- """
- DaemonWatchdog::
-
- Watch Ceph daemons for failures. If an extended failure is detected (i.e.
- not intentional), then the watchdog will unmount file systems and send
- SIGTERM to all daemons. The duration of an extended failure is configurable
- with watchdog_daemon_timeout.
-
- watchdog_daemon_timeout [default: 300]: number of seconds a daemon
- is allowed to be failed before the watchdog will bark.
- """
-
- def __init__(self, ctx, manager, config, thrashers):
- Greenlet.__init__(self)
- self.ctx = ctx
- self.config = config
- self.e = None
- self.logger = log.getChild('daemon_watchdog')
- self.manager = manager
- self.name = 'watchdog'
- self.stopping = Event()
- self.thrashers = thrashers
-
- def _run(self):
- try:
- self.watch()
- except Exception as e:
- # See _run exception comment for MDSThrasher
- self.e = e
- self.logger.exception("exception:")
- # allow successful completion so gevent doesn't see an exception...
-
- def log(self, x):
- """Write data to logger"""
- self.logger.info(x)
-
- def stop(self):
- self.stopping.set()
-
- def bark(self):
- self.log("BARK! unmounting mounts and killing all daemons")
- for mount in self.ctx.mounts.values():
- try:
- mount.umount_wait(force=True)
- except:
- self.logger.exception("ignoring exception:")
- daemons = []
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
- daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
- for daemon in daemons:
- try:
- daemon.signal(signal.SIGTERM)
- except:
- self.logger.exception("ignoring exception:")
-
- def watch(self):
- self.log("watchdog starting")
- daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
- daemon_failure_time = {}
- while not self.stopping.is_set():
- bark = False
- now = time.time()
-
- mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
- mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
- clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
-
- #for daemon in mons:
- # self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
- #for daemon in mdss:
- # self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
-
- daemon_failures = []
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
- daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
- for daemon in daemon_failures:
- name = daemon.role + '.' + daemon.id_
- dt = daemon_failure_time.setdefault(name, (daemon, now))
- assert dt[0] is daemon
- delta = now-dt[1]
- self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
- if delta > daemon_timeout:
- bark = True
-
- # If a daemon is no longer failed, remove it from tracking:
- for name in daemon_failure_time.keys():
- if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
- self.log("daemon {name} has been restored".format(name=name))
- del daemon_failure_time[name]
-
- for thrasher in self.thrashers:
- if thrasher.e is not None:
- self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
- bark = True
-
- if bark:
- self.bark()
- return
-
- sleep(5)
-
- self.log("watchdog finished")
-
-class MDSThrasher(Greenlet):
+class MDSThrasher(Thrasher, Greenlet):
"""
MDSThrasher::
thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
during replay. Value should be between 0.0 and 1.0.
- thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
+ thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
cluster will be modified to a value [1, current) or (current, starting
- max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
- deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.
+ max_mds]. Value should be between 0.0 and 1.0.
thrash_while_stopping: [default: false] thrash an MDS while there
are MDS in up:stopping (because max_mds was changed and some
MDS were deactivated).
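
A minimal illustrative snippet (the values below are arbitrary examples,
not recommended defaults) showing how these options might be set in a
teuthology job::

  tasks:
  - ceph:
  - mds_thrash:
      thrash_max_mds: 0.1
      thrash_in_replay: 0.25
      thrash_while_stopping: true
  - ceph-fuse: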
"""
def __init__(self, ctx, manager, config, fs, max_mds):
- Greenlet.__init__(self)
+ super(MDSThrasher, self).__init__()
self.config = config
self.ctx = ctx
- self.e = None
self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
self.fs = fs
self.manager = manager
self.stopping = Event()
self.randomize = bool(self.config.get('randomize', True))
- self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
+ self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
self.max_thrash = int(self.config.get('max_thrash', 1))
self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
def _run(self):
try:
self.do_thrash()
+ except FSMissing:
+ pass
except Exception as e:
# Log exceptions here so we get the full backtrace (gevent loses them).
- # Also allow succesful completion as gevent exception handling is a broken mess:
+ # Also allow successful completion as gevent exception handling is a broken mess:
#
# 2017-02-03T14:34:01.259 CRITICAL:root: File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
# File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
# File "/usr/lib/python2.7/traceback.py", line 13, in _print
# file.write(str+terminator)
# 2017-02-03T14:34:01.261 CRITICAL:root:IOError
- self.e = e
+ self.set_thrasher_exception(e)
self.logger.exception("exception:")
# allow successful completion so gevent doesn't see an exception...
def log(self, x):
- """Write data to logger assigned to this MDThrasher"""
+ """Write data to the logger assigned to MDSThrasher"""
self.logger.info(x)
def stop(self):
def kill_mds(self, mds):
if self.config.get('powercycle'):
(remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
+ remotes.keys())
self.log('kill_mds on mds.{m} doing powercycle of {s}'.
format(m=mds, s=remote.name))
self._assert_ipmi(remote)
"powercycling requested but RemoteConsole is not "
"initialized. Check ipmi config.")
- def revive_mds(self, mds, standby_for_rank=None):
+ def revive_mds(self, mds):
"""
Revive mds -- do an ipmi powercycle (if indicated by the config)
- and then restart (using --hot-standby if specified.
+ and then restart.
"""
if self.config.get('powercycle'):
(remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
- remotes.iterkeys())
+ remotes.keys())
self.log('revive_mds on mds.{m} doing powercycle of {s}'.
format(m=mds, s=remote.name))
self._assert_ipmi(remote)
remote.console.power_on()
self.manager.make_admin_daemon_dir(self.ctx, remote)
args = []
- if standby_for_rank:
- args.extend(['--hot-standby', standby_for_rank])
self.ctx.daemons.get_daemon('mds', mds).restart(*args)
def wait_for_stable(self, rank = None, gid = None):
status = self.fs.status()
max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
ranks = list(status.get_ranks(self.fs.id))
- stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
- actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
+ stopping = sum(1 for _ in ranks if "up:stopping" == _['state'])
+ actives = sum(1 for _ in ranks
+ if "up:active" == _['state'] and "laggy_since" not in _)
- if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
+ if not bool(self.config.get('thrash_while_stopping', False)) and stopping > 0:
if itercount % 5 == 0:
self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
else:
return status
except:
pass # no rank present
- if len(actives) >= max_mds:
+ if actives >= max_mds:
# no replacement can occur!
- self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
+ self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
+ actives=actives, max_mds=max_mds, rank=rank))
return status
else:
- if len(actives) >= max_mds:
- self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
+ if actives == max_mds:
+ self.log('mds cluster has {count} alive and active, now stable!'.format(count = actives))
return status, None
if itercount > 300/2: # 5 minutes
raise RuntimeError('timeout waiting for cluster to stabilize')
if random.random() <= self.thrash_max_mds:
max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
- options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
+ options = [i for i in range(1, self.max_mds + 1) if i != max_mds]
if len(options) > 0:
- sample = random.sample(options, 1)
- new_max_mds = sample[0]
+ new_max_mds = random.choice(options)
self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
self.fs.set_max_mds(new_max_mds)
stats['max_mds'] += 1
-
- # Now randomly deactivate mds if we shrank
- # TODO: it's desirable to deactivate in order. Make config to do random.
- targets = filter(lambda r: r['rank'] > 0, status.get_ranks(self.fs.id)) # can't deactivate 0
- for target in random.sample(targets, max(0, max_mds-new_max_mds)):
- self.log("deactivating rank %d" % target['rank'])
- self.fs.deactivate(target['rank'])
- stats['deactivate'] += 1
-
- status = self.wait_for_stable()[0]
+ self.wait_for_stable()
count = 0
for info in status.get_ranks(self.fs.id):
weight = 1.0
if 'thrash_weights' in self.config:
weight = self.config['thrash_weights'].get(label, 0.0)
- skip = random.randrange(0.0, 1.0)
+ skip = random.random()
if weight <= skip:
self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
continue
self.log(
'{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
else:
- self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
+ self.log('{label} down, removed from mdsmap'.format(label=label))
# wait for a standby mds to takeover and become active
status = self.wait_for_stable(rank, gid)
log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
random.seed(seed)
- (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
+ (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys()
manager = ceph_manager.CephManager(
first, ctx=ctx, logger=log.getChild('ceph_manager'),
)
status = mds_cluster.status()
log.info('Ready to start thrashing')
- thrashers = []
-
- watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
- watchdog.start()
-
manager.wait_for_clean()
assert manager.is_clean()
+
+ if 'cluster' not in config:
+ config['cluster'] = 'ceph'
+
for fs in status.get_filesystems():
- thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
+ thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fscid=fs['id']), fs['mdsmap']['max_mds'])
thrasher.start()
- thrashers.append(thrasher)
+ ctx.ceph[config['cluster']].thrashers.append(thrasher)
try:
log.debug('Yielding')
yield
finally:
- log.info('joining mds_thrashers')
- for thrasher in thrashers:
- thrasher.stop()
- if thrasher.e:
- raise RuntimeError('error during thrashing')
- thrasher.join()
+ log.info('joining mds_thrasher')
+ thrasher.stop()
+ if thrasher.exception is not None:
+ raise RuntimeError('error during thrashing')
+ thrasher.join()
log.info('done joining')
-
- watchdog.stop()
- watchdog.join()