/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#include <seastar/core/file.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/util/log.hh>
#include <chrono>
#include <mutex>
#include <array>
#include <fmt/format.h>
#include <fmt/ostream.h>

namespace seastar {

logger io_log("io");

using namespace std::chrono_literals;
using namespace internal::linux_abi;

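// Completion descriptor for a single read or write request. It carries the
// fair-queue ticket that must be returned to the owning io_queue when the
// request finishes, and the promise that is fulfilled with the result (or an
// exception). The descriptor deletes itself once the promise is resolved.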
class io_desc_read_write final : public io_completion {
    io_queue* _ioq_ptr;
    fair_queue_ticket _fq_ticket;
    promise<size_t> _pr;
private:
    void notify_requests_finished() noexcept {
        _ioq_ptr->notify_requests_finished(_fq_ticket);
    }
public:
    io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
        : _ioq_ptr(ioq)
        , _fq_ticket(ticket)
    {}

    virtual void set_exception(std::exception_ptr eptr) noexcept override {
        io_log.trace("dev {} : req {} error", _ioq_ptr->dev_id(), fmt::ptr(this));
        notify_requests_finished();
        _pr.set_exception(eptr);
        delete this;
    }

    virtual void complete(size_t res) noexcept override {
        io_log.trace("dev {} : req {} complete", _ioq_ptr->dev_id(), fmt::ptr(this));
        notify_requests_finished();
        _pr.set_value(res);
        delete this;
    }

    future<size_t> get_future() {
        return _pr.get_future();
    }
};

void
io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {
    _requests_executing--;
    _fq.notify_requests_finished(desc);
}

fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
    fair_queue::config cfg;
    cfg.max_req_count = iocfg.max_req_count;
    cfg.max_bytes_count = iocfg.max_bytes_count;
    return cfg;
}

io_queue::io_queue(io_queue::config cfg)
    : _priority_classes()
    , _fq(make_fair_queue_config(cfg))
    , _config(std::move(cfg)) {
}

io_queue::~io_queue() {
    // It is illegal to stop the I/O queue with pending requests.
    // Technically we would use a gate to guarantee that. But here, it is not
    // needed since this is expected to be destroyed only after the reactor is destroyed.
    //
    // And that will happen only when there are no more fibers to run. If we ever change
    // that, then this has to change.
    for (auto&& pc_vec : _priority_classes) {
        for (auto&& pc_data : pc_vec) {
            if (pc_data) {
                _fq.unregister_priority_class(pc_data->ptr);
            }
        }
    }
}

std::mutex io_queue::_register_lock;
std::array<uint32_t, io_queue::_max_classes> io_queue::_registered_shares;
// We could very well just add the name to the io_priority_class. However, because that
// structure is passed around all the time - and sometimes we can't help but copy it - it is
// better to keep it lean. The name won't really be used for anything other than monitoring.
std::array<sstring, io_queue::_max_classes> io_queue::_registered_names;

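// Registers a priority class in the process-wide static registry, protected
// by _register_lock. Registering a name that already exists returns the
// existing class id; the shares must match the original registration.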
io_priority_class io_queue::register_one_priority_class(sstring name, uint32_t shares) {
    std::lock_guard<std::mutex> lock(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_registered_shares[i]) {
            _registered_shares[i] = shares;
            _registered_names[i] = std::move(name);
        } else if (_registered_names[i] != name) {
            continue;
        } else {
            // Found an entry matching the name to be registered;
            // make sure it was registered with the same number of shares.
            // Note: those may change dynamically later on in the
            // fair queue priority_class_ptr.
            assert(_registered_shares[i] == shares);
        }
        return io_priority_class(i);
    }
    throw std::runtime_error("No more room for new I/O priority classes");
}

bool io_queue::rename_one_priority_class(io_priority_class pc, sstring new_name) {
    std::lock_guard<std::mutex> guard(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_registered_shares[i]) {
            break;
        }
        if (_registered_names[i] == new_name) {
            if (i == pc.id()) {
                return false;
            } else {
                throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
                        " already existing name ({})", new_name));
            }
        }
    }
    _registered_names[pc.id()] = new_name;
    return true;
}

seastar::metrics::label io_queue_shard("ioshard");

io_queue::priority_class_data::priority_class_data(sstring name, sstring mountpoint, priority_class_ptr ptr, shard_id owner)
    : ptr(ptr)
    , bytes(0)
    , ops(0)
    , nr_queued(0)
    , queue_time(1s)
{
    register_stats(name, mountpoint, owner);
}

void
io_queue::priority_class_data::rename(sstring new_name, sstring mountpoint, shard_id owner) {
    try {
        register_stats(new_name, mountpoint, owner);
    } catch (metrics::double_registration &e) {
        // we need to ignore this exception, since it can happen that
        // a class that was already created with the new name will be
        // renamed again (this will cause a double registration exception
        // to be thrown).
    }
}

void
io_queue::priority_class_data::register_stats(sstring name, sstring mountpoint, shard_id owner) {
    seastar::metrics::metric_groups new_metrics;
    namespace sm = seastar::metrics;
    auto shard = sm::impl::shard();

    auto ioq_group = sm::label("mountpoint");
    auto mountlabel = ioq_group(mountpoint);

    auto class_label_type = sm::label("class");
    auto class_label = class_label_type(name);
    new_metrics.add_group("io_queue", {
            sm::make_derive("total_bytes", bytes, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_operations", ops, sm::description("Total operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            // Note: the counter below is not the same as the reactor's queued-io-requests:
            // queued-io-requests shows us how many requests in total exist in this I/O queue.
            //
            // This counter lives in the priority class, so it will count only queued requests
            // that belong to that class.
            //
            // In other words: the new counter tells you how busy a class is, and the
            // old counter tells you how busy the system is.
            sm::make_queue_length("queue_length", nr_queued, sm::description("Number of requests in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_gauge("delay", [this] {
                return queue_time.count();
            }, sm::description("total delay time in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_gauge("shares", [this] {
                return this->ptr->shares();
            }, sm::description("current amount of shares"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label})
    });
    _metric_groups = std::exchange(new_metrics, {});
}

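// Lazily creates the priority_class_data for a given (owner shard, class id)
// pair on first use: the class is registered with the fair queue and its
// metrics are registered under this queue's mountpoint.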
io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_class& pc, shard_id owner) {
    auto id = pc.id();
    bool do_insert = false;
    if ((do_insert = (owner >= _priority_classes.size()))) {
        _priority_classes.resize(owner + 1);
        _priority_classes[owner].resize(id + 1);
    } else if ((do_insert = (id >= _priority_classes[owner].size()))) {
        _priority_classes[owner].resize(id + 1);
    }
    if (do_insert || !_priority_classes[owner][id]) {
        auto shares = _registered_shares.at(id);
        sstring name;
        {
            std::lock_guard<std::mutex> lock(_register_lock);
            name = _registered_names.at(id);
        }

        // A note on naming:
        //
        // We could just add the owner as the instance id and have something like:
        //   io_queue-<class_owner>-<counter>-<class_name>
        //
        // However, when there is more than one shard per I/O queue, it is very useful
        // to know which shards are being served by the same queue. Therefore, a better naming
        // scheme is:
        //
        //   io_queue-<queue_owner>-<counter>-<class_name>, shard=<class_owner>
        //   using the shard label to hold the owner number.
        //
        // This conveys all the information we need and allows one to easily group all classes from
        // the same I/O queue (by filtering by shard).
        auto pc_ptr = _fq.register_priority_class(shares);
        auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner);

        _priority_classes[owner][id] = std::move(pc_data);
    }
    return *_priority_classes[owner][id];
}

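// Converts a request into the fair_queue_ticket it will be charged for.
// Reads are charged at the base cost; writes are scaled by the configured
// request and byte write-to-read multipliers.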
fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const {
    unsigned weight;
    size_t size;
    if (req.is_write()) {
        weight = _config.disk_req_write_to_read_multiplier;
        size = _config.disk_bytes_write_to_read_multiplier * len;
    } else if (req.is_read()) {
        weight = io_queue::read_request_base_count;
        size = io_queue::read_request_base_count * len;
    } else {
        throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
    }

    return fair_queue_ticket(weight, size);
}

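// Queues an I/O request on behalf of the calling shard: the request is
// forwarded to the coordinator shard owning this queue, accounted against
// its priority class, and submitted to the reactor once the fair queue
// dispatches it. The returned future resolves with the completed request's
// result size.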
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
    auto start = std::chrono::steady_clock::now();
    return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
        // The first request of a class gets here and creates it. It is important
        // that we create the shared pointer on the same shard it will later be used on.
        auto& pclass = find_or_create_class(pc, owner);
        fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
        auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
        auto fut = desc->get_future();
        io_log.trace("dev {} : req {} queue len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket);
        _fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
            _queued_requests--;
            _requests_executing++;
            pclass.nr_queued--;
            pclass.ops++;
            pclass.bytes += len;
            pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
            io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d));
            engine().submit_io(d.release(), std::move(req));
        });
        pclass.nr_queued++;
        _queued_requests++;
        return fut;
    });
}

future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
    return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {
        auto& pclass = find_or_create_class(pc, owner);
        pclass.ptr->update_shares(new_shares);
    });
}

void
io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
    for (unsigned owner = 0; owner < _priority_classes.size(); owner++) {
        if (_priority_classes[owner].size() > pc.id() &&
                _priority_classes[owner][pc.id()]) {
            _priority_classes[owner][pc.id()]->rename(new_name, _config.mountpoint, owner);
        }
    }
}

}