/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#include <boost/container/small_vector.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <queue>
#include <chrono>
#include <unordered_set>

#include "fmt/format.h"
#include "fmt/ostream.h"

namespace seastar {

static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");

fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
    : _weight(weight)
    , _size(size)
{}

float fair_queue_ticket::normalize(fair_queue_ticket denominator) const noexcept {
    return float(_weight) / denominator._weight + float(_size) / denominator._size;
}

fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const noexcept {
    return fair_queue_ticket(_weight + desc._weight, _size + desc._size);
}

fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) noexcept {
    _weight += desc._weight;
    _size += desc._size;
    return *this;
}

fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const noexcept {
    return fair_queue_ticket(_weight - desc._weight, _size - desc._size);
}

fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcept {
    _weight -= desc._weight;
    _size -= desc._size;
    return *this;
}

fair_queue_ticket::operator bool() const noexcept {
    return (_weight > 0) || (_size > 0);
}

bool fair_queue_ticket::is_non_zero() const noexcept {
    return (_weight > 0) && (_size > 0);
}

bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept {
    return _weight == o._weight && _size == o._size;
}

std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) {
    return os << t._weight << ":" << t._size;
}

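// Per-component difference that saturates at zero rather than letting the
// unsigned subtraction wrap around.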
fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept {
    return fair_queue_ticket(std::max<int32_t>(a._weight - b._weight, 0),
            std::max<int32_t>(a._size - b._size, 0));
}

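// fair_group is the arbiter shared by the per-shard fair_queues of one device.
// Rough summary of the math below (the authoritative definitions live in
// fair_queue.hh): _cost_capacity scales cfg.{weight,size}_rate down to the
// token bucket's rate resolution, and ticket_capacity() converts a request
// ticket into fixed-point capacity tokens via fair_queue_ticket::normalize(),
// i.e. roughly (weight / weight_rate + size / size_rate) * fixed_point_factor.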
fair_group::fair_group(config cfg)
    : _cost_capacity(cfg.weight_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(), cfg.size_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count())
    , _token_bucket(cfg.rate_factor * fixed_point_factor,
                    std::max<capacity_t>(cfg.rate_factor * fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), ticket_capacity(fair_queue_ticket(cfg.limit_min_weight, cfg.limit_min_size))),
                    ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size))
      )
{
    assert(_cost_capacity.is_non_zero());
    seastar_logger.info("Created fair group {}, capacity rate {}, limit {}, rate {} (factor {}), threshold {}", cfg.label,
            _cost_capacity, _token_bucket.limit(), _token_bucket.rate(), cfg.rate_factor, _token_bucket.threshold());

    if (cfg.rate_factor * fixed_point_factor > _token_bucket.max_rate) {
        throw std::runtime_error("Fair-group rate_factor is too large");
    }

    if (ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size)) > _token_bucket.threshold()) {
        throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
    }
}

auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
    assert(cap <= _token_bucket.limit());
    return _token_bucket.grab(cap);
}

void fair_group::release_capacity(capacity_t cap) noexcept {
    _token_bucket.release(cap);
}

void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
    _token_bucket.replenish(now);
}

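// Replenish the shared token bucket using this queue's local timestamp, but
// only once at least threshold() worth of tokens has accumulated, so that
// frequent polls don't touch the shared bucket for negligible amounts.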
void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
    auto now = clock_type::now();
    auto extra = _token_bucket.accumulated_in(now - local_ts);

    if (extra >= _token_bucket.threshold()) {
        local_ts = now;
        replenish_capacity(now);
    }
}

auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
    return _token_bucket.deficiency(from);
}

auto fair_group::ticket_capacity(fair_queue_ticket t) const noexcept -> capacity_t {
    return t.normalize(_cost_capacity) * fixed_point_factor;
}

// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
    friend class fair_queue;
    uint32_t _shares = 0;
    capacity_t _accumulated = 0;
    capacity_t _pure_accumulated = 0;
    fair_queue_entry::container_list_t _queue;
    bool _queued = false;
    bool _plugged = true;

public:
    explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
    priority_class_data(const priority_class_data&) = delete;
    priority_class_data(priority_class_data&&) = delete;

    void update_shares(uint32_t shares) noexcept {
        _shares = (std::max(shares, 1u));
    }
};

bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const noexcept {
    return lhs->_accumulated > rhs->_accumulated;
}

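// Rough usage sketch (illustrative only -- in practice the reactor's io_queue
// drives this, and the exact fair_queue_entry construction lives in
// fair_queue.hh):
//
//     fair_queue fq(group, std::move(fq_cfg));
//     fq.register_priority_class(cid, /* shares = */ 100);
//     fq.queue(cid, entry);                       // entry carries a fair_queue_ticket
//     fq.dispatch_requests([] (fair_queue_entry& e) { /* submit e to the device */ });
//     ...
//     fq.notify_request_finished(ticket);         // give the capacity back on completion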
fair_queue::fair_queue(fair_group& group, config cfg)
    : _config(std::move(cfg))
    , _group(group)
    , _group_replenish(clock_type::now())
{
}

fair_queue::fair_queue(fair_queue&& other)
    : _config(std::move(other._config))
    , _group(other._group)
    , _group_replenish(std::move(other._group_replenish))
    , _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
    , _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
    , _requests_executing(std::exchange(other._requests_executing, 0))
    , _requests_queued(std::exchange(other._requests_queued, 0))
    , _handles(std::move(other._handles))
    , _priority_classes(std::move(other._priority_classes))
    , _last_accumulated(other._last_accumulated)
{
}

fair_queue::~fair_queue() {
    for (const auto& fq : _priority_classes) {
        assert(!fq);
    }
}

void fair_queue::push_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged && !pc._queued);
    _handles.assert_enough_capacity();
    _handles.push(&pc);
    pc._queued = true;
}

void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept {
    if (!pc._queued) {
        // Don't let the newcomer monopolize the disk for more than tau
        // duration. For this, estimate how many capacity units can be
        // accumulated with the current class shares per rate resolution
        // and scale it up to tau.
        capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count();
        // On start this deviation can go negative, so rather than introduce
        // an extra branch for that short corner case, use signed arithmetic
        // and make sure the _accumulated value doesn't grow past the signed
        // maximum (see the overflow check in dispatch_requests()).
        pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
        _handles.assert_enough_capacity();
        _handles.push(&pc);
        pc._queued = true;
    }
}

void fair_queue::pop_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged && pc._queued);
    pc._queued = false;
    _handles.pop();
}

void fair_queue::plug_priority_class(priority_class_data& pc) noexcept {
    assert(!pc._plugged && !pc._queued);
    pc._plugged = true;
    if (!pc._queue.empty()) {
        push_priority_class_from_idle(pc);
    }
}

void fair_queue::plug_class(class_id cid) noexcept {
    plug_priority_class(*_priority_classes[cid]);
}

void fair_queue::unplug_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged);
    if (pc._queued) {
        pop_priority_class(pc);
    }
    pc._plugged = false;
}

void fair_queue::unplug_class(class_id cid) noexcept {
    unplug_priority_class(*_priority_classes[cid]);
}

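// Grabbing capacity for the entry at the head of a class has three outcomes:
//  - grabbed:      the reservation (fresh, or a pending one being reused) covers
//                  this entry, so it can be dispatched now
//  - pending:      the group bucket is short of tokens; remember the bucket
//                  position we are waiting for and retry later
//  - cant_preempt: an earlier, smaller reservation is still pending, so this
//                  entry steps aside for now (see dispatch_requests())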
auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
    _group.maybe_replenish_capacity(_group_replenish);

    if (_group.capacity_deficiency(_pending->head)) {
        return grab_result::pending;
    }

    capacity_t cap = _group.ticket_capacity(ent._ticket);
    if (cap > _pending->cap) {
        return grab_result::cant_preempt;
    }

    if (cap < _pending->cap) {
        _group.release_capacity(_pending->cap - cap); // FIXME -- replenish right at once?
    }

    _pending.reset();
    return grab_result::grabbed;
}

auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
    if (_pending) {
        return grab_pending_capacity(ent);
    }

    capacity_t cap = _group.ticket_capacity(ent._ticket);
    capacity_t want_head = _group.grab_capacity(cap);
    if (_group.capacity_deficiency(want_head)) {
        _pending.emplace(want_head, cap);
        return grab_result::pending;
    }

    return grab_result::grabbed;
}

void fair_queue::register_priority_class(class_id id, uint32_t shares) {
    if (id >= _priority_classes.size()) {
        _priority_classes.resize(id + 1);
    } else {
        assert(!_priority_classes[id]);
    }

    _handles.reserve(_nr_classes + 1);
    _priority_classes[id] = std::make_unique<priority_class_data>(shares);
    _nr_classes++;
}

void fair_queue::unregister_priority_class(class_id id) {
    auto& pclass = _priority_classes[id];
    assert(pclass && pclass->_queue.empty());
    pclass.reset();
    _nr_classes--;
}

void fair_queue::update_shares_for_class(class_id id, uint32_t shares) {
    assert(id < _priority_classes.size());
    auto& pc = _priority_classes[id];
    assert(pc);
    pc->update_shares(shares);
}

size_t fair_queue::waiters() const {
    return _requests_queued;
}

size_t fair_queue::requests_currently_executing() const {
    return _requests_executing;
}

fair_queue_ticket fair_queue::resources_currently_waiting() const {
    return _resources_queued;
}

fair_queue_ticket fair_queue::resources_currently_executing() const {
    return _resources_executing;
}

void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
    priority_class_data& pc = *_priority_classes[id];
    // We need to return a future in this function on which the caller can wait.
    // Since we don't know which queue will execute the next request - ours or
    // someone else's - we need a separate promise at this point.
    if (pc._plugged) {
        push_priority_class_from_idle(pc);
    }
    pc._queue.push_back(ent);
    _resources_queued += ent._ticket;
    _requests_queued++;
}

void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept {
    _resources_executing -= desc;
    _requests_executing--;
    _group.release_capacity(_group.ticket_capacity(desc));
}

void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
    _resources_queued -= ent._ticket;
    ent._ticket = fair_queue_ticket();
}

fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
    if (_pending) {
        /*
         * We expect the disk to release the ticket within some time,
         * but it's ... OK if it doesn't -- the pending wait still
         * needs the head rover value to be ahead of the needed value.
         *
         * It may happen that the capacity gets released before we think
         * it will; in this case we will wait for the full value again,
         * which is sub-optimal. The expectation is that we think the disk
         * works faster than it really does.
         */
        auto over = _group.capacity_deficiency(_pending->head);
        auto ticks = _group.capacity_duration(over);
        return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::microseconds>(ticks);
    }

    return std::chrono::steady_clock::time_point::max();
}

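// Dispatch queued entries in accumulated-cost order until the group bucket runs
// dry or this shard has dispatched its slice of the group capacity
// (maximum_capacity() / smp::count). Every dispatched request charges its class
// roughly req_cap / shares cost units, so classes with more shares accumulate
// cost more slowly and therefore get picked more often.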
void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
    capacity_t dispatched = 0;
    boost::container::small_vector<priority_class_ptr, 2> preempt;

    while (!_handles.empty() && (dispatched < _group.maximum_capacity() / smp::count)) {
        priority_class_data& h = *_handles.top();
        if (h._queue.empty()) {
            pop_priority_class(h);
            continue;
        }

        auto& req = h._queue.front();
        auto gr = grab_capacity(req);
        if (gr == grab_result::pending) {
            break;
        }

        if (gr == grab_result::cant_preempt) {
            pop_priority_class(h);
            preempt.emplace_back(&h);
            continue;
        }

        _last_accumulated = std::max(h._accumulated, _last_accumulated);
        pop_priority_class(h);
        h._queue.pop_front();

        _resources_executing += req._ticket;
        _resources_queued -= req._ticket;
        _requests_executing++;
        _requests_queued--;

        // Usually the cost of a request is tens to hundreds of thousands. However, for
        // an unrestricted queue it can be as low as 2k. With large enough shares this
        // can translate into a zero cost which, in turn, would make the
        // class show no progress and monopolize the queue.
        auto req_cap = _group.ticket_capacity(req._ticket);
        auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
        // signed overflow check to make push_priority_class_from_idle math work
        if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
            for (auto& pc : _priority_classes) {
                if (pc) {
                    if (pc->_queued) {
                        pc->_accumulated -= h._accumulated;
                    } else { // this includes h
                        pc->_accumulated = 0;
                    }
                }
            }
            _last_accumulated = 0;
        }
        h._accumulated += req_cost;
        h._pure_accumulated += req_cap;

        dispatched += _group.ticket_capacity(req._ticket);
        cb(req);

        if (h._plugged && !h._queue.empty()) {
            push_priority_class(h);
        }
    }

    for (auto&& h : preempt) {
        push_priority_class(*h);
    }
}

std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
    namespace sm = seastar::metrics;
    priority_class_data& pc = *_priority_classes[c];
    return std::vector<sm::impl::metric_definition_impl>({
            sm::make_counter("consumption",
                    [&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); },
                    sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
            sm::make_counter("adjusted_consumption",
                    [&pc] { return fair_group::capacity_tokens(pc._accumulated); },
                    sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),
    });
}

}