2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
22 #include <seastar/core/fair_queue.hh>
23 #include <seastar/core/future.hh>
24 #include <seastar/core/shared_ptr.hh>
25 #include <seastar/core/circular_buffer.hh>
26 #include <seastar/util/noncopyable_function.hh>
29 #include <unordered_set>
34 void fair_queue::push_priority_class(priority_class_ptr pc
) {
41 priority_class_ptr
fair_queue::pop_priority_class() {
42 assert(!_handles
.empty());
43 auto h
= _handles
.top();
50 float fair_queue::normalize_factor() const {
51 return std::numeric_limits
<float>::min();
54 void fair_queue::normalize_stats() {
55 auto time_delta
= std::log(normalize_factor()) * _config
.tau
;
56 // time_delta is negative; and this may advance _base into the future
57 _base
-= std::chrono::duration_cast
<clock_type::duration
>(time_delta
);
58 for (auto& pc
: _all_classes
) {
59 pc
->_accumulated
*= normalize_factor();
63 bool fair_queue::can_dispatch() const {
64 return _requests_queued
&&
65 (_requests_executing
< _config
.capacity
) &&
66 (_req_count_executing
< _config
.max_req_count
) &&
67 (_bytes_count_executing
< _config
.max_bytes_count
);
70 priority_class_ptr
fair_queue::register_priority_class(uint32_t shares
) {
71 priority_class_ptr pclass
= make_lw_shared
<priority_class
>(shares
);
72 _all_classes
.insert(pclass
);
76 void fair_queue::unregister_priority_class(priority_class_ptr pclass
) {
77 assert(pclass
->_queue
.empty());
78 _all_classes
.erase(pclass
);
81 size_t fair_queue::waiters() const {
82 return _requests_queued
;
85 size_t fair_queue::requests_currently_executing() const {
86 return _requests_executing
;
89 void fair_queue::queue(priority_class_ptr pc
, fair_queue_request_descriptor desc
, noncopyable_function
<void()> func
) {
90 // We need to return a future in this function on which the caller can wait.
91 // Since we don't know which queue we will use to execute the next request - if ours or
92 // someone else's, we need a separate promise at this point.
93 push_priority_class(pc
);
94 pc
->_queue
.push_back(priority_class::request
{std::move(func
), std::move(desc
)});
98 void fair_queue::notify_requests_finished(fair_queue_request_descriptor
& desc
) {
99 _requests_executing
--;
100 _req_count_executing
-= desc
.weight
;
101 _bytes_count_executing
-= desc
.size
;
105 void fair_queue::dispatch_requests() {
106 while (can_dispatch()) {
107 priority_class_ptr h
;
109 h
= pop_priority_class();
110 } while (h
->_queue
.empty());
112 auto req
= std::move(h
->_queue
.front());
113 h
->_queue
.pop_front();
114 _requests_executing
++;
115 _req_count_executing
+= req
.desc
.weight
;
116 _bytes_count_executing
+= req
.desc
.size
;
119 auto delta
= std::chrono::duration_cast
<std::chrono::microseconds
>(std::chrono::steady_clock::now() - _base
);
120 auto req_cost
= (float(req
.desc
.weight
) / _config
.max_req_count
+ float(req
.desc
.size
) / _config
.max_bytes_count
) / h
->_shares
;
121 auto cost
= expf(1.0f
/_config
.tau
.count() * delta
.count()) * req_cost
;
122 float next_accumulated
= h
->_accumulated
+ cost
;
123 while (std::isinf(next_accumulated
)) {
125 // If we have renormalized, our time base will have changed. This should happen very infrequently
126 delta
= std::chrono::duration_cast
<std::chrono::microseconds
>(std::chrono::steady_clock::now() - _base
);
127 cost
= expf(1.0f
/_config
.tau
.count() * delta
.count()) * req_cost
;
128 next_accumulated
= h
->_accumulated
+ cost
;
130 h
->_accumulated
= next_accumulated
;
132 if (!h
->_queue
.empty()) {
133 push_priority_class(h
);
139 void fair_queue::update_shares(priority_class_ptr pc
, uint32_t new_shares
) {
140 pc
->update_shares(new_shares
);