// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H

#include <atomic>
#include <chrono>
#include <iostream>
#include <list>
#include <map>

#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "common/ThrottleInterface.h"
#include "common/Timer.h"
#include "common/convenience.h"
#include "common/perf_counters_collection.h"

/**
 * @class Throttle
 * Throttles the maximum number of active requests.
 *
 * This class caps the number of slots that can be taken at any one time.
 * Requests that would exceed the limit are delayed until enough slots are
 * put back, i.e. until @p get_current() drops below the limit again.
 */
class Throttle final : public ThrottleInterface {
  CephContext *cct;
  const std::string name;
  PerfCountersRef logger;
  std::atomic<int64_t> count = { 0 }, max = { 0 };
  std::mutex lock;
  std::list<std::condition_variable> conds;
  const bool use_perf;

public:
  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
  ~Throttle() override;

private:
  void _reset_max(int64_t m);
  bool _should_wait(int64_t c) const {
    int64_t m = max;
    int64_t cur = count;
    return
      m &&
      ((c <= m && cur + c > m) ||   // normally stay under max
       (c >= m && cur > m));        // except for large c
  }

  bool _wait(int64_t c, std::unique_lock<std::mutex>& l);

public:
  /**
   * gets the number of currently taken slots
   * @returns the number of taken slots
   */
  int64_t get_current() const {
    return count;
  }

  /**
   * get the max number of slots
   * @returns the max number of slots
   */
  int64_t get_max() const { return max; }

  /**
   * return true if past midpoint
   */
  bool past_midpoint() const {
    return count >= max / 2;
  }

  /**
   * set the new max number, and wait until the number of taken slots drains
   * and drops below this limit.
   *
   * @param m the new max number
   * @returns true if this method blocked, false if it returned immediately
   */
  bool wait(int64_t m = 0);

  /**
   * take the specified number of slots from the stock regardless of the throttling
   * @param c number of slots to take
   * @returns the total number of taken slots
   */
  int64_t take(int64_t c = 1) override;

  /**
   * get the specified number of slots from the stock, but wait if the
   * total number taken by the consumer would exceed the maximum.
   * @param c number of slots to get
   * @param m new maximum number to set, ignored if it is 0
   * @returns true if this request was blocked due to the throttling, false
   * otherwise
   */
  bool get(int64_t c = 1, int64_t m = 0);

  /**
   * the non-blocking version of @p get()
   * @returns true if it successfully got the requested amount,
   * or false if it would block.
   */
  bool get_or_fail(int64_t c = 1);

  /**
   * put slots back to the stock
   * @param c number of slots to return
   * @returns the number of slots still held after this
   */
  int64_t put(int64_t c = 1) override;

  /**
   * reset the count of taken slots to zero
   */
  void reset();

  void reset_max(int64_t m) {
    std::lock_guard l(lock);
    _reset_max(m);
  }
};
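
/*
 * A minimal usage sketch for Throttle (not part of the original header; the
 * throttle name "example_io" and the slot counts are hypothetical, and `cct`
 * is assumed to be a valid CephContext*):
 *
 *   Throttle throttle(cct, "example_io", 100);  // at most 100 slots in flight
 *
 *   throttle.get(10);       // take 10 slots, blocking while over the limit
 *   // ... perform the throttled work ...
 *   throttle.put(10);       // return the slots so blocked callers can proceed
 *
 *   if (throttle.get_or_fail(5)) {
 *     // got 5 slots without blocking
 *     throttle.put(5);
 *   }
 */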

/**
 * BackoffThrottle
 *
 * Creates a throttle which gradually induces delays when get() is called
 * based on params low_threshold, high_threshold, expected_throughput,
 * high_multiple, and max_multiple.
 *
 * In [0, low_threshold), we want no delay.
 *
 * In [low_threshold, high_threshold), delays should be injected based
 * on a line from 0 at low_threshold to
 * high_multiple * (1/expected_throughput) at high_threshold.
 *
 * In [high_threshold, 1), we want delays injected based on a line from
 * (high_multiple * (1/expected_throughput)) at high_threshold to
 * (high_multiple * (1/expected_throughput)) +
 * (max_multiple * (1/expected_throughput)) at 1.
 *
 * Let the current throttle ratio (current/max) be r, low_threshold be l,
 * high_threshold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
 *
 * delay = 0,                                r \in [0, l)
 * delay = (r - l) * (e / (h - l)),          r \in [l, h)
 * delay = e + (r - h) * ((m - e)/(1 - h)),  r \in [h, 1)
 */
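
/*
 * A hypothetical worked example of the delay formulas above (the numbers are
 * illustrative, not defaults): with l = 0.5, h = 0.9, e = 10 ms and m = 50 ms,
 * a throttle ratio of r = 0.95 falls in [h, 1), so
 *
 *   delay = e + (r - h) * ((m - e) / (1 - h))
 *         = 10 ms + 0.05 * (40 ms / 0.1)
 *         = 30 ms
 */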
class BackoffThrottle {
  CephContext *cct;
  const std::string name;
  PerfCountersRef logger;

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  unsigned next_cond = 0;

  /// allocated once to avoid constantly allocating new ones
  std::vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  std::list<std::condition_variable*> waiters;

  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1]
  double low_threshold = 0;
  double high_threshold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// filled in by set_params()
  double s0 = 0;  ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0;  ///< (m - e) / (1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  ceph::timespan _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets the params. If the params are invalid, returns false
   * and populates errstream (if non-null) with a user-comprehensible
   * explanation.
   */
  bool set_params(
    double _low_threshold,
    double _high_threshold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    std::ostream *errstream);

  ceph::timespan get(uint64_t c = 1);
  ceph::timespan wait() {
    return get(0);
  }
  uint64_t put(uint64_t c = 1);
  uint64_t take(uint64_t c = 1);
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
                  unsigned expected_concurrency, ///< [in] determines size of conds
                  bool _use_perf = true);
  ~BackoffThrottle();
};
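
/*
 * A minimal usage sketch for BackoffThrottle (illustrative only; the name
 * "example_backoff" and all parameter values are hypothetical, and `cct` is
 * assumed to be a valid CephContext*):
 *
 *   BackoffThrottle throttle(cct, "example_backoff", 128);
 *   throttle.set_params(0.4, 0.8, 1000, 2, 10, 1024, &std::cerr);
 *
 *   ceph::timespan delayed = throttle.get(16);  // may sleep per the curve above
 *   // ... do the throttled work ...
 *   throttle.put(16);
 */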


/**
 * @class SimpleThrottle
 * This is a simple way to bound the number of concurrent operations.
 *
 * It tracks the first error encountered, and makes it available
 * when all requests are complete. wait_for_ret() should be called
 * before the instance is destroyed.
 *
 * Re-using the same instance isn't safe if you want to check each set
 * of operations for errors, since the return value is not reset.
 */
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  void start_op();
  void end_op(int r);
  bool pending_error() const;
  int wait_for_ret();
private:
  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;
  uint64_t m_current = 0;
  int m_ret = 0;
  bool m_ignore_enoent;
  uint32_t waiters = 0;
};
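
/*
 * A minimal usage sketch for SimpleThrottle (illustrative only; the limit of
 * 8 and the submit_async() helper are hypothetical):
 *
 *   SimpleThrottle throttle(8, false);   // at most 8 ops in flight
 *   for (auto& op : ops) {
 *     throttle.start_op();               // blocks once 8 ops are pending
 *     submit_async(op, [&throttle](int r) { throttle.end_op(r); });
 *   }
 *   int r = throttle.wait_for_ret();     // first error seen, or 0 on success
 */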


class OrderedThrottle;

class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle;
  uint64_t m_tid;
};

/**
 * @class OrderedThrottle
 * Throttles the maximum number of active requests and completes them in order
 *
 * Operations can complete out-of-order, but their associated Context callbacks
 * will be completed in-order during invocations of start_op() and wait_for_ret()
 */
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);
  ~OrderedThrottle();

  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  void finish_op(uint64_t tid, int r);

private:
  struct Result {
    bool finished;
    int ret_val;
    Context *on_finish;

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  typedef std::map<uint64_t, Result> TidResult;

  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;
  uint64_t m_current = 0;
  int m_ret_val = 0;
  bool m_ignore_enoent;

  uint64_t m_next_tid = 0;
  uint64_t m_complete_tid = 0;

  TidResult m_tid_result;

  void complete_pending_ops(std::unique_lock<std::mutex>& l);
  uint32_t waiters = 0;
};
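
/*
 * A minimal usage sketch for OrderedThrottle (illustrative only; the limit of
 * 4, submit_async() and make_on_finish() are hypothetical):
 *
 *   OrderedThrottle throttle(4, false);
 *   for (auto& op : ops) {
 *     // start_op() may block; the returned context must be completed with
 *     // the operation's result.
 *     C_OrderedThrottle *ctx = throttle.start_op(make_on_finish(op));
 *     submit_async(op, ctx);            // ctx->complete(r) may fire out of order
 *   }
 *   int r = throttle.wait_for_ret();    // on_finish callbacks run in tid order
 */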


class TokenBucketThrottle {
  struct Bucket {
    CephContext *cct;
    const std::string name;

    uint64_t remain;
    uint64_t max;
    uint64_t capacity;
    uint64_t available;

    Bucket(CephContext *cct, const std::string &name, uint64_t m)
      : cct(cct), name(name), remain(m), max(m), capacity(m), available(m) {}

    uint64_t get(uint64_t c);
    uint64_t put(uint64_t tokens, double burst_ratio);
    void set_max(uint64_t max, uint64_t burst_seconds);
  };

  struct Blocker {
    uint64_t tokens_requested;
    Context *ctx;

    Blocker(uint64_t _tokens_requested, Context* _ctx)
      : tokens_requested(_tokens_requested), ctx(_ctx) {}
  };

  CephContext *m_cct;
  const std::string m_name;
  Bucket m_throttle;
  uint64_t m_burst = 0;
  uint64_t m_avg = 0;
  SafeTimer *m_timer;
  ceph::mutex *m_timer_lock;
  Context *m_token_ctx = nullptr;
  std::list<Blocker> m_blockers;
  ceph::mutex m_lock;

  // minimum length of the token filling period, in milliseconds.
  uint64_t m_tick_min = 50;
  // token filling period, in milliseconds.
  uint64_t m_tick = 0;
  /**
   * These variables are used to calculate how many tokens need to be put into
   * the bucket within each tick.
   *
   * In actual use, the tokens to be put per tick (m_avg / m_ticks_per_second)
   * may be a floating point number, but we need a 'uint64_t' to put into the
   * bucket.
   *
   * For example, say we set the rate to 950, meaning 950 iops (or bps).
   *
   * In this case, the filling period (m_tick) would be 1000 / 950 = 1.052 ms,
   * which is too small for the SafeTimer. So we set the period (m_tick) to
   * 50 ms (m_tick_min), giving 20 ticks per second (m_ticks_per_second).
   * The tokens filled into the bucket per tick would then be 950 / 20 = 47.5,
   * which is not an integer.
   *
   * To resolve this, we use a method called tokens_filled(m_current_tick) to
   * calculate how many tokens have been put in total so far (up to
   * m_current_tick):
   *
   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
   *
   * The difference between two consecutive ticks is then the number of tokens
   * to add in that tick:
   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) = 47 - 0  = 47
   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) = 95 - 47 = 48
   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 - 95 = 47
   *
   * As a result, the tokens filled in one second are distributed like this:
   *   tick   | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
   *   tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
   */
  uint64_t m_ticks_per_second = 0;
  uint64_t m_current_tick = 0;

  // period for the bucket filling tokens, in seconds.
  double m_schedule_tick = 1.0;

public:
  TokenBucketThrottle(CephContext *cct, const std::string &name,
                      uint64_t burst, uint64_t avg,
                      SafeTimer *timer, ceph::mutex *timer_lock);

  ~TokenBucketThrottle();

  const std::string &get_name() {
    return m_name;
  }

  template <typename T, typename MF, typename I>
  void add_blocker(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    auto ctx = new LambdaContext(
      [t, mf, item=std::forward<I>(item), flag](int) mutable {
        (t->*mf)(std::forward<I>(item), flag);
      });
    m_blockers.emplace_back(c, ctx);
  }

  template <typename T, typename MF, typename I>
  bool get(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    bool wait = false;
    uint64_t got = 0;
    std::lock_guard lock(m_lock);
    if (!m_blockers.empty()) {
      // Keep the order of requests; add item after previously blocked requests.
      wait = true;
    } else {
      if (0 == m_throttle.max || 0 == m_avg)
        return false;

      got = m_throttle.get(c);
      if (got < c) {
        // Not enough tokens, add a blocker for it.
        wait = true;
      }
    }

    if (wait) {
      add_blocker(c - got, std::forward<T>(t), std::forward<MF>(mf),
                  std::forward<I>(item), flag);
    }

    return wait;
  }

  int set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds);
  void set_schedule_tick_min(uint64_t tick);

private:
  uint64_t tokens_filled(double tick);
  uint64_t tokens_this_tick();
  void add_tokens();
  void schedule_timer();
  void cancel_timer();
};
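
/*
 * A minimal usage sketch for TokenBucketThrottle (illustrative only; the
 * Client class, handle_throttle_ready(), the limit values, and the timer
 * objects are hypothetical):
 *
 *   TokenBucketThrottle throttle(cct, "example_qos", 0, 0, timer, timer_lock);
 *   throttle.set_limit(1000, 2000, 1);   // assumed: 1000 tokens/s, 2000 burst
 *
 *   // If enough tokens are available, get() returns false and the caller
 *   // proceeds immediately; otherwise it returns true and later invokes
 *   // client->handle_throttle_ready(request, flag) once tokens are refilled.
 *   if (!throttle.get(cost, client, &Client::handle_throttle_ready,
 *                     std::move(request), flag)) {
 *     // not throttled, issue the request now
 *   }
 */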

#endif