/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#include <seastar/core/fair_queue.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/util/noncopyable_function.hh>
#include <queue>
#include <chrono>
#include <unordered_set>
#include <cmath>
#include <cassert>
#include <limits>

namespace seastar {

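// Heap bookkeeping: _handles tracks every priority class that currently has
// queued requests (its ordering comes from the comparator in fair_queue.hh),
// and the _queued flag keeps a class from being pushed into the heap twice.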
void fair_queue::push_priority_class(priority_class_ptr pc) {
    if (!pc->_queued) {
        _handles.push(pc);
        pc->_queued = true;
    }
}

priority_class_ptr fair_queue::pop_priority_class() {
    assert(!_handles.empty());
    auto h = _handles.top();
    _handles.pop();
    assert(h->_queued);
    h->_queued = false;
    return h;
}

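// Cost accounting multiplies each request's cost by exp(delta / tau), which
// eventually overflows a float. When that happens, every class's accumulated
// cost is scaled down by normalize_factor() (the smallest positive normalized
// float) and _base is pushed forward by -ln(factor) * tau, which shrinks all
// future exp() terms by the same factor, so the relative ordering between
// classes is preserved.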
float fair_queue::normalize_factor() const {
    return std::numeric_limits<float>::min();
}

void fair_queue::normalize_stats() {
    auto time_delta = std::log(normalize_factor()) * _config.tau;
    // time_delta is negative, and this may advance _base into the future
    _base -= std::chrono::duration_cast<clock_type::duration>(time_delta);
    for (auto& pc: _all_classes) {
        pc->_accumulated *= normalize_factor();
    }
}

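// A new request may be dispatched only if something is queued and all three
// in-flight limits still hold: executing requests below capacity, weighted
// request count below max_req_count, and outstanding bytes below max_bytes_count.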
bool fair_queue::can_dispatch() const {
    return _requests_queued &&
           (_requests_executing < _config.capacity) &&
           (_req_count_executing < _config.max_req_count) &&
           (_bytes_count_executing < _config.max_bytes_count);
}

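// Registration hands out the handle that callers pass to queue(). Shares set
// the class's relative throughput: the per-request cost computed in
// dispatch_requests() is divided by _shares, so a class with twice the shares
// accumulates cost half as fast and, under contention, gets picked roughly
// twice as often.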
priority_class_ptr fair_queue::register_priority_class(uint32_t shares) {
    priority_class_ptr pclass = make_lw_shared<priority_class>(shares);
    _all_classes.insert(pclass);
    return pclass;
}

void fair_queue::unregister_priority_class(priority_class_ptr pclass) {
    assert(pclass->_queue.empty());
    _all_classes.erase(pclass);
}

size_t fair_queue::waiters() const {
    return _requests_queued;
}

size_t fair_queue::requests_currently_executing() const {
    return _requests_executing;
}

void fair_queue::queue(priority_class_ptr pc, fair_queue_request_descriptor desc, noncopyable_function<void()> func) {
    // The request is only recorded here; func is invoked later, from
    // dispatch_requests(), once the fairness bookkeeping selects this class
    // and the queue has capacity to execute it.
    push_priority_class(pc);
    pc->_queue.push_back(priority_class::request{std::move(func), std::move(desc)});
    _requests_queued++;
}

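// The completion path must call this with a descriptor carrying the same
// weight and size that were queued, so that the executing counters are
// decremented by exactly what dispatch_requests() added.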
void fair_queue::notify_requests_finished(fair_queue_request_descriptor& desc) {
    _requests_executing--;
    _req_count_executing -= desc.weight;
    _bytes_count_executing -= desc.size;
}

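// Main dispatch loop: while can_dispatch() allows it, pop the least-charged
// class from the heap, execute its oldest request, and charge the class
//
//   cost = exp(delta / tau) * (weight / max_req_count + size / max_bytes_count) / shares
//
// where delta is the time elapsed since _base. The exponential factor makes a
// charge taken now outweigh charges taken several tau ago, so history decays
// and an idle class cannot hoard credit forever. If the factor overflows,
// normalize_stats() rescales all accumulators and the cost is recomputed.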
void fair_queue::dispatch_requests() {
    while (can_dispatch()) {
        priority_class_ptr h;
        do {
            h = pop_priority_class();
        } while (h->_queue.empty());

        auto req = std::move(h->_queue.front());
        h->_queue.pop_front();
        _requests_executing++;
        _req_count_executing += req.desc.weight;
        _bytes_count_executing += req.desc.size;
        _requests_queued--;

        auto delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
        auto req_cost = (float(req.desc.weight) / _config.max_req_count + float(req.desc.size) / _config.max_bytes_count) / h->_shares;
        auto cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
        float next_accumulated = h->_accumulated + cost;
        while (std::isinf(next_accumulated)) {
            normalize_stats();
            // If we have renormalized, our time base will have changed. This should happen very infrequently.
            delta = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - _base);
            cost = expf(1.0f/_config.tau.count() * delta.count()) * req_cost;
            next_accumulated = h->_accumulated + cost;
        }
        h->_accumulated = next_accumulated;

        if (!h->_queue.empty()) {
            push_priority_class(h);
        }
        req.func();
    }
}

void fair_queue::update_shares(priority_class_ptr pc, uint32_t new_shares) {
    pc->update_shares(new_shares);
}

}
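
// Rough usage sketch (comment only, not compiled). The precise spellings of
// fair_queue's constructor, config, and fair_queue_request_descriptor live in
// fair_queue.hh; the names below are assumptions based on what this file uses:
//
//   seastar::fair_queue::config cfg;              // capacity, tau, max_req_count, max_bytes_count
//   seastar::fair_queue fq(cfg);
//   auto pc = fq.register_priority_class(100);    // 100 shares
//
//   seastar::fair_queue_request_descriptor desc;
//   desc.weight = 1;
//   desc.size = 4096;
//   fq.queue(pc, desc, [] {
//       // submit the actual I/O for this request here
//   });
//
//   fq.dispatch_requests();                       // runs queued funcs while capacity allows
//   // ... later, once the I/O completes:
//   fq.notify_requests_finished(desc);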