]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | ||
9f95a23c TL |
23 | #include <seastar/core/file.hh> |
24 | #include <seastar/core/fair_queue.hh> | |
25 | #include <seastar/core/io_queue.hh> | |
26 | #include <seastar/core/reactor.hh> | |
27 | #include <seastar/core/metrics.hh> | |
28 | #include <seastar/core/linux-aio.hh> | |
29 | #include <seastar/core/internal/io_desc.hh> | |
f67539c2 | 30 | #include <seastar/util/log.hh> |
9f95a23c TL |
31 | #include <chrono> |
32 | #include <mutex> | |
33 | #include <array> | |
34 | #include <fmt/format.h> | |
35 | #include <fmt/ostream.h> | |
36 | ||
37 | namespace seastar { | |
38 | ||
f67539c2 TL |
39 | logger io_log("io"); |
40 | ||
9f95a23c TL |
41 | using namespace std::chrono_literals; |
42 | using namespace internal::linux_abi; | |
43 | ||
f67539c2 | 44 | class io_desc_read_write final : public io_completion { |
9f95a23c | 45 | io_queue* _ioq_ptr; |
f67539c2 | 46 | fair_queue_ticket _fq_ticket; |
9f95a23c TL |
47 | promise<size_t> _pr; |
48 | private: | |
f67539c2 TL |
49 | void notify_requests_finished() noexcept { |
50 | _ioq_ptr->notify_requests_finished(_fq_ticket); | |
9f95a23c TL |
51 | } |
52 | public: | |
f67539c2 | 53 | io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket) |
9f95a23c | 54 | : _ioq_ptr(ioq) |
f67539c2 | 55 | , _fq_ticket(ticket) |
9f95a23c TL |
56 | {} |
57 | ||
f67539c2 TL |
58 | virtual void set_exception(std::exception_ptr eptr) noexcept override { |
59 | io_log.trace("dev {} : req {} error", _ioq_ptr->dev_id(), fmt::ptr(this)); | |
9f95a23c TL |
60 | notify_requests_finished(); |
61 | _pr.set_exception(eptr); | |
62 | delete this; | |
63 | } | |
64 | ||
f67539c2 TL |
65 | virtual void complete(size_t res) noexcept override { |
66 | io_log.trace("dev {} : req {} complete", _ioq_ptr->dev_id(), fmt::ptr(this)); | |
67 | notify_requests_finished(); | |
68 | _pr.set_value(res); | |
69 | delete this; | |
9f95a23c TL |
70 | } |
71 | ||
72 | future<size_t> get_future() { | |
73 | return _pr.get_future(); | |
74 | } | |
75 | }; | |
76 | ||
f67539c2 TL |
77 | void |
78 | io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept { | |
79 | _requests_executing--; | |
80 | _fq.notify_requests_finished(desc); | |
81 | } | |
9f95a23c TL |
82 | |
83 | fair_queue::config io_queue::make_fair_queue_config(config iocfg) { | |
84 | fair_queue::config cfg; | |
9f95a23c TL |
85 | cfg.max_req_count = iocfg.max_req_count; |
86 | cfg.max_bytes_count = iocfg.max_bytes_count; | |
87 | return cfg; | |
88 | } | |
89 | ||
90 | io_queue::io_queue(io_queue::config cfg) | |
91 | : _priority_classes() | |
92 | , _fq(make_fair_queue_config(cfg)) | |
93 | , _config(std::move(cfg)) { | |
94 | } | |
95 | ||
96 | io_queue::~io_queue() { | |
97 | // It is illegal to stop the I/O queue with pending requests. | |
98 | // Technically we would use a gate to guarantee that. But here, it is not | |
99 | // needed since this is expected to be destroyed only after the reactor is destroyed. | |
100 | // | |
101 | // And that will happen only when there are no more fibers to run. If we ever change | |
102 | // that, then this has to change. | |
103 | for (auto&& pc_vec : _priority_classes) { | |
104 | for (auto&& pc_data : pc_vec) { | |
105 | if (pc_data) { | |
106 | _fq.unregister_priority_class(pc_data->ptr); | |
107 | } | |
108 | } | |
109 | } | |
110 | } | |
111 | ||
112 | std::mutex io_queue::_register_lock; | |
113 | std::array<uint32_t, io_queue::_max_classes> io_queue::_registered_shares; | |
114 | // We could very well just add the name to the io_priority_class. However, because that | |
115 | // structure is passed along all the time - and sometimes we can't help but copy it, better keep | |
116 | // it lean. The name won't really be used for anything other than monitoring. | |
117 | std::array<sstring, io_queue::_max_classes> io_queue::_registered_names; | |
118 | ||
119 | io_priority_class io_queue::register_one_priority_class(sstring name, uint32_t shares) { | |
120 | std::lock_guard<std::mutex> lock(_register_lock); | |
121 | for (unsigned i = 0; i < _max_classes; ++i) { | |
122 | if (!_registered_shares[i]) { | |
123 | _registered_shares[i] = shares; | |
124 | _registered_names[i] = std::move(name); | |
125 | } else if (_registered_names[i] != name) { | |
126 | continue; | |
127 | } else { | |
128 | // found an entry matching the name to be registered, | |
129 | // make sure it was registered with the same number shares | |
130 | // Note: those may change dynamically later on in the | |
131 | // fair queue priority_class_ptr | |
132 | assert(_registered_shares[i] == shares); | |
133 | } | |
134 | return io_priority_class(i); | |
135 | } | |
136 | throw std::runtime_error("No more room for new I/O priority classes"); | |
137 | } | |
138 | ||
f67539c2 TL |
139 | bool io_queue::rename_one_priority_class(io_priority_class pc, sstring new_name) { |
140 | std::lock_guard<std::mutex> guard(_register_lock); | |
141 | for (unsigned i = 0; i < _max_classes; ++i) { | |
142 | if (!_registered_shares[i]) { | |
143 | break; | |
144 | } | |
145 | if (_registered_names[i] == new_name) { | |
146 | if (i == pc.id()) { | |
147 | return false; | |
148 | } else { | |
149 | throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an" | |
150 | " already existing name ({})", new_name)); | |
151 | } | |
152 | } | |
153 | } | |
154 | _registered_names[pc.id()] = new_name; | |
155 | return true; | |
156 | } | |
157 | ||
9f95a23c TL |
158 | seastar::metrics::label io_queue_shard("ioshard"); |
159 | ||
160 | io_queue::priority_class_data::priority_class_data(sstring name, sstring mountpoint, priority_class_ptr ptr, shard_id owner) | |
161 | : ptr(ptr) | |
162 | , bytes(0) | |
163 | , ops(0) | |
164 | , nr_queued(0) | |
165 | , queue_time(1s) | |
166 | { | |
167 | register_stats(name, mountpoint, owner); | |
168 | } | |
169 | ||
170 | void | |
171 | io_queue::priority_class_data::rename(sstring new_name, sstring mountpoint, shard_id owner) { | |
172 | try { | |
173 | register_stats(new_name, mountpoint, owner); | |
174 | } catch (metrics::double_registration &e) { | |
175 | // we need to ignore this exception, since it can happen that | |
176 | // a class that was already created with the new name will be | |
177 | // renamed again (this will cause a double registration exception | |
178 | // to be thrown). | |
179 | } | |
180 | ||
181 | } | |
182 | ||
183 | void | |
184 | io_queue::priority_class_data::register_stats(sstring name, sstring mountpoint, shard_id owner) { | |
185 | seastar::metrics::metric_groups new_metrics; | |
186 | namespace sm = seastar::metrics; | |
187 | auto shard = sm::impl::shard(); | |
188 | ||
189 | auto ioq_group = sm::label("mountpoint"); | |
190 | auto mountlabel = ioq_group(mountpoint); | |
191 | ||
192 | auto class_label_type = sm::label("class"); | |
193 | auto class_label = class_label_type(name); | |
194 | new_metrics.add_group("io_queue", { | |
195 | sm::make_derive("total_bytes", bytes, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}), | |
196 | sm::make_derive("total_operations", ops, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}), | |
197 | // Note: The counter below is not the same as reactor's queued-io-requests | |
198 | // queued-io-requests shows us how many requests in total exist in this I/O Queue. | |
199 | // | |
200 | // This counter lives in the priority class, so it will count only queued requests | |
201 | // that belong to that class. | |
202 | // | |
203 | // In other words: the new counter tells you how busy a class is, and the | |
204 | // old counter tells you how busy the system is. | |
205 | ||
206 | sm::make_queue_length("queue_length", nr_queued, sm::description("Number of requests in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}), | |
207 | sm::make_gauge("delay", [this] { | |
208 | return queue_time.count(); | |
209 | }, sm::description("total delay time in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}), | |
210 | sm::make_gauge("shares", [this] { | |
211 | return this->ptr->shares(); | |
212 | }, sm::description("current amount of shares"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}) | |
213 | }); | |
214 | _metric_groups = std::exchange(new_metrics, {}); | |
215 | } | |
216 | ||
217 | io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_class& pc, shard_id owner) { | |
218 | auto id = pc.id(); | |
219 | bool do_insert = false; | |
220 | if ((do_insert = (owner >= _priority_classes.size()))) { | |
221 | _priority_classes.resize(owner + 1); | |
222 | _priority_classes[owner].resize(id + 1); | |
223 | } else if ((do_insert = (id >= _priority_classes[owner].size()))) { | |
224 | _priority_classes[owner].resize(id + 1); | |
225 | } | |
226 | if (do_insert || !_priority_classes[owner][id]) { | |
227 | auto shares = _registered_shares.at(id); | |
228 | sstring name; | |
229 | { | |
230 | std::lock_guard<std::mutex> lock(_register_lock); | |
231 | name = _registered_names.at(id); | |
232 | } | |
233 | ||
234 | // A note on naming: | |
235 | // | |
236 | // We could just add the owner as the instance id and have something like: | |
237 | // io_queue-<class_owner>-<counter>-<class_name> | |
238 | // | |
239 | // However, when there are more than one shard per I/O queue, it is very useful | |
240 | // to know which shards are being served by the same queue. Therefore, a better name | |
241 | // scheme is: | |
242 | // | |
243 | // io_queue-<queue_owner>-<counter>-<class_name>, shard=<class_owner> | |
244 | // using the shard label to hold the owner number | |
245 | // | |
246 | // This conveys all the information we need and allows one to easily group all classes from | |
247 | // the same I/O queue (by filtering by shard) | |
248 | auto pc_ptr = _fq.register_priority_class(shares); | |
f67539c2 | 249 | auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner); |
9f95a23c | 250 | |
f67539c2 | 251 | _priority_classes[owner][id] = std::move(pc_data); |
9f95a23c TL |
252 | } |
253 | return *_priority_classes[owner][id]; | |
254 | } | |
255 | ||
f67539c2 TL |
256 | fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const { |
257 | unsigned weight; | |
258 | size_t size; | |
259 | if (req.is_write()) { | |
260 | weight = _config.disk_req_write_to_read_multiplier; | |
261 | size = _config.disk_bytes_write_to_read_multiplier * len; | |
262 | } else if (req.is_read()) { | |
263 | weight = io_queue::read_request_base_count; | |
264 | size = io_queue::read_request_base_count * len; | |
265 | } else { | |
266 | throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname())); | |
267 | } | |
268 | ||
269 | return fair_queue_ticket(weight, size); | |
270 | } | |
271 | ||
9f95a23c | 272 | future<size_t> |
f67539c2 | 273 | io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept { |
9f95a23c | 274 | auto start = std::chrono::steady_clock::now(); |
f67539c2 | 275 | return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable { |
9f95a23c TL |
276 | // First time will hit here, and then we create the class. It is important |
277 | // that we create the shared pointer in the same shard it will be used at later. | |
278 | auto& pclass = find_or_create_class(pc, owner); | |
f67539c2 TL |
279 | fair_queue_ticket fq_ticket = request_fq_ticket(req, len); |
280 | auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket); | |
9f95a23c | 281 | auto fut = desc->get_future(); |
f67539c2 TL |
282 | io_log.trace("dev {} : req {} queue len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket); |
283 | _fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept { | |
284 | _queued_requests--; | |
285 | _requests_executing++; | |
286 | pclass.nr_queued--; | |
287 | pclass.ops++; | |
288 | pclass.bytes += len; | |
289 | pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start); | |
290 | io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d)); | |
291 | engine().submit_io(d.release(), std::move(req)); | |
9f95a23c | 292 | }); |
f67539c2 TL |
293 | pclass.nr_queued++; |
294 | _queued_requests++; | |
9f95a23c TL |
295 | return fut; |
296 | }); | |
297 | } | |
298 | ||
299 | future<> | |
300 | io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) { | |
f67539c2 | 301 | return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] { |
9f95a23c | 302 | auto& pclass = find_or_create_class(pc, owner); |
f67539c2 | 303 | pclass.ptr->update_shares(new_shares); |
9f95a23c TL |
304 | }); |
305 | } | |
306 | ||
307 | void | |
308 | io_queue::rename_priority_class(io_priority_class pc, sstring new_name) { | |
309 | for (unsigned owner = 0; owner < _priority_classes.size(); owner++) { | |
310 | if (_priority_classes[owner].size() > pc.id() && | |
311 | _priority_classes[owner][pc.id()]) { | |
312 | _priority_classes[owner][pc.id()]->rename(new_name, _config.mountpoint, owner); | |
313 | } | |
314 | } | |
315 | } | |
316 | ||
317 | } |