1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
17 #include "port/port.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/rate_limiter.h"
20 #include "rocksdb/status.h"
21 #include "rocksdb/system_clock.h"
22 #include "util/mutexlock.h"
23 #include "util/random.h"
25 namespace ROCKSDB_NAMESPACE
{
27 class GenericRateLimiter
: public RateLimiter
{
// Constructor declaration. The visible parameters are refill_bytes,
// refill_period_us, fairness, mode, and a shared SystemClock handle.
// NOTE(review): the parameter list ends on a dangling comma in this view, so
// at least one trailing parameter and the closing `);` are not visible here --
// confirm against the full header before editing this declaration.
29 GenericRateLimiter(int64_t refill_bytes
, int64_t refill_period_us
,
30 int32_t fairness
, RateLimiter::Mode mode
,
31 const std::shared_ptr
<SystemClock
>& clock
,
34 virtual ~GenericRateLimiter();
36 // This API allows user to dynamically change rate limiter's bytes per second.
37 virtual void SetBytesPerSecond(int64_t bytes_per_second
) override
;
39 // Request for token to write bytes. If this request can not be satisfied,
40 // the call is blocked. Caller is responsible to make sure
41 // bytes <= GetSingleBurstBytes() and bytes >= 0. Negative bytes
42 // passed in will be rounded up to 0.
43 using RateLimiter::Request
;
44 virtual void Request(const int64_t bytes
, const Env::IOPriority pri
,
45 Statistics
* stats
) override
;
47 virtual int64_t GetSingleBurstBytes() const override
{
48 return refill_bytes_per_period_
.load(std::memory_order_relaxed
);
51 virtual int64_t GetTotalBytesThrough(
52 const Env::IOPriority pri
= Env::IO_TOTAL
) const override
{
53 MutexLock
g(&request_mutex_
);
54 if (pri
== Env::IO_TOTAL
) {
55 int64_t total_bytes_through_sum
= 0;
56 for (int i
= Env::IO_LOW
; i
< Env::IO_TOTAL
; ++i
) {
57 total_bytes_through_sum
+= total_bytes_through_
[i
];
59 return total_bytes_through_sum
;
61 return total_bytes_through_
[pri
];
64 virtual int64_t GetTotalRequests(
65 const Env::IOPriority pri
= Env::IO_TOTAL
) const override
{
66 MutexLock
g(&request_mutex_
);
67 if (pri
== Env::IO_TOTAL
) {
68 int64_t total_requests_sum
= 0;
69 for (int i
= Env::IO_LOW
; i
< Env::IO_TOTAL
; ++i
) {
70 total_requests_sum
+= total_requests_
[i
];
72 return total_requests_sum
;
74 return total_requests_
[pri
];
77 virtual Status
GetTotalPendingRequests(
78 int64_t* total_pending_requests
,
79 const Env::IOPriority pri
= Env::IO_TOTAL
) const override
{
80 assert(total_pending_requests
!= nullptr);
81 MutexLock
g(&request_mutex_
);
82 if (pri
== Env::IO_TOTAL
) {
83 int64_t total_pending_requests_sum
= 0;
84 for (int i
= Env::IO_LOW
; i
< Env::IO_TOTAL
; ++i
) {
85 total_pending_requests_sum
+= static_cast<int64_t>(queue_
[i
].size());
87 *total_pending_requests
= total_pending_requests_sum
;
89 *total_pending_requests
= static_cast<int64_t>(queue_
[pri
].size());
94 virtual int64_t GetBytesPerSecond() const override
{
95 return rate_bytes_per_sec_
.load(std::memory_order_relaxed
);
98 virtual void TEST_SetClock(std::shared_ptr
<SystemClock
> clock
) {
99 MutexLock
g(&request_mutex_
);
100 clock_
= std::move(clock
);
101 next_refill_us_
= NowMicrosMonotonicLocked();
105 void RefillBytesAndGrantRequestsLocked();
106 std::vector
<Env::IOPriority
> GeneratePriorityIterationOrderLocked();
107 int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec
);
109 void SetBytesPerSecondLocked(int64_t bytes_per_second
);
111 uint64_t NowMicrosMonotonicLocked() {
112 return clock_
->NowNanos() / std::milli::den
;
115 // This mutex guard all internal states
116 mutable port::Mutex request_mutex_
;
118 const int64_t refill_period_us_
;
120 std::atomic
<int64_t> rate_bytes_per_sec_
;
121 std::atomic
<int64_t> refill_bytes_per_period_
;
122 std::shared_ptr
<SystemClock
> clock_
;
125 port::CondVar exit_cv_
;
126 int32_t requests_to_wait_
;
128 int64_t total_requests_
[Env::IO_TOTAL
];
129 int64_t total_bytes_through_
[Env::IO_TOTAL
];
130 int64_t available_bytes_
;
131 int64_t next_refill_us_
;
137 std::deque
<Req
*> queue_
[Env::IO_TOTAL
];
138 bool wait_until_refill_pending_
;
142 const int64_t max_bytes_per_sec_
;
143 std::chrono::microseconds tuned_time_
;
146 } // namespace ROCKSDB_NAMESPACE