]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/core/fair_queue.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / core / fair_queue.cc
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
20effc67 22#include <boost/intrusive/parent_from_member.hpp>
9f95a23c
TL
23#include <seastar/core/fair_queue.hh>
24#include <seastar/core/future.hh>
25#include <seastar/core/shared_ptr.hh>
26#include <seastar/core/circular_buffer.hh>
27#include <seastar/util/noncopyable_function.hh>
20effc67 28#include <seastar/core/reactor.hh>
9f95a23c
TL
29#include <queue>
30#include <chrono>
31#include <unordered_set>
32#include <cmath>
33
f67539c2
TL
34#include "fmt/format.h"
35#include "fmt/ostream.h"
36
9f95a23c
TL
37namespace seastar {
38
f67539c2 39static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
20effc67
TL
40static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
41static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
f67539c2 42
20effc67 43fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
f67539c2
TL
44 : _weight(weight)
45 , _size(size)
46{}
47
20effc67 48float fair_queue_ticket::normalize(fair_queue_ticket denominator) const noexcept {
f67539c2
TL
49 return float(_weight) / denominator._weight + float(_size) / denominator._size;
50}
51
20effc67 52fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const noexcept {
f67539c2
TL
53 return fair_queue_ticket(_weight + desc._weight, _size + desc._size);
54}
55
20effc67 56fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) noexcept {
f67539c2
TL
57 _weight += desc._weight;
58 _size += desc._size;
59 return *this;
60}
61
20effc67 62fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const noexcept {
f67539c2
TL
63 return fair_queue_ticket(_weight - desc._weight, _size - desc._size);
64}
65
20effc67 66fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcept {
f67539c2
TL
67 _weight -= desc._weight;
68 _size -= desc._size;
69 return *this;
70}
71
20effc67
TL
72fair_queue_ticket::operator bool() const noexcept {
73 return (_weight > 0) || (_size > 0);
f67539c2
TL
74}
75
20effc67
TL
76bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept {
77 return _weight == o._weight && _size == o._size;
f67539c2
TL
78}
79
80std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) {
81 return os << t._weight << ":" << t._size;
82}
83
20effc67
TL
84fair_group_rover::fair_group_rover(uint32_t weight, uint32_t size) noexcept
85 : _weight(weight)
86 , _size(size)
87{}
88
89fair_queue_ticket fair_group_rover::maybe_ahead_of(const fair_group_rover& other) const noexcept {
90 return fair_queue_ticket(std::max<int32_t>(_weight - other._weight, 0),
91 std::max<int32_t>(_size - other._size, 0));
92}
93
94fair_group_rover fair_group_rover::operator+(fair_queue_ticket t) const noexcept {
95 return fair_group_rover(_weight + t._weight, _size + t._size);
96}
97
98fair_group_rover& fair_group_rover::operator+=(fair_queue_ticket t) noexcept {
99 _weight += t._weight;
100 _size += t._size;
101 return *this;
102}
103
104std::ostream& operator<<(std::ostream& os, fair_group_rover r) {
105 return os << r._weight << ":" << r._size;
106}
107
108fair_group::fair_group(config cfg) noexcept
109 : _capacity_tail(fair_group_rover(0, 0))
110 , _capacity_head(fair_group_rover(cfg.max_req_count, cfg.max_bytes_count))
111 , _maximum_capacity(cfg.max_req_count, cfg.max_bytes_count)
112{
113 assert(!_capacity_tail.load(std::memory_order_relaxed)
114 .maybe_ahead_of(_capacity_head.load(std::memory_order_relaxed)));
115 seastar_logger.debug("Created fair group, capacity {}:{}", cfg.max_req_count, cfg.max_bytes_count);
116}
117
118fair_group_rover fair_group::grab_capacity(fair_queue_ticket cap) noexcept {
119 fair_group_rover cur = _capacity_tail.load(std::memory_order_relaxed);
120 while (!_capacity_tail.compare_exchange_weak(cur, cur + cap)) ;
121 return cur;
122}
123
124void fair_group::release_capacity(fair_queue_ticket cap) noexcept {
125 fair_group_rover cur = _capacity_head.load(std::memory_order_relaxed);
126 while (!_capacity_head.compare_exchange_weak(cur, cur + cap)) ;
127}
128
129// Priority class, to be used with a given fair_queue
130class fair_queue::priority_class_data {
131 using accumulator_t = double;
132 friend class fair_queue;
133 uint32_t _shares = 0;
134 accumulator_t _accumulated = 0;
135 fair_queue_entry::container_list_t _queue;
136 bool _queued = false;
137
138public:
139 explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
140
141 void update_shares(uint32_t shares) noexcept {
142 _shares = (std::max(shares, 1u));
143 }
144};
145
146bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept {
147 return lhs->_accumulated > rhs->_accumulated;
148}
149
150fair_queue::fair_queue(fair_group& group, config cfg)
f67539c2 151 : _config(std::move(cfg))
20effc67 152 , _group(group)
f67539c2 153 , _base(std::chrono::steady_clock::now())
20effc67
TL
154{
155 seastar_logger.debug("Created fair queue, ticket pace {}:{}", cfg.ticket_weight_pace, cfg.ticket_size_pace);
156}
157
158fair_queue::fair_queue(fair_queue&& other)
159 : _config(std::move(other._config))
160 , _group(other._group)
161 , _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
162 , _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
163 , _requests_executing(std::exchange(other._requests_executing, 0))
164 , _requests_queued(std::exchange(other._requests_queued, 0))
165 , _base(other._base)
166 , _handles(std::move(other._handles))
167 , _priority_classes(std::move(other._priority_classes))
168{
169}
f67539c2 170
20effc67
TL
171fair_queue::~fair_queue() {
172 for (const auto& fq : _priority_classes) {
173 assert(!fq);
9f95a23c
TL
174 }
175}
176
20effc67
TL
177void fair_queue::push_priority_class(priority_class_data& pc) {
178 if (!pc._queued) {
179 _handles.push(&pc);
180 pc._queued = true;
181 }
9f95a23c
TL
182}
183
20effc67
TL
184void fair_queue::pop_priority_class(priority_class_data& pc) {
185 assert(pc._queued);
186 pc._queued = false;
187 _handles.pop();
9f95a23c
TL
188}
189
190void fair_queue::normalize_stats() {
20effc67
TL
191 _base = std::chrono::steady_clock::now() - _config.tau;
192 for (auto& pc: _priority_classes) {
193 if (pc) {
194 pc->_accumulated *= std::numeric_limits<priority_class_data::accumulator_t>::min();
195 }
196 }
197}
198
199std::chrono::microseconds fair_queue_ticket::duration_at_pace(float weight_pace, float size_pace) const noexcept {
200 unsigned long dur = ((_weight * weight_pace) + (_size * size_pace));
201 return std::chrono::microseconds(dur);
202}
203
204bool fair_queue::grab_pending_capacity(fair_queue_ticket cap) noexcept {
205 fair_group_rover pending_head = _pending->orig_tail + cap;
206 if (pending_head.maybe_ahead_of(_group.head())) {
207 return false;
208 }
209
210 if (cap == _pending->cap) {
211 _pending.reset();
212 } else {
213 /*
214 * This branch is called when the fair queue decides to
215 * submit not the same request that entered it into the
216 * pending state and this new request crawls through the
217 * expected head value.
218 */
219 _group.grab_capacity(cap);
220 _pending->orig_tail += cap;
221 }
222
223 return true;
224}
225
226bool fair_queue::grab_capacity(fair_queue_ticket cap) noexcept {
227 if (_pending) {
228 return grab_pending_capacity(cap);
229 }
230
231 fair_group_rover orig_tail = _group.grab_capacity(cap);
232 if ((orig_tail + cap).maybe_ahead_of(_group.head())) {
233 _pending.emplace(orig_tail, cap);
234 return false;
9f95a23c 235 }
20effc67
TL
236
237 return true;
9f95a23c
TL
238}
239
20effc67
TL
240void fair_queue::register_priority_class(class_id id, uint32_t shares) {
241 if (id >= _priority_classes.size()) {
242 _priority_classes.resize(id + 1);
243 } else {
244 assert(!_priority_classes[id]);
245 }
246
247 _priority_classes[id] = std::make_unique<priority_class_data>(shares);
9f95a23c
TL
248}
249
20effc67
TL
250void fair_queue::unregister_priority_class(class_id id) {
251 auto& pclass = _priority_classes[id];
252 assert(pclass && pclass->_queue.empty());
253 pclass.reset();
9f95a23c
TL
254}
255
20effc67
TL
256void fair_queue::update_shares_for_class(class_id id, uint32_t shares) {
257 assert(id < _priority_classes.size());
258 auto& pc = _priority_classes[id];
259 assert(pc);
260 pc->update_shares(shares);
9f95a23c
TL
261}
262
263size_t fair_queue::waiters() const {
264 return _requests_queued;
265}
266
267size_t fair_queue::requests_currently_executing() const {
268 return _requests_executing;
269}
270
f67539c2
TL
271fair_queue_ticket fair_queue::resources_currently_waiting() const {
272 return _resources_queued;
273}
274
275fair_queue_ticket fair_queue::resources_currently_executing() const {
276 return _resources_executing;
277}
278
20effc67
TL
279void fair_queue::queue(class_id id, fair_queue_entry& ent) {
280 priority_class_data& pc = *_priority_classes[id];
9f95a23c
TL
281 // We need to return a future in this function on which the caller can wait.
282 // Since we don't know which queue we will use to execute the next request - if ours or
283 // someone else's, we need a separate promise at this point.
284 push_priority_class(pc);
20effc67
TL
285 pc._queue.push_back(ent);
286 _resources_queued += ent._ticket;
9f95a23c
TL
287 _requests_queued++;
288}
289
20effc67 290void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept {
f67539c2 291 _resources_executing -= desc;
20effc67
TL
292 _requests_executing--;
293 _group.release_capacity(desc);
9f95a23c
TL
294}
295
20effc67
TL
296void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
297 _resources_queued -= ent._ticket;
298 ent._ticket = fair_queue_ticket();
299}
300
301void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
302 while (!_handles.empty()) {
303 priority_class_data& h = *_handles.top();
304 if (h._queue.empty()) {
305 pop_priority_class(h);
306 continue;
307 }
9f95a23c 308
20effc67
TL
309 auto& req = h._queue.front();
310 if (!grab_capacity(req._ticket)) {
311 break;
312 }
313
314 pop_priority_class(h);
315 h._queue.pop_front();
316
317 _resources_executing += req._ticket;
318 _resources_queued -= req._ticket;
9f95a23c 319 _requests_executing++;
9f95a23c
TL
320 _requests_queued--;
321
322 auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
20effc67
TL
323 auto req_cost = req._ticket.normalize(_group.maximum_capacity()) / h._shares;
324 auto cost = exp(priority_class_data::accumulator_t(1.0f/_config.tau.count() * delta.count())) * req_cost;
325 priority_class_data::accumulator_t next_accumulated = h._accumulated + cost;
9f95a23c
TL
326 while (std::isinf(next_accumulated)) {
327 normalize_stats();
328 // If we have renormalized, our time base will have changed. This should happen very infrequently
329 delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
20effc67
TL
330 cost = exp(priority_class_data::accumulator_t(1.0f/_config.tau.count() * delta.count())) * req_cost;
331 next_accumulated = h._accumulated + cost;
9f95a23c 332 }
20effc67 333 h._accumulated = next_accumulated;
9f95a23c 334
20effc67 335 if (!h._queue.empty()) {
9f95a23c
TL
336 push_priority_class(h);
337 }
20effc67
TL
338
339 cb(req);
9f95a23c
TL
340 }
341}
342
9f95a23c 343}