]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | """ |
2 | Thrash mds by simulating failures | |
3 | """ | |
4 | import logging | |
5 | import contextlib | |
6 | ||
7 | from gevent import sleep, GreenletExit | |
8 | from gevent.greenlet import Greenlet | |
9 | from gevent.event import Event | |
10 | from teuthology import misc as teuthology | |
11 | ||
12 | from tasks import ceph_manager | |
13 | from tasks.cephfs.filesystem import MDSCluster, Filesystem | |
14 | from tasks.thrasher import Thrasher | |
15 | ||
16 | log = logging.getLogger(__name__) | |
17 | ||
class ForwardScrubber(Thrasher, Greenlet):
    """
    ForwardScrubber::

    Repeatedly forward-scrubs a file-system in the background while other
    tasks (workunits, etc.) are running against it.
    """

    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
        super().__init__()

        self.fs = fs
        self.logger = log.getChild(f'fs.[{fs.name}]')
        self.name = f'thrasher.fs.[{fs.name}]'
        self.stopping = Event()
        self.scrub_timeout = scrub_timeout
        self.sleep_between_iterations = sleep_between_iterations

    def _run(self):
        # Greenlet entry point: run the scrub loop, stashing any failure
        # for later inspection instead of letting gevent see it.
        try:
            self.do_scrub()
        except Exception as e:
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def stop(self):
        # Ask do_scrub() to leave its loop at the next iteration check.
        self.stopping.set()

    def do_scrub(self):
        """
        Scrub the file-system in a loop until stop() is called.
        """
        self.logger.info(f'start scrubbing fs: {self.fs.name}')

        try:
            while not self.stopping.is_set():
                self._scrub()
                sleep(self.sleep_between_iterations)
        except GreenletExit:
            pass

        self.logger.info(f'end scrubbing fs: {self.fs.name}')

    def _scrub(self, path="/", recursive=True):
        # Start one asynchronous scrub and wait for its tag to complete.
        self.logger.info(f"scrubbing fs: {self.fs.name}")

        opts = ["force", "recursive"] if recursive else ["force"]
        out_json = self.fs.run_scrub(["start", path, ",".join(opts)])
        assert out_json is not None

        tag = out_json['scrub_tag']
        assert tag is not None
        assert out_json['return_code'] == 0
        assert out_json['mode'] == 'asynchronous'

        return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
                                                 timeout=self.scrub_timeout)
78 | ||
def stop_all_fwd_scrubbers(thrashers):
    # Stop and join every ForwardScrubber among *thrashers*, re-raising
    # any exception a scrubber recorded while running.
    scrubbers = (t for t in thrashers if isinstance(t, ForwardScrubber))
    for scrubber in scrubbers:
        scrubber.stop()
        scrubber.join()
        if scrubber.exception is not None:
            raise RuntimeError(f"error during scrub thrashing: {scrubber.exception}")
87 | ||
88 | ||
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by running scrub iterations while another task/workunit
    is running.

    Both config keys are optional and default to the ForwardScrubber
    defaults (scrub_timeout=300, sleep_between_iterations=1).

    Example config:

    - fwd_scrub:
      scrub_timeout: 300
      sleep_between_iterations: 1
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'fwd_scrub task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 0, \
        'fwd_scrub task requires at least 1 metadata server'

    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()

    log.info('Ready to start scrub thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    if 'cluster' not in config:
        config['cluster'] = 'ceph'

    for fs in status.get_filesystems():
        # Use .get() so an empty/partial config falls back to the
        # ForwardScrubber defaults instead of raising KeyError.
        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
                                       config.get('scrub_timeout', 300),
                                       config.get('sleep_between_iterations', 1))
        fwd_scrubber.start()
        ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining ForwardScrubbers')
        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
        log.info('done joining')