]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | ||
19 | /* | |
20 | * Copyright (C) 2016 ScyllaDB | |
21 | */ | |
22 | #pragma once | |
23 | ||
20effc67 | 24 | #include <boost/intrusive/slist.hpp> |
1e59de90 | 25 | #include <seastar/core/sstring.hh> |
11fdf7f2 | 26 | #include <seastar/core/shared_ptr.hh> |
11fdf7f2 | 27 | #include <seastar/core/circular_buffer.hh> |
1e59de90 TL |
28 | #include <seastar/core/metrics_registration.hh> |
29 | #include <seastar/util/shared_token_bucket.hh> | |
20effc67 | 30 | #include <functional> |
11fdf7f2 | 31 | #include <queue> |
11fdf7f2 TL |
32 | #include <chrono> |
33 | #include <unordered_set> | |
20effc67 TL |
34 | #include <optional> |
35 | ||
36 | namespace bi = boost::intrusive; | |
11fdf7f2 TL |
37 | |
38 | namespace seastar { | |
39 | ||
/// \brief describes a request that passes through the \ref fair_queue.
///
/// A ticket is specified by a \c weight and a \c size. For example, one can specify a request of \c weight
/// 1 and \c size 16kB. If the \ref fair_queue accepts one such request per second, it will sustain 1 IOPS
/// at 16kB/s bandwidth.
///
/// \related fair_queue
class fair_queue_ticket {
    uint32_t _weight = 0; ///< the total weight of these requests for capacity purposes (IOPS).
    uint32_t _size = 0;   ///< the total effective size of these requests
public:
    /// Constructs a fair_queue_ticket with a given \c weight and a given \c size
    ///
    /// \param weight the weight of the request
    /// \param size the size of the request
    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;
    /// Constructs an empty ticket: both \c weight and \c size stay at their default of zero.
    fair_queue_ticket() noexcept {}
    /// Binary counterpart of \ref operator+= (defined out of line).
    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;
    /// Binary counterpart of \ref operator-= (defined out of line).
    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;
    /// Increases the quantity represented in this ticket by the amount represented by \c desc
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be added to this one
    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;
    /// Decreases the quantity represented in this ticket by the amount represented by \c desc
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be subtracted from this one
    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;
    /// Checks whether this ticket is fully equal to another one
    /// \param desc another \ref fair_queue_ticket to compare with
    bool operator==(const fair_queue_ticket& desc) const noexcept;

    /// \returns true if the fair_queue_ticket represents a non-zero quantity.
    ///
    /// For a fair_queue ticket to be non-zero, at least one of its represented quantities needs to
    /// be non-zero
    explicit operator bool() const noexcept;
    /// NOTE(review): the name overlaps with \ref operator bool; confirm against the out-of-line
    /// definition whether this requires one or both quantities to be non-zero.
    bool is_non_zero() const noexcept;

    friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);

    /// \returns the normalized value of this \ref fair_queue_ticket along a base axis
    ///
    /// The normalization function itself is an implementation detail, but one can expect either weight or
    /// size to have more or less relative importance depending on which of the dimensions in the
    /// denominator is relatively higher. For example, given this request a, and two other requests
    /// b and c, such that c has the same \c weight but a higher \c size than b, one can expect
    /// the \c size component of this request to play a larger role.
    ///
    /// It is legal for the numerator to have one of the quantities set to zero, in which case only
    /// the other quantity is taken into consideration.
    ///
    /// It is however not legal for the axis to have any quantity set to zero.
    /// \param axis another \ref fair_queue_ticket to be used as a base vector against which to normalize this fair_queue_ticket.
    float normalize(fair_queue_ticket axis) const noexcept;

    /*
     * For both dimensions checks if the first rover is ahead of the
     * second and returns the difference. If behind returns zero.
     */
    friend fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept;
};
99 | ||
100 | /// \addtogroup io-module | |
101 | /// @{ | |
102 | ||
20effc67 | 103 | class fair_queue_entry { |
11fdf7f2 | 104 | friend class fair_queue; |
11fdf7f2 | 105 | |
20effc67 TL |
106 | fair_queue_ticket _ticket; |
107 | bi::slist_member_hook<> _hook; | |
11fdf7f2 | 108 | |
11fdf7f2 | 109 | public: |
20effc67 TL |
110 | fair_queue_entry(fair_queue_ticket t) noexcept |
111 | : _ticket(std::move(t)) {} | |
112 | using container_list_t = bi::slist<fair_queue_entry, | |
113 | bi::constant_time_size<false>, | |
114 | bi::cache_last<true>, | |
115 | bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>; | |
116 | ||
117 | fair_queue_ticket ticket() const noexcept { return _ticket; } | |
11fdf7f2 | 118 | }; |
11fdf7f2 | 119 | |
20effc67 | 120 | /// \brief Group of queues class |
11fdf7f2 | 121 | /// |
20effc67 TL |
122 | /// This is a fair group. It's attached by one or mode fair queues. On machines having the |
123 | /// big* amount of shards, queues use the group to borrow/lend the needed capacity for | |
124 | /// requests dispatching. | |
11fdf7f2 | 125 | /// |
20effc67 TL |
126 | /// * Big means that when all shards sumbit requests alltogether the disk is unable to |
127 | /// dispatch them efficiently. The inability can be of two kinds -- either disk cannot | |
128 | /// cope with the number of arriving requests, or the total size of the data withing | |
129 | /// the given time frame exceeds the disk throughput. | |
130 | class fair_group { | |
1e59de90 TL |
131 | public: |
132 | using capacity_t = uint64_t; | |
133 | using clock_type = std::chrono::steady_clock; | |
20effc67 | 134 | |
1e59de90 TL |
135 | /* |
136 | * tldr; The math | |
137 | * | |
138 | * Bw, Br -- write/read bandwidth (bytes per second) | |
139 | * Ow, Or -- write/read iops (ops per second) | |
140 | * | |
141 | * xx_max -- their maximum values (configured) | |
142 | * | |
143 | * Throttling formula: | |
144 | * | |
145 | * Bw/Bw_max + Br/Br_max + Ow/Ow_max + Or/Or_max <= K | |
146 | * | |
147 | * where K is the scalar value <= 1.0 (also configured) | |
148 | * | |
149 | * Bandwidth is bytes time derivatite, iops is ops time derivative, i.e. | |
150 | * Bx = d(bx)/dt, Ox = d(ox)/dt. Then the formula turns into | |
151 | * | |
152 | * d(bw/Bw_max + br/Br_max + ow/Ow_max + or/Or_max)/dt <= K | |
153 | * | |
154 | * Fair queue tickets are {w, s} weight-size pairs that are | |
155 | * | |
156 | * s = read_base_count * br, for reads | |
157 | * Br_max/Bw_max * read_base_count * bw, for writes | |
158 | * | |
159 | * w = read_base_count, for reads | |
160 | * Or_max/Ow_max * read_base_count, for writes | |
161 | * | |
162 | * Thus the formula turns into | |
163 | * | |
164 | * d(sum(w/W + s/S))/dr <= K | |
165 | * | |
166 | * where {w, s} is the ticket value if a request and sum summarizes the | |
167 | * ticket values from all the requests seen so far, {W, S} is the ticket | |
168 | * value that corresonds to a virtual summary of Or_max requests of | |
169 | * Br_max size total. | |
170 | */ | |
171 | ||
172 | /* | |
173 | * The normalization results in a float of the 2^-30 seconds order of | |
174 | * magnitude. Not to invent float point atomic arithmetics, the result | |
175 | * is converted to an integer by multiplying by a factor that's large | |
176 | * enough to turn these values into a non-zero integer. | |
177 | * | |
178 | * Also, the rates in bytes/sec when adjusted by io-queue according to | |
179 | * multipliers become too large to be stored in 32-bit ticket value. | |
180 | * Thus the rate resolution is applied. The t.bucket is configured with a | |
181 | * time period for which the speeds from F (in above formula) are taken. | |
182 | */ | |
183 | ||
184 | static constexpr float fixed_point_factor = float(1 << 24); | |
185 | using rate_resolution = std::milli; | |
186 | using token_bucket_t = internal::shared_token_bucket<capacity_t, rate_resolution, internal::capped_release::yes>; | |
187 | ||
188 | private: | |
189 | ||
190 | /* | |
191 | * The dF/dt <= K limitation is managed by the modified token bucket | |
192 | * algo where tokens are ticket.normalize(cost_capacity), the refill | |
193 | * rate is K. | |
194 | * | |
195 | * The token bucket algo must have the limit on the number of tokens | |
196 | * accumulated. Here it's configured so that it accumulates for the | |
197 | * latency_goal duration. | |
198 | * | |
199 | * The replenish threshold is the minimal number of tokens to put back. | |
200 | * It's reserved for future use to reduce the load on the replenish | |
201 | * timestamp. | |
202 | * | |
203 | * The timestamp, in turn, is the time when the bucket was replenished | |
204 | * last. Every time a shard tries to get tokens from bucket it first | |
205 | * tries to convert the time that had passed since this timestamp | |
206 | * into more tokens in the bucket. | |
207 | */ | |
208 | ||
209 | const fair_queue_ticket _cost_capacity; | |
210 | token_bucket_t _token_bucket; | |
20effc67 TL |
211 | |
212 | public: | |
1e59de90 TL |
213 | |
214 | // Convert internal capacity value back into the real token | |
215 | static double capacity_tokens(capacity_t cap) noexcept { | |
216 | return (double)cap / fixed_point_factor / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(); | |
217 | } | |
218 | ||
219 | auto capacity_duration(capacity_t cap) const noexcept { | |
220 | return _token_bucket.duration_for(cap); | |
221 | } | |
222 | ||
20effc67 | 223 | struct config { |
1e59de90 TL |
224 | sstring label = ""; |
225 | /* | |
226 | * There are two "min" values that can be configured. The former one | |
227 | * is the minimal weight:size pair that the upper layer is going to | |
228 | * submit. However, it can submit _larger_ values, and the fair queue | |
229 | * must accept those as large as the latter pair (but it can accept | |
230 | * even larger values, of course) | |
231 | */ | |
232 | unsigned min_weight = 0; | |
233 | unsigned min_size = 0; | |
234 | unsigned limit_min_weight = 0; | |
235 | unsigned limit_min_size = 0; | |
236 | unsigned long weight_rate; | |
237 | unsigned long size_rate; | |
238 | float rate_factor = 1.0; | |
239 | std::chrono::duration<double> rate_limit_duration = std::chrono::milliseconds(1); | |
20effc67 | 240 | }; |
1e59de90 TL |
241 | |
242 | explicit fair_group(config cfg); | |
20effc67 TL |
243 | fair_group(fair_group&&) = delete; |
244 | ||
1e59de90 TL |
245 | fair_queue_ticket cost_capacity() const noexcept { return _cost_capacity; } |
246 | capacity_t maximum_capacity() const noexcept { return _token_bucket.limit(); } | |
247 | capacity_t grab_capacity(capacity_t cap) noexcept; | |
248 | clock_type::time_point replenished_ts() const noexcept { return _token_bucket.replenished_ts(); } | |
249 | void release_capacity(capacity_t cap) noexcept; | |
250 | void replenish_capacity(clock_type::time_point now) noexcept; | |
251 | void maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept; | |
20effc67 | 252 | |
1e59de90 TL |
253 | capacity_t capacity_deficiency(capacity_t from) const noexcept; |
254 | capacity_t ticket_capacity(fair_queue_ticket ticket) const noexcept; | |
255 | ||
256 | std::chrono::duration<double> rate_limit_duration() const noexcept { | |
257 | std::chrono::duration<double, rate_resolution> dur((double)_token_bucket.limit() / _token_bucket.rate()); | |
258 | return std::chrono::duration_cast<std::chrono::duration<double>>(dur); | |
20effc67 TL |
259 | } |
260 | }; | |
11fdf7f2 TL |
261 | |
262 | /// \brief Fair queuing class | |
263 | /// | |
264 | /// This is a fair queue, allowing multiple request producers to queue requests | |
265 | /// that will then be served proportionally to their classes' shares. | |
266 | /// | |
267 | /// To each request, a weight can also be associated. A request of weight 1 will consume | |
268 | /// 1 share. Higher weights for a request will consume a proportionally higher amount of | |
269 | /// shares. | |
270 | /// | |
20effc67 | 271 | /// The user of this interface is expected to register multiple `priority_class_data` |
11fdf7f2 TL |
272 | /// objects, which will each have a shares attribute. |
273 | /// | |
274 | /// Internally, each priority class may keep a separate queue of requests. | |
275 | /// Requests pertaining to a class can go through even if they are over its | |
276 | /// share limit, provided that the other classes have empty queues. | |
277 | /// | |
278 | /// When the classes that lag behind start seeing requests, the fair queue will serve | |
279 | /// them first, until balance is restored. This balancing is expected to happen within | |
280 | /// a certain time window that obeys an exponential decay. | |
281 | class fair_queue { | |
282 | public: | |
283 | /// \brief Fair Queue configuration structure. | |
284 | /// | |
285 | /// \sets the operation parameters of a \ref fair_queue | |
286 | /// \related fair_queue | |
287 | struct config { | |
1e59de90 | 288 | sstring label = ""; |
20effc67 | 289 | std::chrono::microseconds tau = std::chrono::milliseconds(5); |
11fdf7f2 | 290 | }; |
11fdf7f2 | 291 | |
20effc67 TL |
292 | using class_id = unsigned int; |
293 | class priority_class_data; | |
1e59de90 TL |
294 | using capacity_t = fair_group::capacity_t; |
295 | using signed_capacity_t = std::make_signed<capacity_t>::type; | |
20effc67 TL |
296 | |
297 | private: | |
1e59de90 | 298 | using clock_type = std::chrono::steady_clock; |
20effc67 | 299 | using priority_class_ptr = priority_class_data*; |
11fdf7f2 | 300 | struct class_compare { |
20effc67 | 301 | bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept; |
11fdf7f2 TL |
302 | }; |
303 | ||
1e59de90 TL |
304 | class priority_queue : public std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare> { |
305 | using super = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>; | |
306 | public: | |
307 | void reserve(size_t len) { | |
308 | c.reserve(len); | |
309 | } | |
310 | ||
311 | void assert_enough_capacity() const noexcept { | |
312 | assert(c.size() < c.capacity()); | |
313 | } | |
314 | }; | |
315 | ||
11fdf7f2 | 316 | config _config; |
20effc67 | 317 | fair_group& _group; |
1e59de90 | 318 | clock_type::time_point _group_replenish; |
f67539c2 TL |
319 | fair_queue_ticket _resources_executing; |
320 | fair_queue_ticket _resources_queued; | |
11fdf7f2 | 321 | unsigned _requests_executing = 0; |
11fdf7f2 | 322 | unsigned _requests_queued = 0; |
1e59de90 | 323 | priority_queue _handles; |
20effc67 | 324 | std::vector<std::unique_ptr<priority_class_data>> _priority_classes; |
1e59de90 TL |
325 | size_t _nr_classes = 0; |
326 | capacity_t _last_accumulated = 0; | |
20effc67 TL |
327 | |
328 | /* | |
329 | * When the shared capacity os over the local queue delays | |
330 | * further dispatching untill better times | |
331 | * | |
1e59de90 | 332 | * \head -- the value group head rover is expected to cross |
20effc67 TL |
333 | * \cap -- the capacity that's accounted on the group |
334 | * | |
335 | * The last field is needed to "rearm" the wait in case | |
336 | * queue decides that it wants to dispatch another capacity | |
337 | * in the middle of the waiting | |
338 | */ | |
339 | struct pending { | |
1e59de90 TL |
340 | capacity_t head; |
341 | capacity_t cap; | |
20effc67 | 342 | |
1e59de90 | 343 | pending(capacity_t t, capacity_t c) noexcept : head(t), cap(c) {} |
20effc67 | 344 | }; |
11fdf7f2 | 345 | |
20effc67 | 346 | std::optional<pending> _pending; |
11fdf7f2 | 347 | |
1e59de90 TL |
348 | void push_priority_class(priority_class_data& pc) noexcept; |
349 | void push_priority_class_from_idle(priority_class_data& pc) noexcept; | |
350 | void pop_priority_class(priority_class_data& pc) noexcept; | |
351 | void plug_priority_class(priority_class_data& pc) noexcept; | |
352 | void unplug_priority_class(priority_class_data& pc) noexcept; | |
11fdf7f2 | 353 | |
1e59de90 TL |
354 | enum class grab_result { grabbed, cant_preempt, pending }; |
355 | grab_result grab_capacity(const fair_queue_entry& ent) noexcept; | |
356 | grab_result grab_pending_capacity(const fair_queue_entry& ent) noexcept; | |
11fdf7f2 TL |
357 | public: |
358 | /// Constructs a fair queue with configuration parameters \c cfg. | |
359 | /// | |
360 | /// \param cfg an instance of the class \ref config | |
20effc67 TL |
361 | explicit fair_queue(fair_group& shared, config cfg); |
362 | fair_queue(fair_queue&&); | |
363 | ~fair_queue(); | |
11fdf7f2 | 364 | |
1e59de90 TL |
365 | sstring label() const noexcept { return _config.label; } |
366 | ||
11fdf7f2 TL |
367 | /// Registers a priority class against this fair queue. |
368 | /// | |
f67539c2 | 369 | /// \param shares how many shares to create this class with |
20effc67 | 370 | void register_priority_class(class_id c, uint32_t shares); |
11fdf7f2 TL |
371 | |
372 | /// Unregister a priority class. | |
373 | /// | |
374 | /// It is illegal to unregister a priority class that still have pending requests. | |
20effc67 TL |
375 | void unregister_priority_class(class_id c); |
376 | ||
377 | void update_shares_for_class(class_id c, uint32_t new_shares); | |
11fdf7f2 TL |
378 | |
379 | /// \return how many waiters are currently queued for all classes. | |
f67539c2 | 380 | [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]] |
9f95a23c | 381 | size_t waiters() const; |
11fdf7f2 TL |
382 | |
383 | /// \return the number of requests currently executing | |
f67539c2 | 384 | [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]] |
9f95a23c | 385 | size_t requests_currently_executing() const; |
11fdf7f2 | 386 | |
f67539c2 TL |
387 | /// \return how much resources (weight, size) are currently queued for all classes. |
388 | fair_queue_ticket resources_currently_waiting() const; | |
389 | ||
390 | /// \return the amount of resources (weight, size) currently executing | |
391 | fair_queue_ticket resources_currently_executing() const; | |
392 | ||
20effc67 | 393 | /// Queue the entry \c ent through this class' \ref fair_queue |
11fdf7f2 TL |
394 | /// |
395 | /// The user of this interface is supposed to call \ref notify_requests_finished when the | |
396 | /// request finishes executing - regardless of success or failure. | |
1e59de90 TL |
397 | void queue(class_id c, fair_queue_entry& ent) noexcept; |
398 | ||
399 | void plug_class(class_id c) noexcept; | |
400 | void unplug_class(class_id c) noexcept; | |
11fdf7f2 TL |
401 | |
402 | /// Notifies that ont request finished | |
f67539c2 | 403 | /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished. |
20effc67 TL |
404 | void notify_request_finished(fair_queue_ticket desc) noexcept; |
405 | void notify_request_cancelled(fair_queue_entry& ent) noexcept; | |
11fdf7f2 TL |
406 | |
407 | /// Try to execute new requests if there is capacity left in the queue. | |
20effc67 TL |
408 | void dispatch_requests(std::function<void(fair_queue_entry&)> cb); |
409 | ||
1e59de90 | 410 | clock_type::time_point next_pending_aio() const noexcept; |
20effc67 | 411 | |
1e59de90 | 412 | std::vector<seastar::metrics::impl::metric_definition_impl> metrics(class_id c); |
11fdf7f2 TL |
413 | }; |
414 | /// @} | |
415 | ||
416 | } | |
#if FMT_VERSION >= 90000
// fmt 9 stopped picking up operator<< implicitly; opt fair_queue_ticket back in
// by formatting it through its ostream operator<<.
// NOTE(review): if no fmt header has been included before this point, FMT_VERSION is
// undefined, the #if evaluates it as 0 and the specialization is silently skipped;
// fmt::ostream_formatter also requires <fmt/ostream.h>. Confirm both arrive transitively.
template <> struct fmt::formatter<seastar::fair_queue_ticket> : fmt::ostream_formatter {};
#endif