]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Throttle.cc
update sources to v12.1.2
[ceph.git] / ceph / src / common / Throttle.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
c07f9fc5
FG
4#include "include/scope_guard.h"
5
7c673cae 6#include "common/Throttle.h"
7c673cae
FG
7#include "common/perf_counters.h"
8
31f18b77
FG
9// re-include our assert to clobber the system one; fix dout:
10#include "include/assert.h"
11
7c673cae
FG
// Debug output from this file is attributed to the throttle subsystem.
#define dout_subsys ceph_subsys_throttle

// Prefix each log line with the throttle's name and object address so that
// messages from different throttle instances can be told apart.
#undef dout_prefix
#define dout_prefix *_dout << "throttle(" << name << " " << static_cast<const void*>(this) << ") "
16
// Perf-counter indices registered by Throttle (see Throttle::Throttle).
// l_throttle_first / l_throttle_last are the bounds handed to
// PerfCountersBuilder; the entries in between are the individual counters.
enum {
  l_throttle_first = 532430,
  l_throttle_val,                 // current count
  l_throttle_max,                 // configured max
  l_throttle_get_started,         // get() calls, counted before waiting
  l_throttle_get,                 // completed gets
  l_throttle_get_sum,             // total amount obtained via get
  l_throttle_get_or_fail_fail,    // get_or_fail() refusals
  l_throttle_get_or_fail_success, // get_or_fail() successes
  l_throttle_take,                // take() calls
  l_throttle_take_sum,            // total amount taken
  l_throttle_put,                 // put() calls
  l_throttle_put_sum,             // total amount released
  l_throttle_wait,                // time spent waiting in _wait()
  l_throttle_last,
};
33
34Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
35 : cct(cct), name(n), logger(NULL),
36 max(m),
37 lock("Throttle::lock"),
38 use_perf(_use_perf)
39{
40 assert(m >= 0);
41
42 if (!use_perf)
43 return;
44
45 if (cct->_conf->throttler_perf_counter) {
46 PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
47 b.add_u64(l_throttle_val, "val", "Currently available throttle");
48 b.add_u64(l_throttle_max, "max", "Max value for throttle");
49 b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
50 b.add_u64_counter(l_throttle_get, "get", "Gets");
51 b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
52 b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
54 b.add_u64_counter(l_throttle_take, "take", "Takes");
55 b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
56 b.add_u64_counter(l_throttle_put, "put", "Puts");
57 b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
58 b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
59
60 logger = b.create_perf_counters();
61 cct->get_perfcounters_collection()->add(logger);
31f18b77 62 logger->set(l_throttle_max, max);
7c673cae
FG
63 }
64}
65
66Throttle::~Throttle()
67{
c07f9fc5
FG
68 {
69 Mutex::Locker l(lock);
70 assert(cond.empty());
7c673cae
FG
71 }
72
73 if (!use_perf)
74 return;
75
76 if (logger) {
77 cct->get_perfcounters_collection()->remove(logger);
78 delete logger;
79 }
80}
81
82void Throttle::_reset_max(int64_t m)
83{
84 assert(lock.is_locked());
31f18b77 85 if (static_cast<int64_t>(max) == m)
7c673cae
FG
86 return;
87 if (!cond.empty())
88 cond.front()->SignalOne();
89 if (logger)
90 logger->set(l_throttle_max, m);
31f18b77 91 max = m;
7c673cae
FG
92}
93
94bool Throttle::_wait(int64_t c)
95{
96 utime_t start;
97 bool waited = false;
98 if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
c07f9fc5
FG
99 {
100 auto cv = cond.insert(cond.end(), new Cond);
101 auto w = make_scope_guard([this, cv]() {
102 delete *cv;
103 cond.erase(cv);
104 });
105 waited = true;
106 ldout(cct, 2) << "_wait waiting..." << dendl;
107 if (logger)
108 start = ceph_clock_now();
109
110 do {
111 (*cv)->Wait(lock);
112 } while ((_should_wait(c) || cv != cond.begin()));
113
114 ldout(cct, 2) << "_wait finished waiting" << dendl;
115 if (logger) {
116 utime_t dur = ceph_clock_now() - start;
117 logger->tinc(l_throttle_wait, dur);
118 }
7c673cae 119 }
7c673cae
FG
120 // wake up the next guy
121 if (!cond.empty())
122 cond.front()->SignalOne();
123 }
124 return waited;
125}
126
127bool Throttle::wait(int64_t m)
128{
31f18b77 129 if (0 == max && 0 == m) {
7c673cae
FG
130 return false;
131 }
132
133 Mutex::Locker l(lock);
134 if (m) {
135 assert(m > 0);
136 _reset_max(m);
137 }
138 ldout(cct, 10) << "wait" << dendl;
139 return _wait(0);
140}
141
142int64_t Throttle::take(int64_t c)
143{
31f18b77 144 if (0 == max) {
7c673cae
FG
145 return 0;
146 }
147 assert(c >= 0);
148 ldout(cct, 10) << "take " << c << dendl;
149 {
150 Mutex::Locker l(lock);
31f18b77 151 count += c;
7c673cae
FG
152 }
153 if (logger) {
154 logger->inc(l_throttle_take);
155 logger->inc(l_throttle_take_sum, c);
31f18b77 156 logger->set(l_throttle_val, count);
7c673cae 157 }
31f18b77 158 return count;
7c673cae
FG
159}
160
161bool Throttle::get(int64_t c, int64_t m)
162{
31f18b77 163 if (0 == max && 0 == m) {
7c673cae
FG
164 return false;
165 }
166
167 assert(c >= 0);
31f18b77 168 ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
7c673cae
FG
169 if (logger) {
170 logger->inc(l_throttle_get_started);
171 }
172 bool waited = false;
173 {
174 Mutex::Locker l(lock);
175 if (m) {
176 assert(m > 0);
177 _reset_max(m);
178 }
179 waited = _wait(c);
31f18b77 180 count += c;
7c673cae
FG
181 }
182 if (logger) {
183 logger->inc(l_throttle_get);
184 logger->inc(l_throttle_get_sum, c);
31f18b77 185 logger->set(l_throttle_val, count);
7c673cae
FG
186 }
187 return waited;
188}
189
190/* Returns true if it successfully got the requested amount,
191 * or false if it would block.
192 */
193bool Throttle::get_or_fail(int64_t c)
194{
31f18b77 195 if (0 == max) {
7c673cae
FG
196 return true;
197 }
198
199 assert (c >= 0);
200 Mutex::Locker l(lock);
201 if (_should_wait(c) || !cond.empty()) {
202 ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
203 if (logger) {
204 logger->inc(l_throttle_get_or_fail_fail);
205 }
206 return false;
207 } else {
31f18b77
FG
208 ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
209 count += c;
7c673cae
FG
210 if (logger) {
211 logger->inc(l_throttle_get_or_fail_success);
212 logger->inc(l_throttle_get);
213 logger->inc(l_throttle_get_sum, c);
31f18b77 214 logger->set(l_throttle_val, count);
7c673cae
FG
215 }
216 return true;
217 }
218}
219
220int64_t Throttle::put(int64_t c)
221{
31f18b77 222 if (0 == max) {
7c673cae
FG
223 return 0;
224 }
225
226 assert(c >= 0);
31f18b77 227 ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
7c673cae
FG
228 Mutex::Locker l(lock);
229 if (c) {
230 if (!cond.empty())
231 cond.front()->SignalOne();
31f18b77
FG
232 assert(static_cast<int64_t>(count) >= c); // if count goes negative, we failed somewhere!
233 count -= c;
7c673cae
FG
234 if (logger) {
235 logger->inc(l_throttle_put);
236 logger->inc(l_throttle_put_sum, c);
31f18b77 237 logger->set(l_throttle_val, count);
7c673cae
FG
238 }
239 }
31f18b77 240 return count;
7c673cae
FG
241}
242
243void Throttle::reset()
244{
245 Mutex::Locker l(lock);
246 if (!cond.empty())
247 cond.front()->SignalOne();
31f18b77 248 count = 0;
7c673cae
FG
249 if (logger) {
250 logger->set(l_throttle_val, 0);
251 }
252}
253
254enum {
255 l_backoff_throttle_first = l_throttle_last + 1,
256 l_backoff_throttle_val,
257 l_backoff_throttle_max,
258 l_backoff_throttle_get,
259 l_backoff_throttle_get_sum,
260 l_backoff_throttle_take,
261 l_backoff_throttle_take_sum,
262 l_backoff_throttle_put,
263 l_backoff_throttle_put_sum,
264 l_backoff_throttle_wait,
265 l_backoff_throttle_last,
266};
267
268BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
269 : cct(cct), name(n), logger(NULL),
270 conds(expected_concurrency),///< [in] determines size of conds
271 use_perf(_use_perf)
272{
273 if (!use_perf)
274 return;
275
276 if (cct->_conf->throttler_perf_counter) {
277 PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
278 b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
279 b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
280 b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
281 b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
282 b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
283 b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
284 b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
285 b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
286 b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
287
288 logger = b.create_perf_counters();
289 cct->get_perfcounters_collection()->add(logger);
290 logger->set(l_backoff_throttle_max, max);
291 }
292}
293
294BackoffThrottle::~BackoffThrottle()
295{
c07f9fc5
FG
296 {
297 locker l(lock);
298 assert(waiters.empty());
299 }
300
7c673cae
FG
301 if (!use_perf)
302 return;
303
304 if (logger) {
305 cct->get_perfcounters_collection()->remove(logger);
306 delete logger;
307 }
308}
309
310bool BackoffThrottle::set_params(
311 double _low_threshhold,
312 double _high_threshhold,
313 double _expected_throughput,
314 double _high_multiple,
315 double _max_multiple,
316 uint64_t _throttle_max,
317 ostream *errstream)
318{
319 bool valid = true;
320 if (_low_threshhold > _high_threshhold) {
321 valid = false;
322 if (errstream) {
323 *errstream << "low_threshhold (" << _low_threshhold
324 << ") > high_threshhold (" << _high_threshhold
325 << ")" << std::endl;
326 }
327 }
328
329 if (_high_multiple > _max_multiple) {
330 valid = false;
331 if (errstream) {
332 *errstream << "_high_multiple (" << _high_multiple
333 << ") > _max_multiple (" << _max_multiple
334 << ")" << std::endl;
335 }
336 }
337
338 if (_low_threshhold > 1 || _low_threshhold < 0) {
339 valid = false;
340 if (errstream) {
341 *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
342 << std::endl;
343 }
344 }
345
346 if (_high_threshhold > 1 || _high_threshhold < 0) {
347 valid = false;
348 if (errstream) {
349 *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
350 << std::endl;
351 }
352 }
353
354 if (_max_multiple < 0) {
355 valid = false;
356 if (errstream) {
357 *errstream << "invalid _max_multiple ("
358 << _max_multiple << ")"
359 << std::endl;
360 }
361 }
362
363 if (_high_multiple < 0) {
364 valid = false;
365 if (errstream) {
366 *errstream << "invalid _high_multiple ("
367 << _high_multiple << ")"
368 << std::endl;
369 }
370 }
371
372 if (_expected_throughput < 0) {
373 valid = false;
374 if (errstream) {
375 *errstream << "invalid _expected_throughput("
376 << _expected_throughput << ")"
377 << std::endl;
378 }
379 }
380
381 if (!valid)
382 return false;
383
384 locker l(lock);
385 low_threshhold = _low_threshhold;
386 high_threshhold = _high_threshhold;
387 high_delay_per_count = _high_multiple / _expected_throughput;
388 max_delay_per_count = _max_multiple / _expected_throughput;
389 max = _throttle_max;
390
391 if (logger)
392 logger->set(l_backoff_throttle_max, max);
393
394 if (high_threshhold - low_threshhold > 0) {
395 s0 = high_delay_per_count / (high_threshhold - low_threshhold);
396 } else {
397 low_threshhold = high_threshhold;
398 s0 = 0;
399 }
400
401 if (1 - high_threshhold > 0) {
402 s1 = (max_delay_per_count - high_delay_per_count)
403 / (1 - high_threshhold);
404 } else {
405 high_threshhold = 1;
406 s1 = 0;
407 }
408
409 _kick_waiters();
410 return true;
411}
412
413std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
414{
415 if (max == 0)
416 return std::chrono::duration<double>(0);
417
418 double r = ((double)current) / ((double)max);
419 if (r < low_threshhold) {
420 return std::chrono::duration<double>(0);
421 } else if (r < high_threshhold) {
422 return c * std::chrono::duration<double>(
423 (r - low_threshhold) * s0);
424 } else {
425 return c * std::chrono::duration<double>(
426 high_delay_per_count + ((r - high_threshhold) * s1));
427 }
428}
429
430std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
431{
432 locker l(lock);
433 auto delay = _get_delay(c);
434
435 if (logger) {
436 logger->inc(l_backoff_throttle_get);
437 logger->inc(l_backoff_throttle_get_sum, c);
438 }
439
440 // fast path
441 if (delay == std::chrono::duration<double>(0) &&
442 waiters.empty() &&
443 ((max == 0) || (current == 0) || ((current + c) <= max))) {
444 current += c;
445
446 if (logger) {
447 logger->set(l_backoff_throttle_val, current);
448 }
449
450 return std::chrono::duration<double>(0);
451 }
452
453 auto ticket = _push_waiter();
454 utime_t wait_from = ceph_clock_now();
455 bool waited = false;
456
457 while (waiters.begin() != ticket) {
458 (*ticket)->wait(l);
459 waited = true;
460 }
461
462 auto start = std::chrono::system_clock::now();
463 delay = _get_delay(c);
464 while (true) {
465 if (!((max == 0) || (current == 0) || (current + c) <= max)) {
466 (*ticket)->wait(l);
467 waited = true;
468 } else if (delay > std::chrono::duration<double>(0)) {
469 (*ticket)->wait_for(l, delay);
470 waited = true;
471 } else {
472 break;
473 }
474 assert(ticket == waiters.begin());
475 delay = _get_delay(c) - (std::chrono::system_clock::now() - start);
476 }
477 waiters.pop_front();
478 _kick_waiters();
479
480 current += c;
481
482 if (logger) {
483 logger->set(l_backoff_throttle_val, current);
484 if (waited) {
485 logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
486 }
487 }
488
489 return std::chrono::system_clock::now() - start;
490}
491
492uint64_t BackoffThrottle::put(uint64_t c)
493{
494 locker l(lock);
495 assert(current >= c);
496 current -= c;
497 _kick_waiters();
498
499 if (logger) {
500 logger->inc(l_backoff_throttle_put);
501 logger->inc(l_backoff_throttle_put_sum, c);
502 logger->set(l_backoff_throttle_val, current);
503 }
504
505 return current;
506}
507
508uint64_t BackoffThrottle::take(uint64_t c)
509{
510 locker l(lock);
511 current += c;
512
513 if (logger) {
514 logger->inc(l_backoff_throttle_take);
515 logger->inc(l_backoff_throttle_take_sum, c);
516 logger->set(l_backoff_throttle_val, current);
517 }
518
519 return current;
520}
521
522uint64_t BackoffThrottle::get_current()
523{
524 locker l(lock);
525 return current;
526}
527
528uint64_t BackoffThrottle::get_max()
529{
530 locker l(lock);
531 return max;
532}
533
534SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
535 : m_lock("SimpleThrottle"),
536 m_max(max),
537 m_current(0),
538 m_ret(0),
539 m_ignore_enoent(ignore_enoent)
540{
541}
542
543SimpleThrottle::~SimpleThrottle()
544{
545 Mutex::Locker l(m_lock);
546 assert(m_current == 0);
c07f9fc5 547 assert(waiters == 0);
7c673cae
FG
548}
549
550void SimpleThrottle::start_op()
551{
552 Mutex::Locker l(m_lock);
c07f9fc5
FG
553 while (m_max == m_current) {
554 waiters++;
7c673cae 555 m_cond.Wait(m_lock);
c07f9fc5
FG
556 waiters--;
557 }
7c673cae
FG
558 ++m_current;
559}
560
561void SimpleThrottle::end_op(int r)
562{
563 Mutex::Locker l(m_lock);
564 --m_current;
565 if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
566 m_ret = r;
567 m_cond.Signal();
568}
569
570bool SimpleThrottle::pending_error() const
571{
572 Mutex::Locker l(m_lock);
573 return (m_ret < 0);
574}
575
576int SimpleThrottle::wait_for_ret()
577{
578 Mutex::Locker l(m_lock);
c07f9fc5
FG
579 while (m_current > 0) {
580 waiters++;
7c673cae 581 m_cond.Wait(m_lock);
c07f9fc5
FG
582 waiters--;
583 }
7c673cae
FG
584 return m_ret;
585}
586
587void C_OrderedThrottle::finish(int r) {
588 m_ordered_throttle->finish_op(m_tid, r);
589}
590
591OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
592 : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
593 m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
594}
595
c07f9fc5
FG
596OrderedThrottle::~OrderedThrottle() {
597 Mutex::Locker locker(m_lock);
598 assert(waiters == 0);
599}
600
7c673cae
FG
601C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
602 assert(on_finish != NULL);
603
604 Mutex::Locker locker(m_lock);
605 uint64_t tid = m_next_tid++;
606 m_tid_result[tid] = Result(on_finish);
607 C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
608
609 complete_pending_ops();
610 while (m_max == m_current) {
c07f9fc5 611 ++waiters;
7c673cae 612 m_cond.Wait(m_lock);
c07f9fc5 613 --waiters;
7c673cae
FG
614 complete_pending_ops();
615 }
616 ++m_current;
617
618 return ctx;
619}
620
621void OrderedThrottle::end_op(int r) {
622 Mutex::Locker locker(m_lock);
623 assert(m_current > 0);
624
625 if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
626 m_ret_val = r;
627 }
628 --m_current;
629 m_cond.Signal();
630}
631
632void OrderedThrottle::finish_op(uint64_t tid, int r) {
633 Mutex::Locker locker(m_lock);
634
635 TidResult::iterator it = m_tid_result.find(tid);
636 assert(it != m_tid_result.end());
637
638 it->second.finished = true;
639 it->second.ret_val = r;
640 m_cond.Signal();
641}
642
643bool OrderedThrottle::pending_error() const {
644 Mutex::Locker locker(m_lock);
645 return (m_ret_val < 0);
646}
647
648int OrderedThrottle::wait_for_ret() {
649 Mutex::Locker locker(m_lock);
650 complete_pending_ops();
651
652 while (m_current > 0) {
c07f9fc5 653 ++waiters;
7c673cae 654 m_cond.Wait(m_lock);
c07f9fc5 655 --waiters;
7c673cae
FG
656 complete_pending_ops();
657 }
658 return m_ret_val;
659}
660
661void OrderedThrottle::complete_pending_ops() {
662 assert(m_lock.is_locked());
663
664 while (true) {
665 TidResult::iterator it = m_tid_result.begin();
666 if (it == m_tid_result.end() || it->first != m_complete_tid ||
667 !it->second.finished) {
668 break;
669 }
670
671 Result result = it->second;
672 m_tid_result.erase(it);
673
674 m_lock.Unlock();
675 result.on_finish->complete(result.ret_val);
676 m_lock.Lock();
677
678 ++m_complete_tid;
679 }
680}