]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/fair_queue.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / fair_queue.hh
CommitLineData
11fdf7f2
TL
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2016 ScyllaDB
 */
22#pragma once
23
20effc67 24#include <boost/intrusive/slist.hpp>
1e59de90 25#include <seastar/core/sstring.hh>
11fdf7f2 26#include <seastar/core/shared_ptr.hh>
11fdf7f2 27#include <seastar/core/circular_buffer.hh>
1e59de90
TL
28#include <seastar/core/metrics_registration.hh>
29#include <seastar/util/shared_token_bucket.hh>
20effc67 30#include <functional>
11fdf7f2 31#include <queue>
11fdf7f2
TL
32#include <chrono>
33#include <unordered_set>
20effc67
TL
34#include <optional>
35
36namespace bi = boost::intrusive;
11fdf7f2
TL
37
38namespace seastar {
39
f67539c2
TL
/// \brief describes a request that passes through the \ref fair_queue.
///
/// A ticket is specified by a \c weight and a \c size. For example, one can specify a request of \c weight
/// 1 and \c size 16kB. If the \ref fair_queue accepts one such request per second, it will sustain 1 IOPS
/// at 16kB/s bandwidth.
///
/// \related fair_queue
class fair_queue_ticket {
    uint32_t _weight = 0; ///< the total weight of these requests for capacity purposes (IOPS).
    uint32_t _size = 0; ///< the total effective size of these requests
public:
    /// Constructs a fair_queue_ticket with a given \c weight and a given \c size
    ///
    /// \param weight the weight of the request
    /// \param size the size of the request
    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
    /// Constructs an empty ticket: both \c weight and \c size are zero.
    fair_queue_ticket() noexcept {}
    /// \returns the sum of this ticket and \c desc; neither operand is modified.
    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
    /// \returns the difference between this ticket and \c desc; neither operand is modified.
    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
    /// Increases the quantity represented in this ticket by the amount represented by \c desc
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be added to this one
    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
    /// Decreases the quantity represented in this ticket by the amount represented by \c desc
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be subtracted from this one
    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
    /// Checks whether this ticket fully equals another one
    /// \param desc another \ref fair_queue_ticket to compare with
    bool operator==(const fair_queue_ticket& desc) const noexcept;

    /// \returns true if the fair_queue_ticket represents a non-zero quantity.
    ///
    /// For a fair_queue ticket to be non-zero, at least one of its represented quantities needs to
    /// be non-zero
    explicit operator bool() const noexcept;
    /// \returns whether this ticket is non-zero.
    ///
    /// NOTE(review): this is defined out of line; confirm whether it requires *both* quantities
    /// to be non-zero (as opposed to \c operator bool, which needs only one).
    bool is_non_zero() const noexcept;

    /// Writes \c t to \c os.
    friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);

    /// \returns the normalized value of this \ref fair_queue_ticket along a base axis
    ///
    /// The normalization function itself is an implementation detail, but one can expect either weight or
    /// size to have more or less relative importance depending on which of the dimensions in the
    /// denominator is relatively higher. For example, given this request a, and two other requests
    /// b and c, such that c has the same \c weight but a higher \c size than b, one can expect
    /// the \c size component of this request to play a larger role.
    ///
    /// It is legal for the numerator to have one of the quantities set to zero, in which case only
    /// the other quantity is taken into consideration.
    ///
    /// It is however not legal for the axis to have any quantity set to zero.
    /// \param axis another \ref fair_queue_ticket to be used as a base vector against which to normalize this fair_queue_ticket.
    float normalize(fair_queue_ticket axis) const noexcept;

    /*
     * For both dimensions, checks whether the first rover is ahead of the
     * second and returns the difference. If it is behind, returns zero.
     */
    friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
};
99
/// \addtogroup io-module
/// @{
102
20effc67 103class fair_queue_entry {
11fdf7f2 104 friend class fair_queue;
11fdf7f2 105
20effc67
TL
106 fair_queue_ticket _ticket;
107 bi::slist_member_hook<> _hook;
11fdf7f2 108
11fdf7f2 109public:
20effc67
TL
110 fair_queue_entry(fair_queue_ticket t) noexcept
111 : _ticket(std::move(t)) {}
112 using container_list_t = bi::slist<fair_queue_entry,
113 bi::constant_time_size<false>,
114 bi::cache_last<true>,
115 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
116
117 fair_queue_ticket ticket() const noexcept { return _ticket; }
11fdf7f2 118};
11fdf7f2 119
20effc67 120/// \brief Group of queues class
11fdf7f2 121///
20effc67
TL
122/// This is a fair group. It's attached by one or mode fair queues. On machines having the
123/// big* amount of shards, queues use the group to borrow/lend the needed capacity for
124/// requests dispatching.
11fdf7f2 125///
20effc67
TL
126/// * Big means that when all shards sumbit requests alltogether the disk is unable to
127/// dispatch them efficiently. The inability can be of two kinds -- either disk cannot
128/// cope with the number of arriving requests, or the total size of the data withing
129/// the given time frame exceeds the disk throughput.
130class fair_group {
1e59de90
TL
131public:
132 using capacity_t = uint64_t;
133 using clock_type = std::chrono::steady_clock;
20effc67 134
1e59de90
TL
135 /*
136 * tldr; The math
137 *
138 * Bw, Br -- write/read bandwidth (bytes per second)
139 * Ow, Or -- write/read iops (ops per second)
140 *
141 * xx_max -- their maximum values (configured)
142 *
143 * Throttling formula:
144 *
145 * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K
146 *
147 * where K is the scalar value <= 1.0 (also configured)
148 *
149 * Bandwidth is bytes time derivatite, iops is ops time derivative, i.e.
150 * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into
151 *
152 * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K
153 *
154 * Fair queue tickets are {w, s} weight-size pairs that are
155 *
156 * s = read_base_count * br, for reads
157 * Br_max/Bw_max * read_base_count * bw, for writes
158 *
159 * w = read_base_count, for reads
160 * Or_max/Ow_max * read_base_count, for writes
161 *
162 * Thus the formula turns into
163 *
164 * d(sum(w/W + s/S))/dr <= K
165 *
166 * where {w, s} is the ticket value if a request and sum summarizes the
167 * ticket values from all the requests seen so far, {W, S} is the ticket
168 * value that corresonds to a virtual summary of Or_max requests of
169 * Br_max size total.
170 */
171
172 /*
173 * The normalization results in a float of the 2^-30 seconds order of
174 * magnitude. Not to invent float point atomic arithmetics, the result
175 * is converted to an integer by multiplying by a factor that's large
176 * enough to turn these values into a non-zero integer.
177 *
178 * Also, the rates in bytes/sec when adjusted by io-queue according to
179 * multipliers become too large to be stored in 32-bit ticket value.
180 * Thus the rate resolution is applied. The t.bucket is configured with a
181 * time period for which the speeds from F (in above formula) are taken.
182 */
183
184 static constexpr float fixed_point_factor = float(1 << 24);
185 using rate_resolution = std::milli;
186 using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::yes>;
187
188private:
189
190 /*
191 * The dF/dt <= K limitation is managed by the modified token bucket
192 * algo where tokens are ticket.normalize(cost_capacity), the refill
193 * rate is K.
194 *
195 * The token bucket algo must have the limit on the number of tokens
196 * accumulated. Here it's configured so that it accumulates for the
197 * latency_goal duration.
198 *
199 * The replenish threshold is the minimal number of tokens to put back.
200 * It's reserved for future use to reduce the load on the replenish
201 * timestamp.
202 *
203 * The timestamp, in turn, is the time when the bucket was replenished
204 * last. Every time a shard tries to get tokens from bucket it first
205 * tries to convert the time that had passed since this timestamp
206 * into more tokens in the bucket.
207 */
208
209 const fair_queue_ticket _cost_capacity;
210 token_bucket_t _token_bucket;
20effc67
TL
211
212public:
1e59de90
TL
213
214 // Convert internal capacity value back into the real token
215 static double capacity_tokens(capacity_t cap) noexcept {
216 return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count();
217 }
218
219 auto capacity_duration(capacity_t cap) const noexcept {
220 return _token_bucket.duration_for(cap);
221 }
222
20effc67 223 struct config {
1e59de90
TL
224 sstring label = "";
225 /*
226 * There are two "min" values that can be configured. The former one
227 * is the minimal weight:size pair that the upper layer is going to
228 * submit. However, it can submit _larger_ values, and the fair queue
229 * must accept those as large as the latter pair (but it can accept
230 * even larger values, of course)
231 */
232 unsigned min_weight = 0;
233 unsigned min_size = 0;
234 unsigned limit_min_weight = 0;
235 unsigned limit_min_size = 0;
236 unsigned long weight_rate;
237 unsigned long size_rate;
238 float rate_factor = 1.0;
239 std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1);
20effc67 240 };
1e59de90
TL
241
242 explicit fair_group(config cfg);
20effc67
TL
243 fair_group(fair_group&&) = delete;
244
1e59de90
TL
245 fair_queue_ticket cost_capacity() const noexcept { return _cost_capacity; }
246 capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); }
247 capacity_t grab_capacity(capacity_t cap) noexcept;
248 clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); }
249 void release_capacity(capacity_t cap) noexcept;
250 void replenish_capacity(clock_type::time_point now) noexcept;
251 void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept;
20effc67 252
1e59de90
TL
253 capacity_t capacity_deficiency(capacity_t from) const noexcept;
254 capacity_t ticket_capacity(fair_queue_ticket ticket) const noexcept;
255
256 std::chrono::duration<double> rate_limit_duration() const noexcept {
257 std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate());
258 return std::chrono::duration_cast<std::chrono::duration<double>>(dur);
20effc67
TL
259 }
260};
11fdf7f2
TL
261
262/// \brief Fair queuing class
263///
264/// This is a fair queue, allowing multiple request producers to queue requests
265/// that will then be served proportionally to their classes' shares.
266///
267/// To each request, a weight can also be associated. A request of weight 1 will consume
268/// 1 share. Higher weights for a request will consume a proportionally higher amount of
269/// shares.
270///
20effc67 271/// The user of this interface is expected to register multiple `priority_class_data`
11fdf7f2
TL
272/// objects, which will each have a shares attribute.
273///
274/// Internally, each priority class may keep a separate queue of requests.
275/// Requests pertaining to a class can go through even if they are over its
276/// share limit, provided that the other classes have empty queues.
277///
278/// When the classes that lag behind start seeing requests, the fair queue will serve
279/// them first, until balance is restored. This balancing is expected to happen within
280/// a certain time window that obeys an exponential decay.
281class fair_queue {
282public:
283 /// \brief Fair Queue configuration structure.
284 ///
285 /// \sets the operation parameters of a \ref fair_queue
286 /// \related fair_queue
287 struct config {
1e59de90 288 sstring label = "";
20effc67 289 std::chrono::microseconds tau = std::chrono::milliseconds(5);
11fdf7f2 290 };
11fdf7f2 291
20effc67
TL
292 using class_id = unsigned int;
293 class priority_class_data;
1e59de90
TL
294 using capacity_t = fair_group::capacity_t;
295 using signed_capacity_t = std::make_signed<capacity_t>::type;
20effc67
TL
296
297private:
1e59de90 298 using clock_type = std::chrono::steady_clock;
20effc67 299 using priority_class_ptr = priority_class_data*;
11fdf7f2 300 struct class_compare {
20effc67 301 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
11fdf7f2
TL
302 };
303
1e59de90
TL
304 class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> {
305 using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
306 public:
307 void reserve(size_t len) {
308 c.reserve(len);
309 }
310
311 void assert_enough_capacity() const noexcept {
312 assert(c.size() < c.capacity());
313 }
314 };
315
11fdf7f2 316 config _config;
20effc67 317 fair_group& _group;
1e59de90 318 clock_type::time_point _group_replenish;
f67539c2
TL
319 fair_queue_ticket _resources_executing;
320 fair_queue_ticket _resources_queued;
11fdf7f2 321 unsigned _requests_executing = 0;
11fdf7f2 322 unsigned _requests_queued = 0;
1e59de90 323 priority_queue _handles;
20effc67 324 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
1e59de90
TL
325 size_t _nr_classes = 0;
326 capacity_t _last_accumulated = 0;
20effc67
TL
327
328 /*
329 * When the shared capacity os over the local queue delays
330 * further dispatching untill better times
331 *
1e59de90 332 * \head -- the value group head rover is expected to cross
20effc67
TL
333 * \cap -- the capacity that's accounted on the group
334 *
335 * The last field is needed to "rearm" the wait in case
336 * queue decides that it wants to dispatch another capacity
337 * in the middle of the waiting
338 */
339 struct pending {
1e59de90
TL
340 capacity_t head;
341 capacity_t cap;
20effc67 342
1e59de90 343 pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {}
20effc67 344 };
11fdf7f2 345
20effc67 346 std::optional<pending> _pending;
11fdf7f2 347
1e59de90
TL
348 void push_priority_class(priority_class_data& pc) noexcept;
349 void push_priority_class_from_idle(priority_class_data& pc) noexcept;
350 void pop_priority_class(priority_class_data& pc) noexcept;
351 void plug_priority_class(priority_class_data& pc) noexcept;
352 void unplug_priority_class(priority_class_data& pc) noexcept;
11fdf7f2 353
1e59de90
TL
354 enum class grab_result { grabbed, cant_preempt, pending };
355 grab_result grab_capacity(const fair_queue_entry& ent) noexcept;
356 grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept;
11fdf7f2
TL
357public:
358 /// Constructs a fair queue with configuration parameters \c cfg.
359 ///
360 /// \param cfg an instance of the class \ref config
20effc67
TL
361 explicit fair_queue(fair_group& shared, config cfg);
362 fair_queue(fair_queue&&);
363 ~fair_queue();
11fdf7f2 364
1e59de90
TL
365 sstring label() const noexcept { return _config.label; }
366
11fdf7f2
TL
367 /// Registers a priority class against this fair queue.
368 ///
f67539c2 369 /// \param shares how many shares to create this class with
20effc67 370 void register_priority_class(class_id c, uint32_t shares);
11fdf7f2
TL
371
372 /// Unregister a priority class.
373 ///
374 /// It is illegal to unregister a priority class that still have pending requests.
20effc67
TL
375 void unregister_priority_class(class_id c);
376
377 void update_shares_for_class(class_id c, uint32_t new_shares);
11fdf7f2
TL
378
379 /// \return how many waiters are currently queued for all classes.
f67539c2 380 [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
9f95a23c 381 size_t waiters() const;
11fdf7f2
TL
382
383 /// \return the number of requests currently executing
f67539c2 384 [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
9f95a23c 385 size_t requests_currently_executing() const;
11fdf7f2 386
f67539c2
TL
387 /// \return how much resources (weight, size) are currently queued for all classes.
388 fair_queue_ticket resources_currently_waiting() const;
389
390 /// \return the amount of resources (weight, size) currently executing
391 fair_queue_ticket resources_currently_executing() const;
392
20effc67 393 /// Queue the entry \c ent through this class' \ref fair_queue
11fdf7f2
TL
394 ///
395 /// The user of this interface is supposed to call \ref notify_requests_finished when the
396 /// request finishes executing - regardless of success or failure.
1e59de90
TL
397 void queue(class_id c, fair_queue_entry& ent) noexcept;
398
399 void plug_class(class_id c) noexcept;
400 void unplug_class(class_id c) noexcept;
11fdf7f2
TL
401
402 /// Notifies that ont request finished
f67539c2 403 /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
20effc67
TL
404 void notify_request_finished(fair_queue_ticket desc) noexcept;
405 void notify_request_cancelled(fair_queue_entry& ent) noexcept;
11fdf7f2
TL
406
407 /// Try to execute new requests if there is capacity left in the queue.
20effc67
TL
408 void dispatch_requests(std::function<void(fair_queue_entry&)> cb);
409
1e59de90 410 clock_type::time_point next_pending_aio() const noexcept;
20effc67 411
1e59de90 412 std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c);
11fdf7f2
TL
413};
/// @}
415
416}
1e59de90
TL
417
#if FMT_VERSION >= 90000
// Since fmt 9, types are no longer formatted through operator<< implicitly;
// opt fair_queue_ticket into ostream-based formatting explicitly.
template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
#endif