]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
b32b8144 | 4 | #include <atomic> |
7c673cae FG |
5 | #include <condition_variable> |
6 | #include <mutex> | |
7 | #include <thread> | |
8 | #include <vector> | |
9 | ||
10 | #include <boost/asio.hpp> | |
7c673cae | 11 | |
91327a77 AA |
12 | #include "common/errno.h" |
13 | ||
7c673cae | 14 | #include "rgw_asio_client.h" |
b32b8144 | 15 | #include "rgw_asio_frontend.h" |
7c673cae FG |
16 | |
17 | #define dout_subsys ceph_subsys_rgw | |
18 | ||
7c673cae FG |
19 | namespace { |
20 | ||
// Rendezvous used to park a fixed number of worker threads and release them
// again. pause() blocks until `thread_count` threads sit inside wait();
// unpause() lets them all continue.
class Pauser {
 private:
  std::mutex mutex;
  std::condition_variable cond_ready;   // notified when ready becomes true
  std::condition_variable cond_paused;  // notified each time a thread enters wait()
  bool ready{false};
  int waiters{0};

 public:
  // Clear the ready flag, run func() without holding the lock (the caller
  // uses it to drive its threads into wait()), then block until exactly
  // thread_count threads are waiting.
  template <typename Func>
  void pause(int thread_count, Func&& func)
  {
    {
      std::lock_guard<std::mutex> guard(mutex);
      ready = false;
    }

    func();

    // wait for all threads to pause
    std::unique_lock<std::mutex> lock(mutex);
    cond_paused.wait(lock, [this, thread_count] {
      return waiters == thread_count;
    });
  }

  // Set the ready flag and wake every thread blocked in wait().
  void unpause()
  {
    std::lock_guard<std::mutex> guard(mutex);
    ready = true;
    cond_ready.notify_all();
  }

  // Register the calling thread as paused, then block until unpause().
  void wait()
  {
    std::unique_lock<std::mutex> lock(mutex);
    ++waiters;
    cond_paused.notify_one();  // let pause() re-check the waiter count
    cond_ready.wait(lock, [this] { return ready; });
    --waiters;
  }
};
63 | ||
64 | using tcp = boost::asio::ip::tcp; | |
b32b8144 | 65 | namespace beast = boost::beast; |
7c673cae | 66 | |
b32b8144 FG |
// A single accepted client socket. Reads a request header with beast, hands
// the request to process_request(), and on keep-alive discards any unread
// body bytes before reading the next header. Every completion handler is
// wrapped in `strand`, and the object's lifetime is governed by an intrusive
// reference count held by the pending completion handlers.
class Connection {
  RGWProcessEnv& env;
  // completion handlers below are dispatched through this strand
  boost::asio::io_service::strand strand;
  tcp::socket socket;

  // references are bound to callbacks for async operations. if a callback
  // function returns without issuing another operation, the reference is
  // dropped and the Connection is deleted/closed
  std::atomic<int> nref{0};
  using Ref = boost::intrusive_ptr<Connection>;

  // limit header to 4k, since we read it all into a single flat_buffer
  static constexpr size_t header_limit = 4096;
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  // read buffer for the header parser; also passed to ClientIO in on_header()
  beast::flat_buffer buffer;
  // re-created by read_header() for every request on this connection
  boost::optional<rgw::asio::parser_type> parser;

  using bad_response_type = beast::http::response<beast::http::empty_body>;
  // storage for the 400 response so it outlives its async_write
  boost::optional<bad_response_type> response;

  CephContext* ctx() const { return env.store->ctx(); }

  // Reset the parser and start an async read of the next request header;
  // on_header() is invoked on the strand when it completes.
  void read_header() {
    // configure the parser
    parser.emplace();
    parser->header_limit(header_limit);
    parser->body_limit(body_limit);

    // parse the header
    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
            std::bind(&Connection::on_header, Ref{this},
                      std::placeholders::_1)));
  }

  // Drain what remains of the current message, then go back to read_header().
  // Each async_read_some completes in on_discard_unread(), which calls back
  // in here until the parser reports the message is done.
  void discard_unread_message() {
    if (parser->is_done()) {
      // nothing left to discard, start reading the next message
      read_header();
      return;
    }

    // read the rest of the request into a static buffer. multiple clients could
    // write at the same time, but this is okay because we never read it back
    static std::array<char, 1024> discard_buffer;

    auto& body = parser->get().body();
    body.size = discard_buffer.size();
    body.data = discard_buffer.data();

    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
            std::bind(&Connection::on_discard_unread, Ref{this},
                      std::placeholders::_1)));
  }

  // Completion of one discard read. On any error no further operation is
  // issued, so the Connection is released; a reset peer is not logged.
  void on_discard_unread(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset) {
      return;
    }
    if (ec) {
      ldout(ctx(), 5) << "discard_unread_message failed: "
          << ec.message() << dendl;
      return;
    }
    discard_unread_message();
  }

  // Completion of the 400 response write. It only logs; since no further
  // operation is issued, the Connection is released afterwards.
  void on_write_error(boost::system::error_code ec) {
    if (ec) {
      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
    }
  }

  // Completion of async_read_header.
  //  - connection reset / end of stream: return quietly (connection released)
  //  - any other error: send 400 Bad Request and stop reading
  //  - success: run process_request() directly in this handler, then continue
  //    with the next message if the request is keep-alive
  void on_header(boost::system::error_code ec) {
    if (ec == boost::asio::error::connection_reset ||
        ec == beast::http::error::end_of_stream) {
      return;
    }
    if (ec) {
      auto& message = parser->get();
      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
      response.emplace();
      response->result(beast::http::status::bad_request);
      // answer HTTP/1.0 clients with 1.0, everyone else with 1.1
      response->version(message.version() == 10 ? 10 : 11);
      response->prepare_payload();
      beast::http::async_write(socket, *response, strand.wrap(
            std::bind(&Connection::on_write_error, Ref{this},
                      std::placeholders::_1)));
      return;
    }

    // process the request
    RGWRequest req{env.store->get_new_req_id()};

    rgw::asio::ClientIO real_client{socket, *parser, buffer};

    // wrap the raw client in the rgw::io filter stack
    auto real_client_io = rgw::io::add_reordering(
                            rgw::io::add_buffering(ctx(),
                              rgw::io::add_chunking(
                                rgw::io::add_conlen_controlling(
                                  &real_client))));
    RGWRestfulIO client(ctx(), &real_client_io);
    process_request(env.store, env.rest, &req, env.uri_prefix,
                    *env.auth_registry, &client, env.olog);

    if (parser->keep_alive()) {
      // parse any unread bytes from the previous message (in case we replied
      // before reading the entire body) before reading the next
      discard_unread_message();
    }
    // otherwise no operation is pending and the connection is released
  }

 public:
  // The strand is created on the accepted socket's io_service.
  Connection(RGWProcessEnv& env, tcp::socket&& socket)
    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}

  // Entry point after accept: start reading the first request.
  void on_connect() {
    read_header();
  }

  // intrusive reference counting; put() deletes the Connection when the
  // count drops to zero
  void get() { ++nref; }
  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }

  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
  friend void intrusive_ptr_release(Connection *c) { c->put(); }
};
195 | ||
7c673cae FG |
196 | class AsioFrontend { |
197 | RGWProcessEnv env; | |
94b18763 | 198 | RGWFrontendConfig* conf; |
7c673cae FG |
199 | boost::asio::io_service service; |
200 | ||
28e407b8 AA |
201 | struct Listener { |
202 | tcp::endpoint endpoint; | |
203 | tcp::acceptor acceptor; | |
204 | tcp::socket socket; | |
205 | ||
206 | Listener(boost::asio::io_service& service) | |
207 | : acceptor(service), socket(service) {} | |
208 | }; | |
209 | std::vector<Listener> listeners; | |
7c673cae FG |
210 | |
211 | std::vector<std::thread> threads; | |
212 | Pauser pauser; | |
213 | std::atomic<bool> going_down{false}; | |
214 | ||
215 | CephContext* ctx() const { return env.store->ctx(); } | |
216 | ||
28e407b8 | 217 | void accept(Listener& listener, boost::system::error_code ec); |
7c673cae FG |
218 | |
219 | public: | |
94b18763 | 220 | AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf) |
28e407b8 | 221 | : env(env), conf(conf) {} |
7c673cae FG |
222 | |
223 | int init(); | |
224 | int run(); | |
225 | void stop(); | |
226 | void join(); | |
227 | void pause(); | |
228 | void unpause(RGWRados* store, rgw_auth_registry_ptr_t); | |
229 | }; | |
230 | ||
28e407b8 AA |
231 | unsigned short parse_port(const char *input, boost::system::error_code& ec) |
232 | { | |
233 | char *end = nullptr; | |
234 | auto port = std::strtoul(input, &end, 10); | |
235 | if (port > std::numeric_limits<unsigned short>::max()) { | |
236 | ec.assign(ERANGE, boost::system::system_category()); | |
237 | } else if (port == 0 && end == input) { | |
238 | ec.assign(EINVAL, boost::system::system_category()); | |
239 | } | |
240 | return port; | |
241 | } | |
242 | ||
243 | tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input, | |
244 | boost::system::error_code& ec) | |
7c673cae | 245 | { |
28e407b8 | 246 | tcp::endpoint endpoint; |
7c673cae | 247 | |
28e407b8 AA |
248 | auto colon = input.find(':'); |
249 | if (colon != input.npos) { | |
250 | auto port_str = input.substr(colon + 1); | |
251 | endpoint.port(parse_port(port_str.data(), ec)); | |
252 | } else { | |
253 | endpoint.port(80); | |
254 | } | |
255 | if (!ec) { | |
256 | auto addr = input.substr(0, colon); | |
257 | endpoint.address(boost::asio::ip::make_address(addr, ec)); | |
258 | } | |
259 | return endpoint; | |
260 | } | |
261 | ||
91327a77 AA |
262 | static int drop_privileges(CephContext *ctx) |
263 | { | |
264 | uid_t uid = ctx->get_set_uid(); | |
265 | gid_t gid = ctx->get_set_gid(); | |
266 | std::string uid_string = ctx->get_set_uid_string(); | |
267 | std::string gid_string = ctx->get_set_gid_string(); | |
268 | if (gid && setgid(gid) != 0) { | |
269 | int err = errno; | |
270 | ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl; | |
271 | return -err; | |
272 | } | |
273 | if (uid && setuid(uid) != 0) { | |
274 | int err = errno; | |
275 | ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl; | |
276 | return -err; | |
277 | } | |
278 | if (uid && gid) { | |
279 | ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid | |
280 | << " (" << uid_string << ":" << gid_string << ")" << dendl; | |
281 | } | |
282 | return 0; | |
283 | } | |
284 | ||
28e407b8 AA |
285 | int AsioFrontend::init() |
286 | { | |
7c673cae | 287 | boost::system::error_code ec; |
28e407b8 | 288 | auto& config = conf->get_config_map(); |
94b18763 | 289 | |
28e407b8 AA |
290 | // parse endpoints |
291 | auto range = config.equal_range("port"); | |
292 | for (auto i = range.first; i != range.second; ++i) { | |
293 | auto port = parse_port(i->second.c_str(), ec); | |
94b18763 | 294 | if (ec) { |
28e407b8 | 295 | lderr(ctx()) << "failed to parse port=" << i->second << dendl; |
94b18763 FG |
296 | return -ec.value(); |
297 | } | |
28e407b8 AA |
298 | listeners.emplace_back(service); |
299 | listeners.back().endpoint.port(port); | |
94b18763 FG |
300 | } |
301 | ||
28e407b8 AA |
302 | range = config.equal_range("endpoint"); |
303 | for (auto i = range.first; i != range.second; ++i) { | |
304 | auto endpoint = parse_endpoint(i->second, ec); | |
305 | if (ec) { | |
306 | lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl; | |
307 | return -ec.value(); | |
308 | } | |
309 | listeners.emplace_back(service); | |
310 | listeners.back().endpoint = endpoint; | |
7c673cae | 311 | } |
28e407b8 AA |
312 | |
313 | // start listeners | |
314 | for (auto& l : listeners) { | |
315 | l.acceptor.open(l.endpoint.protocol(), ec); | |
316 | if (ec) { | |
317 | lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; | |
318 | return -ec.value(); | |
319 | } | |
320 | l.acceptor.set_option(tcp::acceptor::reuse_address(true)); | |
321 | l.acceptor.bind(l.endpoint, ec); | |
322 | if (ec) { | |
323 | lderr(ctx()) << "failed to bind address " << l.endpoint | |
324 | << ": " << ec.message() << dendl; | |
325 | return -ec.value(); | |
326 | } | |
327 | l.acceptor.listen(boost::asio::socket_base::max_connections); | |
328 | l.acceptor.async_accept(l.socket, | |
329 | [this, &l] (boost::system::error_code ec) { | |
330 | accept(l, ec); | |
331 | }); | |
332 | ||
333 | ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl; | |
7c673cae | 334 | } |
91327a77 | 335 | return drop_privileges(ctx()); |
7c673cae FG |
336 | } |
337 | ||
28e407b8 | 338 | void AsioFrontend::accept(Listener& l, boost::system::error_code ec) |
7c673cae | 339 | { |
28e407b8 | 340 | if (!l.acceptor.is_open()) { |
7c673cae FG |
341 | return; |
342 | } else if (ec == boost::asio::error::operation_aborted) { | |
343 | return; | |
344 | } else if (ec) { | |
345 | throw ec; | |
346 | } | |
28e407b8 AA |
347 | auto socket = std::move(l.socket); |
348 | l.acceptor.async_accept(l.socket, | |
349 | [this, &l] (boost::system::error_code ec) { | |
350 | accept(l, ec); | |
351 | }); | |
b32b8144 FG |
352 | |
353 | boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))}; | |
354 | conn->on_connect(); | |
355 | // reference drops here, but on_connect() takes another | |
7c673cae FG |
356 | } |
357 | ||
358 | int AsioFrontend::run() | |
359 | { | |
360 | auto cct = ctx(); | |
361 | const int thread_count = cct->_conf->rgw_thread_pool_size; | |
362 | threads.reserve(thread_count); | |
363 | ||
364 | ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; | |
365 | ||
366 | for (int i = 0; i < thread_count; i++) { | |
367 | threads.emplace_back([=] { | |
368 | for (;;) { | |
369 | service.run(); | |
370 | if (going_down) { | |
371 | break; | |
372 | } | |
373 | pauser.wait(); | |
374 | } | |
375 | }); | |
376 | } | |
377 | return 0; | |
378 | } | |
379 | ||
380 | void AsioFrontend::stop() | |
381 | { | |
382 | ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; | |
383 | ||
384 | going_down = true; | |
385 | ||
386 | boost::system::error_code ec; | |
28e407b8 AA |
387 | // close all listeners |
388 | for (auto& listener : listeners) { | |
389 | listener.acceptor.close(ec); | |
390 | } | |
b32b8144 FG |
391 | |
392 | // unblock the run() threads | |
393 | service.stop(); | |
7c673cae FG |
394 | } |
395 | ||
396 | void AsioFrontend::join() | |
397 | { | |
398 | if (!going_down) { | |
399 | stop(); | |
400 | } | |
401 | ldout(ctx(), 4) << "frontend joining threads..." << dendl; | |
402 | for (auto& thread : threads) { | |
403 | thread.join(); | |
404 | } | |
405 | ldout(ctx(), 4) << "frontend done" << dendl; | |
406 | } | |
407 | ||
408 | void AsioFrontend::pause() | |
409 | { | |
410 | ldout(ctx(), 4) << "frontend pausing threads..." << dendl; | |
411 | pauser.pause(threads.size(), [=] { | |
b32b8144 FG |
412 | // unblock the run() threads |
413 | service.stop(); | |
7c673cae FG |
414 | }); |
415 | ldout(ctx(), 4) << "frontend paused" << dendl; | |
416 | } | |
417 | ||
418 | void AsioFrontend::unpause(RGWRados* const store, | |
419 | rgw_auth_registry_ptr_t auth_registry) | |
420 | { | |
421 | env.store = store; | |
422 | env.auth_registry = std::move(auth_registry); | |
423 | ldout(ctx(), 4) << "frontend unpaused" << dendl; | |
424 | service.reset(); | |
7c673cae FG |
425 | pauser.unpause(); |
426 | } | |
427 | ||
428 | } // anonymous namespace | |
429 | ||
430 | class RGWAsioFrontend::Impl : public AsioFrontend { | |
431 | public: | |
94b18763 | 432 | Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {} |
7c673cae FG |
433 | }; |
434 | ||
94b18763 FG |
435 | RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env, |
436 | RGWFrontendConfig* conf) | |
437 | : impl(new Impl(env, conf)) | |
7c673cae FG |
438 | { |
439 | } | |
440 | ||
441 | RGWAsioFrontend::~RGWAsioFrontend() = default; | |
442 | ||
443 | int RGWAsioFrontend::init() | |
444 | { | |
445 | return impl->init(); | |
446 | } | |
447 | ||
448 | int RGWAsioFrontend::run() | |
449 | { | |
450 | return impl->run(); | |
451 | } | |
452 | ||
453 | void RGWAsioFrontend::stop() | |
454 | { | |
455 | impl->stop(); | |
456 | } | |
457 | ||
458 | void RGWAsioFrontend::join() | |
459 | { | |
460 | impl->join(); | |
461 | } | |
462 | ||
463 | void RGWAsioFrontend::pause_for_new_config() | |
464 | { | |
465 | impl->pause(); | |
466 | } | |
467 | ||
468 | void RGWAsioFrontend::unpause_with_new_config( | |
469 | RGWRados* const store, | |
470 | rgw_auth_registry_ptr_t auth_registry | |
471 | ) { | |
472 | impl->unpause(store, std::move(auth_registry)); | |
473 | } |