// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include "util/rate_limiter.h"

#include <inttypes.h>
#include <chrono>
#include <limits>

#include "db/db_test_util.h"
#include "rocksdb/env.h"
#include "util/random.h"
#include "util/sync_point.h"
#include "util/testharness.h"

namespace rocksdb {

// TODO(yhchiang): the rate will not be accurate when we run tests in parallel.
class RateLimiterTest : public testing::Test {};

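// OverflowRate builds a limiter with the largest possible rate
// (port::kMaxInt64) and checks that the single-burst size derived from it
// does not overflow into a tiny or negative value: it must still exceed 1GB.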
TEST_F(RateLimiterTest, OverflowRate) {
  GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
                             RateLimiter::Mode::kWritesOnly, Env::Default(),
                             false /* auto_tuned */);
  ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}

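// StartStop only constructs a limiter through the NewGenericRateLimiter()
// factory and lets it go out of scope, verifying that setup and teardown
// work without any request being issued.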
TEST_F(RateLimiterTest, StartStop) {
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
}

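// Modes verifies which operations are charged against the limit:
// kWritesOnly counts only writes, kReadsOnly only reads, and kAllIo both,
// as reflected by GetTotalBytesThrough() after one read and one write.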
TEST_F(RateLimiterTest, Modes) {
  for (auto mode : {RateLimiter::Mode::kWritesOnly,
                    RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
    GenericRateLimiter limiter(
        2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
        10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kRead);
    if (mode == RateLimiter::Mode::kWritesOnly) {
      ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    } else {
      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    }

    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kWrite);
    if (mode == RateLimiter::Mode::kAllIo) {
      ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    } else {
      ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH));
    }
  }
}

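// Rate is a throughput test: several writer threads issue requests against a
// shared limiter for about two seconds, and the measured rate must stay close
// to the configured target, including after the target is changed on the fly.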
#if !(defined(TRAVIS) && defined(OS_MACOSX))
TEST_F(RateLimiterTest, Rate) {
  auto* env = Env::Default();
  struct Arg {
    Arg(int32_t _target_rate, int _burst)
        : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
          request_size(_target_rate / 10),
          burst(_burst) {}
    std::unique_ptr<RateLimiter> limiter;
    int32_t request_size;
    int burst;
  };

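  // Each writer thread below issues a randomly sized burst of IO_HIGH
  // requests followed by one IO_LOW request per loop iteration, with request
  // sizes drawn uniformly from [1, request_size - 1], until roughly two
  // seconds of wall-clock time have elapsed.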
  auto writer = [](void* p) {
    auto* thread_env = Env::Default();
    auto* arg = static_cast<Arg*>(p);
    // Test for 2 seconds
    auto until = thread_env->NowMicros() + 2 * 1000000;
    Random r((uint32_t)(thread_env->NowNanos() %
                        std::numeric_limits<uint32_t>::max()));
    while (thread_env->NowMicros() < until) {
      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
                              Env::IO_HIGH, nullptr /* stats */,
                              RateLimiter::OpType::kWrite);
      }
      arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
                            nullptr /* stats */, RateLimiter::OpType::kWrite);
    }
  };

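  // Run with 1, 2, 4, 8, and 16 writer threads; the target rate scales with
  // the thread count, and the second iteration doubles it via
  // SetBytesPerSecond() to exercise a dynamic limit change. The observed
  // throughput must land within [0.80, 1.25] of the target.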
  for (int i = 1; i <= 16; i *= 2) {
    int32_t target = i * 1024 * 10;
    Arg arg(target, i / 4 + 1);
    int64_t old_total_bytes_through = 0;
    for (int iter = 1; iter <= 2; ++iter) {
      // second iteration changes the target dynamically
      if (iter == 2) {
        target *= 2;
        arg.limiter->SetBytesPerSecond(target);
      }
      auto start = env->NowMicros();
      for (int t = 0; t < i; ++t) {
        env->StartThread(writer, &arg);
      }
      env->WaitForJoin();

      auto elapsed = env->NowMicros() - start;
      double rate =
          (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
          1000000.0 / elapsed;
      old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
      fprintf(stderr,
              "request size [1 - %" PRIi32 "], limit %" PRIi32
              " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
              arg.request_size - 1, target / 1024, rate / 1024,
              elapsed / 1000000.0);

      ASSERT_GE(rate / target, 0.80);
      ASSERT_LE(rate / target, 1.25);
    }
  }
}
#endif

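// LimitChangeTest uses sync points to force SetBytesPerSecond() to run after
// a writer's Request() has started but before the limiter refills, checking
// that an in-flight request is never starved when the limit is doubled or
// halved underneath it.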
TEST_F(RateLimiterTest, LimitChangeTest) {
  // starvation test when the limit changes, notably to a smaller value
  int64_t refill_period = 1000 * 1000;
  auto* env = Env::Default();
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  struct Arg {
    Arg(int32_t _request_size, Env::IOPriority _pri,
        std::shared_ptr<RateLimiter> _limiter)
        : request_size(_request_size), pri(_pri), limiter(_limiter) {}
    int32_t request_size;
    Env::IOPriority pri;
    std::shared_ptr<RateLimiter> limiter;
  };

  auto writer = [](void* p) {
    auto* arg = static_cast<Arg*>(p);
    arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
                          RateLimiter::OpType::kWrite);
  };

  for (uint32_t i = 1; i <= 16; i <<= 1) {
    int32_t target = i * 1024 * 10;
    // refill period is one second
    for (int iter = 0; iter < 2; iter++) {
      std::shared_ptr<RateLimiter> limiter =
          std::make_shared<GenericRateLimiter>(
              target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
              Env::Default(), false /* auto_tuned */);
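      // The dependency pairs below mean the changeLimitStart sync point may
      // only fire after "GenericRateLimiter::Request" has been reached, and
      // "GenericRateLimiter::Refill" may only proceed after changeLimitEnd,
      // so the limit change lands between the request and its refill.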
      rocksdb::SyncPoint::GetInstance()->LoadDependency(
          {{"GenericRateLimiter::Request",
            "RateLimiterTest::LimitChangeTest:changeLimitStart"},
           {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
            "GenericRateLimiter::Refill"}});
      Arg arg(target, Env::IO_HIGH, limiter);
      // The idea is to start a request first, then, before the limiter
      // refills, update the limit to a different value (2X/0.5X). No request
      // should be starved in any situation.
      // TODO(lightmark): more test cases are welcome.
      env->StartThread(writer, &arg);
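      // (target << 1) >> (iter << 1) doubles the limit when iter == 0 and
      // halves it when iter == 1.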
      int32_t new_limit = (target << 1) >> (iter << 1);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
      arg.limiter->SetBytesPerSecond(new_limit);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
      env->WaitForJoin();
      fprintf(stderr,
              "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
              " KB/sec, refill period %" PRIi64 " ms\n",
              target / 1024, new_limit / 1024, refill_period / 1000);
    }
  }
}

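// AutoTuneIncreaseWhenFull checks both directions of auto-tuning: the rate
// should climb after many refill periods in which the limiter was constantly
// drained, and drop again after a long idle stretch. SpecialEnv (from
// db_test_util.h) is configured so that SleepForMicroseconds() advances the
// env's clock instead of blocking in real time.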
TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
  const std::chrono::seconds kTimePerRefill(1);
  const int kRefillsPerTune = 100;  // needs to match util/rate_limiter.cc

  SpecialEnv special_env(Env::Default());
  special_env.no_slowdown_ = true;
  special_env.time_elapse_only_sleep_ = true;

  auto stats = CreateDBStatistics();
  std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
      1000 /* rate_bytes_per_sec */,
      std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
      RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));

  // Use callback to advance time because we need to advance (1) after
  // Request() has determined the bytes are not available; and (2) before
  // Refill() computes the next refill time (ensuring refill time in the
  // future allows the next request to drain the rate limiter).
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "GenericRateLimiter::Refill", [&](void* /*arg*/) {
        special_env.SleepForMicroseconds(static_cast<int>(
            std::chrono::microseconds(kTimePerRefill).count()));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // verify rate limit increases after a sequence of periods where rate limiter
  // is always drained
  int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  while (std::chrono::microseconds(special_env.NowMicros()) <=
         kRefillsPerTune * kTimePerRefill) {
    rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                          RateLimiter::OpType::kWrite);
  }
  int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();

  // decreases after a sequence of periods where rate limiter is not drained
  orig_bytes_per_sec = new_bytes_per_sec;
  special_env.SleepForMicroseconds(static_cast<int>(
      kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
  // make a request so tuner can be triggered
  rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
}

}  // namespace rocksdb

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}