]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #include <seastar/core/fair_queue.hh> | |
23 | #include <seastar/core/future.hh> | |
24 | #include <seastar/core/shared_ptr.hh> | |
25 | #include <seastar/core/circular_buffer.hh> | |
26 | #include <seastar/util/noncopyable_function.hh> | |
27 | #include <queue> | |
28 | #include <chrono> | |
29 | #include <unordered_set> | |
30 | #include <cmath> | |
31 | ||
32 | namespace seastar { | |
33 | ||
34 | void fair_queue::push_priority_class(priority_class_ptr pc) { | |
35 | if (!pc->_queued) { | |
36 | _handles.push(pc); | |
37 | pc->_queued = true; | |
38 | } | |
39 | } | |
40 | ||
41 | priority_class_ptr fair_queue::pop_priority_class() { | |
42 | assert(!_handles.empty()); | |
43 | auto h = _handles.top(); | |
44 | _handles.pop(); | |
45 | assert(h->_queued); | |
46 | h->_queued = false; | |
47 | return h; | |
48 | } | |
49 | ||
50 | float fair_queue::normalize_factor() const { | |
51 | return std::numeric_limits<float>::min(); | |
52 | } | |
53 | ||
// Rescale all per-class accumulated costs by normalize_factor() to pull
// them away from float overflow, and shift the time base so that the
// exponential cost term exp(delta / tau) computed in dispatch_requests()
// stays consistent with the rescaled accumulators.
void fair_queue::normalize_stats() {
    // log(normalize_factor()) is a large negative number; multiplying by
    // tau yields the (negative) time shift equivalent to the rescaling.
    auto time_delta = std::log(normalize_factor()) * _config.tau;
    // time_delta is negative; and this may advance _base into the future
    _base -= std::chrono::duration_cast<clock_type::duration>(time_delta);
    // Scale every class's accumulator by the same factor so their relative
    // ordering in the dispatch heap is preserved.
    for (auto& pc: _all_classes) {
        pc->_accumulated *= normalize_factor();
    }
}
62 | ||
63 | bool fair_queue::can_dispatch() const { | |
64 | return _requests_queued && | |
65 | (_requests_executing < _config.capacity) && | |
66 | (_req_count_executing < _config.max_req_count) && | |
67 | (_bytes_count_executing < _config.max_bytes_count); | |
68 | } | |
69 | ||
70 | priority_class_ptr fair_queue::register_priority_class(uint32_t shares) { | |
71 | priority_class_ptr pclass = make_lw_shared<priority_class>(shares); | |
72 | _all_classes.insert(pclass); | |
73 | return pclass; | |
74 | } | |
75 | ||
76 | void fair_queue::unregister_priority_class(priority_class_ptr pclass) { | |
77 | assert(pclass->_queue.empty()); | |
78 | _all_classes.erase(pclass); | |
79 | } | |
80 | ||
// Number of requests currently queued and waiting to be dispatched.
size_t fair_queue::waiters() const {
    return _requests_queued;
}
84 | ||
// Number of requests that have been dispatched but not yet reported
// finished via notify_requests_finished().
size_t fair_queue::requests_currently_executing() const {
    return _requests_executing;
}
88 | ||
89 | void fair_queue::queue(priority_class_ptr pc, fair_queue_request_descriptor desc, noncopyable_function<void()> func) { | |
90 | // We need to return a future in this function on which the caller can wait. | |
91 | // Since we don't know which queue we will use to execute the next request - if ours or | |
92 | // someone else's, we need a separate promise at this point. | |
93 | push_priority_class(pc); | |
94 | pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)}); | |
95 | _requests_queued++; | |
96 | } | |
97 | ||
98 | void fair_queue::notify_requests_finished(fair_queue_request_descriptor& desc) { | |
99 | _requests_executing--; | |
100 | _req_count_executing -= desc.weight; | |
101 | _bytes_count_executing -= desc.size; | |
102 | } | |
103 | ||
104 | ||
// Dispatch queued requests while the execution limits allow it. Each
// iteration picks the most deserving class from the heap (presumably the
// one with the lowest accumulated cost — the heap comparator is defined
// elsewhere; verify in the header), charges it an exponentially
// time-weighted cost for the request, and runs the request's function.
void fair_queue::dispatch_requests() {
    while (can_dispatch()) {
        // Skip over classes that were left in the heap with empty queues.
        // can_dispatch() guarantees at least one queued request exists,
        // so this loop terminates with a non-empty class.
        priority_class_ptr h;
        do {
            h = pop_priority_class();
        } while (h->_queue.empty());

        // Take the oldest request of the chosen class and account for it
        // against all three execution limits.
        auto req = std::move(h->_queue.front());
        h->_queue.pop_front();
        _requests_executing++;
        _req_count_executing += req.desc.weight;
        _bytes_count_executing += req.desc.size;
        _requests_queued--;

        // Cost model: the request's relative weight and size (normalized by
        // the queue-wide maxima), divided by the class's shares, then scaled
        // by exp(delta / tau) so that recent activity counts more than old
        // activity.
        auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
        auto req_cost = (float(req.desc.weight) / _config.max_req_count + float(req.desc.size) / _config.max_bytes_count) / h->_shares;
        auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
        float next_accumulated = h->_accumulated + cost;
        // The exponential term eventually overflows float; renormalize all
        // accumulators (and the time base) until the new value is finite.
        while (std::isinf(next_accumulated)) {
            normalize_stats();
            // If we have renormalized, our time base will have changed. This should happen very infrequently
            delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
            cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
            next_accumulated = h->_accumulated + cost;
        }
        h->_accumulated = next_accumulated;

        // If the class still has pending requests, put it back in the heap
        // so it competes for the next dispatch round.
        if (!h->_queue.empty()) {
            push_priority_class(h);
        }
        // Run the request last, after all bookkeeping is consistent.
        req.func();
    }
}
138 | ||
// Change the number of shares assigned to a priority class; forwards to
// the class's own update_shares().
void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
    pc->update_shares(new_shares);
}
142 | ||
143 | } |