git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_dmclock_async_scheduler.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_dmclock_async_scheduler.cc
1
2 #include "common/async/completion.h"
3 #include "rgw_dmclock_async_scheduler.h"
4 #include "rgw_dmclock_scheduler.h"
5
6 namespace rgw::dmclock {
7
8 AsyncScheduler::~AsyncScheduler()
9 {
10 cancel();
11 if (observer) {
12 cct->_conf.remove_observer(this);
13 }
14 }
15
16 const char** AsyncScheduler::get_tracked_conf_keys() const
17 {
18 if (observer) {
19 return observer->get_tracked_conf_keys();
20 }
21 static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
22 return keys;
23 }
24
25 void AsyncScheduler::handle_conf_change(const ConfigProxy& conf,
26 const std::set<std::string>& changed)
27 {
28 if (observer) {
29 observer->handle_conf_change(conf, changed);
30 }
31 if (changed.count("rgw_max_concurrent_requests")) {
32 auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
33 max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
34 }
35 queue.update_client_infos();
36 schedule(crimson::dmclock::TimeZero);
37 }
38
39 int AsyncScheduler::schedule_request_impl(const client_id& client,
40 const ReqParams& params,
41 const Time& time, const Cost& cost,
42 optional_yield yield_ctx)
43 {
44 ceph_assert(yield_ctx);
45
46 auto &yield = yield_ctx.get_yield_context();
47 boost::system::error_code ec;
48 async_request(client, params, time, cost, yield[ec]);
49
50 if (ec){
51 if (ec == boost::system::errc::resource_unavailable_try_again)
52 return -EAGAIN;
53 else
54 return -ec.value();
55 }
56
57 return 0;
58 }
59
60 void AsyncScheduler::request_complete()
61 {
62 --outstanding_requests;
63 schedule(crimson::dmclock::TimeZero);
64 }
65
66 void AsyncScheduler::cancel()
67 {
68 ClientSums sums;
69
70 queue.remove_by_req_filter([&] (RequestRef&& request) {
71 inc(sums, request->client, request->cost);
72 auto c = static_cast<Completion*>(request.release());
73 Completion::dispatch(std::unique_ptr<Completion>{c},
74 boost::asio::error::operation_aborted,
75 PhaseType::priority);
76 return true;
77 });
78 timer.cancel();
79
80 for (size_t i = 0; i < client_count; i++) {
81 if (auto c = counters(static_cast<client_id>(i))) {
82 on_cancel(c, sums[i]);
83 }
84 }
85 }
86
87 void AsyncScheduler::cancel(const client_id& client)
88 {
89 ClientSum sum;
90
91 queue.remove_by_client(client, false, [&] (RequestRef&& request) {
92 sum.count++;
93 sum.cost += request->cost;
94 auto c = static_cast<Completion*>(request.release());
95 Completion::dispatch(std::unique_ptr<Completion>{c},
96 boost::asio::error::operation_aborted,
97 PhaseType::priority);
98 });
99 if (auto c = counters(client)) {
100 on_cancel(c, sum);
101 }
102 schedule(crimson::dmclock::TimeZero);
103 }
104
105 void AsyncScheduler::schedule(const Time& time)
106 {
107 timer.expires_at(Clock::from_double(time));
108 timer.async_wait([this] (boost::system::error_code ec) {
109 // process requests unless the wait was canceled. note that a canceled
110 // wait may execute after this AsyncScheduler destructs
111 if (ec != boost::asio::error::operation_aborted) {
112 process(get_time());
113 }
114 });
115 }
116
117 void AsyncScheduler::process(const Time& now)
118 {
119 // must run in the executor. we should only invoke completion handlers if the
120 // executor is running
121 assert(get_executor().running_in_this_thread());
122
123 ClientSums rsums, psums;
124
125 while (outstanding_requests < max_requests) {
126 auto pull = queue.pull_request(now);
127
128 if (pull.is_none()) {
129 // no pending requests, cancel the timer
130 timer.cancel();
131 break;
132 }
133 if (pull.is_future()) {
134 // update the timer based on the future time
135 schedule(pull.getTime());
136 break;
137 }
138 ++outstanding_requests;
139
140 // complete the request
141 auto& r = pull.get_retn();
142 auto client = r.client;
143 auto phase = r.phase;
144 auto started = r.request->started;
145 auto cost = r.request->cost;
146 auto c = static_cast<Completion*>(r.request.release());
147 Completion::post(std::unique_ptr<Completion>{c},
148 boost::system::error_code{}, phase);
149
150 if (auto c = counters(client)) {
151 auto lat = Clock::from_double(now) - Clock::from_double(started);
152 if (phase == PhaseType::reservation) {
153 inc(rsums, client, cost);
154 c->tinc(queue_counters::l_res_latency, lat);
155 } else {
156 inc(psums, client, cost);
157 c->tinc(queue_counters::l_prio_latency, lat);
158 }
159 }
160 }
161
162 if (outstanding_requests >= max_requests) {
163 if(auto c = counters(client_id::count)){
164 c->inc(throttle_counters::l_throttle);
165 }
166 }
167
168 for (size_t i = 0; i < client_count; i++) {
169 if (auto c = counters(static_cast<client_id>(i))) {
170 on_process(c, rsums[i], psums[i]);
171 }
172 }
173 }
174
175 } // namespace rgw::dmclock