]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #pragma once | |
7 | ||
8 | #include <stdint.h> | |
9 | ||
10 | #include <atomic> | |
11 | #include <memory> | |
11fdf7f2 | 12 | #include "rocksdb/rate_limiter.h" |
7c673cae | 13 | |
f67539c2 | 14 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
15 | |
16 | class Env; | |
17 | class WriteControllerToken; | |
18 | ||
19 | // WriteController controls write stalls in our write code path. Write | |
20 | // stalls happen when compaction can't keep up with the write rate. | |
21 | // All of the methods here (including WriteControllerToken's destructors) need | |
22 | // to be called while holding the DB mutex. | |
23 | class WriteController { | |
24 | public: | |
11fdf7f2 TL |
25 | explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, |
26 | int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) | |
7c673cae FG |
27 | : total_stopped_(0), |
28 | total_delayed_(0), | |
29 | total_compaction_pressure_(0), | |
30 | bytes_left_(0), | |
11fdf7f2 TL |
31 | last_refill_time_(0), |
32 | low_pri_rate_limiter_( | |
33 | NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { | |
7c673cae FG |
34 | set_max_delayed_write_rate(_delayed_write_rate); |
35 | } | |
36 | ~WriteController() = default; | |
37 | ||
38 | // When an actor (column family) requests a stop token, all writes will be | |
39 | // stopped until the stop token is released (deleted) | |
40 | std::unique_ptr<WriteControllerToken> GetStopToken(); | |
41 | // When an actor (column family) requests a delay token, the total delay for | |
42 | // all writes to the DB will be controlled under the delayed write rate. Every | |
43 | // write needs to call GetDelay() with the number of bytes being written to | |
44 | // the DB, which returns the number of microseconds to sleep. | |
45 | std::unique_ptr<WriteControllerToken> GetDelayToken( | |
46 | uint64_t delayed_write_rate); | |
47 | // When an actor (column family) requests a moderate token, compaction | |
48 | // threads will be increased | |
49 | std::unique_ptr<WriteControllerToken> GetCompactionPressureToken(); | |
50 | ||
51 | // These three methods query the state of the WriteController. | |
52 | bool IsStopped() const; | |
53 | bool NeedsDelay() const { return total_delayed_.load() > 0; } | |
54 | bool NeedSpeedupCompaction() const { | |
55 | return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; | |
56 | } | |
57 | // Returns how many microseconds the caller needs to sleep after the call. | |
58 | // num_bytes: the number of bytes to put into the DB. | |
59 | // Prerequisite: DB mutex held. | |
60 | uint64_t GetDelay(Env* env, uint64_t num_bytes); | |
61 | void set_delayed_write_rate(uint64_t write_rate) { | |
62 | // avoid division by zero | |
63 | if (write_rate == 0) { | |
64 | write_rate = 1u; | |
65 | } else if (write_rate > max_delayed_write_rate()) { | |
66 | write_rate = max_delayed_write_rate(); | |
67 | } | |
68 | delayed_write_rate_ = write_rate; | |
69 | } | |
70 | ||
71 | void set_max_delayed_write_rate(uint64_t write_rate) { | |
72 | // avoid division by zero | |
73 | if (write_rate == 0) { | |
74 | write_rate = 1u; | |
75 | } | |
76 | max_delayed_write_rate_ = write_rate; | |
77 | // update delayed_write_rate_ as well | |
78 | delayed_write_rate_ = write_rate; | |
79 | } | |
80 | ||
81 | uint64_t delayed_write_rate() const { return delayed_write_rate_; } | |
82 | ||
83 | uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; } | |
84 | ||
11fdf7f2 TL |
85 | RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } |
86 | ||
7c673cae FG |
87 | private: |
88 | uint64_t NowMicrosMonotonic(Env* env); | |
89 | ||
90 | friend class WriteControllerToken; | |
91 | friend class StopWriteToken; | |
92 | friend class DelayWriteToken; | |
93 | friend class CompactionPressureToken; | |
94 | ||
11fdf7f2 | 95 | std::atomic<int> total_stopped_; |
7c673cae | 96 | std::atomic<int> total_delayed_; |
11fdf7f2 | 97 | std::atomic<int> total_compaction_pressure_; |
7c673cae FG |
98 | uint64_t bytes_left_; |
99 | uint64_t last_refill_time_; | |
100 | // write rate set at initialization or by `DBImpl::SetDBOptions` | |
101 | uint64_t max_delayed_write_rate_; | |
102 | // current write rate | |
103 | uint64_t delayed_write_rate_; | |
11fdf7f2 TL |
104 | |
105 | std::unique_ptr<RateLimiter> low_pri_rate_limiter_; | |
7c673cae FG |
106 | }; |
107 | ||
108 | class WriteControllerToken { | |
109 | public: | |
110 | explicit WriteControllerToken(WriteController* controller) | |
111 | : controller_(controller) {} | |
112 | virtual ~WriteControllerToken() {} | |
113 | ||
114 | protected: | |
115 | WriteController* controller_; | |
116 | ||
117 | private: | |
118 | // no copying allowed | |
119 | WriteControllerToken(const WriteControllerToken&) = delete; | |
120 | void operator=(const WriteControllerToken&) = delete; | |
121 | }; | |
122 | ||
123 | class StopWriteToken : public WriteControllerToken { | |
124 | public: | |
125 | explicit StopWriteToken(WriteController* controller) | |
126 | : WriteControllerToken(controller) {} | |
127 | virtual ~StopWriteToken(); | |
128 | }; | |
129 | ||
130 | class DelayWriteToken : public WriteControllerToken { | |
131 | public: | |
132 | explicit DelayWriteToken(WriteController* controller) | |
133 | : WriteControllerToken(controller) {} | |
134 | virtual ~DelayWriteToken(); | |
135 | }; | |
136 | ||
137 | class CompactionPressureToken : public WriteControllerToken { | |
138 | public: | |
139 | explicit CompactionPressureToken(WriteController* controller) | |
140 | : WriteControllerToken(controller) {} | |
141 | virtual ~CompactionPressureToken(); | |
142 | }; | |
143 | ||
f67539c2 | 144 | } // namespace ROCKSDB_NAMESPACE |