]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Throttle.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / Throttle.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
c07f9fc5
FG
4#include "include/scope_guard.h"
5
7c673cae 6#include "common/Throttle.h"
11fdf7f2 7#include "common/ceph_time.h"
7c673cae
FG
8#include "common/perf_counters.h"
9
11fdf7f2 10
31f18b77 11// re-include our assert to clobber the system one; fix dout:
11fdf7f2 12#include "include/ceph_assert.h"
31f18b77 13
7c673cae
FG
14#define dout_subsys ceph_subsys_throttle
15
16#undef dout_prefix
17#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
18
f67539c2
TL
19using std::list;
20using std::ostream;
21using std::string;
22
11fdf7f2
TL
23using ceph::mono_clock;
24using ceph::mono_time;
f67539c2 25using ceph::timespan;
11fdf7f2 26
7c673cae
FG
// Perf-counter indices for Throttle instances.  The half-open range
// [l_throttle_first, l_throttle_last) is handed to PerfCountersBuilder.
enum {
  l_throttle_first = 532430,
  l_throttle_val,                  // currently consumed amount
  l_throttle_max,                  // configured maximum
  l_throttle_get_started,          // get() calls, counted before any wait
  l_throttle_get,
  l_throttle_get_sum,
  l_throttle_get_or_fail_fail,
  l_throttle_get_or_fail_success,
  l_throttle_take,
  l_throttle_take_sum,
  l_throttle_put,
  l_throttle_put_sum,
  l_throttle_wait,                 // latency spent blocked in _wait()
  l_throttle_last,
};
43
11fdf7f2
TL
44Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
45 bool _use_perf)
46 : cct(cct), name(n), max(m),
7c673cae
FG
47 use_perf(_use_perf)
48{
11fdf7f2 49 ceph_assert(m >= 0);
7c673cae
FG
50
51 if (!use_perf)
52 return;
53
54 if (cct->_conf->throttler_perf_counter) {
55 PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
56 b.add_u64(l_throttle_val, "val", "Currently available throttle");
57 b.add_u64(l_throttle_max, "max", "Max value for throttle");
58 b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
59 b.add_u64_counter(l_throttle_get, "get", "Gets");
60 b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
61 b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
62 b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
63 b.add_u64_counter(l_throttle_take, "take", "Takes");
64 b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
65 b.add_u64_counter(l_throttle_put, "put", "Puts");
66 b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
67 b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
68
11fdf7f2
TL
69 logger = { b.create_perf_counters(), cct };
70 cct->get_perfcounters_collection()->add(logger.get());
31f18b77 71 logger->set(l_throttle_max, max);
7c673cae
FG
72 }
73}
74
75Throttle::~Throttle()
76{
11fdf7f2
TL
77 std::lock_guard l(lock);
78 ceph_assert(conds.empty());
7c673cae
FG
79}
80
81void Throttle::_reset_max(int64_t m)
82{
b32b8144 83 // lock must be held.
b32b8144 84 if (max == m)
7c673cae 85 return;
11fdf7f2
TL
86 if (!conds.empty())
87 conds.front().notify_one();
7c673cae
FG
88 if (logger)
89 logger->set(l_throttle_max, m);
31f18b77 90 max = m;
7c673cae
FG
91}
92
11fdf7f2 93bool Throttle::_wait(int64_t c, std::unique_lock<std::mutex>& l)
7c673cae 94{
11fdf7f2 95 mono_time start;
7c673cae 96 bool waited = false;
11fdf7f2 97 if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters.
c07f9fc5 98 {
11fdf7f2 99 auto cv = conds.emplace(conds.end());
c07f9fc5 100 auto w = make_scope_guard([this, cv]() {
11fdf7f2 101 conds.erase(cv);
c07f9fc5
FG
102 });
103 waited = true;
104 ldout(cct, 2) << "_wait waiting..." << dendl;
105 if (logger)
11fdf7f2 106 start = mono_clock::now();
c07f9fc5 107
11fdf7f2
TL
108 cv->wait(l, [this, c, cv]() { return (!_should_wait(c) &&
109 cv == conds.begin()); });
c07f9fc5
FG
110 ldout(cct, 2) << "_wait finished waiting" << dendl;
111 if (logger) {
11fdf7f2 112 logger->tinc(l_throttle_wait, mono_clock::now() - start);
c07f9fc5 113 }
7c673cae 114 }
7c673cae 115 // wake up the next guy
11fdf7f2
TL
116 if (!conds.empty())
117 conds.front().notify_one();
7c673cae
FG
118 }
119 return waited;
120}
121
122bool Throttle::wait(int64_t m)
123{
31f18b77 124 if (0 == max && 0 == m) {
7c673cae
FG
125 return false;
126 }
127
11fdf7f2 128 std::unique_lock l(lock);
7c673cae 129 if (m) {
11fdf7f2 130 ceph_assert(m > 0);
7c673cae
FG
131 _reset_max(m);
132 }
133 ldout(cct, 10) << "wait" << dendl;
11fdf7f2 134 return _wait(0, l);
7c673cae
FG
135}
136
137int64_t Throttle::take(int64_t c)
138{
31f18b77 139 if (0 == max) {
7c673cae
FG
140 return 0;
141 }
11fdf7f2 142 ceph_assert(c >= 0);
7c673cae 143 ldout(cct, 10) << "take " << c << dendl;
f67539c2 144 count += c;
7c673cae
FG
145 if (logger) {
146 logger->inc(l_throttle_take);
147 logger->inc(l_throttle_take_sum, c);
31f18b77 148 logger->set(l_throttle_val, count);
7c673cae 149 }
31f18b77 150 return count;
7c673cae
FG
151}
152
153bool Throttle::get(int64_t c, int64_t m)
154{
31f18b77 155 if (0 == max && 0 == m) {
9f95a23c 156 count += c;
7c673cae
FG
157 return false;
158 }
159
11fdf7f2 160 ceph_assert(c >= 0);
31f18b77 161 ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
7c673cae
FG
162 if (logger) {
163 logger->inc(l_throttle_get_started);
164 }
165 bool waited = false;
166 {
11fdf7f2 167 std::unique_lock l(lock);
7c673cae 168 if (m) {
11fdf7f2 169 ceph_assert(m > 0);
7c673cae
FG
170 _reset_max(m);
171 }
11fdf7f2 172 waited = _wait(c, l);
31f18b77 173 count += c;
7c673cae
FG
174 }
175 if (logger) {
176 logger->inc(l_throttle_get);
177 logger->inc(l_throttle_get_sum, c);
31f18b77 178 logger->set(l_throttle_val, count);
7c673cae
FG
179 }
180 return waited;
181}
182
183/* Returns true if it successfully got the requested amount,
184 * or false if it would block.
185 */
186bool Throttle::get_or_fail(int64_t c)
187{
31f18b77 188 if (0 == max) {
9f95a23c 189 count += c;
7c673cae
FG
190 return true;
191 }
192
193 assert (c >= 0);
f67539c2
TL
194 bool result = false;
195 {
196 std::lock_guard l(lock);
197 if (_should_wait(c) || !conds.empty()) {
198 ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
199 result = false;
200 } else {
201 ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
202 << " -> " << (count.load() + c) << ")" << dendl;
203 count += c;
204 result = true;
7c673cae 205 }
f67539c2
TL
206 }
207
208 if (logger) {
209 if (result) {
7c673cae
FG
210 logger->inc(l_throttle_get_or_fail_success);
211 logger->inc(l_throttle_get);
212 logger->inc(l_throttle_get_sum, c);
31f18b77 213 logger->set(l_throttle_val, count);
f67539c2
TL
214 } else {
215 logger->inc(l_throttle_get_or_fail_fail);
7c673cae 216 }
7c673cae 217 }
f67539c2 218 return result;
7c673cae
FG
219}
220
221int64_t Throttle::put(int64_t c)
222{
31f18b77 223 if (0 == max) {
9f95a23c 224 count -= c;
7c673cae
FG
225 return 0;
226 }
227
11fdf7f2
TL
228 ceph_assert(c >= 0);
229 ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
230 << (count.load()-c) << ")" << dendl;
f67539c2
TL
231 int64_t new_count;
232 {
233 std::lock_guard l(lock);
234 new_count = count;
235 if (c) {
236 if (!conds.empty())
237 conds.front().notify_one();
238 // if count goes negative, we failed somewhere!
239 ceph_assert(count >= c);
240 new_count = count -= c;
7c673cae
FG
241 }
242 }
f67539c2
TL
243 if (logger) {
244 logger->inc(l_throttle_put);
245 logger->inc(l_throttle_put_sum, c);
246 logger->set(l_throttle_val, count);
247 }
248
249 return new_count;
7c673cae
FG
250}
251
252void Throttle::reset()
253{
11fdf7f2
TL
254 std::lock_guard l(lock);
255 if (!conds.empty())
256 conds.front().notify_one();
31f18b77 257 count = 0;
7c673cae
FG
258 if (logger) {
259 logger->set(l_throttle_val, 0);
260 }
261}
262
263enum {
264 l_backoff_throttle_first = l_throttle_last + 1,
265 l_backoff_throttle_val,
266 l_backoff_throttle_max,
267 l_backoff_throttle_get,
268 l_backoff_throttle_get_sum,
269 l_backoff_throttle_take,
270 l_backoff_throttle_take_sum,
271 l_backoff_throttle_put,
272 l_backoff_throttle_put_sum,
273 l_backoff_throttle_wait,
274 l_backoff_throttle_last,
275};
276
11fdf7f2
TL
277BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
278 unsigned expected_concurrency, bool _use_perf)
279 : cct(cct), name(n),
7c673cae
FG
280 conds(expected_concurrency),///< [in] determines size of conds
281 use_perf(_use_perf)
282{
283 if (!use_perf)
284 return;
285
286 if (cct->_conf->throttler_perf_counter) {
11fdf7f2
TL
287 PerfCountersBuilder b(cct, string("throttle-") + name,
288 l_backoff_throttle_first, l_backoff_throttle_last);
7c673cae
FG
289 b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
290 b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
291 b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
292 b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
293 b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
294 b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
295 b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
296 b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
297 b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
298
11fdf7f2
TL
299 logger = { b.create_perf_counters(), cct };
300 cct->get_perfcounters_collection()->add(logger.get());
7c673cae
FG
301 logger->set(l_backoff_throttle_max, max);
302 }
303}
304
305BackoffThrottle::~BackoffThrottle()
306{
11fdf7f2
TL
307 std::lock_guard l(lock);
308 ceph_assert(waiters.empty());
7c673cae
FG
309}
310
311bool BackoffThrottle::set_params(
11fdf7f2
TL
312 double _low_threshold,
313 double _high_threshold,
7c673cae
FG
314 double _expected_throughput,
315 double _high_multiple,
316 double _max_multiple,
317 uint64_t _throttle_max,
318 ostream *errstream)
319{
320 bool valid = true;
11fdf7f2 321 if (_low_threshold > _high_threshold) {
7c673cae
FG
322 valid = false;
323 if (errstream) {
11fdf7f2
TL
324 *errstream << "low_threshold (" << _low_threshold
325 << ") > high_threshold (" << _high_threshold
7c673cae
FG
326 << ")" << std::endl;
327 }
328 }
329
330 if (_high_multiple > _max_multiple) {
331 valid = false;
332 if (errstream) {
333 *errstream << "_high_multiple (" << _high_multiple
334 << ") > _max_multiple (" << _max_multiple
335 << ")" << std::endl;
336 }
337 }
338
11fdf7f2 339 if (_low_threshold > 1 || _low_threshold < 0) {
7c673cae
FG
340 valid = false;
341 if (errstream) {
11fdf7f2 342 *errstream << "invalid low_threshold (" << _low_threshold << ")"
7c673cae
FG
343 << std::endl;
344 }
345 }
346
11fdf7f2 347 if (_high_threshold > 1 || _high_threshold < 0) {
7c673cae
FG
348 valid = false;
349 if (errstream) {
11fdf7f2 350 *errstream << "invalid high_threshold (" << _high_threshold << ")"
7c673cae
FG
351 << std::endl;
352 }
353 }
354
355 if (_max_multiple < 0) {
356 valid = false;
357 if (errstream) {
358 *errstream << "invalid _max_multiple ("
359 << _max_multiple << ")"
360 << std::endl;
361 }
362 }
363
364 if (_high_multiple < 0) {
365 valid = false;
366 if (errstream) {
367 *errstream << "invalid _high_multiple ("
368 << _high_multiple << ")"
369 << std::endl;
370 }
371 }
372
373 if (_expected_throughput < 0) {
374 valid = false;
375 if (errstream) {
376 *errstream << "invalid _expected_throughput("
377 << _expected_throughput << ")"
378 << std::endl;
379 }
380 }
381
382 if (!valid)
383 return false;
384
385 locker l(lock);
11fdf7f2
TL
386 low_threshold = _low_threshold;
387 high_threshold = _high_threshold;
7c673cae
FG
388 high_delay_per_count = _high_multiple / _expected_throughput;
389 max_delay_per_count = _max_multiple / _expected_throughput;
390 max = _throttle_max;
391
392 if (logger)
393 logger->set(l_backoff_throttle_max, max);
394
11fdf7f2
TL
395 if (high_threshold - low_threshold > 0) {
396 s0 = high_delay_per_count / (high_threshold - low_threshold);
7c673cae 397 } else {
11fdf7f2 398 low_threshold = high_threshold;
7c673cae
FG
399 s0 = 0;
400 }
401
11fdf7f2 402 if (1 - high_threshold > 0) {
7c673cae 403 s1 = (max_delay_per_count - high_delay_per_count)
11fdf7f2 404 / (1 - high_threshold);
7c673cae 405 } else {
11fdf7f2 406 high_threshold = 1;
7c673cae
FG
407 s1 = 0;
408 }
409
410 _kick_waiters();
411 return true;
412}
413
11fdf7f2 414ceph::timespan BackoffThrottle::_get_delay(uint64_t c) const
7c673cae
FG
415{
416 if (max == 0)
11fdf7f2 417 return ceph::timespan(0);
7c673cae
FG
418
419 double r = ((double)current) / ((double)max);
11fdf7f2
TL
420 if (r < low_threshold) {
421 return ceph::timespan(0);
422 } else if (r < high_threshold) {
423 return c * ceph::make_timespan(
424 (r - low_threshold) * s0);
7c673cae 425 } else {
11fdf7f2
TL
426 return c * ceph::make_timespan(
427 high_delay_per_count + ((r - high_threshold) * s1));
7c673cae
FG
428 }
429}
430
11fdf7f2 431ceph::timespan BackoffThrottle::get(uint64_t c)
7c673cae
FG
432{
433 locker l(lock);
434 auto delay = _get_delay(c);
435
436 if (logger) {
437 logger->inc(l_backoff_throttle_get);
438 logger->inc(l_backoff_throttle_get_sum, c);
439 }
440
441 // fast path
11fdf7f2 442 if (delay.count() == 0 &&
7c673cae
FG
443 waiters.empty() &&
444 ((max == 0) || (current == 0) || ((current + c) <= max))) {
445 current += c;
446
447 if (logger) {
448 logger->set(l_backoff_throttle_val, current);
449 }
450
11fdf7f2 451 return ceph::make_timespan(0);
7c673cae
FG
452 }
453
454 auto ticket = _push_waiter();
11fdf7f2 455 auto wait_from = mono_clock::now();
7c673cae
FG
456 bool waited = false;
457
458 while (waiters.begin() != ticket) {
459 (*ticket)->wait(l);
460 waited = true;
461 }
462
11fdf7f2 463 auto start = mono_clock::now();
7c673cae
FG
464 delay = _get_delay(c);
465 while (true) {
11fdf7f2 466 if (max != 0 && current != 0 && (current + c) > max) {
7c673cae
FG
467 (*ticket)->wait(l);
468 waited = true;
11fdf7f2 469 } else if (delay.count() > 0) {
7c673cae
FG
470 (*ticket)->wait_for(l, delay);
471 waited = true;
472 } else {
473 break;
474 }
11fdf7f2
TL
475 ceph_assert(ticket == waiters.begin());
476 delay = _get_delay(c);
477 auto elapsed = mono_clock::now() - start;
478 if (delay <= elapsed) {
479 delay = timespan::zero();
480 } else {
481 delay -= elapsed;
482 }
7c673cae
FG
483 }
484 waiters.pop_front();
485 _kick_waiters();
486
487 current += c;
488
489 if (logger) {
490 logger->set(l_backoff_throttle_val, current);
491 if (waited) {
11fdf7f2 492 logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from);
7c673cae
FG
493 }
494 }
495
11fdf7f2 496 return mono_clock::now() - start;
7c673cae
FG
497}
498
499uint64_t BackoffThrottle::put(uint64_t c)
500{
501 locker l(lock);
11fdf7f2 502 ceph_assert(current >= c);
7c673cae
FG
503 current -= c;
504 _kick_waiters();
505
506 if (logger) {
507 logger->inc(l_backoff_throttle_put);
508 logger->inc(l_backoff_throttle_put_sum, c);
509 logger->set(l_backoff_throttle_val, current);
510 }
511
512 return current;
513}
514
515uint64_t BackoffThrottle::take(uint64_t c)
516{
517 locker l(lock);
518 current += c;
519
520 if (logger) {
521 logger->inc(l_backoff_throttle_take);
522 logger->inc(l_backoff_throttle_take_sum, c);
523 logger->set(l_backoff_throttle_val, current);
524 }
525
526 return current;
527}
528
529uint64_t BackoffThrottle::get_current()
530{
531 locker l(lock);
532 return current;
533}
534
535uint64_t BackoffThrottle::get_max()
536{
537 locker l(lock);
538 return max;
539}
540
541SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
11fdf7f2 542 : m_max(max), m_ignore_enoent(ignore_enoent) {}
7c673cae
FG
543
544SimpleThrottle::~SimpleThrottle()
545{
11fdf7f2
TL
546 std::lock_guard l(m_lock);
547 ceph_assert(m_current == 0);
548 ceph_assert(waiters == 0);
7c673cae
FG
549}
550
551void SimpleThrottle::start_op()
552{
11fdf7f2
TL
553 std::unique_lock l(m_lock);
554 waiters++;
555 m_cond.wait(l, [this]() { return m_max != m_current; });
556 waiters--;
7c673cae
FG
557 ++m_current;
558}
559
560void SimpleThrottle::end_op(int r)
561{
11fdf7f2 562 std::lock_guard l(m_lock);
7c673cae
FG
563 --m_current;
564 if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
565 m_ret = r;
11fdf7f2 566 m_cond.notify_all();
7c673cae
FG
567}
568
569bool SimpleThrottle::pending_error() const
570{
11fdf7f2 571 std::lock_guard l(m_lock);
7c673cae
FG
572 return (m_ret < 0);
573}
574
575int SimpleThrottle::wait_for_ret()
576{
11fdf7f2
TL
577 std::unique_lock l(m_lock);
578 waiters++;
579 m_cond.wait(l, [this]() { return m_current == 0; });
580 waiters--;
7c673cae
FG
581 return m_ret;
582}
583
584void C_OrderedThrottle::finish(int r) {
585 m_ordered_throttle->finish_op(m_tid, r);
586}
587
588OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
11fdf7f2 589 : m_max(max), m_ignore_enoent(ignore_enoent) {}
7c673cae 590
c07f9fc5 591OrderedThrottle::~OrderedThrottle() {
11fdf7f2
TL
592 std::lock_guard l(m_lock);
593 ceph_assert(waiters == 0);
c07f9fc5
FG
594}
595
7c673cae 596C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
11fdf7f2 597 ceph_assert(on_finish);
7c673cae 598
11fdf7f2 599 std::unique_lock l(m_lock);
7c673cae
FG
600 uint64_t tid = m_next_tid++;
601 m_tid_result[tid] = Result(on_finish);
11fdf7f2 602 auto ctx = std::make_unique<C_OrderedThrottle>(this, tid);
7c673cae 603
11fdf7f2 604 complete_pending_ops(l);
7c673cae 605 while (m_max == m_current) {
c07f9fc5 606 ++waiters;
11fdf7f2 607 m_cond.wait(l);
c07f9fc5 608 --waiters;
11fdf7f2 609 complete_pending_ops(l);
7c673cae
FG
610 }
611 ++m_current;
612
11fdf7f2 613 return ctx.release();
7c673cae
FG
614}
615
616void OrderedThrottle::end_op(int r) {
11fdf7f2
TL
617 std::lock_guard l(m_lock);
618 ceph_assert(m_current > 0);
7c673cae
FG
619
620 if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
621 m_ret_val = r;
622 }
623 --m_current;
11fdf7f2 624 m_cond.notify_all();
7c673cae
FG
625}
626
627void OrderedThrottle::finish_op(uint64_t tid, int r) {
11fdf7f2 628 std::lock_guard l(m_lock);
7c673cae 629
11fdf7f2
TL
630 auto it = m_tid_result.find(tid);
631 ceph_assert(it != m_tid_result.end());
7c673cae
FG
632
633 it->second.finished = true;
634 it->second.ret_val = r;
11fdf7f2 635 m_cond.notify_all();
7c673cae
FG
636}
637
638bool OrderedThrottle::pending_error() const {
11fdf7f2 639 std::lock_guard l(m_lock);
7c673cae
FG
640 return (m_ret_val < 0);
641}
642
643int OrderedThrottle::wait_for_ret() {
11fdf7f2
TL
644 std::unique_lock l(m_lock);
645 complete_pending_ops(l);
7c673cae
FG
646
647 while (m_current > 0) {
c07f9fc5 648 ++waiters;
11fdf7f2 649 m_cond.wait(l);
c07f9fc5 650 --waiters;
11fdf7f2 651 complete_pending_ops(l);
7c673cae
FG
652 }
653 return m_ret_val;
654}
655
11fdf7f2 656void OrderedThrottle::complete_pending_ops(std::unique_lock<std::mutex>& l) {
7c673cae 657 while (true) {
11fdf7f2 658 auto it = m_tid_result.begin();
7c673cae
FG
659 if (it == m_tid_result.end() || it->first != m_complete_tid ||
660 !it->second.finished) {
661 break;
662 }
663
664 Result result = it->second;
665 m_tid_result.erase(it);
666
11fdf7f2 667 l.unlock();
7c673cae 668 result.on_finish->complete(result.ret_val);
11fdf7f2 669 l.lock();
7c673cae
FG
670
671 ++m_complete_tid;
672 }
673}
11fdf7f2
TL
674
675#undef dout_prefix
676#define dout_prefix *_dout << "TokenBucketThrottle(" << m_name << " " \
677 << (void*)this << ") "
678
679uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
680 if (0 == max) {
681 return 0;
682 }
683
684 uint64_t got = 0;
f67539c2 685 if (available >= c) {
11fdf7f2
TL
686 // There is enough token in bucket, take c.
687 got = c;
f67539c2 688 available -= c;
11fdf7f2
TL
689 remain -= c;
690 } else {
f67539c2
TL
691 // There is not enough, take all available.
692 got = available;
693 remain -= available;
694 available = 0;
11fdf7f2
TL
695 }
696 return got;
697}
698
f67539c2 699uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens, double burst_ratio) {
11fdf7f2
TL
700 if (0 == max) {
701 return 0;
702 }
703
f67539c2
TL
704 if (tokens) {
705 // put tokens into bucket
11fdf7f2 706 uint64_t current = remain;
f67539c2
TL
707 if ((current + tokens) <= capacity) {
708 remain += tokens;
11fdf7f2 709 } else {
f67539c2
TL
710 remain = capacity;
711 }
712
713 // available tokens increase at burst speed
714 uint64_t available_inc = tokens;
715 if (burst_ratio > 1) {
716 available_inc = (uint64_t)(tokens * burst_ratio);
11fdf7f2 717 }
f67539c2
TL
718 uint64_t inc_upper_limit = remain > max ? max : remain;
719 if ((available + available_inc) <= inc_upper_limit ){
720 available += available_inc;
721 }else{
722 available = inc_upper_limit;
723 }
724
11fdf7f2
TL
725 }
726 return remain;
727}
728
f67539c2
TL
729void TokenBucketThrottle::Bucket::set_max(uint64_t max, uint64_t burst_seconds) {
730 // the capacity of bucket should not be less than max
731 if (burst_seconds < 1){
732 burst_seconds = 1;
11fdf7f2 733 }
f67539c2
TL
734 uint64_t new_capacity = max*burst_seconds;
735 if (capacity != new_capacity){
736 capacity = new_capacity;
737 remain = capacity;
738 }
739 if (available > max || 0 == max) {
740 available = max;
741 }
742 this->max = max;
11fdf7f2
TL
743}
744
745TokenBucketThrottle::TokenBucketThrottle(
746 CephContext *cct,
747 const std::string &name,
f67539c2 748 uint64_t burst,
11fdf7f2
TL
749 uint64_t avg,
750 SafeTimer *timer,
9f95a23c 751 ceph::mutex *timer_lock)
11fdf7f2 752 : m_cct(cct), m_name(name),
f67539c2
TL
753 m_throttle(m_cct, name + "_bucket", burst),
754 m_burst(burst), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
9f95a23c 755 m_lock(ceph::make_mutex(name + "_lock"))
11fdf7f2
TL
756{}
757
758TokenBucketThrottle::~TokenBucketThrottle() {
759 // cancel the timer events.
760 {
761 std::lock_guard timer_locker(*m_timer_lock);
762 cancel_timer();
763 }
764
765 list<Blocker> tmp_blockers;
766 {
767 std::lock_guard blockers_lock(m_lock);
768 tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
769 }
770
771 for (auto b : tmp_blockers) {
772 b.ctx->complete(0);
773 }
774}
775
f67539c2 776int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds) {
11fdf7f2 777 {
9f95a23c 778 std::lock_guard lock{m_lock};
11fdf7f2
TL
779
780 if (0 < burst && burst < average) {
781 // the burst should never less than the average.
782 return -EINVAL;
783 }
784
785 m_avg = average;
786 m_burst = burst;
787
788 if (0 == average) {
789 // The limit is not set, and no tokens will be put into the bucket.
790 // So, we can schedule the timer slowly, or even cancel it.
791 m_tick = 1000;
792 } else {
793 // calculate the tick(ms), don't less than the minimum.
794 m_tick = 1000 / average;
795 if (m_tick < m_tick_min) {
796 m_tick = m_tick_min;
797 }
798
799 // this is for the number(avg) can not be divisible.
800 m_ticks_per_second = 1000 / m_tick;
801 m_current_tick = 0;
802
803 // for the default configuration of burst.
f67539c2 804 m_throttle.set_max(0 == burst ? average : burst, burst_seconds);
11fdf7f2
TL
805 }
806 // turn millisecond to second
807 m_schedule_tick = m_tick / 1000.0;
808 }
809
810 // The schedule period will be changed when the average rate is set.
811 {
9f95a23c 812 std::lock_guard timer_locker{*m_timer_lock};
11fdf7f2
TL
813 cancel_timer();
814 schedule_timer();
815 }
816 return 0;
817}
818
819void TokenBucketThrottle::set_schedule_tick_min(uint64_t tick) {
820 std::lock_guard lock(m_lock);
821 if (tick != 0) {
822 m_tick_min = tick;
823 }
824}
825
826uint64_t TokenBucketThrottle::tokens_filled(double tick) {
827 return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg);
828}
829
830uint64_t TokenBucketThrottle::tokens_this_tick() {
831 if (0 == m_avg) {
832 return 0;
833 }
834 if (m_current_tick >= m_ticks_per_second) {
835 m_current_tick = 0;
836 }
837 m_current_tick++;
838
839 return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1);
840}
841
842void TokenBucketThrottle::add_tokens() {
843 list<Blocker> tmp_blockers;
844 {
845 std::lock_guard lock(m_lock);
846 // put tokens into bucket.
f67539c2
TL
847 double burst_ratio = 1.0;
848 if (m_throttle.max > m_avg && m_avg > 0){
849 burst_ratio = (double)m_throttle.max/m_avg;
850 }
851 m_throttle.put(tokens_this_tick(), burst_ratio);
11fdf7f2
TL
852 if (0 == m_avg || 0 == m_throttle.max)
853 tmp_blockers.swap(m_blockers);
854 // check the m_blockers from head to tail, if blocker can get
855 // enough tokens, let it go.
856 while (!m_blockers.empty()) {
857 Blocker &blocker = m_blockers.front();
858 uint64_t got = m_throttle.get(blocker.tokens_requested);
859 if (got == blocker.tokens_requested) {
860 // got enough tokens for front.
861 tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
862 } else {
863 // there is no more tokens.
864 blocker.tokens_requested -= got;
865 break;
866 }
867 }
868 }
869
870 for (auto b : tmp_blockers) {
871 b.ctx->complete(0);
872 }
873}
874
875void TokenBucketThrottle::schedule_timer() {
9f95a23c 876 m_token_ctx = new LambdaContext(
11fdf7f2
TL
877 [this](int r) {
878 schedule_timer();
879 });
880 m_timer->add_event_after(m_schedule_tick, m_token_ctx);
881
882 add_tokens();
883}
884
885void TokenBucketThrottle::cancel_timer() {
886 m_timer->cancel_event(m_token_ctx);
887}