git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/core/io_queue.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / io_queue.cc
index 8603ab5c37ef16b58edc7144dc919369aeb41a33..4fb4b5b9ac8d2eef5ad2711b90c703c50dab4a0a 100644 (file)
@@ -20,7 +20,6 @@
  */
 
 
-#include <seastar/core/future-util.hh>
 #include <seastar/core/file.hh>
 #include <seastar/core/fair_queue.hh>
 #include <seastar/core/io_queue.hh>
@@ -28,6 +27,7 @@
 #include <seastar/core/metrics.hh>
 #include <seastar/core/linux-aio.hh>
 #include <seastar/core/internal/io_desc.hh>
+#include <seastar/util/log.hh>
 #include <chrono>
 #include <mutex>
 #include <array>
 
 namespace seastar {
 
+logger io_log("io");
+
 using namespace std::chrono_literals;
 using namespace internal::linux_abi;
 
-class io_desc_read_write final : public kernel_completion {
+class io_desc_read_write final : public io_completion {
     io_queue* _ioq_ptr;
-    fair_queue_request_descriptor _fq_desc;
+    fair_queue_ticket _fq_ticket;
     promise<size_t> _pr;
 private:
-    void notify_requests_finished() {
-        _ioq_ptr->notify_requests_finished(_fq_desc);
+    void notify_requests_finished() noexcept {
+        _ioq_ptr->notify_requests_finished(_fq_ticket);
     }
 public:
-    io_desc_read_write(io_queue* ioq, unsigned weight, unsigned size)
+    io_desc_read_write(io_queue* ioq, fair_queue_ticket ticket)
         : _ioq_ptr(ioq)
-        , _fq_desc(fair_queue_request_descriptor{weight, size})
+        , _fq_ticket(ticket)
     {}
 
-    fair_queue_request_descriptor& fq_descriptor() {
-        return _fq_desc;
-    }
-
-    void set_exception(std::exception_ptr eptr) {
+    virtual void set_exception(std::exception_ptr eptr) noexcept override {
+        io_log.trace("dev {} : req {} error", _ioq_ptr->dev_id(), fmt::ptr(this));
         notify_requests_finished();
         _pr.set_exception(eptr);
         delete this;
     }
 
-    virtual void complete_with(ssize_t ret) override {
-        try {
-            engine().handle_io_result(ret);
-            notify_requests_finished();
-            _pr.set_value(ret);
-            delete this;
-        } catch (...) {
-            set_exception(std::current_exception());
-        }
+    virtual void complete(size_t res) noexcept override {
+        io_log.trace("dev {} : req {} complete", _ioq_ptr->dev_id(), fmt::ptr(this));
+        notify_requests_finished();
+        _pr.set_value(res);
+        delete this;
     }
 
     future<size_t> get_future() {
@@ -79,10 +74,14 @@ public:
     }
 };
 
+void
+io_queue::notify_requests_finished(fair_queue_ticket& desc) noexcept {  // completion hook: io_desc_read_write calls this from both complete() and set_exception()
+    _requests_executing--;  // pairs with the _requests_executing++ done just before engine().submit_io() in queue_request()
+    _fq.notify_requests_finished(desc);  // forward the finished request's ticket to the fair queue
+}
 
 fair_queue::config io_queue::make_fair_queue_config(config iocfg) {
     fair_queue::config cfg;
-    cfg.capacity = std::min(iocfg.capacity, reactor::max_aio_per_queue);
     cfg.max_req_count = iocfg.max_req_count;
     cfg.max_bytes_count = iocfg.max_bytes_count;
     return cfg;
@@ -137,6 +136,25 @@ io_priority_class io_queue::register_one_priority_class(sstring name, uint32_t s
     throw std::runtime_error("No more room for new I/O priority classes");
 }
 
+bool io_queue::rename_one_priority_class(io_priority_class pc, sstring new_name) {  // renames pc to new_name; returns true if the stored name was changed, false if pc already has that name
+    std::lock_guard<std::mutex> guard(_register_lock);  // held across both the duplicate scan and the final assignment
+    for (unsigned i = 0; i < _max_classes; ++i) {
+       if (!_registered_shares[i]) {  // zero shares ends the scan; presumably marks the first unregistered slot (confirm against register_one_priority_class)
+           break;
+       }
+       if (_registered_names[i] == new_name) {
+           if (i == pc.id()) {  // pc itself already carries new_name: nothing to change
+               return false;
+           } else {  // new_name belongs to a different class: reject with std::runtime_error
+               throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
+                       " already existing name ({})", new_name));
+           }
+       }
+    }
+    _registered_names[pc.id()] = new_name;  // no scanned slot uses new_name, so store it for pc
+    return true;
+}
+
 seastar::metrics::label io_queue_shard("ioshard");
 
 io_queue::priority_class_data::priority_class_data(sstring name, sstring mountpoint, priority_class_ptr ptr, shard_id owner)
@@ -228,55 +246,61 @@ io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_
         // This conveys all the information we need and allows one to easily group all classes from
         // the same I/O queue (by filtering by shard)
         auto pc_ptr = _fq.register_priority_class(shares);
-        auto pc_data = make_lw_shared<priority_class_data>(name, mountpoint(), pc_ptr, owner);
+        auto pc_data = std::make_unique<priority_class_data>(name, mountpoint(), pc_ptr, owner);
 
-        _priority_classes[owner][id] = pc_data;
+        _priority_classes[owner][id] = std::move(pc_data);
     }
     return *_priority_classes[owner][id];
 }
 
+fair_queue_ticket io_queue::request_fq_ticket(const internal::io_request& req, size_t len) const {  // builds the fair_queue_ticket(weight, size) for a request of len bytes
+    unsigned weight;  // first fair_queue_ticket argument; set in exactly one branch below
+    size_t size;  // second fair_queue_ticket argument; always derived from len
+    if (req.is_write()) {  // writes: both values come from the configured write-to-read multipliers
+        weight = _config.disk_req_write_to_read_multiplier;
+        size = _config.disk_bytes_write_to_read_multiplier * len;
+    } else if (req.is_read()) {  // reads: both values come from read_request_base_count
+        weight = io_queue::read_request_base_count;
+        size = io_queue::read_request_base_count * len;
+    } else {  // neither a read nor a write: rejected with std::runtime_error
+        throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
+    }
+
+    return fair_queue_ticket(weight, size);
+}
+
 future<size_t>
-io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) {
+io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req) noexcept {  // queues req on the coordinator shard and returns the future of its io_desc_read_write
     auto start = std::chrono::steady_clock::now();
-    return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = engine().cpu_id(), this] () mutable {
+    return smp::submit_to(coordinator(), [start, &pc, len, req = std::move(req), owner = this_shard_id(), this] () mutable {  // owner is evaluated in the capture, i.e. on the calling shard before submit_to runs the lambda
         // First time will hit here, and then we create the class. It is important
         // that we create the shared pointer in the same shard it will be used at later.
         auto& pclass = find_or_create_class(pc, owner);
-        pclass.nr_queued++;
-        unsigned weight;
-        size_t size;
-        if (req.is_write()) {
-            weight = _config.disk_req_write_to_read_multiplier;
-            size = _config.disk_bytes_write_to_read_multiplier * len;
-        } else if (req.is_read()) {
-            weight = io_queue::read_request_base_count;
-            size = io_queue::read_request_base_count * len;
-        } else {
-            throw std::runtime_error(fmt::format("Unrecognized request passing through I/O queue {}", req.opname()));
-        }
-        auto desc = std::make_unique<io_desc_read_write>(this, weight, size);
-        auto fq_desc = desc->fq_descriptor();
+        fair_queue_ticket fq_ticket = request_fq_ticket(req, len);  // throws std::runtime_error for requests that are neither reads nor writes
+        auto desc = std::make_unique<io_desc_read_write>(this, fq_ticket);  // unique_ptr owns the descriptor until d.release() in the callback below
         auto fut = desc->get_future();
-        _fq.queue(pclass.ptr, std::move(fq_desc), [&pclass, start, req = std::move(req), desc = desc.release(), len] () mutable noexcept {
-            try {
-                pclass.nr_queued--;
-                pclass.ops++;
-                pclass.bytes += len;
-                pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);
-                engine().submit_io(desc, std::move(req));
-            } catch (...) {
-                desc->set_exception(std::current_exception());
-            }
+        io_log.trace("dev {} : req {} queue  len {} ticket {}", _config.devid, fmt::ptr(&*desc), len, fq_ticket);
+        _fq.queue(pclass.ptr, std::move(fq_ticket), [&pclass, start, req = std::move(req), d = std::move(desc), len, this] () mutable noexcept {  // NOTE(review): callback is presumably invoked when the fair queue dispatches this request; confirm in fair_queue
+            _queued_requests--;  // undoes the _queued_requests++ done after _fq.queue() below
+            _requests_executing++;  // decremented again in io_queue::notify_requests_finished()
+            pclass.nr_queued--;
+            pclass.ops++;
+            pclass.bytes += len;
+            pclass.queue_time = std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - start);  // time elapsed since queue_request() was entered
+            io_log.trace("dev {} : req {} submit", _config.devid, fmt::ptr(&*d));
+            engine().submit_io(d.release(), std::move(req));  // raw pointer handed over; the descriptor deletes itself in complete()/set_exception()
         });
+        pclass.nr_queued++;  // counters are bumped only after the _fq.queue() call has returned
+        _queued_requests++;
         return fut;
     });
 }
 
 future<>
 io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
-    return smp::submit_to(coordinator(), [this, pc, owner = engine().cpu_id(), new_shares] {
+    return smp::submit_to(coordinator(), [this, pc, owner = this_shard_id(), new_shares] {  // owner is the calling shard's id, captured before the lambda is submitted to coordinator()
         auto& pclass = find_or_create_class(pc, owner);
-        _fq.update_shares(pclass.ptr, new_shares);
+        pclass.ptr->update_shares(new_shares);  // shares are now set on the priority-class object itself instead of through _fq
     });
 }