]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #include <seastar/core/fair_queue.hh> | |
23 | #include <seastar/core/future.hh> | |
24 | #include <seastar/core/shared_ptr.hh> | |
25 | #include <seastar/core/circular_buffer.hh> | |
26 | #include <seastar/util/noncopyable_function.hh> | |
27 | #include <queue> | |
28 | #include <chrono> | |
29 | #include <unordered_set> | |
30 | #include <cmath> | |
31 | ||
f67539c2 TL |
32 | #include "fmt/format.h" |
33 | #include "fmt/ostream.h" | |
34 | ||
9f95a23c TL |
35 | namespace seastar { |
36 | ||
f67539c2 TL |
37 | static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size"); |
38 | ||
39 | fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) | |
40 | : _weight(weight) | |
41 | , _size(size) | |
42 | {} | |
43 | ||
44 | float fair_queue_ticket::normalize(fair_queue_ticket denominator) const { | |
45 | return float(_weight) / denominator._weight + float(_size) / denominator._size; | |
46 | } | |
47 | ||
48 | fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const { | |
49 | return fair_queue_ticket(_weight + desc._weight, _size + desc._size); | |
50 | } | |
51 | ||
52 | fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) { | |
53 | _weight += desc._weight; | |
54 | _size += desc._size; | |
55 | return *this; | |
56 | } | |
57 | ||
58 | fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const { | |
59 | return fair_queue_ticket(_weight - desc._weight, _size - desc._size); | |
60 | } | |
61 | ||
62 | fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) { | |
63 | _weight -= desc._weight; | |
64 | _size -= desc._size; | |
65 | return *this; | |
66 | } | |
67 | ||
68 | bool fair_queue_ticket::strictly_less(fair_queue_ticket rhs) const { | |
69 | return (_weight < rhs._weight) && (_size < rhs._size); | |
70 | } | |
71 | ||
72 | fair_queue_ticket::operator bool() const { | |
73 | return (_weight > 0) || (_size > 0); | |
74 | } | |
75 | ||
76 | std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) { | |
77 | return os << t._weight << ":" << t._size; | |
78 | } | |
79 | ||
80 | fair_queue::fair_queue(config cfg) | |
81 | : _config(std::move(cfg)) | |
82 | , _maximum_capacity(_config.max_req_count, _config.max_bytes_count) | |
83 | , _current_capacity(_config.max_req_count, _config.max_bytes_count) | |
84 | , _base(std::chrono::steady_clock::now()) | |
85 | {} | |
86 | ||
9f95a23c TL |
87 | void fair_queue::push_priority_class(priority_class_ptr pc) { |
88 | if (!pc->_queued) { | |
89 | _handles.push(pc); | |
90 | pc->_queued = true; | |
91 | } | |
92 | } | |
93 | ||
94 | priority_class_ptr fair_queue::pop_priority_class() { | |
95 | assert(!_handles.empty()); | |
96 | auto h = _handles.top(); | |
97 | _handles.pop(); | |
98 | assert(h->_queued); | |
99 | h->_queued = false; | |
100 | return h; | |
101 | } | |
102 | ||
103 | float fair_queue::normalize_factor() const { | |
104 | return std::numeric_limits<float>::min(); | |
105 | } | |
106 | ||
107 | void fair_queue::normalize_stats() { | |
108 | auto time_delta = std::log(normalize_factor()) * _config.tau; | |
109 | // time_delta is negative; and this may advance _base into the future | |
110 | _base -= std::chrono::duration_cast<clock_type::duration>(time_delta); | |
111 | for (auto& pc: _all_classes) { | |
112 | pc->_accumulated *= normalize_factor(); | |
113 | } | |
114 | } | |
115 | ||
116 | bool fair_queue::can_dispatch() const { | |
f67539c2 | 117 | return _resources_queued && (_resources_executing.strictly_less(_current_capacity)); |
9f95a23c TL |
118 | } |
119 | ||
120 | priority_class_ptr fair_queue::register_priority_class(uint32_t shares) { | |
121 | priority_class_ptr pclass = make_lw_shared<priority_class>(shares); | |
122 | _all_classes.insert(pclass); | |
123 | return pclass; | |
124 | } | |
125 | ||
126 | void fair_queue::unregister_priority_class(priority_class_ptr pclass) { | |
127 | assert(pclass->_queue.empty()); | |
128 | _all_classes.erase(pclass); | |
129 | } | |
130 | ||
131 | size_t fair_queue::waiters() const { | |
132 | return _requests_queued; | |
133 | } | |
134 | ||
135 | size_t fair_queue::requests_currently_executing() const { | |
136 | return _requests_executing; | |
137 | } | |
138 | ||
f67539c2 TL |
139 | fair_queue_ticket fair_queue::resources_currently_waiting() const { |
140 | return _resources_queued; | |
141 | } | |
142 | ||
143 | fair_queue_ticket fair_queue::resources_currently_executing() const { | |
144 | return _resources_executing; | |
145 | } | |
146 | ||
147 | void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func) { | |
9f95a23c TL |
148 | // We need to return a future in this function on which the caller can wait. |
149 | // Since we don't know which queue we will use to execute the next request - if ours or | |
150 | // someone else's, we need a separate promise at this point. | |
151 | push_priority_class(pc); | |
152 | pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)}); | |
f67539c2 | 153 | _resources_queued += desc; |
9f95a23c TL |
154 | _requests_queued++; |
155 | } | |
156 | ||
f67539c2 TL |
157 | void fair_queue::notify_requests_finished(fair_queue_ticket desc, unsigned nr) noexcept { |
158 | _resources_executing -= desc; | |
159 | _requests_executing -= nr; | |
9f95a23c TL |
160 | } |
161 | ||
9f95a23c TL |
162 | void fair_queue::dispatch_requests() { |
163 | while (can_dispatch()) { | |
164 | priority_class_ptr h; | |
165 | do { | |
166 | h = pop_priority_class(); | |
167 | } while (h->_queue.empty()); | |
168 | ||
169 | auto req = std::move(h->_queue.front()); | |
170 | h->_queue.pop_front(); | |
f67539c2 TL |
171 | _resources_executing += req.desc;; |
172 | _resources_queued -= req.desc; | |
9f95a23c | 173 | _requests_executing++; |
9f95a23c TL |
174 | _requests_queued--; |
175 | ||
176 | auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base); | |
f67539c2 | 177 | auto req_cost = req.desc.normalize(_maximum_capacity) / h->_shares; |
9f95a23c TL |
178 | auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost; |
179 | float next_accumulated = h->_accumulated + cost; | |
180 | while (std::isinf(next_accumulated)) { | |
181 | normalize_stats(); | |
182 | // If we have renormalized, our time base will have changed. This should happen very infrequently | |
183 | delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base); | |
184 | cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost; | |
185 | next_accumulated = h->_accumulated + cost; | |
186 | } | |
187 | h->_accumulated = next_accumulated; | |
188 | ||
189 | if (!h->_queue.empty()) { | |
190 | push_priority_class(h); | |
191 | } | |
192 | req.func(); | |
193 | } | |
194 | } | |
195 | ||
9f95a23c | 196 | } |