1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_THROTTLE_H
5 #define CEPH_THROTTLE_H
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "common/ThrottleInterface.h"
#include "common/Timer.h"
#include "common/convenience.h"
#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
#include "crimson/common/perf_counters_collection.h"
#else
#include "common/perf_counters_collection.h"
#endif
/**
 * @class Throttle
 * Throttles the maximum number of active requests.
 *
 * This class defines the maximum number of slots currently taken away. The
 * excessive requests for more of them are delayed, until some slots are put
 * back, so @p get_current() drops below the limit after fulfills the requests.
 */
32 class Throttle final
: public ThrottleInterface
{
34 const std::string name
;
35 PerfCountersRef logger
;
36 std::atomic
<int64_t> count
= { 0 }, max
= { 0 };
38 std::list
<std::condition_variable
> conds
;
42 Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
= 0, bool _use_perf
= true);
46 void _reset_max(int64_t m
);
47 bool _should_wait(int64_t c
) const {
52 ((c
<= m
&& cur
+ c
> m
) || // normally stay under max
53 (c
>= m
&& cur
> m
)); // except for large c
56 bool _wait(int64_t c
, std::unique_lock
<std::mutex
>& l
);
60 * gets the number of currently taken slots
61 * @returns the number of taken slots
63 int64_t get_current() const {
68 * get the max number of slots
69 * @returns the max number of slots
71 int64_t get_max() const { return max
; }
74 * return true if past midpoint
76 bool past_midpoint() const {
77 return count
>= max
/ 2;
81 * set the new max number, and wait until the number of taken slots drains
82 * and drops below this limit.
84 * @param m the new max number
85 * @returns true if this method is blocked, false it it returns immediately
87 bool wait(int64_t m
= 0);
90 * take the specified number of slots from the stock regardless the throttling
91 * @param c number of slots to take
92 * @returns the total number of taken slots
94 int64_t take(int64_t c
= 1) override
;
97 * get the specified amount of slots from the stock, but will wait if the
98 * total number taken by consumer would exceed the maximum number.
99 * @param c number of slots to get
100 * @param m new maximum number to set, ignored if it is 0
101 * @returns true if this request is blocked due to the throttling, false
104 bool get(int64_t c
= 1, int64_t m
= 0);
107 * the unblocked version of @p get()
108 * @returns true if it successfully got the requested amount,
109 * or false if it would block.
111 bool get_or_fail(int64_t c
= 1);
114 * put slots back to the stock
115 * @param c number of slots to return
116 * @returns number of requests being hold after this
118 int64_t put(int64_t c
= 1) override
;
120 * reset the zero to the stock
124 void reset_max(int64_t m
) {
125 std::lock_guard
l(lock
);
/**
 * @class BackoffThrottle
 *
 * Creates a throttle which gradually induces delays when get() is called
 * based on params low_threshold, high_threshold, expected_throughput,
 * high_multiple, and max_multiple.
 *
 * In [0, low_threshold), we want no delay.
 *
 * In [low_threshold, high_threshold), delays should be injected based
 * on a line from 0 at low_threshold to
 * high_multiple * (1/expected_throughput) at high_threshold.
 *
 * In [high_threshold, 1), we want delays injected based on a line from
 * (high_multiple * (1/expected_throughput)) at high_threshold to
 * (high_multiple * (1/expected_throughput)) +
 * (max_multiple * (1/expected_throughput)) at 1.
 *
 * Let the current throttle ratio (current/max) be r, low_threshold be l,
 * high_threshold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
 *
 * delay = 0, r \in [0, l)
 * delay = (r - l) * (e / (h - l)), r \in [l, h)
 * delay = e + (r - h)((m - e)/(1 - h)), r \in [h, 1)
 */
156 class BackoffThrottle
{
157 const std::string name
;
158 PerfCountersRef logger
;
161 using locker
= std::unique_lock
<std::mutex
>;
163 unsigned next_cond
= 0;
165 /// allocated once to avoid constantly allocating new ones
166 std::vector
<std::condition_variable
> conds
;
170 /// pointers into conds
171 std::list
<std::condition_variable
*> waiters
;
173 std::list
<std::condition_variable
*>::iterator
_push_waiter() {
174 unsigned next
= next_cond
++;
175 if (next_cond
== conds
.size())
177 return waiters
.insert(waiters
.end(), &(conds
[next
]));
180 void _kick_waiters() {
181 if (!waiters
.empty())
182 waiters
.front()->notify_all();
185 /// see above, values are in [0, 1].
186 double low_threshold
= 0;
187 double high_threshold
= 1;
189 /// see above, values are in seconds
190 double high_delay_per_count
= 0;
191 double max_delay_per_count
= 0;
193 /// Filled in in set_params
194 double s0
= 0; ///< e / (h - l), l != h, 0 otherwise
195 double s1
= 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
199 uint64_t current
= 0;
201 ceph::timespan
_get_delay(uint64_t c
) const;
207 * Sets params. If the params are invalid, returns false
208 * and populates errstream (if non-null) with a user comprehensible
212 double _low_threshold
,
213 double _high_threshold
,
214 double expected_throughput
,
215 double high_multiple
,
217 uint64_t throttle_max
,
218 std::ostream
*errstream
);
220 ceph::timespan
get(uint64_t c
= 1);
221 ceph::timespan
wait() {
224 uint64_t put(uint64_t c
= 1);
225 uint64_t take(uint64_t c
= 1);
226 uint64_t get_current();
229 BackoffThrottle(CephContext
*cct
, const std::string
& n
,
230 unsigned expected_concurrency
, ///< [in] determines size of conds
231 bool _use_perf
= true);
/**
 * @class SimpleThrottle
 * This is a simple way to bound the number of concurrent operations.
 *
 * It tracks the first error encountered, and makes it available
 * when all requests are complete. wait_for_ret() should be called
 * before the instance is destroyed.
 *
 * Re-using the same instance isn't safe if you want to check each set
 * of operations for errors, since the return value is not reset.
 */
class SimpleThrottle {
  // NOTE(review): reconstructed from a mangled numbered paste; the public
  // operation methods are implied by the class doc comment (wait_for_ret,
  // first-error tracking) — verify against the upstream header.
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  void start_op();
  void end_op(int r);
  bool pending_error() const;
  int wait_for_ret();  // blocks until all ops complete; returns first error

private:
  mutable std::mutex m_lock;        // guards all state below
  std::condition_variable m_cond;
  uint64_t m_max;                   // concurrency bound
  uint64_t m_current = 0;           // in-flight operations
  int m_ret = 0;                    // first error encountered (not reset)
  bool m_ignore_enoent;
  uint32_t waiters = 0;
};
// Forward declaration: C_OrderedThrottle (below) stores a back-pointer to the
// OrderedThrottle that issued it; the full class is defined further down.
class OrderedThrottle;
268 class C_OrderedThrottle
: public Context
{
270 C_OrderedThrottle(OrderedThrottle
*ordered_throttle
, uint64_t tid
)
271 : m_ordered_throttle(ordered_throttle
), m_tid(tid
) {
275 void finish(int r
) override
;
278 OrderedThrottle
*m_ordered_throttle
;
/**
 * @class OrderedThrottle
 * Throttles the maximum number of active requests and completes them in order
 *
 * Operations can complete out-of-order but their associated Context callback
 * will be completed in-order during invocation of start_op() and wait_for_ret()
 */
289 class OrderedThrottle
{
291 OrderedThrottle(uint64_t max
, bool ignore_enoent
);
294 C_OrderedThrottle
*start_op(Context
*on_finish
);
297 bool pending_error() const;
301 friend class C_OrderedThrottle
;
303 void finish_op(uint64_t tid
, int r
);
311 Result(Context
*_on_finish
= NULL
)
312 : finished(false), ret_val(0), on_finish(_on_finish
) {
316 typedef std::map
<uint64_t, Result
> TidResult
;
318 mutable std::mutex m_lock
;
319 std::condition_variable m_cond
;
321 uint64_t m_current
= 0;
323 bool m_ignore_enoent
;
325 uint64_t m_next_tid
= 0;
326 uint64_t m_complete_tid
= 0;
328 TidResult m_tid_result
;
330 void complete_pending_ops(std::unique_lock
<std::mutex
>& l
);
331 uint32_t waiters
= 0;
335 class TokenBucketThrottle
{
338 const std::string name
;
345 Bucket(CephContext
*cct
, const std::string
&name
, uint64_t m
)
346 : cct(cct
), name(name
), remain(m
), max(m
), capacity(m
), available(m
) {}
348 uint64_t get(uint64_t c
);
349 uint64_t put(uint64_t tokens
, double burst_ratio
);
350 void set_max(uint64_t max
, uint64_t burst_seconds
);
354 uint64_t tokens_requested
;
357 Blocker(uint64_t _tokens_requested
, Context
* _ctx
)
358 : tokens_requested(_tokens_requested
), ctx(_ctx
) {}
362 const std::string m_name
;
364 uint64_t m_burst
= 0;
367 ceph::mutex
*m_timer_lock
;
368 Context
*m_token_ctx
= nullptr;
369 std::list
<Blocker
> m_blockers
;
372 // minimum of the filling period.
373 uint64_t m_tick_min
= 50;
374 // tokens filling period, its unit is millisecond.
  /*
   * These variables are used to calculate how many tokens need to be put into
   * the bucket within each tick.
   *
   * In actual use, the tokens to be put per tick (m_avg / m_ticks_per_second)
   * may be a floating point number, but we need an 'uint64_t' to put into the
   * bucket.
   *
   * For example, we set the value of rate to be 950, means 950 iops (or bps).
   *
   * In this case, the filling period (m_tick) should be 1000 / 950 = 1.052,
   * which is too small for the SafeTimer. So we should set the period (m_tick)
   * to be 50 (m_tick_min), and 20 ticks in one second (m_ticks_per_second).
   * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
   *
   * To resolve this, we use a method called tokens_filled(m_current_tick) to
   * calculate how many tokens will be put so far (until m_current_tick):
   *
   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
   *
   * And the difference between two ticks will be the result we expect.
   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) =  47 - 0   = 47
   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) =  95 - 47  = 48
   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 - 95  = 47
   *
   * As a result, the tokens filled in one second will shown as this:
   *   tick   | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
   *   tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
   */
405 uint64_t m_ticks_per_second
= 0;
406 uint64_t m_current_tick
= 0;
408 // period for the bucket filling tokens, its unit is seconds.
409 double m_schedule_tick
= 1.0;
412 TokenBucketThrottle(CephContext
*cct
, const std::string
&name
,
413 uint64_t burst
, uint64_t avg
,
414 SafeTimer
*timer
, ceph::mutex
*timer_lock
);
416 ~TokenBucketThrottle();
418 const std::string
&get_name() {
422 template <typename T
, typename MF
, typename I
>
423 void add_blocker(uint64_t c
, T
&& t
, MF
&& mf
, I
&& item
, uint64_t flag
) {
424 auto ctx
= new LambdaContext(
425 [t
, mf
, item
=std::forward
<I
>(item
), flag
](int) mutable {
426 (t
->*mf
)(std::forward
<I
>(item
), flag
);
428 m_blockers
.emplace_back(c
, ctx
);
431 template <typename T
, typename MF
, typename I
>
432 bool get(uint64_t c
, T
&& t
, MF
&& mf
, I
&& item
, uint64_t flag
) {
435 std::lock_guard
lock(m_lock
);
436 if (!m_blockers
.empty()) {
437 // Keep the order of requests, add item after previous blocked requests.
440 if (0 == m_throttle
.max
|| 0 == m_avg
)
443 got
= m_throttle
.get(c
);
445 // Not enough tokens, add a blocker for it.
451 add_blocker(c
- got
, std::forward
<T
>(t
), std::forward
<MF
>(mf
),
452 std::forward
<I
>(item
), flag
);
458 int set_limit(uint64_t average
, uint64_t burst
, uint64_t burst_seconds
);
459 void set_schedule_tick_min(uint64_t tick
);
462 uint64_t tokens_filled(double tick
);
463 uint64_t tokens_this_tick();
465 void schedule_timer();