1 | /* | |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #include <boost/container/small_vector.hpp> | |
23 | #include <boost/intrusive/parent_from_member.hpp> | |
24 | #include <seastar/core/fair_queue.hh> | |
25 | #include <seastar/core/future.hh> | |
26 | #include <seastar/core/shared_ptr.hh> | |
27 | #include <seastar/core/circular_buffer.hh> | |
28 | #include <seastar/util/noncopyable_function.hh> | |
29 | #include <seastar/core/reactor.hh> | |
30 | #include <seastar/core/metrics.hh> | |
31 | #include <queue> | |
32 | #include <chrono> | |
33 | #include <unordered_set> | |
34 | ||
35 | #include "fmt/format.h" | |
36 | #include "fmt/ostream.h" | |
37 | ||
38 | namespace seastar { | |
39 | ||
// Compile-time layout checks: these types are passed around by value on hot
// I/O paths, so fail the build if their size grows unexpectedly.
static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
static_assert(sizeof(fair_queue_entry) <= 3 * sizeof(void*), "unexpected fair_queue_entry::_hook size");
static_assert(sizeof(fair_queue_entry::container_list_t) == 2 * sizeof(void*), "unexpected priority_class::_queue size");
43 | ||
// A ticket describes the cost of one request in two dimensions: an abstract
// weight (request count based) and a size (bytes based).
fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size) noexcept
    : _weight(weight)
    , _size(size)
{}
48 | ||
49 | float fair_queue_ticket::normalize(fair_queue_ticket denominator) const noexcept { | |
50 | return float(_weight) / denominator._weight + float(_size) / denominator._size; | |
51 | } | |
52 | ||
53 | fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const noexcept { | |
54 | return fair_queue_ticket(_weight + desc._weight, _size + desc._size); | |
55 | } | |
56 | ||
57 | fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) noexcept { | |
58 | _weight += desc._weight; | |
59 | _size += desc._size; | |
60 | return *this; | |
61 | } | |
62 | ||
63 | fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const noexcept { | |
64 | return fair_queue_ticket(_weight - desc._weight, _size - desc._size); | |
65 | } | |
66 | ||
67 | fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) noexcept { | |
68 | _weight -= desc._weight; | |
69 | _size -= desc._size; | |
70 | return *this; | |
71 | } | |
72 | ||
73 | fair_queue_ticket::operator bool() const noexcept { | |
74 | return (_weight > 0) || (_size > 0); | |
75 | } | |
76 | ||
77 | bool fair_queue_ticket::is_non_zero() const noexcept { | |
78 | return (_weight > 0) && (_size > 0); | |
79 | } | |
80 | ||
81 | bool fair_queue_ticket::operator==(const fair_queue_ticket& o) const noexcept { | |
82 | return _weight == o._weight && _size == o._size; | |
83 | } | |
84 | ||
85 | std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) { | |
86 | return os << t._weight << ":" << t._size; | |
87 | } | |
88 | ||
// Component-wise (a - b), clamped below at zero. The subtraction happens in
// unsigned 32-bit arithmetic (so it may wrap), and the result is interpreted
// as int32_t before clamping: if b has run slightly ahead of a, the wrapped
// value is negative when viewed as signed and the max<> yields 0 instead of a
// huge unsigned number.
fair_queue_ticket wrapping_difference(const fair_queue_ticket& a, const fair_queue_ticket& b) noexcept {
    return fair_queue_ticket(std::max<int32_t>(a._weight - b._weight, 0),
            std::max<int32_t>(a._size - b._size, 0));
}
93 | ||
// Construct a shard-shared fair group backed by a token bucket.
//
// _cost_capacity converts the configured per-second weight/size rates into
// per-rate-tick units (rate_cast(1s).count() ticks per second), and is later
// used by ticket_capacity() to normalize request costs.
//
// The token bucket is built from:
//  - rate:      rate_factor scaled by fixed_point_factor,
//  - limit:     enough capacity for rate_limit_duration at that rate, but at
//               least the capacity of a (limit_min_weight, limit_min_size)
//               ticket,
//  - threshold: the capacity of a minimal (min_weight, min_size) ticket.
fair_group::fair_group(config cfg)
    : _cost_capacity(cfg.weight_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count(), cfg.size_rate / token_bucket_t::rate_cast(std::chrono::seconds(1)).count())
    , _token_bucket(cfg.rate_factor * fixed_point_factor,
                    std::max<capacity_t>(cfg.rate_factor * fixed_point_factor * token_bucket_t::rate_cast(cfg.rate_limit_duration).count(), ticket_capacity(fair_queue_ticket(cfg.limit_min_weight, cfg.limit_min_size))),
                    ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size))
      )
{
    // A zero cost capacity would make ticket_capacity() divide by zero.
    assert(_cost_capacity.is_non_zero());
    seastar_logger.info("Created fair group {}, capacity rate {}, limit {}, rate {} (factor {}), threshold {}", cfg.label,
            _cost_capacity, _token_bucket.limit(), _token_bucket.rate(), cfg.rate_factor, _token_bucket.threshold());

    // The bucket's rate must fit into its internal representation.
    if (cfg.rate_factor * fixed_point_factor > _token_bucket.max_rate) {
        throw std::runtime_error("Fair-group rate_factor is too large");
    }

    // Replenishing less than a minimal ticket at a time would stall dispatch.
    if (ticket_capacity(fair_queue_ticket(cfg.min_weight, cfg.min_size)) > _token_bucket.threshold()) {
        throw std::runtime_error("Fair-group replenisher limit is lower than threshold");
    }
}
113 | ||
// Reserve cap units from the shared token bucket; returns the resulting
// "head" position which the caller checks against capacity_deficiency().
// cap must not exceed the bucket limit, or it could never be satisfied.
auto fair_group::grab_capacity(capacity_t cap) noexcept -> capacity_t {
    assert(cap <= _token_bucket.limit());
    return _token_bucket.grab(cap);
}
118 | ||
// Return cap units back to the shared token bucket (e.g. when a request
// completes or a pending grab is shrunk).
void fair_group::release_capacity(capacity_t cap) noexcept {
    _token_bucket.release(cap);
}
122 | ||
// Replenish the token bucket up to the given time point.
void fair_group::replenish_capacity(clock_type::time_point now) noexcept {
    _token_bucket.replenish(now);
}
126 | ||
127 | void fair_group::maybe_replenish_capacity(clock_type::time_point& local_ts) noexcept { | |
128 | auto now = clock_type::now(); | |
129 | auto extra = _token_bucket.accumulated_in(now - local_ts); | |
130 | ||
131 | if (extra >= _token_bucket.threshold()) { | |
132 | local_ts = now; | |
133 | replenish_capacity(now); | |
134 | } | |
135 | } | |
136 | ||
// How much capacity is still missing before the bucket head reaches `from`;
// zero means the grab at `from` is already satisfied.
auto fair_group::capacity_deficiency(capacity_t from) const noexcept -> capacity_t {
    return _token_bucket.deficiency(from);
}
140 | ||
// Convert a request ticket into token-bucket capacity units: normalize it
// against the group's cost capacity and scale into fixed-point.
auto fair_group::ticket_capacity(fair_queue_ticket t) const noexcept -> capacity_t {
    return t.normalize(_cost_capacity) * fixed_point_factor;
}
144 | ||
// Priority class, to be used with a given fair_queue
class fair_queue::priority_class_data {
    friend class fair_queue;
    uint32_t _shares = 0;               // scheduling weight; higher shares -> lower per-request cost
    capacity_t _accumulated = 0;        // share-adjusted capacity consumed; heap ordering key
    capacity_t _pure_accumulated = 0;   // raw (unadjusted) capacity consumed, for metrics
    fair_queue_entry::container_list_t _queue;  // pending entries of this class, FIFO
    bool _queued = false;               // currently present in fair_queue::_handles
    bool _plugged = true;               // eligible for dispatch (see plug/unplug)

public:
    // Shares are clamped to at least 1 so per-request cost division never
    // divides by zero.
    explicit priority_class_data(uint32_t shares) noexcept : _shares(std::max(shares, 1u)) {}
    priority_class_data(const priority_class_data&) = delete;
    priority_class_data(priority_class_data&&) = delete;

    void update_shares(uint32_t shares) noexcept {
        _shares = (std::max(shares, 1u));
    }
};
164 | ||
// Heap comparator: the class with the SMALLEST accumulated (share-adjusted)
// consumption sits on top, so the least-served class is dispatched first.
bool fair_queue::class_compare::operator() (const priority_class_ptr& lhs, const priority_class_ptr & rhs) const noexcept {
    return lhs->_accumulated > rhs->_accumulated;
}
168 | ||
// Construct a per-shard fair queue attached to the shared fair_group.
// _group_replenish tracks the last time this shard observed a bucket
// replenish (see fair_group::maybe_replenish_capacity).
fair_queue::fair_queue(fair_group& group, config cfg)
    : _config(std::move(cfg))
    , _group(group)
    , _group_replenish(clock_type::now())
{
}
175 | ||
// Move constructor: transfers queued/executing accounting and the registered
// classes, zeroing the counters in the source so it can be destroyed safely.
// NOTE(review): _pending and _nr_classes are not transferred here — presumably
// moving is only done before any requests are queued and classes registered;
// confirm against the member declarations/initializers in the header.
fair_queue::fair_queue(fair_queue&& other)
    : _config(std::move(other._config))
    , _group(other._group)
    , _group_replenish(std::move(other._group_replenish))
    , _resources_executing(std::exchange(other._resources_executing, fair_queue_ticket{}))
    , _resources_queued(std::exchange(other._resources_queued, fair_queue_ticket{}))
    , _requests_executing(std::exchange(other._requests_executing, 0))
    , _requests_queued(std::exchange(other._requests_queued, 0))
    , _handles(std::move(other._handles))
    , _priority_classes(std::move(other._priority_classes))
    , _last_accumulated(other._last_accumulated)
{
}
189 | ||
// All priority classes must have been unregistered (slots reset) before the
// queue is destroyed; a live class here would mean dangling requests.
fair_queue::~fair_queue() {
    for (const auto& fq : _priority_classes) {
        assert(!fq);
    }
}
195 | ||
// Re-insert an already-known (plugged, not currently queued) class into the
// dispatch heap. Capacity for the push was pre-reserved at registration time,
// hence the assert instead of a potentially-throwing allocation.
void fair_queue::push_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged && !pc._queued);
    _handles.assert_enough_capacity();
    _handles.push(&pc);
    pc._queued = true;
}
202 | ||
// Insert a class that may have been idle into the dispatch heap, first
// catching its _accumulated up to (almost) the current front-runner so an
// idle class cannot replay all the capacity it "saved" while idle.
void fair_queue::push_priority_class_from_idle(priority_class_data& pc) noexcept {
    if (!pc._queued) {
        // Don't let the newcomer monopolize the disk for more than tau
        // duration. For this estimate how many capacity units can be
        // accumulated with the current class shares per rate resolution
        // and scale it up to tau.
        capacity_t max_deviation = fair_group::fixed_point_factor / pc._shares * fair_group::token_bucket_t::rate_cast(_config.tau).count();
        // On start this deviation can go to negative values, so not to
        // introduce extra if's for that short corner case, use signed
        // arithmetics and make sure the _accumulated value doesn't grow
        // over signed maximum (see overflow check below)
        pc._accumulated = std::max<signed_capacity_t>(_last_accumulated - max_deviation, pc._accumulated);
        _handles.assert_enough_capacity();
        _handles.push(&pc);
        pc._queued = true;
    }
}
220 | ||
// Remove the top class from the dispatch heap; pc must be that top element.
void fair_queue::pop_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged && pc._queued);
    pc._queued = false;
    _handles.pop();
}
226 | ||
// Make a class eligible for dispatch again; if it already has pending
// entries, put it back into the heap via the idle path (which adjusts its
// accumulated consumption).
void fair_queue::plug_priority_class(priority_class_data& pc) noexcept {
    assert(!pc._plugged && !pc._queued);
    pc._plugged = true;
    if (!pc._queue.empty()) {
        push_priority_class_from_idle(pc);
    }
}
234 | ||
// Public wrapper: plug the class registered under cid.
void fair_queue::plug_class(class_id cid) noexcept {
    plug_priority_class(*_priority_classes[cid]);
}
238 | ||
// Make a class ineligible for dispatch, removing it from the heap if queued.
// Its pending entries stay in pc._queue until it is plugged again.
void fair_queue::unplug_priority_class(priority_class_data& pc) noexcept {
    assert(pc._plugged);
    if (pc._queued) {
        pop_priority_class(pc);
    }
    pc._plugged = false;
}
246 | ||
// Public wrapper: unplug the class registered under cid.
void fair_queue::unplug_class(class_id cid) noexcept {
    unplug_priority_class(*_priority_classes[cid]);
}
250 | ||
// Try to satisfy a new request using the capacity grab that is already
// pending. Returns:
//  - pending:      the pending grab is still short of tokens;
//  - cant_preempt: this request costs more than the pending grab reserved,
//                  so it cannot reuse it;
//  - grabbed:      the pending grab covers the request (any surplus is
//                  returned to the bucket) and is consumed.
auto fair_queue::grab_pending_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
    _group.maybe_replenish_capacity(_group_replenish);

    if (_group.capacity_deficiency(_pending->head)) {
        return grab_result::pending;
    }

    capacity_t cap = _group.ticket_capacity(ent._ticket);
    if (cap > _pending->cap) {
        return grab_result::cant_preempt;
    }

    if (cap < _pending->cap) {
        _group.release_capacity(_pending->cap - cap); // FIXME -- replenish right at once?
    }

    _pending.reset();
    return grab_result::grabbed;
}
270 | ||
// Reserve capacity for a request. If a previous grab is still pending, try
// to reuse it; otherwise grab fresh tokens from the group. When the bucket
// cannot cover the cost yet, remember the grab as _pending and report so.
auto fair_queue::grab_capacity(const fair_queue_entry& ent) noexcept -> grab_result {
    if (_pending) {
        return grab_pending_capacity(ent);
    }

    capacity_t cap = _group.ticket_capacity(ent._ticket);
    capacity_t want_head = _group.grab_capacity(cap);
    if (_group.capacity_deficiency(want_head)) {
        _pending.emplace(want_head, cap);
        return grab_result::pending;
    }

    return grab_result::grabbed;
}
285 | ||
// Register a new priority class under the given id with the given shares.
// The id slot must be free. The heap's storage is grown up front so that
// later (noexcept) pushes never need to allocate.
void fair_queue::register_priority_class(class_id id, uint32_t shares) {
    if (id >= _priority_classes.size()) {
        _priority_classes.resize(id + 1);
    } else {
        assert(!_priority_classes[id]);
    }

    _handles.reserve(_nr_classes + 1);
    _priority_classes[id] = std::make_unique<priority_class_data>(shares);
    _nr_classes++;
}
297 | ||
// Unregister a class; it must exist and have no pending entries. The vector
// slot is kept (reset to null) so other class ids remain stable.
void fair_queue::unregister_priority_class(class_id id) {
    auto& pclass = _priority_classes[id];
    assert(pclass && pclass->_queue.empty());
    pclass.reset();
    _nr_classes--;
}
304 | ||
// Change the shares of a registered class (clamped to >= 1 by
// priority_class_data::update_shares).
void fair_queue::update_shares_for_class(class_id id, uint32_t shares) {
    assert(id < _priority_classes.size());
    auto& pc = _priority_classes[id];
    assert(pc);
    pc->update_shares(shares);
}
311 | ||
// Number of requests currently queued (not yet dispatched).
size_t fair_queue::waiters() const {
    return _requests_queued;
}
315 | ||
// Number of requests dispatched and not yet reported finished.
size_t fair_queue::requests_currently_executing() const {
    return _requests_executing;
}
319 | ||
// Aggregate ticket (weight/size) of all queued requests.
fair_queue_ticket fair_queue::resources_currently_waiting() const {
    return _resources_queued;
}
323 | ||
// Aggregate ticket (weight/size) of all in-flight requests.
fair_queue_ticket fair_queue::resources_currently_executing() const {
    return _resources_executing;
}
327 | ||
// Enqueue an entry into its class and account for it. If the class is
// plugged, (re-)insert it into the dispatch heap; for an unplugged class the
// entry just waits in the class queue until plug_class() is called. Actual
// dispatch happens later in dispatch_requests().
void fair_queue::queue(class_id id, fair_queue_entry& ent) noexcept {
    priority_class_data& pc = *_priority_classes[id];
    if (pc._plugged) {
        push_priority_class_from_idle(pc);
    }
    pc._queue.push_back(ent);
    _resources_queued += ent._ticket;
    _requests_queued++;
}
340 | ||
// Account for a completed request and return its capacity to the group so
// other shards/classes can use it.
void fair_queue::notify_request_finished(fair_queue_ticket desc) noexcept {
    _resources_executing -= desc;
    _requests_executing--;
    _group.release_capacity(_group.ticket_capacity(desc));
}
346 | ||
// Cancel a still-queued entry: drop its cost from the queued accounting and
// zero its ticket. The entry itself stays linked in its class queue and will
// be consumed (at zero cost) by a later dispatch pass.
void fair_queue::notify_request_cancelled(fair_queue_entry& ent) noexcept {
    _resources_queued -= ent._ticket;
    ent._ticket = fair_queue_ticket();
}
351 | ||
// Estimate when the pending capacity grab will be satisfied, so the reactor
// knows when to poll this queue again. Returns time_point::max() when
// nothing is pending.
fair_queue::clock_type::time_point fair_queue::next_pending_aio() const noexcept {
    if (_pending) {
        /*
         * We expect the disk to release the ticket within some time,
         * but it's ... OK if it doesn't -- the pending wait still
         * needs the head rover value to be ahead of the needed value.
         *
         * It may happen that the capacity gets released before we think
         * it will, in this case we will wait for the full value again,
         * which is sub-optimal. The expectation is that we think disk
         * works faster, than it really does.
         */
        auto over = _group.capacity_deficiency(_pending->head);
        auto ticks = _group.capacity_duration(over);
        return std::chrono::steady_clock::now() + std::chrono::duration_cast<std::chrono::microseconds>(ticks);
    }

    return std::chrono::steady_clock::time_point::max();
}
371 | ||
// Dispatch queued requests in fairness order, invoking cb for each one.
// Classes are drawn from the min-heap (least accumulated cost first); each
// dispatched request charges its class a share-adjusted cost. Dispatch stops
// when the heap drains, when the group runs out of tokens (pending grab), or
// after this shard has dispatched its 1/smp::count slice of the group's
// maximum capacity.
void fair_queue::dispatch_requests(std::function<void(fair_queue_entry&)> cb) {
    capacity_t dispatched = 0;
    // Classes whose head request couldn't preempt the pending grab; they are
    // temporarily pulled off the heap and re-pushed after the loop.
    boost::container::small_vector<priority_class_ptr, 2> preempt;

    while (!_handles.empty() && (dispatched < _group.maximum_capacity() / smp::count)) {
        priority_class_data& h = *_handles.top();
        if (h._queue.empty()) {
            pop_priority_class(h);
            continue;
        }

        auto& req = h._queue.front();
        auto gr = grab_capacity(req);
        if (gr == grab_result::pending) {
            // No tokens available -- nothing else can be dispatched either.
            break;
        }

        if (gr == grab_result::cant_preempt) {
            pop_priority_class(h);
            preempt.emplace_back(&h);
            continue;
        }

        _last_accumulated = std::max(h._accumulated, _last_accumulated);
        pop_priority_class(h);
        h._queue.pop_front();

        _resources_executing += req._ticket;
        _resources_queued -= req._ticket;
        _requests_executing++;
        _requests_queued--;

        // Usually the cost of request is tens to hundreds of thousands. However, for
        // unrestricted queue it can be as low as 2k. With large enough shares this
        // has chances to be translated into zero cost which, in turn, will make the
        // class show no progress and monopolize the queue.
        auto req_cap = _group.ticket_capacity(req._ticket);
        auto req_cost = std::max(req_cap / h._shares, (capacity_t)1);
        // signed overflow check to make push_priority_class_from_idle math work
        if (h._accumulated >= std::numeric_limits<signed_capacity_t>::max() - req_cost) {
            // Rebase all accumulators so relative order is preserved while
            // bringing the values back into range.
            for (auto& pc : _priority_classes) {
                if (pc) {
                    if (pc->_queued) {
                        pc->_accumulated -= h._accumulated;
                    } else { // this includes h
                        pc->_accumulated = 0;
                    }
                }
            }
            _last_accumulated = 0;
        }
        h._accumulated += req_cost;
        h._pure_accumulated += req_cap;

        dispatched += _group.ticket_capacity(req._ticket);
        cb(req);

        if (h._plugged && !h._queue.empty()) {
            push_priority_class(h);
        }
    }

    // Restore the classes that were skipped due to cant_preempt.
    for (auto&& h : preempt) {
        push_priority_class(*h);
    }
}
438 | ||
// Build the per-class metric definitions. The lambdas capture the class data
// by reference, so the class must stay registered while the metrics live.
std::vector<seastar::metrics::impl::metric_definition_impl> fair_queue::metrics(class_id c) {
    namespace sm = seastar::metrics;
    priority_class_data& pc = *_priority_classes[c];
    return std::vector<sm::impl::metric_definition_impl>({
            sm::make_counter("consumption",
                    [&pc] { return fair_group::capacity_tokens(pc._pure_accumulated); },
                    sm::description("Accumulated disk capacity units consumed by this class; an increment per-second rate indicates full utilization")),
            sm::make_counter("adjusted_consumption",
                    [&pc] { return fair_group::capacity_tokens(pc._accumulated); },
                    sm::description("Consumed disk capacity units adjusted for class shares and idling preemption")),
    });
}
451 | ||
452 | } |