1 """
2 Thrash mds by simulating failures
3 """
4 import logging
5 import contextlib
6 import itertools
7 import random
8 import time
9
10 from gevent import sleep
11 from gevent.greenlet import Greenlet
12 from gevent.event import Event
13 from teuthology import misc as teuthology
14
15 from tasks import ceph_manager
16 from tasks.cephfs.filesystem import MDSCluster, Filesystem
17 from tasks.thrasher import Thrasher
18
19 log = logging.getLogger(__name__)
20
class MDSThrasher(Thrasher, Greenlet):
    """
    MDSThrasher::

    The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the maximum
    value, set randomize to false. The config is a dict containing some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will be thrashed at
      any given time.

    max_thrash_delay: [default: 30] maximum number of seconds to delay before
      thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
      the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization and use of the max/min values

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay. Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds]. Value should be between 0.0 and 1.0.

    thrash_while_stopping: [default: false] thrash an MDS while there
      are MDS in up:stopping (because max_mds was changed and some
      MDS were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash. This option is a
      dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
      0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
      specified will be automatically given a weight of 0.0 (not thrashed).
      For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1] not exceeding the weight of that MDS.

    Examples::

      The following example sets the likelihood that mds.a will be thrashed
      to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
      likelihood that an MDS will be thrashed in replay to 40%.
      Thrash weights do not have to sum to 1.

      tasks:
      - ceph:
      - mds_thrash:
          thrash_weights:
            - mds.a: 0.8
            - mds.b: 0.2
          thrash_in_replay: 0.4
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]

      The following example disables randomization, and uses the max delay values:

      tasks:
      - ceph:
      - mds_thrash:
          max_thrash_delay: 10
          max_revive_delay: 1
          max_replay_thrash_delay: 4

    """

    def __init__(self, ctx, manager, config, fs, max_mds):
        super(MDSThrasher, self).__init__()

        self.config = config
        self.ctx = ctx
        self.logger = log.getChild('fs.[{f}]'.format(f = fs.name))
        self.fs = fs
        self.manager = manager
        self.max_mds = max_mds
        self.name = 'thrasher.fs.[{f}]'.format(f = fs.name)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 30.0))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))
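        # Example (illustrative only): a config dict such as
        #     {'max_thrash': 2, 'thrash_max_mds': 0.1, 'max_revive_delay': 5}
        # sets those three attributes and leaves everything else at the
        # defaults parsed above.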

    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to the logger assigned to MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
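        """
        Kill an MDS: powercycle its node if the 'powercycle' config option is
        set, otherwise stop its daemon via the daemon controller.
        """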
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.keys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def revive_mds(self, mds):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart.
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.keys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        self.ctx.daemons.get_daemon('mds', mds).restart()

    def wait_for_stable(self, rank = None, gid = None):
        self.log('waiting for mds cluster to stabilize...')
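        # The cluster is treated as stable when no rank is in up:stopping
        # (unless thrash_while_stopping is set) and either: the given rank has
        # been taken over by a new gid that is up:active, no standby can take
        # the rank because actives already reaches max_mds, or (when no rank
        # is given) the number of healthy actives equals max_mds.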
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = sum(1 for _ in ranks if "up:stopping" == _['state'])
            actives = sum(1 for _ in ranks
                          if "up:active" == _['state'] and "laggy_since" not in _)

            if not bool(self.config.get('thrash_while_stopping', False)) and stopping > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name = info['name'], rank = rank, gid = gid))
                            return status
                    except Exception:
                        pass # no rank present
                    if actives >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
                            actives=actives, max_mds=max_mds, rank=rank))
                        return status
                else:
                    if actives == max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count = actives))
                        return status
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs = self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

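        # Main loop: sleep for a (possibly randomized) delay, occasionally
        # thrash max_mds, then kill and revive up to max_thrash active ranks
        # (or weighted ranks when thrash_weights is configured).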
        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0.0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

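            # With probability thrash_max_mds, move max_mds to some other
            # value between 1 and the starting max_mds, then wait for the
            # cluster to settle on the new size.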
            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
                options = list(range(1, max_mds)) + list(range(max_mds + 1, self.max_mds + 1))
                if len(options) > 0:
                    new_max_mds = random.choice(options)
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1
                    self.wait_for_stable()

            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

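                # When thrash_weights is configured, unlisted MDSs default to
                # a weight of 0.0 and are skipped; a listed MDS is thrashed
                # this iteration only if a uniform draw from [0, 1) falls
                # below its weight. Without thrash_weights, every selected
                # rank is thrashed.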
                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = float(self.config['thrash_weights'].get(label, 0.0))
                skip = random.uniform(0.0, 1.0)
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) >= weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to takeover and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

            for stat in stats:
                self.log("stat['{key}'] = {value}".format(key = stat, value = stats[stat]))

            # don't do replay thrashing right now
            # for info in status.get_replays(self.fs.id):
            #     # this might race with replay -> active transition...
            #     if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
            #         delay = self.max_replay_thrash_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_replay_thrash_delay)
            #         sleep(delay)
            #         self.log('kill replaying mds.{id}'.format(id=self.to_kill))
            #         self.kill_mds(self.to_kill)
            #
            #         delay = self.max_revive_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_revive_delay)
            #
            #         self.log('waiting for {delay} secs before reviving mds.{id}'.format(
            #             delay=delay, id=self.to_kill))
            #         sleep(delay)
            #
            #         self.log('revive mds.{id}'.format(id=self.to_kill))
            #         self.revive_mds(self.to_kill)


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to the MDSThrasher class for further information on the
    available options.
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'mds_thrash task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 1, \
        'mds_thrash task requires at least 2 metadata servers'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('mds thrasher using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )
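    # The CephManager runs its ceph commands through the remote that hosts
    # the first MDS role.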

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()
    log.info('Ready to start thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    if 'cluster' not in config:
        config['cluster'] = 'ceph'

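    # Start one MDSThrasher greenlet per filesystem and register each on the
    # cluster context so other components (e.g. the daemon watchdog) can see
    # them.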
    thrashers = []
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        ctx.ceph[config['cluster']].thrashers.append(thrasher)
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.exception is not None:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')