1 | """ |
2 | Thrash mds by simulating failures | |
3 | """ | |
4 | import logging | |
5 | import contextlib | |
6 | import ceph_manager | |
7 | import itertools | |
8 | import random | |
9 | import signal | |
10 | import time | |
11 | ||
12 | from gevent import sleep | |
13 | from gevent.greenlet import Greenlet | |
14 | from gevent.event import Event | |
15 | from teuthology import misc as teuthology | |
16 | ||
17 | from tasks.cephfs.filesystem import MDSCluster, Filesystem | |
18 | ||
19 | log = logging.getLogger(__name__) | |
20 | ||
21 | class DaemonWatchdog(Greenlet): | |
22 | """ | |
23 | DaemonWatchdog:: | |
24 | ||
25 | Watch Ceph daemons for failures. If an extended failure is detected (i.e. | |
26 | not intentional), then the watchdog will unmount file systems and send | |
27 | SIGTERM to all daemons. The duration of an extended failure is configurable | |
28 | with watchdog_daemon_timeout. | |
29 | ||
30 | watchdog_daemon_timeout [default: 300]: number of seconds a daemon | |
31 | is allowed to be failed before the watchdog will bark. | |
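
    For example, the timeout can be raised from the mds_thrash task
    configuration (the value shown is illustrative)::

      tasks:
      - ceph:
      - mds_thrash:
          watchdog_daemon_timeout: 600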
32 | """ | |
33 | ||
34 | def __init__(self, ctx, manager, config, thrashers): | |
35 | Greenlet.__init__(self) | |
36 | self.ctx = ctx | |
37 | self.config = config | |
38 | self.e = None | |
39 | self.logger = log.getChild('daemon_watchdog') | |
40 | self.manager = manager | |
41 | self.name = 'watchdog' | |
42 | self.stopping = Event() | |
43 | self.thrashers = thrashers | |
44 | ||
45 | def _run(self): | |
46 | try: | |
47 | self.watch() | |
48 | except Exception as e: | |
49 | # See _run exception comment for MDSThrasher | |
50 | self.e = e | |
51 | self.logger.exception("exception:") | |
52 | # allow successful completion so gevent doesn't see an exception... | |
53 | ||
54 | def log(self, x): | |
55 | """Write data to logger""" | |
56 | self.logger.info(x) | |
57 | ||
58 | def stop(self): | |
59 | self.stopping.set() | |
60 | ||
61 | def bark(self): | |
62 | self.log("BARK! unmounting mounts and killing all daemons") | |
63 | for mount in self.ctx.mounts.values(): | |
64 | try: | |
65 | mount.umount_wait(force=True) | |
66 | except: | |
67 | self.logger.exception("ignoring exception:") | |
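        # Collect the cluster's mds and mon daemons that are still running
        # and send each one SIGTERM.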
        daemons = []
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
        for daemon in daemons:
            try:
                daemon.signal(signal.SIGTERM)
            except:
                self.logger.exception("ignoring exception:")

    def watch(self):
        self.log("watchdog starting")
        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
        daemon_failure_time = {}
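        # Maps daemon name -> (daemon, time it was first seen failed); used
        # to measure how long each daemon has been down.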
        while not self.stopping.is_set():
            bark = False
            now = time.time()

            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)

            #for daemon in mons:
            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
            #for daemon in mdss:
            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))

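            # A daemon counts as failed if teuthology still expects it to be
            # running but its process has exited (an unintentional failure).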
            daemon_failures = []
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
            for daemon in daemon_failures:
                name = daemon.role + '.' + daemon.id_
                dt = daemon_failure_time.setdefault(name, (daemon, now))
                assert dt[0] is daemon
                delta = now-dt[1]
                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                if delta > daemon_timeout:
                    bark = True

            # If a daemon is no longer failed, remove it from tracking:
            for name in daemon_failure_time.keys():
                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
                    self.log("daemon {name} has been restored".format(name=name))
                    del daemon_failure_time[name]

            for thrasher in self.thrashers:
                if thrasher.e is not None:
                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
                    bark = True

            if bark:
                self.bark()
                return

            sleep(5)

        self.log("watchdog finished")

class MDSThrasher(Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional.  Many of the config parameters are a maximum value
    to use when selecting a random value from a range.  To always use the
    maximum value, set randomize to false.  The config is a dict containing
    some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that
      will be thrashed at any given time.

    max_thrash_delay: [default: 120] maximum number of seconds to delay before
      thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay
      while in the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization of the delay values; if
      false, the maximum delays are always used.

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay.  Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.0] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds].  When reduced, randomly selected MDSs other than rank 0 will
      be deactivated to reach the new max_mds.  Value should be between 0.0
      and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there are MDSs
      in up:stopping (because max_mds was changed and some MDSs were
      deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash.  This option is
      a dict containing mds.x: weight pairs.  For example, {mds.a: 0.7,
      mds.b: 0.3, mds.c: 0.0}.  Each weight is a value from 0.0 to 1.0.  Any
      MDSs not specified will be automatically given a weight of 0.0 (not
      thrashed).  For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1] not exceeding the weight of that MDS.

    Examples::

      The following example sets the likelihood that mds.a will be thrashed
      to 80%, mds.b to 20%, and other MDSs will not be thrashed.  It also
      sets the likelihood that an MDS will be thrashed in replay to 40%.
      Thrash weights do not have to sum to 1.

      tasks:
      - ceph:
      - mds_thrash:
          thrash_weights:
            mds.a: 0.8
            mds.b: 0.2
          thrash_in_replay: 0.4
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]

      The following example disables randomization, and uses the max delay
      values:

      tasks:
      - ceph:
      - mds_thrash:
          max_thrash_delay: 10
          max_revive_delay: 1
          max_replay_thrash_delay: 4

    """

    def __init__(self, ctx, manager, config, fs, max_mds):
        Greenlet.__init__(self)

        self.config = config
        self.ctx = ctx
        self.e = None
        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.0))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 120.0))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))

    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger assigned to this MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
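        # Either power-cycle the node hosting the MDS (when 'powercycle' is
        # set in the config) or simply stop the daemon process.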
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized.  Check ipmi config.")

    def revive_mds(self, mds, standby_for_rank=None):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart (using --hot-standby if specified).
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        if standby_for_rank:
            args.extend(['--hot-standby', standby_for_rank])
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank = None, gid = None):
        self.log('waiting for mds cluster to stabilize...')
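        # Poll the fs map: when replacing a thrashed rank, return once a new
        # gid holds that rank in up:active (or the cluster already has
        # max_mds actives, so no replacement will occur); otherwise return
        # once the number of active MDS daemons reaches max_mds.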
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)

            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
                            return status
                    except:
                        pass # no rank present
                    if len(actives) >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has %d actives (max_mds is %d), no MDS can replace rank %d" % (len(actives), max_mds, rank))
                        return status
                else:
                    if len(actives) >= max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
                        return status, None
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0.0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
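                # Pick a new max_mds strictly below or above the current
                # value, bounded by the starting max_mds of the file system.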
                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
                if len(options) > 0:
                    sample = random.sample(options, 1)
                    new_max_mds = sample[0]
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1

                    targets = filter(lambda r: r['rank'] >= new_max_mds, status.get_ranks(self.fs.id))
                    if len(targets) > 0:
                        # deactivate mds in descending order
                        targets = sorted(targets, key=lambda r: r['rank'], reverse=True)
                        for target in targets:
                            self.log("deactivating rank %d" % target['rank'])
                            self.fs.deactivate(target['rank'])
                            stats['deactivate'] += 1
                            status = self.wait_for_stable()[0]
                    else:
                        status = self.wait_for_stable()[0]

            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = self.config['thrash_weights'].get(label, 0.0)
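                # Thrash this MDS only when a uniform random draw in [0, 1)
                # lands below its weight; unspecified MDSs (weight 0.0) are
                # always skipped.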
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) >= weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to takeover and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

            for stat in stats:
                self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

            # don't do replay thrashing right now
            # for info in status.get_replays(self.fs.id):
            #     # this might race with replay -> active transition...
            #     if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
            #         delay = self.max_replay_thrash_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_replay_thrash_delay)
            #         sleep(delay)
            #         self.log('kill replaying mds.{id}'.format(id=self.to_kill))
            #         self.kill_mds(self.to_kill)
            #
            #         delay = self.max_revive_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_revive_delay)
            #
            #         self.log('waiting for {delay} secs before reviving mds.{id}'.format(
            #             delay=delay, id=self.to_kill))
            #         sleep(delay)
            #
            #         self.log('revive mds.{id}'.format(id=self.to_kill))
            #         self.revive_mds(self.to_kill)


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to MDSThrasher class for further information on the
    available options.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    thrashers = []

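    # The watchdog is handed the (still empty) thrashers list up front; the
    # MDSThrasher instances appended below are visible to it because the same
    # list object is shared.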
    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()

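    # Make sure all PGs are active+clean before any MDS is thrashed.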
    manager.wait_for_clean()
    assert manager.is_clean()
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.e:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')

        watchdog.stop()
        watchdog.join()