2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
20 * Copyright (C) 2016 ScyllaDB
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/circular_buffer.hh>
26 #include <seastar/util/noncopyable_function.hh>
29 #include <unordered_set>
33 /// \brief describes a request that passes through the fair queue
35 /// \related fair_queue
36 struct fair_queue_request_descriptor {
37 unsigned weight = 1; ///< the weight of this request for capacity purposes (IOPS).
38 unsigned size = 1; ///< the effective size of this request
41 /// \addtogroup io-module
45 class priority_class {
47 noncopyable_function<void()> func;
48 fair_queue_request_descriptor desc;
50 friend class fair_queue;
52 float _accumulated = 0;
53 circular_buffer<request> _queue;
56 friend struct shared_ptr_no_esft<priority_class>;
57 explicit priority_class(uint32_t shares) : _shares(std::max(shares, 1u)) {}
59 void update_shares(uint32_t shares) {
60 _shares = (std::max(shares, 1u));
63 /// \brief return the current number of shares for this priority class
64 uint32_t shares() const {
70 /// \brief Priority class, to be used with a given \ref fair_queue
72 /// An instance of this class is associated with a given \ref fair_queue. When registering
73 /// a class, the caller will receive a \ref lw_shared_ptr to an object of this class. All its methods
74 /// are private, so the only thing the caller is expected to do with it is to pass it later
75 /// to the \ref fair_queue to identify a given class.
77 /// \related fair_queue
78 using priority_class_ptr = lw_shared_ptr<priority_class>;
80 /// \brief Fair queuing class
82 /// This is a fair queue, allowing multiple request producers to queue requests
83 /// that will then be served proportionally to their classes' shares.
85 /// To each request, a weight can also be associated. A request of weight 1 will consume
86 /// 1 share. Higher weights for a request will consume a proportionally higher amount of shares.
89 /// The user of this interface is expected to register multiple \ref priority_class
90 /// objects, which will each have a shares attribute.
92 /// Internally, each priority class may keep a separate queue of requests.
93 /// Requests pertaining to a class can go through even if they are over its
94 /// share limit, provided that the other classes have empty queues.
96 /// When the classes that lag behind start seeing requests, the fair queue will serve
97 /// them first, until balance is restored. This balancing is expected to happen within
98 /// a certain time window that obeys an exponential decay.
101 /// \brief Fair Queue configuration structure.
103 /// Sets the operation parameters of a \ref fair_queue.
104 /// \related fair_queue
106 unsigned capacity = std::numeric_limits<unsigned>::max();
107 std::chrono::microseconds tau = std::chrono::milliseconds(100);
108 unsigned max_req_count = std::numeric_limits<unsigned>::max();
109 unsigned max_bytes_count = std::numeric_limits<unsigned>::max();
112 friend priority_class;
114 struct class_compare {
115 bool operator() (const priority_class_ptr& lhs, const priority_class_ptr& rhs) const {
116 return lhs->_accumulated > rhs->_accumulated;
121 unsigned _requests_executing = 0;
122 unsigned _req_count_executing = 0;
123 unsigned _bytes_count_executing = 0;
124 unsigned _requests_queued = 0;
125 using clock_type = std::chrono::steady_clock::time_point;
127 using prioq = std::priority_queue<priority_class_ptr, std::vector<priority_class_ptr>, class_compare>;
129 std::unordered_set<priority_class_ptr> _all_classes;
131 void push_priority_class(priority_class_ptr pc);
133 priority_class_ptr pop_priority_class();
135 float normalize_factor() const;
137 void normalize_stats();
139 bool can_dispatch() const;
141 /// Constructs a fair queue with configuration parameters \c cfg.
143 /// \param cfg an instance of the class \ref config
144 explicit fair_queue(config cfg)
145 : _config(std::move(cfg))
146 , _base(std::chrono::steady_clock::now())
149 /// Constructs a fair queue with a given \c capacity.
151 /// \param capacity how many concurrent requests are allowed in this queue.
152 /// \param tau the queue exponential decay parameter, as in exp(-1/tau * t)
153 explicit fair_queue(unsigned capacity, std::chrono::microseconds tau = std::chrono::milliseconds(100))
154 : fair_queue(config{capacity, tau}) {}
156 /// Registers a priority class against this fair queue.
158 /// \param shares how many shares to create this class with
159 priority_class_ptr register_priority_class(uint32_t shares);
161 /// Unregister a priority class.
163 /// It is illegal to unregister a priority class that still has pending requests.
164 void unregister_priority_class(priority_class_ptr pclass);
166 /// \return how many waiters are currently queued for all classes.
167 size_t waiters() const;
169 /// \return the number of requests currently executing
170 size_t requests_currently_executing() const;
172 /// Queue the function \c func on behalf of the priority class \c pc; \c desc describes the request (its weight and size).
174 /// It is expected that \c func doesn't throw. If it does throw, it will simply be removed from
175 /// the queue and discarded.
177 /// The user of this interface is supposed to call \ref notify_requests_finished when the
178 /// request finishes executing - regardless of success or failure.
179 void queue(priority_class_ptr pc, fair_queue_request_descriptor desc, noncopyable_function<void()> func);
181 /// Notifies that one request finished
182 /// \param desc an instance of \c fair_queue_request_descriptor structure describing the request that just finished.
183 void notify_requests_finished(fair_queue_request_descriptor& desc);
185 /// Try to execute new requests if there is capacity left in the queue.
186 void dispatch_requests();
188 /// Updates the current shares of this priority class
190 /// \param new_shares the new number of shares for this priority class
191 static void update_shares(priority_class_ptr pc, uint32_t new_shares);