]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/common/Throttle.cc
update sources to v12.1.2
[ceph.git] / ceph / src / test / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Library Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library Public License for more details.
19 *
20 */
21
22 #include <stdio.h>
23 #include <signal.h>
24 #include "gtest/gtest.h"
25 #include "common/Mutex.h"
26 #include "common/Thread.h"
27 #include "common/Throttle.h"
28 #include "common/ceph_argparse.h"
29 #include "common/backport14.h"
30
31 #include <thread>
32 #include <atomic>
33 #include <chrono>
34 #include <mutex>
35 #include <list>
36 #include <random>
37
38 class ThrottleTest : public ::testing::Test {
39 protected:
40
41 class Thread_get : public Thread {
42 public:
43 Throttle &throttle;
44 int64_t count;
45 bool waited;
46
47 Thread_get(Throttle& _throttle, int64_t _count) :
48 throttle(_throttle),
49 count(_count),
50 waited(false)
51 {
52 }
53
54 void *entry() override {
55 usleep(5);
56 waited = throttle.get(count);
57 throttle.put(count);
58 return NULL;
59 }
60 };
61 };
62
63 TEST_F(ThrottleTest, Throttle) {
64 int64_t throttle_max = 10;
65 Throttle throttle(g_ceph_context, "throttle", throttle_max);
66 ASSERT_EQ(throttle.get_max(), throttle_max);
67 ASSERT_EQ(throttle.get_current(), 0);
68 }
69
70 TEST_F(ThrottleTest, take) {
71 int64_t throttle_max = 10;
72 Throttle throttle(g_ceph_context, "throttle", throttle_max);
73 ASSERT_EQ(throttle.take(throttle_max), throttle_max);
74 ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
75 }
76
77 TEST_F(ThrottleTest, get) {
78 int64_t throttle_max = 10;
79 Throttle throttle(g_ceph_context, "throttle");
80
81 // test increasing max from 0 to throttle_max
82 {
83 ASSERT_FALSE(throttle.get(throttle_max, throttle_max));
84 ASSERT_EQ(throttle.get_max(), throttle_max);
85 ASSERT_EQ(throttle.put(throttle_max), 0);
86 }
87
88 ASSERT_FALSE(throttle.get(5));
89 ASSERT_EQ(throttle.put(5), 0);
90
91 ASSERT_FALSE(throttle.get(throttle_max));
92 ASSERT_FALSE(throttle.get_or_fail(1));
93 ASSERT_FALSE(throttle.get(1, throttle_max + 1));
94 ASSERT_EQ(throttle.put(throttle_max + 1), 0);
95 ASSERT_FALSE(throttle.get(0, throttle_max));
96 ASSERT_FALSE(throttle.get(throttle_max));
97 ASSERT_FALSE(throttle.get_or_fail(1));
98 ASSERT_EQ(throttle.put(throttle_max), 0);
99
100 useconds_t delay = 1;
101
102 bool waited;
103
104 do {
105 cout << "Trying (1) with delay " << delay << "us\n";
106
107 ASSERT_FALSE(throttle.get(throttle_max));
108 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
109
110 Thread_get t(throttle, 7);
111 t.create("t_throttle_1");
112 usleep(delay);
113 ASSERT_EQ(throttle.put(throttle_max), 0);
114 t.join();
115
116 if (!(waited = t.waited))
117 delay *= 2;
118 } while(!waited);
119
120 delay = 1;
121 do {
122 cout << "Trying (2) with delay " << delay << "us\n";
123
124 ASSERT_FALSE(throttle.get(throttle_max / 2));
125 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
126
127 Thread_get t(throttle, throttle_max);
128 t.create("t_throttle_2");
129 usleep(delay);
130
131 Thread_get u(throttle, 1);
132 u.create("u_throttle_2");
133 usleep(delay);
134
135 throttle.put(throttle_max / 2);
136
137 t.join();
138 u.join();
139
140 if (!(waited = t.waited && u.waited))
141 delay *= 2;
142 } while(!waited);
143
144 }
145
146 TEST_F(ThrottleTest, get_or_fail) {
147 {
148 Throttle throttle(g_ceph_context, "throttle");
149
150 ASSERT_TRUE(throttle.get_or_fail(5));
151 ASSERT_TRUE(throttle.get_or_fail(5));
152 }
153
154 {
155 int64_t throttle_max = 10;
156 Throttle throttle(g_ceph_context, "throttle", throttle_max);
157
158 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
159 ASSERT_EQ(throttle.put(throttle_max), 0);
160
161 ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
162 ASSERT_FALSE(throttle.get_or_fail(1));
163 ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
164 ASSERT_EQ(throttle.put(throttle_max * 2), 0);
165
166 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
167 ASSERT_FALSE(throttle.get_or_fail(1));
168 ASSERT_EQ(throttle.put(throttle_max), 0);
169 }
170 }
171
172 TEST_F(ThrottleTest, wait) {
173 int64_t throttle_max = 10;
174 Throttle throttle(g_ceph_context, "throttle");
175
176 // test increasing max from 0 to throttle_max
177 {
178 ASSERT_FALSE(throttle.wait(throttle_max));
179 ASSERT_EQ(throttle.get_max(), throttle_max);
180 }
181
182 useconds_t delay = 1;
183
184 bool waited;
185
186 do {
187 cout << "Trying (3) with delay " << delay << "us\n";
188
189 ASSERT_FALSE(throttle.get(throttle_max / 2));
190 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
191
192 Thread_get t(throttle, throttle_max);
193 t.create("t_throttle_3");
194 usleep(delay);
195
196 //
197 // Throttle::_reset_max(int64_t m) used to contain a test
198 // that blocked the following statement, only if
199 // the argument was greater than throttle_max.
200 // Although a value lower than throttle_max would cover
201 // the same code in _reset_max, the throttle_max * 100
202 // value is left here to demonstrate that the problem
203 // has been solved.
204 //
205 throttle.wait(throttle_max * 100);
206 usleep(delay);
207 t.join();
208 ASSERT_EQ(throttle.get_current(), throttle_max / 2);
209
210 if (!(waited = t.waited)) {
211 delay *= 2;
212 // undo the changes we made
213 throttle.put(throttle_max / 2);
214 throttle.wait(throttle_max);
215 }
216 } while(!waited);
217 }
218
219
220 TEST_F(ThrottleTest, destructor) {
221 EXPECT_DEATH({
222 int64_t throttle_max = 10;
223 auto throttle = ceph::make_unique<Throttle>(g_ceph_context, "throttle",
224 throttle_max);
225
226
227 ASSERT_FALSE(throttle->get(5));
228 unique_ptr<Thread_get> t = ceph::make_unique<Thread_get>(*throttle, 7);
229 t->create("t_throttle");
230 bool blocked;
231 useconds_t delay = 1;
232 do {
233 usleep(delay);
234 if (throttle->get_or_fail(1)) {
235 throttle->put(1);
236 blocked = false;
237 } else {
238 blocked = true;
239 }
240 delay *= 2;
241 } while (!blocked);
242 }, ".*");
243 }
244
245 std::pair<double, std::chrono::duration<double> > test_backoff(
246 double low_threshhold,
247 double high_threshhold,
248 double expected_throughput,
249 double high_multiple,
250 double max_multiple,
251 uint64_t max,
252 double put_delay_per_count,
253 unsigned getters,
254 unsigned putters)
255 {
256 std::mutex l;
257 std::condition_variable c;
258 uint64_t total = 0;
259 std::list<uint64_t> in_queue;
260 bool stop_getters = false;
261 bool stop_putters = false;
262
263 auto wait_time = std::chrono::duration<double>(0);
264 uint64_t waits = 0;
265
266 uint64_t total_observed_total = 0;
267 uint64_t total_observations = 0;
268
269 BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5);
270 bool valid = throttle.set_params(
271 low_threshhold,
272 high_threshhold,
273 expected_throughput,
274 high_multiple,
275 max_multiple,
276 max,
277 0);
278 assert(valid);
279
280 auto getter = [&]() {
281 std::random_device rd;
282 std::mt19937 gen(rd());
283 std::uniform_int_distribution<> dis(0, 10);
284
285 std::unique_lock<std::mutex> g(l);
286 while (!stop_getters) {
287 g.unlock();
288
289 uint64_t to_get = dis(gen);
290 auto waited = throttle.get(to_get);
291
292 g.lock();
293 wait_time += waited;
294 waits += to_get;
295 total += to_get;
296 in_queue.push_back(to_get);
297 c.notify_one();
298 }
299 };
300
301 auto putter = [&]() {
302 std::unique_lock<std::mutex> g(l);
303 while (!stop_putters || !in_queue.empty()) {
304 if (in_queue.empty()) {
305 c.wait(g);
306 continue;
307 }
308
309 uint64_t c = in_queue.front();
310
311 total_observed_total += total;
312 total_observations++;
313 in_queue.pop_front();
314 assert(total <= max);
315
316 g.unlock();
317 std::this_thread::sleep_for(
318 c * std::chrono::duration<double>(put_delay_per_count*putters));
319 g.lock();
320
321 total -= c;
322 throttle.put(c);
323 }
324 };
325
326 vector<std::thread> gts(getters);
327 for (auto &&i: gts) i = std::thread(getter);
328
329 vector<std::thread> pts(putters);
330 for (auto &&i: pts) i = std::thread(putter);
331
332 std::this_thread::sleep_for(std::chrono::duration<double>(5));
333 {
334 std::unique_lock<std::mutex> g(l);
335 stop_getters = true;
336 c.notify_all();
337 }
338 for (auto &&i: gts) i.join();
339 gts.clear();
340
341 {
342 std::unique_lock<std::mutex> g(l);
343 stop_putters = true;
344 c.notify_all();
345 }
346 for (auto &&i: pts) i.join();
347 pts.clear();
348
349 return make_pair(
350 ((double)total_observed_total)/((double)total_observations),
351 wait_time / waits);
352 }
353
354 TEST(BackoffThrottle, destruct) {
355 EXPECT_DEATH({
356 auto throttle = ceph::make_unique<BackoffThrottle>(
357 g_ceph_context, "destructor test", 10);
358 ASSERT_TRUE(throttle->set_params(0.4, 0.6, 1000, 2, 10, 6, nullptr));
359
360 throttle->get(5);
361 {
362 auto& t = *throttle;
363 std::thread([&t]() {
364 usleep(5);
365 t.get(6);
366 });
367 }
368 // No equivalent of get_or_fail()
369 std::this_thread::sleep_for(std::chrono::milliseconds(250));
370 }, ".*");
371 }
372
373 TEST(BackoffThrottle, undersaturated)
374 {
375 auto results = test_backoff(
376 0.4,
377 0.6,
378 1000,
379 2,
380 10,
381 100,
382 0.0001,
383 3,
384 6);
385 ASSERT_LT(results.first, 45);
386 ASSERT_GT(results.first, 35);
387 ASSERT_LT(results.second.count(), 0.0002);
388 ASSERT_GT(results.second.count(), 0.00005);
389 }
390
391 TEST(BackoffThrottle, balanced)
392 {
393 auto results = test_backoff(
394 0.4,
395 0.6,
396 1000,
397 2,
398 10,
399 100,
400 0.001,
401 7,
402 2);
403 ASSERT_LT(results.first, 60);
404 ASSERT_GT(results.first, 40);
405 ASSERT_LT(results.second.count(), 0.002);
406 ASSERT_GT(results.second.count(), 0.0005);
407 }
408
409 TEST(BackoffThrottle, oversaturated)
410 {
411 auto results = test_backoff(
412 0.4,
413 0.6,
414 10000000,
415 2,
416 10,
417 100,
418 0.001,
419 1,
420 3);
421 ASSERT_LT(results.first, 101);
422 ASSERT_GT(results.first, 85);
423 ASSERT_LT(results.second.count(), 0.002);
424 ASSERT_GT(results.second.count(), 0.0005);
425 }
426
427 TEST(OrderedThrottle, destruct) {
428 EXPECT_DEATH({
429 auto throttle = ceph::make_unique<OrderedThrottle>(1, false);
430 throttle->start_op(nullptr);
431 {
432 auto& t = *throttle;
433 std::thread([&t]() {
434 usleep(5);
435 t.start_op(nullptr);
436 });
437 }
438 // No equivalent of get_or_fail()
439 std::this_thread::sleep_for(std::chrono::milliseconds(250));
440 }, ".*");
441 }
442
443 /*
444 * Local Variables:
445 * compile-command: "cd ../.. ;
446 * make unittest_throttle ;
447 * ./unittest_throttle # --gtest_filter=ThrottleTest.destructor \
448 * --log-to-stderr=true --debug-filestore=20
449 * "
450 * End:
451 */