ceph/qa/tasks/fwd_scrub.py
1 """
2 Thrash mds by simulating failures
3 """
import logging
import contextlib

from gevent import sleep, GreenletExit
from gevent.greenlet import Greenlet
from gevent.event import Event
from teuthology import misc as teuthology

from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem
from tasks.thrasher import Thrasher

log = logging.getLogger(__name__)

class ForwardScrubber(Thrasher, Greenlet):
    """
    ForwardScrubber::

    The ForwardScrubber does forward scrubbing of file-systems during execution
    of other tasks (workunits, etc).
    """

    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
        super(ForwardScrubber, self).__init__()

        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
        self.fs = fs
        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
        self.stopping = Event()
        self.scrub_timeout = scrub_timeout
        self.sleep_between_iterations = sleep_between_iterations

    def _run(self):
        try:
            self.do_scrub()
        except Exception as e:
            self.set_thrasher_exception(e)
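            # the stored exception is surfaced later by stop_all_fwd_scrubbers()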
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def stop(self):
        self.stopping.set()

    def do_scrub(self):
        """
        Perform the file-system scrubbing
        """
        self.logger.info(f'start scrubbing fs: {self.fs.name}')

        try:
            while not self.stopping.is_set():
                self._scrub()
                sleep(self.sleep_between_iterations)
        except GreenletExit:
            # raised when the greenlet is killed; treat it as a normal stop
            pass

        self.logger.info(f'end scrubbing fs: {self.fs.name}')

    def _scrub(self, path="/", recursive=True):
        self.logger.info(f"scrubbing fs: {self.fs.name}")
        recopt = ["recursive", "force"] if recursive else ["force"]
        out_json = self.fs.run_scrub(["start", path] + recopt)
        assert out_json is not None
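        # "scrub start" replies with a JSON blob; only the fields consumed
        # below are relied upon here.  The reply looks roughly like the
        # following (illustrative sketch; exact output can vary by release):
        #
        #   {
        #       "return_code": 0,
        #       "scrub_tag": "<uuid generated for this scrub>",
        #       "mode": "asynchronous"
        #   }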

        tag = out_json['scrub_tag']

        assert tag is not None
        assert out_json['return_code'] == 0
        assert out_json['mode'] == 'asynchronous'

        return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
                                                 timeout=self.scrub_timeout)

def stop_all_fwd_scrubbers(thrashers):
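    """
    Stop every ForwardScrubber in the given thrasher list, wait for its
    greenlet to finish, and raise a RuntimeError if it recorded an exception.
    """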
    for thrasher in thrashers:
        if not isinstance(thrasher, ForwardScrubber):
            continue
        thrasher.stop()
        thrasher.join()
        if thrasher.exception is not None:
            raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")


@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by running scrub iterations while another task/workunit
    is running.
    Example config:

    - fwd_scrub:
      scrub_timeout: 300
      sleep_between_iterations: 1
    """

    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'fwd_scrub task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 0, \
        'fwd_scrub task requires at least 1 metadata server'

    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()

    log.info('Ready to start scrub thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    if 'cluster' not in config:
        config['cluster'] = 'ceph'

    for fs in status.get_filesystems():
        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
                                       config['scrub_timeout'],
                                       config['sleep_between_iterations'])
        fwd_scrubber.start()
        ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining ForwardScrubbers')
        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
        log.info('done joining')