git.proxmox.com Git - ceph.git/blob - ceph/src/common/Throttle.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / Throttle.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5 #include <thread>
6
7 #include "common/Throttle.h"
8 #include "common/dout.h"
9 #include "common/ceph_context.h"
10 #include "common/perf_counters.h"
11
12 #define dout_subsys ceph_subsys_throttle
13
14 #undef dout_prefix
15 #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
16
// Perf-counter indices used by Throttle.  The values are consecutive;
// l_throttle_first and l_throttle_last are the bounds handed to
// PerfCountersBuilder and do not name counters themselves.
enum {
  l_throttle_first = 532430,        // lower bound (exclusive marker)
  l_throttle_val,                   // "val"
  l_throttle_max,                   // "max"
  l_throttle_get_started,           // "get_started"
  l_throttle_get,                   // "get"
  l_throttle_get_sum,               // "get_sum"
  l_throttle_get_or_fail_fail,      // "get_or_fail_fail"
  l_throttle_get_or_fail_success,   // "get_or_fail_success"
  l_throttle_take,                  // "take"
  l_throttle_take_sum,              // "take_sum"
  l_throttle_put,                   // "put"
  l_throttle_put_sum,               // "put_sum"
  l_throttle_wait,                  // "wait"
  l_throttle_last,                  // upper bound (exclusive marker)
};
33
34 Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, bool _use_perf)
35 : cct(cct), name(n), logger(NULL),
36 max(m),
37 lock("Throttle::lock"),
38 use_perf(_use_perf)
39 {
40 assert(m >= 0);
41
42 if (!use_perf)
43 return;
44
45 if (cct->_conf->throttler_perf_counter) {
46 PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
47 b.add_u64(l_throttle_val, "val", "Currently available throttle");
48 b.add_u64(l_throttle_max, "max", "Max value for throttle");
49 b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
50 b.add_u64_counter(l_throttle_get, "get", "Gets");
51 b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
52 b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
53 b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
54 b.add_u64_counter(l_throttle_take, "take", "Takes");
55 b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
56 b.add_u64_counter(l_throttle_put, "put", "Puts");
57 b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
58 b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
59
60 logger = b.create_perf_counters();
61 cct->get_perfcounters_collection()->add(logger);
62 logger->set(l_throttle_max, max.read());
63 }
64 }
65
66 Throttle::~Throttle()
67 {
68 while (!cond.empty()) {
69 Cond *cv = cond.front();
70 delete cv;
71 cond.pop_front();
72 }
73
74 if (!use_perf)
75 return;
76
77 if (logger) {
78 cct->get_perfcounters_collection()->remove(logger);
79 delete logger;
80 }
81 }
82
83 void Throttle::_reset_max(int64_t m)
84 {
85 assert(lock.is_locked());
86 if ((int64_t)max.read() == m)
87 return;
88 if (!cond.empty())
89 cond.front()->SignalOne();
90 if (logger)
91 logger->set(l_throttle_max, m);
92 max.set((size_t)m);
93 }
94
95 bool Throttle::_wait(int64_t c)
96 {
97 utime_t start;
98 bool waited = false;
99 if (_should_wait(c) || !cond.empty()) { // always wait behind other waiters.
100 Cond *cv = new Cond;
101 cond.push_back(cv);
102 waited = true;
103 ldout(cct, 2) << "_wait waiting..." << dendl;
104 if (logger)
105 start = ceph_clock_now();
106
107 do {
108 cv->Wait(lock);
109 } while (_should_wait(c) || cv != cond.front());
110
111 ldout(cct, 2) << "_wait finished waiting" << dendl;
112 if (logger) {
113 utime_t dur = ceph_clock_now() - start;
114 logger->tinc(l_throttle_wait, dur);
115 }
116
117 delete cv;
118 cond.pop_front();
119
120 // wake up the next guy
121 if (!cond.empty())
122 cond.front()->SignalOne();
123 }
124 return waited;
125 }
126
127 bool Throttle::wait(int64_t m)
128 {
129 if (0 == max.read() && 0 == m) {
130 return false;
131 }
132
133 Mutex::Locker l(lock);
134 if (m) {
135 assert(m > 0);
136 _reset_max(m);
137 }
138 ldout(cct, 10) << "wait" << dendl;
139 return _wait(0);
140 }
141
142 int64_t Throttle::take(int64_t c)
143 {
144 if (0 == max.read()) {
145 return 0;
146 }
147 assert(c >= 0);
148 ldout(cct, 10) << "take " << c << dendl;
149 {
150 Mutex::Locker l(lock);
151 count.add(c);
152 }
153 if (logger) {
154 logger->inc(l_throttle_take);
155 logger->inc(l_throttle_take_sum, c);
156 logger->set(l_throttle_val, count.read());
157 }
158 return count.read();
159 }
160
161 bool Throttle::get(int64_t c, int64_t m)
162 {
163 if (0 == max.read() && 0 == m) {
164 return false;
165 }
166
167 assert(c >= 0);
168 ldout(cct, 10) << "get " << c << " (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
169 if (logger) {
170 logger->inc(l_throttle_get_started);
171 }
172 bool waited = false;
173 {
174 Mutex::Locker l(lock);
175 if (m) {
176 assert(m > 0);
177 _reset_max(m);
178 }
179 waited = _wait(c);
180 count.add(c);
181 }
182 if (logger) {
183 logger->inc(l_throttle_get);
184 logger->inc(l_throttle_get_sum, c);
185 logger->set(l_throttle_val, count.read());
186 }
187 return waited;
188 }
189
190 /* Returns true if it successfully got the requested amount,
191 * or false if it would block.
192 */
193 bool Throttle::get_or_fail(int64_t c)
194 {
195 if (0 == max.read()) {
196 return true;
197 }
198
199 assert (c >= 0);
200 Mutex::Locker l(lock);
201 if (_should_wait(c) || !cond.empty()) {
202 ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
203 if (logger) {
204 logger->inc(l_throttle_get_or_fail_fail);
205 }
206 return false;
207 } else {
208 ldout(cct, 10) << "get_or_fail " << c << " success (" << count.read() << " -> " << (count.read() + c) << ")" << dendl;
209 count.add(c);
210 if (logger) {
211 logger->inc(l_throttle_get_or_fail_success);
212 logger->inc(l_throttle_get);
213 logger->inc(l_throttle_get_sum, c);
214 logger->set(l_throttle_val, count.read());
215 }
216 return true;
217 }
218 }
219
220 int64_t Throttle::put(int64_t c)
221 {
222 if (0 == max.read()) {
223 return 0;
224 }
225
226 assert(c >= 0);
227 ldout(cct, 10) << "put " << c << " (" << count.read() << " -> " << (count.read()-c) << ")" << dendl;
228 Mutex::Locker l(lock);
229 if (c) {
230 if (!cond.empty())
231 cond.front()->SignalOne();
232 assert(((int64_t)count.read()) >= c); //if count goes negative, we failed somewhere!
233 count.sub(c);
234 if (logger) {
235 logger->inc(l_throttle_put);
236 logger->inc(l_throttle_put_sum, c);
237 logger->set(l_throttle_val, count.read());
238 }
239 }
240 return count.read();
241 }
242
243 void Throttle::reset()
244 {
245 Mutex::Locker l(lock);
246 if (!cond.empty())
247 cond.front()->SignalOne();
248 count.set(0);
249 if (logger) {
250 logger->set(l_throttle_val, 0);
251 }
252 }
253
254 enum {
255 l_backoff_throttle_first = l_throttle_last + 1,
256 l_backoff_throttle_val,
257 l_backoff_throttle_max,
258 l_backoff_throttle_get,
259 l_backoff_throttle_get_sum,
260 l_backoff_throttle_take,
261 l_backoff_throttle_take_sum,
262 l_backoff_throttle_put,
263 l_backoff_throttle_put_sum,
264 l_backoff_throttle_wait,
265 l_backoff_throttle_last,
266 };
267
268 BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, unsigned expected_concurrency, bool _use_perf)
269 : cct(cct), name(n), logger(NULL),
270 conds(expected_concurrency),///< [in] determines size of conds
271 use_perf(_use_perf)
272 {
273 if (!use_perf)
274 return;
275
276 if (cct->_conf->throttler_perf_counter) {
277 PerfCountersBuilder b(cct, string("throttle-") + name, l_backoff_throttle_first, l_backoff_throttle_last);
278 b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
279 b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
280 b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
281 b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
282 b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
283 b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
284 b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
285 b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
286 b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
287
288 logger = b.create_perf_counters();
289 cct->get_perfcounters_collection()->add(logger);
290 logger->set(l_backoff_throttle_max, max);
291 }
292 }
293
294 BackoffThrottle::~BackoffThrottle()
295 {
296 if (!use_perf)
297 return;
298
299 if (logger) {
300 cct->get_perfcounters_collection()->remove(logger);
301 delete logger;
302 }
303 }
304
305 bool BackoffThrottle::set_params(
306 double _low_threshhold,
307 double _high_threshhold,
308 double _expected_throughput,
309 double _high_multiple,
310 double _max_multiple,
311 uint64_t _throttle_max,
312 ostream *errstream)
313 {
314 bool valid = true;
315 if (_low_threshhold > _high_threshhold) {
316 valid = false;
317 if (errstream) {
318 *errstream << "low_threshhold (" << _low_threshhold
319 << ") > high_threshhold (" << _high_threshhold
320 << ")" << std::endl;
321 }
322 }
323
324 if (_high_multiple > _max_multiple) {
325 valid = false;
326 if (errstream) {
327 *errstream << "_high_multiple (" << _high_multiple
328 << ") > _max_multiple (" << _max_multiple
329 << ")" << std::endl;
330 }
331 }
332
333 if (_low_threshhold > 1 || _low_threshhold < 0) {
334 valid = false;
335 if (errstream) {
336 *errstream << "invalid low_threshhold (" << _low_threshhold << ")"
337 << std::endl;
338 }
339 }
340
341 if (_high_threshhold > 1 || _high_threshhold < 0) {
342 valid = false;
343 if (errstream) {
344 *errstream << "invalid high_threshhold (" << _high_threshhold << ")"
345 << std::endl;
346 }
347 }
348
349 if (_max_multiple < 0) {
350 valid = false;
351 if (errstream) {
352 *errstream << "invalid _max_multiple ("
353 << _max_multiple << ")"
354 << std::endl;
355 }
356 }
357
358 if (_high_multiple < 0) {
359 valid = false;
360 if (errstream) {
361 *errstream << "invalid _high_multiple ("
362 << _high_multiple << ")"
363 << std::endl;
364 }
365 }
366
367 if (_expected_throughput < 0) {
368 valid = false;
369 if (errstream) {
370 *errstream << "invalid _expected_throughput("
371 << _expected_throughput << ")"
372 << std::endl;
373 }
374 }
375
376 if (!valid)
377 return false;
378
379 locker l(lock);
380 low_threshhold = _low_threshhold;
381 high_threshhold = _high_threshhold;
382 high_delay_per_count = _high_multiple / _expected_throughput;
383 max_delay_per_count = _max_multiple / _expected_throughput;
384 max = _throttle_max;
385
386 if (logger)
387 logger->set(l_backoff_throttle_max, max);
388
389 if (high_threshhold - low_threshhold > 0) {
390 s0 = high_delay_per_count / (high_threshhold - low_threshhold);
391 } else {
392 low_threshhold = high_threshhold;
393 s0 = 0;
394 }
395
396 if (1 - high_threshhold > 0) {
397 s1 = (max_delay_per_count - high_delay_per_count)
398 / (1 - high_threshhold);
399 } else {
400 high_threshhold = 1;
401 s1 = 0;
402 }
403
404 _kick_waiters();
405 return true;
406 }
407
408 std::chrono::duration<double> BackoffThrottle::_get_delay(uint64_t c) const
409 {
410 if (max == 0)
411 return std::chrono::duration<double>(0);
412
413 double r = ((double)current) / ((double)max);
414 if (r < low_threshhold) {
415 return std::chrono::duration<double>(0);
416 } else if (r < high_threshhold) {
417 return c * std::chrono::duration<double>(
418 (r - low_threshhold) * s0);
419 } else {
420 return c * std::chrono::duration<double>(
421 high_delay_per_count + ((r - high_threshhold) * s1));
422 }
423 }
424
425 std::chrono::duration<double> BackoffThrottle::get(uint64_t c)
426 {
427 locker l(lock);
428 auto delay = _get_delay(c);
429
430 if (logger) {
431 logger->inc(l_backoff_throttle_get);
432 logger->inc(l_backoff_throttle_get_sum, c);
433 }
434
435 // fast path
436 if (delay == std::chrono::duration<double>(0) &&
437 waiters.empty() &&
438 ((max == 0) || (current == 0) || ((current + c) <= max))) {
439 current += c;
440
441 if (logger) {
442 logger->set(l_backoff_throttle_val, current);
443 }
444
445 return std::chrono::duration<double>(0);
446 }
447
448 auto ticket = _push_waiter();
449 utime_t wait_from = ceph_clock_now();
450 bool waited = false;
451
452 while (waiters.begin() != ticket) {
453 (*ticket)->wait(l);
454 waited = true;
455 }
456
457 auto start = std::chrono::system_clock::now();
458 delay = _get_delay(c);
459 while (true) {
460 if (!((max == 0) || (current == 0) || (current + c) <= max)) {
461 (*ticket)->wait(l);
462 waited = true;
463 } else if (delay > std::chrono::duration<double>(0)) {
464 (*ticket)->wait_for(l, delay);
465 waited = true;
466 } else {
467 break;
468 }
469 assert(ticket == waiters.begin());
470 delay = _get_delay(c) - (std::chrono::system_clock::now() - start);
471 }
472 waiters.pop_front();
473 _kick_waiters();
474
475 current += c;
476
477 if (logger) {
478 logger->set(l_backoff_throttle_val, current);
479 if (waited) {
480 logger->tinc(l_backoff_throttle_wait, ceph_clock_now() - wait_from);
481 }
482 }
483
484 return std::chrono::system_clock::now() - start;
485 }
486
487 uint64_t BackoffThrottle::put(uint64_t c)
488 {
489 locker l(lock);
490 assert(current >= c);
491 current -= c;
492 _kick_waiters();
493
494 if (logger) {
495 logger->inc(l_backoff_throttle_put);
496 logger->inc(l_backoff_throttle_put_sum, c);
497 logger->set(l_backoff_throttle_val, current);
498 }
499
500 return current;
501 }
502
503 uint64_t BackoffThrottle::take(uint64_t c)
504 {
505 locker l(lock);
506 current += c;
507
508 if (logger) {
509 logger->inc(l_backoff_throttle_take);
510 logger->inc(l_backoff_throttle_take_sum, c);
511 logger->set(l_backoff_throttle_val, current);
512 }
513
514 return current;
515 }
516
517 uint64_t BackoffThrottle::get_current()
518 {
519 locker l(lock);
520 return current;
521 }
522
523 uint64_t BackoffThrottle::get_max()
524 {
525 locker l(lock);
526 return max;
527 }
528
529 SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
530 : m_lock("SimpleThrottle"),
531 m_max(max),
532 m_current(0),
533 m_ret(0),
534 m_ignore_enoent(ignore_enoent)
535 {
536 }
537
538 SimpleThrottle::~SimpleThrottle()
539 {
540 Mutex::Locker l(m_lock);
541 assert(m_current == 0);
542 }
543
544 void SimpleThrottle::start_op()
545 {
546 Mutex::Locker l(m_lock);
547 while (m_max == m_current)
548 m_cond.Wait(m_lock);
549 ++m_current;
550 }
551
552 void SimpleThrottle::end_op(int r)
553 {
554 Mutex::Locker l(m_lock);
555 --m_current;
556 if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
557 m_ret = r;
558 m_cond.Signal();
559 }
560
561 bool SimpleThrottle::pending_error() const
562 {
563 Mutex::Locker l(m_lock);
564 return (m_ret < 0);
565 }
566
567 int SimpleThrottle::wait_for_ret()
568 {
569 Mutex::Locker l(m_lock);
570 while (m_current > 0)
571 m_cond.Wait(m_lock);
572 return m_ret;
573 }
574
575 void C_OrderedThrottle::finish(int r) {
576 m_ordered_throttle->finish_op(m_tid, r);
577 }
578
579 OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
580 : m_lock("OrderedThrottle::m_lock"), m_max(max), m_current(0), m_ret_val(0),
581 m_ignore_enoent(ignore_enoent), m_next_tid(0), m_complete_tid(0) {
582 }
583
584 C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
585 assert(on_finish != NULL);
586
587 Mutex::Locker locker(m_lock);
588 uint64_t tid = m_next_tid++;
589 m_tid_result[tid] = Result(on_finish);
590 C_OrderedThrottle *ctx = new C_OrderedThrottle(this, tid);
591
592 complete_pending_ops();
593 while (m_max == m_current) {
594 m_cond.Wait(m_lock);
595 complete_pending_ops();
596 }
597 ++m_current;
598
599 return ctx;
600 }
601
602 void OrderedThrottle::end_op(int r) {
603 Mutex::Locker locker(m_lock);
604 assert(m_current > 0);
605
606 if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
607 m_ret_val = r;
608 }
609 --m_current;
610 m_cond.Signal();
611 }
612
613 void OrderedThrottle::finish_op(uint64_t tid, int r) {
614 Mutex::Locker locker(m_lock);
615
616 TidResult::iterator it = m_tid_result.find(tid);
617 assert(it != m_tid_result.end());
618
619 it->second.finished = true;
620 it->second.ret_val = r;
621 m_cond.Signal();
622 }
623
624 bool OrderedThrottle::pending_error() const {
625 Mutex::Locker locker(m_lock);
626 return (m_ret_val < 0);
627 }
628
629 int OrderedThrottle::wait_for_ret() {
630 Mutex::Locker locker(m_lock);
631 complete_pending_ops();
632
633 while (m_current > 0) {
634 m_cond.Wait(m_lock);
635 complete_pending_ops();
636 }
637 return m_ret_val;
638 }
639
640 void OrderedThrottle::complete_pending_ops() {
641 assert(m_lock.is_locked());
642
643 while (true) {
644 TidResult::iterator it = m_tid_result.begin();
645 if (it == m_tid_result.end() || it->first != m_complete_tid ||
646 !it->second.finished) {
647 break;
648 }
649
650 Result result = it->second;
651 m_tid_result.erase(it);
652
653 m_lock.Unlock();
654 result.on_finish->complete(result.ret_val);
655 m_lock.Lock();
656
657 ++m_complete_tid;
658 }
659 }