]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/write_controller.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / write_controller.cc
index 5480aabd149b7a100b27a072f41ce2e64970835b..c5f7443752f7342dc5e27109920da8158d33c88d 100644 (file)
@@ -5,10 +5,12 @@
 
 #include "db/write_controller.h"
 
+#include <algorithm>
 #include <atomic>
 #include <cassert>
 #include <ratio>
-#include "rocksdb/env.h"
+
+#include "rocksdb/system_clock.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -19,10 +21,14 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
 
 std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
     uint64_t write_rate) {
-  total_delayed_++;
-  // Reset counters.
-  last_refill_time_ = 0;
-  bytes_left_ = 0;
+  if (0 == total_delayed_++) {
+    // Starting delay, so reset counters.
+    next_refill_time_ = 0;
+    credit_in_bytes_ = 0;
+  }
+  // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
+  // next_refill_time_ will be based on an old rate. This rate will apply
+  // for subsequent additional debts and for the next refill.
   set_delayed_write_rate(write_rate);
   return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
 }
@@ -42,7 +48,7 @@ bool WriteController::IsStopped() const {
 // If it turns out to be a performance issue, we can redesign the thread
 // synchronization model here.
 // The function trust caller will sleep micros returned.
-uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
+uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
   if (total_stopped_.load(std::memory_order_relaxed) > 0) {
     return 0;
   }
@@ -50,64 +56,51 @@ uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
     return 0;
   }
 
-  const uint64_t kMicrosPerSecond = 1000000;
-  const uint64_t kRefillInterval = 1024U;
-
-  if (bytes_left_ >= num_bytes) {
-    bytes_left_ -= num_bytes;
+  if (credit_in_bytes_ >= num_bytes) {
+    credit_in_bytes_ -= num_bytes;
     return 0;
   }
   // The frequency to get time inside DB mutex is less than one per refill
   // interval.
-  auto time_now = NowMicrosMonotonic(env);
-
-  uint64_t sleep_debt = 0;
-  uint64_t time_since_last_refill = 0;
-  if (last_refill_time_ != 0) {
-    if (last_refill_time_ > time_now) {
-      sleep_debt = last_refill_time_ - time_now;
-    } else {
-      time_since_last_refill = time_now - last_refill_time_;
-      bytes_left_ +=
-          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
-                                kMicrosPerSecond * delayed_write_rate_);
-      if (time_since_last_refill >= kRefillInterval &&
-          bytes_left_ > num_bytes) {
-        // If refill interval already passed and we have enough bytes
-        // return without extra sleeping.
-        last_refill_time_ = time_now;
-        bytes_left_ -= num_bytes;
-        return 0;
-      }
+  auto time_now = NowMicrosMonotonic(clock);
+
+  const uint64_t kMicrosPerSecond = 1000000;
+  // Refill every 1 ms
+  const uint64_t kMicrosPerRefill = 1000;
+
+  if (next_refill_time_ == 0) {
+    // Start with an initial allotment of bytes for one interval
+    next_refill_time_ = time_now;
+  }
+  if (next_refill_time_ <= time_now) {
+    // Refill based on time interval plus any extra elapsed
+    uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
+    credit_in_bytes_ += static_cast<uint64_t>(
+        1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
+    next_refill_time_ = time_now + kMicrosPerRefill;
+
+    if (credit_in_bytes_ >= num_bytes) {
+      // Avoid delay if possible, to reduce DB mutex release & re-aquire.
+      credit_in_bytes_ -= num_bytes;
+      return 0;
     }
   }
 
-  uint64_t single_refill_amount =
-      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
-  if (bytes_left_ + single_refill_amount >= num_bytes) {
-    // Wait until a refill interval
-    // Never trigger expire for less than one refill interval to avoid to get
-    // time.
-    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
-    last_refill_time_ = time_now + kRefillInterval;
-    return kRefillInterval + sleep_debt;
-  }
+  // We need to delay to avoid exceeding write rate.
+  assert(num_bytes > credit_in_bytes_);
+  uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
+  uint64_t needed_delay = static_cast<uint64_t>(
+      1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);
+
+  credit_in_bytes_ = 0;
+  next_refill_time_ += needed_delay;
 
-  // Need to refill more than one interval. Need to sleep longer. Check
-  // whether expiration will hit
-
-  // Sleep just until `num_bytes` is allowed.
-  uint64_t sleep_amount =
-      static_cast<uint64_t>(num_bytes /
-                            static_cast<long double>(delayed_write_rate_) *
-                            kMicrosPerSecond) +
-      sleep_debt;
-  last_refill_time_ = time_now + sleep_amount;
-  return sleep_amount;
+  // Minimum delay of refill interval, to reduce DB mutex contention.
+  return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
 }
 
-uint64_t WriteController::NowMicrosMonotonic(Env* env) {
-  return env->NowNanos() / std::milli::den;
+uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
+  return clock->NowNanos() / std::milli::den;
 }
 
 StopWriteToken::~StopWriteToken() {