]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/Throttle.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/scope_guard.h"
5
6 #include "common/Throttle.h"
7 #include "common/ceph_time.h"
8 #include "common/perf_counters.h"
9
10
11 // re-include our assert to clobber the system one; fix dout:
12 #include "include/ceph_assert.h"
13
14 #define dout_subsys ceph_subsys_throttle
15
16 #undef dout_prefix
17 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
18
19 using std::list;
20 using std::ostream;
21 using std::string;
22
23 using ceph::mono_clock;
24 using ceph::mono_time;
25 using ceph::timespan;
26
27 enum {
28 l_throttle_first = 532430,
29 l_throttle_val,
30 l_throttle_max,
31 l_throttle_get_started,
32 l_throttle_get,
33 l_throttle_get_sum,
34 l_throttle_get_or_fail_fail,
35 l_throttle_get_or_fail_success,
36 l_throttle_take,
37 l_throttle_take_sum,
38 l_throttle_put,
39 l_throttle_put_sum,
40 l_throttle_wait,
41 l_throttle_last,
42 };
43
44 Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
45 bool _use_perf)
46 : cct(cct), name(n), max(m),
47 use_perf(_use_perf)
48 {
49 ceph_assert(m >= 0);
50
51 if (!use_perf)
52 return;
53
54 if (cct->_conf->throttler_perf_counter) {
55 PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
56 b.add_u64(l_throttle_val, "val", "Currently available throttle");
57 b.add_u64(l_throttle_max, "max", "Max value for throttle");
58 b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
59 b.add_u64_counter(l_throttle_get, "get", "Gets");
60 b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
61 b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
62 b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
63 b.add_u64_counter(l_throttle_take, "take", "Takes");
64 b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
65 b.add_u64_counter(l_throttle_put, "put", "Puts");
66 b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
67 b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
68
69 logger = { b.create_perf_counters(), cct };
70 cct->get_perfcounters_collection()->add(logger.get());
71 logger->set(l_throttle_max, max);
72 }
73 }
74
75 Throttle::~Throttle()
76 {
77 std::lock_guard l(lock);
78 ceph_assert(conds.empty());
79 }
80
void Throttle::_reset_max(int64_t m)
{
  // Change the throttle limit to m.  lock must be held by the caller.
  if (max == m)
    return;
  // Wake the head waiter so it re-evaluates _should_wait() against the
  // new limit (matters when the limit grows).
  if (!conds.empty())
    conds.front().notify_one();
  if (logger)
    logger->set(l_throttle_max, m);
  max = m;
}
92
// Block until c units fit under the limit.  Waiters are strictly FIFO:
// each gets its own condition variable appended to `conds`, and may only
// proceed once it is at the head of that list AND capacity is available.
// Returns true iff we actually blocked.  Caller holds l (on `lock`).
bool Throttle::_wait(int64_t c, std::unique_lock<std::mutex>& l)
{
  mono_time start;
  bool waited = false;
  if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters.
    {
      auto cv = conds.emplace(conds.end());
      // Ensure our cv is unlinked from the queue on every exit path.
      auto w = make_scope_guard([this, cv]() {
	conds.erase(cv);
      });
      waited = true;
      ldout(cct, 2) << "_wait waiting..." << dendl;
      if (logger)
	start = mono_clock::now();

      // Wait until capacity is available and we are the head of the queue.
      cv->wait(l, [this, c, cv]() { return (!_should_wait(c) &&
					    cv == conds.begin()); });
      ldout(cct, 2) << "_wait finished waiting" << dendl;
      if (logger) {
	logger->tinc(l_throttle_wait, mono_clock::now() - start);
      }
    }
    // wake up the next guy so it can re-check its own predicate
    if (!conds.empty())
      conds.front().notify_one();
  }
  return waited;
}
121
// Queue behind all current waiters without taking anything (c == 0),
// optionally raising the limit to m first.  Returns true iff we blocked.
bool Throttle::wait(int64_t m)
{
  // NOTE: `max` is read without the lock here; the throttle-disabled
  // fast path tolerates a racy read.
  if (0 == max && 0 == m) {
    return false;
  }

  std::unique_lock l(lock);
  if (m) {
    ceph_assert(m > 0);
    _reset_max(m);
  }
  ldout(cct, 10) << "wait" << dendl;
  return _wait(0, l);
}
136
137 int64_t Throttle::take(int64_t c)
138 {
139 if (0 == max) {
140 return 0;
141 }
142 ceph_assert(c >= 0);
143 ldout(cct, 10) << "take " << c << dendl;
144 count += c;
145 if (logger) {
146 logger->inc(l_throttle_take);
147 logger->inc(l_throttle_take_sum, c);
148 logger->set(l_throttle_val, count);
149 }
150 return count;
151 }
152
// Take c units, blocking until they fit under the limit.  If m is
// non-zero the limit is first reset to m.  Returns true iff we blocked.
bool Throttle::get(int64_t c, int64_t m)
{
  // Racy unlocked fast path for the throttle-disabled case: just
  // account the units and never block.
  if (0 == max && 0 == m) {
    count += c;
    return false;
  }

  ceph_assert(c >= 0);
  ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
  if (logger) {
    logger->inc(l_throttle_get_started);
  }
  bool waited = false;
  {
    std::unique_lock l(lock);
    if (m) {
      ceph_assert(m > 0);
      _reset_max(m);
    }
    waited = _wait(c, l);
    count += c;
  }
  // Perf counters are updated outside the lock; l_throttle_val may
  // transiently reflect a concurrent update.
  if (logger) {
    logger->inc(l_throttle_get);
    logger->inc(l_throttle_get_sum, c);
    logger->set(l_throttle_val, count);
  }
  return waited;
}
182
183 /* Returns true if it successfully got the requested amount,
184 * or false if it would block.
185 */
186 bool Throttle::get_or_fail(int64_t c)
187 {
188 if (0 == max) {
189 count += c;
190 return true;
191 }
192
193 assert (c >= 0);
194 bool result = false;
195 {
196 std::lock_guard l(lock);
197 if (_should_wait(c) || !conds.empty()) {
198 ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
199 result = false;
200 } else {
201 ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
202 << " -> " << (count.load() + c) << ")" << dendl;
203 count += c;
204 result = true;
205 }
206 }
207
208 if (logger) {
209 if (result) {
210 logger->inc(l_throttle_get_or_fail_success);
211 logger->inc(l_throttle_get);
212 logger->inc(l_throttle_get_sum, c);
213 logger->set(l_throttle_val, count);
214 } else {
215 logger->inc(l_throttle_get_or_fail_fail);
216 }
217 }
218 return result;
219 }
220
// Return c units to the throttle.  Returns the resulting count
// (0 when throttling is disabled).
int64_t Throttle::put(int64_t c)
{
  if (0 == max) {
    // Keep the accounting symmetric with the disabled-throttle get() path.
    count -= c;
    return 0;
  }

  ceph_assert(c >= 0);
  ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
		 << (count.load()-c) << ")" << dendl;
  int64_t new_count;
  {
    std::lock_guard l(lock);
    new_count = count;
    if (c) {
      // Wake the head waiter; it re-checks its predicate under the lock
      // after we release it, so notifying before decrementing is fine.
      if (!conds.empty())
	conds.front().notify_one();
      // if count goes negative, we failed somewhere!
      ceph_assert(count >= c);
      new_count = count -= c;
    }
  }
  // Counters updated outside the lock, matching get().
  if (logger) {
    logger->inc(l_throttle_put);
    logger->inc(l_throttle_put_sum, c);
    logger->set(l_throttle_val, count);
  }

  return new_count;
}
251
// Drop the accounted units back to zero and wake the head waiter so it
// can re-evaluate against the now-empty throttle.
void Throttle::reset()
{
  std::lock_guard l(lock);
  if (!conds.empty())
    conds.front().notify_one();
  count = 0;
  if (logger) {
    logger->set(l_throttle_val, 0);
  }
}
262
263 enum {
264 l_backoff_throttle_first = l_throttle_last + 1,
265 l_backoff_throttle_val,
266 l_backoff_throttle_max,
267 l_backoff_throttle_get,
268 l_backoff_throttle_get_sum,
269 l_backoff_throttle_take,
270 l_backoff_throttle_take_sum,
271 l_backoff_throttle_put,
272 l_backoff_throttle_put_sum,
273 l_backoff_throttle_wait,
274 l_backoff_throttle_last,
275 };
276
277 BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
278 unsigned expected_concurrency, bool _use_perf)
279 : cct(cct), name(n),
280 conds(expected_concurrency),///< [in] determines size of conds
281 use_perf(_use_perf)
282 {
283 if (!use_perf)
284 return;
285
286 if (cct->_conf->throttler_perf_counter) {
287 PerfCountersBuilder b(cct, string("throttle-") + name,
288 l_backoff_throttle_first, l_backoff_throttle_last);
289 b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
290 b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
291 b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
292 b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
293 b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
294 b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
295 b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
296 b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
297 b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
298
299 logger = { b.create_perf_counters(), cct };
300 cct->get_perfcounters_collection()->add(logger.get());
301 logger->set(l_backoff_throttle_max, max);
302 }
303 }
304
305 BackoffThrottle::~BackoffThrottle()
306 {
307 std::lock_guard l(lock);
308 ceph_assert(waiters.empty());
309 }
310
// Validate and install the backoff curve parameters.  The curve maps the
// fill ratio r = current/max to a per-count delay:
//   r < low_threshold            -> 0
//   low <= r < high_threshold    -> slope s0 segment up to high_delay_per_count
//   r >= high_threshold          -> slope s1 segment up to max_delay_per_count
// Returns false (and writes diagnostics to *errstream, if given) when the
// parameters are inconsistent; nothing is modified in that case.
bool BackoffThrottle::set_params(
  double _low_threshold,
  double _high_threshold,
  double _expected_throughput,
  double _high_multiple,
  double _max_multiple,
  uint64_t _throttle_max,
  ostream *errstream)
{
  // Collect every validation failure before bailing so the caller sees
  // all problems at once.
  bool valid = true;
  if (_low_threshold > _high_threshold) {
    valid = false;
    if (errstream) {
      *errstream << "low_threshold (" << _low_threshold
		 << ") > high_threshold (" << _high_threshold
		 << ")" << std::endl;
    }
  }

  if (_high_multiple > _max_multiple) {
    valid = false;
    if (errstream) {
      *errstream << "_high_multiple (" << _high_multiple
		 << ") > _max_multiple (" << _max_multiple
		 << ")" << std::endl;
    }
  }

  // Thresholds are ratios of max and must lie in [0, 1].
  if (_low_threshold > 1 || _low_threshold < 0) {
    valid = false;
    if (errstream) {
      *errstream << "invalid low_threshold (" << _low_threshold << ")"
		 << std::endl;
    }
  }

  if (_high_threshold > 1 || _high_threshold < 0) {
    valid = false;
    if (errstream) {
      *errstream << "invalid high_threshold (" << _high_threshold << ")"
		 << std::endl;
    }
  }

  if (_max_multiple < 0) {
    valid = false;
    if (errstream) {
      *errstream << "invalid _max_multiple ("
		 << _max_multiple << ")"
		 << std::endl;
    }
  }

  if (_high_multiple < 0) {
    valid = false;
    if (errstream) {
      *errstream << "invalid _high_multiple ("
		 << _high_multiple << ")"
		 << std::endl;
    }
  }

  if (_expected_throughput < 0) {
    valid = false;
    if (errstream) {
      *errstream << "invalid _expected_throughput("
		 << _expected_throughput << ")"
		 << std::endl;
    }
  }

  if (!valid)
    return false;

  locker l(lock);
  low_threshold = _low_threshold;
  high_threshold = _high_threshold;
  // NOTE(review): _expected_throughput == 0 passes validation but divides
  // by zero here (yielding inf delays) -- confirm callers never pass 0.
  high_delay_per_count = _high_multiple / _expected_throughput;
  max_delay_per_count = _max_multiple / _expected_throughput;
  max = _throttle_max;

  if (logger)
    logger->set(l_backoff_throttle_max, max);

  // Slope of the first (low..high) segment; degenerate thresholds
  // collapse the segment to zero.
  if (high_threshold - low_threshold > 0) {
    s0 = high_delay_per_count / (high_threshold - low_threshold);
  } else {
    low_threshold = high_threshold;
    s0 = 0;
  }

  // Slope of the second (high..1.0) segment.
  if (1 - high_threshold > 0) {
    s1 = (max_delay_per_count - high_delay_per_count)
      / (1 - high_threshold);
  } else {
    high_threshold = 1;
    s1 = 0;
  }

  // Parameters changed; let every waiter recompute its delay.
  _kick_waiters();
  return true;
}
413
414 ceph::timespan BackoffThrottle::_get_delay(uint64_t c) const
415 {
416 if (max == 0)
417 return ceph::timespan(0);
418
419 double r = ((double)current) / ((double)max);
420 if (r < low_threshold) {
421 return ceph::timespan(0);
422 } else if (r < high_threshold) {
423 return c * ceph::make_timespan(
424 (r - low_threshold) * s0);
425 } else {
426 return c * ceph::make_timespan(
427 high_delay_per_count + ((r - high_threshold) * s1));
428 }
429 }
430
// Take c units, sleeping per the backoff curve and/or blocking while over
// the limit.  Waiters are FIFO tickets; returns the time actually spent
// delaying (zero on the fast path).
ceph::timespan BackoffThrottle::get(uint64_t c)
{
  locker l(lock);
  auto delay = _get_delay(c);

  if (logger) {
    logger->inc(l_backoff_throttle_get);
    logger->inc(l_backoff_throttle_get_sum, c);
  }

  // fast path: no delay required, nobody queued ahead of us, and the
  // request fits (or throttling is disabled / bucket is empty).
  if (delay.count() == 0 &&
      waiters.empty() &&
      ((max == 0) || (current == 0) || ((current + c) <= max))) {
    current += c;

    if (logger) {
      logger->set(l_backoff_throttle_val, current);
    }

    return ceph::make_timespan(0);
  }

  // Phase 1: wait until our ticket reaches the head of the queue.
  auto ticket = _push_waiter();
  auto wait_from = mono_clock::now();
  bool waited = false;

  while (waiters.begin() != ticket) {
    (*ticket)->wait(l);
    waited = true;
  }

  // Phase 2: as head waiter, block while over the limit and sleep out
  // the (continually recomputed) backoff delay.
  auto start = mono_clock::now();
  delay = _get_delay(c);
  while (true) {
    if (max != 0 && current != 0 && (current + c) > max) {
      (*ticket)->wait(l);
      waited = true;
    } else if (delay.count() > 0) {
      (*ticket)->wait_for(l, delay);
      waited = true;
    } else {
      break;
    }
    // We must still be the head; nobody may jump the queue.
    ceph_assert(ticket == waiters.begin());
    // Recompute the target delay and subtract time already slept.
    delay = _get_delay(c);
    auto elapsed = mono_clock::now() - start;
    if (delay <= elapsed) {
      delay = timespan::zero();
    } else {
      delay -= elapsed;
    }
  }
  // Dequeue ourselves and promote the next waiter.
  waiters.pop_front();
  _kick_waiters();

  current += c;

  if (logger) {
    logger->set(l_backoff_throttle_val, current);
    if (waited) {
      logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from);
    }
  }

  return mono_clock::now() - start;
}
498
499 uint64_t BackoffThrottle::put(uint64_t c)
500 {
501 locker l(lock);
502 ceph_assert(current >= c);
503 current -= c;
504 _kick_waiters();
505
506 if (logger) {
507 logger->inc(l_backoff_throttle_put);
508 logger->inc(l_backoff_throttle_put_sum, c);
509 logger->set(l_backoff_throttle_val, current);
510 }
511
512 return current;
513 }
514
515 uint64_t BackoffThrottle::take(uint64_t c)
516 {
517 locker l(lock);
518 current += c;
519
520 if (logger) {
521 logger->inc(l_backoff_throttle_take);
522 logger->inc(l_backoff_throttle_take_sum, c);
523 logger->set(l_backoff_throttle_val, current);
524 }
525
526 return current;
527 }
528
529 uint64_t BackoffThrottle::get_current()
530 {
531 locker l(lock);
532 return current;
533 }
534
535 uint64_t BackoffThrottle::get_max()
536 {
537 locker l(lock);
538 return max;
539 }
540
// Simple counting throttle: at most `max` ops in flight; optionally
// treat -ENOENT results as non-errors.
SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
  : m_max(max), m_ignore_enoent(ignore_enoent) {}
543
544 SimpleThrottle::~SimpleThrottle()
545 {
546 std::lock_guard l(m_lock);
547 ceph_assert(m_current == 0);
548 ceph_assert(waiters == 0);
549 }
550
551 void SimpleThrottle::start_op()
552 {
553 std::unique_lock l(m_lock);
554 waiters++;
555 m_cond.wait(l, [this]() { return m_max != m_current; });
556 waiters--;
557 ++m_current;
558 }
559
560 void SimpleThrottle::end_op(int r)
561 {
562 std::lock_guard l(m_lock);
563 --m_current;
564 if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
565 m_ret = r;
566 m_cond.notify_all();
567 }
568
569 bool SimpleThrottle::pending_error() const
570 {
571 std::lock_guard l(m_lock);
572 return (m_ret < 0);
573 }
574
575 int SimpleThrottle::wait_for_ret()
576 {
577 std::unique_lock l(m_lock);
578 waiters++;
579 m_cond.wait(l, [this]() { return m_current == 0; });
580 waiters--;
581 return m_ret;
582 }
583
// Completion callback: forward this op's result to the owning
// OrderedThrottle, keyed by the ticket id assigned in start_op().
void C_OrderedThrottle::finish(int r) {
  m_ordered_throttle->finish_op(m_tid, r);
}
587
// Throttle that completes user callbacks strictly in start_op() order,
// with at most `max` ops in flight.
OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
  : m_max(max), m_ignore_enoent(ignore_enoent) {}
590
591 OrderedThrottle::~OrderedThrottle() {
592 std::lock_guard l(m_lock);
593 ceph_assert(waiters == 0);
594 }
595
// Register on_finish under a fresh ticket id, block until an op slot is
// free, and return the completion the caller must fire when the op ends.
// Ownership of the returned C_OrderedThrottle passes to the caller.
C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
  ceph_assert(on_finish);

  std::unique_lock l(m_lock);
  // Monotonic ticket id: results are released to on_finish callbacks in
  // this order via complete_pending_ops().
  uint64_t tid = m_next_tid++;
  m_tid_result[tid] = Result(on_finish);
  auto ctx = std::make_unique<C_OrderedThrottle>(this, tid);

  // Flush any already-finished ops each time we wake, since
  // complete_pending_ops() drops the lock while firing callbacks.
  complete_pending_ops(l);
  while (m_max == m_current) {
    ++waiters;
    m_cond.wait(l);
    --waiters;
    complete_pending_ops(l);
  }
  ++m_current;

  return ctx.release();
}
615
616 void OrderedThrottle::end_op(int r) {
617 std::lock_guard l(m_lock);
618 ceph_assert(m_current > 0);
619
620 if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
621 m_ret_val = r;
622 }
623 --m_current;
624 m_cond.notify_all();
625 }
626
627 void OrderedThrottle::finish_op(uint64_t tid, int r) {
628 std::lock_guard l(m_lock);
629
630 auto it = m_tid_result.find(tid);
631 ceph_assert(it != m_tid_result.end());
632
633 it->second.finished = true;
634 it->second.ret_val = r;
635 m_cond.notify_all();
636 }
637
638 bool OrderedThrottle::pending_error() const {
639 std::lock_guard l(m_lock);
640 return (m_ret_val < 0);
641 }
642
// Block until all in-flight ops complete, draining ordered completions
// as they arrive.  Returns the latched error (0 on success).
int OrderedThrottle::wait_for_ret() {
  std::unique_lock l(m_lock);
  complete_pending_ops(l);

  // Re-drain after every wakeup: complete_pending_ops() releases the
  // lock while firing callbacks, so state may have advanced.
  while (m_current > 0) {
    ++waiters;
    m_cond.wait(l);
    --waiters;
    complete_pending_ops(l);
  }
  return m_ret_val;
}
655
// Fire on_finish callbacks for every contiguous run of finished ops
// starting at m_complete_tid, preserving start_op() order.  Caller holds
// l; the lock is dropped around each callback to avoid re-entrancy
// deadlocks, so the map iterator is re-fetched on every iteration.
void OrderedThrottle::complete_pending_ops(std::unique_lock<std::mutex>& l) {
  while (true) {
    auto it = m_tid_result.begin();
    // Stop at the first gap: the lowest pending tid must both match the
    // next expected tid and actually be finished.
    if (it == m_tid_result.end() || it->first != m_complete_tid ||
        !it->second.finished) {
      break;
    }

    // Copy out the result before erasing; the map may be mutated by
    // other threads once we unlock below.
    Result result = it->second;
    m_tid_result.erase(it);

    l.unlock();
    result.on_finish->complete(result.ret_val);
    l.lock();

    ++m_complete_tid;
  }
}
674
675 #undef dout_prefix
676 #define dout_prefix *_dout << "TokenBucketThrottle(" << m_name << " " \
677 << (void*)this << ") "
678
679 uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
680 if (0 == max) {
681 return 0;
682 }
683
684 uint64_t got = 0;
685 if (available >= c) {
686 // There is enough token in bucket, take c.
687 got = c;
688 available -= c;
689 remain -= c;
690 } else {
691 // There is not enough, take all available.
692 got = available;
693 remain -= available;
694 available = 0;
695 }
696 return got;
697 }
698
699 uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens, double burst_ratio) {
700 if (0 == max) {
701 return 0;
702 }
703
704 if (tokens) {
705 // put tokens into bucket
706 uint64_t current = remain;
707 if ((current + tokens) <= capacity) {
708 remain += tokens;
709 } else {
710 remain = capacity;
711 }
712
713 // available tokens increase at burst speed
714 uint64_t available_inc = tokens;
715 if (burst_ratio > 1) {
716 available_inc = (uint64_t)(tokens * burst_ratio);
717 }
718 uint64_t inc_upper_limit = remain > max ? max : remain;
719 if ((available + available_inc) <= inc_upper_limit ){
720 available += available_inc;
721 }else{
722 available = inc_upper_limit;
723 }
724
725 }
726 return remain;
727 }
728
729 void TokenBucketThrottle::Bucket::set_max(uint64_t max, uint64_t burst_seconds) {
730 // the capacity of bucket should not be less than max
731 if (burst_seconds < 1){
732 burst_seconds = 1;
733 }
734 uint64_t new_capacity = max*burst_seconds;
735 if (capacity != new_capacity){
736 capacity = new_capacity;
737 remain = capacity;
738 }
739 if (available > max || 0 == max) {
740 available = max;
741 }
742 this->max = max;
743 }
744
// Token-bucket rate limiter: tokens accrue on a SafeTimer tick at `avg`
// per second (burstable up to `burst`); blocked requests queue as
// Blockers and are released as tokens arrive.
TokenBucketThrottle::TokenBucketThrottle(
    CephContext *cct,
    const std::string &name,
    uint64_t burst,
    uint64_t avg,
    SafeTimer *timer,
    ceph::mutex *timer_lock)
  : m_cct(cct), m_name(name),
    m_throttle(m_cct, name + "_bucket", burst),
    m_burst(burst), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
    m_lock(ceph::make_mutex(name + "_lock"))
{}
757
758 TokenBucketThrottle::~TokenBucketThrottle() {
759 // cancel the timer events.
760 {
761 std::lock_guard timer_locker(*m_timer_lock);
762 cancel_timer();
763 }
764
765 list<Blocker> tmp_blockers;
766 {
767 std::lock_guard blockers_lock(m_lock);
768 tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
769 }
770
771 for (auto b : tmp_blockers) {
772 b.ctx->complete(0);
773 }
774 }
775
// Reconfigure the average rate (tokens/sec), burst and burst window.
// Returns -EINVAL if a non-zero burst is below the average.  Takes
// m_lock for the parameters, then *m_timer_lock to re-arm the timer --
// always in that order, never nested.
int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds) {
  {
    std::lock_guard lock{m_lock};

    if (0 < burst && burst < average) {
      // The burst must never be less than the average.
      return -EINVAL;
    }

    m_avg = average;
    m_burst = burst;

    if (0 == average) {
      // The limit is not set, and no tokens will be put into the bucket.
      // So, we can schedule the timer slowly, or even cancel it.
      m_tick = 1000;
    } else {
      // Calculate the tick (ms); don't go below the configured minimum.
      m_tick = 1000 / average;
      if (m_tick < m_tick_min) {
	m_tick = m_tick_min;
      }

      // Handles the case where avg does not divide 1000 evenly; the
      // per-tick token computation distributes the remainder.
      m_ticks_per_second = 1000 / m_tick;
      m_current_tick = 0;

      // Default burst configuration: fall back to the average.
      m_throttle.set_max(0 == burst ? average : burst, burst_seconds);
    }
    // Convert milliseconds to seconds for the timer API.
    m_schedule_tick = m_tick / 1000.0;
  }

  // The schedule period will be changed when the average rate is set;
  // restart the timer with the new period.
  {
    std::lock_guard timer_locker{*m_timer_lock};
    cancel_timer();
    schedule_timer();
  }
  return 0;
}
818
819 void TokenBucketThrottle::set_schedule_tick_min(uint64_t tick) {
820 std::lock_guard lock(m_lock);
821 if (tick != 0) {
822 m_tick_min = tick;
823 }
824 }
825
826 uint64_t TokenBucketThrottle::tokens_filled(double tick) {
827 return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg);
828 }
829
830 uint64_t TokenBucketThrottle::tokens_this_tick() {
831 if (0 == m_avg) {
832 return 0;
833 }
834 if (m_current_tick >= m_ticks_per_second) {
835 m_current_tick = 0;
836 }
837 m_current_tick++;
838
839 return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1);
840 }
841
// Timer-tick body: refill the bucket and release as many queued blockers
// as the new tokens allow, in FIFO order.  Releases (completes) blocker
// contexts outside m_lock.
void TokenBucketThrottle::add_tokens() {
  list<Blocker> tmp_blockers;
  {
    std::lock_guard lock(m_lock);
    // put tokens into bucket.
    double burst_ratio = 1.0;
    if (m_throttle.max > m_avg && m_avg > 0){
      burst_ratio = (double)m_throttle.max/m_avg;
    }
    m_throttle.put(tokens_this_tick(), burst_ratio);
    // Throttling disabled: release everyone immediately.
    if (0 == m_avg || 0 == m_throttle.max)
      tmp_blockers.swap(m_blockers);
    // check the m_blockers from head to tail, if blocker can get
    // enough tokens, let it go.
    while (!m_blockers.empty()) {
      Blocker &blocker = m_blockers.front();
      uint64_t got = m_throttle.get(blocker.tokens_requested);
      if (got == blocker.tokens_requested) {
	// got enough tokens for front.
	tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
      } else {
	// Bucket drained: remember the partial grant against the head
	// blocker's outstanding request and stop (FIFO fairness).
	blocker.tokens_requested -= got;
	break;
      }
    }
  }

  // Complete contexts without holding m_lock.
  for (auto b : tmp_blockers) {
    b.ctx->complete(0);
  }
}
874
// Arm a one-shot timer that re-enters schedule_timer(), making the
// refill self-rearming, then perform this tick's refill.  Looks like it
// must run under *m_timer_lock (SafeTimer convention) -- confirm against
// callers such as set_limit().
void TokenBucketThrottle::schedule_timer() {
  m_token_ctx = new LambdaContext(
      [this](int r) {
	schedule_timer();
      });
  m_timer->add_event_after(m_schedule_tick, m_token_ctx);

  add_tokens();
}
884
// Cancel the pending refill event.  Looks like it must run under
// *m_timer_lock (callers in this file take it first) -- confirm.
void TokenBucketThrottle::cancel_timer() {
  m_timer->cancel_event(m_token_ctx);
}