using namespace std::chrono_literals;
struct request {
- fair_queue_ticket fqdesc;
+ fair_queue_entry fqent;
+ std::function<void(request& req)> handle;
unsigned index;
- request(unsigned weight, unsigned index)
- : fqdesc({weight, 0})
+ template <typename Func>
+ request(unsigned weight, unsigned index, Func&& h)
+ : fqent(fair_queue_ticket(weight, 0))
+ , handle(std::move(h))
, index(index)
{}
-};
+ void submit() {
+ handle(*this);
+ delete this;
+ }
+};
class test_env {
+ fair_group _fg;
fair_queue _fq;
std::vector<int> _results;
std::vector<std::vector<std::exception_ptr>> _exceptions;
- std::vector<priority_class_ptr> _classes;
+ fair_queue::class_id _nr_classes = 0;
std::vector<request> _inflight;
void drain() {
do {} while (tick() != 0);
}
public:
- test_env(unsigned capacity) : _fq(capacity)
+ test_env(unsigned capacity)
+ : _fg(fair_group::config(capacity, std::numeric_limits<int>::max()))
+ , _fq(_fg, fair_queue::config())
{}
// As long as there is a request sitting in the queue, tick() will process
// before the queue is destroyed.
unsigned tick(unsigned n = 1) {
unsigned processed = 0;
- _fq.dispatch_requests();
+ _fq.dispatch_requests([] (fair_queue_entry& ent) {
+ boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
+ });
for (unsigned i = 0; i < n; ++i) {
std::vector<request> curr;
for (auto& req : curr) {
processed++;
_results[req.index]++;
- _fq.notify_requests_finished(req.fqdesc);
+ _fq.notify_request_finished(req.fqent.ticket());
}
- _fq.dispatch_requests();
+ _fq.dispatch_requests([] (fair_queue_entry& ent) {
+ boost::intrusive::get_parent_from_member(&ent, &request::fqent)->submit();
+ });
}
return processed;
}
~test_env() {
drain();
- for (auto& p: _classes) {
- _fq.unregister_priority_class(p);
+ for (fair_queue::class_id id = 0; id < _nr_classes; id++) {
+ _fq.unregister_priority_class(id);
}
}
size_t register_priority_class(uint32_t shares) {
_results.push_back(0);
_exceptions.push_back(std::vector<std::exception_ptr>());
- _classes.push_back(_fq.register_priority_class(shares));
- return _classes.size() - 1;
+ _fq.register_priority_class(_nr_classes, shares);
+ return _nr_classes++;
}
- void do_op(unsigned index, unsigned weight) {
- auto cl = _classes[index];
- auto req = request(weight, index);
-
- _fq.queue(cl, req.fqdesc, [this, index, req] () mutable noexcept {
+ void do_op(fair_queue::class_id id, unsigned weight) {
+ unsigned index = id;
+ auto req = std::make_unique<request>(weight, index, [this, index] (request& req) mutable noexcept {
try {
_inflight.push_back(std::move(req));
} catch (...) {
auto eptr = std::current_exception();
_exceptions[index].push_back(eptr);
- _fq.notify_requests_finished(req.fqdesc);
+ _fq.notify_request_finished(req.fqent.ticket());
}
});
+
+ _fq.queue(id, req->fqent);
+ req.release();
}
- void update_shares(unsigned index, uint32_t shares) {
- auto cl = _classes[index];
- cl->update_shares(shares);
+ void update_shares(fair_queue::class_id id, uint32_t shares) {
+ _fq.update_shares_for_class(id, shares);
}
void reset_results(unsigned index) {
// long period of time, ticking slowly
for (int i = 0; i < 1000; ++i) {
sleep(1ms).get();
- env.tick(2);
+ env.tick(3);
}
env.verify("longer_run_different_shares", {1, 2}, 2);
}