]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/Throttle.cc
update sources to v12.2.3
[ceph.git] / ceph / src / common / Throttle.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
c07f9fc5
FG
4#include "include/scope_guard.h"
5
7c673cae 6#include "common/Throttle.h"
7c673cae
FG
7#include "common/perf_counters.h"
8
31f18b77
FG
9// re-include our assert to clobber the system one; fix dout:
10#include "include/assert.h"
11
7c673cae
FG
12#define dout_subsys ceph_subsys_throttle
13
14#undef dout_prefix
15#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
16
// Perf counter indices for Throttle.  l_throttle_first and l_throttle_last
// are the bounds handed to PerfCountersBuilder; no counter is registered
// for either of them.
enum {
  l_throttle_first = 532430,
  l_throttle_val,                  // "val"
  l_throttle_max,                  // "max"
  l_throttle_get_started,          // "get_started"
  l_throttle_get,                  // "get"
  l_throttle_get_sum,              // "get_sum"
  l_throttle_get_or_fail_fail,     // "get_or_fail_fail"
  l_throttle_get_or_fail_success,  // "get_or_fail_success"
  l_throttle_take,                 // "take"
  l_throttle_take_sum,             // "take_sum"
  l_throttle_put,                  // "put"
  l_throttle_put_sum,              // "put_sum"
  l_throttle_wait,                 // "wait"
  l_throttle_last,
};
33
34Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
35 : cct(cct), name(n), logger(NULL),
36 max(m),
37 lock("Throttle::lock"),
38 use_perf(_use_perf)
39{
40 assert(m >= 0);
41
42 if (!use_perf)
43 return;
44
45 if (cct->_conf->throttler_perf_counter) {
46 PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
47 b.add_u64(l_throttle_val, "val", "Currently available throttle");
48 b.add_u64(l_throttle_max, "max", "Max value for throttle");
49 b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
50 b.add_u64_counter(l_throttle_get, "get", "Gets");
51 b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
52 b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
54 b.add_u64_counter(l_throttle_take, "take", "Takes");
55 b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
56 b.add_u64_counter(l_throttle_put, "put", "Puts");
57 b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
58 b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
59
60 logger = b.create_perf_counters();
61 cct->get_perfcounters_collection()->add(logger);
31f18b77 62 logger->set(l_throttle_max, max);
7c673cae
FG
63 }
64}
65
66Throttle::~Throttle()
67{
c07f9fc5
FG
68 {
69 Mutex::Locker l(lock);
70 assert(cond.empty());
7c673cae
FG
71 }
72
73 if (!use_perf)
74 return;
75
76 if (logger) {
77 cct->get_perfcounters_collection()->remove(logger);
78 delete logger;
79 }
80}
81
82void Throttle::_reset_max(int64_t m)
83{
b32b8144 84 // lock must be held.
7c673cae 85 assert(lock.is_locked());
b32b8144 86 if (max == m)
7c673cae
FG
87 return;
88 if (!cond.empty())
89 cond.front()->SignalOne();
90 if (logger)
91 logger->set(l_throttle_max, m);
31f18b77 92 max = m;
7c673cae
FG
93}
94
95bool Throttle::_wait(int64_t c)
96{
97 utime_t start;
98 bool waited = false;
99 if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
c07f9fc5
FG
100 {
101 auto cv = cond.insert(cond.end(), new Cond);
102 auto w = make_scope_guard([this, cv]() {
103 delete *cv;
104 cond.erase(cv);
105 });
106 waited = true;
107 ldout(cct, 2) << "_wait waiting..." << dendl;
108 if (logger)
109 start = ceph_clock_now();
110
111 do {
112 (*cv)->Wait(lock);
113 } while ((_should_wait(c) || cv != cond.begin()));
114
115 ldout(cct, 2) << "_wait finished waiting" << dendl;
116 if (logger) {
117 utime_t dur = ceph_clock_now() - start;
118 logger->tinc(l_throttle_wait, dur);
119 }
7c673cae 120 }
7c673cae
FG
121 // wake up the next guy
122 if (!cond.empty())
123 cond.front()->SignalOne();
124 }
125 return waited;
126}
127
128bool Throttle::wait(int64_t m)
129{
31f18b77 130 if (0 == max && 0 == m) {
7c673cae
FG
131 return false;
132 }
133
134 Mutex::Locker l(lock);
135 if (m) {
136 assert(m > 0);
137 _reset_max(m);
138 }
139 ldout(cct, 10) << "wait" << dendl;
140 return _wait(0);
141}
142
143int64_t Throttle::take(int64_t c)
144{
31f18b77 145 if (0 == max) {
7c673cae
FG
146 return 0;
147 }
148 assert(c >= 0);
149 ldout(cct, 10) << "take " << c << dendl;
150 {
151 Mutex::Locker l(lock);
31f18b77 152 count += c;
7c673cae
FG
153 }
154 if (logger) {
155 logger->inc(l_throttle_take);
156 logger->inc(l_throttle_take_sum, c);
31f18b77 157 logger->set(l_throttle_val, count);
7c673cae 158 }
31f18b77 159 return count;
7c673cae
FG
160}
161
162bool Throttle::get(int64_t c, int64_t m)
163{
31f18b77 164 if (0 == max && 0 == m) {
7c673cae
FG
165 return false;
166 }
167
168 assert(c >= 0);
31f18b77 169 ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
7c673cae
FG
170 if (logger) {
171 logger->inc(l_throttle_get_started);
172 }
173 bool waited = false;
174 {
175 Mutex::Locker l(lock);
176 if (m) {
177 assert(m > 0);
178 _reset_max(m);
179 }
180 waited = _wait(c);
31f18b77 181 count += c;
7c673cae
FG
182 }
183 if (logger) {
184 logger->inc(l_throttle_get);
185 logger->inc(l_throttle_get_sum, c);
31f18b77 186 logger->set(l_throttle_val, count);
7c673cae
FG
187 }
188 return waited;
189}
190
191/* Returns true if it successfully got the requested amount,
192 * or false if it would block.
193 */
194bool Throttle::get_or_fail(int64_t c)
195{
31f18b77 196 if (0 == max) {
7c673cae
FG
197 return true;
198 }
199
200 assert (c >= 0);
201 Mutex::Locker l(lock);
202 if (_should_wait(c) || !cond.empty()) {
203 ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
204 if (logger) {
205 logger->inc(l_throttle_get_or_fail_fail);
206 }
207 return false;
208 } else {
31f18b77
FG
209 ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
210 count += c;
7c673cae
FG
211 if (logger) {
212 logger->inc(l_throttle_get_or_fail_success);
213 logger->inc(l_throttle_get);
214 logger->inc(l_throttle_get_sum, c);
31f18b77 215 logger->set(l_throttle_val, count);
7c673cae
FG
216 }
217 return true;
218 }
219}
220
221int64_t Throttle::put(int64_t c)
222{
31f18b77 223 if (0 == max) {
7c673cae
FG
224 return 0;
225 }
226
227 assert(c >= 0);
31f18b77 228 ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " << (count.load()-c) << ")" << dendl;
7c673cae
FG
229 Mutex::Locker l(lock);
230 if (c) {
231 if (!cond.empty())
232 cond.front()->SignalOne();
b32b8144
FG
233 // if count goes negative, we failed somewhere!
234 assert(count >= c);
31f18b77 235 count -= c;
7c673cae
FG
236 if (logger) {
237 logger->inc(l_throttle_put);
238 logger->inc(l_throttle_put_sum, c);
31f18b77 239 logger->set(l_throttle_val, count);
7c673cae
FG
240 }
241 }
31f18b77 242 return count;
7c673cae
FG
243}
244
245void Throttle::reset()
246{
247 Mutex::Locker l(lock);
248 if (!cond.empty())
249 cond.front()->SignalOne();
31f18b77 250 count = 0;
7c673cae
FG
251 if (logger) {
252 logger->set(l_throttle_val, 0);
253 }
254}
255
256enum {
257 l_backoff_throttle_first = l_throttle_last + 1,
258 l_backoff_throttle_val,
259 l_backoff_throttle_max,
260 l_backoff_throttle_get,
261 l_backoff_throttle_get_sum,
262 l_backoff_throttle_take,
263 l_backoff_throttle_take_sum,
264 l_backoff_throttle_put,
265 l_backoff_throttle_put_sum,
266 l_backoff_throttle_wait,
267 l_backoff_throttle_last,
268};
269
270BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
271 : cct(cct), name(n), logger(NULL),
272 conds(expected_concurrency),///< [in] determines size of conds
273 use_perf(_use_perf)
274{
275 if (!use_perf)
276 return;
277
278 if (cct->_conf->throttler_perf_counter) {
279 PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
280 b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
281 b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
282 b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
283 b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
284 b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
285 b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
286 b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
287 b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
288 b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
289
290 logger = b.create_perf_counters();
291 cct->get_perfcounters_collection()->add(logger);
292 logger->set(l_backoff_throttle_max, max);
293 }
294}
295
296BackoffThrottle::~BackoffThrottle()
297{
c07f9fc5
FG
298 {
299 locker l(lock);
300 assert(waiters.empty());
301 }
302
7c673cae
FG
303 if (!use_perf)
304 return;
305
306 if (logger) {
307 cct->get_perfcounters_collection()->remove(logger);
308 delete logger;
309 }
310}
311
312bool BackoffThrottle::set_params(
313 double _low_threshhold,
314 double _high_threshhold,
315 double _expected_throughput,
316 double _high_multiple,
317 double _max_multiple,
318 uint64_t _throttle_max,
319 ostream *errstream)
320{
321 bool valid = true;
322 if (_low_threshhold > _high_threshhold) {
323 valid = false;
324 if (errstream) {
325 *errstream << "low_threshhold (" << _low_threshhold
326 << ") > high_threshhold (" << _high_threshhold
327 << ")" << std::endl;
328 }
329 }
330
331 if (_high_multiple > _max_multiple) {
332 valid = false;
333 if (errstream) {
334 *errstream << "_high_multiple (" << _high_multiple
335 << ") > _max_multiple (" << _max_multiple
336 << ")" << std::endl;
337 }
338 }
339
340 if (_low_threshhold > 1 || _low_threshhold < 0) {
341 valid = false;
342 if (errstream) {
343 *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
344 << std::endl;
345 }
346 }
347
348 if (_high_threshhold > 1 || _high_threshhold < 0) {
349 valid = false;
350 if (errstream) {
351 *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
352 << std::endl;
353 }
354 }
355
356 if (_max_multiple < 0) {
357 valid = false;
358 if (errstream) {
359 *errstream << "invalid _max_multiple ("
360 << _max_multiple << ")"
361 << std::endl;
362 }
363 }
364
365 if (_high_multiple < 0) {
366 valid = false;
367 if (errstream) {
368 *errstream << "invalid _high_multiple ("
369 << _high_multiple << ")"
370 << std::endl;
371 }
372 }
373
374 if (_expected_throughput < 0) {
375 valid = false;
376 if (errstream) {
377 *errstream << "invalid _expected_throughput("
378 << _expected_throughput << ")"
379 << std::endl;
380 }
381 }
382
383 if (!valid)
384 return false;
385
386 locker l(lock);
387 low_threshhold = _low_threshhold;
388 high_threshhold = _high_threshhold;
389 high_delay_per_count = _high_multiple / _expected_throughput;
390 max_delay_per_count = _max_multiple / _expected_throughput;
391 max = _throttle_max;
392
393 if (logger)
394 logger->set(l_backoff_throttle_max, max);
395
396 if (high_threshhold - low_threshhold > 0) {
397 s0 = high_delay_per_count / (high_threshhold - low_threshhold);
398 } else {
399 low_threshhold = high_threshhold;
400 s0 = 0;
401 }
402
403 if (1 - high_threshhold > 0) {
404 s1 = (max_delay_per_count - high_delay_per_count)
405 / (1 - high_threshhold);
406 } else {
407 high_threshhold = 1;
408 s1 = 0;
409 }
410
411 _kick_waiters();
412 return true;
413}
414
415std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
416{
417 if (max == 0)
418 return std::chrono::duration<double>(0);
419
420 double r = ((double)current) / ((double)max);
421 if (r < low_threshhold) {
422 return std::chrono::duration<double>(0);
423 } else if (r < high_threshhold) {
424 return c * std::chrono::duration<double>(
425 (r - low_threshhold) * s0);
426 } else {
427 return c * std::chrono::duration<double>(
428 high_delay_per_count + ((r - high_threshhold) * s1));
429 }
430}
431
432std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
433{
434 locker l(lock);
435 auto delay = _get_delay(c);
436
437 if (logger) {
438 logger->inc(l_backoff_throttle_get);
439 logger->inc(l_backoff_throttle_get_sum, c);
440 }
441
442 // fast path
443 if (delay == std::chrono::duration<double>(0) &&
444 waiters.empty() &&
445 ((max == 0) || (current == 0) || ((current + c) <= max))) {
446 current += c;
447
448 if (logger) {
449 logger->set(l_backoff_throttle_val, current);
450 }
451
452 return std::chrono::duration<double>(0);
453 }
454
455 auto ticket = _push_waiter();
456 utime_t wait_from = ceph_clock_now();
457 bool waited = false;
458
459 while (waiters.begin() != ticket) {
460 (*ticket)->wait(l);
461 waited = true;
462 }
463
464 auto start = std::chrono::system_clock::now();
465 delay = _get_delay(c);
466 while (true) {
467 if (!((max == 0) || (current == 0) || (current + c) <= max)) {
468 (*ticket)->wait(l);
469 waited = true;
470 } else if (delay > std::chrono::duration<double>(0)) {
471 (*ticket)->wait_for(l, delay);
472 waited = true;
473 } else {
474 break;
475 }
476 assert(ticket == waiters.begin());
477 delay = _get_delay(c) - (std::chrono::system_clock::now() - start);
478 }
479 waiters.pop_front();
480 _kick_waiters();
481
482 current += c;
483
484 if (logger) {
485 logger->set(l_backoff_throttle_val, current);
486 if (waited) {
487 logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
488 }
489 }
490
491 return std::chrono::system_clock::now() - start;
492}
493
494uint64_t BackoffThrottle::put(uint64_t c)
495{
496 locker l(lock);
497 assert(current >= c);
498 current -= c;
499 _kick_waiters();
500
501 if (logger) {
502 logger->inc(l_backoff_throttle_put);
503 logger->inc(l_backoff_throttle_put_sum, c);
504 logger->set(l_backoff_throttle_val, current);
505 }
506
507 return current;
508}
509
510uint64_t BackoffThrottle::take(uint64_t c)
511{
512 locker l(lock);
513 current += c;
514
515 if (logger) {
516 logger->inc(l_backoff_throttle_take);
517 logger->inc(l_backoff_throttle_take_sum, c);
518 logger->set(l_backoff_throttle_val, current);
519 }
520
521 return current;
522}
523
524uint64_t BackoffThrottle::get_current()
525{
526 locker l(lock);
527 return current;
528}
529
530uint64_t BackoffThrottle::get_max()
531{
532 locker l(lock);
533 return max;
534}
535
536SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
537 : m_lock("SimpleThrottle"),
538 m_max(max),
539 m_current(0),
540 m_ret(0),
541 m_ignore_enoent(ignore_enoent)
542{
543}
544
545SimpleThrottle::~SimpleThrottle()
546{
547 Mutex::Locker l(m_lock);
548 assert(m_current == 0);
c07f9fc5 549 assert(waiters == 0);
7c673cae
FG
550}
551
552void SimpleThrottle::start_op()
553{
554 Mutex::Locker l(m_lock);
c07f9fc5
FG
555 while (m_max == m_current) {
556 waiters++;
7c673cae 557 m_cond.Wait(m_lock);
c07f9fc5
FG
558 waiters--;
559 }
7c673cae
FG
560 ++m_current;
561}
562
563void SimpleThrottle::end_op(int r)
564{
565 Mutex::Locker l(m_lock);
566 --m_current;
567 if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
568 m_ret = r;
569 m_cond.Signal();
570}
571
572bool SimpleThrottle::pending_error() const
573{
574 Mutex::Locker l(m_lock);
575 return (m_ret < 0);
576}
577
578int SimpleThrottle::wait_for_ret()
579{
580 Mutex::Locker l(m_lock);
c07f9fc5
FG
581 while (m_current > 0) {
582 waiters++;
7c673cae 583 m_cond.Wait(m_lock);
c07f9fc5
FG
584 waiters--;
585 }
7c673cae
FG
586 return m_ret;
587}
588
589void C_OrderedThrottle::finish(int r) {
590 m_ordered_throttle->finish_op(m_tid, r);
591}
592
593OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
594 : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
595 m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
596}
597
c07f9fc5
FG
598OrderedThrottle::~OrderedThrottle() {
599 Mutex::Locker locker(m_lock);
600 assert(waiters == 0);
601}
602
7c673cae
FG
603C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
604 assert(on_finish != NULL);
605
606 Mutex::Locker locker(m_lock);
607 uint64_t tid = m_next_tid++;
608 m_tid_result[tid] = Result(on_finish);
609 C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
610
611 complete_pending_ops();
612 while (m_max == m_current) {
c07f9fc5 613 ++waiters;
7c673cae 614 m_cond.Wait(m_lock);
c07f9fc5 615 --waiters;
7c673cae
FG
616 complete_pending_ops();
617 }
618 ++m_current;
619
620 return ctx;
621}
622
623void OrderedThrottle::end_op(int r) {
624 Mutex::Locker locker(m_lock);
625 assert(m_current > 0);
626
627 if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
628 m_ret_val = r;
629 }
630 --m_current;
631 m_cond.Signal();
632}
633
634void OrderedThrottle::finish_op(uint64_t tid, int r) {
635 Mutex::Locker locker(m_lock);
636
637 TidResult::iterator it = m_tid_result.find(tid);
638 assert(it != m_tid_result.end());
639
640 it->second.finished = true;
641 it->second.ret_val = r;
642 m_cond.Signal();
643}
644
645bool OrderedThrottle::pending_error() const {
646 Mutex::Locker locker(m_lock);
647 return (m_ret_val < 0);
648}
649
650int OrderedThrottle::wait_for_ret() {
651 Mutex::Locker locker(m_lock);
652 complete_pending_ops();
653
654 while (m_current > 0) {
c07f9fc5 655 ++waiters;
7c673cae 656 m_cond.Wait(m_lock);
c07f9fc5 657 --waiters;
7c673cae
FG
658 complete_pending_ops();
659 }
660 return m_ret_val;
661}
662
663void OrderedThrottle::complete_pending_ops() {
664 assert(m_lock.is_locked());
665
666 while (true) {
667 TidResult::iterator it = m_tid_result.begin();
668 if (it == m_tid_result.end() || it->first != m_complete_tid ||
669 !it->second.finished) {
670 break;
671 }
672
673 Result result = it->second;
674 m_tid_result.erase(it);
675
676 m_lock.Unlock();
677 result.on_finish->complete(result.ret_val);
678 m_lock.Lock();
679
680 ++m_complete_tid;
681 }
682}