1 """
2 Thrash mds by simulating failures
3 """
4 import logging
5 import contextlib
6 import ceph_manager
7 import itertools
8 import random
9 import signal
10 import time
11
12 from gevent import sleep
13 from gevent.greenlet import Greenlet
14 from gevent.event import Event
15 from teuthology import misc as teuthology
16
17 from tasks.cephfs.filesystem import MDSCluster, Filesystem
18
19 log = logging.getLogger(__name__)
20
class DaemonWatchdog(Greenlet):
    """
    DaemonWatchdog::

    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
    not intentional), then the watchdog will unmount file systems and send
    SIGTERM to all daemons. The duration of an extended failure is configurable
    with watchdog_daemon_timeout.

    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
        is allowed to be failed before the watchdog will bark.
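
    For example, to make the watchdog bark after two minutes of continuous
    failure (the value is illustrative):

        tasks:
        - ceph:
        - mds_thrash:
            watchdog_daemon_timeout: 120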
32 """
33
34 def __init__(self, ctx, manager, config, thrashers):
35 Greenlet.__init__(self)
36 self.ctx = ctx
37 self.config = config
38 self.e = None
39 self.logger = log.getChild('daemon_watchdog')
40 self.manager = manager
41 self.name = 'watchdog'
42 self.stopping = Event()
43 self.thrashers = thrashers
44
    def _run(self):
        try:
            self.watch()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def bark(self):
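        """
        Force-unmount all client mounts and send SIGTERM to every MDS and mon
        daemon in this cluster that is still running.
        """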
62 self.log("BARK! unmounting mounts and killing all daemons")
63 for mount in self.ctx.mounts.values():
64 try:
65 mount.umount_wait(force=True)
66 except:
67 self.logger.exception("ignoring exception:")
68 daemons = []
69 daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
70 daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
71 for daemon in daemons:
72 try:
73 daemon.signal(signal.SIGTERM)
74 except:
75 self.logger.exception("ignoring exception:")
76
    def watch(self):
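        """
        Poll mon and MDS daemons every 5 seconds.  Bark if any daemon stays
        failed for longer than watchdog_daemon_timeout, or if any thrasher has
        died with an exception.
        """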
78 self.log("watchdog starting")
79 daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
80 daemon_failure_time = {}
81 while not self.stopping.is_set():
82 bark = False
83 now = time.time()
84
85 mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
86 mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
87 clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)
88
89 #for daemon in mons:
90 # self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
91 #for daemon in mdss:
92 # self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
93
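            # A daemon that is still registered as running but whose process
            # has exited was not stopped intentionally; record when it was
            # first seen in that state so the length of the failure can be
            # measured against daemon_timeout.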
            daemon_failures = []
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
            for daemon in daemon_failures:
                name = daemon.role + '.' + daemon.id_
                dt = daemon_failure_time.setdefault(name, (daemon, now))
                assert dt[0] is daemon
                delta = now-dt[1]
                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                if delta > daemon_timeout:
                    bark = True

            # If a daemon is no longer failed, remove it from tracking:
            for name in daemon_failure_time.keys():
                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
                    self.log("daemon {name} has been restored".format(name=name))
                    del daemon_failure_time[name]

            for thrasher in self.thrashers:
                if thrasher.e is not None:
                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
                    bark = True

            if bark:
                self.bark()
                return

            sleep(5)

        self.log("watchdog finished")

class MDSThrasher(Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the maximum
    value, set randomize to false. The config is a dict containing some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
      any given time.

    max_thrash_delay: [default: 30] maximum number of seconds to delay before
      thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
      the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization of the delay values. If
      false, the maximum values are always used.

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay. Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds]. When reduced, randomly selected MDSs other than rank 0 will be
      deactivated to reach the new max_mds. Value should be between 0.0 and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there
      are MDSs in up:stopping (because max_mds was changed and some
      MDSs were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash. This option is a
      dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
      0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
      specified will be automatically given a weight of 0.0 (not thrashed).
      For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1] not exceeding the weight of that MDS.

    Examples::

    The following example sets the likelihood that mds.a will be thrashed
    to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
    likelihood that an MDS will be thrashed in replay to 40%.
    Thrash weights do not have to sum to 1.

    tasks:
    - ceph:
    - mds_thrash:
        thrash_weights:
          - mds.a: 0.8
          - mds.b: 0.2
        thrash_in_replay: 0.4
    - ceph-fuse:
    - workunit:
        clients:
          all: [suites/fsx.sh]

    The following example disables randomization and uses the maximum delay values:

    tasks:
    - ceph:
    - mds_thrash:
        randomize: false
        max_thrash_delay: 10
        max_revive_delay: 1
        max_replay_thrash_delay: 4

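    The following example additionally thrashes max_mds on about half of the
    thrash iterations (the probability is illustrative; any value in
    [0.0, 1.0] works):

    tasks:
    - ceph:
    - mds_thrash:
        thrash_max_mds: 0.5
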
203 """
204
    def __init__(self, ctx, manager, config, fs, max_mds):
        Greenlet.__init__(self)

        self.config = config
        self.ctx = ctx
        self.e = None
        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 30.0))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))

    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger assigned to this MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def revive_mds(self, mds, standby_for_rank=None):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart (using --hot-standby if specified).
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        if standby_for_rank:
            args.extend(['--hot-standby', standby_for_rank])
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank = None, gid = None):
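        """
        Wait for the MDS cluster to stabilize: either the given rank has been
        taken over by a new gid, or enough MDSs are up:active to satisfy
        max_mds.  Note that the return shape differs: a bare status is
        returned when a rank/gid is given, a (status, None) tuple otherwise.
        """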
        self.log('waiting for mds cluster to stabilize...')
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)

            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
                            return status
                    except:
                        pass # no rank present
                    if len(actives) >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has {c} actives (max_mds is {m}), no MDS can replace rank {r}".format(
                            c=len(actives), m=max_mds, r=rank))
                        return status
                else:
                    if len(actives) >= max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
                        return status, None
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

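            # Occasionally thrash max_mds itself: pick a different value in
            # [1, self.max_mds], and if the cluster shrank, deactivate randomly
            # chosen ranks (never rank 0) until the new max_mds is reached.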
            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
                if len(options) > 0:
                    sample = random.sample(options, 1)
                    new_max_mds = sample[0]
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1

                    # Now randomly deactivate mds if we shrank
                    # TODO: it's desirable to deactivate in order. Make config to do random.
                    targets = filter(lambda r: r['rank'] > 0, status.get_ranks(self.fs.id)) # can't deactivate 0
                    for target in random.sample(targets, max(0, max_mds-new_max_mds)):
                        self.log("deactivating rank %d" % target['rank'])
                        self.fs.deactivate(target['rank'])
                        stats['deactivate'] += 1

                    status = self.wait_for_stable()[0]

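            # Thrash individual ranks: kill an active MDS, wait for a standby
            # to take over its rank, then revive the old daemon as a standby.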
            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

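                # With thrash_weights, thrash this MDS only when a uniform
                # random draw lands below its weight; MDSs not listed default
                # to 0.0 and are never thrashed.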
                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = self.config['thrash_weights'].get(label, 0.0)
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) >= weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to take over and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

            for stat in stats:
                self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

            # don't do replay thrashing right now
            # for info in status.get_replays(self.fs.id):
            #     # this might race with replay -> active transition...
            #     if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
            #         delay = self.max_replay_thrash_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_replay_thrash_delay)
            #         sleep(delay)
            #         self.log('kill replaying mds.{id}'.format(id=self.to_kill))
            #         self.kill_mds(self.to_kill)
            #
            #         delay = self.max_revive_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_revive_delay)
            #
            #         self.log('waiting for {delay} secs before reviving mds.{id}'.format(
            #             delay=delay, id=self.to_kill))
            #         sleep(delay)
            #
            #         self.log('revive mds.{id}'.format(id=self.to_kill))
            #         self.revive_mds(self.to_kill)


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to MDSThrasher class for further information on the
    available options.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    thrashers = []

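    # The watchdog polls the (still empty) thrashers list, so a thrasher that
    # dies with an exception will also make the watchdog bark.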
    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()

    manager.wait_for_clean()
    assert manager.is_clean()
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.e:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')

        watchdog.stop()
        watchdog.join()