1 """
2 Recovery system benchmarking
3 """
4 from cStringIO import StringIO
5
6 import contextlib
7 import gevent
8 import json
9 import logging
10 import random
11 import time
12
13 import ceph_manager
14 from teuthology import misc as teuthology
15
16 log = logging.getLogger(__name__)
17
18 @contextlib.contextmanager
19 def task(ctx, config):
20 """
21 Benchmark the recovery system.
22
23 Generates objects with smalliobench, runs it normally to get a
24 baseline performance measurement, then marks an OSD out and reruns
25 to measure performance during recovery.
26
27 The config should be as follows:
28
29 recovery_bench:
30 duration: <seconds for each measurement run>
31 num_objects: <number of objects>
32 io_size: <io size in bytes>
33
34 example:
35
36 tasks:
37 - ceph:
38 - recovery_bench:
39 duration: 60
40 num_objects: 500
41 io_size: 4096
42 """
    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'recovery_bench task only accepts a dict for configuration'

    log.info('Beginning recovery bench...')

    first_mon = teuthology.get_first_mon(ctx, config)
    (mon,) = ctx.cluster.only(first_mon).remotes.iterkeys()

    manager = ceph_manager.CephManager(
        mon,
        ctx=ctx,
        logger=log.getChild('ceph_manager'),
        )

    num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
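    # Wait until every OSD in the cluster reports as up before starting
    # the benchmark.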
    while len(manager.get_osd_status()['up']) < num_osds:
        time.sleep(10)

    bench_proc = RecoveryBencher(
        manager,
        config,
        )
    try:
        yield
    finally:
        log.info('joining recovery bencher')
        bench_proc.do_join()

class RecoveryBencher:
    """
    Runs smalliobench from a background greenlet: once against a healthy
    cluster as a baseline, and once with an OSD marked out so that
    performance during recovery can be compared against it.
    """
    def __init__(self, manager, config):
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()

        osd_status = self.ceph_manager.get_osd_status()
        self.osds = osd_status['up']

        self.config = config
        if self.config is None:
            self.config = dict()

        else:
            def tmp(x):
                """
                Local wrapper to print value.
                """
                print x
            self.log = tmp

        log.info("spawning thread")

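        # Run the benchmark in a background greenlet; task() waits for it
        # via do_join() after the main task body finishes.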
        self.thread = gevent.spawn(self.do_bench)

    def do_join(self):
        """
        Join the recovery bencher.  This is called after the main
        task exits.
        """
        self.thread.get()

    def do_bench(self):
        """
        Create the test objects, run a baseline smalliobench pass, then
        mark a random OSD out and run a second pass while the cluster
        recovers, before marking the OSD back in.
        """
        duration = self.config.get("duration", 60)
        num_objects = self.config.get("num_objects", 500)
        io_size = self.config.get("io_size", 4096)

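        # Pick a random OSD and run smalliobench from the node that hosts it.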
        osd = str(random.choice(self.osds))
        (osd_remote,) = self.ceph_manager.ctx.cluster.only('osd.%s' % osd).remotes.iterkeys()

        testdir = teuthology.get_testdir(self.ceph_manager.ctx)

        # create the objects
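        # (--init-only is assumed to only populate the pool with num_objects
        # objects of io_size bytes; the timed runs below pass --do-not-init
        # so they reuse these objects)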
        osd_remote.run(
            args=[
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'smalliobench',
                '--use-prefix', 'recovery_bench',
                '--init-only', '1',
                '--num-objects', str(num_objects),
                '--io-size', str(io_size),
                ],
            wait=True,
        )

        # baseline bench
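        # smalliobench emits its latency samples on stderr as JSON lines;
        # capture them and hand them to process_samples() below.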
        log.info('non-recovery (baseline)')
        p = osd_remote.run(
            args=[
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'smalliobench',
                '--use-prefix', 'recovery_bench',
                '--do-not-init', '1',
                '--duration', str(duration),
                '--io-size', str(io_size),
                ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=True,
        )
        self.process_samples(p.stderr.getvalue())

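        # Mark the chosen OSD out so the cluster starts recovering its data,
        # then repeat the same workload while recovery is in progress.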
        self.ceph_manager.raw_cluster_cmd('osd', 'out', osd)
        time.sleep(5)

        # recovery bench
        log.info('recovery active')
        p = osd_remote.run(
            args=[
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'smalliobench',
                '--use-prefix', 'recovery_bench',
                '--do-not-init', '1',
                '--duration', str(duration),
                '--io-size', str(io_size),
                ],
            stdout=StringIO(),
            stderr=StringIO(),
            wait=True,
        )
        self.process_samples(p.stderr.getvalue())

        self.ceph_manager.raw_cluster_cmd('osd', 'in', osd)

    def process_samples(self, input):
        """
        Extract latency samples from the benchmark output and log the
        median and 99th percentile latency for each sample type.

        :param input: output lines, one JSON object per line; lines that
                      fail to parse or lack 'type' and 'latency' fields
                      are ignored
        """
        lat = {}
        for line in input.split('\n'):
            try:
                sample = json.loads(line)
                samples = lat.setdefault(sample['type'], [])
                samples.append(float(sample['latency']))
            except Exception:
                pass

        for type in lat:
            samples = lat[type]
            samples.sort()

            num = len(samples)

            # median
            if num & 1 == 1: # odd number of samples
                median = samples[num / 2]
            else:
                median = (samples[num / 2] + samples[num / 2 - 1]) / 2
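            # e.g. for sorted samples [1.0, 2.0, 3.0, 4.0] (num = 4, even)
            # this yields (samples[2] + samples[1]) / 2 = 2.5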

            # 99%
            ninety_nine = samples[int(num * 0.99)]
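            # (int(num * 0.99) is a simple approximation of the 99th
            # percentile index; with 100 samples it selects the largest)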

            log.info("%s: median %f, 99%% %f" % (type, median, ninety_nine))