#include "db/write_controller.h"
+#include <algorithm>
#include <atomic>
#include <cassert>
#include <ratio>
-#include "rocksdb/env.h"
+
+#include "rocksdb/system_clock.h"
namespace ROCKSDB_NAMESPACE {
// Registers one more delay request on this controller, applies `write_rate`
// through set_delayed_write_rate(), and returns a DelayWriteToken constructed
// with this controller.
// (Patch view: `-` lines are the previous implementation, `+` lines the new
// one.)
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
uint64_t write_rate) {
- total_delayed_++;
- // Reset counters.
- last_refill_time_ = 0;
- bytes_left_ = 0;
// New behavior: the rate-limiting counters are cleared only when the
// pre-increment value of total_delayed_ is zero, rather than on every call
// as the removed lines above did.
+ if (0 == total_delayed_++) {
+ // Starting delay, so reset counters.
+ next_refill_time_ = 0;
+ credit_in_bytes_ = 0;
+ }
+ // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
+ // next_refill_time_ will be based on an old rate. This rate will apply
+ // for subsequent additional debts and for the next refill.
set_delayed_write_rate(write_rate);
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
}
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// This function trusts the caller to sleep for the number of microseconds
// returned.
//
// Returns the number of microseconds to delay a write of `num_bytes`, or 0
// when no delay is needed. In the new (`+`) implementation the state is a
// byte credit (credit_in_bytes_) plus the time at which the next refill is
// due (next_refill_time_); a refill time later than "now" acts as debt.
// (Patch view: `-` lines are the previous implementation, `+` lines the new
// one.)
-uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
+uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
// No delay is reported while any stop request is outstanding.
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
// NOTE(review): the condition guarding the next `return 0;` is not visible in
// this view; presumably a context line was elided here — confirm against the
// full file.
return 0;
}
- const uint64_t kMicrosPerSecond = 1000000;
- const uint64_t kRefillInterval = 1024U;
-
- if (bytes_left_ >= num_bytes) {
- bytes_left_ -= num_bytes;
// Fast path: existing credit covers the write, so spend it and return before
// the clock is read.
+ if (credit_in_bytes_ >= num_bytes) {
+ credit_in_bytes_ -= num_bytes;
return 0;
}
// The frequency to get time inside DB mutex is less than one per refill
// interval.
- auto time_now = NowMicrosMonotonic(env);
-
- uint64_t sleep_debt = 0;
- uint64_t time_since_last_refill = 0;
- if (last_refill_time_ != 0) {
- if (last_refill_time_ > time_now) {
- sleep_debt = last_refill_time_ - time_now;
- } else {
- time_since_last_refill = time_now - last_refill_time_;
- bytes_left_ +=
- static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
- kMicrosPerSecond * delayed_write_rate_);
- if (time_since_last_refill >= kRefillInterval &&
- bytes_left_ > num_bytes) {
- // If refill interval already passed and we have enough bytes
- // return without extra sleeping.
- last_refill_time_ = time_now;
- bytes_left_ -= num_bytes;
- return 0;
- }
+ auto time_now = NowMicrosMonotonic(clock);
+
+ const uint64_t kMicrosPerSecond = 1000000;
+ // Refill every 1 ms
+ const uint64_t kMicrosPerRefill = 1000;
+
// next_refill_time_ == 0 is the value GetDelayToken() writes when it resets
// the counters; treating it as "due now" makes the refill branch below run.
+ if (next_refill_time_ == 0) {
+ // Start with an initial allotment of bytes for one interval
+ next_refill_time_ = time_now;
+ }
+ if (next_refill_time_ <= time_now) {
+ // Refill based on time interval plus any extra elapsed
+ uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
// Credit = elapsed microseconds converted to seconds, times the rate in
// delayed_write_rate_; adding 0.999999 before truncation rounds the result up.
+ credit_in_bytes_ += static_cast<uint64_t>(
+ 1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
+ next_refill_time_ = time_now + kMicrosPerRefill;
+
+ if (credit_in_bytes_ >= num_bytes) {
+ // Avoid delay if possible, to reduce DB mutex release & re-acquire.
+ credit_in_bytes_ -= num_bytes;
+ return 0;
}
}
- uint64_t single_refill_amount =
- delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
- if (bytes_left_ + single_refill_amount >= num_bytes) {
- // Wait until a refill interval
- // Never trigger expire for less than one refill interval to avoid to get
- // time.
- bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
- last_refill_time_ = time_now + kRefillInterval;
- return kRefillInterval + sleep_debt;
- }
+ // We need to delay to avoid exceeding write rate.
+ assert(num_bytes > credit_in_bytes_);
+ uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
// Microseconds needed to cover the shortfall at delayed_write_rate_.
// Assumes delayed_write_rate_ is nonzero (it is a divisor here) — TODO confirm
// that set_delayed_write_rate() guarantees this.
+ uint64_t needed_delay = static_cast<uint64_t>(
+ 1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);
+
// All remaining credit is consumed by this write, and the shortfall is
// recorded by pushing the next refill time further out.
+ credit_in_bytes_ = 0;
+ next_refill_time_ += needed_delay;
- // Need to refill more than one interval. Need to sleep longer. Check
- // whether expiration will hit
-
- // Sleep just until `num_bytes` is allowed.
- uint64_t sleep_amount =
- static_cast<uint64_t>(num_bytes /
- static_cast<long double>(delayed_write_rate_) *
- kMicrosPerSecond) +
- sleep_debt;
- last_refill_time_ = time_now + sleep_amount;
- return sleep_amount;
// The unsigned subtraction below cannot wrap: next_refill_time_ was either
// already greater than time_now (refill branch skipped) or was set to
// time_now + kMicrosPerRefill in that branch, and needed_delay was then added.
+ // Minimum delay of refill interval, to reduce DB mutex contention.
+ return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
}
// Returns the clock's NowNanos() reading divided by std::milli::den (1000),
// i.e. the nanosecond value converted to microseconds with integer division.
// (Patch view: the `-` lines took an Env*, the `+` lines take a SystemClock*.)
-uint64_t WriteController::NowMicrosMonotonic(Env* env) {
- return env->NowNanos() / std::milli::den;
+uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
+ return clock->NowNanos() / std::milli::den;
}
StopWriteToken::~StopWriteToken() {