]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
7c673cae | 10 | #include "util/rate_limiter.h" |
11fdf7f2 | 11 | |
11fdf7f2 | 12 | #include <chrono> |
f67539c2 | 13 | #include <cinttypes> |
7c673cae | 14 | #include <limits> |
11fdf7f2 TL |
15 | |
16 | #include "db/db_test_util.h" | |
7c673cae | 17 | #include "rocksdb/env.h" |
f67539c2 TL |
18 | #include "test_util/sync_point.h" |
19 | #include "test_util/testharness.h" | |
7c673cae | 20 | #include "util/random.h" |
7c673cae | 21 | |
f67539c2 | 22 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
23 | |
24 | // TODO(yhchiang): the rate will not be accurate when we run test in parallel. | |
25 | class RateLimiterTest : public testing::Test {}; | |
26 | ||
27 | TEST_F(RateLimiterTest, OverflowRate) { | |
11fdf7f2 TL |
28 | GenericRateLimiter limiter(port::kMaxInt64, 1000, 10, |
29 | RateLimiter::Mode::kWritesOnly, Env::Default(), | |
30 | false /* auto_tuned */); | |
7c673cae FG |
31 | ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll); |
32 | } | |
33 | ||
34 | TEST_F(RateLimiterTest, StartStop) { | |
35 | std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10)); | |
36 | } | |
37 | ||
11fdf7f2 TL |
38 | TEST_F(RateLimiterTest, Modes) { |
39 | for (auto mode : {RateLimiter::Mode::kWritesOnly, | |
40 | RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { | |
41 | GenericRateLimiter limiter( | |
42 | 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, | |
43 | 10 /* fairness */, mode, Env::Default(), false /* auto_tuned */); | |
44 | limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, | |
45 | RateLimiter::OpType::kRead); | |
46 | if (mode == RateLimiter::Mode::kWritesOnly) { | |
47 | ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH)); | |
48 | } else { | |
49 | ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); | |
50 | } | |
51 | ||
52 | limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, | |
53 | RateLimiter::OpType::kWrite); | |
54 | if (mode == RateLimiter::Mode::kAllIo) { | |
55 | ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); | |
56 | } else { | |
57 | ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); | |
58 | } | |
59 | } | |
60 | } | |
61 | ||
62 | #if !(defined(TRAVIS) && defined(OS_MACOSX)) | |
7c673cae FG |
63 | TEST_F(RateLimiterTest, Rate) { |
64 | auto* env = Env::Default(); | |
65 | struct Arg { | |
66 | Arg(int32_t _target_rate, int _burst) | |
67 | : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)), | |
68 | request_size(_target_rate / 10), | |
69 | burst(_burst) {} | |
70 | std::unique_ptr<RateLimiter> limiter; | |
71 | int32_t request_size; | |
72 | int burst; | |
73 | }; | |
74 | ||
75 | auto writer = [](void* p) { | |
76 | auto* thread_env = Env::Default(); | |
77 | auto* arg = static_cast<Arg*>(p); | |
78 | // Test for 2 seconds | |
79 | auto until = thread_env->NowMicros() + 2 * 1000000; | |
80 | Random r((uint32_t)(thread_env->NowNanos() % | |
81 | std::numeric_limits<uint32_t>::max())); | |
82 | while (thread_env->NowMicros() < until) { | |
83 | for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) { | |
84 | arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, | |
11fdf7f2 TL |
85 | Env::IO_HIGH, nullptr /* stats */, |
86 | RateLimiter::OpType::kWrite); | |
7c673cae FG |
87 | } |
88 | arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW, | |
11fdf7f2 | 89 | nullptr /* stats */, RateLimiter::OpType::kWrite); |
7c673cae FG |
90 | } |
91 | }; | |
92 | ||
93 | for (int i = 1; i <= 16; i *= 2) { | |
94 | int32_t target = i * 1024 * 10; | |
95 | Arg arg(target, i / 4 + 1); | |
96 | int64_t old_total_bytes_through = 0; | |
97 | for (int iter = 1; iter <= 2; ++iter) { | |
98 | // second iteration changes the target dynamically | |
99 | if (iter == 2) { | |
100 | target *= 2; | |
101 | arg.limiter->SetBytesPerSecond(target); | |
102 | } | |
103 | auto start = env->NowMicros(); | |
104 | for (int t = 0; t < i; ++t) { | |
105 | env->StartThread(writer, &arg); | |
106 | } | |
107 | env->WaitForJoin(); | |
108 | ||
109 | auto elapsed = env->NowMicros() - start; | |
110 | double rate = | |
111 | (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) * | |
112 | 1000000.0 / elapsed; | |
113 | old_total_bytes_through = arg.limiter->GetTotalBytesThrough(); | |
114 | fprintf(stderr, | |
115 | "request size [1 - %" PRIi32 "], limit %" PRIi32 | |
116 | " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n", | |
117 | arg.request_size - 1, target / 1024, rate / 1024, | |
118 | elapsed / 1000000.0); | |
119 | ||
120 | ASSERT_GE(rate / target, 0.80); | |
121 | ASSERT_LE(rate / target, 1.25); | |
122 | } | |
123 | } | |
124 | } | |
11fdf7f2 | 125 | #endif |
7c673cae FG |
126 | |
127 | TEST_F(RateLimiterTest, LimitChangeTest) { | |
128 | // starvation test when limit changes to a smaller value | |
129 | int64_t refill_period = 1000 * 1000; | |
130 | auto* env = Env::Default(); | |
f67539c2 | 131 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
7c673cae FG |
132 | struct Arg { |
133 | Arg(int32_t _request_size, Env::IOPriority _pri, | |
134 | std::shared_ptr<RateLimiter> _limiter) | |
135 | : request_size(_request_size), pri(_pri), limiter(_limiter) {} | |
136 | int32_t request_size; | |
137 | Env::IOPriority pri; | |
138 | std::shared_ptr<RateLimiter> limiter; | |
139 | }; | |
140 | ||
141 | auto writer = [](void* p) { | |
142 | auto* arg = static_cast<Arg*>(p); | |
11fdf7f2 TL |
143 | arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */, |
144 | RateLimiter::OpType::kWrite); | |
7c673cae FG |
145 | }; |
146 | ||
147 | for (uint32_t i = 1; i <= 16; i <<= 1) { | |
148 | int32_t target = i * 1024 * 10; | |
149 | // refill per second | |
150 | for (int iter = 0; iter < 2; iter++) { | |
151 | std::shared_ptr<RateLimiter> limiter = | |
11fdf7f2 TL |
152 | std::make_shared<GenericRateLimiter>( |
153 | target, refill_period, 10, RateLimiter::Mode::kWritesOnly, | |
154 | Env::Default(), false /* auto_tuned */); | |
f67539c2 | 155 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( |
7c673cae FG |
156 | {{"GenericRateLimiter::Request", |
157 | "RateLimiterTest::LimitChangeTest:changeLimitStart"}, | |
158 | {"RateLimiterTest::LimitChangeTest:changeLimitEnd", | |
159 | "GenericRateLimiter::Refill"}}); | |
160 | Arg arg(target, Env::IO_HIGH, limiter); | |
161 | // The idea behind is to start a request first, then before it refills, | |
162 | // update limit to a different value (2X/0.5X). No starvation should | |
163 | // be guaranteed under any situation | |
164 | // TODO(lightmark): more test cases are welcome. | |
165 | env->StartThread(writer, &arg); | |
166 | int32_t new_limit = (target << 1) >> (iter << 1); | |
167 | TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart"); | |
168 | arg.limiter->SetBytesPerSecond(new_limit); | |
169 | TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd"); | |
170 | env->WaitForJoin(); | |
171 | fprintf(stderr, | |
172 | "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32 | |
173 | "KB/sec, refill period %" PRIi64 " ms\n", | |
174 | target / 1024, new_limit / 1024, refill_period / 1000); | |
175 | } | |
176 | } | |
177 | } | |
178 | ||
11fdf7f2 TL |
179 | TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { |
180 | const std::chrono::seconds kTimePerRefill(1); | |
181 | const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc | |
182 | ||
183 | SpecialEnv special_env(Env::Default()); | |
184 | special_env.no_slowdown_ = true; | |
185 | special_env.time_elapse_only_sleep_ = true; | |
186 | ||
187 | auto stats = CreateDBStatistics(); | |
188 | std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter( | |
189 | 1000 /* rate_bytes_per_sec */, | |
190 | std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, | |
191 | RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */)); | |
192 | ||
193 | // Use callback to advance time because we need to advance (1) after Request() | |
194 | // has determined the bytes are not available; and (2) before Refill() | |
195 | // computes the next refill time (ensuring refill time in the future allows | |
196 | // the next request to drain the rate limiter). | |
f67539c2 | 197 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( |
11fdf7f2 TL |
198 | "GenericRateLimiter::Refill", [&](void* /*arg*/) { |
199 | special_env.SleepForMicroseconds(static_cast<int>( | |
200 | std::chrono::microseconds(kTimePerRefill).count())); | |
201 | }); | |
f67539c2 | 202 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
11fdf7f2 TL |
203 | |
204 | // verify rate limit increases after a sequence of periods where rate limiter | |
205 | // is always drained | |
206 | int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); | |
207 | rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), | |
208 | RateLimiter::OpType::kWrite); | |
209 | while (std::chrono::microseconds(special_env.NowMicros()) <= | |
210 | kRefillsPerTune * kTimePerRefill) { | |
211 | rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), | |
212 | RateLimiter::OpType::kWrite); | |
213 | } | |
214 | int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); | |
215 | ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec); | |
216 | ||
f67539c2 | 217 | ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
11fdf7f2 TL |
218 | |
219 | // decreases after a sequence of periods where rate limiter is not drained | |
220 | orig_bytes_per_sec = new_bytes_per_sec; | |
221 | special_env.SleepForMicroseconds(static_cast<int>( | |
222 | kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count())); | |
223 | // make a request so tuner can be triggered | |
224 | rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(), | |
225 | RateLimiter::OpType::kWrite); | |
226 | new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); | |
227 | ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); | |
228 | } | |
229 | ||
f67539c2 | 230 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
231 | |
232 | int main(int argc, char** argv) { | |
233 | ::testing::InitGoogleTest(&argc, argv); | |
234 | return RUN_ALL_TESTS(); | |
235 | } |