fair_queue::class_id _nr_classes = 0;
std::vector<request> _inflight;
+ // Build the fair_group config used by the test harness: the weight
+ // replenish rate is 1'000'000 (described later in this patch as a
+ // request rate of 1 per microsecond), the size rate is effectively
+ // unlimited, and the rate-limit window is `cap` microseconds -- the
+ // test's notion of queue "capacity".
+ static fair_group::config fg_config(unsigned cap) {
+ fair_group::config cfg;
+ cfg.weight_rate = 1'000'000;
+ cfg.size_rate = std::numeric_limits<int>::max();
+ cfg.rate_limit_duration = std::chrono::microseconds(cap);
+ return cfg;
+ }
+
+ // Build the fair_queue config used by the tests. tau -- the window for
+ // which one class may gain exclusive queue access after (re-)activation
+ // (see the forgiving_queue test) -- is set to 50us.
+ static fair_queue::config fq_config() {
+ fair_queue::config cfg;
+ cfg.tau = std::chrono::microseconds(50);
+ return cfg;
+ }
+
+ // Keep ticking until a tick processes no requests, i.e. the queue is empty.
void drain() {
do {} while (tick() != 0);
}
public:
+ // Construct the shared fair_group and the fair_queue on top of it,
+ // using the config-builder helpers added by this patch.
test_env(unsigned capacity)
- : _fg(fair_group::config(capacity, std::numeric_limits<int>::max()))
- , _fq(_fg, fair_queue::config())
+ : _fg(fg_config(capacity))
+ , _fq(_fg, fq_config())
{}
// As long as there is a request sitting in the queue, tick() will process
// before the queue is destroyed.
unsigned tick(unsigned n = 1) {
unsigned processed = 0;
+ _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
_fq.dispatch_requests([] (fair_queue_entry& ent) {
boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
});
_fq.notify_request_finished(req.fqent.ticket());
}
+ _fg.replenish_capacity(_fg.replenished_ts() + std::chrono::microseconds(1));
_fq.dispatch_requests([] (fair_queue_entry& ent) {
boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
});
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// allow half the requests in
env.tick(100);
env.verify("equal_2classes", {1, 1});
env.do_op(c, 1);
env.do_op(d, 1);
}
- later().get();
+ yield().get();
// allow half the requests in
env.tick(200);
env.verify("equal_4classes", {1, 1, 1, 1});
env.do_op(a, 1);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// allow half the requests in
env.tick(100);
return env.verify("different_shares", {1, 2});
env.do_op(a, 1);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// queue has capacity 10, 10 x 10 = 100, allow half the requests in
env.tick(10);
env.do_op(a, 1);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// queue has capacity 10, 10 x 10 = 100, allow half the requests in
env.tick(10);
env.verify("different_shares_hi_capacity", {1, 2});
env.do_op(a, 2);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// allow half the requests in
env.tick(100);
env.verify("different_weights", {1, 2});
for (int i = 0; i < 100; ++i) {
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// consume all requests
env.tick(100);
env.verify("dominant_queue", {1, 0});
}
-// Class2 pushes many requests at first. After enough time, this shouldn't matter anymore.
+// Class1 pushes many requests at first. Right after, don't expect Class2 to be able to do the same
SEASTAR_THREAD_TEST_CASE(test_fair_queue_forgiving_queue) {
test_env env(1);
+ // The fair_queue preemption logic allows one class to gain exclusive
+ // queue access for at most tau duration. Test queue configures the
+ // request rate to be 1/us and tau to be 50us, so after (re-)activation
+ // a queue can overrun its peer by at most 50 requests.
+
auto a = env.register_priority_class(10);
auto b = env.register_priority_class(10);
for (int i = 0; i < 100; ++i) {
- env.do_op(b, 1);
+ env.do_op(a, 1);
}
- later().get();
+ yield().get();
// consume all requests
env.tick(100);
- sleep(500ms).get();
- env.reset_results(b);
+ env.reset_results(a);
+
for (int i = 0; i < 100; ++i) {
env.do_op(a, 1);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// allow half the requests in
env.tick(100);
- env.verify("forgiving_queue", {1, 1});
+ // 50 requests should be passed from b, other 100 should be shared 1:1
+ env.verify("forgiving_queue", {1, 3}, 2);
}
// Classes push requests and then update swap their shares. In the end, should have executed
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// allow 25% of the requests in
env.tick(250);
env.update_shares(a, 10);
env.update_shares(b, 20);
- later().get();
+ yield().get();
// allow 25% of the requests in
env.tick(250);
env.verify("update_shares", {1, 1}, 2);
env.do_op(b, 1);
}
- later().get();
+ yield().get();
// In total allow half the requests in
env.tick(reqs);