]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/util/rate_limiter.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / rate_limiter.h
index 5f047a567712938c7dd34c2dc27c19f262d3d6cc..4c078f5a0eef0d026652fc67e7164bf07f22fb6d 100644 (file)
 #include <atomic>
 #include <chrono>
 #include <deque>
+
 #include "port/port.h"
 #include "rocksdb/env.h"
 #include "rocksdb/rate_limiter.h"
+#include "rocksdb/status.h"
+#include "rocksdb/system_clock.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
 
@@ -24,7 +27,8 @@ namespace ROCKSDB_NAMESPACE {
 class GenericRateLimiter : public RateLimiter {
  public:
   GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
-                     int32_t fairness, RateLimiter::Mode mode, Env* env,
+                     int32_t fairness, RateLimiter::Mode mode,
+                     const std::shared_ptr<SystemClock>& clock,
                      bool auto_tuned);
 
   virtual ~GenericRateLimiter();
@@ -34,7 +38,8 @@ class GenericRateLimiter : public RateLimiter {
 
   // Request for token to write bytes. If this request can not be satisfied,
   // the call is blocked. Caller is responsible to make sure
-  // bytes <= GetSingleBurstBytes()
+  // bytes <= GetSingleBurstBytes() and bytes >= 0. Negative bytes
+  // passed in will be rounded up to 0.
   using RateLimiter::Request;
   virtual void Request(const int64_t bytes, const Env::IOPriority pri,
                        Statistics* stats) override;
@@ -47,8 +52,11 @@ class GenericRateLimiter : public RateLimiter {
       const Env::IOPriority pri = Env::IO_TOTAL) const override {
     MutexLock g(&request_mutex_);
     if (pri == Env::IO_TOTAL) {
-      return total_bytes_through_[Env::IO_LOW] +
-             total_bytes_through_[Env::IO_HIGH];
+      int64_t total_bytes_through_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_bytes_through_sum += total_bytes_through_[i];
+      }
+      return total_bytes_through_sum;
     }
     return total_bytes_through_[pri];
   }
@@ -57,35 +65,61 @@ class GenericRateLimiter : public RateLimiter {
       const Env::IOPriority pri = Env::IO_TOTAL) const override {
     MutexLock g(&request_mutex_);
     if (pri == Env::IO_TOTAL) {
-      return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH];
+      int64_t total_requests_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_requests_sum += total_requests_[i];
+      }
+      return total_requests_sum;
     }
     return total_requests_[pri];
   }
 
+  virtual Status GetTotalPendingRequests(
+      int64_t* total_pending_requests,
+      const Env::IOPriority pri = Env::IO_TOTAL) const override {
+    assert(total_pending_requests != nullptr);
+    MutexLock g(&request_mutex_);
+    if (pri == Env::IO_TOTAL) {
+      int64_t total_pending_requests_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
+      }
+      *total_pending_requests = total_pending_requests_sum;
+    } else {
+      *total_pending_requests = static_cast<int64_t>(queue_[pri].size());
+    }
+    return Status::OK();
+  }
+
   virtual int64_t GetBytesPerSecond() const override {
-    return rate_bytes_per_sec_;
+    return rate_bytes_per_sec_.load(std::memory_order_relaxed);
   }
 
- private:
-  void Refill();
-  int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
-  Status Tune();
+  virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
+    MutexLock g(&request_mutex_);
+    clock_ = std::move(clock);
+    next_refill_us_ = NowMicrosMonotonicLocked();
+  }
 
-  uint64_t NowMicrosMonotonic(Env* env) {
-    return env->NowNanos() / std::milli::den;
+ private:
+  void RefillBytesAndGrantRequestsLocked();
+  std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked();
+  int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec);
+  Status TuneLocked();
+  void SetBytesPerSecondLocked(int64_t bytes_per_second);
+
+  uint64_t NowMicrosMonotonicLocked() {
+    return clock_->NowNanos() / std::milli::den;
   }
 
   // This mutex guard all internal states
   mutable port::Mutex request_mutex_;
 
-  const int64_t kMinRefillBytesPerPeriod = 100;
-
   const int64_t refill_period_us_;
 
-  int64_t rate_bytes_per_sec_;
-  // This variable can be changed dynamically.
+  std::atomic<int64_t> rate_bytes_per_sec_;
   std::atomic<int64_t> refill_bytes_per_period_;
-  Env* const env_;
+  std::shared_ptr<SystemClock> clock_;
 
   bool stop_;
   port::CondVar exit_cv_;
@@ -100,12 +134,11 @@ class GenericRateLimiter : public RateLimiter {
   Random rnd_;
 
   struct Req;
-  Req* leader_;
   std::deque<Req*> queue_[Env::IO_TOTAL];
+  bool wait_until_refill_pending_;
 
   bool auto_tuned_;
   int64_t num_drains_;
-  int64_t prev_num_drains_;
   const int64_t max_bytes_per_sec_;
   std::chrono::microseconds tuned_time_;
 };