]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/fair_queue.hh
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / include / seastar / core / fair_queue.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2016 ScyllaDB
21 */
22 #pragma once
23
24 #include <boost/intrusive/slist.hpp>
25 #include <seastar/core/shared_ptr.hh>
26 #include <seastar/core/circular_buffer.hh>
27 #include <functional>
28 #include <atomic>
29 #include <queue>
30 #include <chrono>
31 #include <unordered_set>
32 #include <optional>
33
34 namespace bi = boost::intrusive;
35
36 namespace seastar {
37
38 class fair_group_rover;
39
40 /// \brief describes a request that passes through the \ref fair_queue.
41 ///
42 /// A ticket is specified by a \c weight and a \c size. For example, one can specify a request of \c weight
43 /// 1 and \c size 16kB. If the \ref fair_queue accepts one such request per second, it will sustain 1 IOPS
44 /// at 16kB/s bandwidth.
45 ///
46 /// \related fair_queue
class fair_queue_ticket {
    /// Total weight of the request(s) described by this ticket; this is the
    /// dimension used for capacity purposes (IOPS).
    uint32_t _weight{0};
    /// Total effective size of the request(s) described by this ticket.
    uint32_t _size{0};

    // The group rover works directly on the two counters above.
    friend class fair_group_rover;

public:
    /// Builds an empty ticket, i.e. one whose \c weight and \c size are both zero.
    fair_queue_ticket() noexcept {}

    /// Builds a ticket out of an explicit \c weight and \c size.
    ///
    /// \param weight the weight of the request
    /// \param size the size of the request
    fair_queue_ticket(uint32_t weight, uint32_t size) noexcept;

    /// Binary counterpart of \ref operator+=; yields a new ticket and leaves this one intact.
    /// \param desc the \ref fair_queue_ticket to combine with this one
    fair_queue_ticket operator+(fair_queue_ticket desc) const noexcept;

    /// Binary counterpart of \ref operator-=; yields a new ticket and leaves this one intact.
    /// \param desc the \ref fair_queue_ticket to take away from this one
    fair_queue_ticket operator-(fair_queue_ticket desc) const noexcept;

    /// Grows the quantity held by this ticket by the quantity held by \c desc.
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be added to this one
    fair_queue_ticket& operator+=(fair_queue_ticket desc) noexcept;

    /// Shrinks the quantity held by this ticket by the quantity held by \c desc.
    /// \param desc another \ref fair_queue_ticket whose \c weight and \c size will be decremented from this one
    fair_queue_ticket& operator-=(fair_queue_ticket desc) noexcept;

    /// Tells whether this ticket is fully equal to another one.
    /// \param desc another \ref fair_queue_ticket to compare with
    bool operator==(const fair_queue_ticket& desc) const noexcept;

    /// Turns this ticket into a time span, given one pace value per dimension.
    ///
    /// The computation lives out of line, so the exact formula is not visible
    /// from this header.
    ///
    /// \param weight_pace pace applied to the \c weight dimension
    /// \param size_pace pace applied to the \c size dimension
    std::chrono::microseconds duration_at_pace(float weight_pace, float size_pace) const noexcept;

    /// \returns true if the fair_queue_ticket represents a non-zero quantity.
    ///
    /// A ticket counts as non-zero as soon as at least one of the quantities it
    /// represents is non-zero.
    explicit operator bool() const noexcept;

    /// Streams the ticket \c t into \c os (takes the ticket by value).
    friend std::ostream& operator<<(std::ostream& os, fair_queue_ticket t);

    /// \returns the normalized value of this \ref fair_queue_ticket along a base axis
    ///
    /// How exactly the normalization is done is an implementation detail. What can be
    /// expected is that either weight or size gains or loses relative importance
    /// depending on which of the dimensions in the denominator is relatively higher.
    /// For example, given this request a, and two other requests b and c, such that c
    /// has the same \c weight but a higher \c size than b, one can expect the \c size
    /// component of this request to play a larger role.
    ///
    /// The numerator may legally have one of its quantities set to zero; only the
    /// other quantity is then taken into consideration.
    ///
    /// The axis, however, must not have any quantity set to zero.
    /// \param axis another \ref fair_queue_ticket to be used as a base vector against which to normalize this fair_queue_ticket.
    float normalize(fair_queue_ticket axis) const noexcept;
};
95
96 class fair_group_rover {
97 uint32_t _weight = 0;
98 uint32_t _size = 0;
99
100 public:
101 fair_group_rover(uint32_t weight, uint32_t size) noexcept;
102
103 /*
104 * For both dimentions checks if the current rover is ahead of the
105 * other and returns the difference. If this is behind returns zero.
106 */
107 fair_queue_ticket maybe_ahead_of(const fair_group_rover& other) const noexcept;
108 fair_group_rover operator+(fair_queue_ticket t) const noexcept;
109 fair_group_rover& operator+=(fair_queue_ticket t) noexcept;
110
111 friend std::ostream& operator<<(std::ostream& os, fair_group_rover r);
112 };
113
114 /// \addtogroup io-module
115 /// @{
116
117 class fair_queue_entry {
118 friend class fair_queue;
119
120 fair_queue_ticket _ticket;
121 bi::slist_member_hook<> _hook;
122
123 public:
124 fair_queue_entry(fair_queue_ticket t) noexcept
125 : _ticket(std::move(t)) {}
126 using container_list_t = bi::slist<fair_queue_entry,
127 bi::constant_time_size<false>,
128 bi::cache_last<true>,
129 bi::member_hook<fair_queue_entry, bi::slist_member_hook<>, &fair_queue_entry::_hook>>;
130
131 fair_queue_ticket ticket() const noexcept { return _ticket; }
132 };
133
/// \brief Group of queues class
///
/// This is a fair group. One or more fair queues attach to it. On machines with a
/// big* number of shards, queues use the group to borrow/lend the capacity needed
/// for dispatching requests.
///
/// * Big means that when all shards submit requests all together the disk is unable to
/// dispatch them efficiently. The inability can be of two kinds -- either the disk cannot
/// cope with the number of arriving requests, or the total size of the data within
/// the given time frame exceeds the disk throughput.
144 class fair_group {
145 using fair_group_atomic_rover = std::atomic<fair_group_rover>;
146 static_assert(fair_group_atomic_rover::is_always_lock_free);
147
148 fair_group_atomic_rover _capacity_tail;
149 fair_group_atomic_rover _capacity_head;
150 fair_queue_ticket _maximum_capacity;
151
152 public:
153 struct config {
154 unsigned max_req_count;
155 unsigned max_bytes_count;
156
157 /// Constructs a config with the given \c capacity, expressed in maximum
158 /// values for requests and bytes.
159 ///
160 /// \param max_requests how many concurrent requests are allowed in this queue.
161 /// \param max_bytes how many total bytes are allowed in this queue.
162 config(unsigned max_requests, unsigned max_bytes) noexcept
163 : max_req_count(max_requests), max_bytes_count(max_bytes) {}
164 };
165 explicit fair_group(config cfg) noexcept;
166 fair_group(fair_group&&) = delete;
167
168 fair_queue_ticket maximum_capacity() const noexcept { return _maximum_capacity; }
169 fair_group_rover grab_capacity(fair_queue_ticket cap) noexcept;
170 void release_capacity(fair_queue_ticket cap) noexcept;
171
172 fair_group_rover head() const noexcept {
173 return _capacity_head.load(std::memory_order_relaxed);
174 }
175 };
176
177 /// \brief Fair queuing class
178 ///
179 /// This is a fair queue, allowing multiple request producers to queue requests
180 /// that will then be served proportionally to their classes' shares.
181 ///
182 /// To each request, a weight can also be associated. A request of weight 1 will consume
183 /// 1 share. Higher weights for a request will consume a proportionally higher amount of
184 /// shares.
185 ///
186 /// The user of this interface is expected to register multiple `priority_class_data`
187 /// objects, which will each have a shares attribute.
188 ///
189 /// Internally, each priority class may keep a separate queue of requests.
190 /// Requests pertaining to a class can go through even if they are over its
191 /// share limit, provided that the other classes have empty queues.
192 ///
193 /// When the classes that lag behind start seeing requests, the fair queue will serve
194 /// them first, until balance is restored. This balancing is expected to happen within
195 /// a certain time window that obeys an exponential decay.
196 class fair_queue {
197 public:
198 /// \brief Fair Queue configuration structure.
199 ///
200 /// \sets the operation parameters of a \ref fair_queue
201 /// \related fair_queue
202 struct config {
203 std::chrono::microseconds tau = std::chrono::milliseconds(5);
204 // Time (in microseconds) is takes to process one ticket value
205 float ticket_size_pace;
206 float ticket_weight_pace;
207 };
208
209 using class_id = unsigned int;
210 class priority_class_data;
211
212 private:
213 using priority_class_ptr = priority_class_data*;
214 struct class_compare {
215 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept;
216 };
217
218 config _config;
219 fair_group& _group;
220 fair_queue_ticket _resources_executing;
221 fair_queue_ticket _resources_queued;
222 unsigned _requests_executing = 0;
223 unsigned _requests_queued = 0;
224 using clock_type = std::chrono::steady_clock::time_point;
225 clock_type _base;
226 using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
227 prioq _handles;
228 std::vector<std::unique_ptr<priority_class_data>> _priority_classes;
229
230 /*
231 * When the shared capacity os over the local queue delays
232 * further dispatching untill better times
233 *
234 * \orig_tail -- the value group tail rover had when it happened
235 * \cap -- the capacity that's accounted on the group
236 *
237 * The last field is needed to "rearm" the wait in case
238 * queue decides that it wants to dispatch another capacity
239 * in the middle of the waiting
240 */
241 struct pending {
242 fair_group_rover orig_tail;
243 fair_queue_ticket cap;
244
245 pending(fair_group_rover t, fair_queue_ticket c) noexcept : orig_tail(t), cap(c) {}
246 };
247
248 std::optional<pending> _pending;
249
250 void push_priority_class(priority_class_data& pc);
251 void pop_priority_class(priority_class_data& pc);
252
253 void normalize_stats();
254
255 // Estimated time to process the given ticket
256 std::chrono::microseconds duration(fair_queue_ticket desc) const noexcept {
257 return desc.duration_at_pace(_config.ticket_weight_pace, _config.ticket_size_pace);
258 }
259
260 bool grab_capacity(fair_queue_ticket cap) noexcept;
261 bool grab_pending_capacity(fair_queue_ticket cap) noexcept;
262 public:
263 /// Constructs a fair queue with configuration parameters \c cfg.
264 ///
265 /// \param cfg an instance of the class \ref config
266 explicit fair_queue(fair_group& shared, config cfg);
267 fair_queue(fair_queue&&);
268 ~fair_queue();
269
270 /// Registers a priority class against this fair queue.
271 ///
272 /// \param shares how many shares to create this class with
273 void register_priority_class(class_id c, uint32_t shares);
274
275 /// Unregister a priority class.
276 ///
277 /// It is illegal to unregister a priority class that still have pending requests.
278 void unregister_priority_class(class_id c);
279
280 void update_shares_for_class(class_id c, uint32_t new_shares);
281
282 /// \return how many waiters are currently queued for all classes.
283 [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
284 size_t waiters() const;
285
286 /// \return the number of requests currently executing
287 [[deprecated("fair_queue users should not track individual requests, but resources (weight, size) passing through the queue")]]
288 size_t requests_currently_executing() const;
289
290 /// \return how much resources (weight, size) are currently queued for all classes.
291 fair_queue_ticket resources_currently_waiting() const;
292
293 /// \return the amount of resources (weight, size) currently executing
294 fair_queue_ticket resources_currently_executing() const;
295
296 /// Queue the entry \c ent through this class' \ref fair_queue
297 ///
298 /// The user of this interface is supposed to call \ref notify_requests_finished when the
299 /// request finishes executing - regardless of success or failure.
300 void queue(class_id c, fair_queue_entry& ent);
301
302 /// Notifies that ont request finished
303 /// \param desc an instance of \c fair_queue_ticket structure describing the request that just finished.
304 void notify_request_finished(fair_queue_ticket desc) noexcept;
305 void notify_request_cancelled(fair_queue_entry& ent) noexcept;
306
307 /// Try to execute new requests if there is capacity left in the queue.
308 void dispatch_requests(std::function<void(fair_queue_entry&)> cb);
309
310 clock_type next_pending_aio() const noexcept {
311 if (_pending) {
312 /*
313 * We expect the disk to release the ticket within some time,
314 * but it's ... OK if it doesn't -- the pending wait still
315 * needs the head rover value to be ahead of the needed value.
316 *
317 * It may happen that the capacity gets released before we think
318 * it will, in this case we will wait for the full value again,
319 * which's sub-optimal. The expectation is that we think disk
320 * works faster, than it really does.
321 */
322 fair_group_rover pending_head = _pending->orig_tail + _pending->cap;
323 fair_queue_ticket over = pending_head.maybe_ahead_of(_group.head());
324 return std::chrono::steady_clock::now() + duration(over);
325 }
326
327 return std::chrono::steady_clock::time_point::max();
328 }
329 };
330 /// @}
331
332 }