1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/scope_guard.h"
6 #include "common/Throttle.h"
7 #include "common/ceph_time.h"
8 #include "common/perf_counters.h"
11 // re-include our assert to clobber the system one; fix dout:
12 #include "include/ceph_assert.h"
14 #define dout_subsys ceph_subsys_throttle
17 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
23 using ceph::mono_clock
;
24 using ceph::mono_time
;
28 l_throttle_first
= 532430,
31 l_throttle_get_started
,
34 l_throttle_get_or_fail_fail
,
35 l_throttle_get_or_fail_success
,
44 Throttle::Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
,
46 : cct(cct
), name(n
), max(m
),
54 if (cct
->_conf
->throttler_perf_counter
) {
55 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_throttle_first
, l_throttle_last
);
56 b
.add_u64(l_throttle_val
, "val", "Currently taken slots");
57 b
.add_u64(l_throttle_max
, "max", "Max value for throttle");
58 b
.add_u64_counter(l_throttle_get_started
, "get_started", "Number of get calls, increased before wait");
59 b
.add_u64_counter(l_throttle_get
, "get", "Gets");
60 b
.add_u64_counter(l_throttle_get_sum
, "get_sum", "Got data");
61 b
.add_u64_counter(l_throttle_get_or_fail_fail
, "get_or_fail_fail", "Get blocked during get_or_fail");
62 b
.add_u64_counter(l_throttle_get_or_fail_success
, "get_or_fail_success", "Successful get during get_or_fail");
63 b
.add_u64_counter(l_throttle_take
, "take", "Takes");
64 b
.add_u64_counter(l_throttle_take_sum
, "take_sum", "Taken data");
65 b
.add_u64_counter(l_throttle_put
, "put", "Puts");
66 b
.add_u64_counter(l_throttle_put_sum
, "put_sum", "Put data");
67 b
.add_time_avg(l_throttle_wait
, "wait", "Waiting latency");
69 logger
= { b
.create_perf_counters(), cct
};
70 cct
->get_perfcounters_collection()->add(logger
.get());
71 logger
->set(l_throttle_max
, max
);
77 std::lock_guard
l(lock
);
78 ceph_assert(conds
.empty());
81 void Throttle::_reset_max(int64_t m
)
87 conds
.front().notify_one();
89 logger
->set(l_throttle_max
, m
);
93 bool Throttle::_wait(int64_t c
, std::unique_lock
<std::mutex
>& l
)
97 if (_should_wait(c
) || !conds
.empty()) { // always wait behind other waiters.
99 auto cv
= conds
.emplace(conds
.end());
100 auto w
= make_scope_guard([this, cv
]() {
104 ldout(cct
, 2) << "_wait waiting..." << dendl
;
106 start
= mono_clock::now();
108 cv
->wait(l
, [this, c
, cv
]() { return (!_should_wait(c
) &&
109 cv
== conds
.begin()); });
110 ldout(cct
, 2) << "_wait finished waiting" << dendl
;
112 logger
->tinc(l_throttle_wait
, mono_clock::now() - start
);
115 // wake up the next guy
117 conds
.front().notify_one();
122 bool Throttle::wait(int64_t m
)
124 if (0 == max
&& 0 == m
) {
128 std::unique_lock
l(lock
);
133 ldout(cct
, 10) << "wait" << dendl
;
137 int64_t Throttle::take(int64_t c
)
143 ldout(cct
, 10) << "take " << c
<< dendl
;
146 logger
->inc(l_throttle_take
);
147 logger
->inc(l_throttle_take_sum
, c
);
148 logger
->set(l_throttle_val
, count
);
153 bool Throttle::get(int64_t c
, int64_t m
)
155 if (0 == max
&& 0 == m
) {
161 ldout(cct
, 10) << "get " << c
<< " (" << count
.load() << " -> " << (count
.load() + c
) << ")" << dendl
;
163 logger
->inc(l_throttle_get_started
);
167 std::unique_lock
l(lock
);
172 waited
= _wait(c
, l
);
176 logger
->inc(l_throttle_get
);
177 logger
->inc(l_throttle_get_sum
, c
);
178 logger
->set(l_throttle_val
, count
);
183 /* Returns true if it successfully got the requested amount,
184 * or false if it would block.
186 bool Throttle::get_or_fail(int64_t c
)
196 std::lock_guard
l(lock
);
197 if (_should_wait(c
) || !conds
.empty()) {
198 ldout(cct
, 10) << "get_or_fail " << c
<< " failed" << dendl
;
201 ldout(cct
, 10) << "get_or_fail " << c
<< " success (" << count
.load()
202 << " -> " << (count
.load() + c
) << ")" << dendl
;
210 logger
->inc(l_throttle_get_or_fail_success
);
211 logger
->inc(l_throttle_get
);
212 logger
->inc(l_throttle_get_sum
, c
);
213 logger
->set(l_throttle_val
, count
);
215 logger
->inc(l_throttle_get_or_fail_fail
);
221 int64_t Throttle::put(int64_t c
)
229 ldout(cct
, 10) << "put " << c
<< " (" << count
.load() << " -> "
230 << (count
.load()-c
) << ")" << dendl
;
233 std::lock_guard
l(lock
);
237 conds
.front().notify_one();
238 // if count goes negative, we failed somewhere!
239 ceph_assert(count
>= c
);
240 new_count
= count
-= c
;
244 logger
->inc(l_throttle_put
);
245 logger
->inc(l_throttle_put_sum
, c
);
246 logger
->set(l_throttle_val
, count
);
252 void Throttle::reset()
254 std::lock_guard
l(lock
);
256 conds
.front().notify_one();
259 logger
->set(l_throttle_val
, 0);
264 l_backoff_throttle_first
= l_throttle_last
+ 1,
265 l_backoff_throttle_val
,
266 l_backoff_throttle_max
,
267 l_backoff_throttle_get
,
268 l_backoff_throttle_get_sum
,
269 l_backoff_throttle_take
,
270 l_backoff_throttle_take_sum
,
271 l_backoff_throttle_put
,
272 l_backoff_throttle_put_sum
,
273 l_backoff_throttle_wait
,
274 l_backoff_throttle_last
,
277 BackoffThrottle::BackoffThrottle(CephContext
*cct
, const std::string
& n
,
278 unsigned expected_concurrency
, bool _use_perf
)
280 conds(expected_concurrency
),///< [in] determines size of conds
286 if (cct
->_conf
->throttler_perf_counter
) {
287 PerfCountersBuilder
b(cct
, string("throttle-") + name
,
288 l_backoff_throttle_first
, l_backoff_throttle_last
);
289 b
.add_u64(l_backoff_throttle_val
, "val", "Currently available throttle");
290 b
.add_u64(l_backoff_throttle_max
, "max", "Max value for throttle");
291 b
.add_u64_counter(l_backoff_throttle_get
, "get", "Gets");
292 b
.add_u64_counter(l_backoff_throttle_get_sum
, "get_sum", "Got data");
293 b
.add_u64_counter(l_backoff_throttle_take
, "take", "Takes");
294 b
.add_u64_counter(l_backoff_throttle_take_sum
, "take_sum", "Taken data");
295 b
.add_u64_counter(l_backoff_throttle_put
, "put", "Puts");
296 b
.add_u64_counter(l_backoff_throttle_put_sum
, "put_sum", "Put data");
297 b
.add_time_avg(l_backoff_throttle_wait
, "wait", "Waiting latency");
299 logger
= { b
.create_perf_counters(), cct
};
300 cct
->get_perfcounters_collection()->add(logger
.get());
301 logger
->set(l_backoff_throttle_max
, max
);
305 BackoffThrottle::~BackoffThrottle()
307 std::lock_guard
l(lock
);
308 ceph_assert(waiters
.empty());
311 bool BackoffThrottle::set_params(
312 double _low_threshold
,
313 double _high_threshold
,
314 double _expected_throughput
,
315 double _high_multiple
,
316 double _max_multiple
,
317 uint64_t _throttle_max
,
321 if (_low_threshold
> _high_threshold
) {
324 *errstream
<< "low_threshold (" << _low_threshold
325 << ") > high_threshold (" << _high_threshold
330 if (_high_multiple
> _max_multiple
) {
333 *errstream
<< "_high_multiple (" << _high_multiple
334 << ") > _max_multiple (" << _max_multiple
339 if (_low_threshold
> 1 || _low_threshold
< 0) {
342 *errstream
<< "invalid low_threshold (" << _low_threshold
<< ")"
347 if (_high_threshold
> 1 || _high_threshold
< 0) {
350 *errstream
<< "invalid high_threshold (" << _high_threshold
<< ")"
355 if (_max_multiple
< 0) {
358 *errstream
<< "invalid _max_multiple ("
359 << _max_multiple
<< ")"
364 if (_high_multiple
< 0) {
367 *errstream
<< "invalid _high_multiple ("
368 << _high_multiple
<< ")"
373 if (_expected_throughput
< 0) {
376 *errstream
<< "invalid _expected_throughput("
377 << _expected_throughput
<< ")"
386 low_threshold
= _low_threshold
;
387 high_threshold
= _high_threshold
;
388 high_delay_per_count
= _high_multiple
/ _expected_throughput
;
389 max_delay_per_count
= _max_multiple
/ _expected_throughput
;
393 logger
->set(l_backoff_throttle_max
, max
);
395 if (high_threshold
- low_threshold
> 0) {
396 s0
= high_delay_per_count
/ (high_threshold
- low_threshold
);
398 low_threshold
= high_threshold
;
402 if (1 - high_threshold
> 0) {
403 s1
= (max_delay_per_count
- high_delay_per_count
)
404 / (1 - high_threshold
);
414 ceph::timespan
BackoffThrottle::_get_delay(uint64_t c
) const
417 return ceph::timespan(0);
419 double r
= ((double)current
) / ((double)max
);
420 if (r
< low_threshold
) {
421 return ceph::timespan(0);
422 } else if (r
< high_threshold
) {
423 return c
* ceph::make_timespan(
424 (r
- low_threshold
) * s0
);
426 return c
* ceph::make_timespan(
427 high_delay_per_count
+ ((r
- high_threshold
) * s1
));
431 ceph::timespan
BackoffThrottle::get(uint64_t c
)
434 auto delay
= _get_delay(c
);
437 logger
->inc(l_backoff_throttle_get
);
438 logger
->inc(l_backoff_throttle_get_sum
, c
);
442 if (delay
.count() == 0 &&
444 ((max
== 0) || (current
== 0) || ((current
+ c
) <= max
))) {
448 logger
->set(l_backoff_throttle_val
, current
);
451 return ceph::make_timespan(0);
454 auto ticket
= _push_waiter();
455 auto wait_from
= mono_clock::now();
458 while (waiters
.begin() != ticket
) {
463 auto start
= mono_clock::now();
464 delay
= _get_delay(c
);
466 if (max
!= 0 && current
!= 0 && (current
+ c
) > max
) {
469 } else if (delay
.count() > 0) {
470 (*ticket
)->wait_for(l
, delay
);
475 ceph_assert(ticket
== waiters
.begin());
476 delay
= _get_delay(c
);
477 auto elapsed
= mono_clock::now() - start
;
478 if (delay
<= elapsed
) {
479 delay
= timespan::zero();
490 logger
->set(l_backoff_throttle_val
, current
);
492 logger
->tinc(l_backoff_throttle_wait
, mono_clock::now() - wait_from
);
496 return mono_clock::now() - start
;
499 uint64_t BackoffThrottle::put(uint64_t c
)
502 ceph_assert(current
>= c
);
507 logger
->inc(l_backoff_throttle_put
);
508 logger
->inc(l_backoff_throttle_put_sum
, c
);
509 logger
->set(l_backoff_throttle_val
, current
);
515 uint64_t BackoffThrottle::take(uint64_t c
)
521 logger
->inc(l_backoff_throttle_take
);
522 logger
->inc(l_backoff_throttle_take_sum
, c
);
523 logger
->set(l_backoff_throttle_val
, current
);
529 uint64_t BackoffThrottle::get_current()
535 uint64_t BackoffThrottle::get_max()
541 SimpleThrottle::SimpleThrottle(uint64_t max
, bool ignore_enoent
)
542 : m_max(max
), m_ignore_enoent(ignore_enoent
) {}
544 SimpleThrottle::~SimpleThrottle()
546 std::lock_guard
l(m_lock
);
547 ceph_assert(m_current
== 0);
548 ceph_assert(waiters
== 0);
551 void SimpleThrottle::start_op()
553 std::unique_lock
l(m_lock
);
555 m_cond
.wait(l
, [this]() { return m_max
!= m_current
; });
560 void SimpleThrottle::end_op(int r
)
562 std::lock_guard
l(m_lock
);
564 if (r
< 0 && !m_ret
&& !(r
== -ENOENT
&& m_ignore_enoent
))
569 bool SimpleThrottle::pending_error() const
571 std::lock_guard
l(m_lock
);
575 int SimpleThrottle::wait_for_ret()
577 std::unique_lock
l(m_lock
);
579 m_cond
.wait(l
, [this]() { return m_current
== 0; });
584 void C_OrderedThrottle::finish(int r
) {
585 m_ordered_throttle
->finish_op(m_tid
, r
);
588 OrderedThrottle::OrderedThrottle(uint64_t max
, bool ignore_enoent
)
589 : m_max(max
), m_ignore_enoent(ignore_enoent
) {}
591 OrderedThrottle::~OrderedThrottle() {
592 std::lock_guard
l(m_lock
);
593 ceph_assert(waiters
== 0);
596 C_OrderedThrottle
*OrderedThrottle::start_op(Context
*on_finish
) {
597 ceph_assert(on_finish
);
599 std::unique_lock
l(m_lock
);
600 uint64_t tid
= m_next_tid
++;
601 m_tid_result
[tid
] = Result(on_finish
);
602 auto ctx
= std::make_unique
<C_OrderedThrottle
>(this, tid
);
604 complete_pending_ops(l
);
605 while (m_max
== m_current
) {
609 complete_pending_ops(l
);
613 return ctx
.release();
616 void OrderedThrottle::end_op(int r
) {
617 std::lock_guard
l(m_lock
);
618 ceph_assert(m_current
> 0);
620 if (r
< 0 && m_ret_val
== 0 && (r
!= -ENOENT
|| !m_ignore_enoent
)) {
627 void OrderedThrottle::finish_op(uint64_t tid
, int r
) {
628 std::lock_guard
l(m_lock
);
630 auto it
= m_tid_result
.find(tid
);
631 ceph_assert(it
!= m_tid_result
.end());
633 it
->second
.finished
= true;
634 it
->second
.ret_val
= r
;
638 bool OrderedThrottle::pending_error() const {
639 std::lock_guard
l(m_lock
);
640 return (m_ret_val
< 0);
643 int OrderedThrottle::wait_for_ret() {
644 std::unique_lock
l(m_lock
);
645 complete_pending_ops(l
);
647 while (m_current
> 0) {
651 complete_pending_ops(l
);
656 void OrderedThrottle::complete_pending_ops(std::unique_lock
<std::mutex
>& l
) {
658 auto it
= m_tid_result
.begin();
659 if (it
== m_tid_result
.end() || it
->first
!= m_complete_tid
||
660 !it
->second
.finished
) {
664 Result result
= it
->second
;
665 m_tid_result
.erase(it
);
668 result
.on_finish
->complete(result
.ret_val
);
676 #define dout_prefix *_dout << "TokenBucketThrottle(" << m_name << " " \
677 << (void*)this << ") "
679 uint64_t TokenBucketThrottle::Bucket::get(uint64_t c
) {
685 if (available
>= c
) {
686 // There is enough token in bucket, take c.
691 // There is not enough, take all available.
699 uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens
, double burst_ratio
) {
705 // put tokens into bucket
706 uint64_t current
= remain
;
707 if ((current
+ tokens
) <= capacity
) {
713 // available tokens increase at burst speed
714 uint64_t available_inc
= tokens
;
715 if (burst_ratio
> 1) {
716 available_inc
= (uint64_t)(tokens
* burst_ratio
);
718 uint64_t inc_upper_limit
= remain
> max
? max
: remain
;
719 if ((available
+ available_inc
) <= inc_upper_limit
){
720 available
+= available_inc
;
722 available
= inc_upper_limit
;
729 void TokenBucketThrottle::Bucket::set_max(uint64_t max
, uint64_t burst_seconds
) {
730 // the capacity of bucket should not be less than max
731 if (burst_seconds
< 1){
734 uint64_t new_capacity
= max
*burst_seconds
;
735 if (capacity
!= new_capacity
){
736 capacity
= new_capacity
;
739 if (available
> max
|| 0 == max
) {
745 TokenBucketThrottle::TokenBucketThrottle(
747 const std::string
&name
,
751 ceph::mutex
*timer_lock
)
752 : m_cct(cct
), m_name(name
),
753 m_throttle(m_cct
, name
+ "_bucket", burst
),
754 m_burst(burst
), m_avg(avg
), m_timer(timer
), m_timer_lock(timer_lock
),
755 m_lock(ceph::make_mutex(name
+ "_lock"))
758 TokenBucketThrottle::~TokenBucketThrottle() {
759 // cancel the timer events.
761 std::lock_guard
timer_locker(*m_timer_lock
);
765 list
<Blocker
> tmp_blockers
;
767 std::lock_guard
blockers_lock(m_lock
);
768 tmp_blockers
.splice(tmp_blockers
.begin(), m_blockers
, m_blockers
.begin(), m_blockers
.end());
771 for (auto b
: tmp_blockers
) {
776 int TokenBucketThrottle::set_limit(uint64_t average
, uint64_t burst
, uint64_t burst_seconds
) {
778 std::lock_guard lock
{m_lock
};
780 if (0 < burst
&& burst
< average
) {
781 // the burst should never less than the average.
789 // The limit is not set, and no tokens will be put into the bucket.
790 // So, we can schedule the timer slowly, or even cancel it.
793 // calculate the tick(ms), don't less than the minimum.
794 m_tick
= 1000 / average
;
795 if (m_tick
< m_tick_min
) {
799 // this is for the number(avg) can not be divisible.
800 m_ticks_per_second
= 1000 / m_tick
;
803 // for the default configuration of burst.
804 m_throttle
.set_max(0 == burst
? average
: burst
, burst_seconds
);
806 // turn millisecond to second
807 m_schedule_tick
= m_tick
/ 1000.0;
810 // The schedule period will be changed when the average rate is set.
812 std::lock_guard timer_locker
{*m_timer_lock
};
819 void TokenBucketThrottle::set_schedule_tick_min(uint64_t tick
) {
820 std::lock_guard
lock(m_lock
);
826 uint64_t TokenBucketThrottle::tokens_filled(double tick
) {
827 return (0 == m_avg
) ? 0 : (tick
/ m_ticks_per_second
* m_avg
);
830 uint64_t TokenBucketThrottle::tokens_this_tick() {
834 if (m_current_tick
>= m_ticks_per_second
) {
839 return tokens_filled(m_current_tick
) - tokens_filled(m_current_tick
- 1);
842 void TokenBucketThrottle::add_tokens() {
843 list
<Blocker
> tmp_blockers
;
845 std::lock_guard
lock(m_lock
);
846 // put tokens into bucket.
847 double burst_ratio
= 1.0;
848 if (m_throttle
.max
> m_avg
&& m_avg
> 0){
849 burst_ratio
= (double)m_throttle
.max
/m_avg
;
851 m_throttle
.put(tokens_this_tick(), burst_ratio
);
852 if (0 == m_avg
|| 0 == m_throttle
.max
)
853 tmp_blockers
.swap(m_blockers
);
854 // check the m_blockers from head to tail, if blocker can get
855 // enough tokens, let it go.
856 while (!m_blockers
.empty()) {
857 Blocker
&blocker
= m_blockers
.front();
858 uint64_t got
= m_throttle
.get(blocker
.tokens_requested
);
859 if (got
== blocker
.tokens_requested
) {
860 // got enough tokens for front.
861 tmp_blockers
.splice(tmp_blockers
.end(), m_blockers
, m_blockers
.begin());
863 // there is no more tokens.
864 blocker
.tokens_requested
-= got
;
870 for (auto b
: tmp_blockers
) {
875 void TokenBucketThrottle::schedule_timer() {
876 m_token_ctx
= new LambdaContext(
880 m_timer
->add_event_after(m_schedule_tick
, m_token_ctx
);
885 void TokenBucketThrottle::cancel_timer() {
886 m_timer
->cancel_event(m_token_ctx
);