/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */
22 #include <boost/container/small_vector.hpp>
23 #include <boost/intrusive/parent_from_member.hpp>
24 #include <seastar/core/fair_queue.hh>
25 #include <seastar/core/future.hh>
26 #include <seastar/core/shared_ptr.hh>
27 #include <seastar/core/circular_buffer.hh>
28 #include <seastar/util/noncopyable_function.hh>
29 #include <seastar/core/reactor.hh>
30 #include <seastar/core/metrics.hh>
33 #include <unordered_set>
35 #include "fmt/format.h"
36 #include "fmt/ostream.h"
// Compile-time layout guarantees: a ticket must stay packed into a single
// 64-bit word, and queue entries / their intrusive list must not grow beyond
// the pointer-sized footprint the queue's memory accounting assumes.
static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
44 fair_queue_ticket::fair_queue_ticket(uint32_t weight
, uint32_t size
) noexcept
49 float fair_queue_ticket::normalize(fair_queue_ticket denominator
) const noexcept
{
50 return float(_weight
) / denominator
._weight
+ float(_size
) / denominator
._size
;
53 fair_queue_ticket
fair_queue_ticket::operator+(fair_queue_ticket desc
) const noexcept
{
54 return fair_queue_ticket(_weight
+ desc
._weight
, _size
+ desc
._size
);
57 fair_queue_ticket
& fair_queue_ticket::operator+=(fair_queue_ticket desc
) noexcept
{
58 _weight
+= desc
._weight
;
63 fair_queue_ticket
fair_queue_ticket::operator-(fair_queue_ticket desc
) const noexcept
{
64 return fair_queue_ticket(_weight
- desc
._weight
, _size
- desc
._size
);
67 fair_queue_ticket
& fair_queue_ticket::operator-=(fair_queue_ticket desc
) noexcept
{
68 _weight
-= desc
._weight
;
73 fair_queue_ticket::operator bool() const noexcept
{
74 return (_weight
> 0) || (_size
> 0);
77 bool fair_queue_ticket::is_non_zero() const noexcept
{
78 return (_weight
> 0) && (_size
> 0);
81 bool fair_queue_ticket::operator==(const fair_queue_ticket
& o
) const noexcept
{
82 return _weight
== o
._weight
&& _size
== o
._size
;
85 std::ostream
& operator<<(std::ostream
& os
, fair_queue_ticket t
) {
86 return os
<< t
._weight
<< ":" << t
._size
;
89 fair_queue_ticket
wrapping_difference(const fair_queue_ticket
& a
, const fair_queue_ticket
& b
) noexcept
{
90 return fair_queue_ticket(std::max
<int32_t>(a
._weight
- b
._weight
, 0),
91 std::max
<int32_t>(a
._size
- b
._size
, 0));
94 fair_group::fair_group(config cfg
)
95 : _cost_capacity(cfg
.weight_rate
/ token_bucket_t::rate_cast(std::chrono::seconds(1)).count(), cfg
.size_rate
/ token_bucket_t::rate_cast(std::chrono::seconds(1)).count())
96 , _token_bucket(cfg
.rate_factor
* fixed_point_factor
,
97 std::max
<capacity_t
>(cfg
.rate_factor
* fixed_point_factor
* token_bucket_t::rate_cast(cfg
.rate_limit_duration
).count(), ticket_capacity(fair_queue_ticket(cfg
.limit_min_weight
, cfg
.limit_min_size
))),
98 ticket_capacity(fair_queue_ticket(cfg
.min_weight
, cfg
.min_size
))
101 assert(_cost_capacity
.is_non_zero());
102 seastar_logger
.info("Created fair group {}, capacity rate {}, limit {}, rate {} (factor {}), threshold {}", cfg
.label
,
103 _cost_capacity
, _token_bucket
.limit(), _token_bucket
.rate(), cfg
.rate_factor
, _token_bucket
.threshold());
105 if (cfg
.rate_factor
* fixed_point_factor
> _token_bucket
.max_rate
) {
106 throw std::runtime_error("Fair-group rate_factor is too large");
109 if (ticket_capacity(fair_queue_ticket(cfg
.min_weight
, cfg
.min_size
)) > _token_bucket
.threshold()) {
110 throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
114 auto fair_group::grab_capacity(capacity_t cap
) noexcept
-> capacity_t
{
115 assert(cap
<= _token_bucket
.limit());
116 return _token_bucket
.grab(cap
);
119 void fair_group::release_capacity(capacity_t cap
) noexcept
{
120 _token_bucket
.release(cap
);
123 void fair_group::replenish_capacity(clock_type::time_point now
) noexcept
{
124 _token_bucket
.replenish(now
);
127 void fair_group::maybe_replenish_capacity(clock_type::time_point
& local_ts
) noexcept
{
128 auto now
= clock_type::now();
129 auto extra
= _token_bucket
.accumulated_in(now
- local_ts
);
131 if (extra
>= _token_bucket
.threshold()) {
133 replenish_capacity(now
);
137 auto fair_group::capacity_deficiency(capacity_t from
) const noexcept
-> capacity_t
{
138 return _token_bucket
.deficiency(from
);
141 auto fair_group::ticket_capacity(fair_queue_ticket t
) const noexcept
-> capacity_t
{
142 return t
.normalize(_cost_capacity
) * fixed_point_factor
;
145 // Priority class, to be used with a given fair_queue
146 class fair_queue::priority_class_data
{
147 friend class fair_queue
;
148 uint32_t _shares
= 0;
149 capacity_t _accumulated
= 0;
150 capacity_t _pure_accumulated
= 0;
151 fair_queue_entry::container_list_t _queue
;
152 bool _queued
= false;
153 bool _plugged
= true;
156 explicit priority_class_data(uint32_t shares
) noexcept
: _shares(std::max(shares
, 1u)) {}
157 priority_class_data(const priority_class_data
&) = delete;
158 priority_class_data(priority_class_data
&&) = delete;
160 void update_shares(uint32_t shares
) noexcept
{
161 _shares
= (std::max(shares
, 1u));
165 bool fair_queue::class_compare::operator() (const priority_class_ptr
& lhs
, const priority_class_ptr
& rhs
) const noexcept
{
166 return lhs
->_accumulated
> rhs
->_accumulated
;
169 fair_queue::fair_queue(fair_group
& group
, config cfg
)
170 : _config(std::move(cfg
))
172 , _group_replenish(clock_type::now())
176 fair_queue::fair_queue(fair_queue
&& other
)
177 : _config(std::move(other
._config
))
178 , _group(other
._group
)
179 , _group_replenish(std::move(other
._group_replenish
))
180 , _resources_executing(std::exchange(other
._resources_executing
, fair_queue_ticket
{}))
181 , _resources_queued(std::exchange(other
._resources_queued
, fair_queue_ticket
{}))
182 , _requests_executing(std::exchange(other
._requests_executing
, 0))
183 , _requests_queued(std::exchange(other
._requests_queued
, 0))
184 , _handles(std::move(other
._handles
))
185 , _priority_classes(std::move(other
._priority_classes
))
186 , _last_accumulated(other
._last_accumulated
)
190 fair_queue::~fair_queue() {
191 for (const auto& fq
: _priority_classes
) {
196 void fair_queue::push_priority_class(priority_class_data
& pc
) noexcept
{
197 assert(pc
._plugged
&& !pc
._queued
);
198 _handles
.assert_enough_capacity();
203 void fair_queue::push_priority_class_from_idle(priority_class_data
& pc
) noexcept
{
205 // Don't let the newcomer monopolize the disk for more than tau
206 // duration. For this estimate how many capacity units can be
207 // accumulated with the current class shares per rate resulution
208 // and scale it up to tau.
209 capacity_t max_deviation
= fair_group::fixed_point_factor
/ pc
._shares
* fair_group::token_bucket_t::rate_cast(_config
.tau
).count();
210 // On start this deviation can go to negative values, so not to
211 // introduce extra if's for that short corner case, use signed
212 // arithmetics and make sure the _accumulated value doesn't grow
213 // over signed maximum (see overflow check below)
214 pc
._accumulated
= std::max
<signed_capacity_t
>(_last_accumulated
- max_deviation
, pc
._accumulated
);
215 _handles
.assert_enough_capacity();
221 void fair_queue::pop_priority_class(priority_class_data
& pc
) noexcept
{
222 assert(pc
._plugged
&& pc
._queued
);
227 void fair_queue::plug_priority_class(priority_class_data
& pc
) noexcept
{
228 assert(!pc
._plugged
&& !pc
._queued
);
230 if (!pc
._queue
.empty()) {
231 push_priority_class_from_idle(pc
);
235 void fair_queue::plug_class(class_id cid
) noexcept
{
236 plug_priority_class(*_priority_classes
[cid
]);
239 void fair_queue::unplug_priority_class(priority_class_data
& pc
) noexcept
{
242 pop_priority_class(pc
);
247 void fair_queue::unplug_class(class_id cid
) noexcept
{
248 unplug_priority_class(*_priority_classes
[cid
]);
251 auto fair_queue::grab_pending_capacity(const fair_queue_entry
& ent
) noexcept
-> grab_result
{
252 _group
.maybe_replenish_capacity(_group_replenish
);
254 if (_group
.capacity_deficiency(_pending
->head
)) {
255 return grab_result::pending
;
258 capacity_t cap
= _group
.ticket_capacity(ent
._ticket
);
259 if (cap
> _pending
->cap
) {
260 return grab_result::cant_preempt
;
263 if (cap
< _pending
->cap
) {
264 _group
.release_capacity(_pending
->cap
- cap
); // FIXME -- replenish right at once?
268 return grab_result::grabbed
;
271 auto fair_queue::grab_capacity(const fair_queue_entry
& ent
) noexcept
-> grab_result
{
273 return grab_pending_capacity(ent
);
276 capacity_t cap
= _group
.ticket_capacity(ent
._ticket
);
277 capacity_t want_head
= _group
.grab_capacity(cap
);
278 if (_group
.capacity_deficiency(want_head
)) {
279 _pending
.emplace(want_head
, cap
);
280 return grab_result::pending
;
283 return grab_result::grabbed
;
286 void fair_queue::register_priority_class(class_id id
, uint32_t shares
) {
287 if (id
>= _priority_classes
.size()) {
288 _priority_classes
.resize(id
+ 1);
290 assert(!_priority_classes
[id
]);
293 _handles
.reserve(_nr_classes
+ 1);
294 _priority_classes
[id
] = std::make_unique
<priority_class_data
>(shares
);
298 void fair_queue::unregister_priority_class(class_id id
) {
299 auto& pclass
= _priority_classes
[id
];
300 assert(pclass
&& pclass
->_queue
.empty());
305 void fair_queue::update_shares_for_class(class_id id
, uint32_t shares
) {
306 assert(id
< _priority_classes
.size());
307 auto& pc
= _priority_classes
[id
];
309 pc
->update_shares(shares
);
312 size_t fair_queue::waiters() const {
313 return _requests_queued
;
316 size_t fair_queue::requests_currently_executing() const {
317 return _requests_executing
;
320 fair_queue_ticket
fair_queue::resources_currently_waiting() const {
321 return _resources_queued
;
324 fair_queue_ticket
fair_queue::resources_currently_executing() const {
325 return _resources_executing
;
328 void fair_queue::queue(class_id id
, fair_queue_entry
& ent
) noexcept
{
329 priority_class_data
& pc
= *_priority_classes
[id
];
330 // We need to return a future in this function on which the caller can wait.
331 // Since we don't know which queue we will use to execute the next request - if ours or
332 // someone else's, we need a separate promise at this point.
334 push_priority_class_from_idle(pc
);
336 pc
._queue
.push_back(ent
);
337 _resources_queued
+= ent
._ticket
;
341 void fair_queue::notify_request_finished(fair_queue_ticket desc
) noexcept
{
342 _resources_executing
-= desc
;
343 _requests_executing
--;
344 _group
.release_capacity(_group
.ticket_capacity(desc
));
347 void fair_queue::notify_request_cancelled(fair_queue_entry
& ent
) noexcept
{
348 _resources_queued
-= ent
._ticket
;
349 ent
._ticket
= fair_queue_ticket();
352 fair_queue::clock_type::time_point
fair_queue::next_pending_aio() const noexcept
{
355 * We expect the disk to release the ticket within some time,
356 * but it's ... OK if it doesn't -- the pending wait still
357 * needs the head rover value to be ahead of the needed value.
359 * It may happen that the capacity gets released before we think
360 * it will, in this case we will wait for the full value again,
361 * which's sub-optimal. The expectation is that we think disk
362 * works faster, than it really does.
364 auto over
= _group
.capacity_deficiency(_pending
->head
);
365 auto ticks
= _group
.capacity_duration(over
);
366 return std::chrono::steady_clock::now() + std::chrono::duration_cast
<std::chrono::microseconds
>(ticks
);
369 return std::chrono::steady_clock::time_point::max();
372 void fair_queue::dispatch_requests(std::function
<void(fair_queue_entry
&)> cb
) {
373 capacity_t dispatched
= 0;
374 boost::container::small_vector
<priority_class_ptr
, 2> preempt
;
376 while (!_handles
.empty() && (dispatched
< _group
.maximum_capacity() / smp::count
)) {
377 priority_class_data
& h
= *_handles
.top();
378 if (h
._queue
.empty()) {
379 pop_priority_class(h
);
383 auto& req
= h
._queue
.front();
384 auto gr
= grab_capacity(req
);
385 if (gr
== grab_result::pending
) {
389 if (gr
== grab_result::cant_preempt
) {
390 pop_priority_class(h
);
391 preempt
.emplace_back(&h
);
395 _last_accumulated
= std::max(h
._accumulated
, _last_accumulated
);
396 pop_priority_class(h
);
397 h
._queue
.pop_front();
399 _resources_executing
+= req
._ticket
;
400 _resources_queued
-= req
._ticket
;
401 _requests_executing
++;
404 // Usually the cost of request is tens to hundreeds of thousands. However, for
405 // unrestricted queue it can be as low as 2k. With large enough shares this
406 // has chances to be translated into zero cost which, in turn, will make the
407 // class show no progress and monopolize the queue.
408 auto req_cap
= _group
.ticket_capacity(req
._ticket
);
409 auto req_cost
= std::max(req_cap
/ h
._shares
, (capacity_t
)1);
410 // signed overflow check to make push_priority_class_from_idle math work
411 if (h
._accumulated
>= std::numeric_limits
<signed_capacity_t
>::max() - req_cost
) {
412 for (auto& pc
: _priority_classes
) {
415 pc
->_accumulated
-= h
._accumulated
;
416 } else { // this includes h
417 pc
->_accumulated
= 0;
421 _last_accumulated
= 0;
423 h
._accumulated
+= req_cost
;
424 h
._pure_accumulated
+= req_cap
;
426 dispatched
+= _group
.ticket_capacity(req
._ticket
);
429 if (h
._plugged
&& !h
._queue
.empty()) {
430 push_priority_class(h
);
434 for (auto&& h
: preempt
) {
435 push_priority_class(*h
);
439 std::vector
<seastar::metrics::impl::metric_definition_impl
> fair_queue::metrics(class_id c
) {
440 namespace sm
= seastar::metrics
;
441 priority_class_data
& pc
= *_priority_classes
[c
];
442 return std::vector
<sm::impl::metric_definition_impl
>({
443 sm::make_counter("consumption",
444 [&pc
] { return fair_group::capacity_tokens(pc
._pure_accumulated
); },
445 sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
446 sm::make_counter("adjusted_consumption",
447 [&pc
] { return fair_group::capacity_tokens(pc
._accumulated
); },
448 sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),