]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/util/rate_limiter_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / util / rate_limiter_test.cc
index 228837aa6cbbae0846b31f639b6e7266f2890441..cda134867591322192b9f832d10f21f626ff5fcd 100644 (file)
 
 #include <chrono>
 #include <cinttypes>
+#include <cstdint>
 #include <limits>
 
 #include "db/db_test_util.h"
-#include "rocksdb/env.h"
+#include "port/port.h"
+#include "rocksdb/system_clock.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
 #include "util/random.h"
 namespace ROCKSDB_NAMESPACE {
 
 // TODO(yhchiang): the rate will not be accurate when we run test in parallel.
-class RateLimiterTest : public testing::Test {};
+class RateLimiterTest : public testing::Test {
+ protected:
+  ~RateLimiterTest() override {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+};
 
 TEST_F(RateLimiterTest, OverflowRate) {
-  GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
-                             RateLimiter::Mode::kWritesOnly, Env::Default(),
-                             false /* auto_tuned */);
+  GenericRateLimiter limiter(std::numeric_limits<int64_t>::max(), 1000, 10,
+                             RateLimiter::Mode::kWritesOnly,
+                             SystemClock::Default(), false /* auto_tuned */);
   ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
 }
 
@@ -35,12 +43,127 @@ TEST_F(RateLimiterTest, StartStop) {
   std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
 }
 
+TEST_F(RateLimiterTest, GetTotalBytesThrough) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+              0);
+  }
+
+  std::int64_t request_byte = 200;
+  std::int64_t request_byte_sum = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    limiter->Request(request_byte, static_cast<Env::IOPriority>(i),
+                     nullptr /* stats */, RateLimiter::OpType::kWrite);
+    request_byte_sum += request_byte;
+  }
+
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    EXPECT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+              request_byte)
+        << "Failed to track total_bytes_through_ correctly when IOPriority = "
+        << static_cast<Env::IOPriority>(i);
+  }
+  EXPECT_EQ(limiter->GetTotalBytesThrough(Env::IO_TOTAL), request_byte_sum)
+      << "Failed to track total_bytes_through_ correctly when IOPriority = "
+         "Env::IO_TOTAL";
+}
+
+TEST_F(RateLimiterTest, GetTotalRequests) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 0);
+  }
+
+  std::int64_t total_requests_sum = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    limiter->Request(200, static_cast<Env::IOPriority>(i), nullptr /* stats */,
+                     RateLimiter::OpType::kWrite);
+    total_requests_sum += 1;
+  }
+
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    EXPECT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 1)
+        << "Failed to track total_requests_ correctly when IOPriority = "
+        << static_cast<Env::IOPriority>(i);
+  }
+  EXPECT_EQ(limiter->GetTotalRequests(Env::IO_TOTAL), total_requests_sum)
+      << "Failed to track total_requests_ correctly when IOPriority = "
+         "Env::IO_TOTAL";
+}
+
+TEST_F(RateLimiterTest, GetTotalPendingRequests) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+  int64_t total_pending_requests = 0;
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_OK(limiter->GetTotalPendingRequests(
+        &total_pending_requests, static_cast<Env::IOPriority>(i)));
+    ASSERT_EQ(total_pending_requests, 0);
+  }
+  // This is a variable for making sure the following callback is called
+  // and the assertions in it are indeed excuted
+  bool nonzero_pending_requests_verified = false;
+  SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
+        port::Mutex* request_mutex = (port::Mutex*)arg;
+        // We temporarily unlock the mutex so that the following
+        // GetTotalPendingRequests() can acquire it
+        request_mutex->Unlock();
+        for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+          EXPECT_OK(limiter->GetTotalPendingRequests(
+              &total_pending_requests, static_cast<Env::IOPriority>(i)))
+              << "Failed to return total pending requests for priority level = "
+              << static_cast<Env::IOPriority>(i);
+          if (i == Env::IO_USER || i == Env::IO_TOTAL) {
+            EXPECT_EQ(total_pending_requests, 1)
+                << "Failed to correctly return total pending requests for "
+                   "priority level = "
+                << static_cast<Env::IOPriority>(i);
+          } else {
+            EXPECT_EQ(total_pending_requests, 0)
+                << "Failed to correctly return total pending requests for "
+                   "priority level = "
+                << static_cast<Env::IOPriority>(i);
+          }
+        }
+        // We lock the mutex again so that the request thread can resume running
+        // with the mutex locked
+        request_mutex->Lock();
+        nonzero_pending_requests_verified = true;
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+  limiter->Request(200, Env::IO_USER, nullptr /* stats */,
+                   RateLimiter::OpType::kWrite);
+  ASSERT_EQ(nonzero_pending_requests_verified, true);
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
+                                               static_cast<Env::IOPriority>(i)))
+        << "Failed to return total pending requests for priority level = "
+        << static_cast<Env::IOPriority>(i);
+    EXPECT_EQ(total_pending_requests, 0)
+        << "Failed to correctly return total pending requests for priority "
+           "level = "
+        << static_cast<Env::IOPriority>(i);
+  }
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::Request:PostEnqueueRequest");
+}
+
 TEST_F(RateLimiterTest, Modes) {
   for (auto mode : {RateLimiter::Mode::kWritesOnly,
                     RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
-    GenericRateLimiter limiter(
-        2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
-        10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
+    GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
+                               1000 * 1000 /* refill_period_us */,
+                               10 /* fairness */, mode, SystemClock::Default(),
+                               false /* auto_tuned */);
     limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                     RateLimiter::OpType::kRead);
     if (mode == RateLimiter::Mode::kWritesOnly) {
@@ -59,13 +182,93 @@ TEST_F(RateLimiterTest, Modes) {
   }
 }
 
-#if !((defined(TRAVIS) || defined(CIRCLECI)) && defined(OS_MACOSX))
+TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+
+  bool possible_random_one_in_fairness_results_for_high_mid_pri[4][2] = {
+      {false, false}, {false, true}, {true, false}, {true, true}};
+  std::vector<Env::IOPriority> possible_priority_iteration_orders[4] = {
+      {Env::IO_USER, Env::IO_HIGH, Env::IO_MID, Env::IO_LOW},
+      {Env::IO_USER, Env::IO_HIGH, Env::IO_LOW, Env::IO_MID},
+      {Env::IO_USER, Env::IO_MID, Env::IO_LOW, Env::IO_HIGH},
+      {Env::IO_USER, Env::IO_LOW, Env::IO_MID, Env::IO_HIGH}};
+
+  for (int i = 0; i < 4; ++i) {
+    // These are variables for making sure the following callbacks are called
+    // and the assertion in the last callback is indeed excuted
+    bool high_pri_iterated_after_mid_low_pri_set = false;
+    bool mid_pri_itereated_after_low_pri_set = false;
+    bool pri_iteration_order_verified = false;
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PostRandomOneInFairnessForHighPri",
+        [&](void* arg) {
+          bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
+          *high_pri_iterated_after_mid_low_pri =
+              possible_random_one_in_fairness_results_for_high_mid_pri[i][0];
+          high_pri_iterated_after_mid_low_pri_set = true;
+        });
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PostRandomOneInFairnessForMidPri",
+        [&](void* arg) {
+          bool* mid_pri_itereated_after_low_pri = (bool*)arg;
+          *mid_pri_itereated_after_low_pri =
+              possible_random_one_in_fairness_results_for_high_mid_pri[i][1];
+          mid_pri_itereated_after_low_pri_set = true;
+        });
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PreReturnPriIterationOrder",
+        [&](void* arg) {
+          std::vector<Env::IOPriority>* pri_iteration_order =
+              (std::vector<Env::IOPriority>*)arg;
+          EXPECT_EQ(*pri_iteration_order, possible_priority_iteration_orders[i])
+              << "Failed to generate priority iteration order correctly when "
+                 "high_pri_iterated_after_mid_low_pri = "
+              << possible_random_one_in_fairness_results_for_high_mid_pri[i][0]
+              << ", mid_pri_itereated_after_low_pri = "
+              << possible_random_one_in_fairness_results_for_high_mid_pri[i][1]
+              << std::endl;
+          pri_iteration_order_verified = true;
+        });
+
+    SyncPoint::GetInstance()->EnableProcessing();
+    limiter->Request(200 /* request max bytes to drain so that refill and order
+                           generation will be triggered every time
+                           GenericRateLimiter::Request() is called */
+                     ,
+                     Env::IO_USER, nullptr /* stats */,
+                     RateLimiter::OpType::kWrite);
+    ASSERT_EQ(high_pri_iterated_after_mid_low_pri_set, true);
+    ASSERT_EQ(mid_pri_itereated_after_low_pri_set, true);
+    ASSERT_EQ(pri_iteration_order_verified, true);
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PreReturnPriIterationOrder");
+    SyncPoint::GetInstance()->ClearCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PostRandomOneInFairnessForMidPri");
+    SyncPoint::GetInstance()->ClearCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+        "PostRandomOneInFairnessForHighPri");
+  }
+}
+
 TEST_F(RateLimiterTest, Rate) {
   auto* env = Env::Default();
   struct Arg {
     Arg(int32_t _target_rate, int _burst)
-        : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
-          request_size(_target_rate / 10),
+        : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */,
+                                        100 * 1000 /* refill_period_us */,
+                                        10 /* fairness */)),
+          request_size(_target_rate /
+                       10 /* refill period here is 1/10 second */),
           burst(_burst) {}
     std::unique_ptr<RateLimiter> limiter;
     int32_t request_size;
@@ -73,23 +276,39 @@ TEST_F(RateLimiterTest, Rate) {
   };
 
   auto writer = [](void* p) {
-    auto* thread_env = Env::Default();
+    const auto& thread_clock = SystemClock::Default();
     auto* arg = static_cast<Arg*>(p);
     // Test for 2 seconds
-    auto until = thread_env->NowMicros() + 2 * 1000000;
-    Random r((uint32_t)(thread_env->NowNanos() %
+    auto until = thread_clock->NowMicros() + 2 * 1000000;
+    Random r((uint32_t)(thread_clock->NowNanos() %
                         std::numeric_limits<uint32_t>::max()));
-    while (thread_env->NowMicros() < until) {
+    while (thread_clock->NowMicros() < until) {
+      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst * 2) + 1); ++i) {
+        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
+                              Env::IO_USER, nullptr /* stats */,
+                              RateLimiter::OpType::kWrite);
+      }
+
       for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
         arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
                               Env::IO_HIGH, nullptr /* stats */,
                               RateLimiter::OpType::kWrite);
       }
+
+      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst / 2 + 1) + 1);
+           ++i) {
+        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_MID,
+                              nullptr /* stats */, RateLimiter::OpType::kWrite);
+      }
+
       arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
                             nullptr /* stats */, RateLimiter::OpType::kWrite);
     }
   };
 
+  int samples = 0;
+  int samples_at_minimum = 0;
+
   for (int i = 1; i <= 16; i *= 2) {
     int32_t target = i * 1024 * 10;
     Arg arg(target, i / 4 + 1);
@@ -117,12 +336,29 @@ TEST_F(RateLimiterTest, Rate) {
               arg.request_size - 1, target / 1024, rate / 1024,
               elapsed / 1000000.0);
 
-      ASSERT_GE(rate / target, 0.80);
+      ++samples;
+      if (rate / target >= 0.80) {
+        ++samples_at_minimum;
+      }
       ASSERT_LE(rate / target, 1.25);
     }
   }
-}
+
+  // This can fail due to slow execution speed, like when using valgrind or in
+  // heavily loaded CI environments
+  bool skip_minimum_rate_check =
+#if (defined(CIRCLECI) && defined(OS_MACOSX)) || defined(ROCKSDB_VALGRIND_RUN)
+      true;
+#else
+      getenv("SANDCASTLE");
 #endif
+  if (skip_minimum_rate_check) {
+    fprintf(stderr, "Skipped minimum rate check (%d / %d passed)\n",
+            samples_at_minimum, samples);
+  } else {
+    ASSERT_EQ(samples_at_minimum, samples);
+  }
+}
 
 TEST_F(RateLimiterTest, LimitChangeTest) {
   // starvation test when limit changes to a smaller value
@@ -151,12 +387,15 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
       std::shared_ptr<RateLimiter> limiter =
           std::make_shared<GenericRateLimiter>(
               target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
-              Env::Default(), false /* auto_tuned */);
+              SystemClock::Default(), false /* auto_tuned */);
+      // After "GenericRateLimiter::Request:1" the mutex is held until the bytes
+      // are refilled. This test could be improved to change the limit when lock
+      // is released in `TimedWait()`.
       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
           {{"GenericRateLimiter::Request",
             "RateLimiterTest::LimitChangeTest:changeLimitStart"},
            {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
-            "GenericRateLimiter::Refill"}});
+            "GenericRateLimiter::Request:1"}});
       Arg arg(target, Env::IO_HIGH, limiter);
       // The idea behind is to start a request first, then before it refills,
       // update limit to a different value (2X/0.5X). No starvation should
@@ -174,6 +413,7 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
               target / 1024, new_limit / 1024, refill_period / 1000);
     }
   }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
 TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
@@ -186,16 +426,16 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
   std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
       1000 /* rate_bytes_per_sec */,
       std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
-      RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));
+      RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(),
+      true /* auto_tuned */));
 
-  // Use callback to advance time because we need to advance (1) after Request()
-  // has determined the bytes are not available; and (2) before Refill()
-  // computes the next refill time (ensuring refill time in the future allows
-  // the next request to drain the rate limiter).
+  // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+  // `Env` to advance its time according to the fake wait duration. The
+  // workaround is to install a callback that advance the `Env`'s mock time.
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-      "GenericRateLimiter::Refill", [&](void* /*arg*/) {
-        special_env.SleepForMicroseconds(static_cast<int>(
-            std::chrono::microseconds(kTimePerRefill).count()));
+      "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+        int64_t time_waited_us = *static_cast<int64_t*>(arg);
+        special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
       });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
@@ -213,6 +453,8 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
   ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
 
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::Request:PostTimedWait");
 
   // decreases after a sequence of periods where rate limiter is not drained
   orig_bytes_per_sec = new_bytes_per_sec;
@@ -228,6 +470,7 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }