1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
7 #include "common/Throttle.h"
8 #include "common/dout.h"
9 #include "common/ceph_context.h"
10 #include "common/perf_counters.h"
12 #define dout_subsys ceph_subsys_throttle
15 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
// Perf-counter index names used by Throttle (fragmentary listing: only some
// of the names are visible here). l_throttle_first is pinned to 532430; the
// names that follow are given no explicit value in the visible text.
// NOTE(review): presumably the members of an enum whose opening line is not shown.
18 l_throttle_first
= 532430,
21 l_throttle_get_started
,
24 l_throttle_get_or_fail_fail
,
25 l_throttle_get_or_fail_success
,
// Throttle constructor.
//
// Parameters:
//   cct        context; stored, and used for configuration and the
//              perf-counter collection.
//   n          throttle name; stored and used in the perf-counter set name.
//   m          NOTE(review): its use is not visible in this listing
//              (presumably the initial max) - confirm.
//   _use_perf  NOTE(review): its use is not visible in this listing.
//
// The initializer list starts with logger == NULL and a lock named
// "Throttle::lock".
34 Throttle::Throttle(CephContext
*cct
, const std::string
& n
, int64_t m
, bool _use_perf
)
35 : cct(cct
), name(n
), logger(NULL
),
37 lock("Throttle::lock"),
// Perf counters are only built when the throttler_perf_counter config
// option is set.
45 if (cct
->_conf
->throttler_perf_counter
) {
// The counter set is named "throttle-<name>" and spans the index range
// l_throttle_first .. l_throttle_last.
46 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_throttle_first
, l_throttle_last
);
47 b
.add_u64(l_throttle_val
, "val", "Currently available throttle");
48 b
.add_u64(l_throttle_max
, "max", "Max value for throttle");
49 b
.add_u64_counter(l_throttle_get_started
, "get_started", "Number of get calls, increased before wait");
50 b
.add_u64_counter(l_throttle_get
, "get", "Gets");
51 b
.add_u64_counter(l_throttle_get_sum
, "get_sum", "Got data");
52 b
.add_u64_counter(l_throttle_get_or_fail_fail
, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b
.add_u64_counter(l_throttle_get_or_fail_success
, "get_or_fail_success", "Successful get during get_or_fail");
54 b
.add_u64_counter(l_throttle_take
, "take", "Takes");
55 b
.add_u64_counter(l_throttle_take_sum
, "take_sum", "Taken data");
56 b
.add_u64_counter(l_throttle_put
, "put", "Puts");
57 b
.add_u64_counter(l_throttle_put_sum
, "put_sum", "Put data");
58 b
.add_time_avg(l_throttle_wait
, "wait", "Waiting latency");
// Create the counters, register them with the context's collection, and
// publish the current max value.
60 logger
= b
.create_perf_counters();
61 cct
->get_perfcounters_collection()->add(logger
);
62 logger
->set(l_throttle_max
, max
.read());
// NOTE(review): the header of the enclosing function is not visible in this
// listing - presumably Throttle's destructor; confirm against the full file.
// Visible behavior: loops while the `cond` list is non-empty, taking the
// Cond* at its front each time (what is done with it is not shown), and
// removes `logger` from the context's perf-counter collection.
68 while (!cond
.empty()) {
69 Cond
*cv
= cond
.front();
78 cct
->get_perfcounters_collection()->remove(logger
);
// Throttle::_reset_max(m): internal helper; the caller must already hold
// `lock` (asserted below).
// Visible behavior: compares the current max with m, signals the waiter at
// the front of `cond`, and publishes m as the l_throttle_max counter.
// NOTE(review): the branch bodies/guards around these calls and the actual
// store to `max` are not visible in this listing.
83 void Throttle::_reset_max(int64_t m
)
85 assert(lock
.is_locked());
86 if ((int64_t)max
.read() == m
)
89 cond
.front()->SignalOne();
91 logger
->set(l_throttle_max
, m
);
// Throttle::_wait(c): internal helper that waits for c to be admitted.
// Waiting is entered when _should_wait(c) is true or when other waiters are
// already queued in `cond`. The wait loop repeats while _should_wait(c) still
// holds or this caller's condition variable `cv` is not at the front of the
// queue. The elapsed time is recorded in the l_throttle_wait counter, and the
// waiter at the front of the queue is signalled afterwards.
// Returns bool. NOTE(review): the return statement, the declaration of
// `start`/`cv`, and the queue push/pop are not visible in this listing.
95 bool Throttle::_wait(int64_t c
)
99 if (_should_wait(c
) || !cond
.empty()) { // always wait behind other waiters.
103 ldout(cct
, 2) << "_wait waiting..." << dendl
;
// Timestamp taken before waiting so the duration can be reported below.
105 start
= ceph_clock_now();
109 } while (_should_wait(c
) || cv
!= cond
.front());
111 ldout(cct
, 2) << "_wait finished waiting" << dendl
;
113 utime_t dur
= ceph_clock_now() - start
;
114 logger
->tinc(l_throttle_wait
, dur
);
120 // wake up the next guy
122 cond
.front()->SignalOne();
// Throttle::wait(m): public entry point taking an optional new max m.
// The case "current max is 0 and m is 0" is special-cased first, before the
// lock is taken (the branch body is not visible here). Otherwise `lock` is
// held via a scoped Mutex::Locker and "wait" is logged at debug level 10.
// Returns bool. NOTE(review): the return statements and the calls made under
// the lock are not visible in this listing.
127 bool Throttle::wait(int64_t m
)
129 if (0 == max
.read() && 0 == m
) {
133 Mutex::Locker
l(lock
);
138 ldout(cct
, 10) << "wait" << dendl
;
// Throttle::take(c): accounts c against the throttle.
// A max of 0 is special-cased first (branch body not visible here).
// Otherwise the call is logged at debug level 10, `lock` is held via a scoped
// Mutex::Locker, and the perf counters are updated: one "take", c added to
// "take_sum", and "val" set to the current count.
// Returns int64_t. NOTE(review): the update of `count` and the return value
// are not visible in this listing.
142 int64_t Throttle::take(int64_t c
)
144 if (0 == max
.read()) {
148 ldout(cct
, 10) << "take " << c
<< dendl
;
150 Mutex::Locker
l(lock
);
154 logger
->inc(l_throttle_take
);
155 logger
->inc(l_throttle_take_sum
, c
);
156 logger
->set(l_throttle_val
, count
.read());
// Throttle::get(c, m): acquires c from the throttle, with an optional new
// max m.
// The case "current max is 0 and m is 0" is special-cased first (branch body
// not visible here). Otherwise the request is logged with the current and
// prospective count, "get_started" is incremented before the lock is taken,
// and after the locked section "get", "get_sum" (by c) and "val" (current
// count) are updated.
// Returns bool. NOTE(review): the waiting logic, the update of `count`, and
// the return value are not visible in this listing.
161 bool Throttle::get(int64_t c
, int64_t m
)
163 if (0 == max
.read() && 0 == m
) {
168 ldout(cct
, 10) << "get " << c
<< " (" << count
.read() << " -> " << (count
.read() + c
) << ")" << dendl
;
170 logger
->inc(l_throttle_get_started
);
174 Mutex::Locker
l(lock
);
183 logger
->inc(l_throttle_get
);
184 logger
->inc(l_throttle_get_sum
, c
);
185 logger
->set(l_throttle_val
, count
.read());
190 /* Returns true if it successfully got the requested amount,
191 * or false if it would block. */
// Throttle::get_or_fail(c): non-blocking acquire attempt.
// A max of 0 is special-cased first (branch body not visible here).
// Under `lock`: if _should_wait(c) is true or other waiters are queued, the
// attempt is logged as failed and "get_or_fail_fail" is incremented;
// otherwise success is logged with the current and prospective count and
// "get_or_fail_success", "get", "get_sum" (by c) and "val" are updated.
// Returns bool. NOTE(review): the return statements and the update of
// `count` are not visible in this listing.
193 bool Throttle::get_or_fail(int64_t c
)
195 if (0 == max
.read()) {
200 Mutex::Locker
l(lock
);
201 if (_should_wait(c
) || !cond
.empty()) {
202 ldout(cct
, 10) << "get_or_fail " << c
<< " failed" << dendl
;
204 logger
->inc(l_throttle_get_or_fail_fail
);
208 ldout(cct
, 10) << "get_or_fail " << c
<< " success (" << count
.read() << " -> " << (count
.read() + c
) << ")" << dendl
;
211 logger
->inc(l_throttle_get_or_fail_success
);
212 logger
->inc(l_throttle_get
);
213 logger
->inc(l_throttle_get_sum
, c
);
214 logger
->set(l_throttle_val
, count
.read());
// Throttle::put(c): returns c to the throttle.
// A max of 0 is special-cased first (branch body not visible here).
// Otherwise the call is logged with the current and resulting count, `lock`
// is held via a scoped Mutex::Locker, the waiter at the front of `cond` is
// signalled, and the count is asserted to be at least c. The counters "put",
// "put_sum" (by c) and "val" are then updated.
// Returns int64_t. NOTE(review): the guard around the signal, the update of
// `count`, and the return value are not visible in this listing.
220 int64_t Throttle::put(int64_t c
)
222 if (0 == max
.read()) {
227 ldout(cct
, 10) << "put " << c
<< " (" << count
.read() << " -> " << (count
.read()-c
) << ")" << dendl
;
228 Mutex::Locker
l(lock
);
231 cond
.front()->SignalOne();
232 assert(((int64_t)count
.read()) >= c
); //if count goes negative, we failed somewhere!
235 logger
->inc(l_throttle_put
);
236 logger
->inc(l_throttle_put_sum
, c
);
237 logger
->set(l_throttle_val
, count
.read());
// Throttle::reset(): under `lock`, signals the waiter at the front of `cond`
// and sets the "val" perf counter to 0.
// NOTE(review): the guard around the signal and the reset of `count` itself
// are not visible in this listing.
243 void Throttle::reset()
245 Mutex::Locker
l(lock
);
247 cond
.front()->SignalOne();
250 logger
->set(l_throttle_val
, 0);
// Perf-counter index names used by BackoffThrottle. The range starts right
// after Throttle's range (l_throttle_last + 1) and ends at
// l_backoff_throttle_last.
// NOTE(review): presumably the members of an enum whose opening line is not shown.
255 l_backoff_throttle_first
= l_throttle_last
+ 1,
256 l_backoff_throttle_val
,
257 l_backoff_throttle_max
,
258 l_backoff_throttle_get
,
259 l_backoff_throttle_get_sum
,
260 l_backoff_throttle_take
,
261 l_backoff_throttle_take_sum
,
262 l_backoff_throttle_put
,
263 l_backoff_throttle_put_sum
,
264 l_backoff_throttle_wait
,
265 l_backoff_throttle_last
,
// BackoffThrottle constructor.
//
// Parameters:
//   cct                   context; stored, and used for configuration and
//                         the perf-counter collection.
//   n                     throttle name; stored and used in the counter set name.
//   expected_concurrency  passed to the `conds` member's constructor.
//   _use_perf             NOTE(review): its use is not visible in this listing.
//
// When the throttler_perf_counter config option is set, a counter set named
// "throttle-<name>" is built over l_backoff_throttle_first ..
// l_backoff_throttle_last, registered with the context's collection, and the
// current `max` is published.
268 BackoffThrottle::BackoffThrottle(CephContext
*cct
, const std::string
& n
, unsigned expected_concurrency
, bool _use_perf
)
269 : cct(cct
), name(n
), logger(NULL
),
270 conds(expected_concurrency
),///< [in] determines size of conds
276 if (cct
->_conf
->throttler_perf_counter
) {
277 PerfCountersBuilder
b(cct
, string("throttle-") + name
, l_backoff_throttle_first
, l_backoff_throttle_last
);
278 b
.add_u64(l_backoff_throttle_val
, "val", "Currently available throttle");
279 b
.add_u64(l_backoff_throttle_max
, "max", "Max value for throttle");
280 b
.add_u64_counter(l_backoff_throttle_get
, "get", "Gets");
281 b
.add_u64_counter(l_backoff_throttle_get_sum
, "get_sum", "Got data");
282 b
.add_u64_counter(l_backoff_throttle_take
, "take", "Takes");
283 b
.add_u64_counter(l_backoff_throttle_take_sum
, "take_sum", "Taken data");
284 b
.add_u64_counter(l_backoff_throttle_put
, "put", "Puts");
285 b
.add_u64_counter(l_backoff_throttle_put_sum
, "put_sum", "Put data");
286 b
.add_time_avg(l_backoff_throttle_wait
, "wait", "Waiting latency");
288 logger
= b
.create_perf_counters();
289 cct
->get_perfcounters_collection()->add(logger
);
290 logger
->set(l_backoff_throttle_max
, max
);
// BackoffThrottle destructor: removes `logger` from the context's
// perf-counter collection.
// NOTE(review): any guard on `logger` and its deletion are not visible in
// this listing.
294 BackoffThrottle::~BackoffThrottle()
300 cct
->get_perfcounters_collection()->remove(logger
);
// BackoffThrottle::set_params: validates and installs the backoff curve.
//
// Visible validation, each reporting through *errstream:
//   - _low_threshhold must not exceed _high_threshhold
//   - _high_multiple must not exceed _max_multiple
//   - _low_threshhold and _high_threshhold must each lie in [0, 1]
//   - _max_multiple, _high_multiple and _expected_throughput must not be
//     negative
// After validation the thresholds are stored, the per-count delays are
// derived by dividing the multiples by _expected_throughput, the "max"
// counter is published, and the two slopes s0 and s1 are computed.
// Returns bool. NOTE(review): the return statements, the null check on
// errstream, the remaining parameters and the locking are not visible in
// this listing.
305 bool BackoffThrottle::set_params(
306 double _low_threshhold
,
307 double _high_threshhold
,
308 double _expected_throughput
,
309 double _high_multiple
,
310 double _max_multiple
,
311 uint64_t _throttle_max
,
315 if (_low_threshhold
> _high_threshhold
) {
318 *errstream
<< "low_threshhold (" << _low_threshhold
319 << ") > high_threshhold (" << _high_threshhold
324 if (_high_multiple
> _max_multiple
) {
327 *errstream
<< "_high_multiple (" << _high_multiple
328 << ") > _max_multiple (" << _max_multiple
333 if (_low_threshhold
> 1 || _low_threshhold
< 0) {
336 *errstream
<< "invalid low_threshhold (" << _low_threshhold
<< ")"
341 if (_high_threshhold
> 1 || _high_threshhold
< 0) {
344 *errstream
<< "invalid high_threshhold (" << _high_threshhold
<< ")"
349 if (_max_multiple
< 0) {
352 *errstream
<< "invalid _max_multiple ("
353 << _max_multiple
<< ")"
358 if (_high_multiple
< 0) {
361 *errstream
<< "invalid _high_multiple ("
362 << _high_multiple
<< ")"
// NOTE(review): only negative throughput is rejected here; a value of
// exactly 0 passes this check and is used as a divisor below, unless a line
// not shown in this listing handles it - confirm.
367 if (_expected_throughput
< 0) {
370 *errstream
<< "invalid _expected_throughput("
371 << _expected_throughput
<< ")"
380 low_threshhold
= _low_threshhold
;
381 high_threshhold
= _high_threshhold
;
382 high_delay_per_count
= _high_multiple
/ _expected_throughput
;
383 max_delay_per_count
= _max_multiple
/ _expected_throughput
;
387 logger
->set(l_backoff_throttle_max
, max
);
// s0: slope of the delay between low_threshhold and high_threshhold; only
// computed when that interval is non-empty, so the division is safe.
389 if (high_threshhold
- low_threshhold
> 0) {
390 s0
= high_delay_per_count
/ (high_threshhold
- low_threshhold
);
392 low_threshhold
= high_threshhold
;
// s1: slope of the delay between high_threshhold and 1; only computed when
// high_threshhold is below 1.
396 if (1 - high_threshhold
> 0) {
397 s1
= (max_delay_per_count
- high_delay_per_count
)
398 / (1 - high_threshhold
);
// BackoffThrottle::_get_delay(c): computes the delay to impose on a request
// of size c, as a std::chrono::duration<double>.
// With r = current / max:
//   r <  low_threshhold   -> zero delay
//   r <  high_threshhold  -> c * (r - low_threshhold) * s0
//   otherwise             -> c * (high_delay_per_count +
//                                 (r - high_threshhold) * s1)
// NOTE(review): the first zero-delay return below is guarded by a condition
// that is not visible in this listing (presumably max == 0, which would
// otherwise divide by zero) - confirm.
408 std::chrono::duration
<double> BackoffThrottle::_get_delay(uint64_t c
) const
411 return std::chrono::duration
<double>(0);
413 double r
= ((double)current
) / ((double)max
);
414 if (r
< low_threshhold
) {
415 return std::chrono::duration
<double>(0);
416 } else if (r
< high_threshhold
) {
417 return c
* std::chrono::duration
<double>(
418 (r
- low_threshhold
) * s0
);
420 return c
* std::chrono::duration
<double>(
421 high_delay_per_count
+ ((r
- high_threshhold
) * s1
));
// BackoffThrottle::get(c): acquires c, delaying the caller according to
// _get_delay(); returns how long the caller waited as a
// std::chrono::duration<double>.
//
// Visible flow:
//   1. Compute the delay and bump the "get" / "get_sum" counters.
//   2. Fast path: when the delay is zero and the request fits (max == 0, or
//      current == 0, or current + c <= max), "val" is updated and zero is
//      returned.
//   3. Slow path: queue a waiter ticket, wait until it is at the head of
//      `waiters`, then loop, waiting for `delay` when the request fits but a
//      positive delay remains, and recomputing the remaining delay as
//      _get_delay(c) minus the time elapsed since `start`.
//   4. Update "val", record the wait time, and return the elapsed time.
// NOTE(review): the lock acquisition, the update of `current`, the other
// wait calls and the loop exit are not visible in this listing.
425 std::chrono::duration
<double> BackoffThrottle::get(uint64_t c
)
428 auto delay
= _get_delay(c
);
431 logger
->inc(l_backoff_throttle_get
);
432 logger
->inc(l_backoff_throttle_get_sum
, c
);
436 if (delay
== std::chrono::duration
<double>(0) &&
438 ((max
== 0) || (current
== 0) || ((current
+ c
) <= max
))) {
442 logger
->set(l_backoff_throttle_val
, current
);
445 return std::chrono::duration
<double>(0);
448 auto ticket
= _push_waiter();
449 utime_t wait_from
= ceph_clock_now();
// Wait until this caller's ticket reaches the head of the waiter queue.
452 while (waiters
.begin() != ticket
) {
457 auto start
= std::chrono::system_clock::now();
458 delay
= _get_delay(c
);
460 if (!((max
== 0) || (current
== 0) || (current
+ c
) <= max
)) {
463 } else if (delay
> std::chrono::duration
<double>(0)) {
464 (*ticket
)->wait_for(l
, delay
);
469 assert(ticket
== waiters
.begin());
// Remaining delay = freshly computed delay minus time already spent waiting.
470 delay
= _get_delay(c
) - (std::chrono::system_clock::now() - start
);
478 logger
->set(l_backoff_throttle_val
, current
);
480 logger
->tinc(l_backoff_throttle_wait
, ceph_clock_now() - wait_from
);
484 return std::chrono::system_clock::now() - start
;
// BackoffThrottle::put(c): returns c to the throttle. Asserts that at least
// c is currently held, then updates "put", "put_sum" (by c) and "val".
// Returns uint64_t. NOTE(review): the locking, the update of `current`, any
// waiter wake-up and the return value are not visible in this listing.
487 uint64_t BackoffThrottle::put(uint64_t c
)
490 assert(current
>= c
);
495 logger
->inc(l_backoff_throttle_put
);
496 logger
->inc(l_backoff_throttle_put_sum
, c
);
497 logger
->set(l_backoff_throttle_val
, current
);
// BackoffThrottle::take(c): accounts c against the throttle and updates the
// "take", "take_sum" (by c) and "val" counters.
// Returns uint64_t. NOTE(review): the locking, the update of `current` and
// the return value are not visible in this listing.
503 uint64_t BackoffThrottle::take(uint64_t c
)
509 logger
->inc(l_backoff_throttle_take
);
510 logger
->inc(l_backoff_throttle_take_sum
, c
);
511 logger
->set(l_backoff_throttle_val
, current
);
// BackoffThrottle::get_current(): returns a uint64_t.
// NOTE(review): body not visible in this listing - presumably returns `current`.
517 uint64_t BackoffThrottle::get_current()
// BackoffThrottle::get_max(): returns a uint64_t.
// NOTE(review): body not visible in this listing - presumably returns `max`.
523 uint64_t BackoffThrottle::get_max()
// SimpleThrottle constructor: names its mutex "SimpleThrottle" and stores
// ignore_enoent in m_ignore_enoent.
// NOTE(review): the initializers that use `max` are not visible in this
// listing.
529 SimpleThrottle::SimpleThrottle(uint64_t max
, bool ignore_enoent
)
530 : m_lock("SimpleThrottle"),
534 m_ignore_enoent(ignore_enoent
)
// SimpleThrottle destructor: takes m_lock and asserts that no operations are
// still outstanding (m_current == 0).
538 SimpleThrottle::~SimpleThrottle()
540 Mutex::Locker
l(m_lock
);
541 assert(m_current
== 0);
// SimpleThrottle::start_op(): under m_lock, loops while the number of
// in-flight operations equals the maximum (m_max == m_current).
// NOTE(review): the loop body and the increment of m_current are not visible
// in this listing.
544 void SimpleThrottle::start_op()
546 Mutex::Locker
l(m_lock
);
547 while (m_max
== m_current
)
// SimpleThrottle::end_op(r): under m_lock, tests whether r is an error that
// should be recorded: r is negative, no error is stored yet (m_ret is 0), and
// r is not an -ENOENT that m_ignore_enoent says to ignore.
// NOTE(review): the statement executed when the test passes, the decrement of
// m_current and any signalling are not visible in this listing.
552 void SimpleThrottle::end_op(int r
)
554 Mutex::Locker
l(m_lock
);
556 if (r
< 0 && !m_ret
&& !(r
== -ENOENT
&& m_ignore_enoent
))
// SimpleThrottle::pending_error(): const query that takes m_lock and returns
// a bool.
// NOTE(review): the return expression is not visible in this listing.
561 bool SimpleThrottle::pending_error() const
563 Mutex::Locker
l(m_lock
);
// SimpleThrottle::wait_for_ret(): under m_lock, loops while operations are
// still in flight (m_current > 0); returns an int.
// NOTE(review): the loop body and the return value are not visible in this
// listing.
567 int SimpleThrottle::wait_for_ret()
569 Mutex::Locker
l(m_lock
);
570 while (m_current
> 0)
// C_OrderedThrottle::finish(r): forwards the completion code r for this
// context's tid to the owning throttle via finish_op().
575 void C_OrderedThrottle::finish(int r
) {
576 m_ordered_throttle
->finish_op(m_tid
, r
);
// OrderedThrottle constructor: names its mutex "OrderedThrottle::m_lock",
// stores max and ignore_enoent, and zero-initializes the in-flight count, the
// stored return value, the next tid to hand out and the next tid to complete.
579 OrderedThrottle::OrderedThrottle(uint64_t max
, bool ignore_enoent
)
580 : m_lock("OrderedThrottle::m_lock"), m_max(max
), m_current(0), m_ret_val(0),
581 m_ignore_enoent(ignore_enoent
), m_next_tid(0), m_complete_tid(0) {
// OrderedThrottle::start_op(on_finish): registers a new operation and
// returns a C_OrderedThrottle* for it.
// on_finish must be non-null (asserted). Under m_lock, the next tid is taken
// from m_next_tid (post-incremented), a Result wrapping on_finish is stored
// in m_tid_result under that tid, and a C_OrderedThrottle bound to this
// throttle and tid is allocated with new. Pending completions are flushed,
// and flushed again inside the loop that runs while the throttle is full
// (m_max == m_current).
// NOTE(review): the wait inside the loop, the increment of m_current and the
// return statement are not visible in this listing.
584 C_OrderedThrottle
*OrderedThrottle::start_op(Context
*on_finish
) {
585 assert(on_finish
!= NULL
);
587 Mutex::Locker
locker(m_lock
);
588 uint64_t tid
= m_next_tid
++;
589 m_tid_result
[tid
] = Result(on_finish
);
590 C_OrderedThrottle
*ctx
= new C_OrderedThrottle(this, tid
);
592 complete_pending_ops();
593 while (m_max
== m_current
) {
595 complete_pending_ops();
// OrderedThrottle::end_op(r): under m_lock, asserts that at least one
// operation is in flight, then tests whether r is an error that should be
// recorded: r is negative, no error is stored yet (m_ret_val == 0), and r is
// not an -ENOENT that m_ignore_enoent says to ignore.
// NOTE(review): the statement executed when the test passes, the decrement of
// m_current and any signalling are not visible in this listing.
602 void OrderedThrottle::end_op(int r
) {
603 Mutex::Locker
locker(m_lock
);
604 assert(m_current
> 0);
606 if (r
< 0 && m_ret_val
== 0 && (r
!= -ENOENT
|| !m_ignore_enoent
)) {
// OrderedThrottle::finish_op(tid, r): under m_lock, looks up the Result
// registered for tid (asserting that it exists), marks it finished and
// stores r as its return value.
// NOTE(review): any statements after these stores are not visible in this
// listing.
613 void OrderedThrottle::finish_op(uint64_t tid
, int r
) {
614 Mutex::Locker
locker(m_lock
);
616 TidResult::iterator it
= m_tid_result
.find(tid
);
617 assert(it
!= m_tid_result
.end());
619 it
->second
.finished
= true;
620 it
->second
.ret_val
= r
;
// OrderedThrottle::pending_error(): under m_lock, returns true when a
// negative return value has been recorded in m_ret_val.
624 bool OrderedThrottle::pending_error() const {
625 Mutex::Locker
locker(m_lock
);
626 return (m_ret_val
< 0);
// OrderedThrottle::wait_for_ret(): under m_lock, flushes pending completions,
// then loops while operations are still in flight (m_current > 0), flushing
// again on each pass; returns an int.
// NOTE(review): the wait inside the loop and the return value are not visible
// in this listing.
629 int OrderedThrottle::wait_for_ret() {
630 Mutex::Locker
locker(m_lock
);
631 complete_pending_ops();
633 while (m_current
> 0) {
635 complete_pending_ops();
// OrderedThrottle::complete_pending_ops(): delivers finished results in tid
// order. The caller must hold m_lock (asserted).
// Looks at the first entry of m_tid_result; processing stops when the map is
// empty, when that entry's tid is not the next one due (m_complete_tid), or
// when that entry has not finished yet. Otherwise the entry is copied,
// erased, and its on_finish context is completed with the stored ret_val.
// NOTE(review): the enclosing loop, the advance of m_complete_tid and any
// unlocking around the callback are not visible in this listing.
640 void OrderedThrottle::complete_pending_ops() {
641 assert(m_lock
.is_locked());
644 TidResult::iterator it
= m_tid_result
.begin();
645 if (it
== m_tid_result
.end() || it
->first
!= m_complete_tid
||
646 !it
->second
.finished
) {
// Copy the entry before erasing it so the callback data outlives the erase.
650 Result result
= it
->second
;
651 m_tid_result
.erase(it
);
654 result
.on_finish
->complete(result
.ret_val
);