]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_asio_frontend.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
b32b8144 4#include <atomic>
7c673cae
FG
5#include <thread>
6#include <vector>
7
8#include <boost/asio.hpp>
11fdf7f2 9#define BOOST_COROUTINES_NO_DEPRECATION_WARNING
f64942e4 10#include <boost/asio/spawn.hpp>
11fdf7f2 11#include <boost/intrusive/list.hpp>
7c673cae 12
11fdf7f2 13#include "common/async/shared_mutex.h"
91327a77
AA
14#include "common/errno.h"
15
7c673cae 16#include "rgw_asio_client.h"
b32b8144 17#include "rgw_asio_frontend.h"
7c673cae 18
f64942e4
AA
19#ifdef WITH_RADOSGW_BEAST_OPENSSL
20#include <boost/asio/ssl.hpp>
21#endif
22
11fdf7f2
TL
23#include "rgw_dmclock_async_scheduler.h"
24
7c673cae
FG
25#define dout_subsys ceph_subsys_rgw
26
7c673cae
FG
27namespace {
28
7c673cae 29using tcp = boost::asio::ip::tcp;
11fdf7f2 30namespace http = boost::beast::http;
f64942e4
AA
31#ifdef WITH_RADOSGW_BEAST_OPENSSL
32namespace ssl = boost::asio::ssl;
33#endif
34
35template <typename Stream>
36class StreamIO : public rgw::asio::ClientIO {
11fdf7f2 37 CephContext* const cct;
f64942e4 38 Stream& stream;
11fdf7f2 39 boost::beast::flat_buffer& buffer;
f64942e4 40 public:
11fdf7f2
TL
41 StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
42 boost::beast::flat_buffer& buffer, bool is_ssl,
f64942e4
AA
43 const tcp::endpoint& local_endpoint,
44 const tcp::endpoint& remote_endpoint)
45 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
11fdf7f2 46 cct(cct), stream(stream), buffer(buffer)
f64942e4
AA
47 {}
48
49 size_t write_data(const char* buf, size_t len) override {
50 boost::system::error_code ec;
51 auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
52 if (ec) {
11fdf7f2 53 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
f64942e4
AA
54 throw rgw::io::Exception(ec.value(), std::system_category());
55 }
56 return bytes;
57 }
7c673cae 58
f64942e4
AA
59 size_t recv_body(char* buf, size_t max) override {
60 auto& message = parser.get();
61 auto& body_remaining = message.body();
62 body_remaining.data = buf;
63 body_remaining.size = max;
64
65 while (body_remaining.size && !parser.is_done()) {
66 boost::system::error_code ec;
11fdf7f2
TL
67 http::read_some(stream, buffer, parser, ec);
68 if (ec == http::error::partial_message ||
69 ec == http::error::need_buffer) {
f64942e4
AA
70 break;
71 }
72 if (ec) {
11fdf7f2 73 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
f64942e4
AA
74 throw rgw::io::Exception(ec.value(), std::system_category());
75 }
76 }
77 return max - body_remaining.size;
78 }
79};
b32b8144 80
11fdf7f2
TL
81using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
82
f64942e4
AA
83template <typename Stream>
84void handle_connection(RGWProcessEnv& env, Stream& stream,
11fdf7f2
TL
85 boost::beast::flat_buffer& buffer, bool is_ssl,
86 SharedMutex& pause_mutex,
87 rgw::dmclock::Scheduler *scheduler,
f64942e4
AA
88 boost::system::error_code& ec,
89 boost::asio::yield_context yield)
90{
b32b8144
FG
91 // limit header to 4k, since we read it all into a single flat_buffer
92 static constexpr size_t header_limit = 4096;
93 // don't impose a limit on the body, since we read it in pieces
94 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
95
f64942e4 96 auto cct = env.store->ctx();
b32b8144 97
f64942e4
AA
98 // read messages from the stream until eof
99 for (;;) {
b32b8144 100 // configure the parser
f64942e4
AA
101 rgw::asio::parser_type parser;
102 parser.header_limit(header_limit);
103 parser.body_limit(body_limit);
7c673cae 104
7c673cae 105 // parse the header
11fdf7f2 106 http::async_read_header(stream, buffer, parser, yield[ec]);
7c673cae 107 if (ec == boost::asio::error::connection_reset ||
11fdf7f2
TL
108 ec == boost::asio::error::bad_descriptor ||
109 ec == boost::asio::error::operation_aborted ||
f64942e4
AA
110#ifdef WITH_RADOSGW_BEAST_OPENSSL
111 ec == ssl::error::stream_truncated ||
112#endif
11fdf7f2
TL
113 ec == http::error::end_of_stream) {
114 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
7c673cae
FG
115 return;
116 }
117 if (ec) {
f64942e4
AA
118 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
119 auto& message = parser.get();
11fdf7f2
TL
120 http::response<http::empty_body> response;
121 response.result(http::status::bad_request);
f64942e4
AA
122 response.version(message.version() == 10 ? 10 : 11);
123 response.prepare_payload();
11fdf7f2 124 http::async_write(stream, response, yield[ec]);
f64942e4
AA
125 if (ec) {
126 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
127 }
128 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
7c673cae
FG
129 return;
130 }
131
11fdf7f2
TL
132 {
133 auto lock = pause_mutex.async_lock_shared(yield[ec]);
134 if (ec == boost::asio::error::operation_aborted) {
135 return;
136 } else if (ec) {
137 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
138 return;
139 }
7c673cae 140
11fdf7f2
TL
141 // process the request
142 RGWRequest req{env.store->get_new_req_id()};
143
144 auto& socket = stream.lowest_layer();
494da23a
TL
145 const auto& remote_endpoint = socket.remote_endpoint(ec);
146 if (ec) {
147 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
148 return;
149 }
150
11fdf7f2
TL
151 StreamIO real_client{cct, stream, parser, buffer, is_ssl,
152 socket.local_endpoint(),
494da23a 153 remote_endpoint};
11fdf7f2
TL
154
155 auto real_client_io = rgw::io::add_reordering(
156 rgw::io::add_buffering(cct,
157 rgw::io::add_chunking(
158 rgw::io::add_conlen_controlling(
159 &real_client))));
160 RGWRestfulIO client(cct, &real_client_io);
161 auto y = optional_yield{socket.get_io_context(), yield};
162 process_request(env.store, env.rest, &req, env.uri_prefix,
163 *env.auth_registry, &client, env.olog, y, scheduler);
164 }
7c673cae 165
f64942e4
AA
166 if (!parser.keep_alive()) {
167 return;
7c673cae 168 }
b32b8144 169
f64942e4
AA
170 // if we failed before reading the entire message, discard any remaining
171 // bytes before reading the next
172 while (!parser.is_done()) {
173 static std::array<char, 1024> discard_buffer;
174
175 auto& body = parser.get().body();
176 body.size = discard_buffer.size();
177 body.data = discard_buffer.data();
178
11fdf7f2 179 http::async_read_some(stream, buffer, parser, yield[ec]);
f64942e4
AA
180 if (ec == boost::asio::error::connection_reset) {
181 return;
182 }
183 if (ec) {
184 ldout(cct, 5) << "failed to discard unread message: "
185 << ec.message() << dendl;
186 return;
187 }
188 }
b32b8144 189 }
f64942e4 190}
b32b8144 191
11fdf7f2
TL
192struct Connection : boost::intrusive::list_base_hook<> {
193 tcp::socket& socket;
194 Connection(tcp::socket& socket) : socket(socket) {}
195};
196
197class ConnectionList {
198 using List = boost::intrusive::list<Connection>;
199 List connections;
200 std::mutex mutex;
201
202 void remove(Connection& c) {
203 std::lock_guard lock{mutex};
204 if (c.is_linked()) {
205 connections.erase(List::s_iterator_to(c));
206 }
207 }
208 public:
209 class Guard {
210 ConnectionList *list;
211 Connection *conn;
212 public:
213 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
214 ~Guard() { list->remove(*conn); }
215 };
216 [[nodiscard]] Guard add(Connection& conn) {
217 std::lock_guard lock{mutex};
218 connections.push_back(conn);
219 return Guard{this, &conn};
220 }
221 void close(boost::system::error_code& ec) {
222 std::lock_guard lock{mutex};
223 for (auto& conn : connections) {
224 conn.socket.close(ec);
225 }
226 connections.clear();
227 }
228};
229
230namespace dmc = rgw::dmclock;
7c673cae
FG
231class AsioFrontend {
232 RGWProcessEnv env;
94b18763 233 RGWFrontendConfig* conf;
11fdf7f2 234 boost::asio::io_context context;
f64942e4
AA
235#ifdef WITH_RADOSGW_BEAST_OPENSSL
236 boost::optional<ssl::context> ssl_context;
237 int init_ssl();
238#endif
11fdf7f2
TL
239 SharedMutex pause_mutex;
240 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
7c673cae 241
28e407b8
AA
242 struct Listener {
243 tcp::endpoint endpoint;
244 tcp::acceptor acceptor;
245 tcp::socket socket;
f64942e4 246 bool use_ssl = false;
11fdf7f2 247 bool use_nodelay = false;
28e407b8 248
11fdf7f2
TL
249 explicit Listener(boost::asio::io_context& context)
250 : acceptor(context), socket(context) {}
28e407b8
AA
251 };
252 std::vector<Listener> listeners;
7c673cae 253
11fdf7f2
TL
254 ConnectionList connections;
255
256 // work guard to keep run() threads busy while listeners are paused
257 using Executor = boost::asio::io_context::executor_type;
258 std::optional<boost::asio::executor_work_guard<Executor>> work;
259
7c673cae 260 std::vector<std::thread> threads;
7c673cae
FG
261 std::atomic<bool> going_down{false};
262
263 CephContext* ctx() const { return env.store->ctx(); }
11fdf7f2
TL
264 std::optional<dmc::ClientCounters> client_counters;
265 std::unique_ptr<dmc::ClientConfig> client_config;
28e407b8 266 void accept(Listener& listener, boost::system::error_code ec);
7c673cae
FG
267
268 public:
11fdf7f2
TL
269 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
270 dmc::SchedulerCtx& sched_ctx)
271 : env(env), conf(conf), pause_mutex(context.get_executor())
272 {
273 auto sched_t = dmc::get_scheduler_t(ctx());
274 switch(sched_t){
275 case dmc::scheduler_t::dmclock:
276 scheduler.reset(new dmc::AsyncScheduler(ctx(),
277 context,
278 std::ref(sched_ctx.get_dmc_client_counters()),
279 sched_ctx.get_dmc_client_config(),
280 *sched_ctx.get_dmc_client_config(),
281 dmc::AtLimit::Reject));
282 break;
283 case dmc::scheduler_t::none:
284 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
285 [[fallthrough]];
286 case dmc::scheduler_t::throttler:
287 scheduler.reset(new dmc::SimpleThrottler(ctx()));
288
289 }
290 }
7c673cae
FG
291
292 int init();
293 int run();
294 void stop();
295 void join();
296 void pause();
297 void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
298};
299
28e407b8
AA
300unsigned short parse_port(const char *input, boost::system::error_code& ec)
301{
302 char *end = nullptr;
303 auto port = std::strtoul(input, &end, 10);
304 if (port > std::numeric_limits<unsigned short>::max()) {
305 ec.assign(ERANGE, boost::system::system_category());
306 } else if (port == 0 && end == input) {
307 ec.assign(EINVAL, boost::system::system_category());
308 }
309 return port;
310}
11fdf7f2
TL
311
312tcp::endpoint parse_endpoint(boost::asio::string_view input,
f64942e4 313 unsigned short default_port,
28e407b8 314 boost::system::error_code& ec)
7c673cae 315{
28e407b8 316 tcp::endpoint endpoint;
7c673cae 317
f64942e4
AA
318 if (input.empty()) {
319 ec = boost::asio::error::invalid_argument;
320 return endpoint;
28e407b8 321 }
f64942e4
AA
322
323 if (input[0] == '[') { // ipv6
324 const size_t addr_begin = 1;
325 const size_t addr_end = input.find(']');
326 if (addr_end == input.npos) { // no matching ]
327 ec = boost::asio::error::invalid_argument;
328 return endpoint;
329 }
330 if (addr_end + 1 < input.size()) {
331 // :port must must follow [ipv6]
332 if (input[addr_end + 1] != ':') {
333 ec = boost::asio::error::invalid_argument;
334 return endpoint;
335 } else {
336 auto port_str = input.substr(addr_end + 2);
337 endpoint.port(parse_port(port_str.data(), ec));
338 }
81eedcae
TL
339 } else {
340 endpoint.port(default_port);
f64942e4
AA
341 }
342 auto addr = input.substr(addr_begin, addr_end - addr_begin);
343 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
344 } else { // ipv4
345 auto colon = input.find(':');
346 if (colon != input.npos) {
347 auto port_str = input.substr(colon + 1);
348 endpoint.port(parse_port(port_str.data(), ec));
349 if (ec) {
350 return endpoint;
351 }
81eedcae
TL
352 } else {
353 endpoint.port(default_port);
f64942e4 354 }
28e407b8 355 auto addr = input.substr(0, colon);
f64942e4 356 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
28e407b8
AA
357 }
358 return endpoint;
359}
360
91327a77
AA
361static int drop_privileges(CephContext *ctx)
362{
363 uid_t uid = ctx->get_set_uid();
364 gid_t gid = ctx->get_set_gid();
365 std::string uid_string = ctx->get_set_uid_string();
366 std::string gid_string = ctx->get_set_gid_string();
367 if (gid && setgid(gid) != 0) {
368 int err = errno;
369 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
370 return -err;
371 }
372 if (uid && setuid(uid) != 0) {
373 int err = errno;
374 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
375 return -err;
376 }
377 if (uid && gid) {
378 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
379 << " (" << uid_string << ":" << gid_string << ")" << dendl;
380 }
381 return 0;
382}
383
28e407b8
AA
384int AsioFrontend::init()
385{
7c673cae 386 boost::system::error_code ec;
28e407b8 387 auto& config = conf->get_config_map();
94b18763 388
f64942e4
AA
389#ifdef WITH_RADOSGW_BEAST_OPENSSL
390 int r = init_ssl();
391 if (r < 0) {
392 return r;
393 }
394#endif
395
28e407b8 396 // parse endpoints
f64942e4
AA
397 auto ports = config.equal_range("port");
398 for (auto i = ports.first; i != ports.second; ++i) {
28e407b8 399 auto port = parse_port(i->second.c_str(), ec);
94b18763 400 if (ec) {
28e407b8 401 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
94b18763
FG
402 return -ec.value();
403 }
11fdf7f2 404 listeners.emplace_back(context);
28e407b8 405 listeners.back().endpoint.port(port);
81eedcae
TL
406
407 listeners.emplace_back(context);
408 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
94b18763
FG
409 }
410
f64942e4
AA
411 auto endpoints = config.equal_range("endpoint");
412 for (auto i = endpoints.first; i != endpoints.second; ++i) {
413 auto endpoint = parse_endpoint(i->second, 80, ec);
28e407b8
AA
414 if (ec) {
415 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
416 return -ec.value();
417 }
11fdf7f2 418 listeners.emplace_back(context);
28e407b8 419 listeners.back().endpoint = endpoint;
7c673cae 420 }
11fdf7f2
TL
421 // parse tcp nodelay
422 auto nodelay = config.find("tcp_nodelay");
423 if (nodelay != config.end()) {
424 for (auto& l : listeners) {
425 l.use_nodelay = (nodelay->second == "1");
426 }
427 }
428
81eedcae
TL
429
430 bool socket_bound = false;
28e407b8
AA
431 // start listeners
432 for (auto& l : listeners) {
433 l.acceptor.open(l.endpoint.protocol(), ec);
434 if (ec) {
81eedcae
TL
435 if (ec == boost::asio::error::address_family_not_supported) {
436 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
437 << ", " << ec.message() << dendl;
438 continue;
439 }
440
28e407b8
AA
441 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
442 return -ec.value();
443 }
81eedcae
TL
444
445 if (l.endpoint.protocol() == tcp::v6()) {
446 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
447 if (ec) {
448 lderr(ctx()) << "failed to set v6_only socket option: "
449 << ec.message() << dendl;
450 return -ec.value();
451 }
452 }
453
28e407b8
AA
454 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
455 l.acceptor.bind(l.endpoint, ec);
456 if (ec) {
457 lderr(ctx()) << "failed to bind address " << l.endpoint
458 << ": " << ec.message() << dendl;
459 return -ec.value();
460 }
81eedcae 461
28e407b8
AA
462 l.acceptor.listen(boost::asio::socket_base::max_connections);
463 l.acceptor.async_accept(l.socket,
464 [this, &l] (boost::system::error_code ec) {
465 accept(l, ec);
466 });
467
468 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
81eedcae
TL
469 socket_bound = true;
470 }
471 if (!socket_bound) {
472 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
473 return -EINVAL;
7c673cae 474 }
81eedcae 475
91327a77 476 return drop_privileges(ctx());
7c673cae
FG
477}
478
f64942e4
AA
479#ifdef WITH_RADOSGW_BEAST_OPENSSL
480int AsioFrontend::init_ssl()
481{
482 boost::system::error_code ec;
483 auto& config = conf->get_config_map();
484
485 // ssl configuration
486 auto cert = config.find("ssl_certificate");
487 const bool have_cert = cert != config.end();
488 if (have_cert) {
489 // only initialize the ssl context if it's going to be used
490 ssl_context = boost::in_place(ssl::context::tls);
491 }
492
493 auto key = config.find("ssl_private_key");
494 const bool have_private_key = key != config.end();
495 if (have_private_key) {
496 if (!have_cert) {
497 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
498 return -EINVAL;
499 }
500 ssl_context->use_private_key_file(key->second, ssl::context::pem, ec);
501 if (ec) {
502 lderr(ctx()) << "failed to add ssl_private_key=" << key->second
503 << ": " << ec.message() << dendl;
504 return -ec.value();
505 }
506 }
507 if (have_cert) {
508 ssl_context->use_certificate_chain_file(cert->second, ec);
509 if (ec) {
510 lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
511 << ": " << ec.message() << dendl;
512 return -ec.value();
513 }
514 if (!have_private_key) {
515 // attempt to use it as a private key if a separate one wasn't provided
516 ssl_context->use_private_key_file(cert->second, ssl::context::pem, ec);
517 if (ec) {
518 lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
519 << " as a private key: " << ec.message() << dendl;
520 return -ec.value();
521 }
522 }
523 }
524
525 // parse ssl endpoints
526 auto ports = config.equal_range("ssl_port");
527 for (auto i = ports.first; i != ports.second; ++i) {
528 if (!have_cert) {
529 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
530 return -EINVAL;
531 }
532 auto port = parse_port(i->second.c_str(), ec);
533 if (ec) {
534 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
535 return -ec.value();
536 }
11fdf7f2 537 listeners.emplace_back(context);
f64942e4
AA
538 listeners.back().endpoint.port(port);
539 listeners.back().use_ssl = true;
81eedcae
TL
540
541 listeners.emplace_back(context);
542 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
543 listeners.back().use_ssl = true;
f64942e4
AA
544 }
545
546 auto endpoints = config.equal_range("ssl_endpoint");
547 for (auto i = endpoints.first; i != endpoints.second; ++i) {
548 if (!have_cert) {
549 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
550 return -EINVAL;
551 }
552 auto endpoint = parse_endpoint(i->second, 443, ec);
553 if (ec) {
554 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
555 return -ec.value();
556 }
11fdf7f2 557 listeners.emplace_back(context);
f64942e4
AA
558 listeners.back().endpoint = endpoint;
559 listeners.back().use_ssl = true;
560 }
561 return 0;
562}
563#endif // WITH_RADOSGW_BEAST_OPENSSL
564
28e407b8 565void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
7c673cae 566{
28e407b8 567 if (!l.acceptor.is_open()) {
7c673cae
FG
568 return;
569 } else if (ec == boost::asio::error::operation_aborted) {
570 return;
571 } else if (ec) {
572 throw ec;
573 }
28e407b8 574 auto socket = std::move(l.socket);
11fdf7f2
TL
575 tcp::no_delay options(l.use_nodelay);
576 socket.set_option(options,ec);
28e407b8
AA
577 l.acceptor.async_accept(l.socket,
578 [this, &l] (boost::system::error_code ec) {
579 accept(l, ec);
580 });
b32b8144 581
f64942e4
AA
582 // spawn a coroutine to handle the connection
583#ifdef WITH_RADOSGW_BEAST_OPENSSL
584 if (l.use_ssl) {
11fdf7f2
TL
585 boost::asio::spawn(context,
586 [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
587 Connection conn{s};
588 auto c = connections.add(conn);
f64942e4
AA
589 // wrap the socket in an ssl stream
590 ssl::stream<tcp::socket&> stream{s, *ssl_context};
11fdf7f2 591 boost::beast::flat_buffer buffer;
f64942e4
AA
592 // do ssl handshake
593 boost::system::error_code ec;
594 auto bytes = stream.async_handshake(ssl::stream_base::server,
595 buffer.data(), yield[ec]);
596 if (ec) {
597 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
598 return;
599 }
600 buffer.consume(bytes);
11fdf7f2
TL
601 handle_connection(env, stream, buffer, true, pause_mutex,
602 scheduler.get(), ec, yield);
f64942e4
AA
603 if (!ec) {
604 // ssl shutdown (ignoring errors)
605 stream.async_shutdown(yield[ec]);
606 }
607 s.shutdown(tcp::socket::shutdown_both, ec);
11fdf7f2 608 });
f64942e4
AA
609 } else {
610#else
611 {
612#endif // WITH_RADOSGW_BEAST_OPENSSL
11fdf7f2
TL
613 boost::asio::spawn(context,
614 [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
615 Connection conn{s};
616 auto c = connections.add(conn);
617 boost::beast::flat_buffer buffer;
f64942e4 618 boost::system::error_code ec;
11fdf7f2
TL
619 handle_connection(env, s, buffer, false, pause_mutex,
620 scheduler.get(), ec, yield);
f64942e4 621 s.shutdown(tcp::socket::shutdown_both, ec);
11fdf7f2 622 });
f64942e4 623 }
7c673cae
FG
624}
625
626int AsioFrontend::run()
627{
628 auto cct = ctx();
629 const int thread_count = cct->_conf->rgw_thread_pool_size;
630 threads.reserve(thread_count);
631
632 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
633
11fdf7f2
TL
634 // the worker threads call io_context::run(), which will return when there's
635 // no work left. hold a work guard to keep these threads going until join()
636 work.emplace(boost::asio::make_work_guard(context));
637
7c673cae
FG
638 for (int i = 0; i < thread_count; i++) {
639 threads.emplace_back([=] {
11fdf7f2
TL
640 // request warnings on synchronous librados calls in this thread
641 is_asio_thread = true;
642 boost::system::error_code ec;
643 context.run(ec);
7c673cae
FG
644 });
645 }
646 return 0;
647}
648
649void AsioFrontend::stop()
650{
651 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
652
653 going_down = true;
654
655 boost::system::error_code ec;
28e407b8
AA
656 // close all listeners
657 for (auto& listener : listeners) {
658 listener.acceptor.close(ec);
659 }
11fdf7f2
TL
660 // close all connections
661 connections.close(ec);
662 pause_mutex.cancel();
7c673cae
FG
663}
664
665void AsioFrontend::join()
666{
667 if (!going_down) {
668 stop();
669 }
11fdf7f2
TL
670 work.reset();
671
7c673cae
FG
672 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
673 for (auto& thread : threads) {
674 thread.join();
675 }
676 ldout(ctx(), 4) << "frontend done" << dendl;
677}
678
679void AsioFrontend::pause()
680{
11fdf7f2
TL
681 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
682
683 // cancel pending calls to accept(), but don't close the sockets
684 boost::system::error_code ec;
685 for (auto& l : listeners) {
686 l.acceptor.cancel(ec);
687 }
688
689 // pause and wait for outstanding requests to complete
690 pause_mutex.lock(ec);
691
692 if (ec) {
693 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
694 } else {
695 ldout(ctx(), 4) << "frontend paused" << dendl;
696 }
7c673cae
FG
697}
698
699void AsioFrontend::unpause(RGWRados* const store,
700 rgw_auth_registry_ptr_t auth_registry)
701{
702 env.store = store;
703 env.auth_registry = std::move(auth_registry);
11fdf7f2
TL
704
705 // unpause to unblock connections
706 pause_mutex.unlock();
707
708 // start accepting connections again
709 for (auto& l : listeners) {
710 l.acceptor.async_accept(l.socket,
711 [this, &l] (boost::system::error_code ec) {
712 accept(l, ec);
713 });
714 }
715
7c673cae 716 ldout(ctx(), 4) << "frontend unpaused" << dendl;
7c673cae
FG
717}
718
719} // anonymous namespace
720
721class RGWAsioFrontend::Impl : public AsioFrontend {
722 public:
11fdf7f2
TL
723 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
724 rgw::dmclock::SchedulerCtx& sched_ctx)
725 : AsioFrontend(env, conf, sched_ctx) {}
7c673cae
FG
726};
727
94b18763 728RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
11fdf7f2
TL
729 RGWFrontendConfig* conf,
730 rgw::dmclock::SchedulerCtx& sched_ctx)
731 : impl(new Impl(env, conf, sched_ctx))
7c673cae
FG
732{
733}
734
735RGWAsioFrontend::~RGWAsioFrontend() = default;
736
737int RGWAsioFrontend::init()
738{
739 return impl->init();
740}
741
742int RGWAsioFrontend::run()
743{
744 return impl->run();
745}
746
747void RGWAsioFrontend::stop()
748{
749 impl->stop();
750}
751
752void RGWAsioFrontend::join()
753{
754 impl->join();
755}
756
757void RGWAsioFrontend::pause_for_new_config()
758{
759 impl->pause();
760}
761
762void RGWAsioFrontend::unpause_with_new_config(
763 RGWRados* const store,
764 rgw_auth_registry_ptr_t auth_registry
765) {
766 impl->unpause(store, std::move(auth_registry));
767}