]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <atomic>
5 #include <ctime>
6 #include <thread>
7 #include <vector>
8
9 #include <boost/asio.hpp>
10 #include <boost/intrusive/list.hpp>
11 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
12
13 #include <boost/context/protected_fixedsize_stack.hpp>
14 #include <spawn/spawn.hpp>
15
16 #include "common/async/shared_mutex.h"
17 #include "common/errno.h"
18 #include "common/strtol.h"
19
20 #include "rgw_asio_client.h"
21 #include "rgw_asio_frontend.h"
22
23 #ifdef WITH_RADOSGW_BEAST_OPENSSL
24 #include <boost/asio/ssl.hpp>
25 #endif
26
27 #include "common/split.h"
28
29 #include "services/svc_config_key.h"
30 #include "services/svc_zone.h"
31
32 #include "rgw_zone.h"
33
34 #include "rgw_asio_frontend_timer.h"
35 #include "rgw_dmclock_async_scheduler.h"
36
37 #define dout_subsys ceph_subsys_rgw
38
39 namespace {
40
41 using tcp = boost::asio::ip::tcp;
42 namespace http = boost::beast::http;
43 #ifdef WITH_RADOSGW_BEAST_OPENSSL
44 namespace ssl = boost::asio::ssl;
45 #endif
46
47 struct Connection;
48
49 // use explicit executor types instead of the type-erased boost::asio::executor
50 using executor_type = boost::asio::io_context::executor_type;
51
52 using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
53 using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
54
55 using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
56 executor_type, Connection>;
57
58 static constexpr size_t parse_buffer_size = 65536;
59 using parse_buffer = boost::beast::flat_static_buffer<parse_buffer_size>;
60
61 // use mmap/mprotect to allocate 512k coroutine stacks
62 auto make_stack_allocator() {
63 return boost::context::protected_fixedsize_stack{512*1024};
64 }
65
66 using namespace std;
67
68 template <typename Stream>
69 class StreamIO : public rgw::asio::ClientIO {
70 CephContext* const cct;
71 Stream& stream;
72 timeout_timer& timeout;
73 yield_context yield;
74 parse_buffer& buffer;
75 boost::system::error_code fatal_ec;
76 public:
77 StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
78 rgw::asio::parser_type& parser, yield_context yield,
79 parse_buffer& buffer, bool is_ssl,
80 const tcp::endpoint& local_endpoint,
81 const tcp::endpoint& remote_endpoint)
82 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
83 cct(cct), stream(stream), timeout(timeout), yield(yield),
84 buffer(buffer)
85 {}
86
87 boost::system::error_code get_fatal_error_code() const { return fatal_ec; }
88
89 size_t write_data(const char* buf, size_t len) override {
90 boost::system::error_code ec;
91 timeout.start();
92 auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
93 yield[ec]);
94 timeout.cancel();
95 if (ec) {
96 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
97 if (ec == boost::asio::error::broken_pipe) {
98 boost::system::error_code ec_ignored;
99 stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
100 }
101 if (!fatal_ec) {
102 fatal_ec = ec;
103 }
104 throw rgw::io::Exception(ec.value(), std::system_category());
105 }
106 return bytes;
107 }
108
109 size_t recv_body(char* buf, size_t max) override {
110 auto& message = parser.get();
111 auto& body_remaining = message.body();
112 body_remaining.data = buf;
113 body_remaining.size = max;
114
115 while (body_remaining.size && !parser.is_done()) {
116 boost::system::error_code ec;
117 timeout.start();
118 http::async_read_some(stream, buffer, parser, yield[ec]);
119 timeout.cancel();
120 if (ec == http::error::need_buffer) {
121 break;
122 }
123 if (ec) {
124 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
125 if (!fatal_ec) {
126 fatal_ec = ec;
127 }
128 throw rgw::io::Exception(ec.value(), std::system_category());
129 }
130 }
131 return max - body_remaining.size;
132 }
133 };
134
// stream helper that prints a version number encoded as major*10 + minor
// in the form 'HTTP/<major>.<minor>', ie 11 -> 'HTTP/1.1'
struct http_version {
  unsigned major_ver;
  unsigned minor_ver;
  explicit http_version(unsigned version)
    : major_ver(version / 10), minor_ver(version % 10) {}
};
std::ostream& operator<<(std::ostream& out, const http_version& v) {
  out << "HTTP/";
  out << v.major_ver;
  out << '.';
  out << v.minor_ver;
  return out;
}
145
146 // log an http header value or '-' if it's missing
147 struct log_header {
148 const http::fields& fields;
149 http::field field;
150 std::string_view quote;
151 log_header(const http::fields& fields, http::field field,
152 std::string_view quote = "")
153 : fields(fields), field(field), quote(quote) {}
154 };
155 std::ostream& operator<<(std::ostream& out, const log_header& h) {
156 auto p = h.fields.find(h.field);
157 if (p == h.fields.end()) {
158 return out << '-';
159 }
160 return out << h.quote << p->value() << h.quote;
161 }
162
163 // log fractional seconds in milliseconds
164 struct log_ms_remainder {
165 ceph::coarse_real_time t;
166 log_ms_remainder(ceph::coarse_real_time t) : t(t) {}
167 };
168 std::ostream& operator<<(std::ostream& out, const log_ms_remainder& m) {
169 using namespace std::chrono;
170 return out << std::setfill('0') << std::setw(3)
171 << duration_cast<milliseconds>(m.t.time_since_epoch()).count() % 1000;
172 }
173
174 // log time in apache format: day/month/year:hour:minute:second zone
175 struct log_apache_time {
176 ceph::coarse_real_time t;
177 log_apache_time(ceph::coarse_real_time t) : t(t) {}
178 };
179 std::ostream& operator<<(std::ostream& out, const log_apache_time& a) {
180 const auto t = ceph::coarse_real_clock::to_time_t(a.t);
181 const auto local = std::localtime(&t);
182 return out << std::put_time(local, "%d/%b/%Y:%T.") << log_ms_remainder{a.t}
183 << std::put_time(local, " %z");
184 };
185
186 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
187
188 template <typename Stream>
189 void handle_connection(boost::asio::io_context& context,
190 RGWProcessEnv& env, Stream& stream,
191 timeout_timer& timeout, size_t header_limit,
192 parse_buffer& buffer, bool is_ssl,
193 SharedMutex& pause_mutex,
194 rgw::dmclock::Scheduler *scheduler,
195 const std::string& uri_prefix,
196 boost::system::error_code& ec,
197 yield_context yield)
198 {
199 // don't impose a limit on the body, since we read it in pieces
200 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
201
202 auto cct = env.driver->ctx();
203
204 // read messages from the stream until eof
205 for (;;) {
206 // configure the parser
207 rgw::asio::parser_type parser;
208 parser.header_limit(header_limit);
209 parser.body_limit(body_limit);
210 timeout.start();
211 // parse the header
212 http::async_read_header(stream, buffer, parser, yield[ec]);
213 timeout.cancel();
214 if (ec == boost::asio::error::connection_reset ||
215 ec == boost::asio::error::bad_descriptor ||
216 ec == boost::asio::error::operation_aborted ||
217 #ifdef WITH_RADOSGW_BEAST_OPENSSL
218 ec == ssl::error::stream_truncated ||
219 #endif
220 ec == http::error::end_of_stream) {
221 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
222 return;
223 }
224 auto& message = parser.get();
225 if (ec) {
226 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
227 http::response<http::empty_body> response;
228 response.result(http::status::bad_request);
229 response.version(message.version() == 10 ? 10 : 11);
230 response.prepare_payload();
231 timeout.start();
232 http::async_write(stream, response, yield[ec]);
233 timeout.cancel();
234 if (ec) {
235 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
236 }
237 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
238 return;
239 }
240
241 bool expect_continue = (message[http::field::expect] == "100-continue");
242
243 {
244 auto lock = pause_mutex.async_lock_shared(yield[ec]);
245 if (ec == boost::asio::error::operation_aborted) {
246 return;
247 } else if (ec) {
248 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
249 return;
250 }
251
252 // process the request
253 RGWRequest req{env.driver->get_new_req_id()};
254
255 auto& socket = stream.lowest_layer();
256 const auto& remote_endpoint = socket.remote_endpoint(ec);
257 if (ec) {
258 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
259 return;
260 }
261 const auto& local_endpoint = socket.local_endpoint(ec);
262 if (ec) {
263 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
264 return;
265 }
266
267 StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
268 is_ssl, local_endpoint, remote_endpoint};
269
270 auto real_client_io = rgw::io::add_reordering(
271 rgw::io::add_buffering(cct,
272 rgw::io::add_chunking(
273 rgw::io::add_conlen_controlling(
274 &real_client))));
275 RGWRestfulIO client(cct, &real_client_io);
276 optional_yield y = null_yield;
277 if (cct->_conf->rgw_beast_enable_async) {
278 y = optional_yield{context, yield};
279 }
280 int http_ret = 0;
281 string user = "-";
282 const auto started = ceph::coarse_real_clock::now();
283 ceph::coarse_real_clock::duration latency{};
284 process_request(env, &req, uri_prefix, &client, y,
285 scheduler, &user, &latency, &http_ret);
286
287 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw_access, 1)) {
288 // access log line elements begin per Apache Combined Log Format with additions following
289 lsubdout(cct, rgw_access, 1) << "beast: " << std::hex << &req << std::dec << ": "
290 << remote_endpoint.address() << " - " << user << " [" << log_apache_time{started} << "] \""
291 << message.method_string() << ' ' << message.target() << ' '
292 << http_version{message.version()} << "\" " << http_ret << ' '
293 << client.get_bytes_sent() + client.get_bytes_received() << ' '
294 << log_header{message, http::field::referer, "\""} << ' '
295 << log_header{message, http::field::user_agent, "\""} << ' '
296 << log_header{message, http::field::range} << " latency="
297 << latency << dendl;
298 }
299
300 // process_request() can't distinguish between connection errors and
301 // http/s3 errors, so check StreamIO for fatal connection errors
302 ec = real_client.get_fatal_error_code();
303 if (ec) {
304 return;
305 }
306
307 if (real_client.sent_100_continue()) {
308 expect_continue = false;
309 }
310 }
311
312 if (!parser.keep_alive()) {
313 return;
314 }
315
316 // if we failed before reading the entire message, discard any remaining
317 // bytes before reading the next
318 while (!expect_continue && !parser.is_done()) {
319 static std::array<char, 1024> discard_buffer;
320
321 auto& body = parser.get().body();
322 body.size = discard_buffer.size();
323 body.data = discard_buffer.data();
324
325 timeout.start();
326 http::async_read_some(stream, buffer, parser, yield[ec]);
327 timeout.cancel();
328 if (ec == http::error::need_buffer) {
329 continue;
330 }
331 if (ec == boost::asio::error::connection_reset) {
332 return;
333 }
334 if (ec) {
335 ldout(cct, 5) << "failed to discard unread message: "
336 << ec.message() << dendl;
337 return;
338 }
339 }
340 }
341 }
342
343 // timeout support requires that connections are reference-counted, because the
344 // timeout_handler can outlive the coroutine
345 struct Connection : boost::intrusive::list_base_hook<>,
346 boost::intrusive_ref_counter<Connection>
347 {
348 tcp_socket socket;
349 parse_buffer buffer;
350
351 explicit Connection(tcp_socket&& socket) noexcept
352 : socket(std::move(socket)) {}
353
354 void close(boost::system::error_code& ec) {
355 socket.close(ec);
356 }
357
358 tcp_socket& get_socket() { return socket; }
359 };
360
361 class ConnectionList {
362 using List = boost::intrusive::list<Connection>;
363 List connections;
364 std::mutex mutex;
365
366 void remove(Connection& c) {
367 std::lock_guard lock{mutex};
368 if (c.is_linked()) {
369 connections.erase(List::s_iterator_to(c));
370 }
371 }
372 public:
373 class Guard {
374 ConnectionList *list;
375 Connection *conn;
376 public:
377 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
378 ~Guard() { list->remove(*conn); }
379 };
380 [[nodiscard]] Guard add(Connection& conn) {
381 std::lock_guard lock{mutex};
382 connections.push_back(conn);
383 return Guard{this, &conn};
384 }
385 void close(boost::system::error_code& ec) {
386 std::lock_guard lock{mutex};
387 for (auto& conn : connections) {
388 conn.socket.close(ec);
389 }
390 connections.clear();
391 }
392 };
393
394 namespace dmc = rgw::dmclock;
395 class AsioFrontend {
396 RGWProcessEnv& env;
397 RGWFrontendConfig* conf;
398 boost::asio::io_context context;
399 std::string uri_prefix;
400 ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
401 size_t header_limit = 16384;
402 #ifdef WITH_RADOSGW_BEAST_OPENSSL
403 boost::optional<ssl::context> ssl_context;
404 int get_config_key_val(string name,
405 const string& type,
406 bufferlist *pbl);
407 int ssl_set_private_key(const string& name, bool is_ssl_cert);
408 int ssl_set_certificate_chain(const string& name);
409 int init_ssl();
410 #endif
411 SharedMutex pause_mutex;
412 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
413
414 struct Listener {
415 tcp::endpoint endpoint;
416 tcp::acceptor acceptor;
417 tcp_socket socket;
418 bool use_ssl = false;
419 bool use_nodelay = false;
420
421 explicit Listener(boost::asio::io_context& context)
422 : acceptor(context), socket(context) {}
423 };
424 std::vector<Listener> listeners;
425
426 ConnectionList connections;
427
428 // work guard to keep run() threads busy while listeners are paused
429 using Executor = boost::asio::io_context::executor_type;
430 std::optional<boost::asio::executor_work_guard<Executor>> work;
431
432 std::vector<std::thread> threads;
433 std::atomic<bool> going_down{false};
434
435 CephContext* ctx() const { return env.driver->ctx(); }
436 std::optional<dmc::ClientCounters> client_counters;
437 std::unique_ptr<dmc::ClientConfig> client_config;
438 void accept(Listener& listener, boost::system::error_code ec);
439
440 public:
441 AsioFrontend(RGWProcessEnv& env, RGWFrontendConfig* conf,
442 dmc::SchedulerCtx& sched_ctx)
443 : env(env), conf(conf), pause_mutex(context.get_executor())
444 {
445 auto sched_t = dmc::get_scheduler_t(ctx());
446 switch(sched_t){
447 case dmc::scheduler_t::dmclock:
448 scheduler.reset(new dmc::AsyncScheduler(ctx(),
449 context,
450 std::ref(sched_ctx.get_dmc_client_counters()),
451 sched_ctx.get_dmc_client_config(),
452 *sched_ctx.get_dmc_client_config(),
453 dmc::AtLimit::Reject));
454 break;
455 case dmc::scheduler_t::none:
456 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
457 [[fallthrough]];
458 case dmc::scheduler_t::throttler:
459 scheduler.reset(new dmc::SimpleThrottler(ctx()));
460
461 }
462 }
463
464 int init();
465 int run();
466 void stop();
467 void join();
468 void pause();
469 void unpause();
470 };
471
472 unsigned short parse_port(const char *input, boost::system::error_code& ec)
473 {
474 char *end = nullptr;
475 auto port = std::strtoul(input, &end, 10);
476 if (port > std::numeric_limits<unsigned short>::max()) {
477 ec.assign(ERANGE, boost::system::system_category());
478 } else if (port == 0 && end == input) {
479 ec.assign(EINVAL, boost::system::system_category());
480 }
481 return port;
482 }
483
484 tcp::endpoint parse_endpoint(boost::asio::string_view input,
485 unsigned short default_port,
486 boost::system::error_code& ec)
487 {
488 tcp::endpoint endpoint;
489
490 if (input.empty()) {
491 ec = boost::asio::error::invalid_argument;
492 return endpoint;
493 }
494
495 if (input[0] == '[') { // ipv6
496 const size_t addr_begin = 1;
497 const size_t addr_end = input.find(']');
498 if (addr_end == input.npos) { // no matching ]
499 ec = boost::asio::error::invalid_argument;
500 return endpoint;
501 }
502 if (addr_end + 1 < input.size()) {
503 // :port must must follow [ipv6]
504 if (input[addr_end + 1] != ':') {
505 ec = boost::asio::error::invalid_argument;
506 return endpoint;
507 } else {
508 auto port_str = input.substr(addr_end + 2);
509 endpoint.port(parse_port(port_str.data(), ec));
510 }
511 } else {
512 endpoint.port(default_port);
513 }
514 auto addr = input.substr(addr_begin, addr_end - addr_begin);
515 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
516 } else { // ipv4
517 auto colon = input.find(':');
518 if (colon != input.npos) {
519 auto port_str = input.substr(colon + 1);
520 endpoint.port(parse_port(port_str.data(), ec));
521 if (ec) {
522 return endpoint;
523 }
524 } else {
525 endpoint.port(default_port);
526 }
527 auto addr = input.substr(0, colon);
528 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
529 }
530 return endpoint;
531 }
532
533 static int drop_privileges(CephContext *ctx)
534 {
535 uid_t uid = ctx->get_set_uid();
536 gid_t gid = ctx->get_set_gid();
537 std::string uid_string = ctx->get_set_uid_string();
538 std::string gid_string = ctx->get_set_gid_string();
539 if (gid && setgid(gid) != 0) {
540 int err = errno;
541 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
542 return -err;
543 }
544 if (uid && setuid(uid) != 0) {
545 int err = errno;
546 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
547 return -err;
548 }
549 if (uid && gid) {
550 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
551 << " (" << uid_string << ":" << gid_string << ")" << dendl;
552 }
553 return 0;
554 }
555
556 int AsioFrontend::init()
557 {
558 boost::system::error_code ec;
559 auto& config = conf->get_config_map();
560
561 if (auto i = config.find("prefix"); i != config.end()) {
562 uri_prefix = i->second;
563 }
564
565 // Setting global timeout
566 auto timeout = config.find("request_timeout_ms");
567 if (timeout != config.end()) {
568 auto timeout_number = ceph::parse<uint64_t>(timeout->second);
569 if (timeout_number) {
570 request_timeout = std::chrono::milliseconds(*timeout_number);
571 } else {
572 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
573 << timeout->second << " setting it to the default value: "
574 << REQUEST_TIMEOUT << dendl;
575 }
576 }
577
578 auto max_header_size = config.find("max_header_size");
579 if (max_header_size != config.end()) {
580 auto limit = ceph::parse<uint64_t>(max_header_size->second);
581 if (!limit) {
582 lderr(ctx()) << "WARNING: invalid value for max_header_size: "
583 << max_header_size->second << ", using the default value: "
584 << header_limit << dendl;
585 } else if (*limit > parse_buffer_size) { // can't exceed parse buffer size
586 header_limit = parse_buffer_size;
587 lderr(ctx()) << "WARNING: max_header_size " << max_header_size->second
588 << " capped at maximum value " << header_limit << dendl;
589 } else {
590 header_limit = *limit;
591 }
592 }
593
594 #ifdef WITH_RADOSGW_BEAST_OPENSSL
595 int r = init_ssl();
596 if (r < 0) {
597 return r;
598 }
599 #endif
600
601 // parse endpoints
602 auto ports = config.equal_range("port");
603 for (auto i = ports.first; i != ports.second; ++i) {
604 auto port = parse_port(i->second.c_str(), ec);
605 if (ec) {
606 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
607 return -ec.value();
608 }
609 listeners.emplace_back(context);
610 listeners.back().endpoint.port(port);
611
612 listeners.emplace_back(context);
613 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
614 }
615
616 auto endpoints = config.equal_range("endpoint");
617 for (auto i = endpoints.first; i != endpoints.second; ++i) {
618 auto endpoint = parse_endpoint(i->second, 80, ec);
619 if (ec) {
620 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
621 return -ec.value();
622 }
623 listeners.emplace_back(context);
624 listeners.back().endpoint = endpoint;
625 }
626 // parse tcp nodelay
627 auto nodelay = config.find("tcp_nodelay");
628 if (nodelay != config.end()) {
629 for (auto& l : listeners) {
630 l.use_nodelay = (nodelay->second == "1");
631 }
632 }
633
634
635 bool socket_bound = false;
636 // start listeners
637 for (auto& l : listeners) {
638 l.acceptor.open(l.endpoint.protocol(), ec);
639 if (ec) {
640 if (ec == boost::asio::error::address_family_not_supported) {
641 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
642 << ", " << ec.message() << dendl;
643 continue;
644 }
645
646 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
647 return -ec.value();
648 }
649
650 if (l.endpoint.protocol() == tcp::v6()) {
651 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
652 if (ec) {
653 lderr(ctx()) << "failed to set v6_only socket option: "
654 << ec.message() << dendl;
655 return -ec.value();
656 }
657 }
658
659 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
660 l.acceptor.bind(l.endpoint, ec);
661 if (ec) {
662 lderr(ctx()) << "failed to bind address " << l.endpoint
663 << ": " << ec.message() << dendl;
664 return -ec.value();
665 }
666
667 auto it = config.find("max_connection_backlog");
668 auto max_connection_backlog = boost::asio::socket_base::max_listen_connections;
669 if (it != config.end()) {
670 string err;
671 max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err);
672 if (!err.empty()) {
673 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl;
674 max_connection_backlog = boost::asio::socket_base::max_listen_connections;
675 }
676 }
677 l.acceptor.listen(max_connection_backlog);
678 l.acceptor.async_accept(l.socket,
679 [this, &l] (boost::system::error_code ec) {
680 accept(l, ec);
681 });
682
683 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
684 socket_bound = true;
685 }
686 if (!socket_bound) {
687 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
688 return -EINVAL;
689 }
690
691 return drop_privileges(ctx());
692 }
693
694 #ifdef WITH_RADOSGW_BEAST_OPENSSL
695
696 static string config_val_prefix = "config://";
697
698 namespace {
699
700 class ExpandMetaVar {
701 map<string, string> meta_map;
702
703 public:
704 ExpandMetaVar(rgw::sal::Zone* zone_svc) {
705 meta_map["realm"] = zone_svc->get_realm_name();
706 meta_map["realm_id"] = zone_svc->get_realm_id();
707 meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
708 meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
709 meta_map["zone"] = zone_svc->get_name();
710 meta_map["zone_id"] = zone_svc->get_id();
711 }
712
713 string process_str(const string& in);
714 };
715
716 string ExpandMetaVar::process_str(const string& in)
717 {
718 if (meta_map.empty()) {
719 return in;
720 }
721
722 auto pos = in.find('$');
723 if (pos == std::string::npos) {
724 return in;
725 }
726
727 string out;
728 decltype(pos) last_pos = 0;
729
730 while (pos != std::string::npos) {
731 if (pos > last_pos) {
732 out += in.substr(last_pos, pos - last_pos);
733 }
734
735 string var;
736 const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_";
737
738 size_t endpos = 0;
739 if (in[pos+1] == '{') {
740 // ...${foo_bar}...
741 endpos = in.find_first_not_of(valid_chars, pos + 2);
742 if (endpos != std::string::npos &&
743 in[endpos] == '}') {
744 var = in.substr(pos + 2, endpos - pos - 2);
745 endpos++;
746 }
747 } else {
748 // ...$foo...
749 endpos = in.find_first_not_of(valid_chars, pos + 1);
750 if (endpos != std::string::npos)
751 var = in.substr(pos + 1, endpos - pos - 1);
752 else
753 var = in.substr(pos + 1);
754 }
755 string var_source = in.substr(pos, endpos - pos);
756 last_pos = endpos;
757
758 auto iter = meta_map.find(var);
759 if (iter != meta_map.end()) {
760 out += iter->second;
761 } else {
762 out += var_source;
763 }
764 pos = in.find('$', last_pos);
765 }
766 if (last_pos != std::string::npos) {
767 out += in.substr(last_pos);
768 }
769
770 return out;
771 }
772
773 } /* anonymous namespace */
774
775 int AsioFrontend::get_config_key_val(string name,
776 const string& type,
777 bufferlist *pbl)
778 {
779 if (name.empty()) {
780 lderr(ctx()) << "bad " << type << " config value" << dendl;
781 return -EINVAL;
782 }
783
784 int r = env.driver->get_config_key_val(name, pbl);
785 if (r < 0) {
786 lderr(ctx()) << type << " was not found: " << name << dendl;
787 return r;
788 }
789 return 0;
790 }
791
792 int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate)
793 {
794 boost::system::error_code ec;
795
796 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
797 ssl_context->use_private_key_file(name, ssl::context::pem, ec);
798 } else {
799 bufferlist bl;
800 int r = get_config_key_val(name.substr(config_val_prefix.size()),
801 "ssl_private_key",
802 &bl);
803 if (r < 0) {
804 return r;
805 }
806 ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()),
807 ssl::context::pem, ec);
808 }
809
810 if (ec) {
811 if (!is_ssl_certificate) {
812 lderr(ctx()) << "failed to add ssl_private_key=" << name
813 << ": " << ec.message() << dendl;
814 } else {
815 lderr(ctx()) << "failed to use ssl_certificate=" << name
816 << " as a private key: " << ec.message() << dendl;
817 }
818 return -ec.value();
819 }
820
821 return 0;
822 }
823
824 int AsioFrontend::ssl_set_certificate_chain(const string& name)
825 {
826 boost::system::error_code ec;
827
828 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
829 ssl_context->use_certificate_chain_file(name, ec);
830 } else {
831 bufferlist bl;
832 int r = get_config_key_val(name.substr(config_val_prefix.size()),
833 "ssl_certificate",
834 &bl);
835 if (r < 0) {
836 return r;
837 }
838 ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()),
839 ec);
840 }
841
842 if (ec) {
843 lderr(ctx()) << "failed to use ssl_certificate=" << name
844 << ": " << ec.message() << dendl;
845 return -ec.value();
846 }
847
848 return 0;
849 }
850
851 int AsioFrontend::init_ssl()
852 {
853 boost::system::error_code ec;
854 auto& config = conf->get_config_map();
855
856 // ssl configuration
857 std::optional<string> cert = conf->get_val("ssl_certificate");
858 if (cert) {
859 // only initialize the ssl context if it's going to be used
860 ssl_context = boost::in_place(ssl::context::tls);
861 }
862
863 std::optional<string> key = conf->get_val("ssl_private_key");
864 bool have_cert = false;
865
866 if (key && !cert) {
867 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
868 return -EINVAL;
869 }
870
871 std::optional<string> options = conf->get_val("ssl_options");
872 if (options) {
873 if (!cert) {
874 lderr(ctx()) << "no ssl_certificate configured for ssl_options" << dendl;
875 return -EINVAL;
876 }
877 } else if (cert) {
878 options = "no_sslv2:no_sslv3:no_tlsv1:no_tlsv1_1";
879 }
880
881 if (options) {
882 for (auto &option : ceph::split(*options, ":")) {
883 if (option == "default_workarounds") {
884 ssl_context->set_options(ssl::context::default_workarounds);
885 } else if (option == "no_compression") {
886 ssl_context->set_options(ssl::context::no_compression);
887 } else if (option == "no_sslv2") {
888 ssl_context->set_options(ssl::context::no_sslv2);
889 } else if (option == "no_sslv3") {
890 ssl_context->set_options(ssl::context::no_sslv3);
891 } else if (option == "no_tlsv1") {
892 ssl_context->set_options(ssl::context::no_tlsv1);
893 } else if (option == "no_tlsv1_1") {
894 ssl_context->set_options(ssl::context::no_tlsv1_1);
895 } else if (option == "no_tlsv1_2") {
896 ssl_context->set_options(ssl::context::no_tlsv1_2);
897 } else if (option == "single_dh_use") {
898 ssl_context->set_options(ssl::context::single_dh_use);
899 } else {
900 lderr(ctx()) << "ignoring unknown ssl option '" << option << "'" << dendl;
901 }
902 }
903 }
904
905 std::optional<string> ciphers = conf->get_val("ssl_ciphers");
906 if (ciphers) {
907 if (!cert) {
908 lderr(ctx()) << "no ssl_certificate configured for ssl_ciphers" << dendl;
909 return -EINVAL;
910 }
911
912 int r = SSL_CTX_set_cipher_list(ssl_context->native_handle(),
913 ciphers->c_str());
914 if (r == 0) {
915 lderr(ctx()) << "no cipher could be selected from ssl_ciphers: "
916 << *ciphers << dendl;
917 return -EINVAL;
918 }
919 }
920
921 auto ports = config.equal_range("ssl_port");
922 auto endpoints = config.equal_range("ssl_endpoint");
923
924 /*
925 * don't try to config certificate if frontend isn't configured for ssl
926 */
927 if (ports.first == ports.second &&
928 endpoints.first == endpoints.second) {
929 return 0;
930 }
931
932 bool key_is_cert = false;
933
934 if (cert) {
935 if (!key) {
936 key = cert;
937 key_is_cert = true;
938 }
939
940 ExpandMetaVar emv(env.driver->get_zone());
941
942 cert = emv.process_str(*cert);
943 key = emv.process_str(*key);
944
945 int r = ssl_set_private_key(*key, key_is_cert);
946 bool have_private_key = (r >= 0);
947 if (r < 0) {
948 if (!key_is_cert) {
949 r = ssl_set_private_key(*cert, true);
950 have_private_key = (r >= 0);
951 }
952 }
953
954 if (have_private_key) {
955 int r = ssl_set_certificate_chain(*cert);
956 have_cert = (r >= 0);
957 }
958 }
959
960 // parse ssl endpoints
961 for (auto i = ports.first; i != ports.second; ++i) {
962 if (!have_cert) {
963 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
964 return -EINVAL;
965 }
966 auto port = parse_port(i->second.c_str(), ec);
967 if (ec) {
968 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
969 return -ec.value();
970 }
971 listeners.emplace_back(context);
972 listeners.back().endpoint.port(port);
973 listeners.back().use_ssl = true;
974
975 listeners.emplace_back(context);
976 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
977 listeners.back().use_ssl = true;
978 }
979
980 for (auto i = endpoints.first; i != endpoints.second; ++i) {
981 if (!have_cert) {
982 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
983 return -EINVAL;
984 }
985 auto endpoint = parse_endpoint(i->second, 443, ec);
986 if (ec) {
987 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
988 return -ec.value();
989 }
990 listeners.emplace_back(context);
991 listeners.back().endpoint = endpoint;
992 listeners.back().use_ssl = true;
993 }
994 return 0;
995 }
996 #endif // WITH_RADOSGW_BEAST_OPENSSL
997
998 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
999 {
1000 if (!l.acceptor.is_open()) {
1001 return;
1002 } else if (ec == boost::asio::error::operation_aborted) {
1003 return;
1004 } else if (ec) {
1005 ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
1006 return;
1007 }
1008 auto stream = std::move(l.socket);
1009 stream.set_option(tcp::no_delay(l.use_nodelay), ec);
1010 l.acceptor.async_accept(l.socket,
1011 [this, &l] (boost::system::error_code ec) {
1012 accept(l, ec);
1013 });
1014
1015 // spawn a coroutine to handle the connection
1016 #ifdef WITH_RADOSGW_BEAST_OPENSSL
1017 if (l.use_ssl) {
1018 spawn::spawn(context,
1019 [this, s=std::move(stream)] (yield_context yield) mutable {
1020 auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
1021 auto c = connections.add(*conn);
1022 // wrap the tcp stream in an ssl stream
1023 boost::asio::ssl::stream<tcp_socket&> stream{conn->socket, *ssl_context};
1024 auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
1025 // do ssl handshake
1026 boost::system::error_code ec;
1027 timeout.start();
1028 auto bytes = stream.async_handshake(ssl::stream_base::server,
1029 conn->buffer.data(), yield[ec]);
1030 timeout.cancel();
1031 if (ec) {
1032 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
1033 return;
1034 }
1035 conn->buffer.consume(bytes);
1036 handle_connection(context, env, stream, timeout, header_limit,
1037 conn->buffer, true, pause_mutex, scheduler.get(),
1038 uri_prefix, ec, yield);
1039 if (!ec) {
1040 // ssl shutdown (ignoring errors)
1041 stream.async_shutdown(yield[ec]);
1042 }
1043 conn->socket.shutdown(tcp::socket::shutdown_both, ec);
1044 }, make_stack_allocator());
1045 } else {
1046 #else
1047 {
1048 #endif // WITH_RADOSGW_BEAST_OPENSSL
1049 spawn::spawn(context,
1050 [this, s=std::move(stream)] (yield_context yield) mutable {
1051 auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
1052 auto c = connections.add(*conn);
1053 auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
1054 boost::system::error_code ec;
1055 handle_connection(context, env, conn->socket, timeout, header_limit,
1056 conn->buffer, false, pause_mutex, scheduler.get(),
1057 uri_prefix, ec, yield);
1058 conn->socket.shutdown(tcp_socket::shutdown_both, ec);
1059 }, make_stack_allocator());
1060 }
1061 }
1062
1063 int AsioFrontend::run()
1064 {
1065 auto cct = ctx();
1066 const int thread_count = cct->_conf->rgw_thread_pool_size;
1067 threads.reserve(thread_count);
1068
1069 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
1070
1071 // the worker threads call io_context::run(), which will return when there's
1072 // no work left. hold a work guard to keep these threads going until join()
1073 work.emplace(boost::asio::make_work_guard(context));
1074
1075 for (int i = 0; i < thread_count; i++) {
1076 threads.emplace_back([this]() noexcept {
1077 // request warnings on synchronous librados calls in this thread
1078 is_asio_thread = true;
1079 // Have uncaught exceptions kill the process and give a
1080 // stacktrace, not be swallowed.
1081 context.run();
1082 });
1083 }
1084 return 0;
1085 }
1086
1087 void AsioFrontend::stop()
1088 {
1089 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
1090
1091 going_down = true;
1092
1093 boost::system::error_code ec;
1094 // close all listeners
1095 for (auto& listener : listeners) {
1096 listener.acceptor.close(ec);
1097 }
1098 // close all connections
1099 connections.close(ec);
1100 pause_mutex.cancel();
1101 }
1102
1103 void AsioFrontend::join()
1104 {
1105 if (!going_down) {
1106 stop();
1107 }
1108 work.reset();
1109
1110 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
1111 for (auto& thread : threads) {
1112 thread.join();
1113 }
1114 ldout(ctx(), 4) << "frontend done" << dendl;
1115 }
1116
1117 void AsioFrontend::pause()
1118 {
1119 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
1120
1121 // cancel pending calls to accept(), but don't close the sockets
1122 boost::system::error_code ec;
1123 for (auto& l : listeners) {
1124 l.acceptor.cancel(ec);
1125 }
1126
1127 // pause and wait for outstanding requests to complete
1128 pause_mutex.lock(ec);
1129
1130 if (ec) {
1131 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
1132 } else {
1133 ldout(ctx(), 4) << "frontend paused" << dendl;
1134 }
1135 }
1136
1137 void AsioFrontend::unpause()
1138 {
1139 // unpause to unblock connections
1140 pause_mutex.unlock();
1141
1142 // start accepting connections again
1143 for (auto& l : listeners) {
1144 l.acceptor.async_accept(l.socket,
1145 [this, &l] (boost::system::error_code ec) {
1146 accept(l, ec);
1147 });
1148 }
1149
1150 ldout(ctx(), 4) << "frontend unpaused" << dendl;
1151 }
1152
1153 } // anonymous namespace
1154
1155 class RGWAsioFrontend::Impl : public AsioFrontend {
1156 public:
1157 Impl(RGWProcessEnv& env, RGWFrontendConfig* conf,
1158 rgw::dmclock::SchedulerCtx& sched_ctx)
1159 : AsioFrontend(env, conf, sched_ctx) {}
1160 };
1161
1162 RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv& env,
1163 RGWFrontendConfig* conf,
1164 rgw::dmclock::SchedulerCtx& sched_ctx)
1165 : impl(new Impl(env, conf, sched_ctx))
1166 {
1167 }
1168
1169 RGWAsioFrontend::~RGWAsioFrontend() = default;
1170
1171 int RGWAsioFrontend::init()
1172 {
1173 return impl->init();
1174 }
1175
1176 int RGWAsioFrontend::run()
1177 {
1178 return impl->run();
1179 }
1180
1181 void RGWAsioFrontend::stop()
1182 {
1183 impl->stop();
1184 }
1185
1186 void RGWAsioFrontend::join()
1187 {
1188 impl->join();
1189 }
1190
1191 void RGWAsioFrontend::pause_for_new_config()
1192 {
1193 impl->pause();
1194 }
1195
1196 void RGWAsioFrontend::unpause_with_new_config()
1197 {
1198 impl->unpause();
1199 }