]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | /* |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2018 Red Hat, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
1e59de90 | 15 | #pragma once |
11fdf7f2 TL |
16 | |
17 | #include "common/async/completion.h" | |
18 | ||
19 | #include <boost/asio.hpp> | |
20 | #include "rgw_dmclock_scheduler.h" | |
21 | #include "rgw_dmclock_scheduler_ctx.h" | |
22 | ||
23 | namespace rgw::dmclock { | |
24 | namespace async = ceph::async; | |
25 | ||
26 | /* | |
27 | * A dmclock request scheduling service for use with boost::asio. | |
28 | * | |
29 | * An asynchronous dmclock priority queue, where scheduled requests complete | |
30 | * on a boost::asio executor. | |
31 | */ | |
32 | class AsyncScheduler : public md_config_obs_t, public Scheduler { | |
33 | public: | |
34 | template <typename ...Args> // args forwarded to PullPriorityQueue ctor | |
35 | AsyncScheduler(CephContext *cct, boost::asio::io_context& context, | |
36 | GetClientCounters&& counters, md_config_obs_t *observer, | |
37 | Args&& ...args); | |
38 | ~AsyncScheduler(); | |
39 | ||
40 | using executor_type = boost::asio::io_context::executor_type; | |
41 | ||
42 | /// return the default executor for async_request() callbacks | |
43 | executor_type get_executor() noexcept { | |
44 | return timer.get_executor(); | |
45 | } | |
46 | ||
47 | /// submit an async request for dmclock scheduling. the given completion | |
48 | /// handler will be invoked with (error_code, PhaseType) when the request | |
49 | /// is ready or canceled. on success, this grants a throttle unit that must | |
50 | /// be returned with a call to request_complete() | |
51 | template <typename CompletionToken> | |
52 | auto async_request(const client_id& client, const ReqParams& params, | |
53 | const Time& time, Cost cost, CompletionToken&& token); | |
54 | ||
55 | /// returns a throttle unit granted by async_request() | |
56 | void request_complete() override; | |
57 | ||
58 | /// cancel all queued requests, invoking their completion handlers with an | |
59 | /// operation_aborted error and default-constructed result | |
60 | void cancel(); | |
61 | ||
62 | /// cancel all queued requests for a given client, invoking their completion | |
63 | /// handler with an operation_aborted error and default-constructed result | |
64 | void cancel(const client_id& client); | |
65 | ||
66 | const char** get_tracked_conf_keys() const override; | |
67 | void handle_conf_change(const ConfigProxy& conf, | |
68 | const std::set<std::string>& changed) override; | |
69 | ||
70 | private: | |
71 | int schedule_request_impl(const client_id& client, const ReqParams& params, | |
72 | const Time& time, const Cost& cost, | |
73 | optional_yield yield_ctx) override; | |
74 | ||
75 | static constexpr bool IsDelayed = false; | |
76 | using Queue = crimson::dmclock::PullPriorityQueue<client_id, Request, IsDelayed>; | |
77 | using RequestRef = typename Queue::RequestRef; | |
78 | Queue queue; //< dmclock priority queue | |
79 | ||
80 | using Signature = void(boost::system::error_code, PhaseType); | |
81 | using Completion = async::Completion<Signature, async::AsBase<Request>>; | |
82 | ||
83 | using Clock = ceph::coarse_real_clock; | |
92f5a8d4 TL |
84 | using Timer = boost::asio::basic_waitable_timer<Clock, |
85 | boost::asio::wait_traits<Clock>, executor_type>; | |
11fdf7f2 TL |
86 | Timer timer; //< timer for the next scheduled request |
87 | ||
88 | CephContext *const cct; | |
89 | md_config_obs_t *const observer; //< observer to update ClientInfoFunc | |
90 | GetClientCounters counters; //< provides per-client perf counters | |
91 | ||
92 | /// max request throttle | |
93 | std::atomic<int64_t> max_requests; | |
94 | std::atomic<int64_t> outstanding_requests = 0; | |
95 | ||
96 | /// set a timer to process the next request | |
97 | void schedule(const Time& time); | |
98 | ||
99 | /// process ready requests, then schedule the next pending request | |
100 | void process(const Time& now); | |
101 | }; | |
102 | ||
103 | ||
104 | template <typename ...Args> | |
105 | AsyncScheduler::AsyncScheduler(CephContext *cct, boost::asio::io_context& context, | |
106 | GetClientCounters&& counters, | |
107 | md_config_obs_t *observer, Args&& ...args) | |
108 | : queue(std::forward<Args>(args)...), | |
109 | timer(context), cct(cct), observer(observer), | |
110 | counters(std::move(counters)), | |
111 | max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")) | |
112 | { | |
113 | if (max_requests <= 0) { | |
114 | max_requests = std::numeric_limits<int64_t>::max(); | |
115 | } | |
116 | if (observer) { | |
117 | cct->_conf.add_observer(this); | |
118 | } | |
119 | } | |
120 | ||
121 | template <typename CompletionToken> | |
122 | auto AsyncScheduler::async_request(const client_id& client, | |
123 | const ReqParams& params, | |
124 | const Time& time, Cost cost, | |
125 | CompletionToken&& token) | |
126 | { | |
127 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
128 | ||
129 | auto ex1 = get_executor(); | |
130 | auto& handler = init.completion_handler; | |
131 | ||
132 | // allocate the Request and add it to the queue | |
133 | auto completion = Completion::create(ex1, std::move(handler), | |
134 | Request{client, time, cost}); | |
135 | // cast to unique_ptr<Request> | |
136 | auto req = RequestRef{std::move(completion)}; | |
137 | int r = queue.add_request(std::move(req), client, params, time, cost); | |
138 | if (r == 0) { | |
139 | // schedule an immediate call to process() on the executor | |
140 | schedule(crimson::dmclock::TimeZero); | |
141 | if (auto c = counters(client)) { | |
142 | c->inc(queue_counters::l_qlen); | |
143 | c->inc(queue_counters::l_cost, cost); | |
144 | } | |
145 | } else { | |
146 | // post the error code | |
147 | boost::system::error_code ec(r, boost::system::system_category()); | |
148 | // cast back to Completion | |
149 | auto completion = static_cast<Completion*>(req.release()); | |
150 | async::post(std::unique_ptr<Completion>{completion}, | |
151 | ec, PhaseType::priority); | |
152 | if (auto c = counters(client)) { | |
153 | c->inc(queue_counters::l_limit); | |
154 | c->inc(queue_counters::l_limit_cost, cost); | |
155 | } | |
156 | } | |
157 | ||
158 | return init.result.get(); | |
159 | } | |
160 | ||
161 | class SimpleThrottler : public md_config_obs_t, public dmclock::Scheduler { | |
162 | public: | |
163 | SimpleThrottler(CephContext *cct) : | |
164 | max_requests(cct->_conf.get_val<int64_t>("rgw_max_concurrent_requests")), | |
165 | counters(cct, "simple-throttler") | |
166 | { | |
167 | if (max_requests <= 0) { | |
168 | max_requests = std::numeric_limits<int64_t>::max(); | |
169 | } | |
170 | cct->_conf.add_observer(this); | |
171 | } | |
172 | ||
173 | const char** get_tracked_conf_keys() const override { | |
174 | static const char* keys[] = { "rgw_max_concurrent_requests", nullptr }; | |
175 | return keys; | |
176 | } | |
177 | ||
178 | void handle_conf_change(const ConfigProxy& conf, | |
179 | const std::set<std::string>& changed) override | |
180 | { | |
181 | if (changed.count("rgw_max_concurrent_requests")) { | |
182 | auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests"); | |
183 | max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max(); | |
184 | } | |
185 | } | |
186 | ||
187 | void request_complete() override { | |
188 | --outstanding_requests; | |
f67539c2 TL |
189 | if (auto c = counters(); |
190 | c != nullptr) { | |
191 | c->inc(throttle_counters::l_outstanding, -1); | |
192 | } | |
193 | ||
11fdf7f2 TL |
194 | } |
195 | ||
196 | private: | |
197 | int schedule_request_impl(const client_id&, const ReqParams&, | |
198 | const Time&, const Cost&, | |
199 | optional_yield) override { | |
200 | if (outstanding_requests++ >= max_requests) { | |
201 | if (auto c = counters(); | |
202 | c != nullptr) { | |
f67539c2 | 203 | c->inc(throttle_counters::l_outstanding); |
11fdf7f2 TL |
204 | c->inc(throttle_counters::l_throttle); |
205 | } | |
206 | return -EAGAIN; | |
207 | } | |
208 | ||
209 | return 0 ; | |
210 | } | |
211 | ||
212 | std::atomic<int64_t> max_requests; | |
213 | std::atomic<int64_t> outstanding_requests = 0; | |
214 | ThrottleCounters counters; | |
215 | }; | |
216 | ||
217 | } // namespace rgw::dmclock |