1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "util/rate_limiter.h"
11 #include "monitoring/statistics.h"
12 #include "port/port.h"
13 #include "rocksdb/env.h"
14 #include "util/aligned_buffer.h"
15 #include "util/sync_point.h"
// Sizes a request of `bytes` bytes and charges it to the limiter.
// When io_priority is below Env::IO_TOTAL and IsRateLimited(op_type) holds,
// `bytes` is capped at GetSingleBurstBytes(), then replaced by the larger of
// `alignment` and TruncateToPageBoundary(alignment, bytes), and that amount is
// passed to Request().
// NOTE(review): TruncateToPageBoundary presumably rounds down to a multiple of
// `alignment` -- confirm against util/aligned_buffer.h.
19 size_t RateLimiter::RequestToken(size_t bytes
, size_t alignment
,
20 Env::IOPriority io_priority
, Statistics
* stats
,
21 RateLimiter::OpType op_type
) {
22 if (io_priority
< Env::IO_TOTAL
&& IsRateLimited(op_type
)) {
// Never ask for more than a single burst in one call.
23 bytes
= std::min(bytes
, static_cast<size_t>(GetSingleBurstBytes()));
26 // Here we may actually require more than one burst and block,
27 // but we cannot write less than one page at a time on direct I/O,
28 // thus we may not want to use the rate limiter in that case.
// `alignment` acts as a floor so the result is never below one aligned unit.
29 bytes
= std::max(alignment
, TruncateToPageBoundary(alignment
, bytes
));
31 Request(bytes
, io_priority
, stats
, op_type
);
// A single request waiting in one of GenericRateLimiter's priority queues.
// It is built from the requested byte count and the limiter's mutex, which
// backs the request's condition variable `cv`; `granted` starts out false.
37 struct GenericRateLimiter::Req
{
38 explicit Req(int64_t _bytes
, port::Mutex
* _mu
)
39 : request_bytes(_bytes
), bytes(_bytes
), cv(_mu
), granted(false) {}
// Bytes still to be granted. Refill() decrements this as quota is assigned,
// whereas `bytes` keeps the original size for throughput accounting.
40 int64_t request_bytes
;
// Builds a limiter for `rate_bytes_per_sec`, refilled every
// `refill_period_us` microseconds.
//   fairness   - stored capped at 100; Refill() passes it to rnd_.OneIn().
//   auto_tuned - when true the limiter starts at half of `rate_bytes_per_sec`
//                and `rate_bytes_per_sec` is kept as max_bytes_per_sec_, the
//                upper bound used by Tune().
46 GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec
,
47 int64_t refill_period_us
,
48 int32_t fairness
, RateLimiter::Mode mode
,
49 Env
* env
, bool auto_tuned
)
51 refill_period_us_(refill_period_us
),
// Auto-tuned limiters begin at half of the configured ceiling.
52 rate_bytes_per_sec_(auto_tuned
? rate_bytes_per_sec
/ 2
53 : rate_bytes_per_sec
),
54 refill_bytes_per_period_(
55 CalculateRefillBytesPerPeriod(rate_bytes_per_sec_
)),
58 exit_cv_(&request_mutex_
),
61 next_refill_us_(NowMicrosMonotonic(env_
)),
// Clamp fairness to at most 100.
62 fairness_(fairness
> 100 ? 100 : fairness
),
// Seed the random generator from the current wall-clock time.
63 rnd_((uint32_t)time(nullptr)),
65 auto_tuned_(auto_tuned
),
68 max_bytes_per_sec_(rate_bytes_per_sec
),
69 tuned_time_(NowMicrosMonotonic(env_
)) {
// Zero the request and byte counters for both priority slots.
70 total_requests_
[0] = 0;
71 total_requests_
[1] = 0;
72 total_bytes_through_
[0] = 0;
73 total_bytes_through_
[1] = 0;
// Shuts the limiter down while holding request_mutex_: records how many
// requests are still queued at both priorities in requests_to_wait_, visits
// every queued request, and then loops until requests_to_wait_ reaches zero.
// NOTE(review): presumably each queued request is signalled and the final
// loop waits on exit_cv_ -- confirm against the loop bodies.
76 GenericRateLimiter::~GenericRateLimiter() {
77 MutexLock
g(&request_mutex_
);
// Number of waiters that must leave before destruction can finish.
79 requests_to_wait_
= static_cast<int32_t>(queue_
[Env::IO_LOW
].size() +
80 queue_
[Env::IO_HIGH
].size());
81 for (auto& r
: queue_
[Env::IO_HIGH
]) {
84 for (auto& r
: queue_
[Env::IO_LOW
]) {
87 while (requests_to_wait_
> 0) {
92 // This API allows the user to dynamically change the rate limiter's bytes per second.
// `bytes_per_second` must be positive (asserted). The new rate is stored in
// rate_bytes_per_sec_ and the per-period refill quota is recomputed from it
// and published with a relaxed atomic store.
93 void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second
) {
94 assert(bytes_per_second
> 0);
95 rate_bytes_per_sec_
= bytes_per_second
;
96 refill_bytes_per_period_
.store(
97 CalculateRefillBytesPerPeriod(bytes_per_second
),
98 std::memory_order_relaxed
);
// Charges `bytes` against the limiter at priority `pri`.
// `bytes` must not exceed the current per-period refill quota (asserted).
// Under request_mutex_: if enough quota is available it is deducted at once;
// otherwise the request is queued and the loop below runs until the request
// is marked granted. One queued request at a time acts as "leader" and does a
// timed wait until the next refill is due.
101 void GenericRateLimiter::Request(int64_t bytes
, const Env::IOPriority pri
,
103 assert(bytes
<= refill_bytes_per_period_
.load(std::memory_order_relaxed
));
104 TEST_SYNC_POINT("GenericRateLimiter::Request");
105 TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
106 &rate_bytes_per_sec_
);
107 MutexLock
g(&request_mutex_
);
// Checks whether at least kRefillsPerTune refill periods have elapsed since
// tuned_time_.
// NOTE(review): presumably this triggers Tune() when auto-tuning -- confirm.
110 static const int kRefillsPerTune
= 100;
111 std::chrono::microseconds
now(NowMicrosMonotonic(env_
));
112 if (now
- tuned_time_
>=
113 kRefillsPerTune
* std::chrono::microseconds(refill_period_us_
)) {
122 ++total_requests_
[pri
];
// Fast path: enough quota is available, so take it without queueing.
124 if (available_bytes_
>= bytes
) {
125 // Refill thread assigns quota and notifies requests waiting on
126 // the queue under mutex. So if we get here, that means nobody
128 available_bytes_
-= bytes
;
129 total_bytes_through_
[pri
] += bytes
;
133 // Request cannot be satisfied at this moment, enqueue
// `r` lives on this thread's stack; the queue only stores its address.
134 Req
r(bytes
, &request_mutex_
);
135 queue_
[pri
].push_back(&r
);
138 bool timedout
= false;
139 // Leader election, candidates can be:
140 // (1) a new incoming request,
141 // (2) a previous leader, whose quota has not been assigned yet due
143 // (3) a previous waiter at the front of queue, who got notified by
// This request may become leader only if there is no leader yet and it sits
// at the front of one of the two queues.
145 if (leader_
== nullptr &&
146 ((!queue_
[Env::IO_HIGH
].empty() &&
147 &r
== queue_
[Env::IO_HIGH
].front()) ||
148 (!queue_
[Env::IO_LOW
].empty() &&
149 &r
== queue_
[Env::IO_LOW
].front()))) {
// Time left until the next scheduled refill, clamped at zero.
151 int64_t delta
= next_refill_us_
- NowMicrosMonotonic(env_
);
152 delta
= delta
> 0 ? delta
: 0;
// The deadline is expressed on the env_->NowMicros() clock.
// NOTE(review): presumably TimedWait expects an absolute time -- confirm.
156 int64_t wait_until
= env_
->NowMicros() + delta
;
157 RecordTick(stats
, NUMBER_RATE_LIMITER_DRAINS
);
159 timedout
= r
.cv
.TimedWait(wait_until
);
162 // Not at the front of queue or a leader has already been elected
166 // request_mutex_ is held from now on
173 // Make sure the woken-up request is always at the head of its queue
175 (!queue_
[Env::IO_HIGH
].empty() &&
176 &r
== queue_
[Env::IO_HIGH
].front()) ||
177 (!queue_
[Env::IO_LOW
].empty() &&
178 &r
== queue_
[Env::IO_LOW
].front()));
// Invariant: a leader, if any, is at the front of one of the queues.
179 assert(leader_
== nullptr ||
180 (!queue_
[Env::IO_HIGH
].empty() &&
181 leader_
== queue_
[Env::IO_HIGH
].front()) ||
182 (!queue_
[Env::IO_LOW
].empty() &&
183 leader_
== queue_
[Env::IO_LOW
].front()));
186 // Woken up from TimedWait()
188 // Time to do refill!
191 // Re-elect a new leader regardless. This is to simplify the
192 // election handling.
195 // Notify the head of the queue if the current leader is going away
197 // Current leader has already been granted quota. Notify the head
198 // of the waiting queue to participate in the next round of election.
// A granted request must no longer be at the front of either queue.
199 assert((queue_
[Env::IO_HIGH
].empty() ||
200 &r
!= queue_
[Env::IO_HIGH
].front()) &&
201 (queue_
[Env::IO_LOW
].empty() ||
202 &r
!= queue_
[Env::IO_LOW
].front()));
// Wake exactly one successor, preferring the high-priority queue.
203 if (!queue_
[Env::IO_HIGH
].empty()) {
204 queue_
[Env::IO_HIGH
].front()->cv
.Signal();
205 } else if (!queue_
[Env::IO_LOW
].empty()) {
206 queue_
[Env::IO_LOW
].front()->cv
.Signal();
212 // Spontaneous wake up, need to continue to wait
217 // Woken up by previous leader:
218 // (1) if requested quota is granted, it is done.
219 // (2) if requested quota is not granted, this means current thread
220 // was picked as a new leader candidate (previous leader got quota).
221 // It needs to participate leader election because a new request may
222 // come in before this thread gets woken up. So it may actually need
223 // to do Wait() again.
226 } while (!r
.granted
);
// Starts a new refill period: schedules the next refill time, tops up
// available_bytes_, and hands quota to queued requests in priority order.
// A request whose remaining bytes are fully covered is marked granted and,
// unless it is the current leader, signalled.
229 void GenericRateLimiter::Refill() {
230 TEST_SYNC_POINT("GenericRateLimiter::Refill");
231 next_refill_us_
= NowMicrosMonotonic(env_
) + refill_period_us_
;
232 // Carry over the left over quota from the last period
// Quota is only added while less than one full period's worth is available,
// which bounds how much unused quota can accumulate.
233 auto refill_bytes_per_period
=
234 refill_bytes_per_period_
.load(std::memory_order_relaxed
);
235 if (available_bytes_
< refill_bytes_per_period
) {
236 available_bytes_
+= refill_bytes_per_period
;
// When rnd_.OneIn(fairness_) is true this is 0 and the low-priority queue is
// drained first; otherwise the high-priority queue goes first.
239 int use_low_pri_first
= rnd_
.OneIn(fairness_
) ? 0 : 1;
240 for (int q
= 0; q
< 2; ++q
) {
241 auto use_pri
= (use_low_pri_first
== q
) ? Env::IO_LOW
: Env::IO_HIGH
;
242 auto* queue
= &queue_
[use_pri
];
243 while (!queue
->empty()) {
244 auto* next_req
= queue
->front();
// Not enough quota for the whole request: give it everything that is left
// and leave the remainder pending.
245 if (available_bytes_
< next_req
->request_bytes
) {
247 next_req
->request_bytes
-= available_bytes_
;
248 available_bytes_
= 0;
// Enough quota: satisfy the request in full and account its original size.
251 available_bytes_
-= next_req
->request_bytes
;
252 next_req
->request_bytes
= 0;
253 total_bytes_through_
[use_pri
] += next_req
->bytes
;
256 next_req
->granted
= true;
// The leader is not signalled here.
// NOTE(review): presumably because the leader is the thread running this
// refill -- confirm against the caller.
257 if (next_req
!= leader_
) {
258 // Quota granted, signal the thread
259 next_req
->cv
.Signal();
// Converts a rate in bytes per second into the quota granted per refill
// period: rate_bytes_per_sec * refill_period_us_ / 1000000, but never less
// than kMinRefillBytesPerPeriod. If that product would overflow int64, a
// large fixed value (port::kMaxInt64 / 1000000) is returned instead.
// `rate_bytes_per_sec` is used as a divisor, so it must be non-zero.
265 int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
266 int64_t rate_bytes_per_sec
) {
// Overflow check done by division so the check itself cannot overflow.
267 if (port::kMaxInt64
/ rate_bytes_per_sec
< refill_period_us_
) {
268 // Avoid unexpected result in the overflow case. The result now is still
269 // inaccurate but is a number that is large enough.
270 return port::kMaxInt64
/ 1000000;
272 return std::max(kMinRefillBytesPerPeriod
,
273 rate_bytes_per_sec
* refill_period_us_
/ 1000000);
// Adjusts the rate limit from how often the limiter was drained since the
// last tuning. drained_pct is the number of drains per elapsed refill
// interval, as a percentage:
//   == 0                -> drop to max_bytes_per_sec_ / kAllowedRangeFactor
//   <  kLowWatermarkPct  -> scale down by 100 / (100 + kAdjustFactorPct),
//                           not below max_bytes_per_sec_ / kAllowedRangeFactor
//   >  kHighWatermarkPct -> scale up by (100 + kAdjustFactorPct) / 100,
//                           not above max_bytes_per_sec_
//   otherwise            -> keep the current rate
// SetBytesPerSecond() is called only when the rate actually changes.
277 Status
GenericRateLimiter::Tune() {
278 const int kLowWatermarkPct
= 50;
279 const int kHighWatermarkPct
= 90;
280 const int kAdjustFactorPct
= 5;
281 // computed rate limit will be in
282 // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
283 const int kAllowedRangeFactor
= 20;
285 std::chrono::microseconds prev_tuned_time
= tuned_time_
;
286 tuned_time_
= std::chrono::microseconds(NowMicrosMonotonic(env_
));
// Number of refill periods since the previous tuning, rounded up.
288 int64_t elapsed_intervals
= (tuned_time_
- prev_tuned_time
+
289 std::chrono::microseconds(refill_period_us_
) -
290 std::chrono::microseconds(1)) /
291 std::chrono::microseconds(refill_period_us_
);
292 // We tune every kRefillsPerTune intervals, so the overflow and division-by-
293 // zero conditions should never happen.
294 assert(num_drains_
- prev_num_drains_
<= port::kMaxInt64
/ 100);
295 assert(elapsed_intervals
> 0);
296 int64_t drained_pct
=
297 (num_drains_
- prev_num_drains_
) * 100 / elapsed_intervals
;
299 int64_t prev_bytes_per_sec
= GetBytesPerSecond();
300 int64_t new_bytes_per_sec
;
301 if (drained_pct
== 0) {
302 new_bytes_per_sec
= max_bytes_per_sec_
/ kAllowedRangeFactor
;
303 } else if (drained_pct
< kLowWatermarkPct
) {
304 // sanitize to prevent overflow
305 int64_t sanitized_prev_bytes_per_sec
=
306 std::min(prev_bytes_per_sec
, port::kMaxInt64
/ 100);
308 std::max(max_bytes_per_sec_
/ kAllowedRangeFactor
,
309 sanitized_prev_bytes_per_sec
* 100 / (100 + kAdjustFactorPct
));
310 } else if (drained_pct
> kHighWatermarkPct
) {
311 // sanitize to prevent overflow
312 int64_t sanitized_prev_bytes_per_sec
= std::min(
313 prev_bytes_per_sec
, port::kMaxInt64
/ (100 + kAdjustFactorPct
));
315 std::min(max_bytes_per_sec_
,
316 sanitized_prev_bytes_per_sec
* (100 + kAdjustFactorPct
) / 100);
318 new_bytes_per_sec
= prev_bytes_per_sec
;
320 if (new_bytes_per_sec
!= prev_bytes_per_sec
) {
321 SetBytesPerSecond(new_bytes_per_sec
);
// NOTE(review): this copies prev_num_drains_ into num_drains_, i.e. it rewinds
// the counter to the baseline instead of advancing the baseline. If nothing
// else assigns prev_num_drains_, `prev_num_drains_ = num_drains_` may have
// been intended -- confirm.
323 num_drains_
= prev_num_drains_
;
// Factory for GenericRateLimiter. All three numeric arguments must be
// positive (asserted). The limiter is created with `new` on Env::Default()
// and returned as a raw RateLimiter pointer.
// NOTE(review): presumably the caller takes ownership and must delete it --
// confirm against the public header.
327 RateLimiter
* NewGenericRateLimiter(
328 int64_t rate_bytes_per_sec
, int64_t refill_period_us
/* = 100 * 1000 */,
329 int32_t fairness
/* = 10 */,
330 RateLimiter::Mode mode
/* = RateLimiter::Mode::kWritesOnly */,
331 bool auto_tuned
/* = false */) {
332 assert(rate_bytes_per_sec
> 0);
333 assert(refill_period_us
> 0);
334 assert(fairness
> 0);
335 return new GenericRateLimiter(rate_bytes_per_sec
, refill_period_us
, fairness
,
336 mode
, Env::Default(), auto_tuned
);
339 } // namespace rocksdb