2 #include "common/async/completion.h"
3 #include "rgw_dmclock_async_scheduler.h"
4 #include "rgw_dmclock_scheduler.h"
6 namespace rgw::dmclock
{
// Destructor. The statement visible here unregisters this scheduler from the
// config proxy reached through cct->_conf (remove_observer(this)).
// NOTE(review): the embedded line numbers jump from 8 to 12, so the opening of
// the body and whatever precedes/guards this call are not visible in this
// view -- confirm against the complete file before relying on this summary.
8 AsyncScheduler::~AsyncScheduler()
12 cct
->_conf
.remove_observer(this);
// Returns the list of configuration keys this object tracks.
// Two sources are visible: the key list reported by `observer`, and a static,
// nullptr-terminated array that contains only "rgw_max_concurrent_requests".
// NOTE(review): the condition that selects between the two (presumably whether
// `observer` is set) and the statement returning `keys` are not visible in
// this view -- confirm.
16 const char** AsyncScheduler::get_tracked_conf_keys() const
19 return observer
->get_tracked_conf_keys();
21 static const char* keys
[] = { "rgw_max_concurrent_requests", nullptr };
// Reacts to a configuration change.
//   conf    - configuration proxy to read current values from
//   changed - names of the options that changed
// Visible steps: forward the notification to `observer`, refresh
// `max_requests` when "rgw_max_concurrent_requests" is among the changed keys,
// ask the queue to update its client infos, then call schedule() with
// crimson::dmclock::TimeZero.
// NOTE(review): any guard around the observer call (embedded lines 27-28) is
// not visible in this view -- presumably a null check; confirm.
25 void AsyncScheduler::handle_conf_change(const ConfigProxy
& conf
,
26 const std::set
<std::string
>& changed
)
29 observer
->handle_conf_change(conf
, changed
);
31 if (changed
.count("rgw_max_concurrent_requests")) {
32 auto new_max
= conf
.get_val
<int64_t>("rgw_max_concurrent_requests");
// a non-positive setting is replaced by the largest int64_t value
33 max_requests
= new_max
> 0 ? new_max
: std::numeric_limits
<int64_t>::max();
35 queue
.update_client_infos();
36 schedule(crimson::dmclock::TimeZero
);
// Submits a request through async_request() using the caller's yield context.
//   client, params, time, cost - forwarded unchanged to async_request()
//   yield_ctx                  - asserted to be set (ceph_assert) before use
// The completion token passed is yield[ec]; presumably this makes the call
// report failure through `ec` rather than by throwing -- confirm against the
// Boost.Asio yield_context documentation.
// NOTE(review): the function returns int, but every return statement
// (embedded lines 49-58) is missing from this view; only the comparison of
// `ec` with resource_unavailable_try_again is visible. Do not infer the exact
// return codes from this excerpt.
39 int AsyncScheduler::schedule_request_impl(const client_id
& client
,
40 const ReqParams
& params
,
41 const Time
& time
, const Cost
& cost
,
42 optional_yield yield_ctx
)
44 ceph_assert(yield_ctx
);
46 auto &yield
= yield_ctx
.get_yield_context();
47 boost::system::error_code ec
;
48 async_request(client
, params
, time
, cost
, yield
[ec
]);
51 if (ec
== boost::system::errc::resource_unavailable_try_again
)
// Records that one request has finished: decrements `outstanding_requests`
// and then calls schedule() with crimson::dmclock::TimeZero.
60 void AsyncScheduler::request_complete()
62 --outstanding_requests
;
63 schedule(crimson::dmclock::TimeZero
);
// Cancels queued requests (overload without a client argument).
// For every request handed to the filter callback, the visible code adds the
// request's cost to `sums` under its client, takes ownership of the request as
// a Completion, and dispatches it with boost::asio::error::operation_aborted.
// Afterwards it walks all `client_count` client ids and calls on_cancel() with
// that client's sum for each id whose counters() lookup is non-null.
// NOTE(review): the declaration of `sums`, the remaining dispatch arguments,
// the callback's return value and anything between embedded lines 75-79 are
// not visible in this view -- confirm against the complete file.
66 void AsyncScheduler::cancel()
70 queue
.remove_by_req_filter([&] (RequestRef
&& request
) {
71 inc(sums
, request
->client
, request
->cost
);
// release() hands the raw pointer over; it is re-wrapped in a unique_ptr below
72 auto c
= static_cast<Completion
*>(request
.release());
73 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
74 boost::asio::error::operation_aborted
,
80 for (size_t i
= 0; i
< client_count
; i
++) {
81 if (auto c
= counters(static_cast<client_id
>(i
))) {
82 on_cancel(c
, sums
[i
]);
// Cancels the queued requests of a single client.
//   client - id passed to queue.remove_by_client() and to counters()
// For each removed request the visible code accumulates its cost into
// `sum.cost`, takes ownership of it as a Completion and dispatches it with
// boost::asio::error::operation_aborted. It then looks up the client's
// counters and finally calls schedule() with crimson::dmclock::TimeZero.
// NOTE(review): the declaration of `sum`, any other fields updated in the
// callback (embedded line 92), the remaining dispatch arguments and the body
// of the counters() branch are not visible in this view -- confirm.
87 void AsyncScheduler::cancel(const client_id
& client
)
91 queue
.remove_by_client(client
, false, [&] (RequestRef
&& request
) {
93 sum
.cost
+= request
->cost
;
94 auto c
= static_cast<Completion
*>(request
.release());
95 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
96 boost::asio::error::operation_aborted
,
99 if (auto c
= counters(client
)) {
102 schedule(crimson::dmclock::TimeZero
);
// Arms `timer` for the given time and registers an asynchronous wait on it.
//   time - converted with Clock::from_double() and used as the expiry
// The wait handler captures `this` and acts only when the error code is not
// boost::asio::error::operation_aborted.
// NOTE(review): the statement executed inside that branch (embedded line 112)
// is not visible in this view -- per the existing comment it processes
// requests; confirm the exact call.
105 void AsyncScheduler::schedule(const Time
& time
)
107 timer
.expires_at(Clock::from_double(time
));
108 timer
.async_wait([this] (boost::system::error_code ec
) {
109 // process requests unless the wait was canceled. note that a canceled
110 // wait may execute after this AsyncScheduler destructs
111 if (ec
!= boost::asio::error::operation_aborted
) {
// Pulls requests from the queue and completes them while
// `outstanding_requests` is below `max_requests`.
//   now - time passed to queue.pull_request() and used as the end point of the
//         latency measurement
// Per pulled request: the outstanding count is incremented, the request is
// posted as a Completion with a default-constructed error code and its phase,
// and, when the client has counters, cost and latency are recorded under the
// reservation or the other phase. After the loop a throttle counter is bumped
// if the limit has been reached, and on_process() is called for each client
// that has counters.
// NOTE(review): the statements that leave the loop for the is_none() and
// is_future() cases, the `else` between the two latency branches and all
// closing braces are missing from this view (the embedded numbering skips) --
// confirm control flow against the complete file.
117 void AsyncScheduler::process(const Time
& now
)
119 // must run in the executor. we should only invoke completion handlers if the
120 // executor is running
121 assert(get_executor().running_in_this_thread());
// rsums / psums: per-client totals filled in below for the reservation phase
// and for the other phase respectively
123 ClientSums rsums
, psums
;
125 while (outstanding_requests
< max_requests
) {
126 auto pull
= queue
.pull_request(now
);
128 if (pull
.is_none()) {
129 // no pending requests, cancel the timer
133 if (pull
.is_future()) {
134 // update the timer based on the future time
135 schedule(pull
.getTime());
138 ++outstanding_requests
;
140 // complete the request
141 auto& r
= pull
.get_retn();
// copy the fields needed later before r.request is released below
142 auto client
= r
.client
;
143 auto phase
= r
.phase
;
144 auto started
= r
.request
->started
;
145 auto cost
= r
.request
->cost
;
146 auto c
= static_cast<Completion
*>(r
.request
.release());
147 Completion::post(std::unique_ptr
<Completion
>{c
},
148 boost::system::error_code
{}, phase
);
// this `c` (the client's counters) is a new variable that shadows the
// Completion pointer declared above
150 if (auto c
= counters(client
)) {
// latency = now - time stored in the request's `started` field
151 auto lat
= Clock::from_double(now
) - Clock::from_double(started
);
152 if (phase
== PhaseType::reservation
) {
153 inc(rsums
, client
, cost
);
154 c
->tinc(queue_counters::l_res_latency
, lat
);
156 inc(psums
, client
, cost
);
157 c
->tinc(queue_counters::l_prio_latency
, lat
);
// count a throttle event when the limit has been reached
162 if (outstanding_requests
>= max_requests
) {
163 if(auto c
= counters(client_id::count
)){
164 c
->inc(throttle_counters::l_throttle
);
168 for (size_t i
= 0; i
< client_count
; i
++) {
169 if (auto c
= counters(static_cast<client_id
>(i
))) {
170 on_process(c
, rsums
[i
], psums
[i
]);
175 } // namespace rgw::dmclock