]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef CEPH_THROTTLE_H | |
5 | #define CEPH_THROTTLE_H | |
6 | ||
31f18b77 | 7 | #include <atomic> |
11fdf7f2 | 8 | #include <chrono> |
7c673cae | 9 | #include <iostream> |
11fdf7f2 TL |
10 | #include <list> |
11 | #include <map> | |
31f18b77 | 12 | |
11fdf7f2 | 13 | #include "common/ceph_mutex.h" |
7c673cae | 14 | #include "include/Context.h" |
11fdf7f2 TL |
15 | #include "common/ThrottleInterface.h" |
16 | #include "common/Timer.h" | |
17 | #include "common/convenience.h" | |
20effc67 TL |
18 | #if defined(WITH_SEASTAR) && !defined(WITH_ALIEN) |
19 | #include "crimson/common/perf_counters_collection.h" | |
20 | #else | |
11fdf7f2 | 21 | #include "common/perf_counters_collection.h" |
20effc67 | 22 | #endif |
7c673cae FG |
23 | |
24 | /** | |
25 | * @class Throttle | |
26 | * Throttles the maximum number of active requests. | |
27 | * | |
28 | * This class defines the maximum number of slots currently taken away. The | |
29 | * excessive requests for more of them are delayed, until some slots are put | |
30 | * back, so @p get_current() drops below the limit after fulfills the requests. | |
31 | */ | |
11fdf7f2 | 32 | class Throttle final : public ThrottleInterface { |
7c673cae FG |
33 | CephContext *cct; |
34 | const std::string name; | |
11fdf7f2 | 35 | PerfCountersRef logger; |
b32b8144 | 36 | std::atomic<int64_t> count = { 0 }, max = { 0 }; |
11fdf7f2 TL |
37 | std::mutex lock; |
38 | std::list<std::condition_variable> conds; | |
7c673cae FG |
39 | const bool use_perf; |
40 | ||
41 | public: | |
42 | Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true); | |
11fdf7f2 | 43 | ~Throttle() override; |
7c673cae FG |
44 | |
45 | private: | |
46 | void _reset_max(int64_t m); | |
47 | bool _should_wait(int64_t c) const { | |
31f18b77 FG |
48 | int64_t m = max; |
49 | int64_t cur = count; | |
7c673cae FG |
50 | return |
51 | m && | |
52 | ((c <= m && cur + c > m) || // normally stay under max | |
53 | (c >= m && cur > m)); // except for large c | |
54 | } | |
55 | ||
11fdf7f2 | 56 | bool _wait(int64_t c, std::unique_lock<std::mutex>& l); |
7c673cae FG |
57 | |
58 | public: | |
59 | /** | |
60 | * gets the number of currently taken slots | |
61 | * @returns the number of taken slots | |
62 | */ | |
63 | int64_t get_current() const { | |
31f18b77 | 64 | return count; |
7c673cae FG |
65 | } |
66 | ||
67 | /** | |
68 | * get the max number of slots | |
69 | * @returns the max number of slots | |
70 | */ | |
31f18b77 | 71 | int64_t get_max() const { return max; } |
7c673cae FG |
72 | |
73 | /** | |
74 | * return true if past midpoint | |
75 | */ | |
76 | bool past_midpoint() const { | |
31f18b77 | 77 | return count >= max / 2; |
7c673cae FG |
78 | } |
79 | ||
80 | /** | |
81 | * set the new max number, and wait until the number of taken slots drains | |
82 | * and drops below this limit. | |
83 | * | |
84 | * @param m the new max number | |
85 | * @returns true if this method is blocked, false it it returns immediately | |
86 | */ | |
87 | bool wait(int64_t m = 0); | |
88 | ||
89 | /** | |
90 | * take the specified number of slots from the stock regardless the throttling | |
91 | * @param c number of slots to take | |
92 | * @returns the total number of taken slots | |
93 | */ | |
11fdf7f2 | 94 | int64_t take(int64_t c = 1) override; |
7c673cae FG |
95 | |
96 | /** | |
97 | * get the specified amount of slots from the stock, but will wait if the | |
98 | * total number taken by consumer would exceed the maximum number. | |
99 | * @param c number of slots to get | |
100 | * @param m new maximum number to set, ignored if it is 0 | |
101 | * @returns true if this request is blocked due to the throttling, false | |
102 | * otherwise | |
103 | */ | |
104 | bool get(int64_t c = 1, int64_t m = 0); | |
105 | ||
106 | /** | |
107 | * the unblocked version of @p get() | |
108 | * @returns true if it successfully got the requested amount, | |
109 | * or false if it would block. | |
110 | */ | |
111 | bool get_or_fail(int64_t c = 1); | |
112 | ||
113 | /** | |
114 | * put slots back to the stock | |
115 | * @param c number of slots to return | |
116 | * @returns number of requests being hold after this | |
117 | */ | |
11fdf7f2 | 118 | int64_t put(int64_t c = 1) override; |
7c673cae FG |
119 | /** |
120 | * reset the zero to the stock | |
121 | */ | |
122 | void reset(); | |
123 | ||
7c673cae | 124 | void reset_max(int64_t m) { |
11fdf7f2 | 125 | std::lock_guard l(lock); |
7c673cae FG |
126 | _reset_max(m); |
127 | } | |
128 | }; | |
129 | ||
/**
 * BackoffThrottle
 *
 * Creates a throttle which gradually induces delays when get() is called
 * based on params low_threshold, high_threshold, expected_throughput,
 * high_multiple, and max_multiple.
 *
 * In [0, low_threshold), we want no delay.
 *
 * In [low_threshold, high_threshold), delays should be injected based
 * on a line from 0 at low_threshold to
 * high_multiple * (1/expected_throughput) at high_threshold.
 *
 * In [high_threshold, 1), we want delays injected based on a line from
 * (high_multiple * (1/expected_throughput)) at high_threshold to
 * (high_multiple * (1/expected_throughput)) +
 * (max_multiple * (1/expected_throughput)) at 1.
 *
 * Let the current throttle ratio (current/max) be r, low_threshold be l,
 * high_threshold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
 *
 * delay = 0, r \in [0, l)
 * delay = (r - l) * (e / (h - l)), r \in [l, h)
 * delay = e + (r - h)((m - e)/(1 - h))
 */
class BackoffThrottle {
  const std::string name;
  PerfCountersRef logger;

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  // round-robin cursor into `conds` used by _push_waiter()
  unsigned next_cond = 0;

  /// allocated once to avoid constantly allocating new ones
  std::vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  std::list<std::condition_variable*> waiters;

  // Append the next condvar (chosen round-robin from `conds`) to the back of
  // the wait queue; returns an iterator to the new entry so the caller can
  // erase it once woken.
  // NOTE(review): presumably called with `lock` held — confirm in the .cc.
  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  // Wake only the front (longest-waiting) entry, preserving FIFO wakeup order.
  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1].
  double low_threshold = 0;
  double high_threshold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// Filled in in set_params
  double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  // Compute the injected delay for acquiring `c` more slots, per the formula
  // in the class comment.
  ceph::timespan _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets params. If the params are invalid, returns false
   * and populates errstream (if non-null) with a user comprehensible
   * explanation.
   */
  bool set_params(
    double _low_threshold,
    double _high_threshold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    std::ostream *errstream);

  /// take `c` slots, sleeping as dictated by the backoff curve;
  /// returns the total delay incurred.
  ceph::timespan get(uint64_t c = 1);
  /// wait for the throttle to drain without taking any slots (get(0))
  ceph::timespan wait() {
    return get(0);
  }
  uint64_t put(uint64_t c = 1);
  uint64_t take(uint64_t c = 1);
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
    unsigned expected_concurrency, ///< [in] determines size of conds
    bool _use_perf = true);
  ~BackoffThrottle();
};
234 | ||
235 | ||
/**
 * @class SimpleThrottle
 * This is a simple way to bound the number of concurrent operations.
 *
 * It tracks the first error encountered, and makes it available
 * when all requests are complete. wait_for_ret() should be called
 * before the instance is destroyed.
 *
 * Re-using the same instance isn't safe if you want to check each set
 * of operations for errors, since the return value is not reset.
 */
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  // block until a slot is free, then claim it
  void start_op();
  // release a slot, recording r as the first error if one occurred
  void end_op(int r);
  // true if some previous op has already failed
  bool pending_error() const;
  // wait for all ops to finish; returns the first recorded error (or 0)
  int wait_for_ret();
private:
  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;            // maximum concurrent ops
  uint64_t m_current = 0;    // ops currently in flight
  int m_ret = 0;             // first error seen; never reset (see class doc)
  bool m_ignore_enoent;      // if set, ENOENT results are not treated as errors
  uint32_t waiters = 0;      // threads blocked in start_op()/wait_for_ret()
};
264 | ||
265 | ||
266 | class OrderedThrottle; | |
267 | ||
/**
 * Completion context handed out by OrderedThrottle::start_op().
 * On finish() it reports the result for its tid back to the owning
 * OrderedThrottle so callbacks can be dispatched in tid order.
 */
class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle; // non-owning back-pointer
  uint64_t m_tid;                      // ticket identifying this op
};
281 | ||
/**
 * @class OrderedThrottle
 * Throttles the maximum number of active requests and completes them in order
 *
 * Operations can complete out-of-order but their associated Context callback
 * will be completed in-order during invocation of start_op() and wait_for_ret()
 */
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);
  ~OrderedThrottle();

  // claim a slot (blocking if at max) and return the completion context
  // the caller must fire when the op finishes
  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  // called by C_OrderedThrottle::finish() with the op's tid and result
  void finish_op(uint64_t tid, int r);

private:
  // per-tid completion record, kept until all earlier tids have finished
  struct Result {
    bool finished;
    int ret_val;
    Context *on_finish;

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  typedef std::map<uint64_t, Result> TidResult;

  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;             // maximum concurrent ops
  uint64_t m_current = 0;     // ops currently in flight
  int m_ret_val = 0;          // first error seen
  bool m_ignore_enoent;       // if set, ENOENT results are not treated as errors

  uint64_t m_next_tid = 0;     // tid to hand to the next start_op()
  uint64_t m_complete_tid = 0; // next tid whose callback may be dispatched

  TidResult m_tid_result;      // outstanding/unordered completions keyed by tid

  // dispatch all in-order completed callbacks; takes the held lock by ref
  void complete_pending_ops(std::unique_lock<std::mutex>& l);
  uint32_t waiters = 0;        // threads blocked in start_op()/wait_for_ret()
};
333 | ||
11fdf7f2 TL |
334 | |
/**
 * Token-bucket rate limiter: a timer periodically refills the bucket and
 * requests consume tokens, blocking (via a stored Context) when the bucket
 * runs dry.
 */
class TokenBucketThrottle {
  // The bucket of available tokens.
  struct Bucket {
    CephContext *cct;
    const std::string name;

    uint64_t remain;     // tokens currently in the bucket
    uint64_t max;        // steady-state limit
    uint64_t capacity;   // NOTE(review): presumably the burst ceiling — confirm in the .cc
    uint64_t available;

    Bucket(CephContext *cct, const std::string &name, uint64_t m)
      : cct(cct), name(name), remain(m), max(m), capacity(m), available(m) {}

    // take up to c tokens; returns how many were actually taken
    uint64_t get(uint64_t c);
    // refill with `tokens`, bounded by the burst ratio; returns tokens added
    uint64_t put(uint64_t tokens, double burst_ratio);
    void set_max(uint64_t max, uint64_t burst_seconds);
  };

  // A request that could not get enough tokens; ctx is fired once the
  // remaining tokens_requested have been obtained.
  struct Blocker {
    uint64_t tokens_requested;
    Context *ctx;

    Blocker(uint64_t _tokens_requested, Context* _ctx)
      : tokens_requested(_tokens_requested), ctx(_ctx) {}
  };

  CephContext *m_cct;
  const std::string m_name;
  Bucket m_throttle;
  uint64_t m_burst = 0;             // configured burst rate
  uint64_t m_avg = 0;               // configured average rate; 0 disables throttling
  SafeTimer *m_timer;               // external timer driving token refills
  ceph::mutex *m_timer_lock;        // lock guarding m_timer (owned by caller)
  Context *m_token_ctx = nullptr;   // pending timer callback, if scheduled
  std::list<Blocker> m_blockers;    // FIFO of blocked requests
  ceph::mutex m_lock;               // guards bucket and blocker state

  // minimum of the filling period.
  uint64_t m_tick_min = 50;
  // tokens filling period, its unit is millisecond.
  uint64_t m_tick = 0;
  /**
   * These variables are used to calculate how many tokens need to be put into
   * the bucket within each tick.
   *
   * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second)
   * may be a floating point number, but we need an 'uint64_t' to put into the
   * bucket.
   *
   * For example, we set the value of rate to be 950, means 950 iops(or bps).
   *
   * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052,
   * which is too small for the SafeTimer. So we should set the period(m_tick)
   * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second).
   * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
   *
   * To resolve this, we use a method called tokens_filled(m_current_tick) to
   * calculate how many tokens will be put so far(until m_current_tick):
   *
   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
   *
   * And the difference between two ticks will be the result we expect.
   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) =  47 -   0 = 47
   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) =  95 -  47 = 48
   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 -  95 = 47
   *
   * As a result, the tokens filled in one second will shown as this:
   *   tick    | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
   *   tokens |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
   */
  uint64_t m_ticks_per_second = 0;
  uint64_t m_current_tick = 0;

  // period for the bucket filling tokens, its unit is seconds.
  double m_schedule_tick = 1.0;

public:
  TokenBucketThrottle(CephContext *cct, const std::string &name,
                      uint64_t burst, uint64_t avg,
                      SafeTimer *timer, ceph::mutex *timer_lock);

  ~TokenBucketThrottle();

  const std::string &get_name() {
    return m_name;
  }

  /**
   * Queue a blocked request for `c` more tokens.  When unblocked,
   * (t->*mf)(item, flag) is invoked via a LambdaContext.
   * NOTE(review): t and mf are captured by copy, item is forwarded into the
   * closure; caller must keep `t` alive until the callback fires.
   */
  template <typename T, typename MF, typename I>
  void add_blocker(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    auto ctx = new LambdaContext(
      [t, mf, item=std::forward<I>(item), flag](int) mutable {
        (t->*mf)(std::forward<I>(item), flag);
      });
    m_blockers.emplace_back(c, ctx);
  }

  /**
   * Try to take `c` tokens.
   * @returns false if the request proceeded immediately (tokens granted or
   * throttling disabled), true if it was queued behind a blocker and the
   * callback (t->*mf)(item, flag) will run later.
   */
  template <typename T, typename MF, typename I>
  bool get(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    bool wait = false;
    uint64_t got = 0;
    std::lock_guard lock(m_lock);
    if (!m_blockers.empty()) {
      // Keep the order of requests, add item after previous blocked requests.
      wait = true;
    } else {
      // max == 0 or avg == 0 means throttling is disabled; admit immediately.
      if (0 == m_throttle.max || 0 == m_avg)
        return false;

      got = m_throttle.get(c);
      if (got < c) {
        // Not enough tokens, add a blocker for it.
        wait = true;
      }
    }

    if (wait) {
      // block for only the shortfall (c - got); `got` tokens are already held
      add_blocker(c - got, std::forward<T>(t), std::forward<MF>(mf),
                  std::forward<I>(item), flag);
    }

    return wait;
  }

  int set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds);
  void set_schedule_tick_min(uint64_t tick);

private:
  uint64_t tokens_filled(double tick);
  uint64_t tokens_this_tick();
  void add_tokens();
  void schedule_timer();
  void cancel_timer();
};
468 | ||
7c673cae | 469 | #endif |