]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
11fdf7f2 TL |
3 | |
4 | #include "common/async/completion.h" | |
5 | #include "rgw_dmclock_async_scheduler.h" | |
6 | #include "rgw_dmclock_scheduler.h" | |
7 | ||
8 | namespace rgw::dmclock { | |
9 | ||
10 | AsyncScheduler::~AsyncScheduler() | |
11 | { | |
12 | cancel(); | |
13 | if (observer) { | |
14 | cct->_conf.remove_observer(this); | |
15 | } | |
16 | } | |
17 | ||
18 | const char** AsyncScheduler::get_tracked_conf_keys() const | |
19 | { | |
20 | if (observer) { | |
21 | return observer->get_tracked_conf_keys(); | |
22 | } | |
23 | static const char* keys[] = { "rgw_max_concurrent_requests", nullptr }; | |
24 | return keys; | |
25 | } | |
26 | ||
27 | void AsyncScheduler::handle_conf_change(const ConfigProxy& conf, | |
28 | const std::set<std::string>& changed) | |
29 | { | |
30 | if (observer) { | |
31 | observer->handle_conf_change(conf, changed); | |
32 | } | |
33 | if (changed.count("rgw_max_concurrent_requests")) { | |
34 | auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests"); | |
35 | max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max(); | |
36 | } | |
37 | queue.update_client_infos(); | |
38 | schedule(crimson::dmclock::TimeZero); | |
39 | } | |
40 | ||
41 | int AsyncScheduler::schedule_request_impl(const client_id& client, | |
42 | const ReqParams& params, | |
43 | const Time& time, const Cost& cost, | |
44 | optional_yield yield_ctx) | |
45 | { | |
46 | ceph_assert(yield_ctx); | |
47 | ||
48 | auto &yield = yield_ctx.get_yield_context(); | |
49 | boost::system::error_code ec; | |
50 | async_request(client, params, time, cost, yield[ec]); | |
51 | ||
52 | if (ec){ | |
53 | if (ec == boost::system::errc::resource_unavailable_try_again) | |
54 | return -EAGAIN; | |
55 | else | |
56 | return -ec.value(); | |
57 | } | |
58 | ||
59 | return 0; | |
60 | } | |
61 | ||
62 | void AsyncScheduler::request_complete() | |
63 | { | |
64 | --outstanding_requests; | |
65 | schedule(crimson::dmclock::TimeZero); | |
66 | } | |
67 | ||
68 | void AsyncScheduler::cancel() | |
69 | { | |
70 | ClientSums sums; | |
71 | ||
72 | queue.remove_by_req_filter([&] (RequestRef&& request) { | |
73 | inc(sums, request->client, request->cost); | |
74 | auto c = static_cast<Completion*>(request.release()); | |
75 | Completion::dispatch(std::unique_ptr<Completion>{c}, | |
76 | boost::asio::error::operation_aborted, | |
77 | PhaseType::priority); | |
78 | return true; | |
79 | }); | |
80 | timer.cancel(); | |
81 | ||
82 | for (size_t i = 0; i < client_count; i++) { | |
83 | if (auto c = counters(static_cast<client_id>(i))) { | |
84 | on_cancel(c, sums[i]); | |
85 | } | |
86 | } | |
87 | } | |
88 | ||
89 | void AsyncScheduler::cancel(const client_id& client) | |
90 | { | |
91 | ClientSum sum; | |
92 | ||
93 | queue.remove_by_client(client, false, [&] (RequestRef&& request) { | |
94 | sum.count++; | |
95 | sum.cost += request->cost; | |
96 | auto c = static_cast<Completion*>(request.release()); | |
97 | Completion::dispatch(std::unique_ptr<Completion>{c}, | |
98 | boost::asio::error::operation_aborted, | |
99 | PhaseType::priority); | |
100 | }); | |
101 | if (auto c = counters(client)) { | |
102 | on_cancel(c, sum); | |
103 | } | |
104 | schedule(crimson::dmclock::TimeZero); | |
105 | } | |
106 | ||
107 | void AsyncScheduler::schedule(const Time& time) | |
108 | { | |
109 | timer.expires_at(Clock::from_double(time)); | |
110 | timer.async_wait([this] (boost::system::error_code ec) { | |
111 | // process requests unless the wait was canceled. note that a canceled | |
112 | // wait may execute after this AsyncScheduler destructs | |
113 | if (ec != boost::asio::error::operation_aborted) { | |
114 | process(get_time()); | |
115 | } | |
116 | }); | |
117 | } | |
118 | ||
// Pull and dispatch as many queued requests as the concurrency limit
// allows, updating per-client perf counters, and re-arm the timer if the
// queue still holds requests that are not yet eligible.
void AsyncScheduler::process(const Time& now)
{
  // must run in the executor. we should only invoke completion handlers if the
  // executor is running
  assert(get_executor().running_in_this_thread());

  // Per-client reservation/priority totals accumulated during this pass
  // and flushed to the perf counters at the end.
  ClientSums rsums, psums;

  while (outstanding_requests < max_requests) {
    auto pull = queue.pull_request(now);

    if (pull.is_none()) {
      // no pending requests, cancel the timer
      timer.cancel();
      break;
    }
    if (pull.is_future()) {
      // update the timer based on the future time
      schedule(pull.getTime());
      break;
    }
    // Eligible request: it now counts against the concurrency limit until
    // request_complete() decrements this.
    ++outstanding_requests;

    // complete the request
    auto& r = pull.get_retn();
    // Copy out everything needed from the request BEFORE releasing it,
    // since ownership is handed off to the posted completion below.
    auto client = r.client;
    auto phase = r.phase;
    auto started = r.request->started;
    auto cost = r.request->cost;
    auto c = static_cast<Completion*>(r.request.release());
    // Post (not dispatch) so the handler runs through the executor rather
    // than inline in this loop.
    Completion::post(std::unique_ptr<Completion>{c},
                     boost::system::error_code{}, phase);

    // Record queue latency and accumulate cost under the phase the request
    // was scheduled in. (This 'c' intentionally shadows the released
    // completion pointer above.)
    if (auto c = counters(client)) {
      auto lat = Clock::from_double(now) - Clock::from_double(started);
      if (phase == PhaseType::reservation) {
        inc(rsums, client, cost);
        c->tinc(queue_counters::l_res_latency, lat);
      } else {
        inc(psums, client, cost);
        c->tinc(queue_counters::l_prio_latency, lat);
      }
    }
  }

  // The loop stopped because the concurrency limit was reached (not because
  // the queue drained): count a throttle event.
  if (outstanding_requests >= max_requests) {
    if(auto c = counters(client_id::count)){
      c->inc(throttle_counters::l_throttle);
    }
  }

  // Flush the per-client totals gathered above into each client's counters.
  for (size_t i = 0; i < client_count; i++) {
    if (auto c = counters(static_cast<client_id>(i))) {
      on_process(c, rsums[i], psums[i]);
    }
  }
}
176 | ||
177 | } // namespace rgw::dmclock |