1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/async/completion.h"
5 #include "rgw_dmclock_async_scheduler.h"
6 #include "rgw_dmclock_scheduler.h"
8 namespace rgw::dmclock
{
// Destructor. The visible statement unregisters this object as an observer
// via cct->_conf.remove_observer(this).
// NOTE(review): the embedded listing numbers jump from 10 to 14, so any
// statements or guards that precede this call are not visible in this
// extract -- confirm against the full file before relying on this summary.
10 AsyncScheduler::~AsyncScheduler()
14 cct
->_conf
.remove_observer(this);
// Returns the list of configuration keys this scheduler wants change
// notifications for.
//
// Two sources are visible: the list reported by `observer`
// (observer->get_tracked_conf_keys()) and a function-local static array that
// holds the single key "rgw_max_concurrent_requests" followed by a nullptr
// terminator.
// NOTE(review): the condition selecting between the two and the final return
// of `keys` are elided from this extract (listing lines 19-20, 22, 24) --
// presumably the observer's list is used only when an observer is set; TODO
// confirm.
18 const char** AsyncScheduler::get_tracked_conf_keys() const
21 return observer
->get_tracked_conf_keys();
23 static const char* keys
[] = { "rgw_max_concurrent_requests", nullptr };
// Config-change callback.
//
// @param conf     configuration to read updated values from
// @param changed  names of the keys that changed
//
// Visible steps, in order: forward the notification to `observer`, refresh
// max_requests if "rgw_max_concurrent_requests" is among the changed keys,
// call queue.update_client_infos(), and call schedule() with
// crimson::dmclock::TimeZero.
// NOTE(review): brace-only lines and the guard around the observer call are
// elided from this extract (listing lines 29-30, 32, 36), so the exact scope
// of each statement should be confirmed against the full file.
27 void AsyncScheduler::handle_conf_change(const ConfigProxy
& conf
,
28 const std::set
<std::string
>& changed
)
31 observer
->handle_conf_change(conf
, changed
);
// Only re-read the limit when that specific key is in the changed set.
33 if (changed
.count("rgw_max_concurrent_requests")) {
34 auto new_max
= conf
.get_val
<int64_t>("rgw_max_concurrent_requests");
// A value that is not greater than zero is replaced by
// std::numeric_limits<int64_t>::max().
35 max_requests
= new_max
> 0 ? new_max
: std::numeric_limits
<int64_t>::max();
37 queue
.update_client_infos();
38 schedule(crimson::dmclock::TimeZero
);
// Submits one request through async_request() using the yield context
// carried by `yield_ctx`.
//
// @param client     client the request is attributed to
// @param params     request parameters, passed through to async_request()
// @param time       request time, passed through to async_request()
// @param cost       request cost, passed through to async_request()
// @param yield_ctx  must evaluate to true (enforced with ceph_assert); its
//                   yield context is used as the completion token
// @return int status code.
// NOTE(review): every return statement is elided from this extract (listing
// lines 51-52, 54-60). The only visible handling of `ec` is the comparison
// against boost::system::errc::resource_unavailable_try_again -- confirm the
// mapping from `ec` to the returned value in the full file.
41 int AsyncScheduler::schedule_request_impl(const client_id
& client
,
42 const ReqParams
& params
,
43 const Time
& time
, const Cost
& cost
,
44 optional_yield yield_ctx
)
46 ceph_assert(yield_ctx
);
48 auto &yield
= yield_ctx
.get_yield_context();
49 boost::system::error_code ec
;
// yield[ec] binds `ec` to the completion token, so `ec` is what gets
// inspected after async_request() returns.
50 async_request(client
, params
, time
, cost
, yield
[ec
]);
53 if (ec
== boost::system::errc::resource_unavailable_try_again
)
// Bookkeeping for a finished request: decrements outstanding_requests, adds
// -1 to the throttle_counters::l_outstanding counter when
// counters(client_id::count) yields a non-null handle, and calls schedule()
// with crimson::dmclock::TimeZero.
// NOTE(review): brace-only lines are elided from this extract (listing lines
// 63, 67, 69), so statement scoping is inferred from the listing order.
62 void AsyncScheduler::request_complete()
64 --outstanding_requests
;
65 if(auto c
= counters(client_id::count
)){
66 c
->inc(throttle_counters::l_outstanding
, -1);
68 schedule(crimson::dmclock::TimeZero
);
// Cancels queued requests via queue.remove_by_req_filter().
//
// For each request handed to the filter callback, the callback adds the
// request's client/cost to `sums` with inc(), takes ownership of the request
// as a Completion, and passes it to Completion::dispatch() with
// boost::asio::error::operation_aborted. Afterwards, for every client index
// below client_count whose counters() handle is non-null, on_cancel() is
// called with that client's entry in `sums`.
// NOTE(review): the declaration of `sums`, the remaining dispatch arguments,
// the callback's return and any statements between the filter call and the
// loop are elided from this extract (listing lines 72-74, 80-84) -- confirm
// against the full file.
71 void AsyncScheduler::cancel()
75 queue
.remove_by_req_filter([&] (RequestRef
&& request
) {
76 inc(sums
, request
->client
, request
->cost
);
// release() gives up the RequestRef's ownership; the raw pointer is cast to
// Completion* and immediately re-wrapped in a std::unique_ptr<Completion>.
77 auto c
= static_cast<Completion
*>(request
.release());
78 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
79 boost::asio::error::operation_aborted
,
85 for (size_t i
= 0; i
< client_count
; i
++) {
86 if (auto c
= counters(static_cast<client_id
>(i
))) {
87 on_cancel(c
, sums
[i
]);
// Cancels the queued requests of a single client via
// queue.remove_by_client(client, false, callback).
//
// @param client  client whose requests are removed from the queue
//
// For each removed request the callback adds its cost to `sum.cost`, takes
// ownership of the request as a Completion, and passes it to
// Completion::dispatch() with boost::asio::error::operation_aborted and
// PhaseType::priority. The function then looks up counters(client) and
// finally calls schedule() with crimson::dmclock::TimeZero.
// NOTE(review): the declaration of `sum`, listing line 97, and the body of
// the counters(client) branch are elided from this extract (listing lines
// 93-95, 97, 103, 105-106) -- confirm against the full file.
92 void AsyncScheduler::cancel(const client_id
& client
)
96 queue
.remove_by_client(client
, false, [&] (RequestRef
&& request
) {
98 sum
.cost
+= request
->cost
;
// release() gives up the RequestRef's ownership; the raw pointer is cast to
// Completion* and immediately re-wrapped in a std::unique_ptr<Completion>.
99 auto c
= static_cast<Completion
*>(request
.release());
100 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
101 boost::asio::error::operation_aborted
,
102 PhaseType::priority
);
104 if (auto c
= counters(client
)) {
107 schedule(crimson::dmclock::TimeZero
);
// Arms `timer` for the given time and registers a wait handler on it.
//
// @param time  expiry time; converted with Clock::from_double() before being
//              passed to timer.expires_at()
//
// The handler captures `this` and acts only when the wait's error code is not
// boost::asio::error::operation_aborted.
// NOTE(review): the body of that branch is elided from this extract (listing
// lines 117 onward); per the comment below it processes requests -- confirm
// the exact call in the full file.
110 void AsyncScheduler::schedule(const Time
& time
)
112 timer
.expires_at(Clock::from_double(time
));
113 timer
.async_wait([this] (boost::system::error_code ec
) {
114 // process requests unless the wait was canceled. note that a canceled
115 // wait may execute after this AsyncScheduler destructs
116 if (ec
!= boost::asio::error::operation_aborted
) {
// Pulls requests from `queue` and posts their completions while
// outstanding_requests is below max_requests.
//
// @param now  time passed to queue.pull_request() and used as the end point
//             of each request's latency measurement
//
// Per-client totals are accumulated in `rsums` and `psums` during the loop
// and reported through on_process() at the end. After the loop, the
// throttle_counters::l_throttle counter is incremented if
// outstanding_requests has reached max_requests.
// NOTE(review): the bodies of the is_none()/is_future() branches (beyond the
// visible schedule() call), the `else` joining the two latency branches, and
// all brace-only lines are elided from this extract -- confirm loop exits and
// branch scoping against the full file.
122 void AsyncScheduler::process(const Time
& now
)
124 // must run in the executor. we should only invoke completion handlers if the
125 // executor is running
126 assert(get_executor().running_in_this_thread());
128 ClientSums rsums
, psums
;
130 while (outstanding_requests
< max_requests
) {
131 auto pull
= queue
.pull_request(now
);
133 if (pull
.is_none()) {
134 // no pending requests, cancel the timer
138 if (pull
.is_future()) {
139 // update the timer based on the future time
140 schedule(pull
.getTime());
// Count the pulled request as outstanding, both in the member counter and in
// the l_outstanding perf counter when counters(client_id::count) is non-null.
143 ++outstanding_requests
;
144 if(auto c
= counters(client_id::count
)){
145 c
->inc(throttle_counters::l_outstanding
);
148 // complete the request
149 auto& r
= pull
.get_retn();
// These fields are copied out before r.request.release() is called below.
150 auto client
= r
.client
;
151 auto phase
= r
.phase
;
152 auto started
= r
.request
->started
;
153 auto cost
= r
.request
->cost
;
154 auto c
= static_cast<Completion
*>(r
.request
.release());
// Completion::post() is used here, whereas both cancel() overloads use
// Completion::dispatch().
155 Completion::post(std::unique_ptr
<Completion
>{c
},
156 boost::system::error_code
{}, phase
);
158 if (auto c
= counters(client
)) {
// Latency is `now` minus the request's `started` time, each converted with
// Clock::from_double().
159 auto lat
= Clock::from_double(now
) - Clock::from_double(started
);
// Reservation-phase requests go to rsums / l_res_latency. The psums /
// l_prio_latency statements that follow are presumably the alternative
// branch (the joining `else` is not visible here -- TODO confirm).
160 if (phase
== PhaseType::reservation
) {
161 inc(rsums
, client
, cost
);
162 c
->tinc(queue_counters::l_res_latency
, lat
);
164 inc(psums
, client
, cost
);
165 c
->tinc(queue_counters::l_prio_latency
, lat
);
170 if (outstanding_requests
>= max_requests
) {
171 if(auto c
= counters(client_id::count
)){
172 c
->inc(throttle_counters::l_throttle
);
// Report the accumulated sums for every client index that has counters.
176 for (size_t i
= 0; i
< client_count
; i
++) {
177 if (auto c
= counters(static_cast<client_id
>(i
))) {
178 on_process(c
, rsums
[i
], psums
[i
]);
183 } // namespace rgw::dmclock