// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/rate_limiter.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
#include "util/sync_point.h"

namespace rocksdb {

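// Clamp a request to the configured burst size, then round it down to the
// I/O alignment, but never below one page. Note that if the alignment itself
// exceeds the burst size, the granted amount can exceed GetSingleBurstBytes(),
// as the comment inside the function notes.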
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
                                 Env::IOPriority io_priority, Statistics* stats,
                                 RateLimiter::OpType op_type) {
  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));

    if (alignment > 0) {
      // Here we may actually require more than the burst size and block on
      // it, but we cannot write less than one page at a time on direct I/O,
      // so the rate limiter may not be a good fit for such workloads.
      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
    }
    Request(bytes, io_priority, stats, op_type);
  }
  return bytes;
}

// Pending request: queued while quota is exhausted. request_bytes is the
// portion still to be granted (Refill() may satisfy it across several
// periods); bytes keeps the original request size for stats accounting.
struct GenericRateLimiter::Req {
  explicit Req(int64_t _bytes, port::Mutex* _mu)
      : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
  int64_t request_bytes;
  int64_t bytes;
  port::CondVar cv;
  bool granted;
};

GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
                                       int64_t refill_period_us,
                                       int32_t fairness, RateLimiter::Mode mode,
                                       Env* env, bool auto_tuned)
    : RateLimiter(mode),
      refill_period_us_(refill_period_us),
      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
                                     : rate_bytes_per_sec),
      refill_bytes_per_period_(
          CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
      env_(env),
      stop_(false),
      exit_cv_(&request_mutex_),
      requests_to_wait_(0),
      available_bytes_(0),
      next_refill_us_(NowMicrosMonotonic(env_)),
      fairness_(fairness > 100 ? 100 : fairness),
      rnd_((uint32_t)time(nullptr)),
      leader_(nullptr),
      auto_tuned_(auto_tuned),
      num_drains_(0),
      prev_num_drains_(0),
      max_bytes_per_sec_(rate_bytes_per_sec),
      tuned_time_(NowMicrosMonotonic(env_)) {
  total_requests_[0] = 0;
  total_requests_[1] = 0;
  total_bytes_through_[0] = 0;
  total_bytes_through_[1] = 0;
}

GenericRateLimiter::~GenericRateLimiter() {
  MutexLock g(&request_mutex_);
  stop_ = true;
  requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +
                                           queue_[Env::IO_HIGH].size());
  for (auto& r : queue_[Env::IO_HIGH]) {
    r->cv.Signal();
  }
  for (auto& r : queue_[Env::IO_LOW]) {
    r->cv.Signal();
  }
  while (requests_to_wait_ > 0) {
    exit_cv_.Wait();
  }
}

// This API allows the user to dynamically change the rate limiter's bytes
// per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
  assert(bytes_per_second > 0);
  rate_bytes_per_sec_ = bytes_per_second;
  refill_bytes_per_period_.store(
      CalculateRefillBytesPerPeriod(bytes_per_second),
      std::memory_order_relaxed);
}

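// Block until `bytes` of quota are granted at priority `pri`. There is no
// dedicated refill thread; instead the waiting requests elect a leader, and
// the leader sleeps until the next refill time, performs the refill, and
// hands leadership to the next queued waiter.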
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                 Statistics* stats) {
  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
  TEST_SYNC_POINT("GenericRateLimiter::Request");
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
                           &rate_bytes_per_sec_);
  MutexLock g(&request_mutex_);

  if (auto_tuned_) {
    static const int kRefillsPerTune = 100;
    std::chrono::microseconds now(NowMicrosMonotonic(env_));
    if (now - tuned_time_ >=
        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
      Tune();
    }
  }

  if (stop_) {
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ >= bytes) {
    // Refills assign quota to and notify waiting requests under the mutex,
    // and leave available bytes behind only once the queues are empty. So if
    // bytes are still available here, nobody can be waiting and we may take
    // the fast path.
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;
    return;
  }

  // Request cannot be satisfied at this moment, enqueue
  Req r(bytes, &request_mutex_);
  queue_[pri].push_back(&r);

  do {
    bool timedout = false;
    // Leader election; candidates can be:
    // (1) a new incoming request,
    // (2) a previous leader, whose quota has not been assigned yet due
    //     to lower priority,
    // (3) a previous waiter at the front of the queue, who got notified by
    //     the previous leader
    if (leader_ == nullptr &&
        ((!queue_[Env::IO_HIGH].empty() &&
          &r == queue_[Env::IO_HIGH].front()) ||
         (!queue_[Env::IO_LOW].empty() &&
          &r == queue_[Env::IO_LOW].front()))) {
      leader_ = &r;
      int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
      delta = delta > 0 ? delta : 0;
      if (delta == 0) {
        timedout = true;
      } else {
        int64_t wait_until = env_->NowMicros() + delta;
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
        ++num_drains_;
        timedout = r.cv.TimedWait(wait_until);
      }
    } else {
      // Not at the front of a queue, or a leader has already been elected
      r.cv.Wait();
    }

    // request_mutex_ is held from now on
    if (stop_) {
      --requests_to_wait_;
      exit_cv_.Signal();
      return;
    }

    // Make sure the woken-up request is always at the head of its queue
    assert(r.granted ||
           (!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()));
    assert(leader_ == nullptr ||
           (!queue_[Env::IO_HIGH].empty() &&
            leader_ == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            leader_ == queue_[Env::IO_LOW].front()));

    if (leader_ == &r) {
      // Woken up from TimedWait()
      if (timedout) {
        // Time to do refill!
        Refill();

        // Re-elect a new leader regardless. This is to simplify the
        // election handling.
        leader_ = nullptr;

        // Notify the head of the queue if the current leader is going away
        if (r.granted) {
          // The current leader has already been granted its quota. Notify
          // the head of the waiting queue so it can participate in the next
          // round of election.
          assert((queue_[Env::IO_HIGH].empty() ||
                  &r != queue_[Env::IO_HIGH].front()) &&
                 (queue_[Env::IO_LOW].empty() ||
                  &r != queue_[Env::IO_LOW].front()));
          if (!queue_[Env::IO_HIGH].empty()) {
            queue_[Env::IO_HIGH].front()->cv.Signal();
          } else if (!queue_[Env::IO_LOW].empty()) {
            queue_[Env::IO_LOW].front()->cv.Signal();
          }
          // Done
          break;
        }
      } else {
        // Spurious wakeup; need to continue waiting
        assert(!r.granted);
        leader_ = nullptr;
      }
    } else {
      // Woken up by the previous leader:
      // (1) if the requested quota is granted, it is done.
      // (2) if the requested quota is not granted, this thread was picked as
      //     a new leader candidate (the previous leader got its quota). It
      //     needs to participate in leader election because a new request
      //     may come in before this thread gets woken up, so it may actually
      //     need to do Wait() again.
      assert(!timedout);
    }
  } while (!r.granted);
}

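// Start a new refill period and hand out quota to waiting requests. With the
// default fairness of 10, rnd_.OneIn(fairness_) lets the low-priority queue
// drain first in roughly 1 out of 10 refills, so high-priority requests
// usually go first but low-priority ones cannot be starved indefinitely.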
void GenericRateLimiter::Refill() {
  TEST_SYNC_POINT("GenericRateLimiter::Refill");
  next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;
  // Carry over the leftover quota from the last period
  auto refill_bytes_per_period =
      refill_bytes_per_period_.load(std::memory_order_relaxed);
  if (available_bytes_ < refill_bytes_per_period) {
    available_bytes_ += refill_bytes_per_period;
  }

  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
  for (int q = 0; q < 2; ++q) {
    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
    auto* queue = &queue_[use_pri];
    while (!queue->empty()) {
      auto* next_req = queue->front();
      if (available_bytes_ < next_req->request_bytes) {
        // Grant partial quota so a large request at the front cannot be
        // starved; the remainder is satisfied in later periods
        next_req->request_bytes -= available_bytes_;
        available_bytes_ = 0;
        break;
      }
      available_bytes_ -= next_req->request_bytes;
      next_req->request_bytes = 0;
      total_bytes_through_[use_pri] += next_req->bytes;
      queue->pop_front();

      next_req->granted = true;
      if (next_req != leader_) {
        // Quota granted, signal the thread
        next_req->cv.Signal();
      }
    }
  }
}

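// For example, at rate_bytes_per_sec = 10 MB/s with the default 100 ms refill
// period, each period grants 10'000'000 * 100'000 / 1'000'000 = 1'000'000
// bytes, i.e. roughly 1 MB of quota per refill.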
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
    int64_t rate_bytes_per_sec) {
  if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) {
    // Avoid unexpected result in the overflow case. The result now is still
    // inaccurate but is a number that is large enough.
    return port::kMaxInt64 / 1000000;
  } else {
    return std::max(kMinRefillBytesPerPeriod,
                    rate_bytes_per_sec * refill_period_us_ / 1000000);
  }
}

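// Auto-tuning sketch: every kRefillsPerTune refill periods, compare how many
// of the elapsed intervals drained the quota. Above the 90% high watermark
// the limit is raised by kAdjustFactorPct (5%), capped at max_bytes_per_sec_;
// below the 50% low watermark it is lowered by ~5%, floored at
// max_bytes_per_sec_ / kAllowedRangeFactor (zero drains drop straight to
// that floor).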
Status GenericRateLimiter::Tune() {
  const int kLowWatermarkPct = 50;
  const int kHighWatermarkPct = 90;
  const int kAdjustFactorPct = 5;
  // computed rate limit will be in
  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
  const int kAllowedRangeFactor = 20;

  std::chrono::microseconds prev_tuned_time = tuned_time_;
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));

  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
                               std::chrono::microseconds(refill_period_us_) -
                               std::chrono::microseconds(1)) /
                              std::chrono::microseconds(refill_period_us_);
  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
  // zero conditions should never happen.
  assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
  assert(elapsed_intervals > 0);
  int64_t drained_pct =
      (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;

  int64_t prev_bytes_per_sec = GetBytesPerSecond();
  int64_t new_bytes_per_sec;
  if (drained_pct == 0) {
    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
  } else if (drained_pct < kLowWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec =
        std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
    new_bytes_per_sec =
        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
  } else if (drained_pct > kHighWatermarkPct) {
    // sanitize to prevent overflow
    int64_t sanitized_prev_bytes_per_sec = std::min(
        prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
    new_bytes_per_sec =
        std::min(max_bytes_per_sec_,
                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
  } else {
    new_bytes_per_sec = prev_bytes_per_sec;
  }
  if (new_bytes_per_sec != prev_bytes_per_sec) {
    SetBytesPerSecond(new_bytes_per_sec);
  }
  num_drains_ = prev_num_drains_;
  return Status::OK();
}

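// Usage sketch (hypothetical caller code, not part of this file): the limiter
// is typically installed via DBOptions so flushes and compactions share one
// write budget, e.g.
//
//   options.rate_limiter.reset(
//       NewGenericRateLimiter(10 << 20 /* 10 MB/s */));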
RateLimiter* NewGenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
    int32_t fairness /* = 10 */,
    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
    bool auto_tuned /* = false */) {
  assert(rate_bytes_per_sec > 0);
  assert(refill_period_us > 0);
  assert(fairness > 0);
  return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
                                mode, Env::Default(), auto_tuned);
}

}  // namespace rocksdb