1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "common/async/completion.h"
5 #include "rgw_dmclock_async_scheduler.h"
6 #include "rgw_dmclock_scheduler.h"
8 namespace rgw::dmclock
{
// Destructor. The one statement visible here unregisters this object from
// the configuration observer list via cct->_conf.remove_observer(this).
// NOTE(review): the embedded line numbers jump from 10 to 14, so part of this
// body (including its braces) is missing from this extract -- confirm against
// the full file whether the call is guarded or preceded by other teardown.
10 AsyncScheduler::~AsyncScheduler()
14 cct
->_conf
.remove_observer(this);
// Returns the list of configuration keys this scheduler wants to be notified
// about. Two sources are visible: the keys reported by `observer`, and a
// static nullptr-terminated list holding only "rgw_max_concurrent_requests".
// NOTE(review): the condition choosing between the two and the final return
// are not visible in this extract (embedded numbering skips 19-20, 22, 24+);
// presumably the delegation happens only when `observer` is set -- confirm.
18 const char** AsyncScheduler::get_tracked_conf_keys() const
21 return observer
->get_tracked_conf_keys();
// fallback key list; the trailing nullptr terminates the array
23 static const char* keys
[] = { "rgw_max_concurrent_requests", nullptr };
// Configuration-change callback.
//   conf    - proxy used to read the current configuration values
//   changed - names of the keys that changed
// Visible behavior: forwards the notification to `observer`, refreshes
// max_requests when "rgw_max_concurrent_requests" is among the changed keys,
// then calls queue.update_client_infos() and schedule(TimeZero).
// NOTE(review): embedded numbering skips 29-30, 32 and 36, so any guard around
// the observer call and the closing brace of the `if` are not visible here.
27 void AsyncScheduler::handle_conf_change(const ConfigProxy
& conf
,
28 const std::set
<std::string
>& changed
)
31 observer
->handle_conf_change(conf
, changed
);
33 if (changed
.count("rgw_max_concurrent_requests")) {
34 auto new_max
= conf
.get_val
<int64_t>("rgw_max_concurrent_requests");
// a configured value <= 0 is replaced by the largest int64_t value
35 max_requests
= new_max
> 0 ? new_max
: std::numeric_limits
<int64_t>::max();
37 queue
.update_client_infos();
38 schedule(crimson::dmclock::TimeZero
);
// Submits a request through async_request() using the caller's yield context
// and returns an int status.
//   client, params, time, cost - forwarded unchanged to async_request()
//   yield_ctx                  - must be set; enforced by ceph_assert below
// NOTE(review): the return statements are not visible in this extract
// (embedded numbering skips 51-52 and 54-61), so the mapping from `ec` to the
// returned int cannot be stated from here. Only the comparison against
// errc::resource_unavailable_try_again is visible.
41 int AsyncScheduler::schedule_request_impl(const client_id
& client
,
42 const ReqParams
& params
,
43 const Time
& time
, const Cost
& cost
,
44 optional_yield yield_ctx
)
46 ceph_assert(yield_ctx
);
48 auto &yield
= yield_ctx
.get_yield_context();
49 boost::system::error_code ec
;
// yield[ec] binds `ec` to the yield context; in Boost.Asio this reports
// failure through `ec` rather than by throwing -- TODO confirm for this wrapper
50 async_request(client
, params
, time
, cost
, yield
[ec
]);
53 if (ec
== boost::system::errc::resource_unavailable_try_again
)
// Called when a previously started request finishes: decrements
// outstanding_requests and calls schedule(TimeZero).
// NOTE(review): presumably the immediate reschedule lets process() hand the
// freed slot to a queued request -- confirm against the timer handler, whose
// body is not visible in this extract.
62 void AsyncScheduler::request_complete()
64 --outstanding_requests
;
65 schedule(crimson::dmclock::TimeZero
);
// Cancels queued requests. For each request handed to the filter lambda it
// adds the request's client and cost to `sums` and dispatches its Completion
// with boost::asio::error::operation_aborted. Afterwards it reports the
// per-client sums through on_cancel() for every client id that has counters.
// NOTE(review): the declaration of `sums`, the remaining dispatch arguments,
// the lambda's return value and embedded lines 77-81 are not visible in this
// extract -- confirm against the full file.
68 void AsyncScheduler::cancel()
72 queue
.remove_by_req_filter([&] (RequestRef
&& request
) {
73 inc(sums
, request
->client
, request
->cost
);
// take the raw pointer out of the RequestRef and re-wrap it as a Completion
// (the static_cast assumes the request really is a Completion -- TODO confirm)
74 auto c
= static_cast<Completion
*>(request
.release());
75 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
76 boost::asio::error::operation_aborted
,
// report what was canceled, one call per client id with non-null counters
82 for (size_t i
= 0; i
< client_count
; i
++) {
83 if (auto c
= counters(static_cast<client_id
>(i
))) {
84 on_cancel(c
, sums
[i
]);
// Cancels the queued requests of a single client. For each request handed to
// the lambda it adds the request's cost to `sum` and dispatches its Completion
// with boost::asio::error::operation_aborted. It then looks up the client's
// counters and finally calls schedule(TimeZero).
// NOTE(review): the declaration of `sum`, embedded lines 94 and 99-100, and
// the body of the counters branch (102-103) are not visible in this extract.
// The meaning of the `false` argument to remove_by_client is defined by the
// queue API and cannot be determined from here.
89 void AsyncScheduler::cancel(const client_id
& client
)
93 queue
.remove_by_client(client
, false, [&] (RequestRef
&& request
) {
95 sum
.cost
+= request
->cost
;
// take the raw pointer out of the RequestRef and re-wrap it as a Completion
// (the static_cast assumes the request really is a Completion -- TODO confirm)
96 auto c
= static_cast<Completion
*>(request
.release());
97 Completion::dispatch(std::unique_ptr
<Completion
>{c
},
98 boost::asio::error::operation_aborted
,
101 if (auto c
= counters(client
)) {
104 schedule(crimson::dmclock::TimeZero
);
// Arms `timer` to expire at `time` (converted with Clock::from_double) and
// starts an asynchronous wait on it. The handler captures `this` and acts only
// when the wait was not aborted.
// NOTE(review): the body of the non-aborted branch is not visible in this
// extract (embedded numbering stops at 113); presumably it invokes process()
// -- confirm against the full file.
107 void AsyncScheduler::schedule(const Time
& time
)
109 timer
.expires_at(Clock::from_double(time
));
110 timer
.async_wait([this] (boost::system::error_code ec
) {
111 // process requests unless the wait was canceled. note that a canceled
112 // wait may execute after this AsyncScheduler destructs
// which is why `this` must not be touched on the aborted path
113 if (ec
!= boost::asio::error::operation_aborted
) {
// Pulls requests from `queue` and starts them while outstanding_requests is
// below max_requests.
//   now - time passed to queue.pull_request() and used as the end point of the
//         per-request latency measurement
// Per-client sums are kept separately for the reservation phase (rsums) and
// for every other phase (psums), and are reported through on_process() at the
// end.
// NOTE(review): embedded lines 132-134, 138-139, 157 and the closing braces
// are not visible in this extract. The is_none()/is_future() branches
// presumably leave the loop, and the second inc()/tinc() pair presumably sits
// in an `else` -- confirm against the full file.
119 void AsyncScheduler::process(const Time
& now
)
121 // must run in the executor. we should only invoke completion handlers if the
122 // executor is running
123 assert(get_executor().running_in_this_thread());
125 ClientSums rsums
, psums
;
127 while (outstanding_requests
< max_requests
) {
128 auto pull
= queue
.pull_request(now
);
130 if (pull
.is_none()) {
131 // no pending requests, cancel the timer
135 if (pull
.is_future()) {
136 // update the timer based on the future time
137 schedule(pull
.getTime());
140 ++outstanding_requests
;
142 // complete the request
143 auto& r
= pull
.get_retn();
// copy the fields needed below before r.request is released
144 auto client
= r
.client
;
145 auto phase
= r
.phase
;
146 auto started
= r
.request
->started
;
147 auto cost
= r
.request
->cost
;
148 auto c
= static_cast<Completion
*>(r
.request
.release());
// post with a default-constructed (success) error_code and the pulled phase
149 Completion::post(std::unique_ptr
<Completion
>{c
},
150 boost::system::error_code
{}, phase
);
// note: this `c` is the client's counters and shadows the Completion* above
152 if (auto c
= counters(client
)) {
// latency measured from the request's `started` time to `now`
153 auto lat
= Clock::from_double(now
) - Clock::from_double(started
);
154 if (phase
== PhaseType::reservation
) {
155 inc(rsums
, client
, cost
);
156 c
->tinc(queue_counters::l_res_latency
, lat
);
158 inc(psums
, client
, cost
);
159 c
->tinc(queue_counters::l_prio_latency
, lat
);
// count a throttle event when the concurrency limit has been reached
164 if (outstanding_requests
>= max_requests
) {
165 if(auto c
= counters(client_id::count
)){
166 c
->inc(throttle_counters::l_throttle
);
// report the accumulated sums, one call per client id with non-null counters
170 for (size_t i
= 0; i
< client_count
; i
++) {
171 if (auto c
= counters(static_cast<client_id
>(i
))) {
172 on_process(c
, rsums
[i
], psums
[i
]);
177 } // namespace rgw::dmclock