} // TEST
+ TEST(dmclock_server, add_req_pushprio_queue) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PushPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+ ClientId client1 = 17;
+ ClientId client2 = 34;
+
+ dmc::ClientInfo ci(0.0, 1.0, 0.0);
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &ci;
+ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [] (const ClientId& c,
+ std::unique_ptr<MyReq> req,
+ dmc::PhaseType phase,
+ uint64_t req_cost) {
+ // empty; do nothing
+ };
+
+ Queue pq(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ std::chrono::seconds(3),
+ std::chrono::seconds(5),
+ std::chrono::seconds(2),
+ AtLimit::Wait);
+
+ auto lock_pq = [&](std::function<void()> code) {
+ test_locked(pq.data_mtx, code);
+ };
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map initially has size 0";
+ });
+
+ dmc::ReqParams req_params(1, 1);
+
+ // Create a reference to a request
+ MyReqRef rr1 = MyReqRef(new MyReq(11));
+
+ // Exercise different versions of add_request()
+ EXPECT_EQ(0, pq.add_request(std::move(rr1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(22), client2, req_params));
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+
+ lock_pq([&] () {
+ EXPECT_EQ(2u, pq.client_map.size()) <<
+ "client map has 2 after 2 clients";
+ EXPECT_FALSE(pq.client_map.at(client1)->idle) <<
+ "initially client1 map entry shows not idle.";
+ EXPECT_FALSE(pq.client_map.at(client2)->idle) <<
+ "initially client2 map entry shows not idle.";
+ });
+
+ // Check client idle state
+ std::this_thread::sleep_for(std::chrono::seconds(6));
+ lock_pq([&] () {
+ EXPECT_TRUE(pq.client_map.at(client1)->idle) <<
+ "after idle age client1 map entry shows idle.";
+ EXPECT_TRUE(pq.client_map.at(client2)->idle) <<
+ "after idle age client2 map entry shows idle.";
+ });
+
+ // Sleep until after erase age elapses
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ lock_pq([&] () {
+ EXPECT_EQ(0u, pq.client_map.size()) <<
+ "client map loses its entries after erase age";
+ });
+ } // TEST
+
+
+ TEST(dmclock_server, schedule_req_pushprio_queue) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PushPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+ ClientId client1 = 17;
+ ClientId client2 = 34;
+ ClientId client3 = 48;
+
+ dmc::ClientInfo ci(1.0, 1.0, 1.0);
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &ci;
+ };
+ auto server_ready_f = [] () -> bool { return true; };
+ auto submit_req_f = [] (const ClientId& c,
+ std::unique_ptr<MyReq> req,
+ dmc::PhaseType phase,
+ uint64_t req_cost) {
+ // empty; do nothing
+ };
+
+ Queue pq(client_info_f,
+ server_ready_f,
+ submit_req_f,
+ std::chrono::seconds(3),
+ std::chrono::seconds(5),
+ std::chrono::seconds(2),
+ AtLimit::Wait);
+
+ dmc::ReqParams req_params(1, 1);
+
+ // Create a reference to a request
+ MyReqRef rr1 = MyReqRef(new MyReq(11));
+
+ // Exercise different versions of add_request()
+ EXPECT_EQ(0, pq.add_request(std::move(rr1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(22), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(MyReq(33), client3, req_params));
+
+ std::this_thread::sleep_for(std::chrono::seconds(4));
+
+ ASSERT_TRUE(pq.request_count() == 0);
+ } // TEST
+
+
TEST(dmclock_server, delayed_tag_calc) {
using ClientId = int;
constexpr ClientId client1 = 17;
} // TEST
- TEST(dmclock_server_pull, pull_weight) {
+ TEST(dmclock_server, add_req_ref) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+ ClientId client1 = 22;
+ ClientId client2 = 44;
+
+ dmc::ClientInfo info(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ ReqParams req_params(1,1);
+
+ MyReqRef rr1 = MyReqRef(new MyReq(1));
+ MyReqRef rr2 = MyReqRef(new MyReq(2));
+ MyReqRef rr3 = MyReqRef(new MyReq(3));
+ MyReqRef rr4 = MyReqRef(new MyReq(4));
+ MyReqRef rr5 = MyReqRef(new MyReq(5));
+ EXPECT_EQ(0, pq.add_request(std::move(rr1), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(std::move(rr2), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(std::move(rr3), client1, req_params));
+ EXPECT_EQ(0, pq.add_request(std::move(rr4), client2, req_params));
+ EXPECT_EQ(0, pq.add_request(std::move(rr5), client2, req_params));
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(5u, pq.request_count());
+
+ pq.remove_by_req_filter([](MyReqRef&& r) -> bool {return 0 == r->id % 2;});
+
+ EXPECT_EQ(3u, pq.request_count());
+
+ std::list<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (MyReqRef&& r) -> bool {
+ if (1 == r->id % 2) {
+ capture.push_front(*r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(3u, capture.size());
+ int total = 0;
+ for (auto i : capture) {
+ total += i.id;
+ }
+ EXPECT_EQ(9, total) << " sum of captured items should be 9";
+ } // TEST
+
+
+ TEST(dmclock_server, add_req_ref_null_req_params) {
+ struct MyReq {
+ int id;
+
+ MyReq(int _id) :
+ id(_id)
+ {
+ // empty
+ }
+ }; // MyReq
+
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,MyReq>;
+ using MyReqRef = typename Queue::RequestRef;
+ ClientId client1 = 22;
+ ClientId client2 = 44;
+
+ dmc::ClientInfo info(0.0, 1.0, 0.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ return &info;
+ };
+
+ Queue pq(client_info_f, AtLimit::Allow);
+
+ EXPECT_EQ(0u, pq.client_count());
+ EXPECT_EQ(0u, pq.request_count());
+
+ MyReqRef&& rr1 = MyReqRef(new MyReq(1));
+ MyReqRef&& rr2 = MyReqRef(new MyReq(2));
+ MyReqRef&& rr3 = MyReqRef(new MyReq(3));
+ MyReqRef&& rr4 = MyReqRef(new MyReq(4));
+ MyReqRef&& rr5 = MyReqRef(new MyReq(5));
+ EXPECT_EQ(0, pq.add_request(std::move(rr1), client1));
+ EXPECT_EQ(0, pq.add_request(std::move(rr2), client2));
+ EXPECT_EQ(0, pq.add_request(std::move(rr3), client1));
+ EXPECT_EQ(0, pq.add_request(std::move(rr4), client2));
+ EXPECT_EQ(0, pq.add_request(std::move(rr5), client2));
+
+ EXPECT_EQ(2u, pq.client_count());
+ EXPECT_EQ(5u, pq.request_count());
+
+ pq.remove_by_req_filter([](MyReqRef&& r) -> bool {return 1 == r->id % 2;});
+
+ EXPECT_EQ(2u, pq.request_count());
+
+ std::list<MyReq> capture;
+ pq.remove_by_req_filter(
+ [&capture] (MyReqRef&& r) -> bool {
+ if (0 == r->id % 2) {
+ capture.push_front(*r);
+ return true;
+ } else {
+ return false;
+ }
+ },
+ true);
+
+ EXPECT_EQ(0u, pq.request_count());
+ EXPECT_EQ(2u, capture.size());
+ int total = 0;
+ for (auto i : capture) {
+ total += i.id;
+ }
+ EXPECT_EQ(6, total) << " sum of captured items should be 6";
+ } // TEST
+
+
+ TEST(dmclock_server_pull, pull_weight) {
using ClientId = int;
using Queue = dmc::PullPriorityQueue<ClientId,Request>;
using QueueRef = std::unique_ptr<Queue>;
ClientId client1 = 17;
ClientId client2 = 98;
- std::vector<dmc::ClientInfo> info1;
- std::vector<dmc::ClientInfo> info2;
-
- info1.push_back(dmc::ClientInfo(0.0, 100.0, 0.0));
- info1.push_back(dmc::ClientInfo(0.0, 150.0, 0.0));
-
- info2.push_back(dmc::ClientInfo(0.0, 200.0, 0.0));
- info2.push_back(dmc::ClientInfo(0.0, 50.0, 0.0));
+ dmc::ClientInfo info1[] = {
+ dmc::ClientInfo(0.0, 100.0, 0.0),
+ dmc::ClientInfo(0.0, 150.0, 0.0)};
+ dmc::ClientInfo info2[] = {
+ dmc::ClientInfo(0.0, 200.0, 0.0),
+ dmc::ClientInfo(0.0, 50.0, 0.0)};
size_t cli_info_group = 0;
pq = QueueRef(new Queue(client_info_f, AtLimit::Wait));
- ReqParams req_params(1,1);
-
auto now = dmc::get_time();
- for (int i = 0; i < 5; ++i) {
- EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
- EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
- now += 0.0001;
- }
-
- int c1_count = 0;
- int c2_count = 0;
- for (int i = 0; i < 10; ++i) {
- Queue::PullReq pr = pq->pull_request();
- EXPECT_EQ(Queue::NextReqType::returning, pr.type);
- auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ auto run_test = [&](float lower_bound, float upper_bound) {
+ ReqParams req_params(1,1);
+ constexpr unsigned num_requests = 1000;
- if (i > 5) continue;
- if (client1 == retn.client) ++c1_count;
- else if (client2 == retn.client) ++c2_count;
- else ADD_FAILURE() << "got request from neither of two clients";
+ for (int i = 0; i < num_requests; i += 2) {
+ EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
+ EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
+ now += 0.0001;
+ }
- EXPECT_EQ(PhaseType::priority, retn.phase);
- }
+ int c1_count = 0;
+ int c2_count = 0;
+ for (int i = 0; i < num_requests; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ // only count the specified portion of the served request
+ if (i < num_requests * lower_bound || i > num_requests * upper_bound) {
+ continue;
+ }
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ if (client1 == retn.client) {
+ ++c1_count;
+ } else if (client2 == retn.client) {
+ ++c2_count;
+ } else {
+ ADD_FAILURE() << "got request from neither of two clients";
+ }
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
- EXPECT_EQ(2, c1_count) <<
- "before: one-third of request should have come from first client";
- EXPECT_EQ(4, c2_count) <<
- "before: two-thirds of request should have come from second client";
+ constexpr float tolerance = 0.002;
+ float prop1 = float(info1[cli_info_group].weight) / (info1[cli_info_group].weight +
+ info2[cli_info_group].weight);
+ float prop2 = float(info2[cli_info_group].weight) / (info1[cli_info_group].weight +
+ info2[cli_info_group].weight);
+ EXPECT_NEAR(float(c1_count) / (c1_count + c2_count), prop1, tolerance) <<
+ "before: " << prop1 << " of requests should have come from first client";
+ EXPECT_NEAR
+ (float(c2_count) / (c1_count + c2_count), prop2, tolerance) <<
+ "before: " << prop2 << " of requests should have come from second client";
+ };
+ cli_info_group = 0;
+ // only count the first half of the served requests, so we can check
+ // the prioritized ones
+ run_test(0.0F /* lower bound */,
+ 0.5F /* upper bound */);
std::chrono::seconds dura(1);
std::this_thread::sleep_for(dura);
+ // check the middle part of the request sequence which is less likely
+ // to be impacted by previous requests served before we switch to the
+ // new client info.
cli_info_group = 1;
-
- now = dmc::get_time();
-
- for (int i = 0; i < 6; ++i) {
- EXPECT_EQ(0, pq->add_request(Request{}, client1, req_params));
- EXPECT_EQ(0, pq->add_request(Request{}, client2, req_params));
- now += 0.0001;
- }
-
- c1_count = 0;
- c2_count = 0;
- for (int i = 0; i < 8; ++i) {
- Queue::PullReq pr = pq->pull_request();
- EXPECT_EQ(Queue::NextReqType::returning, pr.type);
- auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
-
- if (client1 == retn.client) ++c1_count;
- else if (client2 == retn.client) ++c2_count;
- else ADD_FAILURE() << "got request from neither of two clients";
-
- EXPECT_EQ(PhaseType::priority, retn.phase);
- }
-
- EXPECT_EQ(6, c1_count) <<
- "after: one-third of request should have come from first client";
- EXPECT_EQ(2, c2_count) <<
- "after: two-thirds of request should have come from second client";
+ run_test(1.0F/3 /* lower bound */,
+ 2.0F/3 /* upper bound */);
}
-
// This test shows what happens when a request can be ready (under
// limit) but not schedulable since proportion tag is 0. We expect
// to get some future and none responses.
EXPECT_EQ(0, pq.add_request_time({}, client1, {}, Time{3})); // 3 over
}
+
+ TEST(dmclock_server_pull, pull_wait_at_limit) {
+ using ClientId = int;
+ using Queue = dmc::PullPriorityQueue<ClientId,Request>;
+ using QueueRef = std::unique_ptr<Queue>;
+
+ ClientId client1 = 52;
+ ClientId client2 = 8;
+
+ // Create client1 with high limit.
+ // Create client2 with low limit and with lower weight than client1
+ dmc::ClientInfo info1(1.0, 2.0, 100.0);
+ dmc::ClientInfo info2(1.0, 1.0, 2.0);
+
+ auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
+ if (client1 == c) {
+ return &info1;
+ } else if (client2 == c) {
+ return &info2;
+ } else {
+ ADD_FAILURE() << "client info looked up for non-existent client";
+ return nullptr;
+ }
+ };
+
+ QueueRef pq(new Queue(client_info_f, AtLimit::Wait));
+
+ ReqParams req_params(1,1);
+
+ // make sure all times are before now
+ auto add_time = dmc::get_time() - 1.0;
+ auto old_time = add_time;
+
+ for (int i = 0; i < 50; ++i) {
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client1, req_params, add_time));
+ EXPECT_EQ(0, pq->add_request_time(Request{}, client2, req_params, add_time));
+ add_time += 0.01;
+ }
+
+ EXPECT_EQ(2u, pq->client_count());
+ EXPECT_EQ(100u, pq->request_count());
+ int c1_count = 0;
+ int c2_count = 0;
+
+ // Pull couple of requests, should come from reservation queue.
+ // One request each from client1 and client2 should be pulled.
+ for (int i = 0; i < 2; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) {
+ ++c1_count;
+ } else if (client2 == retn.client) {
+ ++c2_count;
+ } else {
+ ADD_FAILURE() << "got request from neither of two clients";
+ }
+
+ EXPECT_EQ(PhaseType::reservation, retn.phase);
+ }
+
+ EXPECT_EQ(1, c1_count) <<
+ "one request should have come from first client";
+ EXPECT_EQ(1, c2_count) <<
+ "one request should have come from second client";
+
+ EXPECT_EQ(2u, pq->client_count());
+ EXPECT_EQ(98u, pq->request_count());
+
+ // Pull more requests out.
+ // All remaining requests from client1 should be pulled.
+ // Only 1 request from client2 should be pulled.
+ for (int i = 0; i < 50; ++i) {
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+
+ if (client1 == retn.client) {
+ ++c1_count;
+ } else if (client2 == retn.client) {
+ ++c2_count;
+ } else {
+ ADD_FAILURE() << "got request from neither of two clients";
+ }
+
+ EXPECT_EQ(PhaseType::priority, retn.phase);
+ }
+
+ EXPECT_EQ(2u, pq->client_count());
+ EXPECT_EQ(48u, pq->request_count());
+
+ // Pulling the remaining client2 requests shouldn't succeed.
+ Queue::PullReq pr = pq->pull_request();
+ EXPECT_EQ(Queue::NextReqType::future, pr.type);
+ Time when_ready = pr.getTime();
+ EXPECT_EQ(old_time + 2.0, when_ready);
+
+ EXPECT_EQ(50, c1_count) <<
+ "half of the total requests should have come from first client";
+ EXPECT_EQ(2, c2_count) <<
+ "only two requests should have come from second client";
+
+ // Trying to pull a request after restoring the limit should succeed.
+ pr = pq->pull_request(old_time + 2.0);
+ EXPECT_EQ(Queue::NextReqType::returning, pr.type);
+ auto& retn = boost::get<Queue::PullReq::Retn>(pr.data);
+ EXPECT_EQ(retn.client, client2);
+ EXPECT_EQ(47u, pq->request_count());
+ } // dmclock_server_pull.pull_wait_at_limit
+
} // namespace dmclock
} // namespace crimson