]>
Commit | Line | Data |
---|---|---|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9 | ||
10 | #include "util/rate_limiter.h" | |
11 | #include "monitoring/statistics.h" | |
12 | #include "port/port.h" | |
13 | #include "rocksdb/env.h" | |
11fdf7f2 | 14 | #include "util/aligned_buffer.h" |
7c673cae FG |
15 | #include "util/sync_point.h" |
16 | ||
17 | namespace rocksdb { | |
18 | ||
11fdf7f2 TL |
19 | size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, |
20 | Env::IOPriority io_priority, Statistics* stats, | |
21 | RateLimiter::OpType op_type) { | |
22 | if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) { | |
23 | bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes())); | |
24 | ||
25 | if (alignment > 0) { | |
26 | // Here we may actually require more than burst and block | |
27 | // but we can not write less than one page at a time on direct I/O | |
28 | // thus we may want not to use ratelimiter | |
29 | bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); | |
30 | } | |
31 | Request(bytes, io_priority, stats, op_type); | |
32 | } | |
33 | return bytes; | |
34 | } | |
7c673cae FG |
35 | |
36 | // Pending request | |
37 | struct GenericRateLimiter::Req { | |
38 | explicit Req(int64_t _bytes, port::Mutex* _mu) | |
39 | : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} | |
40 | int64_t request_bytes; | |
41 | int64_t bytes; | |
42 | port::CondVar cv; | |
43 | bool granted; | |
44 | }; | |
45 | ||
46 | GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, | |
47 | int64_t refill_period_us, | |
11fdf7f2 TL |
48 | int32_t fairness, RateLimiter::Mode mode, |
49 | Env* env, bool auto_tuned) | |
50 | : RateLimiter(mode), | |
51 | refill_period_us_(refill_period_us), | |
52 | rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 | |
53 | : rate_bytes_per_sec), | |
7c673cae | 54 | refill_bytes_per_period_( |
11fdf7f2 TL |
55 | CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), |
56 | env_(env), | |
7c673cae FG |
57 | stop_(false), |
58 | exit_cv_(&request_mutex_), | |
59 | requests_to_wait_(0), | |
60 | available_bytes_(0), | |
61 | next_refill_us_(NowMicrosMonotonic(env_)), | |
62 | fairness_(fairness > 100 ? 100 : fairness), | |
63 | rnd_((uint32_t)time(nullptr)), | |
11fdf7f2 TL |
64 | leader_(nullptr), |
65 | auto_tuned_(auto_tuned), | |
66 | num_drains_(0), | |
67 | prev_num_drains_(0), | |
68 | max_bytes_per_sec_(rate_bytes_per_sec), | |
69 | tuned_time_(NowMicrosMonotonic(env_)) { | |
7c673cae FG |
70 | total_requests_[0] = 0; |
71 | total_requests_[1] = 0; | |
72 | total_bytes_through_[0] = 0; | |
73 | total_bytes_through_[1] = 0; | |
74 | } | |
75 | ||
76 | GenericRateLimiter::~GenericRateLimiter() { | |
77 | MutexLock g(&request_mutex_); | |
78 | stop_ = true; | |
79 | requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() + | |
80 | queue_[Env::IO_HIGH].size()); | |
81 | for (auto& r : queue_[Env::IO_HIGH]) { | |
82 | r->cv.Signal(); | |
83 | } | |
84 | for (auto& r : queue_[Env::IO_LOW]) { | |
85 | r->cv.Signal(); | |
86 | } | |
87 | while (requests_to_wait_ > 0) { | |
88 | exit_cv_.Wait(); | |
89 | } | |
90 | } | |
91 | ||
92 | // This API allows user to dynamically change rate limiter's bytes per second. | |
93 | void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { | |
94 | assert(bytes_per_second > 0); | |
11fdf7f2 | 95 | rate_bytes_per_sec_ = bytes_per_second; |
7c673cae FG |
96 | refill_bytes_per_period_.store( |
97 | CalculateRefillBytesPerPeriod(bytes_per_second), | |
98 | std::memory_order_relaxed); | |
99 | } | |
100 | ||
101 | void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, | |
102 | Statistics* stats) { | |
103 | assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); | |
104 | TEST_SYNC_POINT("GenericRateLimiter::Request"); | |
11fdf7f2 TL |
105 | TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1", |
106 | &rate_bytes_per_sec_); | |
7c673cae | 107 | MutexLock g(&request_mutex_); |
11fdf7f2 TL |
108 | |
109 | if (auto_tuned_) { | |
110 | static const int kRefillsPerTune = 100; | |
111 | std::chrono::microseconds now(NowMicrosMonotonic(env_)); | |
112 | if (now - tuned_time_ >= | |
113 | kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { | |
114 | Tune(); | |
115 | } | |
116 | } | |
117 | ||
7c673cae FG |
118 | if (stop_) { |
119 | return; | |
120 | } | |
121 | ||
122 | ++total_requests_[pri]; | |
123 | ||
124 | if (available_bytes_ >= bytes) { | |
125 | // Refill thread assigns quota and notifies requests waiting on | |
126 | // the queue under mutex. So if we get here, that means nobody | |
127 | // is waiting? | |
128 | available_bytes_ -= bytes; | |
129 | total_bytes_through_[pri] += bytes; | |
130 | return; | |
131 | } | |
132 | ||
133 | // Request cannot be satisfied at this moment, enqueue | |
134 | Req r(bytes, &request_mutex_); | |
135 | queue_[pri].push_back(&r); | |
136 | ||
137 | do { | |
138 | bool timedout = false; | |
139 | // Leader election, candidates can be: | |
140 | // (1) a new incoming request, | |
141 | // (2) a previous leader, whose quota has not been not assigned yet due | |
142 | // to lower priority | |
143 | // (3) a previous waiter at the front of queue, who got notified by | |
144 | // previous leader | |
145 | if (leader_ == nullptr && | |
146 | ((!queue_[Env::IO_HIGH].empty() && | |
147 | &r == queue_[Env::IO_HIGH].front()) || | |
148 | (!queue_[Env::IO_LOW].empty() && | |
149 | &r == queue_[Env::IO_LOW].front()))) { | |
150 | leader_ = &r; | |
151 | int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_); | |
152 | delta = delta > 0 ? delta : 0; | |
153 | if (delta == 0) { | |
154 | timedout = true; | |
155 | } else { | |
156 | int64_t wait_until = env_->NowMicros() + delta; | |
157 | RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); | |
11fdf7f2 | 158 | ++num_drains_; |
7c673cae FG |
159 | timedout = r.cv.TimedWait(wait_until); |
160 | } | |
161 | } else { | |
162 | // Not at the front of queue or an leader has already been elected | |
163 | r.cv.Wait(); | |
164 | } | |
165 | ||
166 | // request_mutex_ is held from now on | |
167 | if (stop_) { | |
168 | --requests_to_wait_; | |
169 | exit_cv_.Signal(); | |
170 | return; | |
171 | } | |
172 | ||
173 | // Make sure the waken up request is always the header of its queue | |
174 | assert(r.granted || | |
175 | (!queue_[Env::IO_HIGH].empty() && | |
176 | &r == queue_[Env::IO_HIGH].front()) || | |
177 | (!queue_[Env::IO_LOW].empty() && | |
178 | &r == queue_[Env::IO_LOW].front())); | |
179 | assert(leader_ == nullptr || | |
180 | (!queue_[Env::IO_HIGH].empty() && | |
181 | leader_ == queue_[Env::IO_HIGH].front()) || | |
182 | (!queue_[Env::IO_LOW].empty() && | |
183 | leader_ == queue_[Env::IO_LOW].front())); | |
184 | ||
185 | if (leader_ == &r) { | |
186 | // Waken up from TimedWait() | |
187 | if (timedout) { | |
188 | // Time to do refill! | |
189 | Refill(); | |
190 | ||
191 | // Re-elect a new leader regardless. This is to simplify the | |
192 | // election handling. | |
193 | leader_ = nullptr; | |
194 | ||
195 | // Notify the header of queue if current leader is going away | |
196 | if (r.granted) { | |
197 | // Current leader already got granted with quota. Notify header | |
198 | // of waiting queue to participate next round of election. | |
199 | assert((queue_[Env::IO_HIGH].empty() || | |
200 | &r != queue_[Env::IO_HIGH].front()) && | |
201 | (queue_[Env::IO_LOW].empty() || | |
202 | &r != queue_[Env::IO_LOW].front())); | |
203 | if (!queue_[Env::IO_HIGH].empty()) { | |
204 | queue_[Env::IO_HIGH].front()->cv.Signal(); | |
205 | } else if (!queue_[Env::IO_LOW].empty()) { | |
206 | queue_[Env::IO_LOW].front()->cv.Signal(); | |
207 | } | |
208 | // Done | |
209 | break; | |
210 | } | |
211 | } else { | |
212 | // Spontaneous wake up, need to continue to wait | |
213 | assert(!r.granted); | |
214 | leader_ = nullptr; | |
215 | } | |
216 | } else { | |
217 | // Waken up by previous leader: | |
218 | // (1) if requested quota is granted, it is done. | |
219 | // (2) if requested quota is not granted, this means current thread | |
220 | // was picked as a new leader candidate (previous leader got quota). | |
221 | // It needs to participate leader election because a new request may | |
222 | // come in before this thread gets waken up. So it may actually need | |
223 | // to do Wait() again. | |
224 | assert(!timedout); | |
225 | } | |
226 | } while (!r.granted); | |
227 | } | |
228 | ||
229 | void GenericRateLimiter::Refill() { | |
230 | TEST_SYNC_POINT("GenericRateLimiter::Refill"); | |
231 | next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_; | |
232 | // Carry over the left over quota from the last period | |
233 | auto refill_bytes_per_period = | |
234 | refill_bytes_per_period_.load(std::memory_order_relaxed); | |
235 | if (available_bytes_ < refill_bytes_per_period) { | |
236 | available_bytes_ += refill_bytes_per_period; | |
237 | } | |
238 | ||
239 | int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1; | |
240 | for (int q = 0; q < 2; ++q) { | |
241 | auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH; | |
242 | auto* queue = &queue_[use_pri]; | |
243 | while (!queue->empty()) { | |
244 | auto* next_req = queue->front(); | |
245 | if (available_bytes_ < next_req->request_bytes) { | |
246 | // avoid starvation | |
247 | next_req->request_bytes -= available_bytes_; | |
248 | available_bytes_ = 0; | |
249 | break; | |
250 | } | |
251 | available_bytes_ -= next_req->request_bytes; | |
252 | next_req->request_bytes = 0; | |
253 | total_bytes_through_[use_pri] += next_req->bytes; | |
254 | queue->pop_front(); | |
255 | ||
256 | next_req->granted = true; | |
257 | if (next_req != leader_) { | |
258 | // Quota granted, signal the thread | |
259 | next_req->cv.Signal(); | |
260 | } | |
261 | } | |
262 | } | |
263 | } | |
264 | ||
265 | int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( | |
266 | int64_t rate_bytes_per_sec) { | |
267 | if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) { | |
268 | // Avoid unexpected result in the overflow case. The result now is still | |
269 | // inaccurate but is a number that is large enough. | |
270 | return port::kMaxInt64 / 1000000; | |
271 | } else { | |
272 | return std::max(kMinRefillBytesPerPeriod, | |
273 | rate_bytes_per_sec * refill_period_us_ / 1000000); | |
274 | } | |
275 | } | |
276 | ||
11fdf7f2 TL |
277 | Status GenericRateLimiter::Tune() { |
278 | const int kLowWatermarkPct = 50; | |
279 | const int kHighWatermarkPct = 90; | |
280 | const int kAdjustFactorPct = 5; | |
281 | // computed rate limit will be in | |
282 | // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`. | |
283 | const int kAllowedRangeFactor = 20; | |
284 | ||
285 | std::chrono::microseconds prev_tuned_time = tuned_time_; | |
286 | tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_)); | |
287 | ||
288 | int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + | |
289 | std::chrono::microseconds(refill_period_us_) - | |
290 | std::chrono::microseconds(1)) / | |
291 | std::chrono::microseconds(refill_period_us_); | |
292 | // We tune every kRefillsPerTune intervals, so the overflow and division-by- | |
293 | // zero conditions should never happen. | |
294 | assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100); | |
295 | assert(elapsed_intervals > 0); | |
296 | int64_t drained_pct = | |
297 | (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals; | |
298 | ||
299 | int64_t prev_bytes_per_sec = GetBytesPerSecond(); | |
300 | int64_t new_bytes_per_sec; | |
301 | if (drained_pct == 0) { | |
302 | new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; | |
303 | } else if (drained_pct < kLowWatermarkPct) { | |
304 | // sanitize to prevent overflow | |
305 | int64_t sanitized_prev_bytes_per_sec = | |
306 | std::min(prev_bytes_per_sec, port::kMaxInt64 / 100); | |
307 | new_bytes_per_sec = | |
308 | std::max(max_bytes_per_sec_ / kAllowedRangeFactor, | |
309 | sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); | |
310 | } else if (drained_pct > kHighWatermarkPct) { | |
311 | // sanitize to prevent overflow | |
312 | int64_t sanitized_prev_bytes_per_sec = std::min( | |
313 | prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct)); | |
314 | new_bytes_per_sec = | |
315 | std::min(max_bytes_per_sec_, | |
316 | sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); | |
317 | } else { | |
318 | new_bytes_per_sec = prev_bytes_per_sec; | |
319 | } | |
320 | if (new_bytes_per_sec != prev_bytes_per_sec) { | |
321 | SetBytesPerSecond(new_bytes_per_sec); | |
322 | } | |
323 | num_drains_ = prev_num_drains_; | |
324 | return Status::OK(); | |
325 | } | |
326 | ||
7c673cae | 327 | RateLimiter* NewGenericRateLimiter( |
11fdf7f2 TL |
328 | int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, |
329 | int32_t fairness /* = 10 */, | |
330 | RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */, | |
331 | bool auto_tuned /* = false */) { | |
7c673cae FG |
332 | assert(rate_bytes_per_sec > 0); |
333 | assert(refill_period_us > 0); | |
334 | assert(fairness > 0); | |
11fdf7f2 TL |
335 | return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, |
336 | mode, Env::Default(), auto_tuned); | |
7c673cae FG |
337 | } |
338 | ||
339 | } // namespace rocksdb |