// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/rate_limiter.h"
+
+#include <algorithm>
+
#include "monitoring/statistics.h"
#include "port/port.h"
-#include "rocksdb/env.h"
+#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
namespace ROCKSDB_NAMESPACE {
-
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
Env::IOPriority io_priority, Statistics* stats,
RateLimiter::OpType op_type) {
if (alignment > 0) {
// Here we may actually require more than burst and block
- // but we can not write less than one page at a time on direct I/O
- // thus we may want not to use ratelimiter
+ // as we can not write/read less than one page at a time on direct I/O
+ // thus we do not want to be strictly constrained by burst
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
Request(bytes, io_priority, stats, op_type);
bool granted;
};
-GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
- int64_t refill_period_us,
- int32_t fairness, RateLimiter::Mode mode,
- Env* env, bool auto_tuned)
+GenericRateLimiter::GenericRateLimiter(
+ int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
+ RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
+ bool auto_tuned)
: RateLimiter(mode),
refill_period_us_(refill_period_us),
rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
: rate_bytes_per_sec),
refill_bytes_per_period_(
- CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
- env_(env),
+ CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
+ clock_(clock),
stop_(false),
exit_cv_(&request_mutex_),
requests_to_wait_(0),
available_bytes_(0),
- next_refill_us_(NowMicrosMonotonic(env_)),
+ next_refill_us_(NowMicrosMonotonicLocked()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
- leader_(nullptr),
+ wait_until_refill_pending_(false),
auto_tuned_(auto_tuned),
num_drains_(0),
- prev_num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec),
- tuned_time_(NowMicrosMonotonic(env_)) {
- total_requests_[0] = 0;
- total_requests_[1] = 0;
- total_bytes_through_[0] = 0;
- total_bytes_through_[1] = 0;
+ tuned_time_(NowMicrosMonotonicLocked()) {
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ total_requests_[i] = 0;
+ total_bytes_through_[i] = 0;
+ }
}
GenericRateLimiter::~GenericRateLimiter() {
MutexLock g(&request_mutex_);
stop_ = true;
- requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +
- queue_[Env::IO_HIGH].size());
- for (auto& r : queue_[Env::IO_HIGH]) {
- r->cv.Signal();
+ std::deque<Req*>::size_type queues_size_sum = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ queues_size_sum += queue_[i].size();
}
- for (auto& r : queue_[Env::IO_LOW]) {
- r->cv.Signal();
+ requests_to_wait_ = static_cast<int32_t>(queues_size_sum);
+
+ for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+ std::deque<Req*> queue = queue_[i];
+ for (auto& r : queue) {
+ r->cv.Signal();
+ }
}
+
while (requests_to_wait_ > 0) {
exit_cv_.Wait();
}
// This API allows user to dynamically change rate limiter's bytes per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
+ MutexLock g(&request_mutex_);
+ SetBytesPerSecondLocked(bytes_per_second);
+}
+
+void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
assert(bytes_per_second > 0);
- rate_bytes_per_sec_ = bytes_per_second;
+ rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
refill_bytes_per_period_.store(
- CalculateRefillBytesPerPeriod(bytes_per_second),
+ CalculateRefillBytesPerPeriodLocked(bytes_per_second),
std::memory_order_relaxed);
}
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
Statistics* stats) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
+ bytes = std::max(static_cast<int64_t>(0), bytes);
TEST_SYNC_POINT("GenericRateLimiter::Request");
TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
&rate_bytes_per_sec_);
if (auto_tuned_) {
static const int kRefillsPerTune = 100;
- std::chrono::microseconds now(NowMicrosMonotonic(env_));
+ std::chrono::microseconds now(NowMicrosMonotonicLocked());
if (now - tuned_time_ >=
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
- Tune();
+ Status s = TuneLocked();
+ s.PermitUncheckedError(); //**TODO: What to do on error?
}
}
if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any new incoming request will exit from here
+ // and not get satiesfied.
return;
}
// Request cannot be satisfied at this moment, enqueue
Req r(bytes, &request_mutex_);
queue_[pri].push_back(&r);
-
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
+ &request_mutex_);
+ // A thread representing a queued request coordinates with other such threads.
+ // There are two main duties.
+ //
+ // (1) Waiting for the next refill time.
+ // (2) Refilling the bytes and granting requests.
do {
- bool timedout = false;
- // Leader election, candidates can be:
- // (1) a new incoming request,
- // (2) a previous leader, whose quota has not been not assigned yet due
- // to lower priority
- // (3) a previous waiter at the front of queue, who got notified by
- // previous leader
- if (leader_ == nullptr &&
- ((!queue_[Env::IO_HIGH].empty() &&
- &r == queue_[Env::IO_HIGH].front()) ||
- (!queue_[Env::IO_LOW].empty() &&
- &r == queue_[Env::IO_LOW].front()))) {
- leader_ = &r;
- int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
- delta = delta > 0 ? delta : 0;
- if (delta == 0) {
- timedout = true;
+ int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
+ if (time_until_refill_us > 0) {
+ if (wait_until_refill_pending_) {
+ // Somebody is performing (1). Trust we'll be woken up when our request
+ // is granted or we are needed for future duties.
+ r.cv.Wait();
} else {
- int64_t wait_until = env_->NowMicros() + delta;
+ // Whichever thread reaches here first performs duty (1) as described
+ // above.
+ int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
++num_drains_;
- timedout = r.cv.TimedWait(wait_until);
+ wait_until_refill_pending_ = true;
+ r.cv.TimedWait(wait_until);
+ TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
+ &time_until_refill_us);
+ wait_until_refill_pending_ = false;
}
} else {
- // Not at the front of queue or an leader has already been elected
- r.cv.Wait();
- }
-
- // request_mutex_ is held from now on
- if (stop_) {
- --requests_to_wait_;
- exit_cv_.Signal();
- return;
- }
-
- // Make sure the waken up request is always the header of its queue
- assert(r.granted ||
- (!queue_[Env::IO_HIGH].empty() &&
- &r == queue_[Env::IO_HIGH].front()) ||
- (!queue_[Env::IO_LOW].empty() &&
- &r == queue_[Env::IO_LOW].front()));
- assert(leader_ == nullptr ||
- (!queue_[Env::IO_HIGH].empty() &&
- leader_ == queue_[Env::IO_HIGH].front()) ||
- (!queue_[Env::IO_LOW].empty() &&
- leader_ == queue_[Env::IO_LOW].front()));
-
- if (leader_ == &r) {
- // Waken up from TimedWait()
- if (timedout) {
- // Time to do refill!
- Refill();
-
- // Re-elect a new leader regardless. This is to simplify the
- // election handling.
- leader_ = nullptr;
-
- // Notify the header of queue if current leader is going away
- if (r.granted) {
- // Current leader already got granted with quota. Notify header
- // of waiting queue to participate next round of election.
- assert((queue_[Env::IO_HIGH].empty() ||
- &r != queue_[Env::IO_HIGH].front()) &&
- (queue_[Env::IO_LOW].empty() ||
- &r != queue_[Env::IO_LOW].front()));
- if (!queue_[Env::IO_HIGH].empty()) {
- queue_[Env::IO_HIGH].front()->cv.Signal();
- } else if (!queue_[Env::IO_LOW].empty()) {
- queue_[Env::IO_LOW].front()->cv.Signal();
+ // Whichever thread reaches here first performs duty (2) as described
+ // above.
+ RefillBytesAndGrantRequestsLocked();
+ if (r.granted) {
+ // If there is any remaining requests, make sure there exists at least
+ // one candidate is awake for future duties by signaling a front request
+ // of a queue.
+ for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+ std::deque<Req*> queue = queue_[i];
+ if (!queue.empty()) {
+ queue.front()->cv.Signal();
+ break;
}
- // Done
- break;
}
- } else {
- // Spontaneous wake up, need to continue to wait
- assert(!r.granted);
- leader_ = nullptr;
}
+ }
+ // Invariant: non-granted request is always in one queue, and granted
+ // request is always in zero queues.
+#ifndef NDEBUG
+ int num_found = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
+ queue_[i].end()) {
+ ++num_found;
+ }
+ }
+ if (r.granted) {
+ assert(num_found == 0);
} else {
- // Waken up by previous leader:
- // (1) if requested quota is granted, it is done.
- // (2) if requested quota is not granted, this means current thread
- // was picked as a new leader candidate (previous leader got quota).
- // It needs to participate leader election because a new request may
- // come in before this thread gets waken up. So it may actually need
- // to do Wait() again.
- assert(!timedout);
+ assert(num_found == 1);
}
- } while (!r.granted);
+#endif // NDEBUG
+ } while (!stop_ && !r.granted);
+
+ if (stop_) {
+ // It is now in the clean-up of ~GenericRateLimiter().
+ // Therefore any woken-up request will have come out of the loop and then
+ // exit here. It might or might not have been satisfied.
+ --requests_to_wait_;
+ exit_cv_.Signal();
+ }
+}
+
+std::vector<Env::IOPriority>
+GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
+ std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
+ // We make Env::IO_USER a superior priority by always iterating its queue
+ // first
+ pri_iteration_order[0] = Env::IO_USER;
+
+ bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForHighPri",
+ &high_pri_iterated_after_mid_low_pri);
+ bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForMidPri",
+ &mid_pri_itereated_after_low_pri);
+
+ if (high_pri_iterated_after_mid_low_pri) {
+ pri_iteration_order[3] = Env::IO_HIGH;
+ pri_iteration_order[2] =
+ mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+ pri_iteration_order[1] =
+ (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+ } else {
+ pri_iteration_order[1] = Env::IO_HIGH;
+ pri_iteration_order[3] =
+ mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+ pri_iteration_order[2] =
+ (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+ }
+
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PreReturnPriIterationOrder",
+ &pri_iteration_order);
+ return pri_iteration_order;
}
-void GenericRateLimiter::Refill() {
- TEST_SYNC_POINT("GenericRateLimiter::Refill");
- next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;
+void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
+ TEST_SYNC_POINT_CALLBACK(
+ "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
+ next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed);
available_bytes_ += refill_bytes_per_period;
}
- int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
- for (int q = 0; q < 2; ++q) {
- auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
- auto* queue = &queue_[use_pri];
+ std::vector<Env::IOPriority> pri_iteration_order =
+ GeneratePriorityIterationOrderLocked();
+
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ assert(!pri_iteration_order.empty());
+ Env::IOPriority current_pri = pri_iteration_order[i];
+ auto* queue = &queue_[current_pri];
while (!queue->empty()) {
auto* next_req = queue->front();
if (available_bytes_ < next_req->request_bytes) {
- // avoid starvation
+ // Grant partial request_bytes to avoid starvation of requests
+ // that become asking for more bytes than available_bytes_
+ // due to dynamically reduced rate limiter's bytes_per_second that
+ // leads to reduced refill_bytes_per_period hence available_bytes_
next_req->request_bytes -= available_bytes_;
available_bytes_ = 0;
break;
}
available_bytes_ -= next_req->request_bytes;
next_req->request_bytes = 0;
- total_bytes_through_[use_pri] += next_req->bytes;
+ total_bytes_through_[current_pri] += next_req->bytes;
queue->pop_front();
next_req->granted = true;
- if (next_req != leader_) {
- // Quota granted, signal the thread
- next_req->cv.Signal();
- }
+ // Quota granted, signal the thread to exit
+ next_req->cv.Signal();
}
}
}
-int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
+int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
int64_t rate_bytes_per_sec) {
- if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) {
+ if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
+ refill_period_us_) {
// Avoid unexpected result in the overflow case. The result now is still
// inaccurate but is a number that is large enough.
- return port::kMaxInt64 / 1000000;
+ return std::numeric_limits<int64_t>::max() / 1000000;
} else {
- return std::max(kMinRefillBytesPerPeriod,
- rate_bytes_per_sec * refill_period_us_ / 1000000);
+ return rate_bytes_per_sec * refill_period_us_ / 1000000;
}
}
-Status GenericRateLimiter::Tune() {
+Status GenericRateLimiter::TuneLocked() {
const int kLowWatermarkPct = 50;
const int kHighWatermarkPct = 90;
const int kAdjustFactorPct = 5;
const int kAllowedRangeFactor = 20;
std::chrono::microseconds prev_tuned_time = tuned_time_;
- tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
+ tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());
int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(refill_period_us_) -
std::chrono::microseconds(refill_period_us_);
// We tune every kRefillsPerTune intervals, so the overflow and division-by-
// zero conditions should never happen.
- assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
+ assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
assert(elapsed_intervals > 0);
- int64_t drained_pct =
- (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;
+ int64_t drained_pct = num_drains_ * 100 / elapsed_intervals;
int64_t prev_bytes_per_sec = GetBytesPerSecond();
int64_t new_bytes_per_sec;
} else if (drained_pct < kLowWatermarkPct) {
// sanitize to prevent overflow
int64_t sanitized_prev_bytes_per_sec =
- std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
+ std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
new_bytes_per_sec =
std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
} else if (drained_pct > kHighWatermarkPct) {
// sanitize to prevent overflow
- int64_t sanitized_prev_bytes_per_sec = std::min(
- prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
+ int64_t sanitized_prev_bytes_per_sec =
+ std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
+ (100 + kAdjustFactorPct));
new_bytes_per_sec =
std::min(max_bytes_per_sec_,
sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
new_bytes_per_sec = prev_bytes_per_sec;
}
if (new_bytes_per_sec != prev_bytes_per_sec) {
- SetBytesPerSecond(new_bytes_per_sec);
+ SetBytesPerSecondLocked(new_bytes_per_sec);
}
- num_drains_ = prev_num_drains_;
+ num_drains_ = 0;
return Status::OK();
}
assert(rate_bytes_per_sec > 0);
assert(refill_period_us > 0);
assert(fairness > 0);
- return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
- mode, Env::Default(), auto_tuned);
+ std::unique_ptr<RateLimiter> limiter(
+ new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
+ mode, SystemClock::Default(), auto_tuned));
+ return limiter.release();
}
} // namespace ROCKSDB_NAMESPACE