]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/mds_thrash.py
update sources to v12.1.1
[ceph.git] / ceph / qa / tasks / mds_thrash.py
CommitLineData
7c673cae
FG
1"""
2Thrash mds by simulating failures
3"""
4import logging
5import contextlib
6import ceph_manager
7import itertools
8import random
9import signal
10import time
11
12from gevent import sleep
13from gevent.greenlet import Greenlet
14from gevent.event import Event
15from teuthology import misc as teuthology
16
17from tasks.cephfs.filesystem import MDSCluster, Filesystem
18
19log = logging.getLogger(__name__)
20
class DaemonWatchdog(Greenlet):
    """
    DaemonWatchdog::

    Watch Ceph mon and mds daemons for failures. If an extended failure is
    detected (i.e. not intentional), or if any thrasher has recorded an
    exception, the watchdog will unmount file systems and send SIGTERM to the
    mds and mon daemons that are still alive. The duration of an extended
    failure is configurable with watchdog_daemon_timeout.

    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
        is allowed to be failed before the watchdog will bark.
    """

    # Seconds between two consecutive health checks.
    POLL_INTERVAL = 5

    def __init__(self, ctx, manager, config, thrashers):
        """
        :param ctx: teuthology context; ``ctx.daemons`` and ``ctx.mounts`` are used
        :param manager: cluster manager; only ``manager.cluster`` is read here
        :param config: task configuration dict
        :param thrashers: list of thrashers whose ``e`` attribute is polled;
            the list may be filled in by the caller after construction
        """
        Greenlet.__init__(self)
        self.ctx = ctx
        self.config = config
        self.e = None
        self.logger = log.getChild('daemon_watchdog')
        self.manager = manager
        self.name = 'watchdog'
        self.stopping = Event()
        self.thrashers = thrashers

    def _run(self):
        """Greenlet entry point: run watch() and record any exception in self.e."""
        try:
            self.watch()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger"""
        self.logger.info(x)

    def stop(self):
        """Ask the watch loop to terminate."""
        self.stopping.set()

    def _daemons(self, role):
        """Iterate over the daemons of ``role`` in the managed cluster."""
        return self.ctx.daemons.iter_daemons_of_role(role, cluster=self.manager.cluster)

    @staticmethod
    def _daemon_name(daemon):
        """Return the ``role.id`` name used to track a daemon."""
        return daemon.role + '.' + daemon.id_

    def bark(self):
        """
        Forcefully unmount every mount and SIGTERM the live mds and mon daemons.

        This is best effort: a failure on one mount or daemon is logged and
        does not prevent the remaining ones from being handled.
        """
        self.log("BARK! unmounting mounts and killing all daemons")
        for mount in self.ctx.mounts.values():
            try:
                mount.umount_wait(force=True)
            except Exception:
                self.logger.exception("ignoring exception:")

        # mds daemons first, then mons (same order as before)
        alive = [
            daemon
            for role in ('mds', 'mon')
            for daemon in self._daemons(role)
            if daemon.running() and not daemon.proc.finished
        ]
        for daemon in alive:
            try:
                daemon.signal(signal.SIGTERM)
            except Exception:
                self.logger.exception("ignoring exception:")

    def watch(self):
        """
        Poll the mon and mds daemons until stopped or until the watchdog barks.

        A daemon counts as failed while it reports running() but its process
        has finished. The first time a failure is seen is remembered; once a
        daemon has been failed for more than watchdog_daemon_timeout seconds,
        or once any thrasher has stored an exception, bark() is called and the
        watchdog returns.
        """
        self.log("watchdog starting")
        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
        # name -> (daemon object, time the failure was first observed)
        daemon_failure_time = {}
        while not self.stopping.is_set():
            bark = False
            now = time.time()

            daemon_failures = [
                daemon
                for role in ('mon', 'mds')
                for daemon in self._daemons(role)
                if daemon.running() and daemon.proc.finished
            ]
            for daemon in daemon_failures:
                name = self._daemon_name(daemon)
                dt = daemon_failure_time.setdefault(name, (daemon, now))
                assert dt[0] is daemon
                delta = now - dt[1]
                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                if delta > daemon_timeout:
                    bark = True

            # If a daemon is no longer failed, remove it from tracking.
            # Iterate over a snapshot of the keys because entries are deleted
            # inside the loop.
            failed_names = set(self._daemon_name(d) for d in daemon_failures)
            for name in list(daemon_failure_time):
                if name not in failed_names:
                    self.log("daemon {name} has been restored".format(name=name))
                    del daemon_failure_time[name]

            for thrasher in self.thrashers:
                if thrasher.e is not None:
                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
                    bark = True

            if bark:
                self.bark()
                return

            # Wait on the stop event rather than sleeping so that stop()
            # takes effect immediately.
            self.stopping.wait(self.POLL_INTERVAL)

        self.log("watchdog finished")
124
class MDSThrasher(Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the maximum
    value, set randomize to false. The config is a dict containing some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
      any given time.

    max_thrash_delay: [default: 120] maximum number of seconds to delay before
      thrashing again. The legacy key thrash_delay is still accepted when
      max_thrash_delay is not set.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
      the replay state before thrashing. (Parsed, but replay thrashing is
      currently disabled.)

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization and use the max/min values

    seed: [no default] seed the random number generator

    powercycle: [default: false] kill and revive an MDS by powering its host
      off and on through the remote console instead of stopping the daemon.

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay.  Value should be between 0.0 and 1.0. (Parsed, but replay
      thrashing is currently disabled.)

    thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds]. When reduced, the ranks at or above the new max_mds are
      deactivated, highest rank first. Value should be between 0.0 and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there
      are MDS in up:stopping (because max_mds was changed and some
      MDS were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash.  This option is a
      dict containing mds.x: weight pairs.  For example, {mds.a: 0.7, mds.b:
      0.3, mds.c: 0.0}.  Each weight is a value from 0.0 to 1.0.  Any MDSs not
      specified will be automatically given a weight of 0.0 (not thrashed).
      For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1) not exceeding the weight of that MDS.

    Examples::


      The following example sets the likelihood that mds.a will be thrashed
      to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also sets the
      likelihood that an MDS will be thrashed in replay to 40%.
      Thrash weights do not have to sum to 1.

      tasks:
      - ceph:
      - mds_thrash:
          thrash_weights:
            mds.a: 0.8
            mds.b: 0.2
          thrash_in_replay: 0.4
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]

      The following example disables randomization, and uses the max delay values:

      tasks:
      - ceph:
      - mds_thrash:
          randomize: false
          max_thrash_delay: 10
          max_revive_delay: 1
          max_replay_thrash_delay: 4

    """

    # wait_for_stable() and the revive wait poll every POLL_INTERVAL seconds
    # and give up after MAX_POLLS iterations (about 5 minutes).
    POLL_INTERVAL = 2
    MAX_POLLS = 300 // 2

    def __init__(self, ctx, manager, config, fs, max_mds):
        """
        :param ctx: teuthology context (``ctx.cluster`` and ``ctx.daemons`` are used)
        :param manager: cluster manager, used to recreate the admin daemon
            directory after a powercycle
        :param config: task configuration dict, see the class docstring
        :param fs: the file system whose MDSs are thrashed
        :param max_mds: upper bound used when max_mds itself is thrashed
        """
        Greenlet.__init__(self)

        self.config = config
        self.ctx = ctx
        self.e = None
        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        # Honor the documented 'max_thrash_delay' key; fall back to the
        # legacy 'thrash_delay' key so existing configs keep working.
        self.max_thrash_delay = float(self.config.get(
            'max_thrash_delay', self.config.get('thrash_delay', 120.0)))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))

    def _run(self):
        """Greenlet entry point: run do_thrash() and record any exception in self.e."""
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File ".../site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File ".../site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger assigned to this MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        """Ask the thrash loop to terminate."""
        self.stopping.set()

    def _mds_remote(self, mds):
        """Return the single remote that hosts mds.<mds>."""
        # Tuple unpacking asserts that exactly one remote carries the role.
        (remote,) = self.ctx.cluster.only('mds.{m}'.format(m=mds)).remotes.keys()
        return remote

    def kill_mds(self, mds):
        """
        Kill an MDS: power its host off when 'powercycle' is configured,
        otherwise stop the daemon.
        """
        if self.config.get('powercycle'):
            remote = self._mds_remote(mds)
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        """Fail early when powercycling is requested without IPMI credentials."""
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized.  Check ipmi config.")

    def revive_mds(self, mds, standby_for_rank=None):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart (using --hot-standby if specified).
        """
        if self.config.get('powercycle'):
            remote = self._mds_remote(mds)
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        # Compare against None: rank 0 is a valid rank but is falsy.
        if standby_for_rank is not None:
            args.extend(['--hot-standby', standby_for_rank])
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank=None, gid=None):
        """
        Poll the file system status until the MDS cluster is stable.

        While thrash_while_stopping is false, the cluster is never considered
        stable as long as a rank is in up:stopping.

        With ``rank`` given, the cluster is stable once that rank is up:active
        under a gid different from ``gid``, or once the number of non-laggy
        active MDSs has reached max_mds (so no replacement can happen).
        Without ``rank``, it is stable once the number of non-laggy active
        MDSs has reached max_mds.

        :returns: the last status when ``rank`` is given, otherwise the tuple
            ``(status, None)``; callers in this class rely on both shapes.
        :raises RuntimeError: when the cluster does not stabilize in time
        """
        self.log('waiting for mds cluster to stabilize...')
        thrash_while_stopping = bool(self.config.get('thrash_while_stopping', False))
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = [info for info in ranks if "up:stopping" == info['state']]
            actives = [info for info in ranks
                       if "up:active" == info['state'] and "laggy_since" not in info]

            if not thrash_while_stopping and len(stopping) > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            elif rank is not None:
                try:
                    info = status.get_rank(self.fs.id, rank)
                    if info['gid'] != gid and "up:active" == info['state']:
                        self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name=info['name'], rank=rank, gid=gid))
                        return status
                except Exception:
                    pass  # no rank present
                if len(actives) >= max_mds:
                    # no replacement can occur!
                    self.log('cluster has {count} actives (max_mds is {max_mds}), no MDS can replace rank {rank}'.format(
                        count=len(actives), max_mds=max_mds, rank=rank))
                    return status
            elif len(actives) >= max_mds:
                self.log('mds cluster has {count} alive and active, now stable!'.format(count=len(actives)))
                return status, None

            if itercount > self.MAX_POLLS:  # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(self.POLL_INTERVAL)

    def _pick_delay(self, max_delay):
        """Return max_delay, or a random delay in [0, max_delay] when randomizing."""
        if self.randomize:
            # uniform() accepts any non-negative float, including 0.0 and
            # fractional maxima, which randrange() rejects.
            return random.uniform(0.0, max_delay)
        return max_delay

    def _thrash_max_mds(self, status, stats):
        """Set max_mds to a different random value, deactivate surplus ranks, return the new status."""
        max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
        # every value in [1, self.max_mds] except the current one
        options = list(range(1, max_mds)) + list(range(max_mds + 1, self.max_mds + 1))
        if not options:
            return status

        new_max_mds = random.choice(options)
        self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
        self.fs.set_max_mds(new_max_mds)
        stats['max_mds'] += 1

        targets = [r for r in status.get_ranks(self.fs.id) if r['rank'] >= new_max_mds]
        if not targets:
            return self.wait_for_stable()[0]

        # deactivate mds in descending order
        for target in sorted(targets, key=lambda r: r['rank'], reverse=True):
            self.log("deactivating rank %d" % target['rank'])
            self.fs.deactivate(target['rank'])
            stats['deactivate'] += 1
            status = self.wait_for_stable()[0]
        return status

    def _wait_for_mds_down(self, name, label):
        """Block until the mds map shows ``name`` as laggy, failed or removed; return laggy_since or None."""
        itercount = 0
        while True:
            status = self.fs.status()
            info = status.get_mds(name)
            if not info:
                return None
            if 'laggy_since' in info:
                return info['laggy_since']
            if any(f == name for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']):
                return None
            self.log(
                'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                    label=label))
            itercount = itercount + 1
            if itercount > 10:
                self.log('mds map: {status}'.format(status=status))
            sleep(self.POLL_INTERVAL)

    def _wait_for_mds_revived(self, name, label):
        """Block until ``name`` is active, standby or standby-replay; raise RuntimeError on timeout."""
        for itercount in itertools.count():
            if itercount > self.MAX_POLLS:  # 5 minutes
                raise RuntimeError('timeout waiting for MDS to revive')
            status = self.fs.status()
            info = status.get_mds(name)
            if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                return
            self.log(
                'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
            sleep(self.POLL_INTERVAL)

    def do_thrash(self):
        """
        Perform the random thrashing action

        Until stop() is called: wait up to max_thrash_delay, optionally thrash
        max_mds, then for each selected rank kill its MDS, wait for the
        cluster to stabilize, and revive the MDS after up to max_revive_delay.
        The number of max_mds changes, deactivations and kills is logged on
        exit.
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs=self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self._pick_delay(self.max_thrash_delay)
            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

            # random() is in [0.0, 1.0): strict comparison makes 0.0 mean never
            if random.random() < self.thrash_max_mds:
                status = self._thrash_max_mds(status, stats)

            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

                weight = 1.0
                if 'thrash_weights' in self.config:
                    # MDSs without an explicit weight are not thrashed
                    weight = float(self.config['thrash_weights'].get(label, 0.0))
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = self._wait_for_mds_down(name, label)
                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to takeover and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self._pick_delay(self.max_revive_delay)
                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)
                self._wait_for_mds_revived(name, label)

        for stat in stats:
            self.log("stat['{key}'] = {value}".format(key=stat, value=stats[stat]))

        # NOTE: replay thrashing (thrash_in_replay / max_replay_thrash_delay)
        # is not performed right now.
479
480
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Starts one DaemonWatchdog and one MDSThrasher per file system, yields to
    the nested tasks, then stops and joins every thrasher and the watchdog.

    Please refer to MDSThrasher class for further information on the
    available options.

    :param ctx: teuthology context
    :param config: dict of options, or None for the defaults
    :raises RuntimeError: on exit, when any thrasher recorded an exception
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    # Tuple unpacking asserts that exactly one remote carries the role.
    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    steady_states = ('up:active', 'up:standby', 'up:standby-replay')
    status = mds_cluster.status()
    while not all(info['state'] in steady_states for info in status.get_all()):
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    thrashers = []

    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()

    # Everything after the watchdog has been started runs inside the try so
    # that the greenlets are always stopped, even if setup fails.
    try:
        manager.wait_for_clean()
        assert manager.is_clean()
        for fs in status.get_filesystems():
            thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
            thrasher.start()
            thrashers.append(thrasher)

        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        # Signal every thrasher first so they wind down concurrently, then
        # join them all before looking at their results.
        for thrasher in thrashers:
            thrasher.stop()
        for thrasher in thrashers:
            thrasher.join()
        log.info('done joining')

        watchdog.stop()
        watchdog.join()

        # Report failures only after all greenlets are cleaned up; checking
        # after join() also catches errors raised while shutting down.
        if any(thrasher.e is not None for thrasher in thrashers):
            raise RuntimeError('error during thrashing')