diff --git a/ceph/qa/tasks/mds_thrash.py b/ceph/qa/tasks/mds_thrash.py
index 8714967b95229880835be0acdcbf16d707db2a61..7b7b420f9ea5878397222ae78fdde2b1d5eb5f04 100644
--- a/ceph/qa/tasks/mds_thrash.py
+++ b/ceph/qa/tasks/mds_thrash.py
@@ -3,10 +3,8 @@ Thrash mds by simulating failures
 """
 import logging
 import contextlib
-import ceph_manager
 import itertools
 import random
-import signal
 import time
 
 from gevent import sleep
@@ -14,115 +12,13 @@ from gevent.greenlet import Greenlet
 from gevent.event import Event
 from teuthology import misc as teuthology
 
-from tasks.cephfs.filesystem import MDSCluster, Filesystem
+from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster, Filesystem, FSMissing
+from tasks.thrasher import Thrasher
 
 log = logging.getLogger(__name__)
 
-class DaemonWatchdog(Greenlet):
-    """
-    DaemonWatchdog::
-
-    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
-    not intentional), then the watchdog will unmount file systems and send
-    SIGTERM to all daemons. The duration of an extended failure is configurable
-    with watchdog_daemon_timeout.
-
-    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
-        is allowed to be failed before the watchdog will bark.
-    """
-
-    def __init__(self, ctx, manager, config, thrashers):
-        Greenlet.__init__(self)
-        self.ctx = ctx
-        self.config = config
-        self.e = None
-        self.logger = log.getChild('daemon_watchdog')
-        self.manager = manager
-        self.name = 'watchdog'
-        self.stopping = Event()
-        self.thrashers = thrashers
-
-    def _run(self):
-        try:
-            self.watch()
-        except Exception as e:
-            # See _run exception comment for MDSThrasher
-            self.e = e
-            self.logger.exception("exception:")
-            # allow successful completion so gevent doesn't see an exception...
-
-    def log(self, x):
-        """Write data to logger"""
-        self.logger.info(x)
-
-    def stop(self):
-        self.stopping.set()
-
-    def bark(self):
-        self.log("BARK! unmounting mounts and killing all daemons")
-        for mount in self.ctx.mounts.values():
-            try:
-                mount.umount_wait(force=True)
-            except:
-                self.logger.exception("ignoring exception:")
-        daemons = []
-        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
-        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
-        for daemon in daemons:
-            try:
-                daemon.signal(signal.SIGTERM)
-            except:
-                self.logger.exception("ignoring exception:")
-
-    def watch(self):
-        self.log("watchdog starting")
-        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
-        daemon_failure_time = {}
-        while not self.stopping.is_set():
-            bark = False
-            now = time.time()
-
-            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
-            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
-            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
-
-            #for daemon in mons:
-            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
-            #for daemon in mdss:
-            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
-
-            daemon_failures = []
-            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
-            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
-            for daemon in daemon_failures:
-                name = daemon.role + '.' + daemon.id_
-                dt = daemon_failure_time.setdefault(name, (daemon, now))
-                assert dt[0] is daemon
-                delta = now-dt[1]
-                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
-                if delta > daemon_timeout:
-                    bark = True
-
-            # If a daemon is no longer failed, remove it from tracking:
-            for name in daemon_failure_time.keys():
-                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
-                    self.log("daemon {name} has been restored".format(name=name))
-                    del daemon_failure_time[name]
-
-            for thrasher in self.thrashers:
-                if thrasher.e is not None:
-                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
-                    bark = True
-
-            if bark:
-                self.bark()
-                return
-
-            sleep(5)
-
-        self.log("watchdog finished")
-
-class MDSThrasher(Greenlet):
+class MDSThrasher(Thrasher, Greenlet):
     """
     MDSThrasher::
 
@@ -151,10 +47,9 @@ class MDSThrasher(Greenlet):
     thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
       during replay.  Value should be between 0.0 and 1.0.
 
-    thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
+    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
       cluster will be modified to a value [1, current) or (current, starting
-      max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
-      deactivated to reach the new max_mds.  Value should be between 0.0 and 1.0.
+      max_mds]. Value should be between 0.0 and 1.0.
 
     thrash_while_stopping: [default: false] thrash an MDS while there
       are MDS in up:stopping (because max_mds was changed and some
@@ -203,11 +98,10 @@ class MDSThrasher(Greenlet):
     """
 
     def __init__(self, ctx, manager, config, fs, max_mds):
-        Greenlet.__init__(self)
+        super(MDSThrasher, self).__init__()
 
         self.config = config
         self.ctx = ctx
-        self.e = None
         self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
         self.fs = fs
         self.manager = manager
@@ -216,7 +110,7 @@ class MDSThrasher(Greenlet):
         self.stopping = Event()
 
         self.randomize = bool(self.config.get('randomize', True))
-        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
+        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
         self.max_thrash = int(self.config.get('max_thrash', 1))
         self.max_thrash_delay = float(self.config.get('thrash_delay', 120.0))
         self.thrash_in_replay = float(self.config.get('thrash_in_replay', False))
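
These lines read every knob the thrasher accepts; with the new 0.05 default, max_mds thrashing now happens out of the box. A hypothetical override dict mirroring the keys and defaults read above (illustration only, not a complete teuthology job; the powercycle default is assumed falsy from the config.get() calls below):

    mds_thrash_config = {
        'randomize': True,               # randomize the delay between thrash operations
        'thrash_max_mds': 0.05,          # was 0.0; chance per cycle of changing max_mds
        'max_thrash': 1,                 # upper bound on MDSs thrashed at one time
        'thrash_delay': 120.0,           # maximum delay between thrash operations (seconds)
        'thrash_in_replay': 0.0,         # chance of thrashing an MDS while it is in up:replay
        'thrash_while_stopping': False,  # do not thrash while ranks are in up:stopping
        'powercycle': False,             # use IPMI powercycle instead of signals
    }
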
@@ -228,9 +122,11 @@ class MDSThrasher(Greenlet):
     def _run(self):
         try:
             self.do_thrash()
+        except FSMissing:
+            pass
         except Exception as e:
             # Log exceptions here so we get the full backtrace (gevent loses them).
-            # Also allow succesful completion as gevent exception handling is a broken mess:
+            # Also allow successful completion as gevent exception handling is a broken mess:
             #
             # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
             #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
@@ -242,12 +138,12 @@ class MDSThrasher(Greenlet):
             #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
             #     file.write(str+terminator)
             # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
-            self.e = e
+            self.set_thrasher_exception(e)
             self.logger.exception("exception:")
             # allow successful completion so gevent doesn't see an exception...
 
     def log(self, x):
-        """Write data to logger assigned to this MDThrasher"""
+        """Write data to the logger assigned to MDSThrasher"""
         self.logger.info(x)
 
     def stop(self):
@@ -256,7 +152,7 @@ class MDSThrasher(Greenlet):
     def kill_mds(self, mds):
         if self.config.get('powercycle'):
             (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
-                         remotes.iterkeys())
+                         remotes.keys())
             self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                      format(m=mds, s=remote.name))
             self._assert_ipmi(remote)
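
The switch from remotes.iterkeys() to remotes.keys() (here, in revive_mds, and in task() below) is a Python 3 fix: dict.iterkeys() no longer exists, while single-element tuple unpacking behaves the same against the .keys() view. For example (the hostname is a stand-in):

    remotes = {'smithi001': None}       # stand-in for ctx.cluster.only(...).remotes
    # (remote,) = remotes.iterkeys()    # AttributeError under Python 3
    (remote,) = remotes.keys()          # unpacks the lone key on Python 2 and 3
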
@@ -270,22 +166,20 @@ class MDSThrasher(Greenlet):
             "powercycling requested but RemoteConsole is not "
             "initialized.  Check ipmi config.")
 
-    def revive_mds(self, mds, standby_for_rank=None):
+    def revive_mds(self, mds):
         """
         Revive mds -- do an ipmi powercycle (if indicated by the config)
-        and then restart (using --hot-standby if specified.
+        and then restart.
         """
         if self.config.get('powercycle'):
             (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
-                         remotes.iterkeys())
+                         remotes.keys())
             self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                      format(m=mds, s=remote.name))
             self._assert_ipmi(remote)
             remote.console.power_on()
             self.manager.make_admin_daemon_dir(self.ctx, remote)
         args = []
-        if standby_for_rank:
-            args.extend(['--hot-standby', standby_for_rank])
         self.ctx.daemons.get_daemon('mds', mds).restart(*args)
 
     def wait_for_stable(self, rank = None, gid = None):
@@ -294,10 +188,11 @@ class MDSThrasher(Greenlet):
             status = self.fs.status()
             max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
             ranks = list(status.get_ranks(self.fs.id))
-            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
-            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)
+            stopping = sum(1 for _ in ranks if "up:stopping" == _['state'])
+            actives = sum(1 for _ in ranks
+                          if "up:active" == _['state'] and "laggy_since" not in _)
 
-            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
+            if not bool(self.config.get('thrash_while_stopping', False)) and stopping > 0:
                 if itercount % 5 == 0:
                     self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
             else:
@@ -309,13 +204,14 @@ class MDSThrasher(Greenlet):
                             return status
                     except:
                         pass # no rank present
-                    if len(actives) >= max_mds:
+                    if actives >= max_mds:
                         # no replacement can occur!
-                        self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d".format(len(actives), max_mds, rank))
+                        self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
+                            actives=actives, max_mds=max_mds, rank=rank))
                         return status
                 else:
-                    if len(actives) >= max_mds:
-                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
+                    if actives == max_mds:
+                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = actives))
                         return status, None
             if itercount > 300/2: # 5 minutes
                  raise RuntimeError('timeout waiting for cluster to stabilize')
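
Calling len() on the result of filter() worked on Python 2, where filter() returned a list, but fails on Python 3, where it returns a lazy iterator with no length; the counts are therefore rebuilt as generator-expression sums, and the stability test tightens from >= to == max_mds. A toy illustration of the core change:

    ranks = [{'state': 'up:active'}, {'state': 'up:stopping'}, {'state': 'up:active'}]
    # len(filter(lambda r: r['state'] == 'up:active', ranks))     # TypeError on Python 3
    actives = sum(1 for r in ranks if r['state'] == 'up:active')  # -> 2
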
@@ -352,23 +248,13 @@ class MDSThrasher(Greenlet):
 
             if random.random() <= self.thrash_max_mds:
                 max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
-                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
+                options = [i for i in range(1, self.max_mds + 1) if i != max_mds]
                 if len(options) > 0:
-                    sample = random.sample(options, 1)
-                    new_max_mds = sample[0]
+                    new_max_mds = random.choice(options)
                     self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                     self.fs.set_max_mds(new_max_mds)
                     stats['max_mds'] += 1
-
-                    # Now randomly deactivate mds if we shrank
-                    # TODO: it's desirable to deactivate in order. Make config to do random.
-                    targets = filter(lambda r: r['rank'] > 0, status.get_ranks(self.fs.id)) # can't deactivate 0
-                    for target in random.sample(targets, max(0, max_mds-new_max_mds)):
-                        self.log("deactivating rank %d" % target['rank'])
-                        self.fs.deactivate(target['rank'])
-                        stats['deactivate'] += 1
-
-                    status = self.wait_for_stable()[0]
+                    self.wait_for_stable()
 
             count = 0
             for info in status.get_ranks(self.fs.id):
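
Concatenating two range() results likewise only worked on Python 2 lists; on Python 3 it raises TypeError. The list comprehension produces the same candidate set (every legal max_mds except the current value), random.choice() replaces the one-element random.sample(), and the explicit deactivate loop is gone because reducing max_mds now stops surplus ranks on its own. Roughly, with illustrative values:

    import random

    max_mds, starting_max_mds = 2, 3
    # range(1, max_mds) + range(max_mds + 1, starting_max_mds + 1)  # TypeError on Python 3
    options = [i for i in range(1, starting_max_mds + 1) if i != max_mds]  # [1, 3]
    new_max_mds = random.choice(options)
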
@@ -386,7 +272,7 @@ class MDSThrasher(Greenlet):
                 weight = 1.0
                 if 'thrash_weights' in self.config:
                     weight = self.config['thrash_weights'].get(label, '0.0')
-                skip = random.randrange(0.0, 1.0)
+                skip = random.random()
                 if weight <= skip:
                     self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
                     continue
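
random.randrange() works on integer steps, so the old randrange(0.0, 1.0) could only ever return 0 (newer Pythons reject float arguments outright), meaning non-zero thrash weights never caused an iteration to be skipped; random.random() yields the uniform float in [0.0, 1.0) the comparison expects:

    import random

    # skip = random.randrange(0.0, 1.0)  # effectively always 0
    skip = random.random()               # uniform float in [0.0, 1.0)
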
@@ -420,7 +306,7 @@ class MDSThrasher(Greenlet):
                     self.log(
                         '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                 else:
-                    self.log('{label} down, removed from mdsmap'.format(label=label, since=last_laggy_since))
+                    self.log('{label} down, removed from mdsmap'.format(label=label))
 
                 # wait for a standby mds to takeover and become active
                 status = self.wait_for_stable(rank, gid)
@@ -504,7 +390,7 @@ def task(ctx, config):
     log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
     random.seed(seed)
 
-    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
+    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys()
     manager = ceph_manager.CephManager(
         first, ctx=ctx, logger=log.getChild('ceph_manager'),
     )
@@ -525,29 +411,24 @@ def task(ctx, config):
         status = mds_cluster.status()
     log.info('Ready to start thrashing')
 
-    thrashers = []
-
-    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
-    watchdog.start()
-
     manager.wait_for_clean()
     assert manager.is_clean()
+
+    if 'cluster' not in config:
+        config['cluster'] = 'ceph'
+
     for fs in status.get_filesystems():
-        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
+        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fscid=fs['id']), fs['mdsmap']['max_mds'])
         thrasher.start()
-        thrashers.append(thrasher)
+        ctx.ceph[config['cluster']].thrashers.append(thrasher)
 
     try:
         log.debug('Yielding')
         yield
     finally:
-        log.info('joining mds_thrashers')
-        for thrasher in thrashers:
-            thrasher.stop()
-            if thrasher.e:
-                raise RuntimeError('error during thrashing')
-            thrasher.join()
+        log.info('joining mds_thrasher')
+        thrasher.stop()
+        if thrasher.exception is not None:
+            raise RuntimeError('error during thrashing')
+        thrasher.join()
         log.info('done joining')
-
-        watchdog.stop()
-        watchdog.join()
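
With the local DaemonWatchdog and thrashers list removed, each thrasher now registers itself on ctx.ceph[cluster].thrashers, where a cluster-wide watchdog maintained outside this file can poll it. A hypothetical sketch of such a consumer (the function name is invented here), modelled on the removed watch() loop above:

    def any_thrasher_failed(ctx, cluster='ceph'):
        """Illustration only: True if any registered thrasher recorded an error."""
        return any(t.exception is not None for t in ctx.ceph[cluster].thrashers)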