]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/common/Throttle.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / common / Throttle.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2013 Cloudwatt <libre.licensing@cloudwatt.com>
7 *
8 * Author: Loic Dachary <loic@dachary.org>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Library Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Library Public License for more details.
19 *
20 */
21
22#include <stdio.h>
23#include <signal.h>
11fdf7f2
TL
24
25#include <chrono>
26#include <list>
27#include <mutex>
28#include <random>
29#include <thread>
30
7c673cae 31#include "gtest/gtest.h"
7c673cae
FG
32#include "common/Thread.h"
33#include "common/Throttle.h"
34#include "common/ceph_argparse.h"
7c673cae
FG
35
36class ThrottleTest : public ::testing::Test {
37protected:
38
39 class Thread_get : public Thread {
40 public:
41 Throttle &throttle;
42 int64_t count;
11fdf7f2 43 bool waited = false;
7c673cae
FG
44
45 Thread_get(Throttle& _throttle, int64_t _count) :
11fdf7f2 46 throttle(_throttle), count(_count) {}
7c673cae
FG
47
48 void *entry() override {
49 usleep(5);
50 waited = throttle.get(count);
51 throttle.put(count);
11fdf7f2 52 return nullptr;
7c673cae
FG
53 }
54 };
7c673cae
FG
55};
56
57TEST_F(ThrottleTest, Throttle) {
58 int64_t throttle_max = 10;
59 Throttle throttle(g_ceph_context, "throttle", throttle_max);
60 ASSERT_EQ(throttle.get_max(), throttle_max);
61 ASSERT_EQ(throttle.get_current(), 0);
62}
63
64TEST_F(ThrottleTest, take) {
65 int64_t throttle_max = 10;
66 Throttle throttle(g_ceph_context, "throttle", throttle_max);
67 ASSERT_EQ(throttle.take(throttle_max), throttle_max);
68 ASSERT_EQ(throttle.take(throttle_max), throttle_max * 2);
69}
70
71TEST_F(ThrottleTest, get) {
72 int64_t throttle_max = 10;
73 Throttle throttle(g_ceph_context, "throttle");
74
75 // test increasing max from 0 to throttle_max
76 {
77 ASSERT_FALSE(throttle.get(throttle_max, throttle_max));
78 ASSERT_EQ(throttle.get_max(), throttle_max);
79 ASSERT_EQ(throttle.put(throttle_max), 0);
80 }
81
82 ASSERT_FALSE(throttle.get(5));
83 ASSERT_EQ(throttle.put(5), 0);
84
85 ASSERT_FALSE(throttle.get(throttle_max));
86 ASSERT_FALSE(throttle.get_or_fail(1));
87 ASSERT_FALSE(throttle.get(1, throttle_max + 1));
88 ASSERT_EQ(throttle.put(throttle_max + 1), 0);
89 ASSERT_FALSE(throttle.get(0, throttle_max));
90 ASSERT_FALSE(throttle.get(throttle_max));
91 ASSERT_FALSE(throttle.get_or_fail(1));
92 ASSERT_EQ(throttle.put(throttle_max), 0);
93
94 useconds_t delay = 1;
95
96 bool waited;
97
98 do {
99 cout << "Trying (1) with delay " << delay << "us\n";
100
101 ASSERT_FALSE(throttle.get(throttle_max));
102 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
103
104 Thread_get t(throttle, 7);
105 t.create("t_throttle_1");
106 usleep(delay);
107 ASSERT_EQ(throttle.put(throttle_max), 0);
108 t.join();
109
110 if (!(waited = t.waited))
111 delay *= 2;
112 } while(!waited);
113
114 delay = 1;
115 do {
116 cout << "Trying (2) with delay " << delay << "us\n";
117
118 ASSERT_FALSE(throttle.get(throttle_max / 2));
119 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
120
121 Thread_get t(throttle, throttle_max);
122 t.create("t_throttle_2");
123 usleep(delay);
124
125 Thread_get u(throttle, 1);
126 u.create("u_throttle_2");
127 usleep(delay);
128
129 throttle.put(throttle_max / 2);
130
131 t.join();
132 u.join();
133
134 if (!(waited = t.waited && u.waited))
135 delay *= 2;
136 } while(!waited);
137
138}
139
140TEST_F(ThrottleTest, get_or_fail) {
141 {
142 Throttle throttle(g_ceph_context, "throttle");
143
144 ASSERT_TRUE(throttle.get_or_fail(5));
145 ASSERT_TRUE(throttle.get_or_fail(5));
146 }
147
148 {
149 int64_t throttle_max = 10;
150 Throttle throttle(g_ceph_context, "throttle", throttle_max);
151
152 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
153 ASSERT_EQ(throttle.put(throttle_max), 0);
154
155 ASSERT_TRUE(throttle.get_or_fail(throttle_max * 2));
156 ASSERT_FALSE(throttle.get_or_fail(1));
157 ASSERT_FALSE(throttle.get_or_fail(throttle_max * 2));
158 ASSERT_EQ(throttle.put(throttle_max * 2), 0);
159
160 ASSERT_TRUE(throttle.get_or_fail(throttle_max));
161 ASSERT_FALSE(throttle.get_or_fail(1));
162 ASSERT_EQ(throttle.put(throttle_max), 0);
163 }
164}
165
166TEST_F(ThrottleTest, wait) {
167 int64_t throttle_max = 10;
168 Throttle throttle(g_ceph_context, "throttle");
169
170 // test increasing max from 0 to throttle_max
171 {
172 ASSERT_FALSE(throttle.wait(throttle_max));
173 ASSERT_EQ(throttle.get_max(), throttle_max);
174 }
175
176 useconds_t delay = 1;
177
178 bool waited;
179
180 do {
181 cout << "Trying (3) with delay " << delay << "us\n";
182
183 ASSERT_FALSE(throttle.get(throttle_max / 2));
184 ASSERT_FALSE(throttle.get_or_fail(throttle_max));
185
186 Thread_get t(throttle, throttle_max);
187 t.create("t_throttle_3");
188 usleep(delay);
189
190 //
191 // Throttle::_reset_max(int64_t m) used to contain a test
192 // that blocked the following statement, only if
193 // the argument was greater than throttle_max.
194 // Although a value lower than throttle_max would cover
195 // the same code in _reset_max, the throttle_max * 100
196 // value is left here to demonstrate that the problem
197 // has been solved.
198 //
199 throttle.wait(throttle_max * 100);
200 usleep(delay);
201 t.join();
202 ASSERT_EQ(throttle.get_current(), throttle_max / 2);
203
204 if (!(waited = t.waited)) {
205 delay *= 2;
206 // undo the changes we made
207 throttle.put(throttle_max / 2);
208 throttle.wait(throttle_max);
209 }
210 } while(!waited);
211}
212
7c673cae
FG
213std::pair<double, std::chrono::duration<double> > test_backoff(
214 double low_threshhold,
215 double high_threshhold,
216 double expected_throughput,
217 double high_multiple,
218 double max_multiple,
219 uint64_t max,
220 double put_delay_per_count,
221 unsigned getters,
222 unsigned putters)
223{
224 std::mutex l;
225 std::condition_variable c;
226 uint64_t total = 0;
227 std::list<uint64_t> in_queue;
228 bool stop_getters = false;
229 bool stop_putters = false;
230
231 auto wait_time = std::chrono::duration<double>(0);
232 uint64_t waits = 0;
233
234 uint64_t total_observed_total = 0;
235 uint64_t total_observations = 0;
236
237 BackoffThrottle throttle(g_ceph_context, "backoff_throttle_test", 5);
238 bool valid = throttle.set_params(
239 low_threshhold,
240 high_threshhold,
241 expected_throughput,
242 high_multiple,
243 max_multiple,
244 max,
245 0);
11fdf7f2 246 ceph_assert(valid);
7c673cae
FG
247
248 auto getter = [&]() {
249 std::random_device rd;
250 std::mt19937 gen(rd());
251 std::uniform_int_distribution<> dis(0, 10);
252
253 std::unique_lock<std::mutex> g(l);
254 while (!stop_getters) {
255 g.unlock();
256
257 uint64_t to_get = dis(gen);
258 auto waited = throttle.get(to_get);
259
260 g.lock();
261 wait_time += waited;
262 waits += to_get;
263 total += to_get;
264 in_queue.push_back(to_get);
265 c.notify_one();
266 }
267 };
268
269 auto putter = [&]() {
270 std::unique_lock<std::mutex> g(l);
271 while (!stop_putters || !in_queue.empty()) {
272 if (in_queue.empty()) {
273 c.wait(g);
274 continue;
275 }
276
277 uint64_t c = in_queue.front();
278
279 total_observed_total += total;
280 total_observations++;
281 in_queue.pop_front();
11fdf7f2 282 ceph_assert(total <= max);
7c673cae
FG
283
284 g.unlock();
285 std::this_thread::sleep_for(
286 c * std::chrono::duration<double>(put_delay_per_count*putters));
287 g.lock();
288
289 total -= c;
290 throttle.put(c);
291 }
292 };
293
294 vector<std::thread> gts(getters);
295 for (auto &&i: gts) i = std::thread(getter);
296
297 vector<std::thread> pts(putters);
298 for (auto &&i: pts) i = std::thread(putter);
299
300 std::this_thread::sleep_for(std::chrono::duration<double>(5));
301 {
302 std::unique_lock<std::mutex> g(l);
303 stop_getters = true;
304 c.notify_all();
305 }
306 for (auto &&i: gts) i.join();
307 gts.clear();
308
309 {
310 std::unique_lock<std::mutex> g(l);
311 stop_putters = true;
312 c.notify_all();
313 }
314 for (auto &&i: pts) i.join();
315 pts.clear();
316
317 return make_pair(
318 ((double)total_observed_total)/((double)total_observations),
319 wait_time / waits);
320}
321
322TEST(BackoffThrottle, undersaturated)
323{
324 auto results = test_backoff(
325 0.4,
326 0.6,
327 1000,
328 2,
329 10,
330 100,
331 0.0001,
332 3,
333 6);
334 ASSERT_LT(results.first, 45);
335 ASSERT_GT(results.first, 35);
336 ASSERT_LT(results.second.count(), 0.0002);
337 ASSERT_GT(results.second.count(), 0.00005);
338}
339
340TEST(BackoffThrottle, balanced)
341{
342 auto results = test_backoff(
343 0.4,
344 0.6,
345 1000,
346 2,
347 10,
348 100,
349 0.001,
350 7,
351 2);
352 ASSERT_LT(results.first, 60);
353 ASSERT_GT(results.first, 40);
354 ASSERT_LT(results.second.count(), 0.002);
355 ASSERT_GT(results.second.count(), 0.0005);
356}
357
358TEST(BackoffThrottle, oversaturated)
359{
360 auto results = test_backoff(
361 0.4,
362 0.6,
363 10000000,
364 2,
365 10,
366 100,
367 0.001,
368 1,
369 3);
370 ASSERT_LT(results.first, 101);
371 ASSERT_GT(results.first, 85);
372 ASSERT_LT(results.second.count(), 0.002);
373 ASSERT_GT(results.second.count(), 0.0005);
374}
375
/*
 * Local Variables:
 * compile-command: "cd ../.. ;
 *   make unittest_throttle ;
 *   ./unittest_throttle # --gtest_filter=ThrottleTest.take \
 *      --log-to-stderr=true --debug-filestore=20
 * "
 * End:
 */