]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/core/fair_queue.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / seastar / src / core / fair_queue.cc
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#include <seastar/core/fair_queue.hh>
23#include <seastar/core/future.hh>
24#include <seastar/core/shared_ptr.hh>
25#include <seastar/core/circular_buffer.hh>
26#include <seastar/util/noncopyable_function.hh>
27#include <queue>
28#include <chrono>
29#include <unordered_set>
30#include <cmath>
31
f67539c2
TL
32#include "fmt/format.h"
33#include "fmt/ostream.h"
34
9f95a23c
TL
35namespace seastar {
36
f67539c2
TL
37static_assert(sizeof(fair_queue_ticket) == sizeof(uint64_t), "unexpected fair_queue_ticket size");
38
39fair_queue_ticket::fair_queue_ticket(uint32_t weight, uint32_t size)
40 : _weight(weight)
41 , _size(size)
42{}
43
44float fair_queue_ticket::normalize(fair_queue_ticket denominator) const {
45 return float(_weight) / denominator._weight + float(_size) / denominator._size;
46}
47
48fair_queue_ticket fair_queue_ticket::operator+(fair_queue_ticket desc) const {
49 return fair_queue_ticket(_weight + desc._weight, _size + desc._size);
50}
51
52fair_queue_ticket& fair_queue_ticket::operator+=(fair_queue_ticket desc) {
53 _weight += desc._weight;
54 _size += desc._size;
55 return *this;
56}
57
58fair_queue_ticket fair_queue_ticket::operator-(fair_queue_ticket desc) const {
59 return fair_queue_ticket(_weight - desc._weight, _size - desc._size);
60}
61
62fair_queue_ticket& fair_queue_ticket::operator-=(fair_queue_ticket desc) {
63 _weight -= desc._weight;
64 _size -= desc._size;
65 return *this;
66}
67
68bool fair_queue_ticket::strictly_less(fair_queue_ticket rhs) const {
69 return (_weight < rhs._weight) && (_size < rhs._size);
70}
71
72fair_queue_ticket::operator bool() const {
73 return (_weight > 0) || (_size > 0);
74}
75
76std::ostream& operator<<(std::ostream& os, fair_queue_ticket t) {
77 return os << t._weight << ":" << t._size;
78}
79
80fair_queue::fair_queue(config cfg)
81 : _config(std::move(cfg))
82 , _maximum_capacity(_config.max_req_count, _config.max_bytes_count)
83 , _current_capacity(_config.max_req_count, _config.max_bytes_count)
84 , _base(std::chrono::steady_clock::now())
85{}
86
9f95a23c
TL
87void fair_queue::push_priority_class(priority_class_ptr pc) {
88 if (!pc->_queued) {
89 _handles.push(pc);
90 pc->_queued = true;
91 }
92}
93
94priority_class_ptr fair_queue::pop_priority_class() {
95 assert(!_handles.empty());
96 auto h = _handles.top();
97 _handles.pop();
98 assert(h->_queued);
99 h->_queued = false;
100 return h;
101}
102
103float fair_queue::normalize_factor() const {
104 return std::numeric_limits<float>::min();
105}
106
107void fair_queue::normalize_stats() {
108 auto time_delta = std::log(normalize_factor()) * _config.tau;
109 // time_delta is negative; and this may advance _base into the future
110 _base -= std::chrono::duration_cast<clock_type::duration>(time_delta);
111 for (auto& pc: _all_classes) {
112 pc->_accumulated *= normalize_factor();
113 }
114}
115
116bool fair_queue::can_dispatch() const {
f67539c2 117 return _resources_queued && (_resources_executing.strictly_less(_current_capacity));
9f95a23c
TL
118}
119
120priority_class_ptr fair_queue::register_priority_class(uint32_t shares) {
121 priority_class_ptr pclass = make_lw_shared<priority_class>(shares);
122 _all_classes.insert(pclass);
123 return pclass;
124}
125
126void fair_queue::unregister_priority_class(priority_class_ptr pclass) {
127 assert(pclass->_queue.empty());
128 _all_classes.erase(pclass);
129}
130
131size_t fair_queue::waiters() const {
132 return _requests_queued;
133}
134
135size_t fair_queue::requests_currently_executing() const {
136 return _requests_executing;
137}
138
f67539c2
TL
139fair_queue_ticket fair_queue::resources_currently_waiting() const {
140 return _resources_queued;
141}
142
143fair_queue_ticket fair_queue::resources_currently_executing() const {
144 return _resources_executing;
145}
146
147void fair_queue::queue(priority_class_ptr pc, fair_queue_ticket desc, noncopyable_function<void()> func) {
9f95a23c
TL
148 // We need to return a future in this function on which the caller can wait.
149 // Since we don't know which queue we will use to execute the next request - if ours or
150 // someone else's, we need a separate promise at this point.
151 push_priority_class(pc);
152 pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)});
f67539c2 153 _resources_queued += desc;
9f95a23c
TL
154 _requests_queued++;
155}
156
f67539c2
TL
157void fair_queue::notify_requests_finished(fair_queue_ticket desc, unsigned nr) noexcept {
158 _resources_executing -= desc;
159 _requests_executing -= nr;
9f95a23c
TL
160}
161
9f95a23c
TL
162void fair_queue::dispatch_requests() {
163 while (can_dispatch()) {
164 priority_class_ptr h;
165 do {
166 h = pop_priority_class();
167 } while (h->_queue.empty());
168
169 auto req = std::move(h->_queue.front());
170 h->_queue.pop_front();
f67539c2
TL
171 _resources_executing += req.desc;;
172 _resources_queued -= req.desc;
9f95a23c 173 _requests_executing++;
9f95a23c
TL
174 _requests_queued--;
175
176 auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
f67539c2 177 auto req_cost = req.desc.normalize(_maximum_capacity) / h->_shares;
9f95a23c
TL
178 auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
179 float next_accumulated = h->_accumulated + cost;
180 while (std::isinf(next_accumulated)) {
181 normalize_stats();
182 // If we have renormalized, our time base will have changed. This should happen very infrequently
183 delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
184 cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
185 next_accumulated = h->_accumulated + cost;
186 }
187 h->_accumulated = next_accumulated;
188
189 if (!h->_queue.empty()) {
190 push_priority_class(h);
191 }
192 req.func();
193 }
194}
195
9f95a23c 196}