2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2019 ScyllaDB
23 #include <seastar/core/file.hh>
24 #include <seastar/core/fair_queue.hh>
25 #include <seastar/core/io_queue.hh>
26 #include <seastar/core/reactor.hh>
27 #include <seastar/core/metrics.hh>
28 #include <seastar/core/linux-aio.hh>
29 #include <seastar/core/internal/io_desc.hh>
30 #include <seastar/util/log.hh>
34 #include <fmt/format.h>
35 #include <fmt/ostream.h>
41 using namespace std::chrono_literals
;
42 using namespace internal::linux_abi
;
// Completion object for a single request dispatched through an io_queue.
// It derives from io_completion and keeps the fair-queue ticket of the
// request so the ticket can be handed back to the queue when the request
// finishes, whether it succeeded or failed.
// NOTE(review): _ioq_ptr and _pr are used below, but their declarations are
// not visible in this listing.
44 class io_desc_read_write final
: public io_completion
{
// Ticket that is passed back to the io_queue by notify_requests_finished().
46 fair_queue_ticket _fq_ticket
;
// Reports this request as finished to the io_queue behind _ioq_ptr,
// passing along _fq_ticket.
49 void notify_requests_finished() noexcept
{
50 _ioq_ptr
->notify_requests_finished(_fq_ticket
);
// Creates a descriptor for a request on queue `ioq` carrying `ticket`.
// NOTE(review): the member initializers are not visible here; presumably
// they store both arguments -- confirm.
53 io_desc_read_write(io_queue
* ioq
, fair_queue_ticket ticket
)
// Failure path: logs a trace line, notifies the queue that the request is
// done, then forwards `eptr` to _pr.
58 virtual void set_exception(std::exception_ptr eptr
) noexcept override
{
59 io_log
.trace("dev {} : req {} error", _ioq_ptr
->dev_id(), fmt::ptr(this));
60 notify_requests_finished();
61 _pr
.set_exception(eptr
);
// Success path: logs a trace line and notifies the queue that the request
// is done.
// NOTE(review): `res` is not consumed by any visible line; presumably it is
// forwarded to _pr on a line missing from this listing -- confirm.
65 virtual void complete(size_t res
) noexcept override
{
66 io_log
.trace("dev {} : req {} complete", _ioq_ptr
->dev_id(), fmt::ptr(this));
67 notify_requests_finished();
// Returns the future obtained from _pr.
72 future
<size_t> get_future() {
73 return _pr
.get_future();
// Bookkeeping for a finished request: decrements _requests_executing and
// forwards the request's ticket (`desc`) to the fair queue _fq.
78 io_queue::notify_requests_finished(fair_queue_ticket
& desc
) noexcept
{
79 _requests_executing
--;
80 _fq
.notify_requests_finished(desc
);
// Derives a fair_queue::config from the io_queue configuration `iocfg` by
// copying max_req_count and max_bytes_count.
// NOTE(review): the return statement is not visible in this listing;
// presumably `cfg` is returned.
83 fair_queue::config
io_queue::make_fair_queue_config(config iocfg
) {
84 fair_queue::config cfg
;
85 cfg
.max_req_count
= iocfg
.max_req_count
;
86 cfg
.max_bytes_count
= iocfg
.max_bytes_count
;
// Constructs the queue from `cfg`: _fq is built from a fair-queue config
// derived from cfg, and cfg itself is moved into _config.
// NOTE(review): members are initialized in declaration order, not in the
// order written here. Reading cfg for _fq and then moving it into _config is
// only correct if _fq is declared before _config -- verify in the header.
// NOTE(review): the first entry of the initializer list is not visible in
// this listing.
90 io_queue::io_queue(io_queue::config cfg
)
92 , _fq(make_fair_queue_config(cfg
))
93 , _config(std::move(cfg
)) {
// Destructor: walks every per-owner vector in _priority_classes and
// unregisters each entry's fair-queue class pointer from _fq.
96 io_queue::~io_queue() {
97 // It is illegal to stop the I/O queue with pending requests.
98 // Technically we would use a gate to guarantee that. But here, it is not
99 // needed since this is expected to be destroyed only after the reactor is destroyed.
101 // And that will happen only when there are no more fibers to run. If we ever change
102 // that, then this has to change.
103 for (auto&& pc_vec
: _priority_classes
) {
104 for (auto&& pc_data
: pc_vec
) {
// NOTE(review): find_or_create_class() resizes these vectors and tests
// entries for null, so an entry may be empty; presumably a null check sits
// on a line missing from this listing -- confirm before dereferencing.
106 _fq
.unregister_priority_class(pc_data
->ptr
);
// Definitions of io_queue's static priority-class registry.
// _register_lock is taken with std::lock_guard by
// register_one_priority_class() and rename_one_priority_class() around
// their accesses to the two arrays below.
112 std::mutex
io_queue::_register_lock
;
// Shares registered for each class id. register_one_priority_class() treats
// a zero entry as a free slot.
113 std::array
<uint32_t, io_queue::_max_classes
> io_queue::_registered_shares
;
114 // We could very well just add the name to the io_priority_class. However, because that
115 // structure is passed along all the time - and sometimes we can't help but copy it, better keep
116 // it lean. The name won't really be used for anything other than monitoring.
// Name registered for each class id, indexed the same way as
// _registered_shares.
117 std::array
<sstring
, io_queue::_max_classes
> io_queue::_registered_names
;
// Registers a priority class called `name` with `shares`, or finds the
// class already registered under that name, while holding _register_lock.
//
// Slots are scanned in index order:
//  - a slot with zero shares is taken as free and filled with shares/name;
//  - a slot holding a different name is passed over;
//  - a slot holding the same name must have been registered with the same
//    shares (enforced with assert).
// Returns io_priority_class(i) for the chosen slot and throws
// std::runtime_error when no slot is available.
//
// NOTE(review): because zero shares marks a free slot, a class registered
// with shares == 0 still looks free to later calls.
119 io_priority_class
io_queue::register_one_priority_class(sstring name
, uint32_t shares
) {
120 std::lock_guard
<std::mutex
> lock(_register_lock
);
121 for (unsigned i
= 0; i
< _max_classes
; ++i
) {
122 if (!_registered_shares
[i
]) {
123 _registered_shares
[i
] = shares
;
124 _registered_names
[i
] = std::move(name
);
// NOTE(review): the body of this branch is not visible in this listing;
// presumably it moves on to the next slot.
125 } else if (_registered_names
[i
] != name
) {
128 // found an entry matching the name to be registered,
129 // make sure it was registered with the same number shares
130 // Note: those may change dynamically later on in the
131 // fair queue priority_class_ptr
132 assert(_registered_shares
[i
] == shares
);
134 return io_priority_class(i
);
136 throw std::runtime_error("No more room for new I/O priority classes");
// Changes the registered name of class `pc` to `new_name` in the static
// name table, while holding _register_lock.
//
// The table is first scanned for an existing use of `new_name`; the visible
// code throws std::runtime_error when the name is already registered.
// NOTE(review): several lines are missing from this listing: the body of
// the zero-shares check, whatever sits between the name comparison and the
// throw, and the bool return statements. Confirm the exact conditions
// against the full source.
139 bool io_queue::rename_one_priority_class(io_priority_class pc
, sstring new_name
) {
140 std::lock_guard
<std::mutex
> guard(_register_lock
);
141 for (unsigned i
= 0; i
< _max_classes
; ++i
) {
// A zero share count marks a slot as unused in register_one_priority_class().
142 if (!_registered_shares
[i
]) {
145 if (_registered_names
[i
] == new_name
) {
149 throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
150 " already existing name ({})", new_name
));
// No conflict was reported: record the new name under pc's id.
154 _registered_names
[pc
.id()] = new_name
;
// Metric label named "ioshard". register_stats() attaches it to every
// io_queue metric with the value returned by sm::impl::shard().
158 seastar::metrics::label
io_queue_shard("ioshard");
// Builds the per-class data and registers its metrics by calling
// register_stats(name, mountpoint, owner).
// NOTE(review): the member initializer list is not visible in this listing;
// presumably `ptr` is stored there -- confirm.
160 io_queue::priority_class_data::priority_class_data(sstring name
, sstring mountpoint
, priority_class_ptr ptr
, shard_id owner
)
167 register_stats(name
, mountpoint
, owner
);
// Re-registers this class's metrics under `new_name` by calling
// register_stats(new_name, mountpoint, owner). A
// metrics::double_registration thrown by that call is caught here.
// NOTE(review): the handler body is not visible in this listing; the
// existing comment says the exception is to be ignored.
171 io_queue::priority_class_data::rename(sstring new_name
, sstring mountpoint
, shard_id owner
) {
173 register_stats(new_name
, mountpoint
, owner
);
174 } catch (metrics::double_registration
&e
) {
175 // we need to ignore this exception, since it can happen that
176 // a class that was already created with the new name will be
177 // renamed again (this will cause a double registration exception
// Registers the metrics of this priority class in the "io_queue" group.
//
// name       - class name, exported as the "class" label
// mountpoint - exported as the "mountpoint" label
// owner      - shard id, exported through sm::shard_label
//
// Every metric also carries the io_queue_shard label set to sm::impl::shard().
// The metrics are assembled in a local metric_groups object and only then
// exchanged into _metric_groups, which replaces whatever was registered
// before (presumably so that a failed registration leaves the previous
// metrics untouched -- confirm).
184 io_queue::priority_class_data::register_stats(sstring name
, sstring mountpoint
, shard_id owner
) {
185 seastar::metrics::metric_groups new_metrics
;
186 namespace sm
= seastar::metrics
;
187 auto shard
= sm::impl::shard();
189 auto ioq_group
= sm::label("mountpoint");
190 auto mountlabel
= ioq_group(mountpoint
);
192 auto class_label_type
= sm::label("class");
193 auto class_label
= class_label_type(name
);
194 new_metrics
.add_group("io_queue", {
195 sm::make_derive("total_bytes", bytes
, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard
), sm::shard_label(owner
), mountlabel
, class_label
}),
// This metric exports `ops`, so its description talks about operations;
// it previously repeated the text of total_bytes.
196 sm::make_derive("total_operations", ops
, sm::description("Total operations passed in the queue"), {io_queue_shard(shard
), sm::shard_label(owner
), mountlabel
, class_label
}),
197 // Note: The counter below is not the same as reactor's queued-io-requests
198 // queued-io-requests shows us how many requests in total exist in this I/O Queue.
200 // This counter lives in the priority class, so it will count only queued requests
201 // that belong to that class.
203 // In other words: the new counter tells you how busy a class is, and the
204 // old counter tells you how busy the system is.
206 sm::make_queue_length("queue_length", nr_queued
, sm::description("Number of requests in the queue"), {io_queue_shard(shard
), sm::shard_label(owner
), mountlabel
, class_label
}),
// Gauge backed by queue_time, which queue_request() updates when a request
// is dispatched.
207 sm::make_gauge("delay", [this] {
208 return queue_time
.count();
209 }, sm::description("total delay time in the queue"), {io_queue_shard(shard
), sm::shard_label(owner
), mountlabel
, class_label
}),
// Gauge reading the current shares from the fair-queue class pointer.
210 sm::make_gauge("shares", [this] {
211 return this->ptr
->shares();
212 }, sm::description("current amount of shares"), {io_queue_shard(shard
), sm::shard_label(owner
), mountlabel
, class_label
})
// Swap the freshly built group in; new_metrics is left empty.
214 _metric_groups
= std::exchange(new_metrics
, {});
// Returns the priority_class_data for class `pc` as seen by shard `owner`,
// creating it on first use.
//
// _priority_classes is indexed as [owner][id]; both levels are grown on
// demand. When the entry is new or still null, the class is registered with
// the fair queue using the shares recorded for `id`, and a
// priority_class_data named after the registered class name is stored.
// NOTE(review): the declarations of `id` and `name` are not visible in this
// listing; presumably `id` is pc.id() -- confirm.
217 io_queue::priority_class_data
& io_queue::find_or_create_class(const io_priority_class
& pc
, shard_id owner
) {
219 bool do_insert
= false;
// Grow the outer (per-owner) vector if needed, otherwise the inner
// (per-class) one; do_insert records whether any growth happened.
220 if ((do_insert
= (owner
>= _priority_classes
.size()))) {
221 _priority_classes
.resize(owner
+ 1);
222 _priority_classes
[owner
].resize(id
+ 1);
223 } else if ((do_insert
= (id
>= _priority_classes
[owner
].size()))) {
224 _priority_classes
[owner
].resize(id
+ 1);
226 if (do_insert
|| !_priority_classes
[owner
][id
]) {
// NOTE(review): the shares are read before _register_lock is taken, while
// the name below is read under the lock -- confirm that this unsynchronized
// read is intended.
227 auto shares
= _registered_shares
.at(id
);
230 std::lock_guard
<std::mutex
> lock(_register_lock
);
231 name
= _registered_names
.at(id
);
236 // We could just add the owner as the instance id and have something like:
237 // io_queue-<class_owner>-<counter>-<class_name>
239 // However, when there are more than one shard per I/O queue, it is very useful
240 // to know which shards are being served by the same queue. Therefore, a better name
243 // io_queue-<queue_owner>-<counter>-<class_name>, shard=<class_owner>
244 // using the shard label to hold the owner number
246 // This conveys all the information we need and allows one to easily group all classes from
247 // the same I/O queue (by filtering by shard)
248 auto pc_ptr
= _fq
.register_priority_class(shares
);
249 auto pc_data
= std::make_unique
<priority_class_data
>(name
, mountpoint(), pc_ptr
, owner
);
251 _priority_classes
[owner
][id
] = std::move(pc_data
);
253 return *_priority_classes
[owner
][id
];
// Computes the fair-queue ticket for request `req` of `len` bytes.
//
// Writes: weight is _config.disk_req_write_to_read_multiplier and size is
//         _config.disk_bytes_write_to_read_multiplier * len.
// Reads:  weight is read_request_base_count and size is
//         read_request_base_count * len.
// Any other request type throws std::runtime_error naming the operation.
// NOTE(review): the declarations of `weight` and `size` are not visible in
// this listing.
256 fair_queue_ticket
io_queue::request_fq_ticket(const internal::io_request
& req
, size_t len
) const {
259 if (req
.is_write()) {
260 weight
= _config
.disk_req_write_to_read_multiplier
;
261 size
= _config
.disk_bytes_write_to_read_multiplier
* len
;
262 } else if (req
.is_read()) {
263 weight
= io_queue::read_request_base_count
;
264 size
= io_queue::read_request_base_count
* len
;
266 throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req
.opname()));
269 return fair_queue_ticket(weight
, size
);
// Queues request `req` (`len` bytes) under priority class `pc`.
//
// The work is submitted to the coordinator() shard. There the class data is
// looked up or created for the calling shard, a ticket is computed, a
// completion descriptor is allocated, and a callback is queued on the fair
// queue. When the fair queue runs the callback, the request is handed to
// the reactor.
// NOTE(review): the return type and the lines that return the descriptor's
// future (`fut`) are not visible in this listing.
// NOTE(review): `pc` is captured by reference in a lambda that runs on
// another shard, so the caller must keep `pc` alive until it has run --
// confirm callers guarantee this.
273 io_queue::queue_request(const io_priority_class
& pc
, size_t len
, internal::io_request req
) noexcept
{
// Taken before the cross-shard hop, so the queue_time computed below also
// covers the time spent in submit_to.
274 auto start
= std::chrono::steady_clock::now();
275 return smp::submit_to(coordinator(), [start
, &pc
, len
, req
= std::move(req
), owner
= this_shard_id(), this] () mutable {
276 // First time will hit here, and then we create the class. It is important
277 // that we create the shared pointer in the same shard it will be used at later.
278 auto& pclass
= find_or_create_class(pc
, owner
);
279 fair_queue_ticket fq_ticket
= request_fq_ticket(req
, len
);
280 auto desc
= std::make_unique
<io_desc_read_write
>(this, fq_ticket
);
// The future is taken before the descriptor is moved into the callback.
281 auto fut
= desc
->get_future();
282 io_log
.trace("dev {} : req {} queue len {} ticket {}", _config
.devid
, fmt::ptr(&*desc
), len
, fq_ticket
);
// Callback run by the fair queue when the request is dispatched.
283 _fq
.queue(pclass
.ptr
, std::move(fq_ticket
), [&pclass
, start
, req
= std::move(req
), d
= std::move(desc
), len
, this] () mutable noexcept
{
285 _requests_executing
++;
// Record the time elapsed since `start` as this class's queue time.
289 pclass
.queue_time
= std::chrono::duration_cast
<std::chrono::duration
<double>>(std::chrono::steady_clock::now() - start
);
290 io_log
.trace("dev {} : req {} submit", _config
.devid
, fmt::ptr(&*d
));
// release() hands the raw descriptor pointer to the reactor; the unique_ptr
// no longer owns it after this call.
291 engine().submit_io(d
.release(), std::move(req
));
// Sets the shares of class `pc` to `new_shares`. The update runs on the
// coordinator() shard: the class data for the calling shard is looked up or
// created, then update_shares() is called on its fair-queue class pointer.
// NOTE(review): the return type is not visible in this listing; the result
// of smp::submit_to is returned.
300 io_queue::update_shares_for_class(const io_priority_class pc
, size_t new_shares
) {
301 return smp::submit_to(coordinator(), [this, pc
, owner
= this_shard_id(), new_shares
] {
302 auto& pclass
= find_or_create_class(pc
, owner
);
303 pclass
.ptr
->update_shares(new_shares
);
// Applies a rename of class `pc` to every owner that already has data for
// it. For each owner whose class vector has a non-null entry at pc.id(),
// rename(new_name, _config.mountpoint, owner) is called on that entry.
// NOTE(review): the return type and the closing lines are not visible in
// this listing.
308 io_queue::rename_priority_class(io_priority_class pc
, sstring new_name
) {
309 for (unsigned owner
= 0; owner
< _priority_classes
.size(); owner
++) {
// Skip owners whose vector is too short for pc.id() or whose entry is null.
310 if (_priority_classes
[owner
].size() > pc
.id() &&
311 _priority_classes
[owner
][pc
.id()]) {
312 _priority_classes
[owner
][pc
.id()]->rename(new_name
, _config
.mountpoint
, owner
);