]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
11fdf7f2 TL |
4 | #include "rgw_dmclock_scheduler.h" |
5 | #include "rgw_dmclock_sync_scheduler.h" | |
6 | #include "rgw_dmclock_scheduler_ctx.h" | |
7 | ||
8 | namespace rgw::dmclock { | |
9 | ||
10 | SyncScheduler::~SyncScheduler() | |
11 | { | |
12 | cancel(); | |
13 | } | |
14 | ||
15 | int SyncScheduler::add_request(const client_id& client, const ReqParams& params, | |
16 | const Time& time, Cost cost) | |
17 | { | |
18 | std::mutex req_mtx; | |
19 | std::condition_variable req_cv; | |
20 | ReqState rstate {ReqState::Wait}; | |
21 | auto req = SyncRequest{client, time, cost, req_mtx, req_cv, rstate, counters}; | |
22 | int r = queue.add_request_time(req, client, params, time, cost); | |
23 | if (r == 0) { | |
24 | if (auto c = counters(client)) { | |
25 | c->inc(queue_counters::l_qlen); | |
26 | c->inc(queue_counters::l_cost, cost); | |
27 | } | |
28 | queue.request_completed(); | |
29 | // Perform a blocking wait until the request callback is called | |
9f95a23c TL |
30 | { |
31 | std::unique_lock lock{req_mtx}; | |
32 | req_cv.wait(lock, [&rstate] {return rstate != ReqState::Wait;}); | |
11fdf7f2 TL |
33 | } |
34 | if (rstate == ReqState::Cancelled) { | |
35 | //FIXME: decide on error code for cancelled request | |
36 | r = -ECONNABORTED; | |
37 | } | |
38 | } else { | |
39 | // post the error code | |
40 | if (auto c = counters(client)) { | |
41 | c->inc(queue_counters::l_limit); | |
42 | c->inc(queue_counters::l_limit_cost, cost); | |
43 | } | |
44 | } | |
45 | return r; | |
46 | } | |
47 | ||
48 | void SyncScheduler::handle_request_cb(const client_id &c, | |
49 | std::unique_ptr<SyncRequest> req, | |
50 | PhaseType phase, Cost cost) | |
51 | { | |
52 | { std::lock_guard<std::mutex> lg(req->req_mtx); | |
53 | req->req_state = ReqState::Ready; | |
54 | req->req_cv.notify_one(); | |
55 | } | |
56 | ||
57 | if (auto ctr = req->counters(c)) { | |
58 | auto lat = Clock::from_double(get_time()) - Clock::from_double(req->started); | |
59 | if (phase == PhaseType::reservation){ | |
60 | ctr->tinc(queue_counters::l_res_latency, lat); | |
61 | ctr->inc(queue_counters::l_res); | |
62 | if (cost) ctr->inc(queue_counters::l_res_cost); | |
63 | } else if (phase == PhaseType::priority){ | |
64 | ctr->tinc(queue_counters::l_prio_latency, lat); | |
65 | ctr->inc(queue_counters::l_prio); | |
66 | if (cost) ctr->inc(queue_counters::l_prio_cost); | |
67 | } | |
68 | ctr->dec(queue_counters::l_qlen); | |
69 | if (cost) ctr->dec(queue_counters::l_cost); | |
70 | } | |
71 | } | |
72 | ||
73 | ||
74 | void SyncScheduler::cancel(const client_id& client) | |
75 | { | |
76 | ClientSum sum; | |
77 | ||
78 | queue.remove_by_client(client, false, [&](RequestRef&& request) | |
79 | { | |
80 | sum.count++; | |
81 | sum.cost += request->cost; | |
82 | { | |
83 | std::lock_guard <std::mutex> lg(request->req_mtx); | |
84 | request->req_state = ReqState::Cancelled; | |
85 | request->req_cv.notify_one(); | |
86 | } | |
87 | }); | |
88 | if (auto c = counters(client)) { | |
89 | on_cancel(c, sum); | |
90 | } | |
91 | ||
92 | queue.request_completed(); | |
93 | } | |
94 | ||
95 | void SyncScheduler::cancel() | |
96 | { | |
97 | ClientSums sums; | |
98 | ||
99 | queue.remove_by_req_filter([&](RequestRef&& request) -> bool | |
100 | { | |
101 | inc(sums, request->client, request->cost); | |
102 | { | |
103 | std::lock_guard<std::mutex> lg(request->req_mtx); | |
104 | request->req_state = ReqState::Cancelled; | |
105 | request->req_cv.notify_one(); | |
106 | } | |
107 | return true; | |
108 | }); | |
109 | ||
110 | for (size_t i = 0; i < client_count; i++) { | |
111 | if (auto c = counters(static_cast<client_id>(i))) { | |
112 | on_cancel(c, sums[i]); | |
113 | } | |
114 | } | |
115 | } | |
116 | ||
117 | } // namespace rgw::dmclock |