]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_dmclock_async_scheduler.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / rgw_dmclock_async_scheduler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/async/completion.h"
5 #include "rgw_dmclock_async_scheduler.h"
6 #include "rgw_dmclock_scheduler.h"
7
8 namespace rgw::dmclock {
9
10 AsyncScheduler::~AsyncScheduler()
11 {
12 cancel();
13 if (observer) {
14 cct->_conf.remove_observer(this);
15 }
16 }
17
18 const char** AsyncScheduler::get_tracked_conf_keys() const
19 {
20 if (observer) {
21 return observer->get_tracked_conf_keys();
22 }
23 static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
24 return keys;
25 }
26
27 void AsyncScheduler::handle_conf_change(const ConfigProxy& conf,
28 const std::set<std::string>& changed)
29 {
30 if (observer) {
31 observer->handle_conf_change(conf, changed);
32 }
33 if (changed.count("rgw_max_concurrent_requests")) {
34 auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
35 max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
36 }
37 queue.update_client_infos();
38 schedule(crimson::dmclock::TimeZero);
39 }
40
41 int AsyncScheduler::schedule_request_impl(const client_id& client,
42 const ReqParams& params,
43 const Time& time, const Cost& cost,
44 optional_yield yield_ctx)
45 {
46 ceph_assert(yield_ctx);
47
48 auto &yield = yield_ctx.get_yield_context();
49 boost::system::error_code ec;
50 async_request(client, params, time, cost, yield[ec]);
51
52 if (ec){
53 if (ec == boost::system::errc::resource_unavailable_try_again)
54 return -EAGAIN;
55 else
56 return -ec.value();
57 }
58
59 return 0;
60 }
61
62 void AsyncScheduler::request_complete()
63 {
64 --outstanding_requests;
65 if(auto c = counters(client_id::count)){
66 c->inc(throttle_counters::l_outstanding, -1);
67 }
68 schedule(crimson::dmclock::TimeZero);
69 }
70
71 void AsyncScheduler::cancel()
72 {
73 ClientSums sums;
74
75 queue.remove_by_req_filter([&] (RequestRef&& request) {
76 inc(sums, request->client, request->cost);
77 auto c = static_cast<Completion*>(request.release());
78 Completion::dispatch(std::unique_ptr<Completion>{c},
79 boost::asio::error::operation_aborted,
80 PhaseType::priority);
81 return true;
82 });
83 timer.cancel();
84
85 for (size_t i = 0; i < client_count; i++) {
86 if (auto c = counters(static_cast<client_id>(i))) {
87 on_cancel(c, sums[i]);
88 }
89 }
90 }
91
92 void AsyncScheduler::cancel(const client_id& client)
93 {
94 ClientSum sum;
95
96 queue.remove_by_client(client, false, [&] (RequestRef&& request) {
97 sum.count++;
98 sum.cost += request->cost;
99 auto c = static_cast<Completion*>(request.release());
100 Completion::dispatch(std::unique_ptr<Completion>{c},
101 boost::asio::error::operation_aborted,
102 PhaseType::priority);
103 });
104 if (auto c = counters(client)) {
105 on_cancel(c, sum);
106 }
107 schedule(crimson::dmclock::TimeZero);
108 }
109
110 void AsyncScheduler::schedule(const Time& time)
111 {
112 timer.expires_at(Clock::from_double(time));
113 timer.async_wait([this] (boost::system::error_code ec) {
114 // process requests unless the wait was canceled. note that a canceled
115 // wait may execute after this AsyncScheduler destructs
116 if (ec != boost::asio::error::operation_aborted) {
117 process(get_time());
118 }
119 });
120 }
121
122 void AsyncScheduler::process(const Time& now)
123 {
124 // must run in the executor. we should only invoke completion handlers if the
125 // executor is running
126 assert(get_executor().running_in_this_thread());
127
128 ClientSums rsums, psums;
129
130 while (outstanding_requests < max_requests) {
131 auto pull = queue.pull_request(now);
132
133 if (pull.is_none()) {
134 // no pending requests, cancel the timer
135 timer.cancel();
136 break;
137 }
138 if (pull.is_future()) {
139 // update the timer based on the future time
140 schedule(pull.getTime());
141 break;
142 }
143 ++outstanding_requests;
144 if(auto c = counters(client_id::count)){
145 c->inc(throttle_counters::l_outstanding);
146 }
147
148 // complete the request
149 auto& r = pull.get_retn();
150 auto client = r.client;
151 auto phase = r.phase;
152 auto started = r.request->started;
153 auto cost = r.request->cost;
154 auto c = static_cast<Completion*>(r.request.release());
155 Completion::post(std::unique_ptr<Completion>{c},
156 boost::system::error_code{}, phase);
157
158 if (auto c = counters(client)) {
159 auto lat = Clock::from_double(now) - Clock::from_double(started);
160 if (phase == PhaseType::reservation) {
161 inc(rsums, client, cost);
162 c->tinc(queue_counters::l_res_latency, lat);
163 } else {
164 inc(psums, client, cost);
165 c->tinc(queue_counters::l_prio_latency, lat);
166 }
167 }
168 }
169
170 if (outstanding_requests >= max_requests) {
171 if(auto c = counters(client_id::count)){
172 c->inc(throttle_counters::l_throttle);
173 }
174 }
175
176 for (size_t i = 0; i < client_count; i++) {
177 if (auto c = counters(static_cast<client_id>(i))) {
178 on_process(c, rsums[i], psums[i]);
179 }
180 }
181 }
182
183 } // namespace rgw::dmclock