// Source: ceph/src/common/Throttle.h (Ceph reef 18.2.0), via git.proxmox.com mirror
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_THROTTLE_H
5 #define CEPH_THROTTLE_H
6
7 #include <atomic>
8 #include <chrono>
9 #include <iostream>
10 #include <list>
11 #include <map>
12
13 #include "common/ceph_mutex.h"
14 #include "include/Context.h"
15 #include "common/ThrottleInterface.h"
16 #include "common/Timer.h"
17 #include "common/convenience.h"
18 #if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
19 #include "crimson/common/perf_counters_collection.h"
20 #else
21 #include "common/perf_counters_collection.h"
22 #endif
23
24 /**
25 * @class Throttle
26 * Throttles the maximum number of active requests.
27 *
28 * This class defines the maximum number of slots currently taken away. The
29 * excessive requests for more of them are delayed, until some slots are put
30  * back, so @p get_current() drops below the limit after fulfilling the requests.
31 */
class Throttle final : public ThrottleInterface {
  CephContext *cct;
  const std::string name;
  PerfCountersRef logger;      // perf counters; only populated when use_perf is set
  std::atomic<int64_t> count = { 0 }, max = { 0 };  // taken slots / slot limit (0 = unlimited)
  std::mutex lock;             // protects conds; count/max are atomics
  std::list<std::condition_variable> conds;  // one cond per waiter, FIFO order
  const bool use_perf;

public:
  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
  ~Throttle() override;

private:
  void _reset_max(int64_t m);
  // Would taking c more slots exceed the limit?  A request larger than the
  // whole limit (c >= m) is only delayed while we are already over max.
  bool _should_wait(int64_t c) const {
    int64_t m = max;
    int64_t cur = count;
    return
      m &&
      ((c <= m && cur + c > m) || // normally stay under max
       (c >= m && cur > m)); // except for large c
  }

  bool _wait(int64_t c, std::unique_lock<std::mutex>& l);

public:
  /**
   * gets the number of currently taken slots
   * @returns the number of taken slots
   */
  int64_t get_current() const {
    return count;
  }

  /**
   * get the max number of slots
   * @returns the max number of slots
   */
  int64_t get_max() const { return max; }

  /**
   * return true if past midpoint (count >= max / 2)
   */
  bool past_midpoint() const {
    return count >= max / 2;
  }

  /**
   * set the new max number, and wait until the number of taken slots drains
   * and drops below this limit.
   *
   * @param m the new max number
   * @returns true if this method is blocked, false if it returns immediately
   */
  bool wait(int64_t m = 0);

  /**
   * take the specified number of slots from the stock regardless the throttling
   * @param c number of slots to take
   * @returns the total number of taken slots
   */
  int64_t take(int64_t c = 1) override;

  /**
   * get the specified amount of slots from the stock, but will wait if the
   * total number taken by consumer would exceed the maximum number.
   * @param c number of slots to get
   * @param m new maximum number to set, ignored if it is 0
   * @returns true if this request is blocked due to the throttling, false
   * otherwise
   */
  bool get(int64_t c = 1, int64_t m = 0);

  /**
   * the unblocked version of @p get()
   * @returns true if it successfully got the requested amount,
   * or false if it would block.
   */
  bool get_or_fail(int64_t c = 1);

  /**
   * put slots back to the stock
   * @param c number of slots to return
   * @returns number of requests being held after this
   */
  int64_t put(int64_t c = 1) override;
  /**
   * reset the count of taken slots to zero
   */
  void reset();

  /// set a new max under the lock; callers need not hold it themselves
  void reset_max(int64_t m) {
    std::lock_guard l(lock);
    _reset_max(m);
  }
};
129
130 /**
131 * BackoffThrottle
132 *
133 * Creates a throttle which gradually induces delays when get() is called
134 * based on params low_threshold, high_threshold, expected_throughput,
135 * high_multiple, and max_multiple.
136 *
137 * In [0, low_threshold), we want no delay.
138 *
139 * In [low_threshold, high_threshold), delays should be injected based
140 * on a line from 0 at low_threshold to
141 * high_multiple * (1/expected_throughput) at high_threshold.
142 *
143 * In [high_threshold, 1), we want delays injected based on a line from
144 * (high_multiple * (1/expected_throughput)) at high_threshold to
145 * (high_multiple * (1/expected_throughput)) +
146 * (max_multiple * (1/expected_throughput)) at 1.
147 *
148 * Let the current throttle ratio (current/max) be r, low_threshold be l,
149 * high_threshold be h, high_delay (high_multiple / expected_throughput) be e,
150 * and max_delay (max_multiple / expected_throughput) be m.
151 *
152 * delay = 0, r \in [0, l)
153 * delay = (r - l) * (e / (h - l)), r \in [l, h)
154 * delay = e + (r - h)((m - e)/(1 - h))
155 */
class BackoffThrottle {
  const std::string name;
  PerfCountersRef logger;      // perf counters; only populated when use_perf is set

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  unsigned next_cond = 0;      // round-robin index into conds for the next waiter

  /// allocated once to avoid constantly allocating new ones
  std::vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  std::list<std::condition_variable*> waiters;

  /// Append a waiter (round-robin over conds) and return its position in
  /// the FIFO waiter queue.
  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  /// Wake the front (oldest) waiter, if any.
  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1].
  double low_threshold = 0;
  double high_threshold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// slopes for the two delay segments, filled in by set_params
  double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  ceph::timespan _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets params. If the params are invalid, returns false
   * and populates errstream (if non-null) with a user comprehensible
   * explanation.
   */
  bool set_params(
    double _low_threshold,
    double _high_threshold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    std::ostream *errstream);

  ceph::timespan get(uint64_t c = 1);
  /// equivalent to get(0); takes no slots
  ceph::timespan wait() {
    return get(0);
  }
  uint64_t put(uint64_t c = 1);
  uint64_t take(uint64_t c = 1);
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
		  unsigned expected_concurrency, ///< [in] determines size of conds
		  bool _use_perf = true);
  ~BackoffThrottle();
};
234
235
236 /**
237 * @class SimpleThrottle
238 * This is a simple way to bound the number of concurrent operations.
239 *
240 * It tracks the first error encountered, and makes it available
241 * when all requests are complete. wait_for_ret() should be called
242 * before the instance is destroyed.
243 *
244 * Re-using the same instance isn't safe if you want to check each set
245 * of operations for errors, since the return value is not reset.
246 */
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  void start_op();       // acquire a slot (see class doc: bounds concurrency)
  void end_op(int r);    // release a slot, recording result r
  bool pending_error() const;
  int wait_for_ret();    // wait for all ops; returns first error (see class doc)
private:
  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;          // maximum concurrent operations
  uint64_t m_current = 0;  // in-flight operations
  int m_ret = 0;           // first error encountered; never reset (see class doc)
  bool m_ignore_enoent;    // presumably -ENOENT results are not counted as errors — impl in .cc
  uint32_t waiters = 0;    // threads currently blocked on m_cond
};
264
265
266 class OrderedThrottle;
267
// Completion context handed out by OrderedThrottle::start_op(); carries the
// ticket id (tid) of the operation it completes.
class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  // Defined in the .cc; presumably reports (m_tid, r) back via
  // OrderedThrottle::finish_op — see the friend declaration there.
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle;  // non-owning back-pointer
  uint64_t m_tid;                       // ticket identifying this operation
};
281
282 /**
283 * @class OrderedThrottle
284 * Throttles the maximum number of active requests and completes them in order
285 *
286 * Operations can complete out-of-order but their associated Context callback
287  * will be completed in-order during invocation of start_op() and wait_for_ret()
288 */
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);
  ~OrderedThrottle();

  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  void finish_op(uint64_t tid, int r);

private:
  /// Per-tid completion state: result and the callback to run in order.
  struct Result {
    bool finished;
    int ret_val;
    Context *on_finish;

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  typedef std::map<uint64_t, Result> TidResult;

  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;          // maximum concurrent operations
  uint64_t m_current = 0;  // in-flight operations
  int m_ret_val = 0;       // first error encountered (see class doc)
  bool m_ignore_enoent;    // presumably -ENOENT is not counted as an error — impl in .cc

  uint64_t m_next_tid = 0;      // tid assigned to the next start_op()
  uint64_t m_complete_tid = 0;  // presumably lowest tid not yet completed in order — verify in .cc

  TidResult m_tid_result;       // results awaiting in-order completion, keyed by tid

  void complete_pending_ops(std::unique_lock<std::mutex>& l);
  uint32_t waiters = 0;         // threads currently blocked on m_cond
};
333
334
class TokenBucketThrottle {
  /// The token bucket itself: tokens are added periodically and consumed
  /// by get(); definitions of the fields live in the .cc implementation.
  struct Bucket {
    CephContext *cct;
    const std::string name;

    uint64_t remain;     // tokens currently in the bucket
    uint64_t max;        // configured rate limit
    uint64_t capacity;   // bucket size ceiling (burst allowance)
    uint64_t available;  // NOTE(review): distinct from remain in .cc logic — verify there

    Bucket(CephContext *cct, const std::string &name, uint64_t m)
      : cct(cct), name(name), remain(m), max(m), capacity(m), available(m) {}

    uint64_t get(uint64_t c);
    uint64_t put(uint64_t tokens, double burst_ratio);
    void set_max(uint64_t max, uint64_t burst_seconds);
  };

  /// A request that could not get all its tokens: remembers how many it
  /// still needs and the continuation to run once they are available.
  struct Blocker {
    uint64_t tokens_requested;
    Context *ctx;

    Blocker(uint64_t _tokens_requested, Context* _ctx)
      : tokens_requested(_tokens_requested), ctx(_ctx) {}
  };

  CephContext *m_cct;
  const std::string m_name;
  Bucket m_throttle;
  uint64_t m_burst = 0;              // configured burst (bps/iops)
  uint64_t m_avg = 0;                // configured average rate; 0 disables throttling
  SafeTimer *m_timer;                // external timer driving token refills
  ceph::mutex *m_timer_lock;         // lock guarding m_timer (owned by caller)
  Context *m_token_ctx = nullptr;
  std::list<Blocker> m_blockers;     // FIFO of blocked requests
  ceph::mutex m_lock;                // protects m_blockers and m_throttle

  // minimum of the filling period.
  uint64_t m_tick_min = 50;
  // tokens filling period, its unit is millisecond.
  uint64_t m_tick = 0;
  /**
   * These variables are used to calculate how many tokens need to be put into
   * the bucket within each tick.
   *
   * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second)
   * may be a floating point number, but we need an 'uint64_t' to put into the
   * bucket.
   *
   * For example, we set the value of rate to be 950, means 950 iops(or bps).
   *
   * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052,
   * which is too small for the SafeTimer. So we should set the period(m_tick)
   * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second).
   * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
   *
   * To resolve this, we use a method called tokens_filled(m_current_tick) to
   * calculate how many tokens will be put so far(until m_current_tick):
   *
   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
   *
   * And the difference between two ticks will be the result we expect.
   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) =  47 -   0 = 47
   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) =  95 -  47 = 48
   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 -  95 = 47
   *
   * As a result, the tokens filled in one second will shown as this:
   *   tick   | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
   *   tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
   */
  uint64_t m_ticks_per_second = 0;
  uint64_t m_current_tick = 0;

  // period for the bucket filling tokens, its unit is seconds.
  double m_schedule_tick = 1.0;

public:
  TokenBucketThrottle(CephContext *cct, const std::string &name,
                      uint64_t burst, uint64_t avg,
                      SafeTimer *timer, ceph::mutex *timer_lock);

  ~TokenBucketThrottle();

  const std::string &get_name() {
    return m_name;
  }

  /**
   * Queue a blocker for c tokens whose continuation invokes
   * (t->*mf)(item, flag) when run (presumably once tokens become
   * available — see the token-refill path in the .cc).
   * NOTE(review): called under m_lock from get(); external callers
   * presumably must hold m_lock too — verify.
   */
  template <typename T, typename MF, typename I>
  void add_blocker(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    auto ctx = new LambdaContext(
      [t, mf, item=std::forward<I>(item), flag](int) mutable {
        (t->*mf)(std::forward<I>(item), flag);
      });
    m_blockers.emplace_back(c, ctx);
  }

  /**
   * Try to take c tokens; if they are not all available (or older requests
   * are already blocked), queue a blocker for the remainder instead.
   * @returns true if the request was blocked (a blocker was queued),
   * false if it proceeded immediately. Throttling disabled (max or avg
   * of 0) always returns false.
   */
  template <typename T, typename MF, typename I>
  bool get(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    bool wait = false;
    uint64_t got = 0;
    std::lock_guard lock(m_lock);
    if (!m_blockers.empty()) {
      // Keep the order of requests, add item after previous blocked requests.
      wait = true;
    } else {
      if (0 == m_throttle.max || 0 == m_avg)
        return false;

      got = m_throttle.get(c);
      if (got < c) {
        // Not enough tokens, add a blocker for it.
        wait = true;
      }
    }

    if (wait) {
      add_blocker(c - got, std::forward<T>(t), std::forward<MF>(mf),
                  std::forward<I>(item), flag);
    }

    return wait;
  }

  int set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds);
  void set_schedule_tick_min(uint64_t tick);

private:
  uint64_t tokens_filled(double tick);
  uint64_t tokens_this_tick();
  void add_tokens();
  void schedule_timer();
  void cancel_timer();
};
468
469 #endif