1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
#include "rgw_dmclock_scheduler.h"
#include "rgw_dmclock_sync_scheduler.h"
#include "rgw_dmclock_scheduler_ctx.h"

#include <cerrno>
#include <condition_variable>
#include <mutex>
8 namespace rgw::dmclock
{
10 SyncScheduler::~SyncScheduler()
15 int SyncScheduler::add_request(const client_id
& client
, const ReqParams
& params
,
16 const Time
& time
, Cost cost
)
19 std::condition_variable req_cv
;
20 ReqState rstate
{ReqState::Wait
};
21 auto req
= SyncRequest
{client
, time
, cost
, req_mtx
, req_cv
, rstate
, counters
};
22 int r
= queue
.add_request_time(req
, client
, params
, time
, cost
);
24 if (auto c
= counters(client
)) {
25 c
->inc(queue_counters::l_qlen
);
26 c
->inc(queue_counters::l_cost
, cost
);
28 queue
.request_completed();
29 // Perform a blocking wait until the request callback is called
31 std::unique_lock lock
{req_mtx
};
32 req_cv
.wait(lock
, [&rstate
] {return rstate
!= ReqState::Wait
;});
34 if (rstate
== ReqState::Cancelled
) {
35 //FIXME: decide on error code for cancelled request
39 // post the error code
40 if (auto c
= counters(client
)) {
41 c
->inc(queue_counters::l_limit
);
42 c
->inc(queue_counters::l_limit_cost
, cost
);
48 void SyncScheduler::handle_request_cb(const client_id
&c
,
49 std::unique_ptr
<SyncRequest
> req
,
50 PhaseType phase
, Cost cost
)
52 { std::lock_guard
<std::mutex
> lg(req
->req_mtx
);
53 req
->req_state
= ReqState::Ready
;
54 req
->req_cv
.notify_one();
57 if (auto ctr
= req
->counters(c
)) {
58 auto lat
= Clock::from_double(get_time()) - Clock::from_double(req
->started
);
59 if (phase
== PhaseType::reservation
){
60 ctr
->tinc(queue_counters::l_res_latency
, lat
);
61 ctr
->inc(queue_counters::l_res
);
62 if (cost
) ctr
->inc(queue_counters::l_res_cost
);
63 } else if (phase
== PhaseType::priority
){
64 ctr
->tinc(queue_counters::l_prio_latency
, lat
);
65 ctr
->inc(queue_counters::l_prio
);
66 if (cost
) ctr
->inc(queue_counters::l_prio_cost
);
68 ctr
->dec(queue_counters::l_qlen
);
69 if (cost
) ctr
->dec(queue_counters::l_cost
);
74 void SyncScheduler::cancel(const client_id
& client
)
78 queue
.remove_by_client(client
, false, [&](RequestRef
&& request
)
81 sum
.cost
+= request
->cost
;
83 std::lock_guard
<std::mutex
> lg(request
->req_mtx
);
84 request
->req_state
= ReqState::Cancelled
;
85 request
->req_cv
.notify_one();
88 if (auto c
= counters(client
)) {
92 queue
.request_completed();
95 void SyncScheduler::cancel()
99 queue
.remove_by_req_filter([&](RequestRef
&& request
) -> bool
101 inc(sums
, request
->client
, request
->cost
);
103 std::lock_guard
<std::mutex
> lg(request
->req_mtx
);
104 request
->req_state
= ReqState::Cancelled
;
105 request
->req_cv
.notify_one();
110 for (size_t i
= 0; i
< client_count
; i
++) {
111 if (auto c
= counters(static_cast<client_id
>(i
))) {
112 on_cancel(c
, sums
[i
]);
117 } // namespace rgw::dmclock