1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/scope_guard.h"
6 #include "common/Throttle.h"
7 #include "common/perf_counters.h"
9 // re-include our assert to clobber the system one; fix dout:
10 #include "include/assert.h"
12 #define dout_subsys ceph_subsys_throttle
15 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
18 l_throttle_first
= 532430,
21 l_throttle_get_started
,
24 l_throttle_get_or_fail_fail
,
25 l_throttle_get_or_fail_success
,
34 Throttle::Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
, bool _use_perf
)
35 : cct(cct
), name(n
), logger(NULL
),
37 lock("Throttle::lock"),
45 if (cct
->_conf
->throttler_perf_counter
) {
46 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_throttle_first
, l_throttle_last
);
47 b
.add_u64(l_throttle_val
, "val", "Currently available throttle");
48 b
.add_u64(l_throttle_max
, "max", "Max value for throttle");
49 b
.add_u64_counter(l_throttle_get_started
, "get_started", "Number of get calls, increased before wait");
50 b
.add_u64_counter(l_throttle_get
, "get", "Gets");
51 b
.add_u64_counter(l_throttle_get_sum
, "get_sum", "Got data");
52 b
.add_u64_counter(l_throttle_get_or_fail_fail
, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b
.add_u64_counter(l_throttle_get_or_fail_success
, "get_or_fail_success", "Successful get during get_or_fail");
54 b
.add_u64_counter(l_throttle_take
, "take", "Takes");
55 b
.add_u64_counter(l_throttle_take_sum
, "take_sum", "Taken data");
56 b
.add_u64_counter(l_throttle_put
, "put", "Puts");
57 b
.add_u64_counter(l_throttle_put_sum
, "put_sum", "Put data");
58 b
.add_time_avg(l_throttle_wait
, "wait", "Waiting latency");
60 logger
= b
.create_perf_counters();
61 cct
->get_perfcounters_collection()->add(logger
);
62 logger
->set(l_throttle_max
, max
);
69 Mutex::Locker
l(lock
);
77 cct
->get_perfcounters_collection()->remove(logger
);
82 void Throttle::_reset_max(int64_t m
)
84 assert(lock
.is_locked());
85 if (static_cast<int64_t>(max
) == m
)
88 cond
.front()->SignalOne();
90 logger
->set(l_throttle_max
, m
);
94 bool Throttle::_wait(int64_t c
)
98 if (_should_wait(c
) || !cond
.empty()) { // always wait behind other waiters.
100 auto cv
= cond
.insert(cond
.end(), new Cond
);
101 auto w
= make_scope_guard([this, cv
]() {
106 ldout(cct
, 2) << "_wait waiting..." << dendl
;
108 start
= ceph_clock_now();
112 } while ((_should_wait(c
) || cv
!= cond
.begin()));
114 ldout(cct
, 2) << "_wait finished waiting" << dendl
;
116 utime_t dur
= ceph_clock_now() - start
;
117 logger
->tinc(l_throttle_wait
, dur
);
120 // wake up the next guy
122 cond
.front()->SignalOne();
127 bool Throttle::wait(int64_t m
)
129 if (0 == max
&& 0 == m
) {
133 Mutex::Locker
l(lock
);
138 ldout(cct
, 10) << "wait" << dendl
;
142 int64_t Throttle::take(int64_t c
)
148 ldout(cct
, 10) << "take " << c
<< dendl
;
150 Mutex::Locker
l(lock
);
154 logger
->inc(l_throttle_take
);
155 logger
->inc(l_throttle_take_sum
, c
);
156 logger
->set(l_throttle_val
, count
);
161 bool Throttle::get(int64_t c
, int64_t m
)
163 if (0 == max
&& 0 == m
) {
168 ldout(cct
, 10) << "get " << c
<< " (" << count
.load() << " -> " << (count
.load() + c
) << ")" << dendl
;
170 logger
->inc(l_throttle_get_started
);
174 Mutex::Locker
l(lock
);
183 logger
->inc(l_throttle_get
);
184 logger
->inc(l_throttle_get_sum
, c
);
185 logger
->set(l_throttle_val
, count
);
190 /* Returns true if it successfully got the requested amount,
191 * or false if it would block.
193 bool Throttle::get_or_fail(int64_t c
)
200 Mutex::Locker
l(lock
);
201 if (_should_wait(c
) || !cond
.empty()) {
202 ldout(cct
, 10) << "get_or_fail " << c
<< " failed" << dendl
;
204 logger
->inc(l_throttle_get_or_fail_fail
);
208 ldout(cct
, 10) << "get_or_fail " << c
<< " success (" << count
.load() << " -> " << (count
.load() + c
) << ")" << dendl
;
211 logger
->inc(l_throttle_get_or_fail_success
);
212 logger
->inc(l_throttle_get
);
213 logger
->inc(l_throttle_get_sum
, c
);
214 logger
->set(l_throttle_val
, count
);
220 int64_t Throttle::put(int64_t c
)
227 ldout(cct
, 10) << "put " << c
<< " (" << count
.load() << " -> " << (count
.load()-c
) << ")" << dendl
;
228 Mutex::Locker
l(lock
);
231 cond
.front()->SignalOne();
232 assert(static_cast<int64_t>(count
) >= c
); // if count goes negative, we failed somewhere!
235 logger
->inc(l_throttle_put
);
236 logger
->inc(l_throttle_put_sum
, c
);
237 logger
->set(l_throttle_val
, count
);
243 void Throttle::reset()
245 Mutex::Locker
l(lock
);
247 cond
.front()->SignalOne();
250 logger
->set(l_throttle_val
, 0);
255 l_backoff_throttle_first
= l_throttle_last
+ 1,
256 l_backoff_throttle_val
,
257 l_backoff_throttle_max
,
258 l_backoff_throttle_get
,
259 l_backoff_throttle_get_sum
,
260 l_backoff_throttle_take
,
261 l_backoff_throttle_take_sum
,
262 l_backoff_throttle_put
,
263 l_backoff_throttle_put_sum
,
264 l_backoff_throttle_wait
,
265 l_backoff_throttle_last
,
268 BackoffThrottle::BackoffThrottle(CephContext
*cct
, const std::string
& n
, unsigned expected_concurrency
, bool _use_perf
)
269 : cct(cct
), name(n
), logger(NULL
),
270 conds(expected_concurrency
),///< [in] determines size of conds
276 if (cct
->_conf
->throttler_perf_counter
) {
277 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_backoff_throttle_first
, l_backoff_throttle_last
);
278 b
.add_u64(l_backoff_throttle_val
, "val", "Currently available throttle");
279 b
.add_u64(l_backoff_throttle_max
, "max", "Max value for throttle");
280 b
.add_u64_counter(l_backoff_throttle_get
, "get", "Gets");
281 b
.add_u64_counter(l_backoff_throttle_get_sum
, "get_sum", "Got data");
282 b
.add_u64_counter(l_backoff_throttle_take
, "take", "Takes");
283 b
.add_u64_counter(l_backoff_throttle_take_sum
, "take_sum", "Taken data");
284 b
.add_u64_counter(l_backoff_throttle_put
, "put", "Puts");
285 b
.add_u64_counter(l_backoff_throttle_put_sum
, "put_sum", "Put data");
286 b
.add_time_avg(l_backoff_throttle_wait
, "wait", "Waiting latency");
288 logger
= b
.create_perf_counters();
289 cct
->get_perfcounters_collection()->add(logger
);
290 logger
->set(l_backoff_throttle_max
, max
);
294 BackoffThrottle::~BackoffThrottle()
298 assert(waiters
.empty());
305 cct
->get_perfcounters_collection()->remove(logger
);
310 bool BackoffThrottle::set_params(
311 double _low_threshhold
,
312 double _high_threshhold
,
313 double _expected_throughput
,
314 double _high_multiple
,
315 double _max_multiple
,
316 uint64_t _throttle_max
,
320 if (_low_threshhold
> _high_threshhold
) {
323 *errstream
<< "low_threshhold (" << _low_threshhold
324 << ") > high_threshhold (" << _high_threshhold
329 if (_high_multiple
> _max_multiple
) {
332 *errstream
<< "_high_multiple (" << _high_multiple
333 << ") > _max_multiple (" << _max_multiple
338 if (_low_threshhold
> 1 || _low_threshhold
< 0) {
341 *errstream
<< "invalid low_threshhold (" << _low_threshhold
<< ")"
346 if (_high_threshhold
> 1 || _high_threshhold
< 0) {
349 *errstream
<< "invalid high_threshhold (" << _high_threshhold
<< ")"
354 if (_max_multiple
< 0) {
357 *errstream
<< "invalid _max_multiple ("
358 << _max_multiple
<< ")"
363 if (_high_multiple
< 0) {
366 *errstream
<< "invalid _high_multiple ("
367 << _high_multiple
<< ")"
372 if (_expected_throughput
< 0) {
375 *errstream
<< "invalid _expected_throughput("
376 << _expected_throughput
<< ")"
385 low_threshhold
= _low_threshhold
;
386 high_threshhold
= _high_threshhold
;
387 high_delay_per_count
= _high_multiple
/ _expected_throughput
;
388 max_delay_per_count
= _max_multiple
/ _expected_throughput
;
392 logger
->set(l_backoff_throttle_max
, max
);
394 if (high_threshhold
- low_threshhold
> 0) {
395 s0
= high_delay_per_count
/ (high_threshhold
- low_threshhold
);
397 low_threshhold
= high_threshhold
;
401 if (1 - high_threshhold
> 0) {
402 s1
= (max_delay_per_count
- high_delay_per_count
)
403 / (1 - high_threshhold
);
413 std::chrono::duration
<double> BackoffThrottle::_get_delay(uint64_t c
) const
416 return std::chrono::duration
<double>(0);
418 double r
= ((double)current
) / ((double)max
);
419 if (r
< low_threshhold
) {
420 return std::chrono::duration
<double>(0);
421 } else if (r
< high_threshhold
) {
422 return c
* std::chrono::duration
<double>(
423 (r
- low_threshhold
) * s0
);
425 return c
* std::chrono::duration
<double>(
426 high_delay_per_count
+ ((r
- high_threshhold
) * s1
));
430 std::chrono::duration
<double> BackoffThrottle::get(uint64_t c
)
433 auto delay
= _get_delay(c
);
436 logger
->inc(l_backoff_throttle_get
);
437 logger
->inc(l_backoff_throttle_get_sum
, c
);
441 if (delay
== std::chrono::duration
<double>(0) &&
443 ((max
== 0) || (current
== 0) || ((current
+ c
) <= max
))) {
447 logger
->set(l_backoff_throttle_val
, current
);
450 return std::chrono::duration
<double>(0);
453 auto ticket
= _push_waiter();
454 utime_t wait_from
= ceph_clock_now();
457 while (waiters
.begin() != ticket
) {
462 auto start
= std::chrono::system_clock::now();
463 delay
= _get_delay(c
);
465 if (!((max
== 0) || (current
== 0) || (current
+ c
) <= max
)) {
468 } else if (delay
> std::chrono::duration
<double>(0)) {
469 (*ticket
)->wait_for(l
, delay
);
474 assert(ticket
== waiters
.begin());
475 delay
= _get_delay(c
) - (std::chrono::system_clock::now() - start
);
483 logger
->set(l_backoff_throttle_val
, current
);
485 logger
->tinc(l_backoff_throttle_wait
, ceph_clock_now() - wait_from
);
489 return std::chrono::system_clock::now() - start
;
492 uint64_t BackoffThrottle::put(uint64_t c
)
495 assert(current
>= c
);
500 logger
->inc(l_backoff_throttle_put
);
501 logger
->inc(l_backoff_throttle_put_sum
, c
);
502 logger
->set(l_backoff_throttle_val
, current
);
508 uint64_t BackoffThrottle::take(uint64_t c
)
514 logger
->inc(l_backoff_throttle_take
);
515 logger
->inc(l_backoff_throttle_take_sum
, c
);
516 logger
->set(l_backoff_throttle_val
, current
);
522 uint64_t BackoffThrottle::get_current()
528 uint64_t BackoffThrottle::get_max()
534 SimpleThrottle::SimpleThrottle(uint64_t max
, bool ignore_enoent
)
535 : m_lock("SimpleThrottle"),
539 m_ignore_enoent(ignore_enoent
)
543 SimpleThrottle::~SimpleThrottle()
545 Mutex::Locker
l(m_lock
);
546 assert(m_current
== 0);
547 assert(waiters
== 0);
550 void SimpleThrottle::start_op()
552 Mutex::Locker
l(m_lock
);
553 while (m_max
== m_current
) {
561 void SimpleThrottle::end_op(int r
)
563 Mutex::Locker
l(m_lock
);
565 if (r
< 0 && !m_ret
&& !(r
== -ENOENT
&& m_ignore_enoent
))
570 bool SimpleThrottle::pending_error() const
572 Mutex::Locker
l(m_lock
);
576 int SimpleThrottle::wait_for_ret()
578 Mutex::Locker
l(m_lock
);
579 while (m_current
> 0) {
587 void C_OrderedThrottle::finish(int r
) {
588 m_ordered_throttle
->finish_op(m_tid
, r
);
591 OrderedThrottle::OrderedThrottle(uint64_t max
, bool ignore_enoent
)
592 : m_lock("OrderedThrottle::m_lock"), m_max(max
), m_current(0), m_ret_val(0),
593 m_ignore_enoent(ignore_enoent
), m_next_tid(0), m_complete_tid(0) {
596 OrderedThrottle::~OrderedThrottle() {
597 Mutex::Locker
locker(m_lock
);
598 assert(waiters
== 0);
601 C_OrderedThrottle
*OrderedThrottle::start_op(Context
*on_finish
) {
602 assert(on_finish
!= NULL
);
604 Mutex::Locker
locker(m_lock
);
605 uint64_t tid
= m_next_tid
++;
606 m_tid_result
[tid
] = Result(on_finish
);
607 C_OrderedThrottle
*ctx
= new C_OrderedThrottle(this, tid
);
609 complete_pending_ops();
610 while (m_max
== m_current
) {
614 complete_pending_ops();
621 void OrderedThrottle::end_op(int r
) {
622 Mutex::Locker
locker(m_lock
);
623 assert(m_current
> 0);
625 if (r
< 0 && m_ret_val
== 0 && (r
!= -ENOENT
|| !m_ignore_enoent
)) {
632 void OrderedThrottle::finish_op(uint64_t tid
, int r
) {
633 Mutex::Locker
locker(m_lock
);
635 TidResult::iterator it
= m_tid_result
.find(tid
);
636 assert(it
!= m_tid_result
.end());
638 it
->second
.finished
= true;
639 it
->second
.ret_val
= r
;
643 bool OrderedThrottle::pending_error() const {
644 Mutex::Locker
locker(m_lock
);
645 return (m_ret_val
< 0);
648 int OrderedThrottle::wait_for_ret() {
649 Mutex::Locker
locker(m_lock
);
650 complete_pending_ops();
652 while (m_current
> 0) {
656 complete_pending_ops();
661 void OrderedThrottle::complete_pending_ops() {
662 assert(m_lock
.is_locked());
665 TidResult::iterator it
= m_tid_result
.begin();
666 if (it
== m_tid_result
.end() || it
->first
!= m_complete_tid
||
667 !it
->second
.finished
) {
671 Result result
= it
->second
;
672 m_tid_result
.erase(it
);
675 result
.on_finish
->complete(result
.ret_val
);