]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/rate_limiter_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / util / rate_limiter_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "util/rate_limiter.h"
11
12 #include <chrono>
13 #include <cinttypes>
14 #include <limits>
15
16 #include "db/db_test_util.h"
17 #include "rocksdb/env.h"
18 #include "test_util/sync_point.h"
19 #include "test_util/testharness.h"
20 #include "util/random.h"
21
22 namespace ROCKSDB_NAMESPACE {
23
24 // TODO(yhchiang): the rate will not be accurate when we run test in parallel.
25 class RateLimiterTest : public testing::Test {};
26
27 TEST_F(RateLimiterTest, OverflowRate) {
28 GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
29 RateLimiter::Mode::kWritesOnly, Env::Default(),
30 false /* auto_tuned */);
31 ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
32 }
33
34 TEST_F(RateLimiterTest, StartStop) {
35 std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
36 }
37
38 TEST_F(RateLimiterTest, Modes) {
39 for (auto mode : {RateLimiter::Mode::kWritesOnly,
40 RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
41 GenericRateLimiter limiter(
42 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
43 10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
44 limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
45 RateLimiter::OpType::kRead);
46 if (mode == RateLimiter::Mode::kWritesOnly) {
47 ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
48 } else {
49 ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
50 }
51
52 limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
53 RateLimiter::OpType::kWrite);
54 if (mode == RateLimiter::Mode::kAllIo) {
55 ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
56 } else {
57 ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
58 }
59 }
60 }
61
62 #if !((defined(TRAVIS) || defined(CIRCLECI)) && defined(OS_MACOSX))
63 TEST_F(RateLimiterTest, Rate) {
64 auto* env = Env::Default();
65 struct Arg {
66 Arg(int32_t _target_rate, int _burst)
67 : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
68 request_size(_target_rate / 10),
69 burst(_burst) {}
70 std::unique_ptr<RateLimiter> limiter;
71 int32_t request_size;
72 int burst;
73 };
74
75 auto writer = [](void* p) {
76 auto* thread_env = Env::Default();
77 auto* arg = static_cast<Arg*>(p);
78 // Test for 2 seconds
79 auto until = thread_env->NowMicros() + 2 * 1000000;
80 Random r((uint32_t)(thread_env->NowNanos() %
81 std::numeric_limits<uint32_t>::max()));
82 while (thread_env->NowMicros() < until) {
83 for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
84 arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
85 Env::IO_HIGH, nullptr /* stats */,
86 RateLimiter::OpType::kWrite);
87 }
88 arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
89 nullptr /* stats */, RateLimiter::OpType::kWrite);
90 }
91 };
92
93 for (int i = 1; i <= 16; i *= 2) {
94 int32_t target = i * 1024 * 10;
95 Arg arg(target, i / 4 + 1);
96 int64_t old_total_bytes_through = 0;
97 for (int iter = 1; iter <= 2; ++iter) {
98 // second iteration changes the target dynamically
99 if (iter == 2) {
100 target *= 2;
101 arg.limiter->SetBytesPerSecond(target);
102 }
103 auto start = env->NowMicros();
104 for (int t = 0; t < i; ++t) {
105 env->StartThread(writer, &arg);
106 }
107 env->WaitForJoin();
108
109 auto elapsed = env->NowMicros() - start;
110 double rate =
111 (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
112 1000000.0 / elapsed;
113 old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
114 fprintf(stderr,
115 "request size [1 - %" PRIi32 "], limit %" PRIi32
116 " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
117 arg.request_size - 1, target / 1024, rate / 1024,
118 elapsed / 1000000.0);
119
120 ASSERT_GE(rate / target, 0.80);
121 ASSERT_LE(rate / target, 1.25);
122 }
123 }
124 }
125 #endif
126
127 TEST_F(RateLimiterTest, LimitChangeTest) {
128 // starvation test when limit changes to a smaller value
129 int64_t refill_period = 1000 * 1000;
130 auto* env = Env::Default();
131 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
132 struct Arg {
133 Arg(int32_t _request_size, Env::IOPriority _pri,
134 std::shared_ptr<RateLimiter> _limiter)
135 : request_size(_request_size), pri(_pri), limiter(_limiter) {}
136 int32_t request_size;
137 Env::IOPriority pri;
138 std::shared_ptr<RateLimiter> limiter;
139 };
140
141 auto writer = [](void* p) {
142 auto* arg = static_cast<Arg*>(p);
143 arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
144 RateLimiter::OpType::kWrite);
145 };
146
147 for (uint32_t i = 1; i <= 16; i <<= 1) {
148 int32_t target = i * 1024 * 10;
149 // refill per second
150 for (int iter = 0; iter < 2; iter++) {
151 std::shared_ptr<RateLimiter> limiter =
152 std::make_shared<GenericRateLimiter>(
153 target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
154 Env::Default(), false /* auto_tuned */);
155 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
156 {{"GenericRateLimiter::Request",
157 "RateLimiterTest::LimitChangeTest:changeLimitStart"},
158 {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
159 "GenericRateLimiter::Refill"}});
160 Arg arg(target, Env::IO_HIGH, limiter);
161 // The idea behind is to start a request first, then before it refills,
162 // update limit to a different value (2X/0.5X). No starvation should
163 // be guaranteed under any situation
164 // TODO(lightmark): more test cases are welcome.
165 env->StartThread(writer, &arg);
166 int32_t new_limit = (target << 1) >> (iter << 1);
167 TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
168 arg.limiter->SetBytesPerSecond(new_limit);
169 TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
170 env->WaitForJoin();
171 fprintf(stderr,
172 "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
173 "KB/sec, refill period %" PRIi64 " ms\n",
174 target / 1024, new_limit / 1024, refill_period / 1000);
175 }
176 }
177 }
178
179 TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
180 const std::chrono::seconds kTimePerRefill(1);
181 const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc
182
183 SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true);
184
185 auto stats = CreateDBStatistics();
186 std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
187 1000 /* rate_bytes_per_sec */,
188 std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
189 RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));
190
191 // Use callback to advance time because we need to advance (1) after Request()
192 // has determined the bytes are not available; and (2) before Refill()
193 // computes the next refill time (ensuring refill time in the future allows
194 // the next request to drain the rate limiter).
195 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
196 "GenericRateLimiter::Refill", [&](void* /*arg*/) {
197 special_env.SleepForMicroseconds(static_cast<int>(
198 std::chrono::microseconds(kTimePerRefill).count()));
199 });
200 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
201
202 // verify rate limit increases after a sequence of periods where rate limiter
203 // is always drained
204 int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
205 rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
206 RateLimiter::OpType::kWrite);
207 while (std::chrono::microseconds(special_env.NowMicros()) <=
208 kRefillsPerTune * kTimePerRefill) {
209 rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
210 RateLimiter::OpType::kWrite);
211 }
212 int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
213 ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
214
215 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
216
217 // decreases after a sequence of periods where rate limiter is not drained
218 orig_bytes_per_sec = new_bytes_per_sec;
219 special_env.SleepForMicroseconds(static_cast<int>(
220 kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
221 // make a request so tuner can be triggered
222 rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
223 RateLimiter::OpType::kWrite);
224 new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
225 ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
226 }
227
228 } // namespace ROCKSDB_NAMESPACE
229
230 int main(int argc, char** argv) {
231 ::testing::InitGoogleTest(&argc, argv);
232 return RUN_ALL_TESTS();
233 }