]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
20effc67 | 22 | #include <boost/intrusive/parent_from_member.hpp> |
9f95a23c TL |
23 | #include <seastar/core/fair_queue.hh> |
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/shared_ptr.hh> | |
26 | #include <seastar/core/circular_buffer.hh> | |
27 | #include <seastar/util/noncopyable_function.hh> | |
20effc67 | 28 | #include <seastar/core/reactor.hh> |
9f95a23c TL |
29 | #include <queue> |
30 | #include <chrono> | |
31 | #include <unordered_set> | |
32 | #include <cmath> | |
33 | ||
f67539c2 TL |
34 | #include "fmt/format.h" |
35 | #include "fmt/ostream.h" | |
36 | ||
9f95a23c TL |
37 | namespace seastar { |
38 | ||
f67539c2 | 39 | static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size"); |
20effc67 TL |
40 | static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size"); |
41 | static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size"); | |
f67539c2 | 42 | |
20effc67 | 43 | fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) noexcept |
f67539c2 TL |
44 | : _weight(weight) |
45 | , _size(size) | |
46 | {} | |
47 | ||
20effc67 | 48 | float fair_queue_ticket::normalize(fair_queue_ticket denominator) const noexcept { |
f67539c2 TL |
49 | return float(_weight) / denominator._weight + float(_size) / denominator._size; |
50 | } | |
51 | ||
20effc67 | 52 | fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const noexcept { |
f67539c2 TL |
53 | return fair_queue_ticket(_weight + desc._weight, _size + desc._size); |
54 | } | |
55 | ||
20effc67 | 56 | fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) noexcept { |
f67539c2 TL |
57 | _weight += desc._weight; |
58 | _size += desc._size; | |
59 | return *this; | |
60 | } | |
61 | ||
20effc67 | 62 | fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const noexcept { |
f67539c2 TL |
63 | return fair_queue_ticket(_weight - desc._weight, _size - desc._size); |
64 | } | |
65 | ||
20effc67 | 66 | fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcept { |
f67539c2 TL |
67 | _weight -= desc._weight; |
68 | _size -= desc._size; | |
69 | return *this; | |
70 | } | |
71 | ||
20effc67 TL |
72 | fair_queue_ticket::operator bool() const noexcept { |
73 | return (_weight > 0) || (_size > 0); | |
f67539c2 TL |
74 | } |
75 | ||
20effc67 TL |
76 | bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept { |
77 | return _weight == o._weight && _size == o._size; | |
f67539c2 TL |
78 | } |
79 | ||
80 | std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) { | |
81 | return os << t._weight << ":" << t._size; | |
82 | } | |
83 | ||
20effc67 TL |
84 | fair_group_rover::fair_group_rover(uint32_t weight, uint32_t size) noexcept |
85 | : _weight(weight) | |
86 | , _size(size) | |
87 | {} | |
88 | ||
89 | fair_queue_ticket fair_group_rover::maybe_ahead_of(const fair_group_rover& other) const noexcept { | |
90 | return fair_queue_ticket(std::max<int32_t>(_weight - other._weight, 0), | |
91 | std::max<int32_t>(_size - other._size, 0)); | |
92 | } | |
93 | ||
94 | fair_group_rover fair_group_rover::operator+(fair_queue_ticket t) const noexcept { | |
95 | return fair_group_rover(_weight + t._weight, _size + t._size); | |
96 | } | |
97 | ||
98 | fair_group_rover& fair_group_rover::operator+=(fair_queue_ticket t) noexcept { | |
99 | _weight += t._weight; | |
100 | _size += t._size; | |
101 | return *this; | |
102 | } | |
103 | ||
104 | std::ostream& operator<<(std::ostream& os, fair_group_rover r) { | |
105 | return os << r._weight << ":" << r._size; | |
106 | } | |
107 | ||
108 | fair_group::fair_group(config cfg) noexcept | |
109 | : _capacity_tail(fair_group_rover(0, 0)) | |
110 | , _capacity_head(fair_group_rover(cfg.max_req_count, cfg.max_bytes_count)) | |
111 | , _maximum_capacity(cfg.max_req_count, cfg.max_bytes_count) | |
112 | { | |
113 | assert(!_capacity_tail.load(std::memory_order_relaxed) | |
114 | .maybe_ahead_of(_capacity_head.load(std::memory_order_relaxed))); | |
115 | seastar_logger.debug("Created fair group, capacity {}:{}", cfg.max_req_count, cfg.max_bytes_count); | |
116 | } | |
117 | ||
118 | fair_group_rover fair_group::grab_capacity(fair_queue_ticket cap) noexcept { | |
119 | fair_group_rover cur = _capacity_tail.load(std::memory_order_relaxed); | |
120 | while (!_capacity_tail.compare_exchange_weak(cur, cur + cap)) ; | |
121 | return cur; | |
122 | } | |
123 | ||
124 | void fair_group::release_capacity(fair_queue_ticket cap) noexcept { | |
125 | fair_group_rover cur = _capacity_head.load(std::memory_order_relaxed); | |
126 | while (!_capacity_head.compare_exchange_weak(cur, cur + cap)) ; | |
127 | } | |
128 | ||
129 | // Priority class, to be used with a given fair_queue | |
130 | class fair_queue::priority_class_data { | |
131 | using accumulator_t = double; | |
132 | friend class fair_queue; | |
133 | uint32_t _shares = 0; | |
134 | accumulator_t _accumulated = 0; | |
135 | fair_queue_entry::container_list_t _queue; | |
136 | bool _queued = false; | |
137 | ||
138 | public: | |
139 | explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {} | |
140 | ||
141 | void update_shares(uint32_t shares) noexcept { | |
142 | _shares = (std::max(shares, 1u)); | |
143 | } | |
144 | }; | |
145 | ||
146 | bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept { | |
147 | return lhs->_accumulated > rhs->_accumulated; | |
148 | } | |
149 | ||
150 | fair_queue::fair_queue(fair_group& group, config cfg) | |
f67539c2 | 151 | : _config(std::move(cfg)) |
20effc67 | 152 | , _group(group) |
f67539c2 | 153 | , _base(std::chrono::steady_clock::now()) |
20effc67 TL |
154 | { |
155 | seastar_logger.debug("Created fair queue, ticket pace {}:{}", cfg.ticket_weight_pace, cfg.ticket_size_pace); | |
156 | } | |
157 | ||
158 | fair_queue::fair_queue(fair_queue&& other) | |
159 | : _config(std::move(other._config)) | |
160 | , _group(other._group) | |
161 | , _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{})) | |
162 | , _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{})) | |
163 | , _requests_executing(std::exchange(other._requests_executing, 0)) | |
164 | , _requests_queued(std::exchange(other._requests_queued, 0)) | |
165 | , _base(other._base) | |
166 | , _handles(std::move(other._handles)) | |
167 | , _priority_classes(std::move(other._priority_classes)) | |
168 | { | |
169 | } | |
f67539c2 | 170 | |
20effc67 TL |
171 | fair_queue::~fair_queue() { |
172 | for (const auto& fq : _priority_classes) { | |
173 | assert(!fq); | |
9f95a23c TL |
174 | } |
175 | } | |
176 | ||
20effc67 TL |
177 | void fair_queue::push_priority_class(priority_class_data& pc) { |
178 | if (!pc._queued) { | |
179 | _handles.push(&pc); | |
180 | pc._queued = true; | |
181 | } | |
9f95a23c TL |
182 | } |
183 | ||
20effc67 TL |
184 | void fair_queue::pop_priority_class(priority_class_data& pc) { |
185 | assert(pc._queued); | |
186 | pc._queued = false; | |
187 | _handles.pop(); | |
9f95a23c TL |
188 | } |
189 | ||
190 | void fair_queue::normalize_stats() { | |
20effc67 TL |
191 | _base = std::chrono::steady_clock::now() - _config.tau; |
192 | for (auto& pc: _priority_classes) { | |
193 | if (pc) { | |
194 | pc->_accumulated *= std::numeric_limits<priority_class_data::accumulator_t>::min(); | |
195 | } | |
196 | } | |
197 | } | |
198 | ||
199 | std::chrono::microseconds fair_queue_ticket::duration_at_pace(float weight_pace, float size_pace) const noexcept { | |
200 | unsigned long dur = ((_weight * weight_pace) + (_size * size_pace)); | |
201 | return std::chrono::microseconds(dur); | |
202 | } | |
203 | ||
204 | bool fair_queue::grab_pending_capacity(fair_queue_ticket cap) noexcept { | |
205 | fair_group_rover pending_head = _pending->orig_tail + cap; | |
206 | if (pending_head.maybe_ahead_of(_group.head())) { | |
207 | return false; | |
208 | } | |
209 | ||
210 | if (cap == _pending->cap) { | |
211 | _pending.reset(); | |
212 | } else { | |
213 | /* | |
214 | * This branch is called when the fair queue decides to | |
215 | * submit not the same request that entered it into the | |
216 | * pending state and this new request crawls through the | |
217 | * expected head value. | |
218 | */ | |
219 | _group.grab_capacity(cap); | |
220 | _pending->orig_tail += cap; | |
221 | } | |
222 | ||
223 | return true; | |
224 | } | |
225 | ||
226 | bool fair_queue::grab_capacity(fair_queue_ticket cap) noexcept { | |
227 | if (_pending) { | |
228 | return grab_pending_capacity(cap); | |
229 | } | |
230 | ||
231 | fair_group_rover orig_tail = _group.grab_capacity(cap); | |
232 | if ((orig_tail + cap).maybe_ahead_of(_group.head())) { | |
233 | _pending.emplace(orig_tail, cap); | |
234 | return false; | |
9f95a23c | 235 | } |
20effc67 TL |
236 | |
237 | return true; | |
9f95a23c TL |
238 | } |
239 | ||
20effc67 TL |
240 | void fair_queue::register_priority_class(class_id id, uint32_t shares) { |
241 | if (id >= _priority_classes.size()) { | |
242 | _priority_classes.resize(id + 1); | |
243 | } else { | |
244 | assert(!_priority_classes[id]); | |
245 | } | |
246 | ||
247 | _priority_classes[id] = std::make_unique<priority_class_data>(shares); | |
9f95a23c TL |
248 | } |
249 | ||
20effc67 TL |
250 | void fair_queue::unregister_priority_class(class_id id) { |
251 | auto& pclass = _priority_classes[id]; | |
252 | assert(pclass && pclass->_queue.empty()); | |
253 | pclass.reset(); | |
9f95a23c TL |
254 | } |
255 | ||
20effc67 TL |
256 | void fair_queue::update_shares_for_class(class_id id, uint32_t shares) { |
257 | assert(id < _priority_classes.size()); | |
258 | auto& pc = _priority_classes[id]; | |
259 | assert(pc); | |
260 | pc->update_shares(shares); | |
9f95a23c TL |
261 | } |
262 | ||
263 | size_t fair_queue::waiters() const { | |
264 | return _requests_queued; | |
265 | } | |
266 | ||
267 | size_t fair_queue::requests_currently_executing() const { | |
268 | return _requests_executing; | |
269 | } | |
270 | ||
f67539c2 TL |
271 | fair_queue_ticket fair_queue::resources_currently_waiting() const { |
272 | return _resources_queued; | |
273 | } | |
274 | ||
275 | fair_queue_ticket fair_queue::resources_currently_executing() const { | |
276 | return _resources_executing; | |
277 | } | |
278 | ||
20effc67 TL |
279 | void fair_queue::queue(class_id id, fair_queue_entry& ent) { |
280 | priority_class_data& pc = *_priority_classes[id]; | |
9f95a23c TL |
281 | // We need to return a future in this function on which the caller can wait. |
282 | // Since we don't know which queue we will use to execute the next request - if ours or | |
283 | // someone else's, we need a separate promise at this point. | |
284 | push_priority_class(pc); | |
20effc67 TL |
285 | pc._queue.push_back(ent); |
286 | _resources_queued += ent._ticket; | |
9f95a23c TL |
287 | _requests_queued++; |
288 | } | |
289 | ||
20effc67 | 290 | void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept { |
f67539c2 | 291 | _resources_executing -= desc; |
20effc67 TL |
292 | _requests_executing--; |
293 | _group.release_capacity(desc); | |
9f95a23c TL |
294 | } |
295 | ||
20effc67 TL |
296 | void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept { |
297 | _resources_queued -= ent._ticket; | |
298 | ent._ticket = fair_queue_ticket(); | |
299 | } | |
300 | ||
301 | void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) { | |
302 | while (!_handles.empty()) { | |
303 | priority_class_data& h = *_handles.top(); | |
304 | if (h._queue.empty()) { | |
305 | pop_priority_class(h); | |
306 | continue; | |
307 | } | |
9f95a23c | 308 | |
20effc67 TL |
309 | auto& req = h._queue.front(); |
310 | if (!grab_capacity(req._ticket)) { | |
311 | break; | |
312 | } | |
313 | ||
314 | pop_priority_class(h); | |
315 | h._queue.pop_front(); | |
316 | ||
317 | _resources_executing += req._ticket; | |
318 | _resources_queued -= req._ticket; | |
9f95a23c | 319 | _requests_executing++; |
9f95a23c TL |
320 | _requests_queued--; |
321 | ||
322 | auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base); | |
20effc67 TL |
323 | auto req_cost = req._ticket.normalize(_group.maximum_capacity()) / h._shares; |
324 | auto cost = exp(priority_class_data::accumulator_t(1.0f/_config.tau.count() * delta.count())) * req_cost; | |
325 | priority_class_data::accumulator_t next_accumulated = h._accumulated + cost; | |
9f95a23c TL |
326 | while (std::isinf(next_accumulated)) { |
327 | normalize_stats(); | |
328 | // If we have renormalized, our time base will have changed. This should happen very infrequently | |
329 | delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base); | |
20effc67 TL |
330 | cost = exp(priority_class_data::accumulator_t(1.0f/_config.tau.count() * delta.count())) * req_cost; |
331 | next_accumulated = h._accumulated + cost; | |
9f95a23c | 332 | } |
20effc67 | 333 | h._accumulated = next_accumulated; |
9f95a23c | 334 | |
20effc67 | 335 | if (!h._queue.empty()) { |
9f95a23c TL |
336 | push_priority_class(h); |
337 | } | |
20effc67 TL |
338 | |
339 | cb(req); | |
9f95a23c TL |
340 | } |
341 | } | |
342 | ||
9f95a23c | 343 | } |