]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
b32b8144 | 4 | #include <atomic> |
7c673cae FG |
5 | #include <condition_variable> |
6 | #include <mutex> | |
7 | #include <thread> | |
8 | #include <vector> | |
9 | ||
10 | #include <boost/asio.hpp> | |
7c673cae | 11 | |
7c673cae | 12 | #include "rgw_asio_client.h" |
b32b8144 | 13 | #include "rgw_asio_frontend.h" |
7c673cae FG |
14 | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
7c673cae FG |
17 | namespace { |
18 | ||
/// Rendezvous point between one controlling thread and a set of workers.
///
/// The controller calls pause(), which blocks until the expected number of
/// workers are parked inside wait(). unpause() then releases all of them.
class Pauser {
  std::mutex mutex;
  std::condition_variable cond_ready; // notified by unpause() once ready==true
  std::condition_variable cond_paused; // notified by wait() as waiters grows
  bool ready{false};
  int waiters{0};
 public:
  template <typename Func>
  void pause(int thread_count, Func&& func);
  void unpause();
  void wait();
};

/// Clear the ready flag, invoke @p func without holding the lock, then block
/// until exactly @p thread_count threads are parked in wait().
template <typename Func>
void Pauser::pause(int thread_count, Func&& func)
{
  {
    std::lock_guard<std::mutex> guard(mutex);
    ready = false;
  }

  // run the caller's hook unlocked, so threads it wakes can enter wait()
  func();

  // block until every expected thread has checked in
  std::unique_lock<std::mutex> lock(mutex);
  while (waiters != thread_count) {
    cond_paused.wait(lock);
  }
}

/// Set the ready flag and wake every thread blocked in wait().
void Pauser::unpause()
{
  std::unique_lock<std::mutex> lock(mutex);
  ready = true;
  cond_ready.notify_all();
}

/// Register the calling thread as paused and block until unpause().
void Pauser::wait()
{
  std::unique_lock<std::mutex> lock(mutex);
  waiters += 1;
  cond_paused.notify_one(); // let pause() re-check its count
  while (!ready) {
    cond_ready.wait(lock); // released by unpause()
  }
  waiters -= 1;
}
61 | ||
62 | using tcp = boost::asio::ip::tcp; | |
b32b8144 | 63 | namespace beast = boost::beast; |
7c673cae | 64 | |
b32b8144 FG |
65 | class Connection { |
66 | RGWProcessEnv& env; | |
67 | boost::asio::io_service::strand strand; | |
68 | tcp::socket socket; | |
69 | ||
70 | // references are bound to callbacks for async operations. if a callback | |
71 | // function returns without issuing another operation, the reference is | |
72 | // dropped and the Connection is deleted/closed | |
73 | std::atomic<int> nref{0}; | |
74 | using Ref = boost::intrusive_ptr<Connection>; | |
75 | ||
76 | // limit header to 4k, since we read it all into a single flat_buffer | |
77 | static constexpr size_t header_limit = 4096; | |
78 | // don't impose a limit on the body, since we read it in pieces | |
79 | static constexpr size_t body_limit = std::numeric_limits<size_t>::max(); | |
80 | ||
81 | beast::flat_buffer buffer; | |
82 | boost::optional<rgw::asio::parser_type> parser; | |
83 | ||
84 | using bad_response_type = beast::http::response<beast::http::empty_body>; | |
85 | boost::optional<bad_response_type> response; | |
86 | ||
87 | CephContext* ctx() const { return env.store->ctx(); } | |
7c673cae | 88 | |
b32b8144 FG |
89 | void read_header() { |
90 | // configure the parser | |
91 | parser.emplace(); | |
92 | parser->header_limit(header_limit); | |
93 | parser->body_limit(body_limit); | |
7c673cae | 94 | |
7c673cae | 95 | // parse the header |
b32b8144 FG |
96 | beast::http::async_read_header(socket, buffer, *parser, strand.wrap( |
97 | std::bind(&Connection::on_header, Ref{this}, | |
98 | std::placeholders::_1))); | |
99 | } | |
100 | ||
101 | void discard_unread_message() { | |
102 | if (parser->is_done()) { | |
103 | // nothing left to discard, start reading the next message | |
104 | read_header(); | |
105 | return; | |
106 | } | |
107 | ||
108 | // read the rest of the request into a static buffer. multiple clients could | |
109 | // write at the same time, but this is okay because we never read it back | |
110 | static std::array<char, 1024> discard_buffer; | |
111 | ||
112 | auto& body = parser->get().body(); | |
113 | body.size = discard_buffer.size(); | |
114 | body.data = discard_buffer.data(); | |
7c673cae | 115 | |
b32b8144 FG |
116 | beast::http::async_read_some(socket, buffer, *parser, strand.wrap( |
117 | std::bind(&Connection::on_discard_unread, Ref{this}, | |
118 | std::placeholders::_1))); | |
119 | } | |
120 | ||
121 | void on_discard_unread(boost::system::error_code ec) { | |
122 | if (ec == boost::asio::error::connection_reset) { | |
123 | return; | |
124 | } | |
125 | if (ec) { | |
126 | ldout(ctx(), 5) << "discard_unread_message failed: " | |
127 | << ec.message() << dendl; | |
128 | return; | |
129 | } | |
130 | discard_unread_message(); | |
131 | } | |
132 | ||
133 | void on_write_error(boost::system::error_code ec) { | |
134 | if (ec) { | |
135 | ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl; | |
136 | } | |
137 | } | |
138 | ||
139 | void on_header(boost::system::error_code ec) { | |
7c673cae | 140 | if (ec == boost::asio::error::connection_reset || |
b32b8144 | 141 | ec == beast::http::error::end_of_stream) { |
7c673cae FG |
142 | return; |
143 | } | |
144 | if (ec) { | |
b32b8144 FG |
145 | auto& message = parser->get(); |
146 | ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl; | |
147 | ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl; | |
148 | response.emplace(); | |
149 | response->result(beast::http::status::bad_request); | |
150 | response->version(message.version() == 10 ? 10 : 11); | |
151 | response->prepare_payload(); | |
152 | beast::http::async_write(socket, *response, strand.wrap( | |
153 | std::bind(&Connection::on_write_error, Ref{this}, | |
154 | std::placeholders::_1))); | |
7c673cae FG |
155 | return; |
156 | } | |
157 | ||
158 | // process the request | |
159 | RGWRequest req{env.store->get_new_req_id()}; | |
160 | ||
b32b8144 | 161 | rgw::asio::ClientIO real_client{socket, *parser, buffer}; |
7c673cae FG |
162 | |
163 | auto real_client_io = rgw::io::add_reordering( | |
b32b8144 | 164 | rgw::io::add_buffering(ctx(), |
7c673cae FG |
165 | rgw::io::add_chunking( |
166 | rgw::io::add_conlen_controlling( | |
167 | &real_client)))); | |
b32b8144 | 168 | RGWRestfulIO client(ctx(), &real_client_io); |
7c673cae FG |
169 | process_request(env.store, env.rest, &req, env.uri_prefix, |
170 | *env.auth_registry, &client, env.olog); | |
171 | ||
b32b8144 FG |
172 | if (parser->keep_alive()) { |
173 | // parse any unread bytes from the previous message (in case we replied | |
174 | // before reading the entire body) before reading the next | |
175 | discard_unread_message(); | |
7c673cae FG |
176 | } |
177 | } | |
b32b8144 FG |
178 | |
179 | public: | |
180 | Connection(RGWProcessEnv& env, tcp::socket&& socket) | |
181 | : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {} | |
182 | ||
183 | void on_connect() { | |
184 | read_header(); | |
185 | } | |
186 | ||
187 | void get() { ++nref; } | |
188 | void put() { if (nref.fetch_sub(1) == 1) { delete this; } } | |
189 | ||
190 | friend void intrusive_ptr_add_ref(Connection *c) { c->get(); } | |
191 | friend void intrusive_ptr_release(Connection *c) { c->put(); } | |
192 | }; | |
193 | ||
7c673cae FG |
194 | class AsioFrontend { |
195 | RGWProcessEnv env; | |
94b18763 | 196 | RGWFrontendConfig* conf; |
7c673cae FG |
197 | boost::asio::io_service service; |
198 | ||
28e407b8 AA |
199 | struct Listener { |
200 | tcp::endpoint endpoint; | |
201 | tcp::acceptor acceptor; | |
202 | tcp::socket socket; | |
203 | ||
204 | Listener(boost::asio::io_service& service) | |
205 | : acceptor(service), socket(service) {} | |
206 | }; | |
207 | std::vector<Listener> listeners; | |
7c673cae FG |
208 | |
209 | std::vector<std::thread> threads; | |
210 | Pauser pauser; | |
211 | std::atomic<bool> going_down{false}; | |
212 | ||
213 | CephContext* ctx() const { return env.store->ctx(); } | |
214 | ||
28e407b8 | 215 | void accept(Listener& listener, boost::system::error_code ec); |
7c673cae FG |
216 | |
217 | public: | |
94b18763 | 218 | AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf) |
28e407b8 | 219 | : env(env), conf(conf) {} |
7c673cae FG |
220 | |
221 | int init(); | |
222 | int run(); | |
223 | void stop(); | |
224 | void join(); | |
225 | void pause(); | |
226 | void unpause(RGWRados* store, rgw_auth_registry_ptr_t); | |
227 | }; | |
228 | ||
28e407b8 AA |
229 | unsigned short parse_port(const char *input, boost::system::error_code& ec) |
230 | { | |
231 | char *end = nullptr; | |
232 | auto port = std::strtoul(input, &end, 10); | |
233 | if (port > std::numeric_limits<unsigned short>::max()) { | |
234 | ec.assign(ERANGE, boost::system::system_category()); | |
235 | } else if (port == 0 && end == input) { | |
236 | ec.assign(EINVAL, boost::system::system_category()); | |
237 | } | |
238 | return port; | |
239 | } | |
240 | ||
241 | tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input, | |
242 | boost::system::error_code& ec) | |
7c673cae | 243 | { |
28e407b8 | 244 | tcp::endpoint endpoint; |
7c673cae | 245 | |
28e407b8 AA |
246 | auto colon = input.find(':'); |
247 | if (colon != input.npos) { | |
248 | auto port_str = input.substr(colon + 1); | |
249 | endpoint.port(parse_port(port_str.data(), ec)); | |
250 | } else { | |
251 | endpoint.port(80); | |
252 | } | |
253 | if (!ec) { | |
254 | auto addr = input.substr(0, colon); | |
255 | endpoint.address(boost::asio::ip::make_address(addr, ec)); | |
256 | } | |
257 | return endpoint; | |
258 | } | |
259 | ||
260 | int AsioFrontend::init() | |
261 | { | |
7c673cae | 262 | boost::system::error_code ec; |
28e407b8 | 263 | auto& config = conf->get_config_map(); |
94b18763 | 264 | |
28e407b8 AA |
265 | // parse endpoints |
266 | auto range = config.equal_range("port"); | |
267 | for (auto i = range.first; i != range.second; ++i) { | |
268 | auto port = parse_port(i->second.c_str(), ec); | |
94b18763 | 269 | if (ec) { |
28e407b8 | 270 | lderr(ctx()) << "failed to parse port=" << i->second << dendl; |
94b18763 FG |
271 | return -ec.value(); |
272 | } | |
28e407b8 AA |
273 | listeners.emplace_back(service); |
274 | listeners.back().endpoint.port(port); | |
94b18763 FG |
275 | } |
276 | ||
28e407b8 AA |
277 | range = config.equal_range("endpoint"); |
278 | for (auto i = range.first; i != range.second; ++i) { | |
279 | auto endpoint = parse_endpoint(i->second, ec); | |
280 | if (ec) { | |
281 | lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl; | |
282 | return -ec.value(); | |
283 | } | |
284 | listeners.emplace_back(service); | |
285 | listeners.back().endpoint = endpoint; | |
7c673cae | 286 | } |
28e407b8 AA |
287 | |
288 | // start listeners | |
289 | for (auto& l : listeners) { | |
290 | l.acceptor.open(l.endpoint.protocol(), ec); | |
291 | if (ec) { | |
292 | lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; | |
293 | return -ec.value(); | |
294 | } | |
295 | l.acceptor.set_option(tcp::acceptor::reuse_address(true)); | |
296 | l.acceptor.bind(l.endpoint, ec); | |
297 | if (ec) { | |
298 | lderr(ctx()) << "failed to bind address " << l.endpoint | |
299 | << ": " << ec.message() << dendl; | |
300 | return -ec.value(); | |
301 | } | |
302 | l.acceptor.listen(boost::asio::socket_base::max_connections); | |
303 | l.acceptor.async_accept(l.socket, | |
304 | [this, &l] (boost::system::error_code ec) { | |
305 | accept(l, ec); | |
306 | }); | |
307 | ||
308 | ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl; | |
7c673cae | 309 | } |
7c673cae FG |
310 | return 0; |
311 | } | |
312 | ||
28e407b8 | 313 | void AsioFrontend::accept(Listener& l, boost::system::error_code ec) |
7c673cae | 314 | { |
28e407b8 | 315 | if (!l.acceptor.is_open()) { |
7c673cae FG |
316 | return; |
317 | } else if (ec == boost::asio::error::operation_aborted) { | |
318 | return; | |
319 | } else if (ec) { | |
320 | throw ec; | |
321 | } | |
28e407b8 AA |
322 | auto socket = std::move(l.socket); |
323 | l.acceptor.async_accept(l.socket, | |
324 | [this, &l] (boost::system::error_code ec) { | |
325 | accept(l, ec); | |
326 | }); | |
b32b8144 FG |
327 | |
328 | boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))}; | |
329 | conn->on_connect(); | |
330 | // reference drops here, but on_connect() takes another | |
7c673cae FG |
331 | } |
332 | ||
333 | int AsioFrontend::run() | |
334 | { | |
335 | auto cct = ctx(); | |
336 | const int thread_count = cct->_conf->rgw_thread_pool_size; | |
337 | threads.reserve(thread_count); | |
338 | ||
339 | ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; | |
340 | ||
341 | for (int i = 0; i < thread_count; i++) { | |
342 | threads.emplace_back([=] { | |
343 | for (;;) { | |
344 | service.run(); | |
345 | if (going_down) { | |
346 | break; | |
347 | } | |
348 | pauser.wait(); | |
349 | } | |
350 | }); | |
351 | } | |
352 | return 0; | |
353 | } | |
354 | ||
355 | void AsioFrontend::stop() | |
356 | { | |
357 | ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; | |
358 | ||
359 | going_down = true; | |
360 | ||
361 | boost::system::error_code ec; | |
28e407b8 AA |
362 | // close all listeners |
363 | for (auto& listener : listeners) { | |
364 | listener.acceptor.close(ec); | |
365 | } | |
b32b8144 FG |
366 | |
367 | // unblock the run() threads | |
368 | service.stop(); | |
7c673cae FG |
369 | } |
370 | ||
371 | void AsioFrontend::join() | |
372 | { | |
373 | if (!going_down) { | |
374 | stop(); | |
375 | } | |
376 | ldout(ctx(), 4) << "frontend joining threads..." << dendl; | |
377 | for (auto& thread : threads) { | |
378 | thread.join(); | |
379 | } | |
380 | ldout(ctx(), 4) << "frontend done" << dendl; | |
381 | } | |
382 | ||
383 | void AsioFrontend::pause() | |
384 | { | |
385 | ldout(ctx(), 4) << "frontend pausing threads..." << dendl; | |
386 | pauser.pause(threads.size(), [=] { | |
b32b8144 FG |
387 | // unblock the run() threads |
388 | service.stop(); | |
7c673cae FG |
389 | }); |
390 | ldout(ctx(), 4) << "frontend paused" << dendl; | |
391 | } | |
392 | ||
393 | void AsioFrontend::unpause(RGWRados* const store, | |
394 | rgw_auth_registry_ptr_t auth_registry) | |
395 | { | |
396 | env.store = store; | |
397 | env.auth_registry = std::move(auth_registry); | |
398 | ldout(ctx(), 4) << "frontend unpaused" << dendl; | |
399 | service.reset(); | |
7c673cae FG |
400 | pauser.unpause(); |
401 | } | |
402 | ||
403 | } // anonymous namespace | |
404 | ||
405 | class RGWAsioFrontend::Impl : public AsioFrontend { | |
406 | public: | |
94b18763 | 407 | Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {} |
7c673cae FG |
408 | }; |
409 | ||
94b18763 FG |
410 | RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env, |
411 | RGWFrontendConfig* conf) | |
412 | : impl(new Impl(env, conf)) | |
7c673cae FG |
413 | { |
414 | } | |
415 | ||
416 | RGWAsioFrontend::~RGWAsioFrontend() = default; | |
417 | ||
418 | int RGWAsioFrontend::init() | |
419 | { | |
420 | return impl->init(); | |
421 | } | |
422 | ||
423 | int RGWAsioFrontend::run() | |
424 | { | |
425 | return impl->run(); | |
426 | } | |
427 | ||
428 | void RGWAsioFrontend::stop() | |
429 | { | |
430 | impl->stop(); | |
431 | } | |
432 | ||
433 | void RGWAsioFrontend::join() | |
434 | { | |
435 | impl->join(); | |
436 | } | |
437 | ||
438 | void RGWAsioFrontend::pause_for_new_config() | |
439 | { | |
440 | impl->pause(); | |
441 | } | |
442 | ||
443 | void RGWAsioFrontend::unpause_with_new_config( | |
444 | RGWRados* const store, | |
445 | rgw_auth_registry_ptr_t auth_registry | |
446 | ) { | |
447 | impl->unpause(store, std::move(auth_registry)); | |
448 | } |