*/
-#include <seastar/core/future-util.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
+#include <seastar/util/log.hh>
#include <chrono>
#include <mutex>
#include <array>
namespace seastar {
+logger io_log("io");
+
using namespace std::chrono_literals;
using namespace internal::linux_abi;
-class io_desc_read_write final : public kernel_completion {
+class io_desc_read_write final : public io_completion {
io_queue* _ioq_ptr;
- fair_queue_request_descriptor _fq_desc;
+ fair_queue_ticket _fq_ticket;
promise<size_t> _pr;
private:
- void notify_requests_finished() {
- _ioq_ptr->notify_requests_finished(_fq_desc);
+ void notify_requests_finished() noexcept {
+ _ioq_ptr->notify_requests_finished(_fq_ticket);
}
public:
- io_desc_read_write(io_queue* ioq, unsigned weight, unsigned size)
+ io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
: _ioq_ptr(ioq)
- , _fq_desc(fair_queue_request_descriptor{weight, size})
+ , _fq_ticket(ticket)
{}
- fair_queue_request_descriptor& fq_descriptor() {
- return _fq_desc;
- }
-
- void set_exception(std::exception_ptr eptr) {
+ virtual void set_exception(std::exception_ptr eptr) noexcept override {
+ io_log.trace("dev {} : req {} error", _ioq_ptr->dev_id(), fmt::ptr(this));
notify_requests_finished();
_pr.set_exception(eptr);
delete this;
}
- virtual void complete_with(ssize_t ret) override {
- try {
- engine().handle_io_result(ret);
- notify_requests_finished();
- _pr.set_value(ret);
- delete this;
- } catch (...) {
- set_exception(std::current_exception());
- }
+ virtual void complete(size_t res) noexcept override {
+ io_log.trace("dev {} : req {} complete", _ioq_ptr->dev_id(), fmt::ptr(this));
+ notify_requests_finished();
+ _pr.set_value(res);
+ delete this;
}
future<size_t> get_future() {
}
};
+void
+io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {
+ _requests_executing--;
+ _fq.notify_requests_finished(desc);
+}
fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
fair_queue::config cfg;
- cfg.capacity = std::min(iocfg.capacity, reactor::max_aio_per_queue);
cfg.max_req_count = iocfg.max_req_count;
cfg.max_bytes_count = iocfg.max_bytes_count;
return cfg;
throw std::runtime_error("No more room for new I/O priority classes");
}
+bool io_queue::rename_one_priority_class(io_priority_class pc, sstring new_name) {
+ std::lock_guard<std::mutex> guard(_register_lock);
+ for (unsigned i = 0; i < _max_classes; ++i) {
+ if (!_registered_shares[i]) {
+ break;
+ }
+ if (_registered_names[i] == new_name) {
+ if (i == pc.id()) {
+ return false;
+ } else {
+ throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
+ " already existing name ({})", new_name));
+ }
+ }
+ }
+ _registered_names[pc.id()] = new_name;
+ return true;
+}
+
seastar::metrics::label io_queue_shard("ioshard");
io_queue::priority_class_data::priority_class_data(sstring name, sstring mountpoint, priority_class_ptr ptr, shard_id owner)
// This conveys all the information we need and allows one to easily group all classes from
// the same I/O queue (by filtering by shard)
auto pc_ptr = _fq.register_priority_class(shares);
- auto pc_data = make_lw_shared<priority_class_data>(name, mountpoint(), pc_ptr, owner);
+ auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner);
- _priority_classes[owner][id] = pc_data;
+ _priority_classes[owner][id] = std::move(pc_data);
}
return *_priority_classes[owner][id];
}
+fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const {
+ unsigned weight;
+ size_t size;
+ if (req.is_write()) {
+ weight = _config.disk_req_write_to_read_multiplier;
+ size = _config.disk_bytes_write_to_read_multiplier * len;
+ } else if (req.is_read()) {
+ weight = io_queue::read_request_base_count;
+ size = io_queue::read_request_base_count * len;
+ } else {
+ throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
+ }
+
+ return fair_queue_ticket(weight, size);
+}
+
future<size_t>
-io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) {
+io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {
auto start = std::chrono::steady_clock::now();
- return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
+ return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {
// First time will hit here, and then we create the class. It is important
// that we create the shared pointer in the same shard it will be used at later.
auto& pclass = find_or_create_class(pc, owner);
- pclass.nr_queued++;
- unsigned weight;
- size_t size;
- if (req.is_write()) {
- weight = _config.disk_req_write_to_read_multiplier;
- size = _config.disk_bytes_write_to_read_multiplier * len;
- } else if (req.is_read()) {
- weight = io_queue::read_request_base_count;
- size = io_queue::read_request_base_count * len;
- } else {
- throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
- }
- auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
- auto fq_desc = desc->fq_descriptor();
+ fair_queue_ticket fq_ticket = request_fq_ticket(req, len);
+ auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);
auto fut = desc->get_future();
- _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, req = std::move(req), desc = desc.release(), len] () mutable noexcept {
- try {
- pclass.nr_queued--;
- pclass.ops++;
- pclass.bytes += len;
- pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
- engine().submit_io(desc, std::move(req));
- } catch (...) {
- desc->set_exception(std::current_exception());
- }
+ io_log.trace("dev {} : req {} queue len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket);
+ _fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {
+ _queued_requests--;
+ _requests_executing++;
+ pclass.nr_queued--;
+ pclass.ops++;
+ pclass.bytes += len;
+ pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
+ io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d));
+ engine().submit_io(d.release(), std::move(req));
});
+ pclass.nr_queued++;
+ _queued_requests++;
return fut;
});
}
future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
- return smp::submit_to(coordinator(), [this, pc, owner = engine().cpu_id(), new_shares] {
+ return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {
auto& pclass = find_or_create_class(pc, owner);
- _fq.update_shares(pclass.ptr, new_shares);
+ pclass.ptr->update_shares(new_shares);
});
}