]>
Commit | Line | Data |
---|---|---|
a4b75251 TL |
1 | """ |
2 | Task for thrashing cephfs-mirror daemons | |
3 | """ | |
4 | ||
5 | import contextlib | |
6 | import logging | |
7 | import random | |
8 | import signal | |
9 | import socket | |
10 | import time | |
11 | ||
12 | from gevent import sleep | |
13 | from gevent.greenlet import Greenlet | |
14 | from gevent.event import Event | |
15 | ||
16 | from teuthology.exceptions import CommandFailedError | |
17 | from teuthology.orchestra import run | |
18 | from tasks.thrasher import Thrasher | |
19 | ||
20 | log = logging.getLogger(__name__) | |
21 | ||
22 | ||
class CephFSMirrorThrasher(Thrasher, Greenlet):
    """
    CephFSMirrorThrasher::

    The CephFSMirrorThrasher thrashes cephfs-mirror daemons during execution of other
    tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. The config is a dict
    containing some or all of:

    cluster: [default: ceph] cluster to thrash

    max_thrash: [default: 1] the maximum number of active cephfs-mirror daemons per
      cluster will be thrashed at any given time.

    min_thrash_delay: [default: 5] minimum number of seconds to delay before
      thrashing again.

    max_thrash_delay: [default: 10] maximum number of seconds to delay before
      thrashing again.

    max_revive_delay: [default: 15] maximum number of seconds to delay before
      bringing back a thrashed cephfs-mirror daemon.

    randomize: [default: true] enables randomization and use the max/min values

    seed: [no default] seed the random number generator

    Examples::

        The following example disables randomization, and uses the max delay
        values:

        tasks:
        - ceph:
        - cephfs_mirror_thrash:
            randomize: False
            max_thrash_delay: 10
    """

    def __init__(self, ctx, config, cluster, daemons):
        """
        :param ctx: teuthology run context
        :param config: task config dict (see class docstring for keys)
        :param cluster: name of the cluster whose daemons are thrashed
        :param daemons: list of cephfs-mirror daemon handles to thrash
        """
        super(CephFSMirrorThrasher, self).__init__()

        self.ctx = ctx
        self.config = config
        self.cluster = cluster
        self.daemons = daemons

        self.logger = log
        self.name = 'thrasher.cephfs_mirror.[{cluster}]'.format(cluster=cluster)
        # Signalled by stop(); do_thrash() polls/waits on this to exit.
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.min_thrash_delay = float(self.config.get('min_thrash_delay', 5.0))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 10))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 15.0))

    def _run(self):
        """Greenlet entry point: run the thrash loop, recording any failure."""
        try:
            self.do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    def log(self, x):
        """Write data to logger assigned to this CephFSMirrorThrasher"""
        self.logger.info(x)

    def stop(self):
        """Ask the thrash loop to exit at its next wait point."""
        self.stopping.set()

    def do_thrash(self):
        """
        Perform the random thrashing action

        Loop until stop() is called: wait a (possibly randomized) delay,
        SIGTERM up to max_thrash randomly-selected daemons, wait for them
        to exit, then revive them.
        """

        self.log('starting thrash for cluster {cluster}'.format(cluster=self.cluster))
        stats = {
            "kill": 0,
        }

        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                # randrange() rejects float bounds; uniform() is the correct
                # way to draw a float from [min_thrash_delay, max_thrash_delay].
                delay = random.uniform(self.min_thrash_delay, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            killed_daemons = []

            weight = 1.0 / len(self.daemons)
            count = 0
            for daemon in self.daemons:
                skip = random.uniform(0.0, 1.0)
                if weight <= skip:
                    self.log('skipping daemon {label} with skip ({skip}) > weight ({weight})'.format(
                        label=daemon.id_, skip=skip, weight=weight))
                    continue

                self.log('kill {label}'.format(label=daemon.id_))
                try:
                    daemon.signal(signal.SIGTERM)
                except Exception as e:
                    self.log(f'exception when stopping mirror daemon: {e}')
                else:
                    killed_daemons.append(daemon)
                    stats['kill'] += 1

                # if we've reached max_thrash, we're done
                count += 1
                if count >= self.max_thrash:
                    break

            if killed_daemons:
                # wait for a while before restarting
                delay = self.max_revive_delay
                if self.randomize:
                    # float range again: uniform(), not randrange()
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving daemons'.format(delay=delay))
                sleep(delay)

                for daemon in killed_daemons:
                    self.log('waiting for {label}'.format(label=daemon.id_))
                    try:
                        run.wait([daemon.proc], timeout=600)
                    except CommandFailedError:
                        # expected: the daemon was killed with a signal
                        pass
                    except:
                        self.log('Failed to stop {label}'.format(label=daemon.id_))

                        try:
                            # try to capture a core dump
                            daemon.signal(signal.SIGABRT)
                        except socket.error:
                            pass
                        # re-raise: an unresponsive daemon is a test failure
                        raise
                    finally:
                        daemon.reset()

                for daemon in killed_daemons:
                    self.log('reviving {label}'.format(label=daemon.id_))
                    daemon.start()

        for stat in stats:
            self.log("stat['{key}'] = {value}".format(key=stat, value=stats[stat]))
178 | ||
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the cephfs-mirror by thrashing while another task/workunit
    is running.

    Please refer to CephFSMirrorThrasher class for further information on the
    available options.

    :param ctx: teuthology run context
    :param config: optional dict of thrasher options (see CephFSMirrorThrasher)
    :raises RuntimeError: if the thrasher recorded an exception; the original
        exception is attached as the cause.
    """
    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'cephfs_mirror_thrash task only accepts a dict for configuration'

    cluster = config.get('cluster', 'ceph')
    daemons = list(ctx.daemons.iter_daemons_of_role('cephfs-mirror', cluster))
    assert len(daemons) > 0, \
        'cephfs_mirror_thrash task requires at least 1 cephfs-mirror daemon'

    # choose random seed; log it so a failing run can be reproduced by
    # passing the same value back in via the 'seed' config key
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('cephfs_mirror_thrash using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

    thrasher = CephFSMirrorThrasher(ctx, config, cluster, daemons)
    thrasher.start()
    ctx.ceph[cluster].thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining cephfs_mirror_thrash')
        thrasher.stop()
        # Join before raising so the greenlet is never leaked, and chain the
        # thrasher's recorded exception so the root cause is not discarded.
        thrasher.join()
        log.info('done joining')
        if thrasher.exception is not None:
            raise RuntimeError('error during thrashing') from thrasher.exception