]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_dmclock_async_scheduler.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_dmclock_async_scheduler.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/async/completion.h"
5 #include "rgw_dmclock_async_scheduler.h"
6 #include "rgw_dmclock_scheduler.h"
7
8 namespace rgw::dmclock {
9
10 AsyncScheduler::~AsyncScheduler()
11 {
12 cancel();
13 if (observer) {
14 cct->_conf.remove_observer(this);
15 }
16 }
17
18 const char** AsyncScheduler::get_tracked_conf_keys() const
19 {
20 if (observer) {
21 return observer->get_tracked_conf_keys();
22 }
23 static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
24 return keys;
25 }
26
27 void AsyncScheduler::handle_conf_change(const ConfigProxy& conf,
28 const std::set<std::string>& changed)
29 {
30 if (observer) {
31 observer->handle_conf_change(conf, changed);
32 }
33 if (changed.count("rgw_max_concurrent_requests")) {
34 auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
35 max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
36 }
37 queue.update_client_infos();
38 schedule(crimson::dmclock::TimeZero);
39 }
40
41 int AsyncScheduler::schedule_request_impl(const client_id& client,
42 const ReqParams& params,
43 const Time& time, const Cost& cost,
44 optional_yield yield_ctx)
45 {
46 ceph_assert(yield_ctx);
47
48 auto &yield = yield_ctx.get_yield_context();
49 boost::system::error_code ec;
50 async_request(client, params, time, cost, yield[ec]);
51
52 if (ec){
53 if (ec == boost::system::errc::resource_unavailable_try_again)
54 return -EAGAIN;
55 else
56 return -ec.value();
57 }
58
59 return 0;
60 }
61
62 void AsyncScheduler::request_complete()
63 {
64 --outstanding_requests;
65 schedule(crimson::dmclock::TimeZero);
66 }
67
68 void AsyncScheduler::cancel()
69 {
70 ClientSums sums;
71
72 queue.remove_by_req_filter([&] (RequestRef&& request) {
73 inc(sums, request->client, request->cost);
74 auto c = static_cast<Completion*>(request.release());
75 Completion::dispatch(std::unique_ptr<Completion>{c},
76 boost::asio::error::operation_aborted,
77 PhaseType::priority);
78 return true;
79 });
80 timer.cancel();
81
82 for (size_t i = 0; i < client_count; i++) {
83 if (auto c = counters(static_cast<client_id>(i))) {
84 on_cancel(c, sums[i]);
85 }
86 }
87 }
88
89 void AsyncScheduler::cancel(const client_id& client)
90 {
91 ClientSum sum;
92
93 queue.remove_by_client(client, false, [&] (RequestRef&& request) {
94 sum.count++;
95 sum.cost += request->cost;
96 auto c = static_cast<Completion*>(request.release());
97 Completion::dispatch(std::unique_ptr<Completion>{c},
98 boost::asio::error::operation_aborted,
99 PhaseType::priority);
100 });
101 if (auto c = counters(client)) {
102 on_cancel(c, sum);
103 }
104 schedule(crimson::dmclock::TimeZero);
105 }
106
107 void AsyncScheduler::schedule(const Time& time)
108 {
109 timer.expires_at(Clock::from_double(time));
110 timer.async_wait([this] (boost::system::error_code ec) {
111 // process requests unless the wait was canceled. note that a canceled
112 // wait may execute after this AsyncScheduler destructs
113 if (ec != boost::asio::error::operation_aborted) {
114 process(get_time());
115 }
116 });
117 }
118
119 void AsyncScheduler::process(const Time& now)
120 {
121 // must run in the executor. we should only invoke completion handlers if the
122 // executor is running
123 assert(get_executor().running_in_this_thread());
124
125 ClientSums rsums, psums;
126
127 while (outstanding_requests < max_requests) {
128 auto pull = queue.pull_request(now);
129
130 if (pull.is_none()) {
131 // no pending requests, cancel the timer
132 timer.cancel();
133 break;
134 }
135 if (pull.is_future()) {
136 // update the timer based on the future time
137 schedule(pull.getTime());
138 break;
139 }
140 ++outstanding_requests;
141
142 // complete the request
143 auto& r = pull.get_retn();
144 auto client = r.client;
145 auto phase = r.phase;
146 auto started = r.request->started;
147 auto cost = r.request->cost;
148 auto c = static_cast<Completion*>(r.request.release());
149 Completion::post(std::unique_ptr<Completion>{c},
150 boost::system::error_code{}, phase);
151
152 if (auto c = counters(client)) {
153 auto lat = Clock::from_double(now) - Clock::from_double(started);
154 if (phase == PhaseType::reservation) {
155 inc(rsums, client, cost);
156 c->tinc(queue_counters::l_res_latency, lat);
157 } else {
158 inc(psums, client, cost);
159 c->tinc(queue_counters::l_prio_latency, lat);
160 }
161 }
162 }
163
164 if (outstanding_requests >= max_requests) {
165 if(auto c = counters(client_id::count)){
166 c->inc(throttle_counters::l_throttle);
167 }
168 }
169
170 for (size_t i = 0; i < client_count; i++) {
171 if (auto c = counters(static_cast<client_id>(i))) {
172 on_process(c, rsums[i], psums[i]);
173 }
174 }
175 }
176
177 } // namespace rgw::dmclock