]>
git.proxmox.com Git - ceph.git/blob - ceph/src/common/Throttle.h
efc5ba037ae80e0b918641ff0702979118a684c6
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_THROTTLE_H
5 #define CEPH_THROTTLE_H
12 #include <condition_variable>
16 #include "include/Context.h"
23 * Throttles the maximum number of active requests.
25 * This class defines the maximum number of slots currently taken away. The
26 * excessive requests for more of them are delayed, until some slots are put
27 * back, so @p get_current() drops below the limit after fulfilling the requests.
31 const std::string name
;
33 std::atomic
<unsigned> count
= { 0 }, max
= { 0 };
37 bool shutting_down
= false;
41 Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
= 0, bool _use_perf
= true);
45 void _reset_max(int64_t m
);
46 bool _should_wait(int64_t c
) const {
51 ((c
<= m
&& cur
+ c
> m
) || // normally stay under max
52 (c
>= m
&& cur
> m
)); // except for large c
55 bool _wait(int64_t c
);
59 * gets the number of currently taken slots
60 * @returns the number of taken slots
62 int64_t get_current() const {
67 * get the max number of slots
68 * @returns the max number of slots
70 int64_t get_max() const { return max
; }
73 * return true if past midpoint
75 bool past_midpoint() const {
76 return count
>= max
/ 2;
80 * set the new max number, and wait until the number of taken slots drains
81 * and drops below this limit.
83 * @param m the new max number
84 * @returns true if this method is blocked, false if it returns immediately
86 bool wait(int64_t m
= 0);
89 * take the specified number of slots from the stock regardless of the throttling
90 * @param c number of slots to take
91 * @returns the total number of taken slots
93 int64_t take(int64_t c
= 1);
96 * get the specified amount of slots from the stock, but will wait if the
97 * total number taken by consumer would exceed the maximum number.
98 * @param c number of slots to get
99 * @param m new maximum number to set, ignored if it is 0
100 * @returns true if this request is blocked due to the throttling, false
103 bool get(int64_t c
= 1, int64_t m
= 0);
106 * the unblocked version of @p get()
107 * @returns true if it successfully got the requested amount,
108 * or false if it would block.
110 bool get_or_fail(int64_t c
= 1);
113 * put slots back to the stock
114 * @param c number of slots to return
115 * @returns number of requests being held after this
117 int64_t put(int64_t c
= 1);
119 * reset the zero to the stock
123 bool should_wait(int64_t c
) const {
124 return _should_wait(c
);
126 void reset_max(int64_t m
) {
127 Mutex::Locker
l(lock
);
135 * Creates a throttle which gradually induces delays when get() is called
136 * based on params low_threshhold, high_threshhold, expected_throughput,
137 * high_multiple, and max_multiple.
139 * In [0, low_threshhold), we want no delay.
141 * In [low_threshhold, high_threshhold), delays should be injected based
142 * on a line from 0 at low_threshhold to
143 * high_multiple * (1/expected_throughput) at high_threshhold.
145 * In [high_threshhold, 1), we want delays injected based on a line from
146 * (high_multiple * (1/expected_throughput)) at high_threshhold to
147 * (high_multiple * (1/expected_throughput)) +
148 * (max_multiple * (1/expected_throughput)) at 1.
150 * Let the current throttle ratio (current/max) be r, low_threshhold be l,
151 * high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
152 * and max_delay (max_multiple / expected_throughput) be m.
154 * delay = 0, r \in [0, l)
155 * delay = (r - l) * (e / (h - l)), r \in [l, h)
156 * delay = e + (r - h) * ((m - e) / (1 - h)), r \in [h, 1)
158 class BackoffThrottle
{
160 const std::string name
;
161 PerfCounters
*logger
;
164 using locker
= std::unique_lock
<std::mutex
>;
166 unsigned next_cond
= 0;
168 /// allocated once to avoid constantly allocating new ones
169 vector
<std::condition_variable
> conds
;
173 /// pointers into conds
174 list
<std::condition_variable
*> waiters
;
176 std::list
<std::condition_variable
*>::iterator
_push_waiter() {
177 unsigned next
= next_cond
++;
178 if (next_cond
== conds
.size())
180 return waiters
.insert(waiters
.end(), &(conds
[next
]));
183 void _kick_waiters() {
184 if (!waiters
.empty())
185 waiters
.front()->notify_all();
188 /// see above, values are in [0, 1].
189 double low_threshhold
= 0;
190 double high_threshhold
= 1;
192 /// see above, values are in seconds
193 double high_delay_per_count
= 0;
194 double max_delay_per_count
= 0;
196 /// Filled in in set_params
197 double s0
= 0; ///< e / (h - l), l != h, 0 otherwise
198 double s1
= 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise
202 uint64_t current
= 0;
204 std::chrono::duration
<double> _get_delay(uint64_t c
) const;
210 * Sets params. If the params are invalid, returns false
211 * and populates errstream (if non-null) with a user comprehensible
215 double low_threshhold
,
216 double high_threshhold
,
217 double expected_throughput
,
218 double high_multiple
,
220 uint64_t throttle_max
,
223 std::chrono::duration
<double> get(uint64_t c
= 1);
224 std::chrono::duration
<double> wait() {
227 uint64_t put(uint64_t c
= 1);
228 uint64_t take(uint64_t c
= 1);
229 uint64_t get_current();
232 BackoffThrottle(CephContext
*cct
, const std::string
& n
,
233 unsigned expected_concurrency
, ///< [in] determines size of conds
234 bool _use_perf
= true);
240 * @class SimpleThrottle
241 * This is a simple way to bound the number of concurrent operations.
243 * It tracks the first error encountered, and makes it available
244 * when all requests are complete. wait_for_ret() should be called
245 * before the instance is destroyed.
247 * Re-using the same instance isn't safe if you want to check each set
248 * of operations for errors, since the return value is not reset.
250 class SimpleThrottle
{
252 SimpleThrottle(uint64_t max
, bool ignore_enoent
);
256 bool pending_error() const;
259 mutable Mutex m_lock
;
264 bool m_ignore_enoent
;
265 uint32_t waiters
= 0;
269 class OrderedThrottle
;
271 class C_OrderedThrottle
: public Context
{
273 C_OrderedThrottle(OrderedThrottle
*ordered_throttle
, uint64_t tid
)
274 : m_ordered_throttle(ordered_throttle
), m_tid(tid
) {
278 void finish(int r
) override
;
281 OrderedThrottle
*m_ordered_throttle
;
286 * @class OrderedThrottle
287 * Throttles the maximum number of active requests and completes them in order
289 * Operations can complete out-of-order but their associated Context callback
290 * will be completed in order during invocation of start_op() and wait_for_ret()
292 class OrderedThrottle
{
294 OrderedThrottle(uint64_t max
, bool ignore_enoent
);
297 C_OrderedThrottle
*start_op(Context
*on_finish
);
300 bool pending_error() const;
304 friend class C_OrderedThrottle
;
306 void finish_op(uint64_t tid
, int r
);
314 Result(Context
*_on_finish
= NULL
)
315 : finished(false), ret_val(0), on_finish(_on_finish
) {
319 typedef std::map
<uint64_t, Result
> TidResult
;
321 mutable Mutex m_lock
;
326 bool m_ignore_enoent
;
329 uint64_t m_complete_tid
;
331 TidResult m_tid_result
;
333 void complete_pending_ops();
334 uint32_t waiters
= 0;