1 | """ |
2 | Thrash mds by simulating failures | |
3 | """ | |
4 | import logging | |
5 | import contextlib | |
7c673cae FG |
6 | import itertools |
7 | import random | |
7c673cae FG |
8 | import time |
9 | ||
10 | from gevent import sleep | |
11 | from gevent.greenlet import Greenlet | |
12 | from gevent.event import Event | |
13 | from teuthology import misc as teuthology | |
14 | ||
e306af50 | 15 | from tasks import ceph_manager |
7c673cae | 16 | from tasks.cephfs.filesystem import MDSCluster, Filesystem |
9f95a23c | 17 | from tasks.thrasher import Thrasher |
7c673cae FG |
18 | |
19 | log = logging.getLogger(__name__) | |
20 | ||
9f95a23c | 21 | class MDSThrasher(Thrasher, Greenlet): |
7c673cae FG |
22 | """ |
23 | MDSThrasher:: | |
24 | ||
25 | The MDSThrasher thrashes MDSs during execution of other tasks (workunits, etc). | |
26 | ||
    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. To always use the maximum
    value, set randomize to false. The config is a dict containing some or all of:

    max_thrash: [default: 1] the maximum number of active MDSs per FS that will
      be thrashed at any given time.

    max_thrash_delay: [default: 120] maximum number of seconds to delay before
      thrashing again.

    max_replay_thrash_delay: [default: 4] maximum number of seconds to delay while in
      the replay state before thrashing.

    max_revive_delay: [default: 10] maximum number of seconds to delay before
      bringing back a thrashed MDS.

    randomize: [default: true] enables randomization of the delay values; when
      false, the maximum delay values are always used.

    seed: [no default] seed the random number generator

    thrash_in_replay: [default: 0.0] likelihood that the MDS will be thrashed
      during replay. Value should be between 0.0 and 1.0.

    thrash_max_mds: [default: 0.05] likelihood that the max_mds of the mds
      cluster will be modified to a value [1, current) or (current, starting
      max_mds]. Value should be between 0.0 and 1.0.
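      For example, if max_mds is currently 3 and the file system started with
      max_mds 5, the new value is chosen from {1, 2, 4, 5}.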

    thrash_while_stopping: [default: false] thrash an MDS while there
      are MDS in up:stopping (because max_mds was changed and some
      MDS were deactivated).

    thrash_weights: allows specific MDSs to be thrashed more/less frequently.
      This option overrides anything specified by max_thrash. This option is a
      dict containing mds.x: weight pairs. For example, [mds.a: 0.7, mds.b:
      0.3, mds.c: 0.0]. Each weight is a value from 0.0 to 1.0. Any MDSs not
      specified will be automatically given a weight of 0.0 (not thrashed).
      For a given MDS, by default the thrasher delays for up to
      max_thrash_delay, thrashes, waits for the MDS to recover, and iterates.
      If a non-zero weight is specified for an MDS, for each iteration the
      thrasher chooses whether to thrash during that iteration based on a
      random value [0-1] not exceeding the weight of that MDS.

    Examples::

      The following example sets the likelihood that mds.a will be thrashed
      to 80%, mds.b to 20%, and other MDSs will not be thrashed. It also sets the
      likelihood that an MDS will be thrashed in replay to 40%.
      Thrash weights do not have to sum to 1.

      tasks:
      - ceph:
      - mds_thrash:
          thrash_weights:
            - mds.a: 0.8
            - mds.b: 0.2
          thrash_in_replay: 0.4
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]

      The following example disables randomization, and uses the max delay values:

      tasks:
      - ceph:
      - mds_thrash:
          max_thrash_delay: 10
          max_revive_delay: 1
          max_replay_thrash_delay: 4

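      The following example (the values are illustrative, not recommended
      defaults) fixes the random seed for reproducibility and makes changes
      to max_mds more likely:

      tasks:
      - ceph:
      - mds_thrash:
          seed: 31337
          thrash_max_mds: 0.25
          max_thrash: 2
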
98 | """ | |
99 | ||
100 | def __init__(self, ctx, manager, config, fs, max_mds): | |
9f95a23c | 101 | super(MDSThrasher, self).__init__() |
7c673cae FG |
102 | |
103 | self.config = config | |
104 | self.ctx = ctx | |
7c673cae FG |
105 | self.logger = log.getChild('fs.[{f}]'.format(f = fs.name)) |
106 | self.fs = fs | |
107 | self.manager = manager | |
108 | self.max_mds = max_mds | |
109 | self.name = 'thrasher.fs.[{f}]'.format(f = fs.name) | |
110 | self.stopping = Event() | |
111 | ||
112 | self.randomize = bool(self.config.get('randomize', True)) | |
c07f9fc5 | 113 | self.thrash_max_mds = float(self.config.get('thrash_max_mds', 0.05)) |
7c673cae FG |
114 | self.max_thrash = int(self.config.get('max_thrash', 1)) |
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', self.config.get('thrash_delay', 120.0)))
        self.thrash_in_replay = float(self.config.get('thrash_in_replay', 0.0))
        assert self.thrash_in_replay >= 0.0 and self.thrash_in_replay <= 1.0, 'thrash_in_replay ({v}) must be between [0.0, 1.0]'.format(
            v=self.thrash_in_replay)
        self.max_replay_thrash_delay = float(self.config.get('max_replay_thrash_delay', 4.0))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 10.0))

    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # Log exceptions here so we get the full backtrace (gevent loses them).
            # Also allow successful completion as gevent exception handling is a broken mess:
            #
            # 2017-02-03T14:34:01.259 CRITICAL:root:  File "gevent.libev.corecext.pyx", line 367, in gevent.libev.corecext.loop.handle_error (src/gevent/libev/gevent.corecext.c:5051)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 558, in handle_error
            #     self.print_exception(context, type, value, tb)
            #   File "/home/teuthworker/src/git.ceph.com_git_teuthology_master/virtualenv/local/lib/python2.7/site-packages/gevent/hub.py", line 605, in print_exception
            #     traceback.print_exception(type, value, tb, file=errstream)
            #   File "/usr/lib/python2.7/traceback.py", line 124, in print_exception
            #     _print(file, 'Traceback (most recent call last):')
            #   File "/usr/lib/python2.7/traceback.py", line 13, in _print
            #     file.write(str+terminator)
            # 2017-02-03T14:34:01.261 CRITICAL:root:IOError
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def log(self, x):
        """Write data to the logger assigned to MDSThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def kill_mds(self, mds):
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.keys())
            self.log('kill_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mds', mds).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def revive_mds(self, mds):
        """
        Revive mds -- do an ipmi powercycle (if indicated by the config)
        and then restart.
        """
        if self.config.get('powercycle'):
            (remote,) = (self.ctx.cluster.only('mds.{m}'.format(m=mds)).
                         remotes.keys())
            self.log('revive_mds on mds.{m} doing powercycle of {s}'.
                     format(m=mds, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.manager.make_admin_daemon_dir(self.ctx, remote)
        args = []
        self.ctx.daemons.get_daemon('mds', mds).restart(*args)

    def wait_for_stable(self, rank=None, gid=None):
        self.log('waiting for mds cluster to stabilize...')
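        # The cluster is considered stable when no MDS is stuck in up:stopping
        # (unless thrash_while_stopping is set) and either the thrashed rank has
        # been taken over by a new gid, the number of healthy actives is already
        # at max_mds (so no standby can replace the killed rank), or, when called
        # without a rank, the number of healthy actives equals max_mds.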
        for itercount in itertools.count():
            status = self.fs.status()
            max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
            ranks = list(status.get_ranks(self.fs.id))
            stopping = sum(1 for _ in ranks if "up:stopping" == _['state'])
            actives = sum(1 for _ in ranks
                          if "up:active" == _['state'] and "laggy_since" not in _)

            if not bool(self.config.get('thrash_while_stopping', False)) and stopping > 0:
                if itercount % 5 == 0:
                    self.log('cluster is considered unstable while MDS are in up:stopping (!thrash_while_stopping)')
            else:
                if rank is not None:
                    try:
                        info = status.get_rank(self.fs.id, rank)
                        if info['gid'] != gid and "up:active" == info['state']:
                            self.log('mds.{name} has gained rank={rank}, replacing gid={gid}'.format(name=info['name'], rank=rank, gid=gid))
                            return status
                    except Exception:
                        pass # no rank present
                    if actives >= max_mds:
                        # no replacement can occur!
                        self.log("cluster has {actives} actives (max_mds is {max_mds}), no MDS can replace rank {rank}".format(
                            actives=actives, max_mds=max_mds, rank=rank))
                        return status
                else:
                    if actives == max_mds:
                        self.log('mds cluster has {count} alive and active, now stable!'.format(count=actives))
                        return status
            if itercount > 300/2: # 5 minutes
                raise RuntimeError('timeout waiting for cluster to stabilize')
            elif itercount % 5 == 0:
                self.log('mds map: {status}'.format(status=status))
            else:
                self.log('no change')
            sleep(2)

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting mds_do_thrash for fs {fs}'.format(fs=self.fs.name))
        stats = {
            "max_mds": 0,
            "deactivate": 0,
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                delay = random.uniform(0.0, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            status = self.fs.status()

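            # occasionally thrash max_mds: pick a new value in [1, self.max_mds]
            # that differs from the current max_mds, then wait for the cluster to
            # grow or shrink to match it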
            if random.random() <= self.thrash_max_mds:
                max_mds = status.get_fsmap(self.fs.id)['mdsmap']['max_mds']
                options = list(range(1, max_mds)) + list(range(max_mds + 1, self.max_mds + 1))
                if len(options) > 0:
                    sample = random.sample(options, 1)
                    new_max_mds = sample[0]
                    self.log('thrashing max_mds: %d -> %d' % (max_mds, new_max_mds))
                    self.fs.set_max_mds(new_max_mds)
                    stats['max_mds'] += 1
                    self.wait_for_stable()

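            # thrash the active ranks: by default kill at most max_thrash of them
            # per pass; when thrash_weights is set, each rank is instead killed
            # with its configured probability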
            count = 0
            for info in status.get_ranks(self.fs.id):
                name = info['name']
                label = 'mds.' + name
                rank = info['rank']
                gid = info['gid']

                # if thrash_weights isn't specified and we've reached max_thrash,
                # we're done
                count = count + 1
                if 'thrash_weights' not in self.config and count > self.max_thrash:
                    break

                weight = 1.0
                if 'thrash_weights' in self.config:
                    weight = self.config['thrash_weights'].get(label, 0.0)
                skip = random.random()
                if weight <= skip:
                    self.log('skipping thrash iteration with skip ({skip}) > weight ({weight})'.format(skip=skip, weight=weight))
                    continue

                self.log('kill {label} (rank={rank})'.format(label=label, rank=rank))
                self.kill_mds(name)
                stats['kill'] += 1

                # wait for mon to report killed mds as crashed
                last_laggy_since = None
                itercount = 0
                while True:
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if not info:
                        break
                    if 'laggy_since' in info:
                        last_laggy_since = info['laggy_since']
                        break
                    if any([(f == name) for f in status.get_fsmap(self.fs.id)['mdsmap']['failed']]):
                        break
                    self.log(
                        'waiting till mds map indicates {label} is laggy/crashed, in failed state, or {label} is removed from mdsmap'.format(
                            label=label))
                    itercount = itercount + 1
                    if itercount > 10:
                        self.log('mds map: {status}'.format(status=status))
                    sleep(2)

                if last_laggy_since:
                    self.log(
                        '{label} reported laggy/crashed since: {since}'.format(label=label, since=last_laggy_since))
                else:
                    self.log('{label} down, removed from mdsmap'.format(label=label))

                # wait for a standby mds to takeover and become active
                status = self.wait_for_stable(rank, gid)

                # wait for a while before restarting old active to become new
                # standby
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving {label}'.format(
                    delay=delay, label=label))
                sleep(delay)

                self.log('reviving {label}'.format(label=label))
                self.revive_mds(name)

                for itercount in itertools.count():
                    if itercount > 300/2: # 5 minutes
                        raise RuntimeError('timeout waiting for MDS to revive')
                    status = self.fs.status()
                    info = status.get_mds(name)
                    if info and info['state'] in ('up:standby', 'up:standby-replay', 'up:active'):
                        self.log('{label} reported in {state} state'.format(label=label, state=info['state']))
                        break
                    self.log(
                        'waiting till mds map indicates {label} is in active, standby or standby-replay'.format(label=label))
                    sleep(2)

                for stat in stats:
                    self.log("stat['{key}'] = {value}".format(key=stat, value=stats[stat]))

            # don't do replay thrashing right now
            # for info in status.get_replays(self.fs.id):
            #     # this might race with replay -> active transition...
            #     if status['state'] == 'up:replay' and random.randrange(0.0, 1.0) < self.thrash_in_replay:
            #         delay = self.max_replay_thrash_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_replay_thrash_delay)
            #         sleep(delay)
            #         self.log('kill replaying mds.{id}'.format(id=self.to_kill))
            #         self.kill_mds(self.to_kill)
            #
            #         delay = self.max_revive_delay
            #         if self.randomize:
            #             delay = random.randrange(0.0, self.max_revive_delay)
            #
            #         self.log('waiting for {delay} secs before reviving mds.{id}'.format(
            #             delay=delay, id=self.to_kill))
            #         sleep(delay)
            #
            #         self.log('revive mds.{id}'.format(id=self.to_kill))
            #         self.revive_mds(self.to_kill)


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by thrashing while another task/workunit
    is running.

    Please refer to MDSThrasher class for further information on the
    available options.
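
    A minimal invocation, using only options documented on MDSThrasher above
    (the delay value is illustrative)::

      tasks:
      - ceph:
      - mds_thrash:
          max_thrash_delay: 10
      - ceph-fuse:
      - workunit:
          clients:
            all: [suites/fsx.sh]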
372 | """ | |
373 | ||
374 | mds_cluster = MDSCluster(ctx) | |
375 | ||
376 | if config is None: | |
377 | config = {} | |
378 | assert isinstance(config, dict), \ | |
379 | 'mds_thrash task only accepts a dict for configuration' | |
380 | mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds')) | |
381 | assert len(mdslist) > 1, \ | |
382 | 'mds_thrash task requires at least 2 metadata servers' | |
383 | ||
384 | # choose random seed | |
385 | if 'seed' in config: | |
386 | seed = int(config['seed']) | |
387 | else: | |
388 | seed = int(time.time()) | |
389 | log.info('mds thrasher using random seed: {seed}'.format(seed=seed)) | |
390 | random.seed(seed) | |
391 | ||
9f95a23c | 392 | (first,) = ctx.cluster.only('mds.{_id}'.format(_id=mdslist[0])).remotes.keys() |
7c673cae FG |
393 | manager = ceph_manager.CephManager( |
394 | first, ctx=ctx, logger=log.getChild('ceph_manager'), | |
395 | ) | |
396 | ||
397 | # make sure everyone is in active, standby, or standby-replay | |
398 | log.info('Wait for all MDSs to reach steady state...') | |
399 | status = mds_cluster.status() | |
400 | while True: | |
401 | steady = True | |
402 | for info in status.get_all(): | |
403 | state = info['state'] | |
404 | if state not in ('up:active', 'up:standby', 'up:standby-replay'): | |
405 | steady = False | |
406 | break | |
407 | if steady: | |
408 | break | |
409 | sleep(2) | |
410 | status = mds_cluster.status() | |
411 | log.info('Ready to start thrashing') | |
412 | ||
7c673cae FG |
413 | manager.wait_for_clean() |
414 | assert manager.is_clean() | |
9f95a23c TL |
415 | |
416 | if 'cluster' not in config: | |
417 | config['cluster'] = 'ceph' | |
418 | ||
7c673cae FG |
    thrashers = []
    for fs in status.get_filesystems():
        thrasher = MDSThrasher(ctx, manager, config, Filesystem(ctx, fs['id']), fs['mdsmap']['max_mds'])
        thrasher.start()
        ctx.ceph[config['cluster']].thrashers.append(thrasher)
        thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining mds_thrashers')
        for thrasher in thrashers:
            thrasher.stop()
            if thrasher.exception is not None:
                raise RuntimeError('error during thrashing')
            thrasher.join()
        log.info('done joining')