1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/scope_guard.h"
6 #include "common/Throttle.h"
7 #include "common/perf_counters.h"
9 // re-include our assert to clobber the system one; fix dout:
10 #include "include/assert.h"
12 #define dout_subsys ceph_subsys_throttle
15 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
18 l_throttle_first
= 532430,
21 l_throttle_get_started
,
24 l_throttle_get_or_fail_fail
,
25 l_throttle_get_or_fail_success
,
34 Throttle::Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
, bool _use_perf
)
35 : cct(cct
), name(n
), logger(NULL
),
37 lock("Throttle::lock"),
45 if (cct
->_conf
->throttler_perf_counter
) {
46 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_throttle_first
, l_throttle_last
);
47 b
.add_u64(l_throttle_val
, "val", "Currently available throttle");
48 b
.add_u64(l_throttle_max
, "max", "Max value for throttle");
49 b
.add_u64_counter(l_throttle_get_started
, "get_started", "Number of get calls, increased before wait");
50 b
.add_u64_counter(l_throttle_get
, "get", "Gets");
51 b
.add_u64_counter(l_throttle_get_sum
, "get_sum", "Got data");
52 b
.add_u64_counter(l_throttle_get_or_fail_fail
, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b
.add_u64_counter(l_throttle_get_or_fail_success
, "get_or_fail_success", "Successful get during get_or_fail");
54 b
.add_u64_counter(l_throttle_take
, "take", "Takes");
55 b
.add_u64_counter(l_throttle_take_sum
, "take_sum", "Taken data");
56 b
.add_u64_counter(l_throttle_put
, "put", "Puts");
57 b
.add_u64_counter(l_throttle_put_sum
, "put_sum", "Put data");
58 b
.add_time_avg(l_throttle_wait
, "wait", "Waiting latency");
60 logger
= b
.create_perf_counters();
61 cct
->get_perfcounters_collection()->add(logger
);
62 logger
->set(l_throttle_max
, max
);
69 Mutex::Locker
l(lock
);
77 cct
->get_perfcounters_collection()->remove(logger
);
82 void Throttle::_reset_max(int64_t m
)
85 assert(lock
.is_locked());
89 cond
.front()->SignalOne();
91 logger
->set(l_throttle_max
, m
);
95 bool Throttle::_wait(int64_t c
)
99 if (_should_wait(c
) || !cond
.empty()) { // always wait behind other waiters.
101 auto cv
= cond
.insert(cond
.end(), new Cond
);
102 auto w
= make_scope_guard([this, cv
]() {
107 ldout(cct
, 2) << "_wait waiting..." << dendl
;
109 start
= ceph_clock_now();
113 } while ((_should_wait(c
) || cv
!= cond
.begin()));
115 ldout(cct
, 2) << "_wait finished waiting" << dendl
;
117 utime_t dur
= ceph_clock_now() - start
;
118 logger
->tinc(l_throttle_wait
, dur
);
121 // wake up the next guy
123 cond
.front()->SignalOne();
128 bool Throttle::wait(int64_t m
)
130 if (0 == max
&& 0 == m
) {
134 Mutex::Locker
l(lock
);
139 ldout(cct
, 10) << "wait" << dendl
;
143 int64_t Throttle::take(int64_t c
)
149 ldout(cct
, 10) << "take " << c
<< dendl
;
151 Mutex::Locker
l(lock
);
155 logger
->inc(l_throttle_take
);
156 logger
->inc(l_throttle_take_sum
, c
);
157 logger
->set(l_throttle_val
, count
);
162 bool Throttle::get(int64_t c
, int64_t m
)
164 if (0 == max
&& 0 == m
) {
169 ldout(cct
, 10) << "get " << c
<< " (" << count
.load() << " -> " << (count
.load() + c
) << ")" << dendl
;
171 logger
->inc(l_throttle_get_started
);
175 Mutex::Locker
l(lock
);
184 logger
->inc(l_throttle_get
);
185 logger
->inc(l_throttle_get_sum
, c
);
186 logger
->set(l_throttle_val
, count
);
191 /* Returns true if it successfully got the requested amount,
192 * or false if it would block.
194 bool Throttle::get_or_fail(int64_t c
)
201 Mutex::Locker
l(lock
);
202 if (_should_wait(c
) || !cond
.empty()) {
203 ldout(cct
, 10) << "get_or_fail " << c
<< " failed" << dendl
;
205 logger
->inc(l_throttle_get_or_fail_fail
);
209 ldout(cct
, 10) << "get_or_fail " << c
<< " success (" << count
.load() << " -> " << (count
.load() + c
) << ")" << dendl
;
212 logger
->inc(l_throttle_get_or_fail_success
);
213 logger
->inc(l_throttle_get
);
214 logger
->inc(l_throttle_get_sum
, c
);
215 logger
->set(l_throttle_val
, count
);
221 int64_t Throttle::put(int64_t c
)
228 ldout(cct
, 10) << "put " << c
<< " (" << count
.load() << " -> " << (count
.load()-c
) << ")" << dendl
;
229 Mutex::Locker
l(lock
);
232 cond
.front()->SignalOne();
233 // if count goes negative, we failed somewhere!
237 logger
->inc(l_throttle_put
);
238 logger
->inc(l_throttle_put_sum
, c
);
239 logger
->set(l_throttle_val
, count
);
245 void Throttle::reset()
247 Mutex::Locker
l(lock
);
249 cond
.front()->SignalOne();
252 logger
->set(l_throttle_val
, 0);
257 l_backoff_throttle_first
= l_throttle_last
+ 1,
258 l_backoff_throttle_val
,
259 l_backoff_throttle_max
,
260 l_backoff_throttle_get
,
261 l_backoff_throttle_get_sum
,
262 l_backoff_throttle_take
,
263 l_backoff_throttle_take_sum
,
264 l_backoff_throttle_put
,
265 l_backoff_throttle_put_sum
,
266 l_backoff_throttle_wait
,
267 l_backoff_throttle_last
,
270 BackoffThrottle::BackoffThrottle(CephContext
*cct
, const std::string
& n
, unsigned expected_concurrency
, bool _use_perf
)
271 : cct(cct
), name(n
), logger(NULL
),
272 conds(expected_concurrency
),///< [in] determines size of conds
278 if (cct
->_conf
->throttler_perf_counter
) {
279 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_backoff_throttle_first
, l_backoff_throttle_last
);
280 b
.add_u64(l_backoff_throttle_val
, "val", "Currently available throttle");
281 b
.add_u64(l_backoff_throttle_max
, "max", "Max value for throttle");
282 b
.add_u64_counter(l_backoff_throttle_get
, "get", "Gets");
283 b
.add_u64_counter(l_backoff_throttle_get_sum
, "get_sum", "Got data");
284 b
.add_u64_counter(l_backoff_throttle_take
, "take", "Takes");
285 b
.add_u64_counter(l_backoff_throttle_take_sum
, "take_sum", "Taken data");
286 b
.add_u64_counter(l_backoff_throttle_put
, "put", "Puts");
287 b
.add_u64_counter(l_backoff_throttle_put_sum
, "put_sum", "Put data");
288 b
.add_time_avg(l_backoff_throttle_wait
, "wait", "Waiting latency");
290 logger
= b
.create_perf_counters();
291 cct
->get_perfcounters_collection()->add(logger
);
292 logger
->set(l_backoff_throttle_max
, max
);
296 BackoffThrottle::~BackoffThrottle()
300 assert(waiters
.empty());
307 cct
->get_perfcounters_collection()->remove(logger
);
312 bool BackoffThrottle::set_params(
313 double _low_threshhold
,
314 double _high_threshhold
,
315 double _expected_throughput
,
316 double _high_multiple
,
317 double _max_multiple
,
318 uint64_t _throttle_max
,
322 if (_low_threshhold
> _high_threshhold
) {
325 *errstream
<< "low_threshhold (" << _low_threshhold
326 << ") > high_threshhold (" << _high_threshhold
331 if (_high_multiple
> _max_multiple
) {
334 *errstream
<< "_high_multiple (" << _high_multiple
335 << ") > _max_multiple (" << _max_multiple
340 if (_low_threshhold
> 1 || _low_threshhold
< 0) {
343 *errstream
<< "invalid low_threshhold (" << _low_threshhold
<< ")"
348 if (_high_threshhold
> 1 || _high_threshhold
< 0) {
351 *errstream
<< "invalid high_threshhold (" << _high_threshhold
<< ")"
356 if (_max_multiple
< 0) {
359 *errstream
<< "invalid _max_multiple ("
360 << _max_multiple
<< ")"
365 if (_high_multiple
< 0) {
368 *errstream
<< "invalid _high_multiple ("
369 << _high_multiple
<< ")"
374 if (_expected_throughput
< 0) {
377 *errstream
<< "invalid _expected_throughput("
378 << _expected_throughput
<< ")"
387 low_threshhold
= _low_threshhold
;
388 high_threshhold
= _high_threshhold
;
389 high_delay_per_count
= _high_multiple
/ _expected_throughput
;
390 max_delay_per_count
= _max_multiple
/ _expected_throughput
;
394 logger
->set(l_backoff_throttle_max
, max
);
396 if (high_threshhold
- low_threshhold
> 0) {
397 s0
= high_delay_per_count
/ (high_threshhold
- low_threshhold
);
399 low_threshhold
= high_threshhold
;
403 if (1 - high_threshhold
> 0) {
404 s1
= (max_delay_per_count
- high_delay_per_count
)
405 / (1 - high_threshhold
);
415 std::chrono::duration
<double> BackoffThrottle::_get_delay(uint64_t c
) const
418 return std::chrono::duration
<double>(0);
420 double r
= ((double)current
) / ((double)max
);
421 if (r
< low_threshhold
) {
422 return std::chrono::duration
<double>(0);
423 } else if (r
< high_threshhold
) {
424 return c
* std::chrono::duration
<double>(
425 (r
- low_threshhold
) * s0
);
427 return c
* std::chrono::duration
<double>(
428 high_delay_per_count
+ ((r
- high_threshhold
) * s1
));
432 std::chrono::duration
<double> BackoffThrottle::get(uint64_t c
)
435 auto delay
= _get_delay(c
);
438 logger
->inc(l_backoff_throttle_get
);
439 logger
->inc(l_backoff_throttle_get_sum
, c
);
443 if (delay
== std::chrono::duration
<double>(0) &&
445 ((max
== 0) || (current
== 0) || ((current
+ c
) <= max
))) {
449 logger
->set(l_backoff_throttle_val
, current
);
452 return std::chrono::duration
<double>(0);
455 auto ticket
= _push_waiter();
456 utime_t wait_from
= ceph_clock_now();
459 while (waiters
.begin() != ticket
) {
464 auto start
= std::chrono::system_clock::now();
465 delay
= _get_delay(c
);
467 if (!((max
== 0) || (current
== 0) || (current
+ c
) <= max
)) {
470 } else if (delay
> std::chrono::duration
<double>(0)) {
471 (*ticket
)->wait_for(l
, delay
);
476 assert(ticket
== waiters
.begin());
477 delay
= _get_delay(c
) - (std::chrono::system_clock::now() - start
);
485 logger
->set(l_backoff_throttle_val
, current
);
487 logger
->tinc(l_backoff_throttle_wait
, ceph_clock_now() - wait_from
);
491 return std::chrono::system_clock::now() - start
;
494 uint64_t BackoffThrottle::put(uint64_t c
)
497 assert(current
>= c
);
502 logger
->inc(l_backoff_throttle_put
);
503 logger
->inc(l_backoff_throttle_put_sum
, c
);
504 logger
->set(l_backoff_throttle_val
, current
);
510 uint64_t BackoffThrottle::take(uint64_t c
)
516 logger
->inc(l_backoff_throttle_take
);
517 logger
->inc(l_backoff_throttle_take_sum
, c
);
518 logger
->set(l_backoff_throttle_val
, current
);
524 uint64_t BackoffThrottle::get_current()
530 uint64_t BackoffThrottle::get_max()
536 SimpleThrottle::SimpleThrottle(uint64_t max
, bool ignore_enoent
)
537 : m_lock("SimpleThrottle"),
541 m_ignore_enoent(ignore_enoent
)
545 SimpleThrottle::~SimpleThrottle()
547 Mutex::Locker
l(m_lock
);
548 assert(m_current
== 0);
549 assert(waiters
== 0);
552 void SimpleThrottle::start_op()
554 Mutex::Locker
l(m_lock
);
555 while (m_max
== m_current
) {
563 void SimpleThrottle::end_op(int r
)
565 Mutex::Locker
l(m_lock
);
567 if (r
< 0 && !m_ret
&& !(r
== -ENOENT
&& m_ignore_enoent
))
572 bool SimpleThrottle::pending_error() const
574 Mutex::Locker
l(m_lock
);
578 int SimpleThrottle::wait_for_ret()
580 Mutex::Locker
l(m_lock
);
581 while (m_current
> 0) {
589 void C_OrderedThrottle::finish(int r
) {
590 m_ordered_throttle
->finish_op(m_tid
, r
);
593 OrderedThrottle::OrderedThrottle(uint64_t max
, bool ignore_enoent
)
594 : m_lock("OrderedThrottle::m_lock"), m_max(max
), m_current(0), m_ret_val(0),
595 m_ignore_enoent(ignore_enoent
), m_next_tid(0), m_complete_tid(0) {
598 OrderedThrottle::~OrderedThrottle() {
599 Mutex::Locker
locker(m_lock
);
600 assert(waiters
== 0);
603 C_OrderedThrottle
*OrderedThrottle::start_op(Context
*on_finish
) {
604 assert(on_finish
!= NULL
);
606 Mutex::Locker
locker(m_lock
);
607 uint64_t tid
= m_next_tid
++;
608 m_tid_result
[tid
] = Result(on_finish
);
609 C_OrderedThrottle
*ctx
= new C_OrderedThrottle(this, tid
);
611 complete_pending_ops();
612 while (m_max
== m_current
) {
616 complete_pending_ops();
623 void OrderedThrottle::end_op(int r
) {
624 Mutex::Locker
locker(m_lock
);
625 assert(m_current
> 0);
627 if (r
< 0 && m_ret_val
== 0 && (r
!= -ENOENT
|| !m_ignore_enoent
)) {
634 void OrderedThrottle::finish_op(uint64_t tid
, int r
) {
635 Mutex::Locker
locker(m_lock
);
637 TidResult::iterator it
= m_tid_result
.find(tid
);
638 assert(it
!= m_tid_result
.end());
640 it
->second
.finished
= true;
641 it
->second
.ret_val
= r
;
645 bool OrderedThrottle::pending_error() const {
646 Mutex::Locker
locker(m_lock
);
647 return (m_ret_val
< 0);
650 int OrderedThrottle::wait_for_ret() {
651 Mutex::Locker
locker(m_lock
);
652 complete_pending_ops();
654 while (m_current
> 0) {
658 complete_pending_ops();
663 void OrderedThrottle::complete_pending_ops() {
664 assert(m_lock
.is_locked());
667 TidResult::iterator it
= m_tid_result
.begin();
668 if (it
== m_tid_result
.end() || it
->first
!= m_complete_tid
||
669 !it
->second
.finished
) {
673 Result result
= it
->second
;
674 m_tid_result
.erase(it
);
677 result
.on_finish
->complete(result
.ret_val
);