// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H

#include <map>
#include <list>
#include <chrono>
#include <atomic>
#include <iostream>
#include <condition_variable>
#include <stdexcept>

#include "Cond.h"
#include "include/Context.h"

class CephContext;
class PerfCounters;

/**
 * @class Throttle
 * Throttles the maximum number of active requests.
 *
 * This class bounds the number of slots that can be taken at any time.
 * Requests that would exceed the limit are delayed until enough slots are put
 * back and @p get_current() drops low enough for the request to fit under the
 * limit. See the usage sketch after the class definition.
 */
class Throttle {
  CephContext *cct;
  const std::string name;
  PerfCounters *logger;
  std::atomic<unsigned> count = { 0 }, max = { 0 };
  Mutex lock;
  list<Cond*> cond;
  const bool use_perf;
  bool shutting_down = false;
  Cond shutdown_clear;

public:
  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
  ~Throttle();

private:
  void _reset_max(int64_t m);
  bool _should_wait(int64_t c) const {
    int64_t m = max;
    int64_t cur = count;
    return
      m &&
      ((c <= m && cur + c > m) || // normally stay under max
       (c >= m && cur > m));      // except for large c
  }

  bool _wait(int64_t c);

public:
  /**
   * gets the number of currently taken slots
   * @returns the number of taken slots
   */
  int64_t get_current() const {
    return count;
  }

  /**
   * get the max number of slots
   * @returns the max number of slots
   */
  int64_t get_max() const { return max; }

  /**
   * return true if past midpoint
   */
  bool past_midpoint() const {
    return count >= max / 2;
  }

  /**
   * set the new max number, and wait until the number of taken slots drains
   * and drops below this limit.
   *
   * @param m the new max number
   * @returns true if this method blocked, false if it returned immediately
   */
  bool wait(int64_t m = 0);

  /**
   * take the specified number of slots from the stock regardless of the throttling
   * @param c number of slots to take
   * @returns the total number of taken slots
   */
  int64_t take(int64_t c = 1);

  /**
   * get the specified number of slots from the stock, waiting if the total
   * number taken by consumers would exceed the maximum.
   * @param c number of slots to get
   * @param m new maximum number to set, ignored if it is 0
   * @returns true if this request was blocked due to the throttling, false
   * otherwise
   */
  bool get(int64_t c = 1, int64_t m = 0);

  /**
   * the non-blocking version of @p get()
   * @returns true if it successfully got the requested amount,
   * or false if it would block.
   */
  bool get_or_fail(int64_t c = 1);

  /**
   * put slots back into the stock
   * @param c number of slots to return
   * @returns the number of slots still taken after this call
   */
  int64_t put(int64_t c = 1);
  /**
   * reset the count of taken slots to zero
   */
  void reset();

  bool should_wait(int64_t c) const {
    return _should_wait(c);
  }
  void reset_max(int64_t m) {
    Mutex::Locker l(lock);
    _reset_max(m);
  }
};
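
/*
 * Usage sketch (illustrative only, not part of the header proper). It assumes
 * a valid CephContext* `cct` obtained elsewhere; the name and limit are
 * arbitrary example values.
 *
 *   Throttle throttle(cct, "example_throttle", 10);  // at most 10 slots
 *
 *   throttle.get(3);                // blocks if 3 more slots would exceed max
 *   if (throttle.get_or_fail(2)) {
 *     // acquired 2 additional slots without blocking
 *   }
 *   throttle.put(3);                // return slots; wakes blocked callers
 */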

/**
 * BackoffThrottle
 *
 * Creates a throttle which gradually induces delays when get() is called
 * based on params low_threshhold, high_threshhold, expected_throughput,
 * high_multiple, and max_multiple.
 *
 * In [0, low_threshhold), we want no delay.
 *
 * In [low_threshhold, high_threshhold), delays should be injected based
 * on a line from 0 at low_threshhold to
 * high_multiple * (1/expected_throughput) at high_threshhold.
 *
 * In [high_threshhold, 1), we want delays injected based on a line from
 * (high_multiple * (1/expected_throughput)) at high_threshhold to
 * (high_multiple * (1/expected_throughput)) +
 * (max_multiple * (1/expected_throughput)) at 1.
 *
 * Let the current throttle ratio (current/max) be r, low_threshhold be l,
 * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
 *
 * delay = 0,                               r \in [0, l)
 * delay = (r - l) * (e / (h - l)),         r \in [l, h)
 * delay = e + (r - h) * ((m - e)/(1 - h)), r \in [h, 1)
 *
 * A short worked example follows the class definition below.
 */
class BackoffThrottle {
  CephContext *cct;
  const std::string name;
  PerfCounters *logger;

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  unsigned next_cond = 0;

  /// allocated once to avoid constantly allocating new ones
  vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  list<std::condition_variable*> waiters;

  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1].
  double low_threshhold = 0;
  double high_threshhold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// Filled in by set_params
  double s0 = 0;  ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0;  ///< (m - e)/(1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  std::chrono::duration<double> _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets params. If the params are invalid, returns false
   * and populates errstream (if non-null) with a user comprehensible
   * explanation.
   */
  bool set_params(
    double low_threshhold,
    double high_threshhold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    ostream *errstream);

  std::chrono::duration<double> get(uint64_t c = 1);
  std::chrono::duration<double> wait() {
    return get(0);
  }
  uint64_t put(uint64_t c = 1);
  uint64_t take(uint64_t c = 1);
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
                  unsigned expected_concurrency, ///< [in] determines size of conds
                  bool _use_perf = true);
  ~BackoffThrottle();
};
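
/*
 * Usage sketch (illustrative only). The context, name, and parameter values
 * below are assumed examples; see set_params() above for the meaning of each
 * argument.
 *
 *   BackoffThrottle throttle(cct, "example_backoff", 128);
 *   std::ostringstream err;
 *   if (!throttle.set_params(0.5, 0.9, 1000.0, 2.0, 10.0, 1024, &err)) {
 *     // invalid parameters; err holds the explanation
 *   }
 *
 *   // With these values e = 2.0/1000 s, and the delay formula above gives,
 *   // at a ratio of r = 0.7: (0.7 - 0.5) * (0.002 / (0.9 - 0.5)) = 0.001 s.
 *   std::chrono::duration<double> delayed = throttle.get();  // delay slept, if any
 *   throttle.put();
 */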


/**
 * @class SimpleThrottle
 * This is a simple way to bound the number of concurrent operations.
 *
 * It tracks the first error encountered, and makes it available
 * when all requests are complete. wait_for_ret() should be called
 * before the instance is destroyed.
 *
 * Re-using the same instance isn't safe if you want to check each set
 * of operations for errors, since the return value is not reset.
 */
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  void start_op();
  void end_op(int r);
  bool pending_error() const;
  int wait_for_ret();
private:
  mutable Mutex m_lock;
  Cond m_cond;
  uint64_t m_max;
  uint64_t m_current;
  int m_ret;
  bool m_ignore_enoent;
  uint32_t waiters = 0;
};
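
/*
 * Usage sketch (illustrative only): issue asynchronous operations whose
 * completions call end_op() with the operation's return code. The loop body
 * and operation count are assumed, not part of this header.
 *
 *   SimpleThrottle throttle(8, false);   // at most 8 operations in flight
 *   for (int i = 0; i < num_ops; ++i) {
 *     throttle.start_op();               // blocks while 8 ops are pending
 *     // ... start an async op that eventually calls throttle.end_op(r) ...
 *   }
 *   int r = throttle.wait_for_ret();     // waits for all ops, returns first error
 */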


class OrderedThrottle;

class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle;
  uint64_t m_tid;
};

/**
 * @class OrderedThrottle
 * Throttles the maximum number of active requests and completes them in order
 *
 * Operations can complete out-of-order, but their associated Context callbacks
 * are completed in-order during invocation of start_op() and wait_for_ret().
 * See the usage sketch after the class definition.
 */
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);
  ~OrderedThrottle();

  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  void finish_op(uint64_t tid, int r);

private:
  struct Result {
    bool finished;
    int ret_val;
    Context *on_finish;

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  typedef std::map<uint64_t, Result> TidResult;

  mutable Mutex m_lock;
  Cond m_cond;
  uint64_t m_max;
  uint64_t m_current;
  int m_ret_val;
  bool m_ignore_enoent;

  uint64_t m_next_tid;
  uint64_t m_complete_tid;

  TidResult m_tid_result;

  void complete_pending_ops();
  uint32_t waiters = 0;
};
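
/*
 * Usage sketch (illustrative only, with assumed caller-side names). start_op()
 * wraps the caller's completion: pass the returned C_OrderedThrottle to the
 * asynchronous operation, and the supplied on_finish contexts are invoked in
 * submission order even if the operations finish out of order.
 *
 *   OrderedThrottle throttle(8, false);
 *   Context *on_finish = ...;                              // caller's callback
 *   C_OrderedThrottle *ctx = throttle.start_op(on_finish); // may block on the limit
 *   // ... start an async op that completes `ctx` with its return code ...
 *   int r = throttle.wait_for_ret();                       // drain, get first error
 */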

#endif