#include <chrono>
#include <cinttypes>
+#include <cstdint>
#include <limits>
#include "db/db_test_util.h"
-#include "rocksdb/env.h"
+#include "port/port.h"
+#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
// TODO(yhchiang): the rate will not be accurate when we run test in parallel.
-class RateLimiterTest : public testing::Test {};
+class RateLimiterTest : public testing::Test {
+ protected:
+ ~RateLimiterTest() override {
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+};
TEST_F(RateLimiterTest, OverflowRate) {
- GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
- RateLimiter::Mode::kWritesOnly, Env::Default(),
- false /* auto_tuned */);
+ GenericRateLimiter limiter(std::numeric_limits<int64_t>::max(), 1000, 10,
+ RateLimiter::Mode::kWritesOnly,
+ SystemClock::Default(), false /* auto_tuned */);
ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
}
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
}
+TEST_F(RateLimiterTest, GetTotalBytesThrough) {
+ std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+ 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */));
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ ASSERT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+ 0);
+ }
+
+ std::int64_t request_byte = 200;
+ std::int64_t request_byte_sum = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ limiter->Request(request_byte, static_cast<Env::IOPriority>(i),
+ nullptr /* stats */, RateLimiter::OpType::kWrite);
+ request_byte_sum += request_byte;
+ }
+
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ EXPECT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+ request_byte)
+ << "Failed to track total_bytes_through_ correctly when IOPriority = "
+ << static_cast<Env::IOPriority>(i);
+ }
+ EXPECT_EQ(limiter->GetTotalBytesThrough(Env::IO_TOTAL), request_byte_sum)
+ << "Failed to track total_bytes_through_ correctly when IOPriority = "
+ "Env::IO_TOTAL";
+}
+
+TEST_F(RateLimiterTest, GetTotalRequests) {
+ std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+ 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */));
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ ASSERT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 0);
+ }
+
+ std::int64_t total_requests_sum = 0;
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ limiter->Request(200, static_cast<Env::IOPriority>(i), nullptr /* stats */,
+ RateLimiter::OpType::kWrite);
+ total_requests_sum += 1;
+ }
+
+ for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+ EXPECT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 1)
+ << "Failed to track total_requests_ correctly when IOPriority = "
+ << static_cast<Env::IOPriority>(i);
+ }
+ EXPECT_EQ(limiter->GetTotalRequests(Env::IO_TOTAL), total_requests_sum)
+ << "Failed to track total_requests_ correctly when IOPriority = "
+ "Env::IO_TOTAL";
+}
+
+TEST_F(RateLimiterTest, GetTotalPendingRequests) {
+ std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+ 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */));
+ int64_t total_pending_requests = 0;
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ ASSERT_OK(limiter->GetTotalPendingRequests(
+ &total_pending_requests, static_cast<Env::IOPriority>(i)));
+ ASSERT_EQ(total_pending_requests, 0);
+ }
+ // This is a variable for making sure the following callback is called
+ // and the assertions in it are indeed excuted
+ bool nonzero_pending_requests_verified = false;
+ SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) {
+ port::Mutex* request_mutex = (port::Mutex*)arg;
+ // We temporarily unlock the mutex so that the following
+ // GetTotalPendingRequests() can acquire it
+ request_mutex->Unlock();
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ EXPECT_OK(limiter->GetTotalPendingRequests(
+ &total_pending_requests, static_cast<Env::IOPriority>(i)))
+ << "Failed to return total pending requests for priority level = "
+ << static_cast<Env::IOPriority>(i);
+ if (i == Env::IO_USER || i == Env::IO_TOTAL) {
+ EXPECT_EQ(total_pending_requests, 1)
+ << "Failed to correctly return total pending requests for "
+ "priority level = "
+ << static_cast<Env::IOPriority>(i);
+ } else {
+ EXPECT_EQ(total_pending_requests, 0)
+ << "Failed to correctly return total pending requests for "
+ "priority level = "
+ << static_cast<Env::IOPriority>(i);
+ }
+ }
+ // We lock the mutex again so that the request thread can resume running
+ // with the mutex locked
+ request_mutex->Lock();
+ nonzero_pending_requests_verified = true;
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ limiter->Request(200, Env::IO_USER, nullptr /* stats */,
+ RateLimiter::OpType::kWrite);
+ ASSERT_EQ(nonzero_pending_requests_verified, true);
+ for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+ EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
+ static_cast<Env::IOPriority>(i)))
+ << "Failed to return total pending requests for priority level = "
+ << static_cast<Env::IOPriority>(i);
+ EXPECT_EQ(total_pending_requests, 0)
+ << "Failed to correctly return total pending requests for priority "
+ "level = "
+ << static_cast<Env::IOPriority>(i);
+ }
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::Request:PostEnqueueRequest");
+}
+
TEST_F(RateLimiterTest, Modes) {
for (auto mode : {RateLimiter::Mode::kWritesOnly,
RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
- GenericRateLimiter limiter(
- 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
- 10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
+ GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
+ 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */, mode, SystemClock::Default(),
+ false /* auto_tuned */);
limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (mode == RateLimiter::Mode::kWritesOnly) {
}
}
-#if !((defined(TRAVIS) || defined(CIRCLECI)) && defined(OS_MACOSX))
+TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
+ std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+ 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+ 10 /* fairness */));
+
+ bool possible_random_one_in_fairness_results_for_high_mid_pri[4][2] = {
+ {false, false}, {false, true}, {true, false}, {true, true}};
+ std::vector<Env::IOPriority> possible_priority_iteration_orders[4] = {
+ {Env::IO_USER, Env::IO_HIGH, Env::IO_MID, Env::IO_LOW},
+ {Env::IO_USER, Env::IO_HIGH, Env::IO_LOW, Env::IO_MID},
+ {Env::IO_USER, Env::IO_MID, Env::IO_LOW, Env::IO_HIGH},
+ {Env::IO_USER, Env::IO_LOW, Env::IO_MID, Env::IO_HIGH}};
+
+ for (int i = 0; i < 4; ++i) {
+ // These are variables for making sure the following callbacks are called
+ // and the assertion in the last callback is indeed excuted
+ bool high_pri_iterated_after_mid_low_pri_set = false;
+ bool mid_pri_itereated_after_low_pri_set = false;
+ bool pri_iteration_order_verified = false;
+ SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForHighPri",
+ [&](void* arg) {
+ bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
+ *high_pri_iterated_after_mid_low_pri =
+ possible_random_one_in_fairness_results_for_high_mid_pri[i][0];
+ high_pri_iterated_after_mid_low_pri_set = true;
+ });
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForMidPri",
+ [&](void* arg) {
+ bool* mid_pri_itereated_after_low_pri = (bool*)arg;
+ *mid_pri_itereated_after_low_pri =
+ possible_random_one_in_fairness_results_for_high_mid_pri[i][1];
+ mid_pri_itereated_after_low_pri_set = true;
+ });
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PreReturnPriIterationOrder",
+ [&](void* arg) {
+ std::vector<Env::IOPriority>* pri_iteration_order =
+ (std::vector<Env::IOPriority>*)arg;
+ EXPECT_EQ(*pri_iteration_order, possible_priority_iteration_orders[i])
+ << "Failed to generate priority iteration order correctly when "
+ "high_pri_iterated_after_mid_low_pri = "
+ << possible_random_one_in_fairness_results_for_high_mid_pri[i][0]
+ << ", mid_pri_itereated_after_low_pri = "
+ << possible_random_one_in_fairness_results_for_high_mid_pri[i][1]
+ << std::endl;
+ pri_iteration_order_verified = true;
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ limiter->Request(200 /* request max bytes to drain so that refill and order
+ generation will be triggered every time
+ GenericRateLimiter::Request() is called */
+ ,
+ Env::IO_USER, nullptr /* stats */,
+ RateLimiter::OpType::kWrite);
+ ASSERT_EQ(high_pri_iterated_after_mid_low_pri_set, true);
+ ASSERT_EQ(mid_pri_itereated_after_low_pri_set, true);
+ ASSERT_EQ(pri_iteration_order_verified, true);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PreReturnPriIterationOrder");
+ SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForMidPri");
+ SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
+ "PostRandomOneInFairnessForHighPri");
+ }
+}
+
TEST_F(RateLimiterTest, Rate) {
auto* env = Env::Default();
struct Arg {
Arg(int32_t _target_rate, int _burst)
- : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
- request_size(_target_rate / 10),
+ : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */,
+ 100 * 1000 /* refill_period_us */,
+ 10 /* fairness */)),
+ request_size(_target_rate /
+ 10 /* refill period here is 1/10 second */),
burst(_burst) {}
std::unique_ptr<RateLimiter> limiter;
int32_t request_size;
};
auto writer = [](void* p) {
- auto* thread_env = Env::Default();
+ const auto& thread_clock = SystemClock::Default();
auto* arg = static_cast<Arg*>(p);
// Test for 2 seconds
- auto until = thread_env->NowMicros() + 2 * 1000000;
- Random r((uint32_t)(thread_env->NowNanos() %
+ auto until = thread_clock->NowMicros() + 2 * 1000000;
+ Random r((uint32_t)(thread_clock->NowNanos() %
std::numeric_limits<uint32_t>::max()));
- while (thread_env->NowMicros() < until) {
+ while (thread_clock->NowMicros() < until) {
+ for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst * 2) + 1); ++i) {
+ arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
+ Env::IO_USER, nullptr /* stats */,
+ RateLimiter::OpType::kWrite);
+ }
+
for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
}
+
+ for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst / 2 + 1) + 1);
+ ++i) {
+ arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_MID,
+ nullptr /* stats */, RateLimiter::OpType::kWrite);
+ }
+
arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
};
+ int samples = 0;
+ int samples_at_minimum = 0;
+
for (int i = 1; i <= 16; i *= 2) {
int32_t target = i * 1024 * 10;
Arg arg(target, i / 4 + 1);
arg.request_size - 1, target / 1024, rate / 1024,
elapsed / 1000000.0);
- ASSERT_GE(rate / target, 0.80);
+ ++samples;
+ if (rate / target >= 0.80) {
+ ++samples_at_minimum;
+ }
ASSERT_LE(rate / target, 1.25);
}
}
-}
+
+ // This can fail due to slow execution speed, like when using valgrind or in
+ // heavily loaded CI environments
+ bool skip_minimum_rate_check =
+#if (defined(CIRCLECI) && defined(OS_MACOSX)) || defined(ROCKSDB_VALGRIND_RUN)
+ true;
+#else
+ getenv("SANDCASTLE");
#endif
+ if (skip_minimum_rate_check) {
+ fprintf(stderr, "Skipped minimum rate check (%d / %d passed)\n",
+ samples_at_minimum, samples);
+ } else {
+ ASSERT_EQ(samples_at_minimum, samples);
+ }
+}
TEST_F(RateLimiterTest, LimitChangeTest) {
// starvation test when limit changes to a smaller value
std::shared_ptr<RateLimiter> limiter =
std::make_shared<GenericRateLimiter>(
target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
- Env::Default(), false /* auto_tuned */);
+ SystemClock::Default(), false /* auto_tuned */);
+ // After "GenericRateLimiter::Request:1" the mutex is held until the bytes
+ // are refilled. This test could be improved to change the limit when lock
+ // is released in `TimedWait()`.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"},
{"RateLimiterTest::LimitChangeTest:changeLimitEnd",
- "GenericRateLimiter::Refill"}});
+ "GenericRateLimiter::Request:1"}});
Arg arg(target, Env::IO_HIGH, limiter);
// The idea behind is to start a request first, then before it refills,
// update limit to a different value (2X/0.5X). No starvation should
target / 1024, new_limit / 1024, refill_period / 1000);
}
}
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
1000 /* rate_bytes_per_sec */,
std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
- RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));
+ RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(),
+ true /* auto_tuned */));
- // Use callback to advance time because we need to advance (1) after Request()
- // has determined the bytes are not available; and (2) before Refill()
- // computes the next refill time (ensuring refill time in the future allows
- // the next request to drain the rate limiter).
+ // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+ // `Env` to advance its time according to the fake wait duration. The
+ // workaround is to install a callback that advance the `Env`'s mock time.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "GenericRateLimiter::Refill", [&](void* /*arg*/) {
- special_env.SleepForMicroseconds(static_cast<int>(
- std::chrono::microseconds(kTimePerRefill).count()));
+ "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+ int64_t time_waited_us = *static_cast<int64_t*>(arg);
+ special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+ "GenericRateLimiter::Request:PostTimedWait");
// decreases after a sequence of periods where rate limiter is not drained
orig_bytes_per_sec = new_bytes_per_sec;
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}