]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
b32b8144 | 4 | #include <atomic> |
7c673cae FG |
5 | #include <thread> |
6 | #include <vector> | |
7 | ||
8 | #include <boost/asio.hpp> | |
11fdf7f2 | 9 | #define BOOST_COROUTINES_NO_DEPRECATION_WARNING |
f64942e4 | 10 | #include <boost/asio/spawn.hpp> |
11fdf7f2 | 11 | #include <boost/intrusive/list.hpp> |
7c673cae | 12 | |
11fdf7f2 | 13 | #include "common/async/shared_mutex.h" |
91327a77 AA |
14 | #include "common/errno.h" |
15 | ||
7c673cae | 16 | #include "rgw_asio_client.h" |
b32b8144 | 17 | #include "rgw_asio_frontend.h" |
7c673cae | 18 | |
f64942e4 AA |
19 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
20 | #include <boost/asio/ssl.hpp> | |
21 | #endif | |
22 | ||
11fdf7f2 TL |
23 | #include "rgw_dmclock_async_scheduler.h" |
24 | ||
7c673cae FG |
25 | #define dout_subsys ceph_subsys_rgw |
26 | ||
7c673cae FG |
27 | namespace { |
28 | ||
7c673cae | 29 | using tcp = boost::asio::ip::tcp; |
11fdf7f2 | 30 | namespace http = boost::beast::http; |
f64942e4 AA |
31 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
32 | namespace ssl = boost::asio::ssl; | |
33 | #endif | |
34 | ||
35 | template <typename Stream> | |
36 | class StreamIO : public rgw::asio::ClientIO { | |
11fdf7f2 | 37 | CephContext* const cct; |
f64942e4 | 38 | Stream& stream; |
11fdf7f2 | 39 | boost::beast::flat_buffer& buffer; |
f64942e4 | 40 | public: |
11fdf7f2 TL |
41 | StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser, |
42 | boost::beast::flat_buffer& buffer, bool is_ssl, | |
f64942e4 AA |
43 | const tcp::endpoint& local_endpoint, |
44 | const tcp::endpoint& remote_endpoint) | |
45 | : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint), | |
11fdf7f2 | 46 | cct(cct), stream(stream), buffer(buffer) |
f64942e4 AA |
47 | {} |
48 | ||
49 | size_t write_data(const char* buf, size_t len) override { | |
50 | boost::system::error_code ec; | |
51 | auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec); | |
52 | if (ec) { | |
11fdf7f2 | 53 | ldout(cct, 4) << "write_data failed: " << ec.message() << dendl; |
f64942e4 AA |
54 | throw rgw::io::Exception(ec.value(), std::system_category()); |
55 | } | |
56 | return bytes; | |
57 | } | |
7c673cae | 58 | |
f64942e4 AA |
59 | size_t recv_body(char* buf, size_t max) override { |
60 | auto& message = parser.get(); | |
61 | auto& body_remaining = message.body(); | |
62 | body_remaining.data = buf; | |
63 | body_remaining.size = max; | |
64 | ||
65 | while (body_remaining.size && !parser.is_done()) { | |
66 | boost::system::error_code ec; | |
11fdf7f2 TL |
67 | http::read_some(stream, buffer, parser, ec); |
68 | if (ec == http::error::partial_message || | |
69 | ec == http::error::need_buffer) { | |
f64942e4 AA |
70 | break; |
71 | } | |
72 | if (ec) { | |
11fdf7f2 | 73 | ldout(cct, 4) << "failed to read body: " << ec.message() << dendl; |
f64942e4 AA |
74 | throw rgw::io::Exception(ec.value(), std::system_category()); |
75 | } | |
76 | } | |
77 | return max - body_remaining.size; | |
78 | } | |
79 | }; | |
b32b8144 | 80 | |
11fdf7f2 TL |
81 | using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>; |
82 | ||
f64942e4 AA |
83 | template <typename Stream> |
84 | void handle_connection(RGWProcessEnv& env, Stream& stream, | |
11fdf7f2 TL |
85 | boost::beast::flat_buffer& buffer, bool is_ssl, |
86 | SharedMutex& pause_mutex, | |
87 | rgw::dmclock::Scheduler *scheduler, | |
f64942e4 AA |
88 | boost::system::error_code& ec, |
89 | boost::asio::yield_context yield) | |
90 | { | |
b32b8144 FG |
91 | // limit header to 4k, since we read it all into a single flat_buffer |
92 | static constexpr size_t header_limit = 4096; | |
93 | // don't impose a limit on the body, since we read it in pieces | |
94 | static constexpr size_t body_limit = std::numeric_limits<size_t>::max(); | |
95 | ||
f64942e4 | 96 | auto cct = env.store->ctx(); |
b32b8144 | 97 | |
f64942e4 AA |
98 | // read messages from the stream until eof |
99 | for (;;) { | |
b32b8144 | 100 | // configure the parser |
f64942e4 AA |
101 | rgw::asio::parser_type parser; |
102 | parser.header_limit(header_limit); | |
103 | parser.body_limit(body_limit); | |
7c673cae | 104 | |
7c673cae | 105 | // parse the header |
11fdf7f2 | 106 | http::async_read_header(stream, buffer, parser, yield[ec]); |
7c673cae | 107 | if (ec == boost::asio::error::connection_reset || |
11fdf7f2 TL |
108 | ec == boost::asio::error::bad_descriptor || |
109 | ec == boost::asio::error::operation_aborted || | |
f64942e4 AA |
110 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
111 | ec == ssl::error::stream_truncated || | |
112 | #endif | |
11fdf7f2 TL |
113 | ec == http::error::end_of_stream) { |
114 | ldout(cct, 20) << "failed to read header: " << ec.message() << dendl; | |
7c673cae FG |
115 | return; |
116 | } | |
117 | if (ec) { | |
f64942e4 AA |
118 | ldout(cct, 1) << "failed to read header: " << ec.message() << dendl; |
119 | auto& message = parser.get(); | |
11fdf7f2 TL |
120 | http::response<http::empty_body> response; |
121 | response.result(http::status::bad_request); | |
f64942e4 AA |
122 | response.version(message.version() == 10 ? 10 : 11); |
123 | response.prepare_payload(); | |
11fdf7f2 | 124 | http::async_write(stream, response, yield[ec]); |
f64942e4 AA |
125 | if (ec) { |
126 | ldout(cct, 5) << "failed to write response: " << ec.message() << dendl; | |
127 | } | |
128 | ldout(cct, 1) << "====== req done http_status=400 ======" << dendl; | |
7c673cae FG |
129 | return; |
130 | } | |
131 | ||
11fdf7f2 TL |
132 | { |
133 | auto lock = pause_mutex.async_lock_shared(yield[ec]); | |
134 | if (ec == boost::asio::error::operation_aborted) { | |
135 | return; | |
136 | } else if (ec) { | |
137 | ldout(cct, 1) << "failed to lock: " << ec.message() << dendl; | |
138 | return; | |
139 | } | |
7c673cae | 140 | |
11fdf7f2 TL |
141 | // process the request |
142 | RGWRequest req{env.store->get_new_req_id()}; | |
143 | ||
144 | auto& socket = stream.lowest_layer(); | |
494da23a TL |
145 | const auto& remote_endpoint = socket.remote_endpoint(ec); |
146 | if (ec) { | |
147 | ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl; | |
148 | return; | |
149 | } | |
150 | ||
11fdf7f2 TL |
151 | StreamIO real_client{cct, stream, parser, buffer, is_ssl, |
152 | socket.local_endpoint(), | |
494da23a | 153 | remote_endpoint}; |
11fdf7f2 TL |
154 | |
155 | auto real_client_io = rgw::io::add_reordering( | |
156 | rgw::io::add_buffering(cct, | |
157 | rgw::io::add_chunking( | |
158 | rgw::io::add_conlen_controlling( | |
159 | &real_client)))); | |
160 | RGWRestfulIO client(cct, &real_client_io); | |
161 | auto y = optional_yield{socket.get_io_context(), yield}; | |
162 | process_request(env.store, env.rest, &req, env.uri_prefix, | |
163 | *env.auth_registry, &client, env.olog, y, scheduler); | |
164 | } | |
7c673cae | 165 | |
f64942e4 AA |
166 | if (!parser.keep_alive()) { |
167 | return; | |
7c673cae | 168 | } |
b32b8144 | 169 | |
f64942e4 AA |
170 | // if we failed before reading the entire message, discard any remaining |
171 | // bytes before reading the next | |
172 | while (!parser.is_done()) { | |
173 | static std::array<char, 1024> discard_buffer; | |
174 | ||
175 | auto& body = parser.get().body(); | |
176 | body.size = discard_buffer.size(); | |
177 | body.data = discard_buffer.data(); | |
178 | ||
11fdf7f2 | 179 | http::async_read_some(stream, buffer, parser, yield[ec]); |
f64942e4 AA |
180 | if (ec == boost::asio::error::connection_reset) { |
181 | return; | |
182 | } | |
183 | if (ec) { | |
184 | ldout(cct, 5) << "failed to discard unread message: " | |
185 | << ec.message() << dendl; | |
186 | return; | |
187 | } | |
188 | } | |
b32b8144 | 189 | } |
f64942e4 | 190 | } |
b32b8144 | 191 | |
11fdf7f2 TL |
192 | struct Connection : boost::intrusive::list_base_hook<> { |
193 | tcp::socket& socket; | |
194 | Connection(tcp::socket& socket) : socket(socket) {} | |
195 | }; | |
196 | ||
197 | class ConnectionList { | |
198 | using List = boost::intrusive::list<Connection>; | |
199 | List connections; | |
200 | std::mutex mutex; | |
201 | ||
202 | void remove(Connection& c) { | |
203 | std::lock_guard lock{mutex}; | |
204 | if (c.is_linked()) { | |
205 | connections.erase(List::s_iterator_to(c)); | |
206 | } | |
207 | } | |
208 | public: | |
209 | class Guard { | |
210 | ConnectionList *list; | |
211 | Connection *conn; | |
212 | public: | |
213 | Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {} | |
214 | ~Guard() { list->remove(*conn); } | |
215 | }; | |
216 | [[nodiscard]] Guard add(Connection& conn) { | |
217 | std::lock_guard lock{mutex}; | |
218 | connections.push_back(conn); | |
219 | return Guard{this, &conn}; | |
220 | } | |
221 | void close(boost::system::error_code& ec) { | |
222 | std::lock_guard lock{mutex}; | |
223 | for (auto& conn : connections) { | |
224 | conn.socket.close(ec); | |
225 | } | |
226 | connections.clear(); | |
227 | } | |
228 | }; | |
229 | ||
230 | namespace dmc = rgw::dmclock; | |
7c673cae FG |
231 | class AsioFrontend { |
232 | RGWProcessEnv env; | |
94b18763 | 233 | RGWFrontendConfig* conf; |
11fdf7f2 | 234 | boost::asio::io_context context; |
f64942e4 AA |
235 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
236 | boost::optional<ssl::context> ssl_context; | |
237 | int init_ssl(); | |
238 | #endif | |
11fdf7f2 TL |
239 | SharedMutex pause_mutex; |
240 | std::unique_ptr<rgw::dmclock::Scheduler> scheduler; | |
7c673cae | 241 | |
28e407b8 AA |
242 | struct Listener { |
243 | tcp::endpoint endpoint; | |
244 | tcp::acceptor acceptor; | |
245 | tcp::socket socket; | |
f64942e4 | 246 | bool use_ssl = false; |
11fdf7f2 | 247 | bool use_nodelay = false; |
28e407b8 | 248 | |
11fdf7f2 TL |
249 | explicit Listener(boost::asio::io_context& context) |
250 | : acceptor(context), socket(context) {} | |
28e407b8 AA |
251 | }; |
252 | std::vector<Listener> listeners; | |
7c673cae | 253 | |
11fdf7f2 TL |
254 | ConnectionList connections; |
255 | ||
256 | // work guard to keep run() threads busy while listeners are paused | |
257 | using Executor = boost::asio::io_context::executor_type; | |
258 | std::optional<boost::asio::executor_work_guard<Executor>> work; | |
259 | ||
7c673cae | 260 | std::vector<std::thread> threads; |
7c673cae FG |
261 | std::atomic<bool> going_down{false}; |
262 | ||
263 | CephContext* ctx() const { return env.store->ctx(); } | |
11fdf7f2 TL |
264 | std::optional<dmc::ClientCounters> client_counters; |
265 | std::unique_ptr<dmc::ClientConfig> client_config; | |
28e407b8 | 266 | void accept(Listener& listener, boost::system::error_code ec); |
7c673cae FG |
267 | |
268 | public: | |
11fdf7f2 TL |
269 | AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf, |
270 | dmc::SchedulerCtx& sched_ctx) | |
271 | : env(env), conf(conf), pause_mutex(context.get_executor()) | |
272 | { | |
273 | auto sched_t = dmc::get_scheduler_t(ctx()); | |
274 | switch(sched_t){ | |
275 | case dmc::scheduler_t::dmclock: | |
276 | scheduler.reset(new dmc::AsyncScheduler(ctx(), | |
277 | context, | |
278 | std::ref(sched_ctx.get_dmc_client_counters()), | |
279 | sched_ctx.get_dmc_client_config(), | |
280 | *sched_ctx.get_dmc_client_config(), | |
281 | dmc::AtLimit::Reject)); | |
282 | break; | |
283 | case dmc::scheduler_t::none: | |
284 | lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl; | |
285 | [[fallthrough]]; | |
286 | case dmc::scheduler_t::throttler: | |
287 | scheduler.reset(new dmc::SimpleThrottler(ctx())); | |
288 | ||
289 | } | |
290 | } | |
7c673cae FG |
291 | |
292 | int init(); | |
293 | int run(); | |
294 | void stop(); | |
295 | void join(); | |
296 | void pause(); | |
297 | void unpause(RGWRados* store, rgw_auth_registry_ptr_t); | |
298 | }; | |
299 | ||
28e407b8 AA |
300 | unsigned short parse_port(const char *input, boost::system::error_code& ec) |
301 | { | |
302 | char *end = nullptr; | |
303 | auto port = std::strtoul(input, &end, 10); | |
304 | if (port > std::numeric_limits<unsigned short>::max()) { | |
305 | ec.assign(ERANGE, boost::system::system_category()); | |
306 | } else if (port == 0 && end == input) { | |
307 | ec.assign(EINVAL, boost::system::system_category()); | |
308 | } | |
309 | return port; | |
310 | } | |
11fdf7f2 TL |
311 | |
312 | tcp::endpoint parse_endpoint(boost::asio::string_view input, | |
f64942e4 | 313 | unsigned short default_port, |
28e407b8 | 314 | boost::system::error_code& ec) |
7c673cae | 315 | { |
28e407b8 | 316 | tcp::endpoint endpoint; |
7c673cae | 317 | |
f64942e4 AA |
318 | if (input.empty()) { |
319 | ec = boost::asio::error::invalid_argument; | |
320 | return endpoint; | |
28e407b8 | 321 | } |
f64942e4 AA |
322 | |
323 | if (input[0] == '[') { // ipv6 | |
324 | const size_t addr_begin = 1; | |
325 | const size_t addr_end = input.find(']'); | |
326 | if (addr_end == input.npos) { // no matching ] | |
327 | ec = boost::asio::error::invalid_argument; | |
328 | return endpoint; | |
329 | } | |
330 | if (addr_end + 1 < input.size()) { | |
331 | // :port must must follow [ipv6] | |
332 | if (input[addr_end + 1] != ':') { | |
333 | ec = boost::asio::error::invalid_argument; | |
334 | return endpoint; | |
335 | } else { | |
336 | auto port_str = input.substr(addr_end + 2); | |
337 | endpoint.port(parse_port(port_str.data(), ec)); | |
338 | } | |
81eedcae TL |
339 | } else { |
340 | endpoint.port(default_port); | |
f64942e4 AA |
341 | } |
342 | auto addr = input.substr(addr_begin, addr_end - addr_begin); | |
343 | endpoint.address(boost::asio::ip::make_address_v6(addr, ec)); | |
344 | } else { // ipv4 | |
345 | auto colon = input.find(':'); | |
346 | if (colon != input.npos) { | |
347 | auto port_str = input.substr(colon + 1); | |
348 | endpoint.port(parse_port(port_str.data(), ec)); | |
349 | if (ec) { | |
350 | return endpoint; | |
351 | } | |
81eedcae TL |
352 | } else { |
353 | endpoint.port(default_port); | |
f64942e4 | 354 | } |
28e407b8 | 355 | auto addr = input.substr(0, colon); |
f64942e4 | 356 | endpoint.address(boost::asio::ip::make_address_v4(addr, ec)); |
28e407b8 AA |
357 | } |
358 | return endpoint; | |
359 | } | |
360 | ||
91327a77 AA |
361 | static int drop_privileges(CephContext *ctx) |
362 | { | |
363 | uid_t uid = ctx->get_set_uid(); | |
364 | gid_t gid = ctx->get_set_gid(); | |
365 | std::string uid_string = ctx->get_set_uid_string(); | |
366 | std::string gid_string = ctx->get_set_gid_string(); | |
367 | if (gid && setgid(gid) != 0) { | |
368 | int err = errno; | |
369 | ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl; | |
370 | return -err; | |
371 | } | |
372 | if (uid && setuid(uid) != 0) { | |
373 | int err = errno; | |
374 | ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl; | |
375 | return -err; | |
376 | } | |
377 | if (uid && gid) { | |
378 | ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid | |
379 | << " (" << uid_string << ":" << gid_string << ")" << dendl; | |
380 | } | |
381 | return 0; | |
382 | } | |
383 | ||
28e407b8 AA |
384 | int AsioFrontend::init() |
385 | { | |
7c673cae | 386 | boost::system::error_code ec; |
28e407b8 | 387 | auto& config = conf->get_config_map(); |
94b18763 | 388 | |
f64942e4 AA |
389 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
390 | int r = init_ssl(); | |
391 | if (r < 0) { | |
392 | return r; | |
393 | } | |
394 | #endif | |
395 | ||
28e407b8 | 396 | // parse endpoints |
f64942e4 AA |
397 | auto ports = config.equal_range("port"); |
398 | for (auto i = ports.first; i != ports.second; ++i) { | |
28e407b8 | 399 | auto port = parse_port(i->second.c_str(), ec); |
94b18763 | 400 | if (ec) { |
28e407b8 | 401 | lderr(ctx()) << "failed to parse port=" << i->second << dendl; |
94b18763 FG |
402 | return -ec.value(); |
403 | } | |
11fdf7f2 | 404 | listeners.emplace_back(context); |
28e407b8 | 405 | listeners.back().endpoint.port(port); |
81eedcae TL |
406 | |
407 | listeners.emplace_back(context); | |
408 | listeners.back().endpoint = tcp::endpoint(tcp::v6(), port); | |
94b18763 FG |
409 | } |
410 | ||
f64942e4 AA |
411 | auto endpoints = config.equal_range("endpoint"); |
412 | for (auto i = endpoints.first; i != endpoints.second; ++i) { | |
413 | auto endpoint = parse_endpoint(i->second, 80, ec); | |
28e407b8 AA |
414 | if (ec) { |
415 | lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl; | |
416 | return -ec.value(); | |
417 | } | |
11fdf7f2 | 418 | listeners.emplace_back(context); |
28e407b8 | 419 | listeners.back().endpoint = endpoint; |
7c673cae | 420 | } |
11fdf7f2 TL |
421 | // parse tcp nodelay |
422 | auto nodelay = config.find("tcp_nodelay"); | |
423 | if (nodelay != config.end()) { | |
424 | for (auto& l : listeners) { | |
425 | l.use_nodelay = (nodelay->second == "1"); | |
426 | } | |
427 | } | |
428 | ||
81eedcae TL |
429 | |
430 | bool socket_bound = false; | |
28e407b8 AA |
431 | // start listeners |
432 | for (auto& l : listeners) { | |
433 | l.acceptor.open(l.endpoint.protocol(), ec); | |
434 | if (ec) { | |
81eedcae TL |
435 | if (ec == boost::asio::error::address_family_not_supported) { |
436 | ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint | |
437 | << ", " << ec.message() << dendl; | |
438 | continue; | |
439 | } | |
440 | ||
28e407b8 AA |
441 | lderr(ctx()) << "failed to open socket: " << ec.message() << dendl; |
442 | return -ec.value(); | |
443 | } | |
81eedcae TL |
444 | |
445 | if (l.endpoint.protocol() == tcp::v6()) { | |
446 | l.acceptor.set_option(boost::asio::ip::v6_only(true), ec); | |
447 | if (ec) { | |
448 | lderr(ctx()) << "failed to set v6_only socket option: " | |
449 | << ec.message() << dendl; | |
450 | return -ec.value(); | |
451 | } | |
452 | } | |
453 | ||
28e407b8 AA |
454 | l.acceptor.set_option(tcp::acceptor::reuse_address(true)); |
455 | l.acceptor.bind(l.endpoint, ec); | |
456 | if (ec) { | |
457 | lderr(ctx()) << "failed to bind address " << l.endpoint | |
458 | << ": " << ec.message() << dendl; | |
459 | return -ec.value(); | |
460 | } | |
81eedcae | 461 | |
28e407b8 AA |
462 | l.acceptor.listen(boost::asio::socket_base::max_connections); |
463 | l.acceptor.async_accept(l.socket, | |
464 | [this, &l] (boost::system::error_code ec) { | |
465 | accept(l, ec); | |
466 | }); | |
467 | ||
468 | ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl; | |
81eedcae TL |
469 | socket_bound = true; |
470 | } | |
471 | if (!socket_bound) { | |
472 | lderr(ctx()) << "Unable to listen at any endpoints" << dendl; | |
473 | return -EINVAL; | |
7c673cae | 474 | } |
81eedcae | 475 | |
91327a77 | 476 | return drop_privileges(ctx()); |
7c673cae FG |
477 | } |
478 | ||
f64942e4 AA |
479 | #ifdef WITH_RADOSGW_BEAST_OPENSSL |
480 | int AsioFrontend::init_ssl() | |
481 | { | |
482 | boost::system::error_code ec; | |
483 | auto& config = conf->get_config_map(); | |
484 | ||
485 | // ssl configuration | |
486 | auto cert = config.find("ssl_certificate"); | |
487 | const bool have_cert = cert != config.end(); | |
488 | if (have_cert) { | |
489 | // only initialize the ssl context if it's going to be used | |
490 | ssl_context = boost::in_place(ssl::context::tls); | |
491 | } | |
492 | ||
493 | auto key = config.find("ssl_private_key"); | |
494 | const bool have_private_key = key != config.end(); | |
495 | if (have_private_key) { | |
496 | if (!have_cert) { | |
497 | lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl; | |
498 | return -EINVAL; | |
499 | } | |
500 | ssl_context->use_private_key_file(key->second, ssl::context::pem, ec); | |
501 | if (ec) { | |
502 | lderr(ctx()) << "failed to add ssl_private_key=" << key->second | |
503 | << ": " << ec.message() << dendl; | |
504 | return -ec.value(); | |
505 | } | |
506 | } | |
507 | if (have_cert) { | |
508 | ssl_context->use_certificate_chain_file(cert->second, ec); | |
509 | if (ec) { | |
510 | lderr(ctx()) << "failed to use ssl_certificate=" << cert->second | |
511 | << ": " << ec.message() << dendl; | |
512 | return -ec.value(); | |
513 | } | |
514 | if (!have_private_key) { | |
515 | // attempt to use it as a private key if a separate one wasn't provided | |
516 | ssl_context->use_private_key_file(cert->second, ssl::context::pem, ec); | |
517 | if (ec) { | |
518 | lderr(ctx()) << "failed to use ssl_certificate=" << cert->second | |
519 | << " as a private key: " << ec.message() << dendl; | |
520 | return -ec.value(); | |
521 | } | |
522 | } | |
523 | } | |
524 | ||
525 | // parse ssl endpoints | |
526 | auto ports = config.equal_range("ssl_port"); | |
527 | for (auto i = ports.first; i != ports.second; ++i) { | |
528 | if (!have_cert) { | |
529 | lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl; | |
530 | return -EINVAL; | |
531 | } | |
532 | auto port = parse_port(i->second.c_str(), ec); | |
533 | if (ec) { | |
534 | lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl; | |
535 | return -ec.value(); | |
536 | } | |
11fdf7f2 | 537 | listeners.emplace_back(context); |
f64942e4 AA |
538 | listeners.back().endpoint.port(port); |
539 | listeners.back().use_ssl = true; | |
81eedcae TL |
540 | |
541 | listeners.emplace_back(context); | |
542 | listeners.back().endpoint = tcp::endpoint(tcp::v6(), port); | |
543 | listeners.back().use_ssl = true; | |
f64942e4 AA |
544 | } |
545 | ||
546 | auto endpoints = config.equal_range("ssl_endpoint"); | |
547 | for (auto i = endpoints.first; i != endpoints.second; ++i) { | |
548 | if (!have_cert) { | |
549 | lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl; | |
550 | return -EINVAL; | |
551 | } | |
552 | auto endpoint = parse_endpoint(i->second, 443, ec); | |
553 | if (ec) { | |
554 | lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl; | |
555 | return -ec.value(); | |
556 | } | |
11fdf7f2 | 557 | listeners.emplace_back(context); |
f64942e4 AA |
558 | listeners.back().endpoint = endpoint; |
559 | listeners.back().use_ssl = true; | |
560 | } | |
561 | return 0; | |
562 | } | |
563 | #endif // WITH_RADOSGW_BEAST_OPENSSL | |
564 | ||
28e407b8 | 565 | void AsioFrontend::accept(Listener& l, boost::system::error_code ec) |
7c673cae | 566 | { |
28e407b8 | 567 | if (!l.acceptor.is_open()) { |
7c673cae FG |
568 | return; |
569 | } else if (ec == boost::asio::error::operation_aborted) { | |
570 | return; | |
571 | } else if (ec) { | |
572 | throw ec; | |
573 | } | |
28e407b8 | 574 | auto socket = std::move(l.socket); |
11fdf7f2 TL |
575 | tcp::no_delay options(l.use_nodelay); |
576 | socket.set_option(options,ec); | |
28e407b8 AA |
577 | l.acceptor.async_accept(l.socket, |
578 | [this, &l] (boost::system::error_code ec) { | |
579 | accept(l, ec); | |
580 | }); | |
b32b8144 | 581 | |
f64942e4 AA |
582 | // spawn a coroutine to handle the connection |
583 | #ifdef WITH_RADOSGW_BEAST_OPENSSL | |
584 | if (l.use_ssl) { | |
11fdf7f2 TL |
585 | boost::asio::spawn(context, |
586 | [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable { | |
587 | Connection conn{s}; | |
588 | auto c = connections.add(conn); | |
f64942e4 AA |
589 | // wrap the socket in an ssl stream |
590 | ssl::stream<tcp::socket&> stream{s, *ssl_context}; | |
11fdf7f2 | 591 | boost::beast::flat_buffer buffer; |
f64942e4 AA |
592 | // do ssl handshake |
593 | boost::system::error_code ec; | |
594 | auto bytes = stream.async_handshake(ssl::stream_base::server, | |
595 | buffer.data(), yield[ec]); | |
596 | if (ec) { | |
597 | ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl; | |
598 | return; | |
599 | } | |
600 | buffer.consume(bytes); | |
11fdf7f2 TL |
601 | handle_connection(env, stream, buffer, true, pause_mutex, |
602 | scheduler.get(), ec, yield); | |
f64942e4 AA |
603 | if (!ec) { |
604 | // ssl shutdown (ignoring errors) | |
605 | stream.async_shutdown(yield[ec]); | |
606 | } | |
607 | s.shutdown(tcp::socket::shutdown_both, ec); | |
11fdf7f2 | 608 | }); |
f64942e4 AA |
609 | } else { |
610 | #else | |
611 | { | |
612 | #endif // WITH_RADOSGW_BEAST_OPENSSL | |
11fdf7f2 TL |
613 | boost::asio::spawn(context, |
614 | [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable { | |
615 | Connection conn{s}; | |
616 | auto c = connections.add(conn); | |
617 | boost::beast::flat_buffer buffer; | |
f64942e4 | 618 | boost::system::error_code ec; |
11fdf7f2 TL |
619 | handle_connection(env, s, buffer, false, pause_mutex, |
620 | scheduler.get(), ec, yield); | |
f64942e4 | 621 | s.shutdown(tcp::socket::shutdown_both, ec); |
11fdf7f2 | 622 | }); |
f64942e4 | 623 | } |
7c673cae FG |
624 | } |
625 | ||
626 | int AsioFrontend::run() | |
627 | { | |
628 | auto cct = ctx(); | |
629 | const int thread_count = cct->_conf->rgw_thread_pool_size; | |
630 | threads.reserve(thread_count); | |
631 | ||
632 | ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl; | |
633 | ||
11fdf7f2 TL |
634 | // the worker threads call io_context::run(), which will return when there's |
635 | // no work left. hold a work guard to keep these threads going until join() | |
636 | work.emplace(boost::asio::make_work_guard(context)); | |
637 | ||
7c673cae FG |
638 | for (int i = 0; i < thread_count; i++) { |
639 | threads.emplace_back([=] { | |
11fdf7f2 TL |
640 | // request warnings on synchronous librados calls in this thread |
641 | is_asio_thread = true; | |
642 | boost::system::error_code ec; | |
643 | context.run(ec); | |
7c673cae FG |
644 | }); |
645 | } | |
646 | return 0; | |
647 | } | |
648 | ||
649 | void AsioFrontend::stop() | |
650 | { | |
651 | ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl; | |
652 | ||
653 | going_down = true; | |
654 | ||
655 | boost::system::error_code ec; | |
28e407b8 AA |
656 | // close all listeners |
657 | for (auto& listener : listeners) { | |
658 | listener.acceptor.close(ec); | |
659 | } | |
11fdf7f2 TL |
660 | // close all connections |
661 | connections.close(ec); | |
662 | pause_mutex.cancel(); | |
7c673cae FG |
663 | } |
664 | ||
665 | void AsioFrontend::join() | |
666 | { | |
667 | if (!going_down) { | |
668 | stop(); | |
669 | } | |
11fdf7f2 TL |
670 | work.reset(); |
671 | ||
7c673cae FG |
672 | ldout(ctx(), 4) << "frontend joining threads..." << dendl; |
673 | for (auto& thread : threads) { | |
674 | thread.join(); | |
675 | } | |
676 | ldout(ctx(), 4) << "frontend done" << dendl; | |
677 | } | |
678 | ||
679 | void AsioFrontend::pause() | |
680 | { | |
11fdf7f2 TL |
681 | ldout(ctx(), 4) << "frontend pausing connections..." << dendl; |
682 | ||
683 | // cancel pending calls to accept(), but don't close the sockets | |
684 | boost::system::error_code ec; | |
685 | for (auto& l : listeners) { | |
686 | l.acceptor.cancel(ec); | |
687 | } | |
688 | ||
689 | // pause and wait for outstanding requests to complete | |
690 | pause_mutex.lock(ec); | |
691 | ||
692 | if (ec) { | |
693 | ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl; | |
694 | } else { | |
695 | ldout(ctx(), 4) << "frontend paused" << dendl; | |
696 | } | |
7c673cae FG |
697 | } |
698 | ||
699 | void AsioFrontend::unpause(RGWRados* const store, | |
700 | rgw_auth_registry_ptr_t auth_registry) | |
701 | { | |
702 | env.store = store; | |
703 | env.auth_registry = std::move(auth_registry); | |
11fdf7f2 TL |
704 | |
705 | // unpause to unblock connections | |
706 | pause_mutex.unlock(); | |
707 | ||
708 | // start accepting connections again | |
709 | for (auto& l : listeners) { | |
710 | l.acceptor.async_accept(l.socket, | |
711 | [this, &l] (boost::system::error_code ec) { | |
712 | accept(l, ec); | |
713 | }); | |
714 | } | |
715 | ||
7c673cae | 716 | ldout(ctx(), 4) << "frontend unpaused" << dendl; |
7c673cae FG |
717 | } |
718 | ||
719 | } // anonymous namespace | |
720 | ||
721 | class RGWAsioFrontend::Impl : public AsioFrontend { | |
722 | public: | |
11fdf7f2 TL |
723 | Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf, |
724 | rgw::dmclock::SchedulerCtx& sched_ctx) | |
725 | : AsioFrontend(env, conf, sched_ctx) {} | |
7c673cae FG |
726 | }; |
727 | ||
94b18763 | 728 | RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env, |
11fdf7f2 TL |
729 | RGWFrontendConfig* conf, |
730 | rgw::dmclock::SchedulerCtx& sched_ctx) | |
731 | : impl(new Impl(env, conf, sched_ctx)) | |
7c673cae FG |
732 | { |
733 | } | |
734 | ||
735 | RGWAsioFrontend::~RGWAsioFrontend() = default; | |
736 | ||
737 | int RGWAsioFrontend::init() | |
738 | { | |
739 | return impl->init(); | |
740 | } | |
741 | ||
742 | int RGWAsioFrontend::run() | |
743 | { | |
744 | return impl->run(); | |
745 | } | |
746 | ||
747 | void RGWAsioFrontend::stop() | |
748 | { | |
749 | impl->stop(); | |
750 | } | |
751 | ||
752 | void RGWAsioFrontend::join() | |
753 | { | |
754 | impl->join(); | |
755 | } | |
756 | ||
757 | void RGWAsioFrontend::pause_for_new_config() | |
758 | { | |
759 | impl->pause(); | |
760 | } | |
761 | ||
762 | void RGWAsioFrontend::unpause_with_new_config( | |
763 | RGWRados* const store, | |
764 | rgw_auth_registry_ptr_t auth_registry | |
765 | ) { | |
766 | impl->unpause(store, std::move(auth_registry)); | |
767 | } |