]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <atomic>
5 #include <thread>
6 #include <vector>
7
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
10
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
13
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
17
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
20
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
23 #include <boost/beast/ssl/ssl_stream.hpp>
24
25 #include "services/svc_config_key.h"
26 #include "services/svc_zone.h"
27
28 #include "rgw_zone.h"
29
30 #endif
31
32 #include "rgw_dmclock_async_scheduler.h"
33
34 #define dout_subsys ceph_subsys_rgw
35
36 namespace {
37
38 using tcp = boost::asio::ip::tcp;
39 namespace http = boost::beast::http;
40 #ifdef WITH_RADOSGW_BEAST_OPENSSL
41 namespace ssl = boost::asio::ssl;
42 #endif
43
44 using parse_buffer = boost::beast::flat_static_buffer<65536>;
45
46 // use mmap/mprotect to allocate 512k coroutine stacks
47 auto make_stack_allocator() {
48 return boost::context::protected_fixedsize_stack{512*1024};
49 }
50
51 template <typename Stream>
52 class StreamIO : public rgw::asio::ClientIO {
53 CephContext* const cct;
54 Stream& stream;
55 spawn::yield_context yield;
56 parse_buffer& buffer;
57 ceph::timespan request_timeout;
58 public:
59 StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
60 spawn::yield_context yield,
61 parse_buffer& buffer, bool is_ssl,
62 const tcp::endpoint& local_endpoint,
63 const tcp::endpoint& remote_endpoint,
64 ceph::timespan request_timeout)
65 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
66 cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
67 {}
68
69 size_t write_data(const char* buf, size_t len) override {
70 boost::system::error_code ec;
71 auto& timeout = get_lowest_layer(stream);
72 if (request_timeout.count()) {
73 timeout.expires_after(request_timeout);
74 }
75 auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
76 yield[ec]);
77 if (ec) {
78 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
79 if (ec==boost::asio::error::broken_pipe) {
80 boost::system::error_code ec_ignored;
81 timeout.socket().shutdown(tcp::socket::shutdown_both, ec_ignored);
82 }
83 throw rgw::io::Exception(ec.value(), std::system_category());
84 }
85 return bytes;
86 }
87
88 size_t recv_body(char* buf, size_t max) override {
89 auto& timeout = get_lowest_layer(stream);
90 auto& message = parser.get();
91 auto& body_remaining = message.body();
92 body_remaining.data = buf;
93 body_remaining.size = max;
94
95 while (body_remaining.size && !parser.is_done()) {
96 boost::system::error_code ec;
97 if (request_timeout.count()) {
98 timeout.expires_after(request_timeout);
99 }
100 http::async_read_some(stream, buffer, parser, yield[ec]);
101 if (ec == http::error::need_buffer) {
102 break;
103 }
104 if (ec) {
105 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
106 throw rgw::io::Exception(ec.value(), std::system_category());
107 }
108 }
109 return max - body_remaining.size;
110 }
111 };
112
113 // output the http version as a string, ie 'HTTP/1.1'
114 struct http_version {
115 unsigned major_ver;
116 unsigned minor_ver;
117 explicit http_version(unsigned version)
118 : major_ver(version / 10), minor_ver(version % 10) {}
119 };
120 std::ostream& operator<<(std::ostream& out, const http_version& v) {
121 return out << "HTTP/" << v.major_ver << '.' << v.minor_ver;
122 }
123
124 // log an http header value or '-' if it's missing
125 struct log_header {
126 const http::fields& fields;
127 http::field field;
128 std::string_view quote;
129 log_header(const http::fields& fields, http::field field,
130 std::string_view quote = "")
131 : fields(fields), field(field), quote(quote) {}
132 };
133 std::ostream& operator<<(std::ostream& out, const log_header& h) {
134 auto p = h.fields.find(h.field);
135 if (p == h.fields.end()) {
136 return out << '-';
137 }
138 return out << h.quote << p->value() << h.quote;
139 }
140
141 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
142
143 template <typename Stream>
144 void handle_connection(boost::asio::io_context& context,
145 RGWProcessEnv& env, Stream& stream,
146 parse_buffer& buffer, bool is_ssl,
147 SharedMutex& pause_mutex,
148 rgw::dmclock::Scheduler *scheduler,
149 boost::system::error_code& ec,
150 spawn::yield_context yield,
151 ceph::timespan request_timeout)
152 {
153 // limit header to 4k, since we read it all into a single flat_buffer
154 static constexpr size_t header_limit = 4096;
155 // don't impose a limit on the body, since we read it in pieces
156 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
157
158 auto cct = env.store->ctx();
159
160 // read messages from the stream until eof
161 for (;;) {
162 // configure the parser
163 rgw::asio::parser_type parser;
164 parser.header_limit(header_limit);
165 parser.body_limit(body_limit);
166 auto& timeout = get_lowest_layer(stream);
167 if (request_timeout.count()) {
168 timeout.expires_after(request_timeout);
169 }
170 // parse the header
171 http::async_read_header(stream, buffer, parser, yield[ec]);
172 if (ec == boost::asio::error::connection_reset ||
173 ec == boost::asio::error::bad_descriptor ||
174 ec == boost::asio::error::operation_aborted ||
175 #ifdef WITH_RADOSGW_BEAST_OPENSSL
176 ec == ssl::error::stream_truncated ||
177 #endif
178 ec == http::error::end_of_stream) {
179 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
180 return;
181 }
182 auto& message = parser.get();
183 if (ec) {
184 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
185 http::response<http::empty_body> response;
186 response.result(http::status::bad_request);
187 response.version(message.version() == 10 ? 10 : 11);
188 response.prepare_payload();
189 if (request_timeout.count()) {
190 timeout.expires_after(request_timeout);
191 }
192 http::async_write(stream, response, yield[ec]);
193 if (ec) {
194 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
195 }
196 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
197 return;
198 }
199
200 {
201 auto lock = pause_mutex.async_lock_shared(yield[ec]);
202 if (ec == boost::asio::error::operation_aborted) {
203 return;
204 } else if (ec) {
205 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
206 return;
207 }
208
209 // process the request
210 RGWRequest req{env.store->getRados()->get_new_req_id()};
211
212 auto& socket = get_lowest_layer(stream).socket();
213 const auto& remote_endpoint = socket.remote_endpoint(ec);
214 if (ec) {
215 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
216 return;
217 }
218
219 StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
220 socket.local_endpoint(),
221 remote_endpoint,request_timeout};
222
223 auto real_client_io = rgw::io::add_reordering(
224 rgw::io::add_buffering(cct,
225 rgw::io::add_chunking(
226 rgw::io::add_conlen_controlling(
227 &real_client))));
228 RGWRestfulIO client(cct, &real_client_io);
229 auto y = optional_yield{context, yield};
230 int http_ret = 0;
231 process_request(env.store, env.rest, &req, env.uri_prefix,
232 *env.auth_registry, &client, env.olog, y,
233 scheduler, &http_ret);
234
235 if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
236 // access log line elements begin per Apache Combined Log Format with additions following
237 const auto now = ceph::coarse_real_clock::now();
238 using ceph::operator<<; // for coarse_real_time
239 ldout(cct, 1) << "beast: " << hex << &req << dec << ": "
240 << remote_endpoint.address() << " - - [" << now << "] \""
241 << message.method_string() << ' ' << message.target() << ' '
242 << http_version{message.version()} << "\" " << http_ret << ' '
243 << client.get_bytes_sent() + client.get_bytes_received() << ' '
244 << log_header{message, http::field::referer, "\""} << ' '
245 << log_header{message, http::field::user_agent, "\""} << ' '
246 << log_header{message, http::field::range} << dendl;
247 }
248 }
249
250 if (!parser.keep_alive()) {
251 return;
252 }
253
254 // if we failed before reading the entire message, discard any remaining
255 // bytes before reading the next
256 while (!parser.is_done()) {
257 static std::array<char, 1024> discard_buffer;
258
259 auto& body = parser.get().body();
260 body.size = discard_buffer.size();
261 body.data = discard_buffer.data();
262
263 if (request_timeout.count()) {
264 timeout.expires_after(request_timeout);
265 }
266 http::async_read_some(stream, buffer, parser, yield[ec]);
267 if (ec == http::error::need_buffer) {
268 continue;
269 }
270 if (ec == boost::asio::error::connection_reset) {
271 return;
272 }
273 if (ec) {
274 ldout(cct, 5) << "failed to discard unread message: "
275 << ec.message() << dendl;
276 return;
277 }
278 }
279 }
280 }
281
282 struct Connection : boost::intrusive::list_base_hook<> {
283 tcp::socket& socket;
284 Connection(tcp::socket& socket) : socket(socket) {}
285 };
286
287 class ConnectionList {
288 using List = boost::intrusive::list<Connection>;
289 List connections;
290 std::mutex mutex;
291
292 void remove(Connection& c) {
293 std::lock_guard lock{mutex};
294 if (c.is_linked()) {
295 connections.erase(List::s_iterator_to(c));
296 }
297 }
298 public:
299 class Guard {
300 ConnectionList *list;
301 Connection *conn;
302 public:
303 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
304 ~Guard() { list->remove(*conn); }
305 };
306 [[nodiscard]] Guard add(Connection& conn) {
307 std::lock_guard lock{mutex};
308 connections.push_back(conn);
309 return Guard{this, &conn};
310 }
311 void close(boost::system::error_code& ec) {
312 std::lock_guard lock{mutex};
313 for (auto& conn : connections) {
314 conn.socket.close(ec);
315 }
316 connections.clear();
317 }
318 };
319
320 namespace dmc = rgw::dmclock;
321 class AsioFrontend {
322 RGWProcessEnv env;
323 RGWFrontendConfig* conf;
324 boost::asio::io_context context;
325 ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
326 #ifdef WITH_RADOSGW_BEAST_OPENSSL
327 boost::optional<ssl::context> ssl_context;
328 int get_config_key_val(string name,
329 const string& type,
330 bufferlist *pbl);
331 int ssl_set_private_key(const string& name, bool is_ssl_cert);
332 int ssl_set_certificate_chain(const string& name);
333 int init_ssl();
334 #endif
335 SharedMutex pause_mutex;
336 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
337
338 struct Listener {
339 tcp::endpoint endpoint;
340 tcp::acceptor acceptor;
341 tcp::socket socket;
342 bool use_ssl = false;
343 bool use_nodelay = false;
344
345 explicit Listener(boost::asio::io_context& context)
346 : acceptor(context), socket(context) {}
347 };
348 std::vector<Listener> listeners;
349
350 ConnectionList connections;
351
352 // work guard to keep run() threads busy while listeners are paused
353 using Executor = boost::asio::io_context::executor_type;
354 std::optional<boost::asio::executor_work_guard<Executor>> work;
355
356 std::vector<std::thread> threads;
357 std::atomic<bool> going_down{false};
358
359 CephContext* ctx() const { return env.store->ctx(); }
360 std::optional<dmc::ClientCounters> client_counters;
361 std::unique_ptr<dmc::ClientConfig> client_config;
362 void accept(Listener& listener, boost::system::error_code ec);
363
364 public:
365 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
366 dmc::SchedulerCtx& sched_ctx)
367 : env(env), conf(conf), pause_mutex(context.get_executor())
368 {
369 auto sched_t = dmc::get_scheduler_t(ctx());
370 switch(sched_t){
371 case dmc::scheduler_t::dmclock:
372 scheduler.reset(new dmc::AsyncScheduler(ctx(),
373 context,
374 std::ref(sched_ctx.get_dmc_client_counters()),
375 sched_ctx.get_dmc_client_config(),
376 *sched_ctx.get_dmc_client_config(),
377 dmc::AtLimit::Reject));
378 break;
379 case dmc::scheduler_t::none:
380 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
381 [[fallthrough]];
382 case dmc::scheduler_t::throttler:
383 scheduler.reset(new dmc::SimpleThrottler(ctx()));
384
385 }
386 }
387
388 int init();
389 int run();
390 void stop();
391 void join();
392 void pause();
393 void unpause(rgw::sal::RGWRadosStore* store, rgw_auth_registry_ptr_t);
394 };
395
396 unsigned short parse_port(const char *input, boost::system::error_code& ec)
397 {
398 char *end = nullptr;
399 auto port = std::strtoul(input, &end, 10);
400 if (port > std::numeric_limits<unsigned short>::max()) {
401 ec.assign(ERANGE, boost::system::system_category());
402 } else if (port == 0 && end == input) {
403 ec.assign(EINVAL, boost::system::system_category());
404 }
405 return port;
406 }
407
408 tcp::endpoint parse_endpoint(boost::asio::string_view input,
409 unsigned short default_port,
410 boost::system::error_code& ec)
411 {
412 tcp::endpoint endpoint;
413
414 if (input.empty()) {
415 ec = boost::asio::error::invalid_argument;
416 return endpoint;
417 }
418
419 if (input[0] == '[') { // ipv6
420 const size_t addr_begin = 1;
421 const size_t addr_end = input.find(']');
422 if (addr_end == input.npos) { // no matching ]
423 ec = boost::asio::error::invalid_argument;
424 return endpoint;
425 }
426 if (addr_end + 1 < input.size()) {
427 // :port must must follow [ipv6]
428 if (input[addr_end + 1] != ':') {
429 ec = boost::asio::error::invalid_argument;
430 return endpoint;
431 } else {
432 auto port_str = input.substr(addr_end + 2);
433 endpoint.port(parse_port(port_str.data(), ec));
434 }
435 } else {
436 endpoint.port(default_port);
437 }
438 auto addr = input.substr(addr_begin, addr_end - addr_begin);
439 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
440 } else { // ipv4
441 auto colon = input.find(':');
442 if (colon != input.npos) {
443 auto port_str = input.substr(colon + 1);
444 endpoint.port(parse_port(port_str.data(), ec));
445 if (ec) {
446 return endpoint;
447 }
448 } else {
449 endpoint.port(default_port);
450 }
451 auto addr = input.substr(0, colon);
452 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
453 }
454 return endpoint;
455 }
456
457 static int drop_privileges(CephContext *ctx)
458 {
459 uid_t uid = ctx->get_set_uid();
460 gid_t gid = ctx->get_set_gid();
461 std::string uid_string = ctx->get_set_uid_string();
462 std::string gid_string = ctx->get_set_gid_string();
463 if (gid && setgid(gid) != 0) {
464 int err = errno;
465 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
466 return -err;
467 }
468 if (uid && setuid(uid) != 0) {
469 int err = errno;
470 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
471 return -err;
472 }
473 if (uid && gid) {
474 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
475 << " (" << uid_string << ":" << gid_string << ")" << dendl;
476 }
477 return 0;
478 }
479
480 int AsioFrontend::init()
481 {
482 boost::system::error_code ec;
483 auto& config = conf->get_config_map();
484
485 // Setting global timeout
486 auto timeout = config.find("request_timeout_ms");
487 if (timeout != config.end()) {
488 auto timeout_number = ceph::parse<uint64_t>(timeout->second.data());
489 if (timeout_number) {
490 request_timeout = std::chrono::milliseconds(*timeout_number);
491 } else {
492 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
493 << timeout->second.data() << " setting it to the default value: "
494 << REQUEST_TIMEOUT << dendl;
495 }
496 }
497 #ifdef WITH_RADOSGW_BEAST_OPENSSL
498 int r = init_ssl();
499 if (r < 0) {
500 return r;
501 }
502 #endif
503
504 // parse endpoints
505 auto ports = config.equal_range("port");
506 for (auto i = ports.first; i != ports.second; ++i) {
507 auto port = parse_port(i->second.c_str(), ec);
508 if (ec) {
509 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
510 return -ec.value();
511 }
512 listeners.emplace_back(context);
513 listeners.back().endpoint.port(port);
514
515 listeners.emplace_back(context);
516 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
517 }
518
519 auto endpoints = config.equal_range("endpoint");
520 for (auto i = endpoints.first; i != endpoints.second; ++i) {
521 auto endpoint = parse_endpoint(i->second, 80, ec);
522 if (ec) {
523 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
524 return -ec.value();
525 }
526 listeners.emplace_back(context);
527 listeners.back().endpoint = endpoint;
528 }
529 // parse tcp nodelay
530 auto nodelay = config.find("tcp_nodelay");
531 if (nodelay != config.end()) {
532 for (auto& l : listeners) {
533 l.use_nodelay = (nodelay->second == "1");
534 }
535 }
536
537
538 bool socket_bound = false;
539 // start listeners
540 for (auto& l : listeners) {
541 l.acceptor.open(l.endpoint.protocol(), ec);
542 if (ec) {
543 if (ec == boost::asio::error::address_family_not_supported) {
544 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
545 << ", " << ec.message() << dendl;
546 continue;
547 }
548
549 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
550 return -ec.value();
551 }
552
553 if (l.endpoint.protocol() == tcp::v6()) {
554 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
555 if (ec) {
556 lderr(ctx()) << "failed to set v6_only socket option: "
557 << ec.message() << dendl;
558 return -ec.value();
559 }
560 }
561
562 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
563 l.acceptor.bind(l.endpoint, ec);
564 if (ec) {
565 lderr(ctx()) << "failed to bind address " << l.endpoint
566 << ": " << ec.message() << dendl;
567 return -ec.value();
568 }
569
570 auto it = config.find("max_connection_backlog");
571 auto max_connection_backlog = boost::asio::socket_base::max_listen_connections;
572 if (it != config.end()) {
573 string err;
574 max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err);
575 if (!err.empty()) {
576 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl;
577 max_connection_backlog = boost::asio::socket_base::max_listen_connections;
578 }
579 }
580 l.acceptor.listen(max_connection_backlog);
581 l.acceptor.async_accept(l.socket,
582 [this, &l] (boost::system::error_code ec) {
583 accept(l, ec);
584 });
585
586 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
587 socket_bound = true;
588 }
589 if (!socket_bound) {
590 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
591 return -EINVAL;
592 }
593
594 return drop_privileges(ctx());
595 }
596
597 #ifdef WITH_RADOSGW_BEAST_OPENSSL
598
599 static string config_val_prefix = "config://";
600
601 namespace {
602
603 class ExpandMetaVar {
604 map<string, string> meta_map;
605
606 public:
607 ExpandMetaVar(RGWSI_Zone *zone_svc) {
608 meta_map["realm"] = zone_svc->get_realm().get_name();
609 meta_map["realm_id"] = zone_svc->get_realm().get_id();
610 meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
611 meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
612 meta_map["zone"] = zone_svc->zone_name();
613 meta_map["zone_id"] = zone_svc->zone_id().id;
614 }
615
616 string process_str(const string& in);
617 };
618
619 string ExpandMetaVar::process_str(const string& in)
620 {
621 if (meta_map.empty()) {
622 return in;
623 }
624
625 auto pos = in.find('$');
626 if (pos == std::string::npos) {
627 return in;
628 }
629
630 string out;
631 decltype(pos) last_pos = 0;
632
633 while (pos != std::string::npos) {
634 if (pos > last_pos) {
635 out += in.substr(last_pos, pos - last_pos);
636 }
637
638 string var;
639 const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_";
640
641 size_t endpos = 0;
642 if (in[pos+1] == '{') {
643 // ...${foo_bar}...
644 endpos = in.find_first_not_of(valid_chars, pos + 2);
645 if (endpos != std::string::npos &&
646 in[endpos] == '}') {
647 var = in.substr(pos + 2, endpos - pos - 2);
648 endpos++;
649 }
650 } else {
651 // ...$foo...
652 endpos = in.find_first_not_of(valid_chars, pos + 1);
653 if (endpos != std::string::npos)
654 var = in.substr(pos + 1, endpos - pos - 1);
655 else
656 var = in.substr(pos + 1);
657 }
658 string var_source = in.substr(pos, endpos - pos);
659 last_pos = endpos;
660
661 auto iter = meta_map.find(var);
662 if (iter != meta_map.end()) {
663 out += iter->second;
664 } else {
665 out += var_source;
666 }
667 pos = in.find('$', last_pos);
668 }
669 if (last_pos != std::string::npos) {
670 out += in.substr(last_pos);
671 }
672
673 return out;
674 }
675
676 } /* anonymous namespace */
677
678 int AsioFrontend::get_config_key_val(string name,
679 const string& type,
680 bufferlist *pbl)
681 {
682 if (name.empty()) {
683 lderr(ctx()) << "bad " << type << " config value" << dendl;
684 return -EINVAL;
685 }
686
687 auto svc = env.store->svc()->config_key;
688 int r = svc->get(name, true, pbl);
689 if (r < 0) {
690 lderr(ctx()) << type << " was not found: " << name << dendl;
691 return r;
692 }
693 return 0;
694 }
695
696 int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate)
697 {
698 boost::system::error_code ec;
699
700 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
701 ssl_context->use_private_key_file(name, ssl::context::pem, ec);
702 } else {
703 bufferlist bl;
704 int r = get_config_key_val(name.substr(config_val_prefix.size()),
705 "ssl_private_key",
706 &bl);
707 if (r < 0) {
708 return r;
709 }
710 ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()),
711 ssl::context::pem, ec);
712 }
713
714 if (ec) {
715 if (!is_ssl_certificate) {
716 lderr(ctx()) << "failed to add ssl_private_key=" << name
717 << ": " << ec.message() << dendl;
718 } else {
719 lderr(ctx()) << "failed to use ssl_certificate=" << name
720 << " as a private key: " << ec.message() << dendl;
721 }
722 return -ec.value();
723 }
724
725 return 0;
726 }
727
728 int AsioFrontend::ssl_set_certificate_chain(const string& name)
729 {
730 boost::system::error_code ec;
731
732 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
733 ssl_context->use_certificate_chain_file(name, ec);
734 } else {
735 bufferlist bl;
736 int r = get_config_key_val(name.substr(config_val_prefix.size()),
737 "ssl_certificate",
738 &bl);
739 if (r < 0) {
740 return r;
741 }
742 ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()),
743 ec);
744 }
745
746 if (ec) {
747 lderr(ctx()) << "failed to use ssl_certificate=" << name
748 << ": " << ec.message() << dendl;
749 return -ec.value();
750 }
751
752 return 0;
753 }
754
755 int AsioFrontend::init_ssl()
756 {
757 boost::system::error_code ec;
758 auto& config = conf->get_config_map();
759
760 // ssl configuration
761 std::optional<string> cert = conf->get_val("ssl_certificate");
762 if (cert) {
763 // only initialize the ssl context if it's going to be used
764 ssl_context = boost::in_place(ssl::context::tls);
765 }
766
767 std::optional<string> key = conf->get_val("ssl_private_key");
768 bool have_cert = false;
769
770 if (key && !cert) {
771 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
772 return -EINVAL;
773 }
774
775 auto ports = config.equal_range("ssl_port");
776 auto endpoints = config.equal_range("ssl_endpoint");
777
778 /*
779 * don't try to config certificate if frontend isn't configured for ssl
780 */
781 if (ports.first == ports.second &&
782 endpoints.first == endpoints.second) {
783 return 0;
784 }
785
786 bool key_is_cert = false;
787
788 if (cert) {
789 if (!key) {
790 key = cert;
791 key_is_cert = true;
792 }
793
794 ExpandMetaVar emv(env.store->svc()->zone);
795
796 cert = emv.process_str(*cert);
797 key = emv.process_str(*key);
798
799 int r = ssl_set_private_key(*key, key_is_cert);
800 bool have_private_key = (r >= 0);
801 if (r < 0) {
802 if (!key_is_cert) {
803 r = ssl_set_private_key(*cert, true);
804 have_private_key = (r >= 0);
805 }
806 }
807
808 if (have_private_key) {
809 int r = ssl_set_certificate_chain(*cert);
810 have_cert = (r >= 0);
811 }
812 }
813
814 // parse ssl endpoints
815 for (auto i = ports.first; i != ports.second; ++i) {
816 if (!have_cert) {
817 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
818 return -EINVAL;
819 }
820 auto port = parse_port(i->second.c_str(), ec);
821 if (ec) {
822 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
823 return -ec.value();
824 }
825 listeners.emplace_back(context);
826 listeners.back().endpoint.port(port);
827 listeners.back().use_ssl = true;
828
829 listeners.emplace_back(context);
830 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
831 listeners.back().use_ssl = true;
832 }
833
834 for (auto i = endpoints.first; i != endpoints.second; ++i) {
835 if (!have_cert) {
836 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
837 return -EINVAL;
838 }
839 auto endpoint = parse_endpoint(i->second, 443, ec);
840 if (ec) {
841 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
842 return -ec.value();
843 }
844 listeners.emplace_back(context);
845 listeners.back().endpoint = endpoint;
846 listeners.back().use_ssl = true;
847 }
848 return 0;
849 }
850 #endif // WITH_RADOSGW_BEAST_OPENSSL
851
852 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
853 {
854 if (!l.acceptor.is_open()) {
855 return;
856 } else if (ec == boost::asio::error::operation_aborted) {
857 return;
858 } else if (ec) {
859 ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
860 return;
861 }
862 auto socket = std::move(l.socket);
863 tcp::no_delay options(l.use_nodelay);
864 socket.set_option(options,ec);
865 l.acceptor.async_accept(l.socket,
866 [this, &l] (boost::system::error_code ec) {
867 accept(l, ec);
868 });
869
870 boost::beast::tcp_stream stream(std::move(socket));
871 // spawn a coroutine to handle the connection
872 #ifdef WITH_RADOSGW_BEAST_OPENSSL
873 if (l.use_ssl) {
874 spawn::spawn(context,
875 [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
876 Connection conn{s.socket()};
877 auto c = connections.add(conn);
878 // wrap the tcp_stream in an ssl stream
879 boost::beast::ssl_stream<boost::beast::tcp_stream&> stream{s, *ssl_context};
880 auto buffer = std::make_unique<parse_buffer>();
881 // do ssl handshake
882 boost::system::error_code ec;
883 if (request_timeout.count()) {
884 get_lowest_layer(stream).expires_after(request_timeout);
885 }
886 auto bytes = stream.async_handshake(ssl::stream_base::server,
887 buffer->data(), yield[ec]);
888 if (ec) {
889 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
890 return;
891 }
892 buffer->consume(bytes);
893 handle_connection(context, env, stream, *buffer, true, pause_mutex,
894 scheduler.get(), ec, yield, request_timeout);
895 if (!ec) {
896 // ssl shutdown (ignoring errors)
897 stream.async_shutdown(yield[ec]);
898 }
899 s.socket().shutdown(tcp::socket::shutdown_both, ec);
900 }, make_stack_allocator());
901 } else {
902 #else
903 {
904 #endif // WITH_RADOSGW_BEAST_OPENSSL
905 spawn::spawn(context,
906 [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
907 Connection conn{s.socket()};
908 auto c = connections.add(conn);
909 auto buffer = std::make_unique<parse_buffer>();
910 boost::system::error_code ec;
911 handle_connection(context, env, s, *buffer, false, pause_mutex,
912 scheduler.get(), ec, yield, request_timeout);
913 s.socket().shutdown(tcp::socket::shutdown_both, ec);
914 }, make_stack_allocator());
915 }
916 }
917
918 int AsioFrontend::run()
919 {
920 auto cct = ctx();
921 const int thread_count = cct->_conf->rgw_thread_pool_size;
922 threads.reserve(thread_count);
923
924 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
925
926 // the worker threads call io_context::run(), which will return when there's
927 // no work left. hold a work guard to keep these threads going until join()
928 work.emplace(boost::asio::make_work_guard(context));
929
930 for (int i = 0; i < thread_count; i++) {
931 threads.emplace_back([=] {
932 // request warnings on synchronous librados calls in this thread
933 is_asio_thread = true;
934 boost::system::error_code ec;
935 context.run(ec);
936 });
937 }
938 return 0;
939 }
940
941 void AsioFrontend::stop()
942 {
943 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
944
945 going_down = true;
946
947 boost::system::error_code ec;
948 // close all listeners
949 for (auto& listener : listeners) {
950 listener.acceptor.close(ec);
951 }
952 // close all connections
953 connections.close(ec);
954 pause_mutex.cancel();
955 }
956
957 void AsioFrontend::join()
958 {
959 if (!going_down) {
960 stop();
961 }
962 work.reset();
963
964 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
965 for (auto& thread : threads) {
966 thread.join();
967 }
968 ldout(ctx(), 4) << "frontend done" << dendl;
969 }
970
971 void AsioFrontend::pause()
972 {
973 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
974
975 // cancel pending calls to accept(), but don't close the sockets
976 boost::system::error_code ec;
977 for (auto& l : listeners) {
978 l.acceptor.cancel(ec);
979 }
980
981 // pause and wait for outstanding requests to complete
982 pause_mutex.lock(ec);
983
984 if (ec) {
985 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
986 } else {
987 ldout(ctx(), 4) << "frontend paused" << dendl;
988 }
989 }
990
991 void AsioFrontend::unpause(rgw::sal::RGWRadosStore* const store,
992 rgw_auth_registry_ptr_t auth_registry)
993 {
994 env.store = store;
995 env.auth_registry = std::move(auth_registry);
996
997 // unpause to unblock connections
998 pause_mutex.unlock();
999
1000 // start accepting connections again
1001 for (auto& l : listeners) {
1002 l.acceptor.async_accept(l.socket,
1003 [this, &l] (boost::system::error_code ec) {
1004 accept(l, ec);
1005 });
1006 }
1007
1008 ldout(ctx(), 4) << "frontend unpaused" << dendl;
1009 }
1010
1011 } // anonymous namespace
1012
1013 class RGWAsioFrontend::Impl : public AsioFrontend {
1014 public:
1015 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
1016 rgw::dmclock::SchedulerCtx& sched_ctx)
1017 : AsioFrontend(env, conf, sched_ctx) {}
1018 };
1019
1020 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
1021 RGWFrontendConfig* conf,
1022 rgw::dmclock::SchedulerCtx& sched_ctx)
1023 : impl(new Impl(env, conf, sched_ctx))
1024 {
1025 }
1026
1027 RGWAsioFrontend::~RGWAsioFrontend() = default;
1028
1029 int RGWAsioFrontend::init()
1030 {
1031 return impl->init();
1032 }
1033
1034 int RGWAsioFrontend::run()
1035 {
1036 return impl->run();
1037 }
1038
1039 void RGWAsioFrontend::stop()
1040 {
1041 impl->stop();
1042 }
1043
1044 void RGWAsioFrontend::join()
1045 {
1046 impl->join();
1047 }
1048
1049 void RGWAsioFrontend::pause_for_new_config()
1050 {
1051 impl->pause();
1052 }
1053
1054 void RGWAsioFrontend::unpause_with_new_config(
1055 rgw::sal::RGWRadosStore* const store,
1056 rgw_auth_registry_ptr_t auth_registry
1057 ) {
1058 impl->unpause(store, std::move(auth_registry));
1059 }