]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
b32b8144 | 4 | #include <atomic> |
7c673cae FG |
5 | #include <condition_variable> |
6 | #include <mutex> | |
7 | #include <thread> | |
8 | #include <vector> | |
9 | ||
10 | #include <boost/asio.hpp> | |
7c673cae | 11 | |
7c673cae | 12 | #include "rgw_asio_client.h" |
b32b8144 | 13 | #include "rgw_asio_frontend.h" |
7c673cae FG |
14 | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
7c673cae FG |
17 | namespace { |
18 | ||
// Rendezvous helper between one controlling thread and a group of worker
// threads: workers park themselves in wait(), the controller uses pause() to
// wait for them to arrive and unpause() to release them again.
class Pauser {
  // protects 'ready' and 'waiters'
  std::mutex mutex;
  // wait() blocks on this until unpause() sets ready
  std::condition_variable cond_ready;
  // pause() blocks on this until waiters reaches the requested thread count
  std::condition_variable cond_paused;
  bool ready = false;
  int waiters = 0;

 public:
  // clears 'ready', calls func(), then blocks until thread_count threads
  // are inside wait()
  template <typename Func>
  void pause(int thread_count, Func&& func);

  // sets 'ready' and wakes every thread blocked in wait()
  void unpause();

  // registers the calling thread as a waiter and blocks until 'ready' is set
  void wait();
};
31 | ||
32 | template <typename Func> | |
33 | void Pauser::pause(int thread_count, Func&& func) | |
34 | { | |
35 | std::unique_lock<std::mutex> lock(mutex); | |
36 | ready = false; | |
37 | lock.unlock(); | |
38 | ||
39 | func(); | |
40 | ||
41 | // wait for all threads to pause | |
42 | lock.lock(); | |
43 | cond_paused.wait(lock, [=] { return waiters == thread_count; }); | |
44 | } | |
45 | ||
46 | void Pauser::unpause() | |
47 | { | |
48 | std::lock_guard<std::mutex> lock(mutex); | |
49 | ready = true; | |
50 | cond_ready.notify_all(); | |
51 | } | |
52 | ||
53 | void Pauser::wait() | |
54 | { | |
55 | std::unique_lock<std::mutex> lock(mutex); | |
56 | ++waiters; | |
57 | cond_paused.notify_one(); // notify pause() that we're waiting | |
58 | cond_ready.wait(lock, [this] { return ready; }); // wait for unpause() | |
59 | --waiters; | |
60 | } | |
61 | ||
62 | using tcp = boost::asio::ip::tcp; | |
b32b8144 | 63 | namespace beast = boost::beast; |
7c673cae | 64 | |
b32b8144 FG |
65 | class Connection { |
66 | RGWProcessEnv& env; | |
67 | boost::asio::io_service::strand strand; | |
68 | tcp::socket socket; | |
69 | ||
70 | // references are bound to callbacks for async operations. if a callback | |
71 | // function returns without issuing another operation, the reference is | |
72 | // dropped and the Connection is deleted/closed | |
73 | std::atomic<int> nref{0}; | |
74 | using Ref = boost::intrusive_ptr<Connection>; | |
75 | ||
76 | // limit header to 4k, since we read it all into a single flat_buffer | |
77 | static constexpr size_t header_limit = 4096; | |
78 | // don't impose a limit on the body, since we read it in pieces | |
79 | static constexpr size_t body_limit = std::numeric_limits<size_t>::max(); | |
80 | ||
81 | beast::flat_buffer buffer; | |
82 | boost::optional<rgw::asio::parser_type> parser; | |
83 | ||
84 | using bad_response_type = beast::http::response<beast::http::empty_body>; | |
85 | boost::optional<bad_response_type> response; | |
86 | ||
87 | CephContext* ctx() const { return env.store->ctx(); } | |
7c673cae | 88 | |
b32b8144 FG |
89 | void read_header() { |
90 | // configure the parser | |
91 | parser.emplace(); | |
92 | parser->header_limit(header_limit); | |
93 | parser->body_limit(body_limit); | |
7c673cae | 94 | |
7c673cae | 95 | // parse the header |
b32b8144 FG |
96 | beast::http::async_read_header(socket, buffer, *parser, strand.wrap( |
97 | std::bind(&Connection::on_header, Ref{this}, | |
98 | std::placeholders::_1))); | |
99 | } | |
100 | ||
101 | void discard_unread_message() { | |
102 | if (parser->is_done()) { | |
103 | // nothing left to discard, start reading the next message | |
104 | read_header(); | |
105 | return; | |
106 | } | |
107 | ||
108 | // read the rest of the request into a static buffer. multiple clients could | |
109 | // write at the same time, but this is okay because we never read it back | |
110 | static std::array<char, 1024> discard_buffer; | |
111 | ||
112 | auto& body = parser->get().body(); | |
113 | body.size = discard_buffer.size(); | |
114 | body.data = discard_buffer.data(); | |
7c673cae | 115 | |
b32b8144 FG |
116 | beast::http::async_read_some(socket, buffer, *parser, strand.wrap( |
117 | std::bind(&Connection::on_discard_unread, Ref{this}, | |
118 | std::placeholders::_1))); | |
119 | } | |
120 | ||
121 | void on_discard_unread(boost::system::error_code ec) { | |
122 | if (ec == boost::asio::error::connection_reset) { | |
123 | return; | |
124 | } | |
125 | if (ec) { | |
126 | ldout(ctx(), 5) << "discard_unread_message failed: " | |
127 | << ec.message() << dendl; | |
128 | return; | |
129 | } | |
130 | discard_unread_message(); | |
131 | } | |
132 | ||
133 | void on_write_error(boost::system::error_code ec) { | |
134 | if (ec) { | |
135 | ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl; | |
136 | } | |
137 | } | |
138 | ||
139 | void on_header(boost::system::error_code ec) { | |
7c673cae | 140 | if (ec == boost::asio::error::connection_reset || |
b32b8144 | 141 | ec == beast::http::error::end_of_stream) { |
7c673cae FG |
142 | return; |
143 | } | |
144 | if (ec) { | |
b32b8144 FG |
145 | auto& message = parser->get(); |
146 | ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl; | |
147 | ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl; | |
148 | response.emplace(); | |
149 | response->result(beast::http::status::bad_request); | |
150 | response->version(message.version() == 10 ? 10 : 11); | |
151 | response->prepare_payload(); | |
152 | beast::http::async_write(socket, *response, strand.wrap( | |
153 | std::bind(&Connection::on_write_error, Ref{this}, | |
154 | std::placeholders::_1))); | |
7c673cae FG |
155 | return; |
156 | } | |
157 | ||
158 | // process the request | |
159 | RGWRequest req{env.store->get_new_req_id()}; | |
160 | ||
b32b8144 | 161 | rgw::asio::ClientIO real_client{socket, *parser, buffer}; |
7c673cae FG |
162 | |
163 | auto real_client_io = rgw::io::add_reordering( | |
b32b8144 | 164 | rgw::io::add_buffering(ctx(), |
7c673cae FG |
165 | rgw::io::add_chunking( |
166 | rgw::io::add_conlen_controlling( | |
167 | &real_client)))); | |
b32b8144 | 168 | RGWRestfulIO client(ctx(), &real_client_io); |
7c673cae FG |
169 | process_request(env.store, env.rest, &req, env.uri_prefix, |
170 | *env.auth_registry, &client, env.olog); | |
171 | ||
b32b8144 FG |
172 | if (parser->keep_alive()) { |
173 | // parse any unread bytes from the previous message (in case we replied | |
174 | // before reading the entire body) before reading the next | |
175 | discard_unread_message(); | |
7c673cae FG |
176 | } |
177 | } | |
b32b8144 FG |
178 | |
179 | public: | |
180 | Connection(RGWProcessEnv& env, tcp::socket&& socket) | |
181 | : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {} | |
182 | ||
183 | void on_connect() { | |
184 | read_header(); | |
185 | } | |
186 | ||
187 | void get() { ++nref; } | |
188 | void put() { if (nref.fetch_sub(1) == 1) { delete this; } } | |
189 | ||
190 | friend void intrusive_ptr_add_ref(Connection *c) { c->get(); } | |
191 | friend void intrusive_ptr_release(Connection *c) { c->put(); } | |
192 | }; | |
193 | ||
7c673cae FG |
194 | |
195 | class AsioFrontend { | |
196 | RGWProcessEnv env; | |
197 | boost::asio::io_service service; | |
198 | ||
199 | tcp::acceptor acceptor; | |
200 | tcp::socket peer_socket; | |
201 | ||
202 | std::vector<std::thread> threads; | |
203 | Pauser pauser; | |
204 | std::atomic<bool> going_down{false}; | |
205 | ||
206 | CephContext* ctx() const { return env.store->ctx(); } | |
207 | ||
208 | void accept(boost::system::error_code ec); | |
209 | ||
210 | public: | |
211 | AsioFrontend(const RGWProcessEnv& env) | |
212 | : env(env), acceptor(service), peer_socket(service) {} | |
213 | ||
214 | int init(); | |
215 | int run(); | |
216 | void stop(); | |
217 | void join(); | |
218 | void pause(); | |
219 | void unpause(RGWRados* store, rgw_auth_registry_ptr_t); | |
220 | }; | |
221 | ||
222 | int AsioFrontend::init() | |
223 | { | |
224 | auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)}; | |
225 | ldout(ctx(), 4) << "frontend listening on " << ep << dendl; | |
226 | ||
227 | boost::system::error_code ec; | |
228 | acceptor.open(ep.protocol(), ec); | |
229 | if (ec) { | |
230 | lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; | |
231 | return -ec.value(); | |
232 | } | |
233 | acceptor.set_option(tcp::acceptor::reuse_address(true)); | |
234 | acceptor.bind(ep, ec); | |
235 | if (ec) { | |
236 | lderr(ctx()) << "failed to bind address " << ep << | |
237 | ": " << ec.message() << dendl; | |
238 | return -ec.value(); | |
239 | } | |
240 | acceptor.listen(boost::asio::socket_base::max_connections); | |
241 | acceptor.async_accept(peer_socket, | |
242 | [this] (boost::system::error_code ec) { | |
243 | return accept(ec); | |
244 | }); | |
245 | return 0; | |
246 | } | |
247 | ||
248 | void AsioFrontend::accept(boost::system::error_code ec) | |
249 | { | |
250 | if (!acceptor.is_open()) { | |
251 | return; | |
252 | } else if (ec == boost::asio::error::operation_aborted) { | |
253 | return; | |
254 | } else if (ec) { | |
255 | throw ec; | |
256 | } | |
257 | auto socket = std::move(peer_socket); | |
7c673cae FG |
258 | acceptor.async_accept(peer_socket, |
259 | [this] (boost::system::error_code ec) { | |
260 | return accept(ec); | |
261 | }); | |
b32b8144 FG |
262 | |
263 | boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))}; | |
264 | conn->on_connect(); | |
265 | // reference drops here, but on_connect() takes another | |
7c673cae FG |
266 | } |
267 | ||
268 | int AsioFrontend::run() | |
269 | { | |
270 | auto cct = ctx(); | |
271 | const int thread_count = cct->_conf->rgw_thread_pool_size; | |
272 | threads.reserve(thread_count); | |
273 | ||
274 | ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; | |
275 | ||
276 | for (int i = 0; i < thread_count; i++) { | |
277 | threads.emplace_back([=] { | |
278 | for (;;) { | |
279 | service.run(); | |
280 | if (going_down) { | |
281 | break; | |
282 | } | |
283 | pauser.wait(); | |
284 | } | |
285 | }); | |
286 | } | |
287 | return 0; | |
288 | } | |
289 | ||
290 | void AsioFrontend::stop() | |
291 | { | |
292 | ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; | |
293 | ||
294 | going_down = true; | |
295 | ||
296 | boost::system::error_code ec; | |
b32b8144 FG |
297 | acceptor.close(ec); |
298 | ||
299 | // unblock the run() threads | |
300 | service.stop(); | |
7c673cae FG |
301 | } |
302 | ||
303 | void AsioFrontend::join() | |
304 | { | |
305 | if (!going_down) { | |
306 | stop(); | |
307 | } | |
308 | ldout(ctx(), 4) << "frontend joining threads..." << dendl; | |
309 | for (auto& thread : threads) { | |
310 | thread.join(); | |
311 | } | |
312 | ldout(ctx(), 4) << "frontend done" << dendl; | |
313 | } | |
314 | ||
315 | void AsioFrontend::pause() | |
316 | { | |
317 | ldout(ctx(), 4) << "frontend pausing threads..." << dendl; | |
318 | pauser.pause(threads.size(), [=] { | |
b32b8144 FG |
319 | // unblock the run() threads |
320 | service.stop(); | |
7c673cae FG |
321 | }); |
322 | ldout(ctx(), 4) << "frontend paused" << dendl; | |
323 | } | |
324 | ||
325 | void AsioFrontend::unpause(RGWRados* const store, | |
326 | rgw_auth_registry_ptr_t auth_registry) | |
327 | { | |
328 | env.store = store; | |
329 | env.auth_registry = std::move(auth_registry); | |
330 | ldout(ctx(), 4) << "frontend unpaused" << dendl; | |
331 | service.reset(); | |
7c673cae FG |
332 | pauser.unpause(); |
333 | } | |
334 | ||
335 | } // anonymous namespace | |
336 | ||
337 | class RGWAsioFrontend::Impl : public AsioFrontend { | |
338 | public: | |
339 | Impl(const RGWProcessEnv& env) : AsioFrontend(env) {} | |
340 | }; | |
341 | ||
342 | RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env) | |
343 | : impl(new Impl(env)) | |
344 | { | |
345 | } | |
346 | ||
// Defaulted out of line, after the definition of Impl earlier in this file.
// NOTE(review): presumably so that the 'impl' member can destroy a complete
// type -- confirm against its declaration in rgw_asio_frontend.h.
RGWAsioFrontend::~RGWAsioFrontend() = default;
348 | ||
349 | int RGWAsioFrontend::init() | |
350 | { | |
351 | return impl->init(); | |
352 | } | |
353 | ||
354 | int RGWAsioFrontend::run() | |
355 | { | |
356 | return impl->run(); | |
357 | } | |
358 | ||
359 | void RGWAsioFrontend::stop() | |
360 | { | |
361 | impl->stop(); | |
362 | } | |
363 | ||
364 | void RGWAsioFrontend::join() | |
365 | { | |
366 | impl->join(); | |
367 | } | |
368 | ||
369 | void RGWAsioFrontend::pause_for_new_config() | |
370 | { | |
371 | impl->pause(); | |
372 | } | |
373 | ||
374 | void RGWAsioFrontend::unpause_with_new_config( | |
375 | RGWRados* const store, | |
376 | rgw_auth_registry_ptr_t auth_registry | |
377 | ) { | |
378 | impl->unpause(store, std::move(auth_registry)); | |
379 | } |