]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/fair_queue.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / core / fair_queue.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2016 ScyllaDB
21 */
22 #pragma once
23
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/circular_buffer.hh>
26 #include <seastar/util/noncopyable_function.hh>
27 #include <queue>
28 #include <chrono>
29 #include <unordered_set>
30
31 namespace seastar {
32
/// \brief Describes a request that passes through the fair queue.
///
/// Both fields default to 1, so a default-constructed descriptor describes
/// a single request of unit weight and unit size.
///
/// \related fair_queue
struct fair_queue_request_descriptor {
    unsigned weight = 1; ///< the weight of this request for capacity purposes (IOPS).
    unsigned size = 1;   ///< the effective size of this request
};
40
41 /// \addtogroup io-module
42 /// @{
43
44 /// \cond internal
45 class priority_class {
46 struct request {
47 noncopyable_function<void()> func;
48 fair_queue_request_descriptor desc;
49 };
50 friend class fair_queue;
51 uint32_t _shares = 0;
52 float _accumulated = 0;
53 circular_buffer<request> _queue;
54 bool _queued = false;
55
56 friend struct shared_ptr_no_esft<priority_class>;
57 explicit priority_class(uint32_t shares) : _shares(std::max(shares, 1u)) {}
58
59 void update_shares(uint32_t shares) {
60 _shares = (std::max(shares, 1u));
61 }
62 public:
63 /// \brief return the current amount of shares for this priority class
64 uint32_t shares() const {
65 return _shares;
66 }
67 };
68 /// \endcond
69
70 /// \brief Priority class, to be used with a given \ref fair_queue
71 ///
72 /// An instance of this class is associated with a given \ref fair_queue. When registering
73 /// a class, the caller will receive a \ref lw_shared_ptr to an object of this class. All its methods
74 /// are private, so the only thing the caller is expected to do with it is to pass it later
75 /// to the \ref fair_queue to identify a given class.
76 ///
77 /// \related fair_queue
78 using priority_class_ptr = lw_shared_ptr<priority_class>;
79
80 /// \brief Fair queuing class
81 ///
82 /// This is a fair queue, allowing multiple request producers to queue requests
83 /// that will then be served proportionally to their classes' shares.
84 ///
85 /// To each request, a weight can also be associated. A request of weight 1 will consume
86 /// 1 share. Higher weights for a request will consume a proportionally higher amount of
87 /// shares.
88 ///
89 /// The user of this interface is expected to register multiple \ref priority_class
90 /// objects, which will each have a shares attribute.
91 ///
92 /// Internally, each priority class may keep a separate queue of requests.
93 /// Requests pertaining to a class can go through even if they are over its
94 /// share limit, provided that the other classes have empty queues.
95 ///
96 /// When the classes that lag behind start seeing requests, the fair queue will serve
97 /// them first, until balance is restored. This balancing is expected to happen within
98 /// a certain time window that obeys an exponential decay.
99 class fair_queue {
100 public:
101 /// \brief Fair Queue configuration structure.
102 ///
103 /// \sets the operation parameters of a \ref fair_queue
104 /// \related fair_queue
105 struct config {
106 unsigned capacity = std::numeric_limits<unsigned>::max();
107 std::chrono::microseconds tau = std::chrono::milliseconds(100);
108 unsigned max_req_count = std::numeric_limits<unsigned>::max();
109 unsigned max_bytes_count = std::numeric_limits<unsigned>::max();
110 };
111 private:
112 friend priority_class;
113
114 struct class_compare {
115 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const {
116 return lhs->_accumulated > rhs->_accumulated;
117 }
118 };
119
120 config _config;
121 unsigned _requests_executing = 0;
122 unsigned _req_count_executing = 0;
123 unsigned _bytes_count_executing = 0;
124 unsigned _requests_queued = 0;
125 using clock_type = std::chrono::steady_clock::time_point;
126 clock_type _base;
127 using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
128 prioq _handles;
129 std::unordered_set<priority_class_ptr> _all_classes;
130
131 void push_priority_class(priority_class_ptr pc);
132
133 priority_class_ptr pop_priority_class();
134
135 float normalize_factor() const;
136
137 void normalize_stats();
138
139 bool can_dispatch() const;
140 public:
141 /// Constructs a fair queue with configuration parameters \c cfg.
142 ///
143 /// \param cfg an instance of the class \ref config
144 explicit fair_queue(config cfg)
145 : _config(std::move(cfg))
146 , _base(std::chrono::steady_clock::now())
147 {}
148
149 /// Constructs a fair queue with a given \c capacity.
150 ///
151 /// \param capacity how many concurrent requests are allowed in this queue.
152 /// \param tau the queue exponential decay parameter, as in exp(-1/tau * t)
153 explicit fair_queue(unsigned capacity, std::chrono::microseconds tau = std::chrono::milliseconds(100))
154 : fair_queue(config{capacity, tau}) {}
155
156 /// Registers a priority class against this fair queue.
157 ///
158 /// \param shares, how many shares to create this class with
159 priority_class_ptr register_priority_class(uint32_t shares);
160
161 /// Unregister a priority class.
162 ///
163 /// It is illegal to unregister a priority class that still have pending requests.
164 void unregister_priority_class(priority_class_ptr pclass);
165
166 /// \return how many waiters are currently queued for all classes.
167 size_t waiters() const;
168
169 /// \return the number of requests currently executing
170 size_t requests_currently_executing() const;
171
172 /// Queue the function \c func through this class' \ref fair_queue, with weight \c weight
173 ///
174 /// It is expected that \c func doesn't throw. If it does throw, it will be just removed from
175 /// the queue and discarded.
176 ///
177 /// The user of this interface is supposed to call \ref notify_requests_finished when the
178 /// request finishes executing - regardless of success or failure.
179 void queue(priority_class_ptr pc, fair_queue_request_descriptor desc, noncopyable_function<void()> func);
180
181 /// Notifies that ont request finished
182 /// \param desc an instance of \c fair_queue_request_descriptor structure describing the request that just finished.
183 void notify_requests_finished(fair_queue_request_descriptor& desc);
184
185 /// Try to execute new requests if there is capacity left in the queue.
186 void dispatch_requests();
187
188 /// Updates the current shares of this priority class
189 ///
190 /// \param new_shares the new number of shares for this priority class
191 static void update_shares(priority_class_ptr pc, uint32_t new_shares);
192 };
193 /// @}
194
195 }