]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/common/Throttle.cc
bump version to 12.1.1-pve1 while rebasing patches
[ceph.git] / ceph / src / test / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Library Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library Public License for more details.
19 *
20 */
21
22 #include <stdio.h>
23 #include <signal.h>
24 #include "gtest/gtest.h"
25 #include "common/Mutex.h"
26 #include "common/Thread.h"
27 #include "common/Throttle.h"
28 #include "common/ceph_argparse.h"
29
30 #include <thread>
31 #include <atomic>
32 #include <chrono>
33 #include <mutex>
34 #include <list>
35 #include <random>
36
37 class ThrottleTest : public ::testing::Test {
38 protected:
39
40 class Thread_get : public Thread {
41 public:
42 Throttle &throttle;
43 int64_t count;
44 bool waited;
45
46 Thread_get(Throttle& _throttle, int64_t _count) :
47 throttle(_throttle),
48 count(_count),
49 waited(false)
50 {
51 }
52
53 void *entry() override {
54 usleep(5);
55 waited = throttle.get(count);
56 throttle.put(count);
57 return NULL;
58 }
59 };
60
61 };
62
63 TEST_F(ThrottleTest, Throttle) {
64 int64_t throttle_max = 10;
65 Throttle throttle(g_ceph_context, "throttle", throttle_max);
66 ASSERT_EQ(throttle.get_max(), throttle_max);
67 ASSERT_EQ(throttle.get_current(), 0);
68 }
69
70 TEST_F(ThrottleTest, take) {
71 int64_t throttle_max = 10;
72 Throttle throttle(g_ceph_context, "throttle", throttle_max);
73 ASSERT_EQ(throttle.take(throttle_max), throttle_max);
74 ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
75 }
76
77 TEST_F(ThrottleTest, get) {
78 int64_t throttle_max = 10;
79 Throttle throttle(g_ceph_context, "throttle");
80
81 // test increasing max from 0 to throttle_max
82 {
83 ASSERT_FALSE(throttle.get(throttle_max, throttle_max));
84 ASSERT_EQ(throttle.get_max(), throttle_max);
85 ASSERT_EQ(throttle.put(throttle_max), 0);
86 }
87
88 ASSERT_FALSE(throttle.get(5));
89 ASSERT_EQ(throttle.put(5), 0);
90
91 ASSERT_FALSE(throttle.get(throttle_max));
92 ASSERT_FALSE(throttle.get_or_fail(1));
93 ASSERT_FALSE(throttle.get(1, throttle_max + 1));
94 ASSERT_EQ(throttle.put(throttle_max + 1), 0);
95 ASSERT_FALSE(throttle.get(0, throttle_max));
96 ASSERT_FALSE(throttle.get(throttle_max));
97 ASSERT_FALSE(throttle.get_or_fail(1));
98 ASSERT_EQ(throttle.put(throttle_max), 0);
99
100 useconds_t delay = 1;
101
102 bool waited;
103
104 do {
105 cout << "Trying (1) with delay " << delay << "us\n";
106
107 ASSERT_FALSE(throttle.get(throttle_max));
108 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
109
110 Thread_get t(throttle, 7);
111 t.create("t_throttle_1");
112 usleep(delay);
113 ASSERT_EQ(throttle.put(throttle_max), 0);
114 t.join();
115
116 if (!(waited = t.waited))
117 delay *= 2;
118 } while(!waited);
119
120 delay = 1;
121 do {
122 cout << "Trying (2) with delay " << delay << "us\n";
123
124 ASSERT_FALSE(throttle.get(throttle_max / 2));
125 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
126
127 Thread_get t(throttle, throttle_max);
128 t.create("t_throttle_2");
129 usleep(delay);
130
131 Thread_get u(throttle, 1);
132 u.create("u_throttle_2");
133 usleep(delay);
134
135 throttle.put(throttle_max / 2);
136
137 t.join();
138 u.join();
139
140 if (!(waited = t.waited && u.waited))
141 delay *= 2;
142 } while(!waited);
143
144 }
145
146 TEST_F(ThrottleTest, get_or_fail) {
147 {
148 Throttle throttle(g_ceph_context, "throttle");
149
150 ASSERT_TRUE(throttle.get_or_fail(5));
151 ASSERT_TRUE(throttle.get_or_fail(5));
152 }
153
154 {
155 int64_t throttle_max = 10;
156 Throttle throttle(g_ceph_context, "throttle", throttle_max);
157
158 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
159 ASSERT_EQ(throttle.put(throttle_max), 0);
160
161 ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
162 ASSERT_FALSE(throttle.get_or_fail(1));
163 ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
164 ASSERT_EQ(throttle.put(throttle_max * 2), 0);
165
166 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
167 ASSERT_FALSE(throttle.get_or_fail(1));
168 ASSERT_EQ(throttle.put(throttle_max), 0);
169 }
170 }
171
172 TEST_F(ThrottleTest, wait) {
173 int64_t throttle_max = 10;
174 Throttle throttle(g_ceph_context, "throttle");
175
176 // test increasing max from 0 to throttle_max
177 {
178 ASSERT_FALSE(throttle.wait(throttle_max));
179 ASSERT_EQ(throttle.get_max(), throttle_max);
180 }
181
182 useconds_t delay = 1;
183
184 bool waited;
185
186 do {
187 cout << "Trying (3) with delay " << delay << "us\n";
188
189 ASSERT_FALSE(throttle.get(throttle_max / 2));
190 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
191
192 Thread_get t(throttle, throttle_max);
193 t.create("t_throttle_3");
194 usleep(delay);
195
196 //
197 // Throttle::_reset_max(int64_t m) used to contain a test
198 // that blocked the following statement, only if
199 // the argument was greater than throttle_max.
200 // Although a value lower than throttle_max would cover
201 // the same code in _reset_max, the throttle_max * 100
202 // value is left here to demonstrate that the problem
203 // has been solved.
204 //
205 throttle.wait(throttle_max * 100);
206 usleep(delay);
207 t.join();
208 ASSERT_EQ(throttle.get_current(), throttle_max / 2);
209
210 if (!(waited = t.waited)) {
211 delay *= 2;
212 // undo the changes we made
213 throttle.put(throttle_max / 2);
214 throttle.wait(throttle_max);
215 }
216 } while(!waited);
217 }
218
219 TEST_F(ThrottleTest, destructor) {
220 Thread_get *t;
221 {
222 int64_t throttle_max = 10;
223 Throttle *throttle = new Throttle(g_ceph_context, "throttle", throttle_max);
224
225 ASSERT_FALSE(throttle->get(5));
226
227 t = new Thread_get(*throttle, 7);
228 t->create("t_throttle");
229 bool blocked;
230 useconds_t delay = 1;
231 do {
232 usleep(delay);
233 if (throttle->get_or_fail(1)) {
234 throttle->put(1);
235 blocked = false;
236 } else {
237 blocked = true;
238 }
239 delay *= 2;
240 } while(!blocked);
241 delete throttle;
242 }
243
244 { //
245 // The thread is left hanging, otherwise it will abort().
246 // Deleting the Throttle on which it is waiting creates a
247 // inconsistency that will be detected: the Throttle object that
248 // it references no longer exists.
249 //
250 pthread_t id = t->get_thread_id();
251 ASSERT_EQ(pthread_kill(id, 0), 0);
252 delete t;
253 ASSERT_EQ(pthread_kill(id, 0), 0);
254 }
255 }
256
257 std::pair<double, std::chrono::duration<double> > test_backoff(
258 double low_threshhold,
259 double high_threshhold,
260 double expected_throughput,
261 double high_multiple,
262 double max_multiple,
263 uint64_t max,
264 double put_delay_per_count,
265 unsigned getters,
266 unsigned putters)
267 {
268 std::mutex l;
269 std::condition_variable c;
270 uint64_t total = 0;
271 std::list<uint64_t> in_queue;
272 bool stop_getters = false;
273 bool stop_putters = false;
274
275 auto wait_time = std::chrono::duration<double>(0);
276 uint64_t waits = 0;
277
278 uint64_t total_observed_total = 0;
279 uint64_t total_observations = 0;
280
281 BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5);
282 bool valid = throttle.set_params(
283 low_threshhold,
284 high_threshhold,
285 expected_throughput,
286 high_multiple,
287 max_multiple,
288 max,
289 0);
290 assert(valid);
291
292 auto getter = [&]() {
293 std::random_device rd;
294 std::mt19937 gen(rd());
295 std::uniform_int_distribution<> dis(0, 10);
296
297 std::unique_lock<std::mutex> g(l);
298 while (!stop_getters) {
299 g.unlock();
300
301 uint64_t to_get = dis(gen);
302 auto waited = throttle.get(to_get);
303
304 g.lock();
305 wait_time += waited;
306 waits += to_get;
307 total += to_get;
308 in_queue.push_back(to_get);
309 c.notify_one();
310 }
311 };
312
313 auto putter = [&]() {
314 std::unique_lock<std::mutex> g(l);
315 while (!stop_putters || !in_queue.empty()) {
316 if (in_queue.empty()) {
317 c.wait(g);
318 continue;
319 }
320
321 uint64_t c = in_queue.front();
322
323 total_observed_total += total;
324 total_observations++;
325 in_queue.pop_front();
326 assert(total <= max);
327
328 g.unlock();
329 std::this_thread::sleep_for(
330 c * std::chrono::duration<double>(put_delay_per_count*putters));
331 g.lock();
332
333 total -= c;
334 throttle.put(c);
335 }
336 };
337
338 vector<std::thread> gts(getters);
339 for (auto &&i: gts) i = std::thread(getter);
340
341 vector<std::thread> pts(putters);
342 for (auto &&i: pts) i = std::thread(putter);
343
344 std::this_thread::sleep_for(std::chrono::duration<double>(5));
345 {
346 std::unique_lock<std::mutex> g(l);
347 stop_getters = true;
348 c.notify_all();
349 }
350 for (auto &&i: gts) i.join();
351 gts.clear();
352
353 {
354 std::unique_lock<std::mutex> g(l);
355 stop_putters = true;
356 c.notify_all();
357 }
358 for (auto &&i: pts) i.join();
359 pts.clear();
360
361 return make_pair(
362 ((double)total_observed_total)/((double)total_observations),
363 wait_time / waits);
364 }
365
366 TEST(BackoffThrottle, undersaturated)
367 {
368 auto results = test_backoff(
369 0.4,
370 0.6,
371 1000,
372 2,
373 10,
374 100,
375 0.0001,
376 3,
377 6);
378 ASSERT_LT(results.first, 45);
379 ASSERT_GT(results.first, 35);
380 ASSERT_LT(results.second.count(), 0.0002);
381 ASSERT_GT(results.second.count(), 0.00005);
382 }
383
384 TEST(BackoffThrottle, balanced)
385 {
386 auto results = test_backoff(
387 0.4,
388 0.6,
389 1000,
390 2,
391 10,
392 100,
393 0.001,
394 7,
395 2);
396 ASSERT_LT(results.first, 60);
397 ASSERT_GT(results.first, 40);
398 ASSERT_LT(results.second.count(), 0.002);
399 ASSERT_GT(results.second.count(), 0.0005);
400 }
401
402 TEST(BackoffThrottle, oversaturated)
403 {
404 auto results = test_backoff(
405 0.4,
406 0.6,
407 10000000,
408 2,
409 10,
410 100,
411 0.001,
412 1,
413 3);
414 ASSERT_LT(results.first, 101);
415 ASSERT_GT(results.first, 85);
416 ASSERT_LT(results.second.count(), 0.002);
417 ASSERT_GT(results.second.count(), 0.0005);
418 }
419
420 /*
421 * Local Variables:
422 * compile-command: "cd ../.. ;
423 * make unittest_throttle ;
424 * ./unittest_throttle # --gtest_filter=ThrottleTest.destructor \
425 * --log-to-stderr=true --debug-filestore=20
426 * "
427 * End:
428 */
429