]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <atomic>
5 #include <thread>
6 #include <vector>
7
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
10
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
13
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
17
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
20
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
23
24 #include "services/svc_config_key.h"
25 #include "services/svc_zone.h"
26
27 #include "rgw_zone.h"
28
29 #endif
30
31 #include "rgw_dmclock_async_scheduler.h"
32
33 #define dout_subsys ceph_subsys_rgw
34
35 namespace {
36
37 using tcp = boost::asio::ip::tcp;
38 namespace http = boost::beast::http;
39 #ifdef WITH_RADOSGW_BEAST_OPENSSL
40 namespace ssl = boost::asio::ssl;
41 #endif
42
43 using parse_buffer = boost::beast::flat_static_buffer<65536>;
44
45 // use mmap/mprotect to allocate 512k coroutine stacks
46 auto make_stack_allocator() {
47 return boost::context::protected_fixedsize_stack{512*1024};
48 }
49
50 template <typename Stream>
51 class StreamIO : public rgw::asio::ClientIO {
52 CephContext* const cct;
53 Stream& stream;
54 spawn::yield_context yield;
55 parse_buffer& buffer;
56 public:
57 StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
58 spawn::yield_context yield,
59 parse_buffer& buffer, bool is_ssl,
60 const tcp::endpoint& local_endpoint,
61 const tcp::endpoint& remote_endpoint)
62 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
63 cct(cct), stream(stream), yield(yield), buffer(buffer)
64 {}
65
66 size_t write_data(const char* buf, size_t len) override {
67 boost::system::error_code ec;
68 auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
69 yield[ec]);
70 if (ec) {
71 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
72 throw rgw::io::Exception(ec.value(), std::system_category());
73 }
74 return bytes;
75 }
76
77 size_t recv_body(char* buf, size_t max) override {
78 auto& message = parser.get();
79 auto& body_remaining = message.body();
80 body_remaining.data = buf;
81 body_remaining.size = max;
82
83 while (body_remaining.size && !parser.is_done()) {
84 boost::system::error_code ec;
85 http::async_read_some(stream, buffer, parser, yield[ec]);
86 if (ec == http::error::need_buffer) {
87 break;
88 }
89 if (ec) {
90 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
91 throw rgw::io::Exception(ec.value(), std::system_category());
92 }
93 }
94 return max - body_remaining.size;
95 }
96 };
97
98 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
99
100 template <typename Stream>
101 void handle_connection(boost::asio::io_context& context,
102 RGWProcessEnv& env, Stream& stream,
103 parse_buffer& buffer, bool is_ssl,
104 SharedMutex& pause_mutex,
105 rgw::dmclock::Scheduler *scheduler,
106 boost::system::error_code& ec,
107 spawn::yield_context yield)
108 {
109 // limit header to 4k, since we read it all into a single flat_buffer
110 static constexpr size_t header_limit = 4096;
111 // don't impose a limit on the body, since we read it in pieces
112 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
113
114 auto cct = env.store->ctx();
115
116 // read messages from the stream until eof
117 for (;;) {
118 // configure the parser
119 rgw::asio::parser_type parser;
120 parser.header_limit(header_limit);
121 parser.body_limit(body_limit);
122
123 // parse the header
124 http::async_read_header(stream, buffer, parser, yield[ec]);
125 if (ec == boost::asio::error::connection_reset ||
126 ec == boost::asio::error::bad_descriptor ||
127 ec == boost::asio::error::operation_aborted ||
128 #ifdef WITH_RADOSGW_BEAST_OPENSSL
129 ec == ssl::error::stream_truncated ||
130 #endif
131 ec == http::error::end_of_stream) {
132 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
133 return;
134 }
135 if (ec) {
136 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
137 auto& message = parser.get();
138 http::response<http::empty_body> response;
139 response.result(http::status::bad_request);
140 response.version(message.version() == 10 ? 10 : 11);
141 response.prepare_payload();
142 http::async_write(stream, response, yield[ec]);
143 if (ec) {
144 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
145 }
146 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
147 return;
148 }
149
150 {
151 auto lock = pause_mutex.async_lock_shared(yield[ec]);
152 if (ec == boost::asio::error::operation_aborted) {
153 return;
154 } else if (ec) {
155 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
156 return;
157 }
158
159 // process the request
160 RGWRequest req{env.store->getRados()->get_new_req_id()};
161
162 auto& socket = stream.lowest_layer();
163 const auto& remote_endpoint = socket.remote_endpoint(ec);
164 if (ec) {
165 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
166 return;
167 }
168
169 StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
170 socket.local_endpoint(),
171 remote_endpoint};
172
173 auto real_client_io = rgw::io::add_reordering(
174 rgw::io::add_buffering(cct,
175 rgw::io::add_chunking(
176 rgw::io::add_conlen_controlling(
177 &real_client))));
178 RGWRestfulIO client(cct, &real_client_io);
179 auto y = optional_yield{context, yield};
180 process_request(env.store, env.rest, &req, env.uri_prefix,
181 *env.auth_registry, &client, env.olog, y, scheduler);
182 }
183
184 if (!parser.keep_alive()) {
185 return;
186 }
187
188 // if we failed before reading the entire message, discard any remaining
189 // bytes before reading the next
190 while (!parser.is_done()) {
191 static std::array<char, 1024> discard_buffer;
192
193 auto& body = parser.get().body();
194 body.size = discard_buffer.size();
195 body.data = discard_buffer.data();
196
197 http::async_read_some(stream, buffer, parser, yield[ec]);
198 if (ec == http::error::need_buffer) {
199 continue;
200 }
201 if (ec == boost::asio::error::connection_reset) {
202 return;
203 }
204 if (ec) {
205 ldout(cct, 5) << "failed to discard unread message: "
206 << ec.message() << dendl;
207 return;
208 }
209 }
210 }
211 }
212
213 struct Connection : boost::intrusive::list_base_hook<> {
214 tcp::socket& socket;
215 Connection(tcp::socket& socket) : socket(socket) {}
216 };
217
218 class ConnectionList {
219 using List = boost::intrusive::list<Connection>;
220 List connections;
221 std::mutex mutex;
222
223 void remove(Connection& c) {
224 std::lock_guard lock{mutex};
225 if (c.is_linked()) {
226 connections.erase(List::s_iterator_to(c));
227 }
228 }
229 public:
230 class Guard {
231 ConnectionList *list;
232 Connection *conn;
233 public:
234 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
235 ~Guard() { list->remove(*conn); }
236 };
237 [[nodiscard]] Guard add(Connection& conn) {
238 std::lock_guard lock{mutex};
239 connections.push_back(conn);
240 return Guard{this, &conn};
241 }
242 void close(boost::system::error_code& ec) {
243 std::lock_guard lock{mutex};
244 for (auto& conn : connections) {
245 conn.socket.close(ec);
246 }
247 connections.clear();
248 }
249 };
250
251 namespace dmc = rgw::dmclock;
252 class AsioFrontend {
253 RGWProcessEnv env;
254 RGWFrontendConfig* conf;
255 boost::asio::io_context context;
256 #ifdef WITH_RADOSGW_BEAST_OPENSSL
257 boost::optional<ssl::context> ssl_context;
258 int get_config_key_val(string name,
259 const string& type,
260 bufferlist *pbl);
261 int ssl_set_private_key(const string& name, bool is_ssl_cert);
262 int ssl_set_certificate_chain(const string& name);
263 int init_ssl();
264 #endif
265 SharedMutex pause_mutex;
266 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
267
268 struct Listener {
269 tcp::endpoint endpoint;
270 tcp::acceptor acceptor;
271 tcp::socket socket;
272 bool use_ssl = false;
273 bool use_nodelay = false;
274
275 explicit Listener(boost::asio::io_context& context)
276 : acceptor(context), socket(context) {}
277 };
278 std::vector<Listener> listeners;
279
280 ConnectionList connections;
281
282 // work guard to keep run() threads busy while listeners are paused
283 using Executor = boost::asio::io_context::executor_type;
284 std::optional<boost::asio::executor_work_guard<Executor>> work;
285
286 std::vector<std::thread> threads;
287 std::atomic<bool> going_down{false};
288
289 CephContext* ctx() const { return env.store->ctx(); }
290 std::optional<dmc::ClientCounters> client_counters;
291 std::unique_ptr<dmc::ClientConfig> client_config;
292 void accept(Listener& listener, boost::system::error_code ec);
293
294 public:
295 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
296 dmc::SchedulerCtx& sched_ctx)
297 : env(env), conf(conf), pause_mutex(context.get_executor())
298 {
299 auto sched_t = dmc::get_scheduler_t(ctx());
300 switch(sched_t){
301 case dmc::scheduler_t::dmclock:
302 scheduler.reset(new dmc::AsyncScheduler(ctx(),
303 context,
304 std::ref(sched_ctx.get_dmc_client_counters()),
305 sched_ctx.get_dmc_client_config(),
306 *sched_ctx.get_dmc_client_config(),
307 dmc::AtLimit::Reject));
308 break;
309 case dmc::scheduler_t::none:
310 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
311 [[fallthrough]];
312 case dmc::scheduler_t::throttler:
313 scheduler.reset(new dmc::SimpleThrottler(ctx()));
314
315 }
316 }
317
318 int init();
319 int run();
320 void stop();
321 void join();
322 void pause();
323 void unpause(rgw::sal::RGWRadosStore* store, rgw_auth_registry_ptr_t);
324 };
325
326 unsigned short parse_port(const char *input, boost::system::error_code& ec)
327 {
328 char *end = nullptr;
329 auto port = std::strtoul(input, &end, 10);
330 if (port > std::numeric_limits<unsigned short>::max()) {
331 ec.assign(ERANGE, boost::system::system_category());
332 } else if (port == 0 && end == input) {
333 ec.assign(EINVAL, boost::system::system_category());
334 }
335 return port;
336 }
337
338 tcp::endpoint parse_endpoint(boost::asio::string_view input,
339 unsigned short default_port,
340 boost::system::error_code& ec)
341 {
342 tcp::endpoint endpoint;
343
344 if (input.empty()) {
345 ec = boost::asio::error::invalid_argument;
346 return endpoint;
347 }
348
349 if (input[0] == '[') { // ipv6
350 const size_t addr_begin = 1;
351 const size_t addr_end = input.find(']');
352 if (addr_end == input.npos) { // no matching ]
353 ec = boost::asio::error::invalid_argument;
354 return endpoint;
355 }
356 if (addr_end + 1 < input.size()) {
357 // :port must must follow [ipv6]
358 if (input[addr_end + 1] != ':') {
359 ec = boost::asio::error::invalid_argument;
360 return endpoint;
361 } else {
362 auto port_str = input.substr(addr_end + 2);
363 endpoint.port(parse_port(port_str.data(), ec));
364 }
365 } else {
366 endpoint.port(default_port);
367 }
368 auto addr = input.substr(addr_begin, addr_end - addr_begin);
369 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
370 } else { // ipv4
371 auto colon = input.find(':');
372 if (colon != input.npos) {
373 auto port_str = input.substr(colon + 1);
374 endpoint.port(parse_port(port_str.data(), ec));
375 if (ec) {
376 return endpoint;
377 }
378 } else {
379 endpoint.port(default_port);
380 }
381 auto addr = input.substr(0, colon);
382 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
383 }
384 return endpoint;
385 }
386
387 static int drop_privileges(CephContext *ctx)
388 {
389 uid_t uid = ctx->get_set_uid();
390 gid_t gid = ctx->get_set_gid();
391 std::string uid_string = ctx->get_set_uid_string();
392 std::string gid_string = ctx->get_set_gid_string();
393 if (gid && setgid(gid) != 0) {
394 int err = errno;
395 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
396 return -err;
397 }
398 if (uid && setuid(uid) != 0) {
399 int err = errno;
400 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
401 return -err;
402 }
403 if (uid && gid) {
404 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
405 << " (" << uid_string << ":" << gid_string << ")" << dendl;
406 }
407 return 0;
408 }
409
410 int AsioFrontend::init()
411 {
412 boost::system::error_code ec;
413 auto& config = conf->get_config_map();
414
415 #ifdef WITH_RADOSGW_BEAST_OPENSSL
416 int r = init_ssl();
417 if (r < 0) {
418 return r;
419 }
420 #endif
421
422 // parse endpoints
423 auto ports = config.equal_range("port");
424 for (auto i = ports.first; i != ports.second; ++i) {
425 auto port = parse_port(i->second.c_str(), ec);
426 if (ec) {
427 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
428 return -ec.value();
429 }
430 listeners.emplace_back(context);
431 listeners.back().endpoint.port(port);
432
433 listeners.emplace_back(context);
434 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
435 }
436
437 auto endpoints = config.equal_range("endpoint");
438 for (auto i = endpoints.first; i != endpoints.second; ++i) {
439 auto endpoint = parse_endpoint(i->second, 80, ec);
440 if (ec) {
441 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
442 return -ec.value();
443 }
444 listeners.emplace_back(context);
445 listeners.back().endpoint = endpoint;
446 }
447 // parse tcp nodelay
448 auto nodelay = config.find("tcp_nodelay");
449 if (nodelay != config.end()) {
450 for (auto& l : listeners) {
451 l.use_nodelay = (nodelay->second == "1");
452 }
453 }
454
455
456 bool socket_bound = false;
457 // start listeners
458 for (auto& l : listeners) {
459 l.acceptor.open(l.endpoint.protocol(), ec);
460 if (ec) {
461 if (ec == boost::asio::error::address_family_not_supported) {
462 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
463 << ", " << ec.message() << dendl;
464 continue;
465 }
466
467 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
468 return -ec.value();
469 }
470
471 if (l.endpoint.protocol() == tcp::v6()) {
472 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
473 if (ec) {
474 lderr(ctx()) << "failed to set v6_only socket option: "
475 << ec.message() << dendl;
476 return -ec.value();
477 }
478 }
479
480 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
481 l.acceptor.bind(l.endpoint, ec);
482 if (ec) {
483 lderr(ctx()) << "failed to bind address " << l.endpoint
484 << ": " << ec.message() << dendl;
485 return -ec.value();
486 }
487
488 auto it = config.find("max_connection_backlog");
489 auto max_connection_backlog = boost::asio::socket_base::max_listen_connections;
490 if (it != config.end()) {
491 string err;
492 max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err);
493 if (!err.empty()) {
494 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl;
495 max_connection_backlog = boost::asio::socket_base::max_listen_connections;
496 }
497 }
498 l.acceptor.listen(max_connection_backlog);
499 l.acceptor.async_accept(l.socket,
500 [this, &l] (boost::system::error_code ec) {
501 accept(l, ec);
502 });
503
504 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
505 socket_bound = true;
506 }
507 if (!socket_bound) {
508 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
509 return -EINVAL;
510 }
511
512 return drop_privileges(ctx());
513 }
514
515 #ifdef WITH_RADOSGW_BEAST_OPENSSL
516
517 static string config_val_prefix = "config://";
518
519 namespace {
520
521 class ExpandMetaVar {
522 map<string, string> meta_map;
523
524 public:
525 ExpandMetaVar(RGWSI_Zone *zone_svc) {
526 meta_map["realm"] = zone_svc->get_realm().get_name();
527 meta_map["realm_id"] = zone_svc->get_realm().get_id();
528 meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
529 meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
530 meta_map["zone"] = zone_svc->zone_name();
531 meta_map["zone_id"] = zone_svc->zone_id().id;
532 }
533
534 string process_str(const string& in);
535 };
536
537 string ExpandMetaVar::process_str(const string& in)
538 {
539 if (meta_map.empty()) {
540 return in;
541 }
542
543 auto pos = in.find('$');
544 if (pos == std::string::npos) {
545 return in;
546 }
547
548 string out;
549 decltype(pos) last_pos = 0;
550
551 while (pos != std::string::npos) {
552 if (pos > last_pos) {
553 out += in.substr(last_pos, pos - last_pos);
554 }
555
556 string var;
557 const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_";
558
559 size_t endpos = 0;
560 if (in[pos+1] == '{') {
561 // ...${foo_bar}...
562 endpos = in.find_first_not_of(valid_chars, pos + 2);
563 if (endpos != std::string::npos &&
564 in[endpos] == '}') {
565 var = in.substr(pos + 2, endpos - pos - 2);
566 endpos++;
567 }
568 } else {
569 // ...$foo...
570 endpos = in.find_first_not_of(valid_chars, pos + 1);
571 if (endpos != std::string::npos)
572 var = in.substr(pos + 1, endpos - pos - 1);
573 else
574 var = in.substr(pos + 1);
575 }
576 string var_source = in.substr(pos, endpos - pos);
577 last_pos = endpos;
578
579 auto iter = meta_map.find(var);
580 if (iter != meta_map.end()) {
581 out += iter->second;
582 } else {
583 out += var_source;
584 }
585 pos = in.find('$', last_pos);
586 }
587 if (last_pos != std::string::npos) {
588 out += in.substr(last_pos);
589 }
590
591 return out;
592 }
593
594 } /* anonymous namespace */
595
596 int AsioFrontend::get_config_key_val(string name,
597 const string& type,
598 bufferlist *pbl)
599 {
600 if (name.empty()) {
601 lderr(ctx()) << "bad " << type << " config value" << dendl;
602 return -EINVAL;
603 }
604
605 auto svc = env.store->svc()->config_key;
606 int r = svc->get(name, true, pbl);
607 if (r < 0) {
608 lderr(ctx()) << type << " was not found: " << name << dendl;
609 return r;
610 }
611 return 0;
612 }
613
614 int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate)
615 {
616 boost::system::error_code ec;
617
618 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
619 ssl_context->use_private_key_file(name, ssl::context::pem, ec);
620 } else {
621 bufferlist bl;
622 int r = get_config_key_val(name.substr(config_val_prefix.size()),
623 "ssl_private_key",
624 &bl);
625 if (r < 0) {
626 return r;
627 }
628 ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()),
629 ssl::context::pem, ec);
630 }
631
632 if (ec) {
633 if (!is_ssl_certificate) {
634 lderr(ctx()) << "failed to add ssl_private_key=" << name
635 << ": " << ec.message() << dendl;
636 } else {
637 lderr(ctx()) << "failed to use ssl_certificate=" << name
638 << " as a private key: " << ec.message() << dendl;
639 }
640 return -ec.value();
641 }
642
643 return 0;
644 }
645
646 int AsioFrontend::ssl_set_certificate_chain(const string& name)
647 {
648 boost::system::error_code ec;
649
650 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
651 ssl_context->use_certificate_chain_file(name, ec);
652 } else {
653 bufferlist bl;
654 int r = get_config_key_val(name.substr(config_val_prefix.size()),
655 "ssl_certificate",
656 &bl);
657 if (r < 0) {
658 return r;
659 }
660 ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()),
661 ec);
662 }
663
664 if (ec) {
665 lderr(ctx()) << "failed to use ssl_certificate=" << name
666 << ": " << ec.message() << dendl;
667 return -ec.value();
668 }
669
670 return 0;
671 }
672
673 int AsioFrontend::init_ssl()
674 {
675 boost::system::error_code ec;
676 auto& config = conf->get_config_map();
677
678 // ssl configuration
679 std::optional<string> cert = conf->get_val("ssl_certificate");
680 if (cert) {
681 // only initialize the ssl context if it's going to be used
682 ssl_context = boost::in_place(ssl::context::tls);
683 }
684
685 std::optional<string> key = conf->get_val("ssl_private_key");
686 bool have_cert = false;
687
688 if (key && !cert) {
689 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
690 return -EINVAL;
691 }
692
693 auto ports = config.equal_range("ssl_port");
694 auto endpoints = config.equal_range("ssl_endpoint");
695
696 /*
697 * don't try to config certificate if frontend isn't configured for ssl
698 */
699 if (ports.first == ports.second &&
700 endpoints.first == endpoints.second) {
701 return 0;
702 }
703
704 bool key_is_cert = false;
705
706 if (cert) {
707 if (!key) {
708 key = cert;
709 key_is_cert = true;
710 }
711
712 ExpandMetaVar emv(env.store->svc()->zone);
713
714 cert = emv.process_str(*cert);
715 key = emv.process_str(*key);
716
717 int r = ssl_set_private_key(*key, key_is_cert);
718 bool have_private_key = (r >= 0);
719 if (r < 0) {
720 if (!key_is_cert) {
721 r = ssl_set_private_key(*cert, true);
722 have_private_key = (r >= 0);
723 }
724 }
725
726 if (have_private_key) {
727 int r = ssl_set_certificate_chain(*cert);
728 have_cert = (r >= 0);
729 }
730 }
731
732 // parse ssl endpoints
733 for (auto i = ports.first; i != ports.second; ++i) {
734 if (!have_cert) {
735 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
736 return -EINVAL;
737 }
738 auto port = parse_port(i->second.c_str(), ec);
739 if (ec) {
740 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
741 return -ec.value();
742 }
743 listeners.emplace_back(context);
744 listeners.back().endpoint.port(port);
745 listeners.back().use_ssl = true;
746
747 listeners.emplace_back(context);
748 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
749 listeners.back().use_ssl = true;
750 }
751
752 for (auto i = endpoints.first; i != endpoints.second; ++i) {
753 if (!have_cert) {
754 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
755 return -EINVAL;
756 }
757 auto endpoint = parse_endpoint(i->second, 443, ec);
758 if (ec) {
759 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
760 return -ec.value();
761 }
762 listeners.emplace_back(context);
763 listeners.back().endpoint = endpoint;
764 listeners.back().use_ssl = true;
765 }
766 return 0;
767 }
768 #endif // WITH_RADOSGW_BEAST_OPENSSL
769
770 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
771 {
772 if (!l.acceptor.is_open()) {
773 return;
774 } else if (ec == boost::asio::error::operation_aborted) {
775 return;
776 } else if (ec) {
777 ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
778 return;
779 }
780 auto socket = std::move(l.socket);
781 tcp::no_delay options(l.use_nodelay);
782 socket.set_option(options,ec);
783 l.acceptor.async_accept(l.socket,
784 [this, &l] (boost::system::error_code ec) {
785 accept(l, ec);
786 });
787
788 // spawn a coroutine to handle the connection
789 #ifdef WITH_RADOSGW_BEAST_OPENSSL
790 if (l.use_ssl) {
791 spawn::spawn(context,
792 [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
793 Connection conn{s};
794 auto c = connections.add(conn);
795 // wrap the socket in an ssl stream
796 ssl::stream<tcp::socket&> stream{s, *ssl_context};
797 auto buffer = std::make_unique<parse_buffer>();
798 // do ssl handshake
799 boost::system::error_code ec;
800 auto bytes = stream.async_handshake(ssl::stream_base::server,
801 buffer->data(), yield[ec]);
802 if (ec) {
803 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
804 return;
805 }
806 buffer->consume(bytes);
807 handle_connection(context, env, stream, *buffer, true, pause_mutex,
808 scheduler.get(), ec, yield);
809 if (!ec) {
810 // ssl shutdown (ignoring errors)
811 stream.async_shutdown(yield[ec]);
812 }
813 s.shutdown(tcp::socket::shutdown_both, ec);
814 }, make_stack_allocator());
815 } else {
816 #else
817 {
818 #endif // WITH_RADOSGW_BEAST_OPENSSL
819 spawn::spawn(context,
820 [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
821 Connection conn{s};
822 auto c = connections.add(conn);
823 auto buffer = std::make_unique<parse_buffer>();
824 boost::system::error_code ec;
825 handle_connection(context, env, s, *buffer, false, pause_mutex,
826 scheduler.get(), ec, yield);
827 s.shutdown(tcp::socket::shutdown_both, ec);
828 }, make_stack_allocator());
829 }
830 }
831
832 int AsioFrontend::run()
833 {
834 auto cct = ctx();
835 const int thread_count = cct->_conf->rgw_thread_pool_size;
836 threads.reserve(thread_count);
837
838 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
839
840 // the worker threads call io_context::run(), which will return when there's
841 // no work left. hold a work guard to keep these threads going until join()
842 work.emplace(boost::asio::make_work_guard(context));
843
844 for (int i = 0; i < thread_count; i++) {
845 threads.emplace_back([=] {
846 // request warnings on synchronous librados calls in this thread
847 is_asio_thread = true;
848 boost::system::error_code ec;
849 context.run(ec);
850 });
851 }
852 return 0;
853 }
854
855 void AsioFrontend::stop()
856 {
857 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
858
859 going_down = true;
860
861 boost::system::error_code ec;
862 // close all listeners
863 for (auto& listener : listeners) {
864 listener.acceptor.close(ec);
865 }
866 // close all connections
867 connections.close(ec);
868 pause_mutex.cancel();
869 }
870
871 void AsioFrontend::join()
872 {
873 if (!going_down) {
874 stop();
875 }
876 work.reset();
877
878 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
879 for (auto& thread : threads) {
880 thread.join();
881 }
882 ldout(ctx(), 4) << "frontend done" << dendl;
883 }
884
885 void AsioFrontend::pause()
886 {
887 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
888
889 // cancel pending calls to accept(), but don't close the sockets
890 boost::system::error_code ec;
891 for (auto& l : listeners) {
892 l.acceptor.cancel(ec);
893 }
894
895 // pause and wait for outstanding requests to complete
896 pause_mutex.lock(ec);
897
898 if (ec) {
899 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
900 } else {
901 ldout(ctx(), 4) << "frontend paused" << dendl;
902 }
903 }
904
905 void AsioFrontend::unpause(rgw::sal::RGWRadosStore* const store,
906 rgw_auth_registry_ptr_t auth_registry)
907 {
908 env.store = store;
909 env.auth_registry = std::move(auth_registry);
910
911 // unpause to unblock connections
912 pause_mutex.unlock();
913
914 // start accepting connections again
915 for (auto& l : listeners) {
916 l.acceptor.async_accept(l.socket,
917 [this, &l] (boost::system::error_code ec) {
918 accept(l, ec);
919 });
920 }
921
922 ldout(ctx(), 4) << "frontend unpaused" << dendl;
923 }
924
925 } // anonymous namespace
926
927 class RGWAsioFrontend::Impl : public AsioFrontend {
928 public:
929 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
930 rgw::dmclock::SchedulerCtx& sched_ctx)
931 : AsioFrontend(env, conf, sched_ctx) {}
932 };
933
934 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
935 RGWFrontendConfig* conf,
936 rgw::dmclock::SchedulerCtx& sched_ctx)
937 : impl(new Impl(env, conf, sched_ctx))
938 {
939 }
940
941 RGWAsioFrontend::~RGWAsioFrontend() = default;
942
943 int RGWAsioFrontend::init()
944 {
945 return impl->init();
946 }
947
948 int RGWAsioFrontend::run()
949 {
950 return impl->run();
951 }
952
953 void RGWAsioFrontend::stop()
954 {
955 impl->stop();
956 }
957
958 void RGWAsioFrontend::join()
959 {
960 impl->join();
961 }
962
963 void RGWAsioFrontend::pause_for_new_config()
964 {
965 impl->pause();
966 }
967
968 void RGWAsioFrontend::unpause_with_new_config(
969 rgw::sal::RGWRadosStore* const store,
970 rgw_auth_registry_ptr_t auth_registry
971 ) {
972 impl->unpause(store, std::move(auth_registry));
973 }