]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | /* |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2018 Red Hat, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef RGW_DMCLOCK_ASYNC_SCHEDULER_H | |
16 | #define RGW_DMCLOCK_ASYNC_SCHEDULER_H | |
17 | ||
18 | #include "common/async/completion.h" | |
19 | ||
20 | #include <boost/asio.hpp> | |
21 | #include "rgw_dmclock_scheduler.h" | |
22 | #include "rgw_dmclock_scheduler_ctx.h" | |
23 | ||
24 | namespace rgw::dmclock { | |
25 | namespace async = ceph::async; | |
26 | ||
27 | /* | |
28 | * A dmclock request scheduling service for use with boost::asio. | |
29 | * | |
30 | * An asynchronous dmclock priority queue, where scheduled requests complete | |
31 | * on a boost::asio executor. | |
32 | */ | |
33 | class AsyncScheduler : public md_config_obs_t, public Scheduler { | |
34 | public: | |
35 | template <typename ...Args> // args forwarded to PullPriorityQueue ctor | |
36 | AsyncScheduler(CephContext *cct, boost::asio::io_context& context, | |
37 | GetClientCounters&& counters, md_config_obs_t *observer, | |
38 | Args&& ...args); | |
39 | ~AsyncScheduler(); | |
40 | ||
41 | using executor_type = boost::asio::io_context::executor_type; | |
42 | ||
43 | /// return the default executor for async_request() callbacks | |
44 | executor_type get_executor() noexcept { | |
45 | return timer.get_executor(); | |
46 | } | |
47 | ||
48 | /// submit an async request for dmclock scheduling. the given completion | |
49 | /// handler will be invoked with (error_code, PhaseType) when the request | |
50 | /// is ready or canceled. on success, this grants a throttle unit that must | |
51 | /// be returned with a call to request_complete() | |
52 | template <typename CompletionToken> | |
53 | auto async_request(const client_id& client, const ReqParams& params, | |
54 | const Time& time, Cost cost, CompletionToken&& token); | |
55 | ||
56 | /// returns a throttle unit granted by async_request() | |
57 | void request_complete() override; | |
58 | ||
59 | /// cancel all queued requests, invoking their completion handlers with an | |
60 | /// operation_aborted error and default-constructed result | |
61 | void cancel(); | |
62 | ||
63 | /// cancel all queued requests for a given client, invoking their completion | |
64 | /// handler with an operation_aborted error and default-constructed result | |
65 | void cancel(const client_id& client); | |
66 | ||
67 | const char** get_tracked_conf_keys() const override; | |
68 | void handle_conf_change(const ConfigProxy& conf, | |
69 | const std::set<std::string>& changed) override; | |
70 | ||
71 | private: | |
72 | int schedule_request_impl(const client_id& client, const ReqParams& params, | |
73 | const Time& time, const Cost& cost, | |
74 | optional_yield yield_ctx) override; | |
75 | ||
76 | static constexpr bool IsDelayed = false; | |
77 | using Queue = crimson::dmclock::PullPriorityQueue<client_id, Request, IsDelayed>; | |
78 | using RequestRef = typename Queue::RequestRef; | |
79 | Queue queue; //< dmclock priority queue | |
80 | ||
81 | using Signature = void(boost::system::error_code, PhaseType); | |
82 | using Completion = async::Completion<Signature, async::AsBase<Request>>; | |
83 | ||
84 | using Clock = ceph::coarse_real_clock; | |
92f5a8d4 | 85 | #if BOOST_VERSION < 107000 |
11fdf7f2 | 86 | using Timer = boost::asio::basic_waitable_timer<Clock>; |
92f5a8d4 TL |
87 | #else |
88 | using Timer = boost::asio::basic_waitable_timer<Clock, | |
89 | boost::asio::wait_traits<Clock>, executor_type>; | |
90 | #endif | |
11fdf7f2 TL |
91 | Timer timer; //< timer for the next scheduled request |
92 | ||
93 | CephContext *const cct; | |
94 | md_config_obs_t *const observer; //< observer to update ClientInfoFunc | |
95 | GetClientCounters counters; //< provides per-client perf counters | |
96 | ||
97 | /// max request throttle | |
98 | std::atomic<int64_t> max_requests; | |
99 | std::atomic<int64_t> outstanding_requests = 0; | |
100 | ||
101 | /// set a timer to process the next request | |
102 | void schedule(const Time& time); | |
103 | ||
104 | /// process ready requests, then schedule the next pending request | |
105 | void process(const Time& now); | |
106 | }; | |
107 | ||
108 | ||
109 | template <typename ...Args> | |
110 | AsyncScheduler::AsyncScheduler(CephContext *cct, boost::asio::io_context& context, | |
111 | GetClientCounters&& counters, | |
112 | md_config_obs_t *observer, Args&& ...args) | |
113 | : queue(std::forward<Args>(args)...), | |
114 | timer(context), cct(cct), observer(observer), | |
115 | counters(std::move(counters)), | |
116 | max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")) | |
117 | { | |
118 | if (max_requests <= 0) { | |
119 | max_requests = std::numeric_limits<int64_t>::max(); | |
120 | } | |
121 | if (observer) { | |
122 | cct->_conf.add_observer(this); | |
123 | } | |
124 | } | |
125 | ||
126 | template <typename CompletionToken> | |
127 | auto AsyncScheduler::async_request(const client_id& client, | |
128 | const ReqParams& params, | |
129 | const Time& time, Cost cost, | |
130 | CompletionToken&& token) | |
131 | { | |
132 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
133 | ||
134 | auto ex1 = get_executor(); | |
135 | auto& handler = init.completion_handler; | |
136 | ||
137 | // allocate the Request and add it to the queue | |
138 | auto completion = Completion::create(ex1, std::move(handler), | |
139 | Request{client, time, cost}); | |
140 | // cast to unique_ptr<Request> | |
141 | auto req = RequestRef{std::move(completion)}; | |
142 | int r = queue.add_request(std::move(req), client, params, time, cost); | |
143 | if (r == 0) { | |
144 | // schedule an immediate call to process() on the executor | |
145 | schedule(crimson::dmclock::TimeZero); | |
146 | if (auto c = counters(client)) { | |
147 | c->inc(queue_counters::l_qlen); | |
148 | c->inc(queue_counters::l_cost, cost); | |
149 | } | |
150 | } else { | |
151 | // post the error code | |
152 | boost::system::error_code ec(r, boost::system::system_category()); | |
153 | // cast back to Completion | |
154 | auto completion = static_cast<Completion*>(req.release()); | |
155 | async::post(std::unique_ptr<Completion>{completion}, | |
156 | ec, PhaseType::priority); | |
157 | if (auto c = counters(client)) { | |
158 | c->inc(queue_counters::l_limit); | |
159 | c->inc(queue_counters::l_limit_cost, cost); | |
160 | } | |
161 | } | |
162 | ||
163 | return init.result.get(); | |
164 | } | |
165 | ||
166 | class SimpleThrottler : public md_config_obs_t, public dmclock::Scheduler { | |
167 | public: | |
168 | SimpleThrottler(CephContext *cct) : | |
169 | max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")), | |
170 | counters(cct, "simple-throttler") | |
171 | { | |
172 | if (max_requests <= 0) { | |
173 | max_requests = std::numeric_limits<int64_t>::max(); | |
174 | } | |
175 | cct->_conf.add_observer(this); | |
176 | } | |
177 | ||
178 | const char** get_tracked_conf_keys() const override { | |
179 | static const char* keys[] = { "rgw_max_concurrent_requests", nullptr }; | |
180 | return keys; | |
181 | } | |
182 | ||
183 | void handle_conf_change(const ConfigProxy& conf, | |
184 | const std::set<std::string>& changed) override | |
185 | { | |
186 | if (changed.count("rgw_max_concurrent_requests")) { | |
187 | auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests"); | |
188 | max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max(); | |
189 | } | |
190 | } | |
191 | ||
192 | void request_complete() override { | |
193 | --outstanding_requests; | |
194 | } | |
195 | ||
196 | private: | |
197 | int schedule_request_impl(const client_id&, const ReqParams&, | |
198 | const Time&, const Cost&, | |
199 | optional_yield) override { | |
200 | if (outstanding_requests++ >= max_requests) { | |
201 | if (auto c = counters(); | |
202 | c != nullptr) { | |
203 | c->inc(throttle_counters::l_throttle); | |
204 | } | |
205 | return -EAGAIN; | |
206 | } | |
207 | ||
208 | return 0 ; | |
209 | } | |
210 | ||
211 | std::atomic<int64_t> max_requests; | |
212 | std::atomic<int64_t> outstanding_requests = 0; | |
213 | ThrottleCounters counters; | |
214 | }; | |
215 | ||
216 | } // namespace rgw::dmclock | |
217 | #endif /* RGW_DMCLOCK_ASYNC_SCHEDULER_H */ |