]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <atomic>
5 #include <thread>
6 #include <vector>
7
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
10
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
13
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
17
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
20
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
23
24 #include "services/svc_config_key.h"
25 #include "services/svc_zone.h"
26
27 #include "rgw_zone.h"
28
29 #endif
30
31 #include "rgw_dmclock_async_scheduler.h"
32
33 #define dout_subsys ceph_subsys_rgw
34
35 namespace {
36
37 using tcp = boost::asio::ip::tcp;
38 namespace http = boost::beast::http;
39 #ifdef WITH_RADOSGW_BEAST_OPENSSL
40 namespace ssl = boost::asio::ssl;
41 #endif
42
43 using parse_buffer = boost::beast::flat_static_buffer<65536>;
44
45 // use mmap/mprotect to allocate 512k coroutine stacks
46 auto make_stack_allocator() {
47 return boost::context::protected_fixedsize_stack{512*1024};
48 }
49
50 template <typename Stream>
51 class StreamIO : public rgw::asio::ClientIO {
52 CephContext* const cct;
53 Stream& stream;
54 spawn::yield_context yield;
55 parse_buffer& buffer;
56 public:
57 StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
58 spawn::yield_context yield,
59 parse_buffer& buffer, bool is_ssl,
60 const tcp::endpoint& local_endpoint,
61 const tcp::endpoint& remote_endpoint)
62 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
63 cct(cct), stream(stream), yield(yield), buffer(buffer)
64 {}
65
66 size_t write_data(const char* buf, size_t len) override {
67 boost::system::error_code ec;
68 auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
69 yield[ec]);
70 if (ec) {
71 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
72 if (ec==boost::asio::error::broken_pipe) {
73 boost::system::error_code ec_ignored;
74 stream.lowest_layer().shutdown(tcp::socket::shutdown_both, ec_ignored);
75 }
76 throw rgw::io::Exception(ec.value(), std::system_category());
77 }
78 return bytes;
79 }
80
81 size_t recv_body(char* buf, size_t max) override {
82 auto& message = parser.get();
83 auto& body_remaining = message.body();
84 body_remaining.data = buf;
85 body_remaining.size = max;
86
87 while (body_remaining.size && !parser.is_done()) {
88 boost::system::error_code ec;
89 http::async_read_some(stream, buffer, parser, yield[ec]);
90 if (ec == http::error::need_buffer) {
91 break;
92 }
93 if (ec) {
94 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
95 throw rgw::io::Exception(ec.value(), std::system_category());
96 }
97 }
98 return max - body_remaining.size;
99 }
100 };
101
// Stream helper that prints a packed http version number (major * 10 + minor,
// so 11 means 1.1) as a string such as 'HTTP/1.1'.
struct http_version {
  unsigned major_ver;
  unsigned minor_ver;
  explicit http_version(unsigned version)
    : major_ver(version / 10), minor_ver(version % 10) {}
};
std::ostream& operator<<(std::ostream& out, const http_version& v) {
  out << "HTTP/";
  out << v.major_ver;
  out << '.';
  out << v.minor_ver;
  return out;
}
112
113 // log an http header value or '-' if it's missing
114 struct log_header {
115 const http::fields& fields;
116 http::field field;
117 std::string_view quote;
118 log_header(const http::fields& fields, http::field field,
119 std::string_view quote = "")
120 : fields(fields), field(field), quote(quote) {}
121 };
122 std::ostream& operator<<(std::ostream& out, const log_header& h) {
123 auto p = h.fields.find(h.field);
124 if (p == h.fields.end()) {
125 return out << '-';
126 }
127 return out << h.quote << p->value() << h.quote;
128 }
129
130 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
131
132 template <typename Stream>
133 void handle_connection(boost::asio::io_context& context,
134 RGWProcessEnv& env, Stream& stream,
135 parse_buffer& buffer, bool is_ssl,
136 SharedMutex& pause_mutex,
137 rgw::dmclock::Scheduler *scheduler,
138 boost::system::error_code& ec,
139 spawn::yield_context yield)
140 {
141 // limit header to 4k, since we read it all into a single flat_buffer
142 static constexpr size_t header_limit = 4096;
143 // don't impose a limit on the body, since we read it in pieces
144 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
145
146 auto cct = env.store->ctx();
147
148 // read messages from the stream until eof
149 for (;;) {
150 // configure the parser
151 rgw::asio::parser_type parser;
152 parser.header_limit(header_limit);
153 parser.body_limit(body_limit);
154
155 // parse the header
156 http::async_read_header(stream, buffer, parser, yield[ec]);
157 if (ec == boost::asio::error::connection_reset ||
158 ec == boost::asio::error::bad_descriptor ||
159 ec == boost::asio::error::operation_aborted ||
160 #ifdef WITH_RADOSGW_BEAST_OPENSSL
161 ec == ssl::error::stream_truncated ||
162 #endif
163 ec == http::error::end_of_stream) {
164 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
165 return;
166 }
167 auto& message = parser.get();
168 if (ec) {
169 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
170 http::response<http::empty_body> response;
171 response.result(http::status::bad_request);
172 response.version(message.version() == 10 ? 10 : 11);
173 response.prepare_payload();
174 http::async_write(stream, response, yield[ec]);
175 if (ec) {
176 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
177 }
178 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
179 return;
180 }
181
182 {
183 auto lock = pause_mutex.async_lock_shared(yield[ec]);
184 if (ec == boost::asio::error::operation_aborted) {
185 return;
186 } else if (ec) {
187 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
188 return;
189 }
190
191 // process the request
192 RGWRequest req{env.store->getRados()->get_new_req_id()};
193
194 auto& socket = stream.lowest_layer();
195 const auto& remote_endpoint = socket.remote_endpoint(ec);
196 if (ec) {
197 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
198 return;
199 }
200
201 StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
202 socket.local_endpoint(),
203 remote_endpoint};
204
205 auto real_client_io = rgw::io::add_reordering(
206 rgw::io::add_buffering(cct,
207 rgw::io::add_chunking(
208 rgw::io::add_conlen_controlling(
209 &real_client))));
210 RGWRestfulIO client(cct, &real_client_io);
211 auto y = optional_yield{context, yield};
212 int http_ret = 0;
213 process_request(env.store, env.rest, &req, env.uri_prefix,
214 *env.auth_registry, &client, env.olog, y,
215 scheduler, &http_ret);
216
217 if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
218 // access log line elements begin per Apache Combined Log Format with additions following
219 const auto now = ceph::coarse_real_clock::now();
220 using ceph::operator<<; // for coarse_real_time
221 ldout(cct, 1) << "beast: " << hex << &req << dec << ": "
222 << remote_endpoint.address() << " - - [" << now << "] \""
223 << message.method_string() << ' ' << message.target() << ' '
224 << http_version{message.version()} << "\" " << http_ret << ' '
225 << client.get_bytes_sent() + client.get_bytes_received() << ' '
226 << log_header{message, http::field::referer, "\""} << ' '
227 << log_header{message, http::field::user_agent, "\""} << ' '
228 << log_header{message, http::field::range} << dendl;
229 }
230 }
231
232 if (!parser.keep_alive()) {
233 return;
234 }
235
236 // if we failed before reading the entire message, discard any remaining
237 // bytes before reading the next
238 while (!parser.is_done()) {
239 static std::array<char, 1024> discard_buffer;
240
241 auto& body = parser.get().body();
242 body.size = discard_buffer.size();
243 body.data = discard_buffer.data();
244
245 http::async_read_some(stream, buffer, parser, yield[ec]);
246 if (ec == http::error::need_buffer) {
247 continue;
248 }
249 if (ec == boost::asio::error::connection_reset) {
250 return;
251 }
252 if (ec) {
253 ldout(cct, 5) << "failed to discard unread message: "
254 << ec.message() << dendl;
255 return;
256 }
257 }
258 }
259 }
260
261 struct Connection : boost::intrusive::list_base_hook<> {
262 tcp::socket& socket;
263 Connection(tcp::socket& socket) : socket(socket) {}
264 };
265
266 class ConnectionList {
267 using List = boost::intrusive::list<Connection>;
268 List connections;
269 std::mutex mutex;
270
271 void remove(Connection& c) {
272 std::lock_guard lock{mutex};
273 if (c.is_linked()) {
274 connections.erase(List::s_iterator_to(c));
275 }
276 }
277 public:
278 class Guard {
279 ConnectionList *list;
280 Connection *conn;
281 public:
282 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
283 ~Guard() { list->remove(*conn); }
284 };
285 [[nodiscard]] Guard add(Connection& conn) {
286 std::lock_guard lock{mutex};
287 connections.push_back(conn);
288 return Guard{this, &conn};
289 }
290 void close(boost::system::error_code& ec) {
291 std::lock_guard lock{mutex};
292 for (auto& conn : connections) {
293 conn.socket.close(ec);
294 }
295 connections.clear();
296 }
297 };
298
299 namespace dmc = rgw::dmclock;
300 class AsioFrontend {
301 RGWProcessEnv env;
302 RGWFrontendConfig* conf;
303 boost::asio::io_context context;
304 #ifdef WITH_RADOSGW_BEAST_OPENSSL
305 boost::optional<ssl::context> ssl_context;
306 int get_config_key_val(string name,
307 const string& type,
308 bufferlist *pbl);
309 int ssl_set_private_key(const string& name, bool is_ssl_cert);
310 int ssl_set_certificate_chain(const string& name);
311 int init_ssl();
312 #endif
313 SharedMutex pause_mutex;
314 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
315
316 struct Listener {
317 tcp::endpoint endpoint;
318 tcp::acceptor acceptor;
319 tcp::socket socket;
320 bool use_ssl = false;
321 bool use_nodelay = false;
322
323 explicit Listener(boost::asio::io_context& context)
324 : acceptor(context), socket(context) {}
325 };
326 std::vector<Listener> listeners;
327
328 ConnectionList connections;
329
330 // work guard to keep run() threads busy while listeners are paused
331 using Executor = boost::asio::io_context::executor_type;
332 std::optional<boost::asio::executor_work_guard<Executor>> work;
333
334 std::vector<std::thread> threads;
335 std::atomic<bool> going_down{false};
336
337 CephContext* ctx() const { return env.store->ctx(); }
338 std::optional<dmc::ClientCounters> client_counters;
339 std::unique_ptr<dmc::ClientConfig> client_config;
340 void accept(Listener& listener, boost::system::error_code ec);
341
342 public:
343 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
344 dmc::SchedulerCtx& sched_ctx)
345 : env(env), conf(conf), pause_mutex(context.get_executor())
346 {
347 auto sched_t = dmc::get_scheduler_t(ctx());
348 switch(sched_t){
349 case dmc::scheduler_t::dmclock:
350 scheduler.reset(new dmc::AsyncScheduler(ctx(),
351 context,
352 std::ref(sched_ctx.get_dmc_client_counters()),
353 sched_ctx.get_dmc_client_config(),
354 *sched_ctx.get_dmc_client_config(),
355 dmc::AtLimit::Reject));
356 break;
357 case dmc::scheduler_t::none:
358 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
359 [[fallthrough]];
360 case dmc::scheduler_t::throttler:
361 scheduler.reset(new dmc::SimpleThrottler(ctx()));
362
363 }
364 }
365
366 int init();
367 int run();
368 void stop();
369 void join();
370 void pause();
371 void unpause(rgw::sal::RGWRadosStore* store, rgw_auth_registry_ptr_t);
372 };
373
374 unsigned short parse_port(const char *input, boost::system::error_code& ec)
375 {
376 char *end = nullptr;
377 auto port = std::strtoul(input, &end, 10);
378 if (port > std::numeric_limits<unsigned short>::max()) {
379 ec.assign(ERANGE, boost::system::system_category());
380 } else if (port == 0 && end == input) {
381 ec.assign(EINVAL, boost::system::system_category());
382 }
383 return port;
384 }
385
386 tcp::endpoint parse_endpoint(boost::asio::string_view input,
387 unsigned short default_port,
388 boost::system::error_code& ec)
389 {
390 tcp::endpoint endpoint;
391
392 if (input.empty()) {
393 ec = boost::asio::error::invalid_argument;
394 return endpoint;
395 }
396
397 if (input[0] == '[') { // ipv6
398 const size_t addr_begin = 1;
399 const size_t addr_end = input.find(']');
400 if (addr_end == input.npos) { // no matching ]
401 ec = boost::asio::error::invalid_argument;
402 return endpoint;
403 }
404 if (addr_end + 1 < input.size()) {
405 // :port must must follow [ipv6]
406 if (input[addr_end + 1] != ':') {
407 ec = boost::asio::error::invalid_argument;
408 return endpoint;
409 } else {
410 auto port_str = input.substr(addr_end + 2);
411 endpoint.port(parse_port(port_str.data(), ec));
412 }
413 } else {
414 endpoint.port(default_port);
415 }
416 auto addr = input.substr(addr_begin, addr_end - addr_begin);
417 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
418 } else { // ipv4
419 auto colon = input.find(':');
420 if (colon != input.npos) {
421 auto port_str = input.substr(colon + 1);
422 endpoint.port(parse_port(port_str.data(), ec));
423 if (ec) {
424 return endpoint;
425 }
426 } else {
427 endpoint.port(default_port);
428 }
429 auto addr = input.substr(0, colon);
430 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
431 }
432 return endpoint;
433 }
434
435 static int drop_privileges(CephContext *ctx)
436 {
437 uid_t uid = ctx->get_set_uid();
438 gid_t gid = ctx->get_set_gid();
439 std::string uid_string = ctx->get_set_uid_string();
440 std::string gid_string = ctx->get_set_gid_string();
441 if (gid && setgid(gid) != 0) {
442 int err = errno;
443 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
444 return -err;
445 }
446 if (uid && setuid(uid) != 0) {
447 int err = errno;
448 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
449 return -err;
450 }
451 if (uid && gid) {
452 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
453 << " (" << uid_string << ":" << gid_string << ")" << dendl;
454 }
455 return 0;
456 }
457
458 int AsioFrontend::init()
459 {
460 boost::system::error_code ec;
461 auto& config = conf->get_config_map();
462
463 #ifdef WITH_RADOSGW_BEAST_OPENSSL
464 int r = init_ssl();
465 if (r < 0) {
466 return r;
467 }
468 #endif
469
470 // parse endpoints
471 auto ports = config.equal_range("port");
472 for (auto i = ports.first; i != ports.second; ++i) {
473 auto port = parse_port(i->second.c_str(), ec);
474 if (ec) {
475 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
476 return -ec.value();
477 }
478 listeners.emplace_back(context);
479 listeners.back().endpoint.port(port);
480
481 listeners.emplace_back(context);
482 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
483 }
484
485 auto endpoints = config.equal_range("endpoint");
486 for (auto i = endpoints.first; i != endpoints.second; ++i) {
487 auto endpoint = parse_endpoint(i->second, 80, ec);
488 if (ec) {
489 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
490 return -ec.value();
491 }
492 listeners.emplace_back(context);
493 listeners.back().endpoint = endpoint;
494 }
495 // parse tcp nodelay
496 auto nodelay = config.find("tcp_nodelay");
497 if (nodelay != config.end()) {
498 for (auto& l : listeners) {
499 l.use_nodelay = (nodelay->second == "1");
500 }
501 }
502
503
504 bool socket_bound = false;
505 // start listeners
506 for (auto& l : listeners) {
507 l.acceptor.open(l.endpoint.protocol(), ec);
508 if (ec) {
509 if (ec == boost::asio::error::address_family_not_supported) {
510 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
511 << ", " << ec.message() << dendl;
512 continue;
513 }
514
515 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
516 return -ec.value();
517 }
518
519 if (l.endpoint.protocol() == tcp::v6()) {
520 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
521 if (ec) {
522 lderr(ctx()) << "failed to set v6_only socket option: "
523 << ec.message() << dendl;
524 return -ec.value();
525 }
526 }
527
528 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
529 l.acceptor.bind(l.endpoint, ec);
530 if (ec) {
531 lderr(ctx()) << "failed to bind address " << l.endpoint
532 << ": " << ec.message() << dendl;
533 return -ec.value();
534 }
535
536 auto it = config.find("max_connection_backlog");
537 auto max_connection_backlog = boost::asio::socket_base::max_listen_connections;
538 if (it != config.end()) {
539 string err;
540 max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err);
541 if (!err.empty()) {
542 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl;
543 max_connection_backlog = boost::asio::socket_base::max_listen_connections;
544 }
545 }
546 l.acceptor.listen(max_connection_backlog);
547 l.acceptor.async_accept(l.socket,
548 [this, &l] (boost::system::error_code ec) {
549 accept(l, ec);
550 });
551
552 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
553 socket_bound = true;
554 }
555 if (!socket_bound) {
556 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
557 return -EINVAL;
558 }
559
560 return drop_privileges(ctx());
561 }
562
563 #ifdef WITH_RADOSGW_BEAST_OPENSSL
564
565 static string config_val_prefix = "config://";
566
567 namespace {
568
569 class ExpandMetaVar {
570 map<string, string> meta_map;
571
572 public:
573 ExpandMetaVar(RGWSI_Zone *zone_svc) {
574 meta_map["realm"] = zone_svc->get_realm().get_name();
575 meta_map["realm_id"] = zone_svc->get_realm().get_id();
576 meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
577 meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
578 meta_map["zone"] = zone_svc->zone_name();
579 meta_map["zone_id"] = zone_svc->zone_id().id;
580 }
581
582 string process_str(const string& in);
583 };
584
585 string ExpandMetaVar::process_str(const string& in)
586 {
587 if (meta_map.empty()) {
588 return in;
589 }
590
591 auto pos = in.find('$');
592 if (pos == std::string::npos) {
593 return in;
594 }
595
596 string out;
597 decltype(pos) last_pos = 0;
598
599 while (pos != std::string::npos) {
600 if (pos > last_pos) {
601 out += in.substr(last_pos, pos - last_pos);
602 }
603
604 string var;
605 const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_";
606
607 size_t endpos = 0;
608 if (in[pos+1] == '{') {
609 // ...${foo_bar}...
610 endpos = in.find_first_not_of(valid_chars, pos + 2);
611 if (endpos != std::string::npos &&
612 in[endpos] == '}') {
613 var = in.substr(pos + 2, endpos - pos - 2);
614 endpos++;
615 }
616 } else {
617 // ...$foo...
618 endpos = in.find_first_not_of(valid_chars, pos + 1);
619 if (endpos != std::string::npos)
620 var = in.substr(pos + 1, endpos - pos - 1);
621 else
622 var = in.substr(pos + 1);
623 }
624 string var_source = in.substr(pos, endpos - pos);
625 last_pos = endpos;
626
627 auto iter = meta_map.find(var);
628 if (iter != meta_map.end()) {
629 out += iter->second;
630 } else {
631 out += var_source;
632 }
633 pos = in.find('$', last_pos);
634 }
635 if (last_pos != std::string::npos) {
636 out += in.substr(last_pos);
637 }
638
639 return out;
640 }
641
642 } /* anonymous namespace */
643
644 int AsioFrontend::get_config_key_val(string name,
645 const string& type,
646 bufferlist *pbl)
647 {
648 if (name.empty()) {
649 lderr(ctx()) << "bad " << type << " config value" << dendl;
650 return -EINVAL;
651 }
652
653 auto svc = env.store->svc()->config_key;
654 int r = svc->get(name, true, pbl);
655 if (r < 0) {
656 lderr(ctx()) << type << " was not found: " << name << dendl;
657 return r;
658 }
659 return 0;
660 }
661
662 int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate)
663 {
664 boost::system::error_code ec;
665
666 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
667 ssl_context->use_private_key_file(name, ssl::context::pem, ec);
668 } else {
669 bufferlist bl;
670 int r = get_config_key_val(name.substr(config_val_prefix.size()),
671 "ssl_private_key",
672 &bl);
673 if (r < 0) {
674 return r;
675 }
676 ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()),
677 ssl::context::pem, ec);
678 }
679
680 if (ec) {
681 if (!is_ssl_certificate) {
682 lderr(ctx()) << "failed to add ssl_private_key=" << name
683 << ": " << ec.message() << dendl;
684 } else {
685 lderr(ctx()) << "failed to use ssl_certificate=" << name
686 << " as a private key: " << ec.message() << dendl;
687 }
688 return -ec.value();
689 }
690
691 return 0;
692 }
693
694 int AsioFrontend::ssl_set_certificate_chain(const string& name)
695 {
696 boost::system::error_code ec;
697
698 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
699 ssl_context->use_certificate_chain_file(name, ec);
700 } else {
701 bufferlist bl;
702 int r = get_config_key_val(name.substr(config_val_prefix.size()),
703 "ssl_certificate",
704 &bl);
705 if (r < 0) {
706 return r;
707 }
708 ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()),
709 ec);
710 }
711
712 if (ec) {
713 lderr(ctx()) << "failed to use ssl_certificate=" << name
714 << ": " << ec.message() << dendl;
715 return -ec.value();
716 }
717
718 return 0;
719 }
720
721 int AsioFrontend::init_ssl()
722 {
723 boost::system::error_code ec;
724 auto& config = conf->get_config_map();
725
726 // ssl configuration
727 std::optional<string> cert = conf->get_val("ssl_certificate");
728 if (cert) {
729 // only initialize the ssl context if it's going to be used
730 ssl_context = boost::in_place(ssl::context::tls);
731 }
732
733 std::optional<string> key = conf->get_val("ssl_private_key");
734 bool have_cert = false;
735
736 if (key && !cert) {
737 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
738 return -EINVAL;
739 }
740
741 auto ports = config.equal_range("ssl_port");
742 auto endpoints = config.equal_range("ssl_endpoint");
743
744 /*
745 * don't try to config certificate if frontend isn't configured for ssl
746 */
747 if (ports.first == ports.second &&
748 endpoints.first == endpoints.second) {
749 return 0;
750 }
751
752 bool key_is_cert = false;
753
754 if (cert) {
755 if (!key) {
756 key = cert;
757 key_is_cert = true;
758 }
759
760 ExpandMetaVar emv(env.store->svc()->zone);
761
762 cert = emv.process_str(*cert);
763 key = emv.process_str(*key);
764
765 int r = ssl_set_private_key(*key, key_is_cert);
766 bool have_private_key = (r >= 0);
767 if (r < 0) {
768 if (!key_is_cert) {
769 r = ssl_set_private_key(*cert, true);
770 have_private_key = (r >= 0);
771 }
772 }
773
774 if (have_private_key) {
775 int r = ssl_set_certificate_chain(*cert);
776 have_cert = (r >= 0);
777 }
778 }
779
780 // parse ssl endpoints
781 for (auto i = ports.first; i != ports.second; ++i) {
782 if (!have_cert) {
783 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
784 return -EINVAL;
785 }
786 auto port = parse_port(i->second.c_str(), ec);
787 if (ec) {
788 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
789 return -ec.value();
790 }
791 listeners.emplace_back(context);
792 listeners.back().endpoint.port(port);
793 listeners.back().use_ssl = true;
794
795 listeners.emplace_back(context);
796 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
797 listeners.back().use_ssl = true;
798 }
799
800 for (auto i = endpoints.first; i != endpoints.second; ++i) {
801 if (!have_cert) {
802 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
803 return -EINVAL;
804 }
805 auto endpoint = parse_endpoint(i->second, 443, ec);
806 if (ec) {
807 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
808 return -ec.value();
809 }
810 listeners.emplace_back(context);
811 listeners.back().endpoint = endpoint;
812 listeners.back().use_ssl = true;
813 }
814 return 0;
815 }
816 #endif // WITH_RADOSGW_BEAST_OPENSSL
817
818 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
819 {
820 if (!l.acceptor.is_open()) {
821 return;
822 } else if (ec == boost::asio::error::operation_aborted) {
823 return;
824 } else if (ec) {
825 ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
826 return;
827 }
828 auto socket = std::move(l.socket);
829 tcp::no_delay options(l.use_nodelay);
830 socket.set_option(options,ec);
831 l.acceptor.async_accept(l.socket,
832 [this, &l] (boost::system::error_code ec) {
833 accept(l, ec);
834 });
835
836 // spawn a coroutine to handle the connection
837 #ifdef WITH_RADOSGW_BEAST_OPENSSL
838 if (l.use_ssl) {
839 spawn::spawn(context,
840 [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
841 Connection conn{s};
842 auto c = connections.add(conn);
843 // wrap the socket in an ssl stream
844 ssl::stream<tcp::socket&> stream{s, *ssl_context};
845 auto buffer = std::make_unique<parse_buffer>();
846 // do ssl handshake
847 boost::system::error_code ec;
848 auto bytes = stream.async_handshake(ssl::stream_base::server,
849 buffer->data(), yield[ec]);
850 if (ec) {
851 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
852 return;
853 }
854 buffer->consume(bytes);
855 handle_connection(context, env, stream, *buffer, true, pause_mutex,
856 scheduler.get(), ec, yield);
857 if (!ec) {
858 // ssl shutdown (ignoring errors)
859 stream.async_shutdown(yield[ec]);
860 }
861 s.shutdown(tcp::socket::shutdown_both, ec);
862 }, make_stack_allocator());
863 } else {
864 #else
865 {
866 #endif // WITH_RADOSGW_BEAST_OPENSSL
867 spawn::spawn(context,
868 [this, s=std::move(socket)] (spawn::yield_context yield) mutable {
869 Connection conn{s};
870 auto c = connections.add(conn);
871 auto buffer = std::make_unique<parse_buffer>();
872 boost::system::error_code ec;
873 handle_connection(context, env, s, *buffer, false, pause_mutex,
874 scheduler.get(), ec, yield);
875 s.shutdown(tcp::socket::shutdown_both, ec);
876 }, make_stack_allocator());
877 }
878 }
879
880 int AsioFrontend::run()
881 {
882 auto cct = ctx();
883 const int thread_count = cct->_conf->rgw_thread_pool_size;
884 threads.reserve(thread_count);
885
886 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
887
888 // the worker threads call io_context::run(), which will return when there's
889 // no work left. hold a work guard to keep these threads going until join()
890 work.emplace(boost::asio::make_work_guard(context));
891
892 for (int i = 0; i < thread_count; i++) {
893 threads.emplace_back([=] {
894 // request warnings on synchronous librados calls in this thread
895 is_asio_thread = true;
896 boost::system::error_code ec;
897 context.run(ec);
898 });
899 }
900 return 0;
901 }
902
903 void AsioFrontend::stop()
904 {
905 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
906
907 going_down = true;
908
909 boost::system::error_code ec;
910 // close all listeners
911 for (auto& listener : listeners) {
912 listener.acceptor.close(ec);
913 }
914 // close all connections
915 connections.close(ec);
916 pause_mutex.cancel();
917 }
918
919 void AsioFrontend::join()
920 {
921 if (!going_down) {
922 stop();
923 }
924 work.reset();
925
926 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
927 for (auto& thread : threads) {
928 thread.join();
929 }
930 ldout(ctx(), 4) << "frontend done" << dendl;
931 }
932
933 void AsioFrontend::pause()
934 {
935 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
936
937 // cancel pending calls to accept(), but don't close the sockets
938 boost::system::error_code ec;
939 for (auto& l : listeners) {
940 l.acceptor.cancel(ec);
941 }
942
943 // pause and wait for outstanding requests to complete
944 pause_mutex.lock(ec);
945
946 if (ec) {
947 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
948 } else {
949 ldout(ctx(), 4) << "frontend paused" << dendl;
950 }
951 }
952
953 void AsioFrontend::unpause(rgw::sal::RGWRadosStore* const store,
954 rgw_auth_registry_ptr_t auth_registry)
955 {
956 env.store = store;
957 env.auth_registry = std::move(auth_registry);
958
959 // unpause to unblock connections
960 pause_mutex.unlock();
961
962 // start accepting connections again
963 for (auto& l : listeners) {
964 l.acceptor.async_accept(l.socket,
965 [this, &l] (boost::system::error_code ec) {
966 accept(l, ec);
967 });
968 }
969
970 ldout(ctx(), 4) << "frontend unpaused" << dendl;
971 }
972
973 } // anonymous namespace
974
975 class RGWAsioFrontend::Impl : public AsioFrontend {
976 public:
977 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
978 rgw::dmclock::SchedulerCtx& sched_ctx)
979 : AsioFrontend(env, conf, sched_ctx) {}
980 };
981
982 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
983 RGWFrontendConfig* conf,
984 rgw::dmclock::SchedulerCtx& sched_ctx)
985 : impl(new Impl(env, conf, sched_ctx))
986 {
987 }
988
989 RGWAsioFrontend::~RGWAsioFrontend() = default;
990
991 int RGWAsioFrontend::init()
992 {
993 return impl->init();
994 }
995
996 int RGWAsioFrontend::run()
997 {
998 return impl->run();
999 }
1000
1001 void RGWAsioFrontend::stop()
1002 {
1003 impl->stop();
1004 }
1005
1006 void RGWAsioFrontend::join()
1007 {
1008 impl->join();
1009 }
1010
1011 void RGWAsioFrontend::pause_for_new_config()
1012 {
1013 impl->pause();
1014 }
1015
1016 void RGWAsioFrontend::unpause_with_new_config(
1017 rgw::sal::RGWRadosStore* const store,
1018 rgw_auth_registry_ptr_t auth_registry
1019 ) {
1020 impl->unpause(store, std::move(auth_registry));
1021 }