]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/fair_queue.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / src / core / fair_queue.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2019 ScyllaDB
20 */
21
22 #include <boost/container/small_vector.hpp>
23 #include <boost/intrusive/parent_from_member.hpp>
24 #include <seastar/core/fair_queue.hh>
25 #include <seastar/core/future.hh>
26 #include <seastar/core/shared_ptr.hh>
27 #include <seastar/core/circular_buffer.hh>
28 #include <seastar/util/noncopyable_function.hh>
29 #include <seastar/core/reactor.hh>
30 #include <seastar/core/metrics.hh>
31 #include <queue>
32 #include <chrono>
33 #include <unordered_set>
34
35 #include "fmt/format.h"
36 #include "fmt/ostream.h"
37
38 namespace seastar {
39
40 static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
41 static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
42 static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
43
44 fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
45 : _weight(weight)
46 , _size(size)
47 {}
48
49 float fair_queue_ticket::normalize(fair_queue_ticket denominator) const noexcept {
50 return float(_weight) / denominator._weight + float(_size) / denominator._size;
51 }
52
53 fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const noexcept {
54 return fair_queue_ticket(_weight + desc._weight, _size + desc._size);
55 }
56
57 fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) noexcept {
58 _weight += desc._weight;
59 _size += desc._size;
60 return *this;
61 }
62
63 fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const noexcept {
64 return fair_queue_ticket(_weight - desc._weight, _size - desc._size);
65 }
66
67 fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcept {
68 _weight -= desc._weight;
69 _size -= desc._size;
70 return *this;
71 }
72
73 fair_queue_ticket::operator bool() const noexcept {
74 return (_weight > 0) || (_size > 0);
75 }
76
77 bool fair_queue_ticket::is_non_zero() const noexcept {
78 return (_weight > 0) && (_size > 0);
79 }
80
81 bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept {
82 return _weight == o._weight && _size == o._size;
83 }
84
85 std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) {
86 return os << t._weight << ":" << t._size;
87 }
88
89 fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept {
90 return fair_queue_ticket(std::max<int32_t>(a._weight - b._weight, 0),
91 std::max<int32_t>(a._size - b._size, 0));
92 }
93
94 fair_group::fair_group(config cfg)
95 : _cost_capacity(cfg.weight_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(), cfg.size_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count())
96 , _token_bucket(cfg.rate_factor * fixed_point_factor,
97 std::max<capacity_t>(cfg.rate_factor * fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), ticket_capacity(fair_queue_ticket(cfg.limit_min_weight, cfg.limit_min_size))),
98 ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size))
99 )
100 {
101 assert(_cost_capacity.is_non_zero());
102 seastar_logger.info("Created fair group {}, capacity rate {}, limit {}, rate {} (factor {}), threshold {}", cfg.label,
103 _cost_capacity, _token_bucket.limit(), _token_bucket.rate(), cfg.rate_factor, _token_bucket.threshold());
104
105 if (cfg.rate_factor * fixed_point_factor > _token_bucket.max_rate) {
106 throw std::runtime_error("Fair-group rate_factor is too large");
107 }
108
109 if (ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size)) > _token_bucket.threshold()) {
110 throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
111 }
112 }
113
114 auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
115 assert(cap <= _token_bucket.limit());
116 return _token_bucket.grab(cap);
117 }
118
119 void fair_group::release_capacity(capacity_t cap) noexcept {
120 _token_bucket.release(cap);
121 }
122
123 void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
124 _token_bucket.replenish(now);
125 }
126
127 void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept {
128 auto now = clock_type::now();
129 auto extra = _token_bucket.accumulated_in(now - local_ts);
130
131 if (extra >= _token_bucket.threshold()) {
132 local_ts = now;
133 replenish_capacity(now);
134 }
135 }
136
137 auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
138 return _token_bucket.deficiency(from);
139 }
140
141 auto fair_group::ticket_capacity(fair_queue_ticket t) const noexcept -> capacity_t {
142 return t.normalize(_cost_capacity) * fixed_point_factor;
143 }
144
145 // Priority class, to be used with a given fair_queue
146 class fair_queue::priority_class_data {
147 friend class fair_queue;
148 uint32_t _shares = 0;
149 capacity_t _accumulated = 0;
150 capacity_t _pure_accumulated = 0;
151 fair_queue_entry::container_list_t _queue;
152 bool _queued = false;
153 bool _plugged = true;
154
155 public:
156 explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
157 priority_class_data(const priority_class_data&) = delete;
158 priority_class_data(priority_class_data&&) = delete;
159
160 void update_shares(uint32_t shares) noexcept {
161 _shares = (std::max(shares, 1u));
162 }
163 };
164
165 bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept {
166 return lhs->_accumulated > rhs->_accumulated;
167 }
168
169 fair_queue::fair_queue(fair_group& group, config cfg)
170 : _config(std::move(cfg))
171 , _group(group)
172 , _group_replenish(clock_type::now())
173 {
174 }
175
176 fair_queue::fair_queue(fair_queue&& other)
177 : _config(std::move(other._config))
178 , _group(other._group)
179 , _group_replenish(std::move(other._group_replenish))
180 , _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
181 , _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
182 , _requests_executing(std::exchange(other._requests_executing, 0))
183 , _requests_queued(std::exchange(other._requests_queued, 0))
184 , _handles(std::move(other._handles))
185 , _priority_classes(std::move(other._priority_classes))
186 , _last_accumulated(other._last_accumulated)
187 {
188 }
189
190 fair_queue::~fair_queue() {
191 for (const auto& fq : _priority_classes) {
192 assert(!fq);
193 }
194 }
195
196 void fair_queue::push_priority_class(priority_class_data& pc) noexcept {
197 assert(pc._plugged && !pc._queued);
198 _handles.assert_enough_capacity();
199 _handles.push(&pc);
200 pc._queued = true;
201 }
202
203 void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept {
204 if (!pc._queued) {
205 // Don't let the newcomer monopolize the disk for more than tau
206 // duration. For this estimate how many capacity units can be
207 // accumulated with the current class shares per rate resulution
208 // and scale it up to tau.
209 capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count();
210 // On start this deviation can go to negative values, so not to
211 // introduce extra if's for that short corner case, use signed
212 // arithmetics and make sure the _accumulated value doesn't grow
213 // over signed maximum (see overflow check below)
214 pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
215 _handles.assert_enough_capacity();
216 _handles.push(&pc);
217 pc._queued = true;
218 }
219 }
220
221 void fair_queue::pop_priority_class(priority_class_data& pc) noexcept {
222 assert(pc._plugged && pc._queued);
223 pc._queued = false;
224 _handles.pop();
225 }
226
227 void fair_queue::plug_priority_class(priority_class_data& pc) noexcept {
228 assert(!pc._plugged && !pc._queued);
229 pc._plugged = true;
230 if (!pc._queue.empty()) {
231 push_priority_class_from_idle(pc);
232 }
233 }
234
235 void fair_queue::plug_class(class_id cid) noexcept {
236 plug_priority_class(*_priority_classes[cid]);
237 }
238
239 void fair_queue::unplug_priority_class(priority_class_data& pc) noexcept {
240 assert(pc._plugged);
241 if (pc._queued) {
242 pop_priority_class(pc);
243 }
244 pc._plugged = false;
245 }
246
247 void fair_queue::unplug_class(class_id cid) noexcept {
248 unplug_priority_class(*_priority_classes[cid]);
249 }
250
251 auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
252 _group.maybe_replenish_capacity(_group_replenish);
253
254 if (_group.capacity_deficiency(_pending->head)) {
255 return grab_result::pending;
256 }
257
258 capacity_t cap = _group.ticket_capacity(ent._ticket);
259 if (cap > _pending->cap) {
260 return grab_result::cant_preempt;
261 }
262
263 if (cap < _pending->cap) {
264 _group.release_capacity(_pending->cap - cap); // FIXME -- replenish right at once?
265 }
266
267 _pending.reset();
268 return grab_result::grabbed;
269 }
270
271 auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
272 if (_pending) {
273 return grab_pending_capacity(ent);
274 }
275
276 capacity_t cap = _group.ticket_capacity(ent._ticket);
277 capacity_t want_head = _group.grab_capacity(cap);
278 if (_group.capacity_deficiency(want_head)) {
279 _pending.emplace(want_head, cap);
280 return grab_result::pending;
281 }
282
283 return grab_result::grabbed;
284 }
285
286 void fair_queue::register_priority_class(class_id id, uint32_t shares) {
287 if (id >= _priority_classes.size()) {
288 _priority_classes.resize(id + 1);
289 } else {
290 assert(!_priority_classes[id]);
291 }
292
293 _handles.reserve(_nr_classes + 1);
294 _priority_classes[id] = std::make_unique<priority_class_data>(shares);
295 _nr_classes++;
296 }
297
298 void fair_queue::unregister_priority_class(class_id id) {
299 auto& pclass = _priority_classes[id];
300 assert(pclass && pclass->_queue.empty());
301 pclass.reset();
302 _nr_classes--;
303 }
304
305 void fair_queue::update_shares_for_class(class_id id, uint32_t shares) {
306 assert(id < _priority_classes.size());
307 auto& pc = _priority_classes[id];
308 assert(pc);
309 pc->update_shares(shares);
310 }
311
312 size_t fair_queue::waiters() const {
313 return _requests_queued;
314 }
315
316 size_t fair_queue::requests_currently_executing() const {
317 return _requests_executing;
318 }
319
320 fair_queue_ticket fair_queue::resources_currently_waiting() const {
321 return _resources_queued;
322 }
323
324 fair_queue_ticket fair_queue::resources_currently_executing() const {
325 return _resources_executing;
326 }
327
328 void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
329 priority_class_data& pc = *_priority_classes[id];
330 // We need to return a future in this function on which the caller can wait.
331 // Since we don't know which queue we will use to execute the next request - if ours or
332 // someone else's, we need a separate promise at this point.
333 if (pc._plugged) {
334 push_priority_class_from_idle(pc);
335 }
336 pc._queue.push_back(ent);
337 _resources_queued += ent._ticket;
338 _requests_queued++;
339 }
340
341 void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept {
342 _resources_executing -= desc;
343 _requests_executing--;
344 _group.release_capacity(_group.ticket_capacity(desc));
345 }
346
347 void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
348 _resources_queued -= ent._ticket;
349 ent._ticket = fair_queue_ticket();
350 }
351
352 fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
353 if (_pending) {
354 /*
355 * We expect the disk to release the ticket within some time,
356 * but it's ... OK if it doesn't -- the pending wait still
357 * needs the head rover value to be ahead of the needed value.
358 *
359 * It may happen that the capacity gets released before we think
360 * it will, in this case we will wait for the full value again,
361 * which's sub-optimal. The expectation is that we think disk
362 * works faster, than it really does.
363 */
364 auto over = _group.capacity_deficiency(_pending->head);
365 auto ticks = _group.capacity_duration(over);
366 return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::microseconds>(ticks);
367 }
368
369 return std::chrono::steady_clock::time_point::max();
370 }
371
372 void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
373 capacity_t dispatched = 0;
374 boost::container::small_vector<priority_class_ptr, 2> preempt;
375
376 while (!_handles.empty() && (dispatched < _group.maximum_capacity() / smp::count)) {
377 priority_class_data& h = *_handles.top();
378 if (h._queue.empty()) {
379 pop_priority_class(h);
380 continue;
381 }
382
383 auto& req = h._queue.front();
384 auto gr = grab_capacity(req);
385 if (gr == grab_result::pending) {
386 break;
387 }
388
389 if (gr == grab_result::cant_preempt) {
390 pop_priority_class(h);
391 preempt.emplace_back(&h);
392 continue;
393 }
394
395 _last_accumulated = std::max(h._accumulated, _last_accumulated);
396 pop_priority_class(h);
397 h._queue.pop_front();
398
399 _resources_executing += req._ticket;
400 _resources_queued -= req._ticket;
401 _requests_executing++;
402 _requests_queued--;
403
404 // Usually the cost of request is tens to hundreeds of thousands. However, for
405 // unrestricted queue it can be as low as 2k. With large enough shares this
406 // has chances to be translated into zero cost which, in turn, will make the
407 // class show no progress and monopolize the queue.
408 auto req_cap = _group.ticket_capacity(req._ticket);
409 auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
410 // signed overflow check to make push_priority_class_from_idle math work
411 if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
412 for (auto& pc : _priority_classes) {
413 if (pc) {
414 if (pc->_queued) {
415 pc->_accumulated -= h._accumulated;
416 } else { // this includes h
417 pc->_accumulated = 0;
418 }
419 }
420 }
421 _last_accumulated = 0;
422 }
423 h._accumulated += req_cost;
424 h._pure_accumulated += req_cap;
425
426 dispatched += _group.ticket_capacity(req._ticket);
427 cb(req);
428
429 if (h._plugged && !h._queue.empty()) {
430 push_priority_class(h);
431 }
432 }
433
434 for (auto&& h : preempt) {
435 push_priority_class(*h);
436 }
437 }
438
439 std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
440 namespace sm = seastar::metrics;
441 priority_class_data& pc = *_priority_classes[c];
442 return std::vector<sm::impl::metric_definition_impl>({
443 sm::make_counter("consumption",
444 [&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); },
445 sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
446 sm::make_counter("adjusted_consumption",
447 [&pc] { return fair_group::capacity_tokens(pc._accumulated); },
448 sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),
449 });
450 }
451
452 }