1 """
2 Thrash mds by simulating failures
3 """
4 import logging
5 import contextlib
6 import ceph_manager
7 import itertools
8 import random
9 import signal
10 import time
11
12 from gevent import sleep
13 from gevent.greenlet import Greenlet
14 from gevent.event import Event
15 from teuthology import misc as teuthology
16
17 from tasks.cephfs.filesystem import MDSCluster, Filesystem
18
19 log = logging.getLogger(__name__)
20
class DaemonWatchdog(Greenlet):
    """
    DaemonWatchdog::

    Watch Ceph daemons for failures. If an extended failure is detected (i.e.
    not intentional), then the watchdog will unmount file systems and send
    SIGTERM to all daemons. The duration of an extended failure is configurable
    with watchdog_daemon_timeout.

    watchdog_daemon_timeout [default: 300]: number of seconds a daemon
        is allowed to be failed before the watchdog will bark.
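
    For example (illustrative value only), the timeout can be raised through
    the mds_thrash task configuration, which is passed to the watchdog::

        tasks:
        - ceph:
        - mds_thrash:
            watchdog_daemon_timeout: 600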
32 """
33
34 def __init__(self, ctx, manager, config, thrashers):
35 Greenlet.__init__(self)
36 self.ctx = ctx
37 self.config = config
38 self.e = None
39 self.logger = log.getChild('daemon_watchdog')
40 self.manager = manager
41 self.name = 'watchdog'
42 self.stopping = Event()
43 self.thrashers = thrashers
44
    def _run(self):
        try:
            self.watch()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def bark(self):
        self.log("BARK! unmounting mounts and killing all daemons")
        for mount in self.ctx.mounts.values():
            try:
                mount.umount_wait(force=True)
            except:
                self.logger.exception("ignoring exception:")
        daemons = []
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)))
        daemons.extend(filter(lambda daemon: daemon.running() and not daemon.proc.finished, self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)))
        for daemon in daemons:
            try:
                daemon.signal(signal.SIGTERM)
            except:
                self.logger.exception("ignoring exception:")

    def watch(self):
        self.log("watchdog starting")
        daemon_timeout = int(self.config.get('watchdog_daemon_timeout', 300))
        daemon_failure_time = {}
        while not self.stopping.is_set():
            bark = False
            now = time.time()

            mons = self.ctx.daemons.iter_daemons_of_role('mon', cluster=self.manager.cluster)
            mdss = self.ctx.daemons.iter_daemons_of_role('mds', cluster=self.manager.cluster)
            clients = self.ctx.daemons.iter_daemons_of_role('client', cluster=self.manager.cluster)

            #for daemon in mons:
            #    self.log("mon daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))
            #for daemon in mdss:
            #    self.log("mds daemon {role}.{id}: running={r}".format(role=daemon.role, id=daemon.id_, r=daemon.running() and not daemon.proc.finished))

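            # A daemon counts as failed when its process has exited even though
            # the daemon was not stopped intentionally (running() is still true).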
            daemon_failures = []
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mons))
            daemon_failures.extend(filter(lambda daemon: daemon.running() and daemon.proc.finished, mdss))
            for daemon in daemon_failures:
                name = daemon.role + '.' + daemon.id_
                dt = daemon_failure_time.setdefault(name, (daemon, now))
                assert dt[0] is daemon
                delta = now - dt[1]
                self.log("daemon {name} is failed for ~{t:.0f}s".format(name=name, t=delta))
                if delta > daemon_timeout:
                    bark = True

            # If a daemon is no longer failed, remove it from tracking:
            for name in daemon_failure_time.keys():
                if name not in [d.role + '.' + d.id_ for d in daemon_failures]:
                    self.log("daemon {name} has been restored".format(name=name))
                    del daemon_failure_time[name]

            for thrasher in self.thrashers:
                if thrasher.e is not None:
                    self.log("thrasher on fs.{name} failed".format(name=thrasher.fs.name))
                    bark = True

            if bark:
                self.bark()
                return

            sleep(5)

        self.log("watchdog finished")

class MDSThrasher(Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the maximum
    value, set randomize to false. The config is a dict containing some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
        any given time.

    max_thrash_delay: [default: 30] maximum number of seconds to delay before
        thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
        the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
        bringing back a thrashed MDS.

    randomize: [default: true] enables randomization and use of the max/min values

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
        during replay. Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
        cluster will be modified to a value [1, current) or (current, starting
        max_mds]. Value should be between 0.0 and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there
        are MDSs in up:stopping (because max_mds was changed and some
        MDSs were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
        This option overrides anything specified by max_thrash. This option is a
        dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
        0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
        specified will be automatically given a weight of 0.0 (not thrashed).
        For a given MDS, by default the thrasher delays for up to
        max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
        If a non-zero weight is specified for an MDS, for each iteration the
        thrasher chooses whether to thrash during that iteration based on a
        random value [0-1] not exceeding the weight of that MDS.

    Examples::

    The following example sets the likelihood that mds.a will be thrashed
    to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
    likelihood that an MDS will be thrashed in replay to 40%.
    Thrash weights do not have to sum to 1.

        tasks:
        - ceph:
        - mds_thrash:
            thrash_weights:
            - mds.a: 0.8
            - mds.b: 0.2
            thrash_in_replay: 0.4
        - ceph-fuse:
        - workunit:
            clients:
              all: [suites/fsx.sh]

    The following example disables randomization, and uses the max delay values:

        tasks:
        - ceph:
        - mds_thrash:
            max_thrash_delay: 10
            max_revive_delay: 1
            max_replay_thrash_delay: 4
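
    The following example additionally lets the thrasher change max_mds on
    roughly 10% of iterations and thrash up to two active MDSs per iteration
    (illustrative values; see the option descriptions above)::

        tasks:
        - ceph:
        - mds_thrash:
            thrash_max_mds: 0.1
            max_thrash: 2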

    """

    def __init__(self, ctx, manager, config, fs, max_mds):
        Greenlet.__init__(self)

        self.config = config
        self.ctx = ctx
        self.e = None
        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 30.0))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))

    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.e = e
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to logger assigned to this MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
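        """
        Kill the given MDS, either by powercycling its node (when powercycle
        is configured) or by stopping the daemon.
        """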
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def revive_mds(self, mds):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart.
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.iterkeys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank=None, gid=None):
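        """
        Wait for the MDS cluster to stabilize. If a rank/gid pair is given,
        return once that rank has been taken over by a different gid, or once
        the cluster already has max_mds active daemons (no replacement will
        occur); otherwise return once the number of active MDSs matches
        max_mds.
        """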
        self.log('waiting for mds cluster to stabilize...')
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = filter(lambda info: "up:stopping" == info['state'], ranks)
            actives = filter(lambda info: "up:active" == info['state'] and "laggy_since" not in info, ranks)

            if not bool(self.config.get('thrash_while_stopping', False)) and len(stopping) > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDSs are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
                            return status
                    except:
                        pass # no rank present
                    if len(actives) >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has {c} actives (max_mds is {m}), no MDS can replace rank {r}".format(c=len(actives), m=max_mds, r=rank))
                        return status
                else:
                    if len(actives) == max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = len(actives)))
                        return status
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0.0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
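                # pick a new max_mds value different from the current one,
                # bounded above by the starting max_mds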
                options = range(1, max_mds)+range(max_mds+1, self.max_mds+1)
                if len(options) > 0:
                    sample = random.sample(options, 1)
                    new_max_mds = sample[0]
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1
                    self.wait_for_stable()

            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

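                # with thrash_weights, thrash this MDS with probability equal to
                # its weight; unlisted MDSs default to 0.0 and are always skipped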
                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = float(self.config['thrash_weights'].get(label, '0.0'))
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) >= weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting until mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to takeover and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

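                # wait for the revived MDS to rejoin as a standby, standby-replay,
                # or active daemon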
                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting until mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

            for stat in stats:
                self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

            # don't do replay thrashing right now
            # for info in status.get_replays(self.fs.id):
            #     # this might race with replay -> active transition...
            #     if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
            #         delay = self.max_replay_thrash_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_replay_thrash_delay)
            #         sleep(delay)
            #         self.log('kill replaying mds.{id}'.format(id=self.to_kill))
            #         self.kill_mds(self.to_kill)
            #
            #         delay = self.max_revive_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_revive_delay)
            #
            #         self.log('waiting for {delay} secs before reviving mds.{id}'.format(
            #             delay=delay, id=self.to_kill))
            #         sleep(delay)
            #
            #         self.log('revive mds.{id}'.format(id=self.to_kill))
            #         self.revive_mds(self.to_kill)


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to MDSThrasher class for further information on the
    available options.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.iterkeys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    thrashers = []

    watchdog = DaemonWatchdog(ctx, manager, config, thrashers)
    watchdog.start()

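    # ensure the cluster is healthy (all PGs active+clean) before thrashing begins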
    manager.wait_for_clean()
    assert manager.is_clean()
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.e:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')

        watchdog.stop()
        watchdog.join()