"""
Task for thrashing cephfs-mirror daemons
"""

import contextlib
import logging
import random
import signal
import socket
import time

from gevent import sleep
from gevent.greenlet import Greenlet
from gevent.event import Event

from teuthology.exceptions import CommandFailedError
from teuthology.orchestra import run
from tasks.thrasher import Thrasher

log = logging.getLogger(__name__)


class CephFSMirrorThrasher(Thrasher, Greenlet):
    """
    CephFSMirrorThrasher::

    The CephFSMirrorThrasher thrashes cephfs-mirror daemons during execution of other
    tasks (workunits, etc).

    The config is optional. Many of the config parameters are a maximum value
    to use when selecting a random value from a range. The config is a dict
    containing some or all of:

    cluster: [default: ceph] cluster to thrash

    max_thrash: [default: 1] the maximum number of active cephfs-mirror daemons per
      cluster that will be thrashed at any given time.

    min_thrash_delay: [default: 5] minimum number of seconds to delay before
      thrashing again.

    max_thrash_delay: [default: 10] maximum number of seconds to delay before
      thrashing again.

    max_revive_delay: [default: 15] maximum number of seconds to delay before
      bringing back a thrashed cephfs-mirror daemon.

    randomize: [default: true] enables randomization and uses the max/min values

    seed: [no default] seed the random number generator

    Examples::

      The following example disables randomization and uses the max delay
      values:

      tasks:
      - ceph:
      - cephfs_mirror_thrash:
          randomize: False
          max_thrash_delay: 10
    """
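    # For example: with the defaults above and, say, three cephfs-mirror
    # daemons, each pass of do_thrash() waits roughly 5-10 seconds, considers
    # each daemon independently with probability 1/3, and kills at most
    # max_thrash of them before reviving.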

    def __init__(self, ctx, config, cluster, daemons):
        super(CephFSMirrorThrasher, self).__init__()

        self.ctx = ctx
        self.config = config
        self.cluster = cluster
        self.daemons = daemons

        self.logger = log
        self.name = 'thrasher.cephfs_mirror.[{cluster}]'.format(cluster=cluster)
        self.stopping = Event()

        self.randomize = bool(self.config.get('randomize', True))
        self.max_thrash = int(self.config.get('max_thrash', 1))
        self.min_thrash_delay = float(self.config.get('min_thrash_delay', 5.0))
        self.max_thrash_delay = float(self.config.get('max_thrash_delay', 10))
        self.max_revive_delay = float(self.config.get('max_revive_delay', 15.0))

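    # Greenlet entry point: runs on the greenlet spawned by thrasher.start()
    # in the task below and loops in do_thrash() until stop() is called.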
    def _run(self):
        try:
            self.do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    def log(self, x):
        """Write data to logger assigned to this CephFSMirrorThrasher"""
        self.logger.info(x)

    def stop(self):
        self.stopping.set()

    def do_thrash(self):
        """
        Perform the random thrashing action
        """

        self.log('starting thrash for cluster {cluster}'.format(cluster=self.cluster))
        stats = {
            "kill": 0,
        }

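        # One thrash iteration: wait a (possibly randomized) delay, pick up to
        # max_thrash daemons to kill, pause, then revive whatever was killed.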
        while not self.stopping.is_set():
            delay = self.max_thrash_delay
            if self.randomize:
                # pick a random delay between min_thrash_delay and max_thrash_delay
                delay = random.uniform(self.min_thrash_delay, self.max_thrash_delay)

            if delay > 0.0:
                self.log('waiting for {delay} secs before thrashing'.format(delay=delay))
                self.stopping.wait(delay)
                if self.stopping.is_set():
                    continue

            killed_daemons = []

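            # Each daemon is considered independently and killed with
            # probability weight = 1/len(daemons), so about one daemon is
            # selected per iteration on average; the count check below caps
            # the number killed at max_thrash.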
            weight = 1.0 / len(self.daemons)
            count = 0
            for daemon in self.daemons:
                skip = random.uniform(0.0, 1.0)
                if weight <= skip:
                    self.log('skipping daemon {label} with skip ({skip}) >= weight ({weight})'.format(
                        label=daemon.id_, skip=skip, weight=weight))
                    continue

                self.log('kill {label}'.format(label=daemon.id_))
                try:
                    daemon.signal(signal.SIGTERM)
                except Exception as e:
                    self.log(f'exception when stopping mirror daemon: {e}')
                else:
                    killed_daemons.append(daemon)
                    stats['kill'] += 1

                # if we've reached max_thrash, we're done
                count += 1
                if count >= self.max_thrash:
                    break

            if killed_daemons:
                # wait for a while before restarting
                delay = self.max_revive_delay
                if self.randomize:
                    delay = random.uniform(0.0, self.max_revive_delay)

                self.log('waiting for {delay} secs before reviving daemons'.format(delay=delay))
                sleep(delay)

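                # Reap each killed daemon: wait for the SIGTERM'd process to
                # exit; on an unexpected failure, send SIGABRT in the hope of
                # capturing a core dump and re-raise. daemon.reset() discards
                # the finished process state before the restart below.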
                for daemon in killed_daemons:
                    self.log('waiting for {label}'.format(label=daemon.id_))
                    try:
                        run.wait([daemon.proc], timeout=600)
                    except CommandFailedError:
                        pass
                    except:
                        self.log('Failed to stop {label}'.format(label=daemon.id_))

                        try:
                            # try to capture a core dump
                            daemon.signal(signal.SIGABRT)
                        except socket.error:
                            pass
                        raise
                    finally:
                        daemon.reset()

                for daemon in killed_daemons:
                    self.log('reviving {label}'.format(label=daemon.id_))
                    daemon.start()

        for stat in stats:
            self.log("stat['{key}'] = {value}".format(key=stat, value=stats[stat]))

@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the cephfs-mirror by thrashing while another task/workunit
    is running.

    Please refer to CephFSMirrorThrasher class for further information on the
    available options.
    """
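    # Validate the config, locate the cephfs-mirror daemons for the chosen
    # cluster, seed the RNG, then run the thrasher for the duration of the
    # yield; teardown stops it and surfaces any exception it recorded.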
    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'cephfs_mirror_thrash task only accepts a dict for configuration'

    cluster = config.get('cluster', 'ceph')
    daemons = list(ctx.daemons.iter_daemons_of_role('cephfs-mirror', cluster))
    assert len(daemons) > 0, \
        'cephfs_mirror_thrash task requires at least 1 cephfs-mirror daemon'

    # choose random seed
    if 'seed' in config:
        seed = int(config['seed'])
    else:
        seed = int(time.time())
    log.info('cephfs_mirror_thrash using random seed: {seed}'.format(seed=seed))
    random.seed(seed)

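    # Start the thrasher greenlet and register it on the cluster context so
    # other infrastructure (e.g. the DaemonWatchdog) can observe failures.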
    thrasher = CephFSMirrorThrasher(ctx, config, cluster, daemons)
    thrasher.start()
    ctx.ceph[cluster].thrashers.append(thrasher)

    try:
        log.debug('Yielding')
        yield
    finally:
        log.info('joining cephfs_mirror_thrash')
        thrasher.stop()
        if thrasher.exception is not None:
            raise RuntimeError('error during thrashing')
        thrasher.join()
        log.info('done joining')