]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
c07f9fc5 FG |
4 | #include "include/scope_guard.h" |
5 | ||
7c673cae | 6 | #include "common/Throttle.h" |
11fdf7f2 | 7 | #include "common/ceph_time.h" |
7c673cae FG |
8 | #include "common/perf_counters.h" |
9 | ||
11fdf7f2 | 10 | |
31f18b77 | 11 | // re-include our assert to clobber the system one; fix dout: |
11fdf7f2 | 12 | #include "include/ceph_assert.h" |
31f18b77 | 13 | |
7c673cae FG |
14 | #define dout_subsys ceph_subsys_throttle |
15 | ||
16 | #undef dout_prefix | |
17 | #define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") " | |
18 | ||
f67539c2 TL |
19 | using std::list; |
20 | using std::ostream; | |
21 | using std::string; | |
22 | ||
11fdf7f2 TL |
23 | using ceph::mono_clock; |
24 | using ceph::mono_time; | |
f67539c2 | 25 | using ceph::timespan; |
11fdf7f2 | 26 | |
// Perf-counter indices used by Throttle.  l_throttle_first and
// l_throttle_last are the range bounds handed to PerfCountersBuilder; the
// enumerators in between are the individual counters, annotated here with
// the name each one is registered under.
enum {
  l_throttle_first = 532430,
  l_throttle_val,                  // "val"
  l_throttle_max,                  // "max"
  l_throttle_get_started,          // "get_started"
  l_throttle_get,                  // "get"
  l_throttle_get_sum,              // "get_sum"
  l_throttle_get_or_fail_fail,     // "get_or_fail_fail"
  l_throttle_get_or_fail_success,  // "get_or_fail_success"
  l_throttle_take,                 // "take"
  l_throttle_take_sum,             // "take_sum"
  l_throttle_put,                  // "put"
  l_throttle_put_sum,              // "put_sum"
  l_throttle_wait,                 // "wait"
  l_throttle_last,
};
43 | ||
11fdf7f2 TL |
44 | Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m, |
45 | bool _use_perf) | |
46 | : cct(cct), name(n), max(m), | |
7c673cae FG |
47 | use_perf(_use_perf) |
48 | { | |
11fdf7f2 | 49 | ceph_assert(m >= 0); |
7c673cae FG |
50 | |
51 | if (!use_perf) | |
52 | return; | |
53 | ||
54 | if (cct->_conf->throttler_perf_counter) { | |
55 | PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last); | |
56 | b.add_u64(l_throttle_val, "val", "Currently available throttle"); | |
57 | b.add_u64(l_throttle_max, "max", "Max value for throttle"); | |
58 | b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait"); | |
59 | b.add_u64_counter(l_throttle_get, "get", "Gets"); | |
60 | b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data"); | |
61 | b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail"); | |
62 | b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail"); | |
63 | b.add_u64_counter(l_throttle_take, "take", "Takes"); | |
64 | b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data"); | |
65 | b.add_u64_counter(l_throttle_put, "put", "Puts"); | |
66 | b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data"); | |
67 | b.add_time_avg(l_throttle_wait, "wait", "Waiting latency"); | |
68 | ||
11fdf7f2 TL |
69 | logger = { b.create_perf_counters(), cct }; |
70 | cct->get_perfcounters_collection()->add(logger.get()); | |
31f18b77 | 71 | logger->set(l_throttle_max, max); |
7c673cae FG |
72 | } |
73 | } | |
74 | ||
75 | Throttle::~Throttle() | |
76 | { | |
11fdf7f2 TL |
77 | std::lock_guard l(lock); |
78 | ceph_assert(conds.empty()); | |
7c673cae FG |
79 | } |
80 | ||
81 | void Throttle::_reset_max(int64_t m) | |
82 | { | |
b32b8144 | 83 | // lock must be held. |
b32b8144 | 84 | if (max == m) |
7c673cae | 85 | return; |
11fdf7f2 TL |
86 | if (!conds.empty()) |
87 | conds.front().notify_one(); | |
7c673cae FG |
88 | if (logger) |
89 | logger->set(l_throttle_max, m); | |
31f18b77 | 90 | max = m; |
7c673cae FG |
91 | } |
92 | ||
11fdf7f2 | 93 | bool Throttle::_wait(int64_t c, std::unique_lock<std::mutex>& l) |
7c673cae | 94 | { |
11fdf7f2 | 95 | mono_time start; |
7c673cae | 96 | bool waited = false; |
11fdf7f2 | 97 | if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters. |
c07f9fc5 | 98 | { |
11fdf7f2 | 99 | auto cv = conds.emplace(conds.end()); |
c07f9fc5 | 100 | auto w = make_scope_guard([this, cv]() { |
11fdf7f2 | 101 | conds.erase(cv); |
c07f9fc5 FG |
102 | }); |
103 | waited = true; | |
104 | ldout(cct, 2) << "_wait waiting..." << dendl; | |
105 | if (logger) | |
11fdf7f2 | 106 | start = mono_clock::now(); |
c07f9fc5 | 107 | |
11fdf7f2 TL |
108 | cv->wait(l, [this, c, cv]() { return (!_should_wait(c) && |
109 | cv == conds.begin()); }); | |
c07f9fc5 FG |
110 | ldout(cct, 2) << "_wait finished waiting" << dendl; |
111 | if (logger) { | |
11fdf7f2 | 112 | logger->tinc(l_throttle_wait, mono_clock::now() - start); |
c07f9fc5 | 113 | } |
7c673cae | 114 | } |
7c673cae | 115 | // wake up the next guy |
11fdf7f2 TL |
116 | if (!conds.empty()) |
117 | conds.front().notify_one(); | |
7c673cae FG |
118 | } |
119 | return waited; | |
120 | } | |
121 | ||
122 | bool Throttle::wait(int64_t m) | |
123 | { | |
31f18b77 | 124 | if (0 == max && 0 == m) { |
7c673cae FG |
125 | return false; |
126 | } | |
127 | ||
11fdf7f2 | 128 | std::unique_lock l(lock); |
7c673cae | 129 | if (m) { |
11fdf7f2 | 130 | ceph_assert(m > 0); |
7c673cae FG |
131 | _reset_max(m); |
132 | } | |
133 | ldout(cct, 10) << "wait" << dendl; | |
11fdf7f2 | 134 | return _wait(0, l); |
7c673cae FG |
135 | } |
136 | ||
137 | int64_t Throttle::take(int64_t c) | |
138 | { | |
31f18b77 | 139 | if (0 == max) { |
7c673cae FG |
140 | return 0; |
141 | } | |
11fdf7f2 | 142 | ceph_assert(c >= 0); |
7c673cae | 143 | ldout(cct, 10) << "take " << c << dendl; |
f67539c2 | 144 | count += c; |
7c673cae FG |
145 | if (logger) { |
146 | logger->inc(l_throttle_take); | |
147 | logger->inc(l_throttle_take_sum, c); | |
31f18b77 | 148 | logger->set(l_throttle_val, count); |
7c673cae | 149 | } |
31f18b77 | 150 | return count; |
7c673cae FG |
151 | } |
152 | ||
153 | bool Throttle::get(int64_t c, int64_t m) | |
154 | { | |
31f18b77 | 155 | if (0 == max && 0 == m) { |
9f95a23c | 156 | count += c; |
7c673cae FG |
157 | return false; |
158 | } | |
159 | ||
11fdf7f2 | 160 | ceph_assert(c >= 0); |
31f18b77 | 161 | ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl; |
7c673cae FG |
162 | if (logger) { |
163 | logger->inc(l_throttle_get_started); | |
164 | } | |
165 | bool waited = false; | |
166 | { | |
11fdf7f2 | 167 | std::unique_lock l(lock); |
7c673cae | 168 | if (m) { |
11fdf7f2 | 169 | ceph_assert(m > 0); |
7c673cae FG |
170 | _reset_max(m); |
171 | } | |
11fdf7f2 | 172 | waited = _wait(c, l); |
31f18b77 | 173 | count += c; |
7c673cae FG |
174 | } |
175 | if (logger) { | |
176 | logger->inc(l_throttle_get); | |
177 | logger->inc(l_throttle_get_sum, c); | |
31f18b77 | 178 | logger->set(l_throttle_val, count); |
7c673cae FG |
179 | } |
180 | return waited; | |
181 | } | |
182 | ||
183 | /* Returns true if it successfully got the requested amount, | |
184 | * or false if it would block. | |
185 | */ | |
186 | bool Throttle::get_or_fail(int64_t c) | |
187 | { | |
31f18b77 | 188 | if (0 == max) { |
9f95a23c | 189 | count += c; |
7c673cae FG |
190 | return true; |
191 | } | |
192 | ||
193 | assert (c >= 0); | |
f67539c2 TL |
194 | bool result = false; |
195 | { | |
196 | std::lock_guard l(lock); | |
197 | if (_should_wait(c) || !conds.empty()) { | |
198 | ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl; | |
199 | result = false; | |
200 | } else { | |
201 | ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load() | |
202 | << " -> " << (count.load() + c) << ")" << dendl; | |
203 | count += c; | |
204 | result = true; | |
7c673cae | 205 | } |
f67539c2 TL |
206 | } |
207 | ||
208 | if (logger) { | |
209 | if (result) { | |
7c673cae FG |
210 | logger->inc(l_throttle_get_or_fail_success); |
211 | logger->inc(l_throttle_get); | |
212 | logger->inc(l_throttle_get_sum, c); | |
31f18b77 | 213 | logger->set(l_throttle_val, count); |
f67539c2 TL |
214 | } else { |
215 | logger->inc(l_throttle_get_or_fail_fail); | |
7c673cae | 216 | } |
7c673cae | 217 | } |
f67539c2 | 218 | return result; |
7c673cae FG |
219 | } |
220 | ||
221 | int64_t Throttle::put(int64_t c) | |
222 | { | |
31f18b77 | 223 | if (0 == max) { |
9f95a23c | 224 | count -= c; |
7c673cae FG |
225 | return 0; |
226 | } | |
227 | ||
11fdf7f2 TL |
228 | ceph_assert(c >= 0); |
229 | ldout(cct, 10) << "put " << c << " (" << count.load() << " -> " | |
230 | << (count.load()-c) << ")" << dendl; | |
f67539c2 TL |
231 | int64_t new_count; |
232 | { | |
233 | std::lock_guard l(lock); | |
234 | new_count = count; | |
235 | if (c) { | |
236 | if (!conds.empty()) | |
237 | conds.front().notify_one(); | |
238 | // if count goes negative, we failed somewhere! | |
239 | ceph_assert(count >= c); | |
240 | new_count = count -= c; | |
7c673cae FG |
241 | } |
242 | } | |
f67539c2 TL |
243 | if (logger) { |
244 | logger->inc(l_throttle_put); | |
245 | logger->inc(l_throttle_put_sum, c); | |
246 | logger->set(l_throttle_val, count); | |
247 | } | |
248 | ||
249 | return new_count; | |
7c673cae FG |
250 | } |
251 | ||
252 | void Throttle::reset() | |
253 | { | |
11fdf7f2 TL |
254 | std::lock_guard l(lock); |
255 | if (!conds.empty()) | |
256 | conds.front().notify_one(); | |
31f18b77 | 257 | count = 0; |
7c673cae FG |
258 | if (logger) { |
259 | logger->set(l_throttle_val, 0); | |
260 | } | |
261 | } | |
262 | ||
263 | enum { | |
264 | l_backoff_throttle_first = l_throttle_last + 1, | |
265 | l_backoff_throttle_val, | |
266 | l_backoff_throttle_max, | |
267 | l_backoff_throttle_get, | |
268 | l_backoff_throttle_get_sum, | |
269 | l_backoff_throttle_take, | |
270 | l_backoff_throttle_take_sum, | |
271 | l_backoff_throttle_put, | |
272 | l_backoff_throttle_put_sum, | |
273 | l_backoff_throttle_wait, | |
274 | l_backoff_throttle_last, | |
275 | }; | |
276 | ||
11fdf7f2 TL |
277 | BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n, |
278 | unsigned expected_concurrency, bool _use_perf) | |
279 | : cct(cct), name(n), | |
7c673cae FG |
280 | conds(expected_concurrency),///< [in] determines size of conds |
281 | use_perf(_use_perf) | |
282 | { | |
283 | if (!use_perf) | |
284 | return; | |
285 | ||
286 | if (cct->_conf->throttler_perf_counter) { | |
11fdf7f2 TL |
287 | PerfCountersBuilder b(cct, string("throttle-") + name, |
288 | l_backoff_throttle_first, l_backoff_throttle_last); | |
7c673cae FG |
289 | b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle"); |
290 | b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle"); | |
291 | b.add_u64_counter(l_backoff_throttle_get, "get", "Gets"); | |
292 | b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data"); | |
293 | b.add_u64_counter(l_backoff_throttle_take, "take", "Takes"); | |
294 | b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data"); | |
295 | b.add_u64_counter(l_backoff_throttle_put, "put", "Puts"); | |
296 | b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data"); | |
297 | b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency"); | |
298 | ||
11fdf7f2 TL |
299 | logger = { b.create_perf_counters(), cct }; |
300 | cct->get_perfcounters_collection()->add(logger.get()); | |
7c673cae FG |
301 | logger->set(l_backoff_throttle_max, max); |
302 | } | |
303 | } | |
304 | ||
305 | BackoffThrottle::~BackoffThrottle() | |
306 | { | |
11fdf7f2 TL |
307 | std::lock_guard l(lock); |
308 | ceph_assert(waiters.empty()); | |
7c673cae FG |
309 | } |
310 | ||
311 | bool BackoffThrottle::set_params( | |
11fdf7f2 TL |
312 | double _low_threshold, |
313 | double _high_threshold, | |
7c673cae FG |
314 | double _expected_throughput, |
315 | double _high_multiple, | |
316 | double _max_multiple, | |
317 | uint64_t _throttle_max, | |
318 | ostream *errstream) | |
319 | { | |
320 | bool valid = true; | |
11fdf7f2 | 321 | if (_low_threshold > _high_threshold) { |
7c673cae FG |
322 | valid = false; |
323 | if (errstream) { | |
11fdf7f2 TL |
324 | *errstream << "low_threshold (" << _low_threshold |
325 | << ") > high_threshold (" << _high_threshold | |
7c673cae FG |
326 | << ")" << std::endl; |
327 | } | |
328 | } | |
329 | ||
330 | if (_high_multiple > _max_multiple) { | |
331 | valid = false; | |
332 | if (errstream) { | |
333 | *errstream << "_high_multiple (" << _high_multiple | |
334 | << ") > _max_multiple (" << _max_multiple | |
335 | << ")" << std::endl; | |
336 | } | |
337 | } | |
338 | ||
11fdf7f2 | 339 | if (_low_threshold > 1 || _low_threshold < 0) { |
7c673cae FG |
340 | valid = false; |
341 | if (errstream) { | |
11fdf7f2 | 342 | *errstream << "invalid low_threshold (" << _low_threshold << ")" |
7c673cae FG |
343 | << std::endl; |
344 | } | |
345 | } | |
346 | ||
11fdf7f2 | 347 | if (_high_threshold > 1 || _high_threshold < 0) { |
7c673cae FG |
348 | valid = false; |
349 | if (errstream) { | |
11fdf7f2 | 350 | *errstream << "invalid high_threshold (" << _high_threshold << ")" |
7c673cae FG |
351 | << std::endl; |
352 | } | |
353 | } | |
354 | ||
355 | if (_max_multiple < 0) { | |
356 | valid = false; | |
357 | if (errstream) { | |
358 | *errstream << "invalid _max_multiple (" | |
359 | << _max_multiple << ")" | |
360 | << std::endl; | |
361 | } | |
362 | } | |
363 | ||
364 | if (_high_multiple < 0) { | |
365 | valid = false; | |
366 | if (errstream) { | |
367 | *errstream << "invalid _high_multiple (" | |
368 | << _high_multiple << ")" | |
369 | << std::endl; | |
370 | } | |
371 | } | |
372 | ||
373 | if (_expected_throughput < 0) { | |
374 | valid = false; | |
375 | if (errstream) { | |
376 | *errstream << "invalid _expected_throughput(" | |
377 | << _expected_throughput << ")" | |
378 | << std::endl; | |
379 | } | |
380 | } | |
381 | ||
382 | if (!valid) | |
383 | return false; | |
384 | ||
385 | locker l(lock); | |
11fdf7f2 TL |
386 | low_threshold = _low_threshold; |
387 | high_threshold = _high_threshold; | |
7c673cae FG |
388 | high_delay_per_count = _high_multiple / _expected_throughput; |
389 | max_delay_per_count = _max_multiple / _expected_throughput; | |
390 | max = _throttle_max; | |
391 | ||
392 | if (logger) | |
393 | logger->set(l_backoff_throttle_max, max); | |
394 | ||
11fdf7f2 TL |
395 | if (high_threshold - low_threshold > 0) { |
396 | s0 = high_delay_per_count / (high_threshold - low_threshold); | |
7c673cae | 397 | } else { |
11fdf7f2 | 398 | low_threshold = high_threshold; |
7c673cae FG |
399 | s0 = 0; |
400 | } | |
401 | ||
11fdf7f2 | 402 | if (1 - high_threshold > 0) { |
7c673cae | 403 | s1 = (max_delay_per_count - high_delay_per_count) |
11fdf7f2 | 404 | / (1 - high_threshold); |
7c673cae | 405 | } else { |
11fdf7f2 | 406 | high_threshold = 1; |
7c673cae FG |
407 | s1 = 0; |
408 | } | |
409 | ||
410 | _kick_waiters(); | |
411 | return true; | |
412 | } | |
413 | ||
11fdf7f2 | 414 | ceph::timespan BackoffThrottle::_get_delay(uint64_t c) const |
7c673cae FG |
415 | { |
416 | if (max == 0) | |
11fdf7f2 | 417 | return ceph::timespan(0); |
7c673cae FG |
418 | |
419 | double r = ((double)current) / ((double)max); | |
11fdf7f2 TL |
420 | if (r < low_threshold) { |
421 | return ceph::timespan(0); | |
422 | } else if (r < high_threshold) { | |
423 | return c * ceph::make_timespan( | |
424 | (r - low_threshold) * s0); | |
7c673cae | 425 | } else { |
11fdf7f2 TL |
426 | return c * ceph::make_timespan( |
427 | high_delay_per_count + ((r - high_threshold) * s1)); | |
7c673cae FG |
428 | } |
429 | } | |
430 | ||
11fdf7f2 | 431 | ceph::timespan BackoffThrottle::get(uint64_t c) |
7c673cae FG |
432 | { |
433 | locker l(lock); | |
434 | auto delay = _get_delay(c); | |
435 | ||
436 | if (logger) { | |
437 | logger->inc(l_backoff_throttle_get); | |
438 | logger->inc(l_backoff_throttle_get_sum, c); | |
439 | } | |
440 | ||
441 | // fast path | |
11fdf7f2 | 442 | if (delay.count() == 0 && |
7c673cae FG |
443 | waiters.empty() && |
444 | ((max == 0) || (current == 0) || ((current + c) <= max))) { | |
445 | current += c; | |
446 | ||
447 | if (logger) { | |
448 | logger->set(l_backoff_throttle_val, current); | |
449 | } | |
450 | ||
11fdf7f2 | 451 | return ceph::make_timespan(0); |
7c673cae FG |
452 | } |
453 | ||
454 | auto ticket = _push_waiter(); | |
11fdf7f2 | 455 | auto wait_from = mono_clock::now(); |
7c673cae FG |
456 | bool waited = false; |
457 | ||
458 | while (waiters.begin() != ticket) { | |
459 | (*ticket)->wait(l); | |
460 | waited = true; | |
461 | } | |
462 | ||
11fdf7f2 | 463 | auto start = mono_clock::now(); |
7c673cae FG |
464 | delay = _get_delay(c); |
465 | while (true) { | |
11fdf7f2 | 466 | if (max != 0 && current != 0 && (current + c) > max) { |
7c673cae FG |
467 | (*ticket)->wait(l); |
468 | waited = true; | |
11fdf7f2 | 469 | } else if (delay.count() > 0) { |
7c673cae FG |
470 | (*ticket)->wait_for(l, delay); |
471 | waited = true; | |
472 | } else { | |
473 | break; | |
474 | } | |
11fdf7f2 TL |
475 | ceph_assert(ticket == waiters.begin()); |
476 | delay = _get_delay(c); | |
477 | auto elapsed = mono_clock::now() - start; | |
478 | if (delay <= elapsed) { | |
479 | delay = timespan::zero(); | |
480 | } else { | |
481 | delay -= elapsed; | |
482 | } | |
7c673cae FG |
483 | } |
484 | waiters.pop_front(); | |
485 | _kick_waiters(); | |
486 | ||
487 | current += c; | |
488 | ||
489 | if (logger) { | |
490 | logger->set(l_backoff_throttle_val, current); | |
491 | if (waited) { | |
11fdf7f2 | 492 | logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from); |
7c673cae FG |
493 | } |
494 | } | |
495 | ||
11fdf7f2 | 496 | return mono_clock::now() - start; |
7c673cae FG |
497 | } |
498 | ||
499 | uint64_t BackoffThrottle::put(uint64_t c) | |
500 | { | |
501 | locker l(lock); | |
11fdf7f2 | 502 | ceph_assert(current >= c); |
7c673cae FG |
503 | current -= c; |
504 | _kick_waiters(); | |
505 | ||
506 | if (logger) { | |
507 | logger->inc(l_backoff_throttle_put); | |
508 | logger->inc(l_backoff_throttle_put_sum, c); | |
509 | logger->set(l_backoff_throttle_val, current); | |
510 | } | |
511 | ||
512 | return current; | |
513 | } | |
514 | ||
515 | uint64_t BackoffThrottle::take(uint64_t c) | |
516 | { | |
517 | locker l(lock); | |
518 | current += c; | |
519 | ||
520 | if (logger) { | |
521 | logger->inc(l_backoff_throttle_take); | |
522 | logger->inc(l_backoff_throttle_take_sum, c); | |
523 | logger->set(l_backoff_throttle_val, current); | |
524 | } | |
525 | ||
526 | return current; | |
527 | } | |
528 | ||
529 | uint64_t BackoffThrottle::get_current() | |
530 | { | |
531 | locker l(lock); | |
532 | return current; | |
533 | } | |
534 | ||
535 | uint64_t BackoffThrottle::get_max() | |
536 | { | |
537 | locker l(lock); | |
538 | return max; | |
539 | } | |
540 | ||
541 | SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent) | |
11fdf7f2 | 542 | : m_max(max), m_ignore_enoent(ignore_enoent) {} |
7c673cae FG |
543 | |
544 | SimpleThrottle::~SimpleThrottle() | |
545 | { | |
11fdf7f2 TL |
546 | std::lock_guard l(m_lock); |
547 | ceph_assert(m_current == 0); | |
548 | ceph_assert(waiters == 0); | |
7c673cae FG |
549 | } |
550 | ||
551 | void SimpleThrottle::start_op() | |
552 | { | |
11fdf7f2 TL |
553 | std::unique_lock l(m_lock); |
554 | waiters++; | |
555 | m_cond.wait(l, [this]() { return m_max != m_current; }); | |
556 | waiters--; | |
7c673cae FG |
557 | ++m_current; |
558 | } | |
559 | ||
560 | void SimpleThrottle::end_op(int r) | |
561 | { | |
11fdf7f2 | 562 | std::lock_guard l(m_lock); |
7c673cae FG |
563 | --m_current; |
564 | if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent)) | |
565 | m_ret = r; | |
11fdf7f2 | 566 | m_cond.notify_all(); |
7c673cae FG |
567 | } |
568 | ||
569 | bool SimpleThrottle::pending_error() const | |
570 | { | |
11fdf7f2 | 571 | std::lock_guard l(m_lock); |
7c673cae FG |
572 | return (m_ret < 0); |
573 | } | |
574 | ||
575 | int SimpleThrottle::wait_for_ret() | |
576 | { | |
11fdf7f2 TL |
577 | std::unique_lock l(m_lock); |
578 | waiters++; | |
579 | m_cond.wait(l, [this]() { return m_current == 0; }); | |
580 | waiters--; | |
7c673cae FG |
581 | return m_ret; |
582 | } | |
583 | ||
584 | void C_OrderedThrottle::finish(int r) { | |
585 | m_ordered_throttle->finish_op(m_tid, r); | |
586 | } | |
587 | ||
588 | OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent) | |
11fdf7f2 | 589 | : m_max(max), m_ignore_enoent(ignore_enoent) {} |
7c673cae | 590 | |
c07f9fc5 | 591 | OrderedThrottle::~OrderedThrottle() { |
11fdf7f2 TL |
592 | std::lock_guard l(m_lock); |
593 | ceph_assert(waiters == 0); | |
c07f9fc5 FG |
594 | } |
595 | ||
7c673cae | 596 | C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) { |
11fdf7f2 | 597 | ceph_assert(on_finish); |
7c673cae | 598 | |
11fdf7f2 | 599 | std::unique_lock l(m_lock); |
7c673cae FG |
600 | uint64_t tid = m_next_tid++; |
601 | m_tid_result[tid] = Result(on_finish); | |
11fdf7f2 | 602 | auto ctx = std::make_unique<C_OrderedThrottle>(this, tid); |
7c673cae | 603 | |
11fdf7f2 | 604 | complete_pending_ops(l); |
7c673cae | 605 | while (m_max == m_current) { |
c07f9fc5 | 606 | ++waiters; |
11fdf7f2 | 607 | m_cond.wait(l); |
c07f9fc5 | 608 | --waiters; |
11fdf7f2 | 609 | complete_pending_ops(l); |
7c673cae FG |
610 | } |
611 | ++m_current; | |
612 | ||
11fdf7f2 | 613 | return ctx.release(); |
7c673cae FG |
614 | } |
615 | ||
616 | void OrderedThrottle::end_op(int r) { | |
11fdf7f2 TL |
617 | std::lock_guard l(m_lock); |
618 | ceph_assert(m_current > 0); | |
7c673cae FG |
619 | |
620 | if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) { | |
621 | m_ret_val = r; | |
622 | } | |
623 | --m_current; | |
11fdf7f2 | 624 | m_cond.notify_all(); |
7c673cae FG |
625 | } |
626 | ||
627 | void OrderedThrottle::finish_op(uint64_t tid, int r) { | |
11fdf7f2 | 628 | std::lock_guard l(m_lock); |
7c673cae | 629 | |
11fdf7f2 TL |
630 | auto it = m_tid_result.find(tid); |
631 | ceph_assert(it != m_tid_result.end()); | |
7c673cae FG |
632 | |
633 | it->second.finished = true; | |
634 | it->second.ret_val = r; | |
11fdf7f2 | 635 | m_cond.notify_all(); |
7c673cae FG |
636 | } |
637 | ||
638 | bool OrderedThrottle::pending_error() const { | |
11fdf7f2 | 639 | std::lock_guard l(m_lock); |
7c673cae FG |
640 | return (m_ret_val < 0); |
641 | } | |
642 | ||
643 | int OrderedThrottle::wait_for_ret() { | |
11fdf7f2 TL |
644 | std::unique_lock l(m_lock); |
645 | complete_pending_ops(l); | |
7c673cae FG |
646 | |
647 | while (m_current > 0) { | |
c07f9fc5 | 648 | ++waiters; |
11fdf7f2 | 649 | m_cond.wait(l); |
c07f9fc5 | 650 | --waiters; |
11fdf7f2 | 651 | complete_pending_ops(l); |
7c673cae FG |
652 | } |
653 | return m_ret_val; | |
654 | } | |
655 | ||
11fdf7f2 | 656 | void OrderedThrottle::complete_pending_ops(std::unique_lock<std::mutex>& l) { |
7c673cae | 657 | while (true) { |
11fdf7f2 | 658 | auto it = m_tid_result.begin(); |
7c673cae FG |
659 | if (it == m_tid_result.end() || it->first != m_complete_tid || |
660 | !it->second.finished) { | |
661 | break; | |
662 | } | |
663 | ||
664 | Result result = it->second; | |
665 | m_tid_result.erase(it); | |
666 | ||
11fdf7f2 | 667 | l.unlock(); |
7c673cae | 668 | result.on_finish->complete(result.ret_val); |
11fdf7f2 | 669 | l.lock(); |
7c673cae FG |
670 | |
671 | ++m_complete_tid; | |
672 | } | |
673 | } | |
11fdf7f2 TL |
674 | |
675 | #undef dout_prefix | |
676 | #define dout_prefix *_dout << "TokenBucketThrottle(" << m_name << " " \ | |
677 | << (void*)this << ") " | |
678 | ||
679 | uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) { | |
680 | if (0 == max) { | |
681 | return 0; | |
682 | } | |
683 | ||
684 | uint64_t got = 0; | |
f67539c2 | 685 | if (available >= c) { |
11fdf7f2 TL |
686 | // There is enough token in bucket, take c. |
687 | got = c; | |
f67539c2 | 688 | available -= c; |
11fdf7f2 TL |
689 | remain -= c; |
690 | } else { | |
f67539c2 TL |
691 | // There is not enough, take all available. |
692 | got = available; | |
693 | remain -= available; | |
694 | available = 0; | |
11fdf7f2 TL |
695 | } |
696 | return got; | |
697 | } | |
698 | ||
f67539c2 | 699 | uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens, double burst_ratio) { |
11fdf7f2 TL |
700 | if (0 == max) { |
701 | return 0; | |
702 | } | |
703 | ||
f67539c2 TL |
704 | if (tokens) { |
705 | // put tokens into bucket | |
11fdf7f2 | 706 | uint64_t current = remain; |
f67539c2 TL |
707 | if ((current + tokens) <= capacity) { |
708 | remain += tokens; | |
11fdf7f2 | 709 | } else { |
f67539c2 TL |
710 | remain = capacity; |
711 | } | |
712 | ||
713 | // available tokens increase at burst speed | |
714 | uint64_t available_inc = tokens; | |
715 | if (burst_ratio > 1) { | |
716 | available_inc = (uint64_t)(tokens * burst_ratio); | |
11fdf7f2 | 717 | } |
f67539c2 TL |
718 | uint64_t inc_upper_limit = remain > max ? max : remain; |
719 | if ((available + available_inc) <= inc_upper_limit ){ | |
720 | available += available_inc; | |
721 | }else{ | |
722 | available = inc_upper_limit; | |
723 | } | |
724 | ||
11fdf7f2 TL |
725 | } |
726 | return remain; | |
727 | } | |
728 | ||
f67539c2 TL |
729 | void TokenBucketThrottle::Bucket::set_max(uint64_t max, uint64_t burst_seconds) { |
730 | // the capacity of bucket should not be less than max | |
731 | if (burst_seconds < 1){ | |
732 | burst_seconds = 1; | |
11fdf7f2 | 733 | } |
f67539c2 TL |
734 | uint64_t new_capacity = max*burst_seconds; |
735 | if (capacity != new_capacity){ | |
736 | capacity = new_capacity; | |
737 | remain = capacity; | |
738 | } | |
739 | if (available > max || 0 == max) { | |
740 | available = max; | |
741 | } | |
742 | this->max = max; | |
11fdf7f2 TL |
743 | } |
744 | ||
745 | TokenBucketThrottle::TokenBucketThrottle( | |
746 | CephContext *cct, | |
747 | const std::string &name, | |
f67539c2 | 748 | uint64_t burst, |
11fdf7f2 TL |
749 | uint64_t avg, |
750 | SafeTimer *timer, | |
9f95a23c | 751 | ceph::mutex *timer_lock) |
11fdf7f2 | 752 | : m_cct(cct), m_name(name), |
f67539c2 TL |
753 | m_throttle(m_cct, name + "_bucket", burst), |
754 | m_burst(burst), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock), | |
9f95a23c | 755 | m_lock(ceph::make_mutex(name + "_lock")) |
11fdf7f2 TL |
756 | {} |
757 | ||
758 | TokenBucketThrottle::~TokenBucketThrottle() { | |
759 | // cancel the timer events. | |
760 | { | |
761 | std::lock_guard timer_locker(*m_timer_lock); | |
762 | cancel_timer(); | |
763 | } | |
764 | ||
765 | list<Blocker> tmp_blockers; | |
766 | { | |
767 | std::lock_guard blockers_lock(m_lock); | |
768 | tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end()); | |
769 | } | |
770 | ||
771 | for (auto b : tmp_blockers) { | |
772 | b.ctx->complete(0); | |
773 | } | |
774 | } | |
775 | ||
f67539c2 | 776 | int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds) { |
11fdf7f2 | 777 | { |
9f95a23c | 778 | std::lock_guard lock{m_lock}; |
11fdf7f2 TL |
779 | |
780 | if (0 < burst && burst < average) { | |
781 | // the burst should never less than the average. | |
782 | return -EINVAL; | |
783 | } | |
784 | ||
785 | m_avg = average; | |
786 | m_burst = burst; | |
787 | ||
788 | if (0 == average) { | |
789 | // The limit is not set, and no tokens will be put into the bucket. | |
790 | // So, we can schedule the timer slowly, or even cancel it. | |
791 | m_tick = 1000; | |
792 | } else { | |
793 | // calculate the tick(ms), don't less than the minimum. | |
794 | m_tick = 1000 / average; | |
795 | if (m_tick < m_tick_min) { | |
796 | m_tick = m_tick_min; | |
797 | } | |
798 | ||
799 | // this is for the number(avg) can not be divisible. | |
800 | m_ticks_per_second = 1000 / m_tick; | |
801 | m_current_tick = 0; | |
802 | ||
803 | // for the default configuration of burst. | |
f67539c2 | 804 | m_throttle.set_max(0 == burst ? average : burst, burst_seconds); |
11fdf7f2 TL |
805 | } |
806 | // turn millisecond to second | |
807 | m_schedule_tick = m_tick / 1000.0; | |
808 | } | |
809 | ||
810 | // The schedule period will be changed when the average rate is set. | |
811 | { | |
9f95a23c | 812 | std::lock_guard timer_locker{*m_timer_lock}; |
11fdf7f2 TL |
813 | cancel_timer(); |
814 | schedule_timer(); | |
815 | } | |
816 | return 0; | |
817 | } | |
818 | ||
819 | void TokenBucketThrottle::set_schedule_tick_min(uint64_t tick) { | |
820 | std::lock_guard lock(m_lock); | |
821 | if (tick != 0) { | |
822 | m_tick_min = tick; | |
823 | } | |
824 | } | |
825 | ||
826 | uint64_t TokenBucketThrottle::tokens_filled(double tick) { | |
827 | return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg); | |
828 | } | |
829 | ||
830 | uint64_t TokenBucketThrottle::tokens_this_tick() { | |
831 | if (0 == m_avg) { | |
832 | return 0; | |
833 | } | |
834 | if (m_current_tick >= m_ticks_per_second) { | |
835 | m_current_tick = 0; | |
836 | } | |
837 | m_current_tick++; | |
838 | ||
839 | return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1); | |
840 | } | |
841 | ||
842 | void TokenBucketThrottle::add_tokens() { | |
843 | list<Blocker> tmp_blockers; | |
844 | { | |
845 | std::lock_guard lock(m_lock); | |
846 | // put tokens into bucket. | |
f67539c2 TL |
847 | double burst_ratio = 1.0; |
848 | if (m_throttle.max > m_avg && m_avg > 0){ | |
849 | burst_ratio = (double)m_throttle.max/m_avg; | |
850 | } | |
851 | m_throttle.put(tokens_this_tick(), burst_ratio); | |
11fdf7f2 TL |
852 | if (0 == m_avg || 0 == m_throttle.max) |
853 | tmp_blockers.swap(m_blockers); | |
854 | // check the m_blockers from head to tail, if blocker can get | |
855 | // enough tokens, let it go. | |
856 | while (!m_blockers.empty()) { | |
857 | Blocker &blocker = m_blockers.front(); | |
858 | uint64_t got = m_throttle.get(blocker.tokens_requested); | |
859 | if (got == blocker.tokens_requested) { | |
860 | // got enough tokens for front. | |
861 | tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin()); | |
862 | } else { | |
863 | // there is no more tokens. | |
864 | blocker.tokens_requested -= got; | |
865 | break; | |
866 | } | |
867 | } | |
868 | } | |
869 | ||
870 | for (auto b : tmp_blockers) { | |
871 | b.ctx->complete(0); | |
872 | } | |
873 | } | |
874 | ||
875 | void TokenBucketThrottle::schedule_timer() { | |
9f95a23c | 876 | m_token_ctx = new LambdaContext( |
11fdf7f2 TL |
877 | [this](int r) { |
878 | schedule_timer(); | |
879 | }); | |
880 | m_timer->add_event_after(m_schedule_tick, m_token_ctx); | |
881 | ||
882 | add_tokens(); | |
883 | } | |
884 | ||
885 | void TokenBucketThrottle::cancel_timer() { | |
886 | m_timer->cancel_event(m_token_ctx); | |
887 | } |