]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/tests/unit/fair_queue_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / tests / unit / fair_queue_test.cc
index 02d63b161503837077088f990774fff5c98f408d..33ce6f50025dc5035bc47a9ee5bff02a0da5d80f 100644 (file)
@@ -37,28 +37,38 @@ using namespace seastar;
 using namespace std::chrono_literals;
 
 struct request {
-    fair_queue_ticket fqdesc;
+    fair_queue_entry fqent;
+    std::function<void(request& req)> handle;
     unsigned index;
 
-    request(unsigned weight, unsigned index)
-        : fqdesc({weight, 0})
+    template <typename Func>
+    request(unsigned weight, unsigned index, Func&& h)
+        : fqent(fair_queue_ticket(weight, 0))
+        , handle(std::move(h))
         , index(index)
     {}
-};
 
+    void submit() {
+        handle(*this);
+        delete this;
+    }
+};
 
 class test_env {
+    fair_group _fg;
     fair_queue _fq;
     std::vector<int> _results;
     std::vector<std::vector<std::exception_ptr>> _exceptions;
-    std::vector<priority_class_ptr> _classes;
+    fair_queue::class_id _nr_classes = 0;
     std::vector<request> _inflight;
 
     void drain() {
         do {} while (tick() != 0);
     }
 public:
-    test_env(unsigned capacity) : _fq(capacity)
+    test_env(unsigned capacity)
+        : _fg(fair_group::config(capacity, std::numeric_limits<int>::max()))
+        , _fq(_fg, fair_queue::config())
     {}
 
     // As long as there is a request sitting in the queue, tick() will process
@@ -70,7 +80,9 @@ public:
     // before the queue is destroyed.
     unsigned tick(unsigned n = 1) {
         unsigned processed = 0;
-        _fq.dispatch_requests();
+        _fq.dispatch_requests([] (fair_queue_entry& ent) {
+            boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
+        });
 
         for (unsigned i = 0; i < n; ++i) {
             std::vector<request> curr;
@@ -79,46 +91,48 @@ public:
             for (auto& req : curr) {
                 processed++;
                 _results[req.index]++;
-                _fq.notify_requests_finished(req.fqdesc);
+                _fq.notify_request_finished(req.fqent.ticket());
             }
 
-            _fq.dispatch_requests();
+            _fq.dispatch_requests([] (fair_queue_entry& ent) {
+                boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
+            });
         }
         return processed;
     }
 
     ~test_env() {
         drain();
-        for (auto& p: _classes) {
-            _fq.unregister_priority_class(p);
+        for (fair_queue::class_id id = 0; id < _nr_classes; id++) {
+            _fq.unregister_priority_class(id);
         }
     }
 
     size_t register_priority_class(uint32_t shares) {
         _results.push_back(0);
         _exceptions.push_back(std::vector<std::exception_ptr>());
-        _classes.push_back(_fq.register_priority_class(shares));
-        return _classes.size() - 1;
+        _fq.register_priority_class(_nr_classes, shares);
+        return _nr_classes++;
     }
 
-    void do_op(unsigned index, unsigned weight) {
-        auto cl = _classes[index];
-        auto req = request(weight, index);
-
-        _fq.queue(cl, req.fqdesc, [this, index, req] () mutable noexcept {
+    void do_op(fair_queue::class_id id, unsigned weight) {
+        unsigned index = id;
+        auto req = std::make_unique<request>(weight, index, [this, index] (request& req) mutable noexcept {
             try {
                 _inflight.push_back(std::move(req));
             } catch (...) {
                 auto eptr = std::current_exception();
                 _exceptions[index].push_back(eptr);
-                _fq.notify_requests_finished(req.fqdesc);
+                _fq.notify_request_finished(req.fqent.ticket());
             }
         });
+
+        _fq.queue(id, req->fqent);
+        req.release();
     }
 
-    void update_shares(unsigned index, uint32_t shares) {
-        auto cl = _classes[index];
-        cl->update_shares(shares);
+    void update_shares(fair_queue::class_id id, uint32_t shares) {
+        _fq.update_shares_for_class(id, shares);
     }
 
     void reset_results(unsigned index) {
@@ -378,7 +392,7 @@ SEASTAR_THREAD_TEST_CASE(test_fair_queue_longer_run_different_shares) {
     // long period of time, ticking slowly
     for (int i = 0; i < 1000; ++i) {
         sleep(1ms).get();
-        env.tick(2);
+        env.tick(3);
     }
     env.verify("longer_run_different_shares", {1, 2}, 2);
 }