]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/common/Throttle.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / common / Throttle.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Library Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library Public License for more details.
19 *
20 */
21
22#include <stdio.h>
23#include <signal.h>
11fdf7f2
TL
24
25#include <chrono>
26#include <list>
27#include <mutex>
28#include <random>
29#include <thread>
30
7c673cae 31#include "gtest/gtest.h"
7c673cae
FG
32#include "common/Thread.h"
33#include "common/Throttle.h"
34#include "common/ceph_argparse.h"
7c673cae 35
20effc67
TL
36using namespace std;
37
7c673cae
FG
38class ThrottleTest : public ::testing::Test {
39protected:
40
41 class Thread_get : public Thread {
42 public:
43 Throttle &throttle;
44 int64_t count;
11fdf7f2 45 bool waited = false;
7c673cae
FG
46
47 Thread_get(Throttle& _throttle, int64_t _count) :
11fdf7f2 48 throttle(_throttle), count(_count) {}
7c673cae
FG
49
50 void *entry() override {
51 usleep(5);
52 waited = throttle.get(count);
53 throttle.put(count);
11fdf7f2 54 return nullptr;
7c673cae
FG
55 }
56 };
7c673cae
FG
57};
58
59TEST_F(ThrottleTest, Throttle) {
60 int64_t throttle_max = 10;
61 Throttle throttle(g_ceph_context, "throttle", throttle_max);
62 ASSERT_EQ(throttle.get_max(), throttle_max);
63 ASSERT_EQ(throttle.get_current(), 0);
64}
65
66TEST_F(ThrottleTest, take) {
67 int64_t throttle_max = 10;
68 Throttle throttle(g_ceph_context, "throttle", throttle_max);
69 ASSERT_EQ(throttle.take(throttle_max), throttle_max);
70 ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
71}
72
73TEST_F(ThrottleTest, get) {
74 int64_t throttle_max = 10;
75 Throttle throttle(g_ceph_context, "throttle");
76
77 // test increasing max from 0 to throttle_max
78 {
79 ASSERT_FALSE(throttle.get(throttle_max, throttle_max));
80 ASSERT_EQ(throttle.get_max(), throttle_max);
81 ASSERT_EQ(throttle.put(throttle_max), 0);
82 }
83
84 ASSERT_FALSE(throttle.get(5));
85 ASSERT_EQ(throttle.put(5), 0);
86
87 ASSERT_FALSE(throttle.get(throttle_max));
88 ASSERT_FALSE(throttle.get_or_fail(1));
89 ASSERT_FALSE(throttle.get(1, throttle_max + 1));
90 ASSERT_EQ(throttle.put(throttle_max + 1), 0);
91 ASSERT_FALSE(throttle.get(0, throttle_max));
92 ASSERT_FALSE(throttle.get(throttle_max));
93 ASSERT_FALSE(throttle.get_or_fail(1));
94 ASSERT_EQ(throttle.put(throttle_max), 0);
95
96 useconds_t delay = 1;
97
98 bool waited;
99
100 do {
101 cout << "Trying (1) with delay " << delay << "us\n";
102
103 ASSERT_FALSE(throttle.get(throttle_max));
104 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
105
106 Thread_get t(throttle, 7);
107 t.create("t_throttle_1");
108 usleep(delay);
109 ASSERT_EQ(throttle.put(throttle_max), 0);
110 t.join();
111
112 if (!(waited = t.waited))
113 delay *= 2;
114 } while(!waited);
115
116 delay = 1;
117 do {
118 cout << "Trying (2) with delay " << delay << "us\n";
119
120 ASSERT_FALSE(throttle.get(throttle_max / 2));
121 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
122
123 Thread_get t(throttle, throttle_max);
124 t.create("t_throttle_2");
125 usleep(delay);
126
127 Thread_get u(throttle, 1);
128 u.create("u_throttle_2");
129 usleep(delay);
130
131 throttle.put(throttle_max / 2);
132
133 t.join();
134 u.join();
135
136 if (!(waited = t.waited && u.waited))
137 delay *= 2;
138 } while(!waited);
139
140}
141
142TEST_F(ThrottleTest, get_or_fail) {
143 {
144 Throttle throttle(g_ceph_context, "throttle");
145
146 ASSERT_TRUE(throttle.get_or_fail(5));
147 ASSERT_TRUE(throttle.get_or_fail(5));
148 }
149
150 {
151 int64_t throttle_max = 10;
152 Throttle throttle(g_ceph_context, "throttle", throttle_max);
153
154 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
155 ASSERT_EQ(throttle.put(throttle_max), 0);
156
157 ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
158 ASSERT_FALSE(throttle.get_or_fail(1));
159 ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
160 ASSERT_EQ(throttle.put(throttle_max * 2), 0);
161
162 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
163 ASSERT_FALSE(throttle.get_or_fail(1));
164 ASSERT_EQ(throttle.put(throttle_max), 0);
165 }
166}
167
168TEST_F(ThrottleTest, wait) {
169 int64_t throttle_max = 10;
170 Throttle throttle(g_ceph_context, "throttle");
171
172 // test increasing max from 0 to throttle_max
173 {
174 ASSERT_FALSE(throttle.wait(throttle_max));
175 ASSERT_EQ(throttle.get_max(), throttle_max);
176 }
177
178 useconds_t delay = 1;
179
180 bool waited;
181
182 do {
183 cout << "Trying (3) with delay " << delay << "us\n";
184
185 ASSERT_FALSE(throttle.get(throttle_max / 2));
186 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
187
188 Thread_get t(throttle, throttle_max);
189 t.create("t_throttle_3");
190 usleep(delay);
191
192 //
193 // Throttle::_reset_max(int64_t m) used to contain a test
194 // that blocked the following statement, only if
195 // the argument was greater than throttle_max.
196 // Although a value lower than throttle_max would cover
197 // the same code in _reset_max, the throttle_max * 100
198 // value is left here to demonstrate that the problem
199 // has been solved.
200 //
201 throttle.wait(throttle_max * 100);
202 usleep(delay);
203 t.join();
204 ASSERT_EQ(throttle.get_current(), throttle_max / 2);
205
206 if (!(waited = t.waited)) {
207 delay *= 2;
208 // undo the changes we made
209 throttle.put(throttle_max / 2);
210 throttle.wait(throttle_max);
211 }
212 } while(!waited);
213}
214
7c673cae
FG
215std::pair<double, std::chrono::duration<double> > test_backoff(
216 double low_threshhold,
217 double high_threshhold,
218 double expected_throughput,
219 double high_multiple,
220 double max_multiple,
221 uint64_t max,
222 double put_delay_per_count,
223 unsigned getters,
224 unsigned putters)
225{
226 std::mutex l;
227 std::condition_variable c;
228 uint64_t total = 0;
229 std::list<uint64_t> in_queue;
230 bool stop_getters = false;
231 bool stop_putters = false;
232
233 auto wait_time = std::chrono::duration<double>(0);
234 uint64_t waits = 0;
235
236 uint64_t total_observed_total = 0;
237 uint64_t total_observations = 0;
238
239 BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5);
240 bool valid = throttle.set_params(
241 low_threshhold,
242 high_threshhold,
243 expected_throughput,
244 high_multiple,
245 max_multiple,
246 max,
247 0);
11fdf7f2 248 ceph_assert(valid);
7c673cae
FG
249
250 auto getter = [&]() {
251 std::random_device rd;
252 std::mt19937 gen(rd());
253 std::uniform_int_distribution<> dis(0, 10);
254
255 std::unique_lock<std::mutex> g(l);
256 while (!stop_getters) {
257 g.unlock();
258
259 uint64_t to_get = dis(gen);
260 auto waited = throttle.get(to_get);
261
262 g.lock();
263 wait_time += waited;
264 waits += to_get;
265 total += to_get;
266 in_queue.push_back(to_get);
267 c.notify_one();
268 }
269 };
270
271 auto putter = [&]() {
272 std::unique_lock<std::mutex> g(l);
273 while (!stop_putters || !in_queue.empty()) {
274 if (in_queue.empty()) {
275 c.wait(g);
276 continue;
277 }
278
279 uint64_t c = in_queue.front();
280
281 total_observed_total += total;
282 total_observations++;
283 in_queue.pop_front();
11fdf7f2 284 ceph_assert(total <= max);
7c673cae
FG
285
286 g.unlock();
287 std::this_thread::sleep_for(
288 c * std::chrono::duration<double>(put_delay_per_count*putters));
289 g.lock();
290
291 total -= c;
292 throttle.put(c);
293 }
294 };
295
296 vector<std::thread> gts(getters);
297 for (auto &&i: gts) i = std::thread(getter);
298
299 vector<std::thread> pts(putters);
300 for (auto &&i: pts) i = std::thread(putter);
301
302 std::this_thread::sleep_for(std::chrono::duration<double>(5));
303 {
304 std::unique_lock<std::mutex> g(l);
305 stop_getters = true;
306 c.notify_all();
307 }
308 for (auto &&i: gts) i.join();
309 gts.clear();
310
311 {
312 std::unique_lock<std::mutex> g(l);
313 stop_putters = true;
314 c.notify_all();
315 }
316 for (auto &&i: pts) i.join();
317 pts.clear();
318
319 return make_pair(
320 ((double)total_observed_total)/((double)total_observations),
321 wait_time / waits);
322}
323
324TEST(BackoffThrottle, undersaturated)
325{
326 auto results = test_backoff(
327 0.4,
328 0.6,
329 1000,
330 2,
331 10,
332 100,
333 0.0001,
334 3,
335 6);
336 ASSERT_LT(results.first, 45);
337 ASSERT_GT(results.first, 35);
338 ASSERT_LT(results.second.count(), 0.0002);
339 ASSERT_GT(results.second.count(), 0.00005);
340}
341
342TEST(BackoffThrottle, balanced)
343{
344 auto results = test_backoff(
345 0.4,
346 0.6,
347 1000,
348 2,
349 10,
350 100,
351 0.001,
352 7,
353 2);
354 ASSERT_LT(results.first, 60);
355 ASSERT_GT(results.first, 40);
356 ASSERT_LT(results.second.count(), 0.002);
357 ASSERT_GT(results.second.count(), 0.0005);
358}
359
360TEST(BackoffThrottle, oversaturated)
361{
362 auto results = test_backoff(
363 0.4,
364 0.6,
365 10000000,
366 2,
367 10,
368 100,
369 0.001,
370 1,
371 3);
372 ASSERT_LT(results.first, 101);
373 ASSERT_GT(results.first, 85);
374 ASSERT_LT(results.second.count(), 0.002);
375 ASSERT_GT(results.second.count(), 0.0005);
376}
377
378/*
379 * Local Variables:
380 * compile-command: "cd ../.. ;
381 * make unittest_throttle ;
11fdf7f2 382 * ./unittest_throttle # --gtest_filter=ThrottleTest.take \
7c673cae
FG
383 * --log-to-stderr=true --debug-filestore=20
384 * "
385 * End:
386 */