]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_asio_frontend.cc
import quincy 17.2.0
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <atomic>
5 #include <ctime>
6 #include <thread>
7 #include <vector>
8
9 #include <boost/asio.hpp>
10 #include <boost/intrusive/list.hpp>
11 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
12
13 #include <boost/context/protected_fixedsize_stack.hpp>
14 #include <spawn/spawn.hpp>
15
16 #include "common/async/shared_mutex.h"
17 #include "common/errno.h"
18 #include "common/strtol.h"
19
20 #include "rgw_asio_client.h"
21 #include "rgw_asio_frontend.h"
22
23 #ifdef WITH_RADOSGW_BEAST_OPENSSL
24 #include <boost/asio/ssl.hpp>
25 #endif
26
27 #include "common/split.h"
28
29 #include "services/svc_config_key.h"
30 #include "services/svc_zone.h"
31
32 #include "rgw_zone.h"
33
34 #include "rgw_asio_frontend_timer.h"
35 #include "rgw_dmclock_async_scheduler.h"
36
37 #define dout_subsys ceph_subsys_rgw
38
39 namespace {
40
41 using tcp = boost::asio::ip::tcp;
42 namespace http = boost::beast::http;
43 #ifdef WITH_RADOSGW_BEAST_OPENSSL
44 namespace ssl = boost::asio::ssl;
45 #endif
46
47 struct Connection;
48
49 // use explicit executor types instead of the type-erased boost::asio::executor
50 using executor_type = boost::asio::io_context::executor_type;
51
52 using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
53 using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
54
55 using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
56 executor_type, Connection>;
57
58 static constexpr size_t parse_buffer_size = 65536;
59 using parse_buffer = boost::beast::flat_static_buffer<parse_buffer_size>;
60
61 // use mmap/mprotect to allocate 512k coroutine stacks
62 auto make_stack_allocator() {
63 return boost::context::protected_fixedsize_stack{512*1024};
64 }
65
66 using namespace std;
67
68 template <typename Stream>
69 class StreamIO : public rgw::asio::ClientIO {
70 CephContext* const cct;
71 Stream& stream;
72 timeout_timer& timeout;
73 yield_context yield;
74 parse_buffer& buffer;
75 public:
76 StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
77 rgw::asio::parser_type& parser, yield_context yield,
78 parse_buffer& buffer, bool is_ssl,
79 const tcp::endpoint& local_endpoint,
80 const tcp::endpoint& remote_endpoint)
81 : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
82 cct(cct), stream(stream), timeout(timeout), yield(yield),
83 buffer(buffer)
84 {}
85
86 size_t write_data(const char* buf, size_t len) override {
87 boost::system::error_code ec;
88 timeout.start();
89 auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
90 yield[ec]);
91 timeout.cancel();
92 if (ec) {
93 ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
94 if (ec == boost::asio::error::broken_pipe) {
95 boost::system::error_code ec_ignored;
96 stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
97 }
98 throw rgw::io::Exception(ec.value(), std::system_category());
99 }
100 return bytes;
101 }
102
103 size_t recv_body(char* buf, size_t max) override {
104 auto& message = parser.get();
105 auto& body_remaining = message.body();
106 body_remaining.data = buf;
107 body_remaining.size = max;
108
109 while (body_remaining.size && !parser.is_done()) {
110 boost::system::error_code ec;
111 timeout.start();
112 http::async_read_some(stream, buffer, parser, yield[ec]);
113 timeout.cancel();
114 if (ec == http::error::need_buffer) {
115 break;
116 }
117 if (ec) {
118 ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
119 throw rgw::io::Exception(ec.value(), std::system_category());
120 }
121 }
122 return max - body_remaining.size;
123 }
124 };
125
// Access-log helper: prints an http version number as stored by beast
// (major * 10 + minor, e.g. 11) as a protocol string, e.g. 'HTTP/1.1'.
struct http_version {
  unsigned major_ver;
  unsigned minor_ver;
  explicit http_version(unsigned version)
    : major_ver(version / 10), minor_ver(version % 10) {}
};
std::ostream& operator<<(std::ostream& out, const http_version& v) {
  out << "HTTP/" << v.major_ver << '.' << v.minor_ver;
  return out;
}
136
137 // log an http header value or '-' if it's missing
138 struct log_header {
139 const http::fields& fields;
140 http::field field;
141 std::string_view quote;
142 log_header(const http::fields& fields, http::field field,
143 std::string_view quote = "")
144 : fields(fields), field(field), quote(quote) {}
145 };
146 std::ostream& operator<<(std::ostream& out, const log_header& h) {
147 auto p = h.fields.find(h.field);
148 if (p == h.fields.end()) {
149 return out << '-';
150 }
151 return out << h.quote << p->value() << h.quote;
152 }
153
154 // log fractional seconds in milliseconds
155 struct log_ms_remainder {
156 ceph::coarse_real_time t;
157 log_ms_remainder(ceph::coarse_real_time t) : t(t) {}
158 };
159 std::ostream& operator<<(std::ostream& out, const log_ms_remainder& m) {
160 using namespace std::chrono;
161 return out << std::setfill('0') << std::setw(3)
162 << duration_cast<milliseconds>(m.t.time_since_epoch()).count() % 1000;
163 }
164
165 // log time in apache format: day/month/year:hour:minute:second zone
166 struct log_apache_time {
167 ceph::coarse_real_time t;
168 log_apache_time(ceph::coarse_real_time t) : t(t) {}
169 };
170 std::ostream& operator<<(std::ostream& out, const log_apache_time& a) {
171 const auto t = ceph::coarse_real_clock::to_time_t(a.t);
172 const auto local = std::localtime(&t);
173 return out << std::put_time(local, "%d/%b/%Y:%T.") << log_ms_remainder{a.t}
174 << std::put_time(local, " %z");
175 };
176
177 using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
178
179 template <typename Stream>
180 void handle_connection(boost::asio::io_context& context,
181 RGWProcessEnv& env, Stream& stream,
182 timeout_timer& timeout, size_t header_limit,
183 parse_buffer& buffer, bool is_ssl,
184 SharedMutex& pause_mutex,
185 rgw::dmclock::Scheduler *scheduler,
186 boost::system::error_code& ec,
187 yield_context yield)
188 {
189 // don't impose a limit on the body, since we read it in pieces
190 static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
191
192 auto cct = env.store->ctx();
193
194 // read messages from the stream until eof
195 for (;;) {
196 // configure the parser
197 rgw::asio::parser_type parser;
198 parser.header_limit(header_limit);
199 parser.body_limit(body_limit);
200 timeout.start();
201 // parse the header
202 http::async_read_header(stream, buffer, parser, yield[ec]);
203 timeout.cancel();
204 if (ec == boost::asio::error::connection_reset ||
205 ec == boost::asio::error::bad_descriptor ||
206 ec == boost::asio::error::operation_aborted ||
207 #ifdef WITH_RADOSGW_BEAST_OPENSSL
208 ec == ssl::error::stream_truncated ||
209 #endif
210 ec == http::error::end_of_stream) {
211 ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
212 return;
213 }
214 auto& message = parser.get();
215 if (ec) {
216 ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
217 http::response<http::empty_body> response;
218 response.result(http::status::bad_request);
219 response.version(message.version() == 10 ? 10 : 11);
220 response.prepare_payload();
221 timeout.start();
222 http::async_write(stream, response, yield[ec]);
223 timeout.cancel();
224 if (ec) {
225 ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
226 }
227 ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
228 return;
229 }
230
231 {
232 auto lock = pause_mutex.async_lock_shared(yield[ec]);
233 if (ec == boost::asio::error::operation_aborted) {
234 return;
235 } else if (ec) {
236 ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
237 return;
238 }
239
240 // process the request
241 RGWRequest req{env.store->get_new_req_id()};
242
243 auto& socket = stream.lowest_layer();
244 const auto& remote_endpoint = socket.remote_endpoint(ec);
245 if (ec) {
246 ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
247 return;
248 }
249
250 StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
251 is_ssl, socket.local_endpoint(),
252 remote_endpoint};
253
254 auto real_client_io = rgw::io::add_reordering(
255 rgw::io::add_buffering(cct,
256 rgw::io::add_chunking(
257 rgw::io::add_conlen_controlling(
258 &real_client))));
259 RGWRestfulIO client(cct, &real_client_io);
260 optional_yield y = null_yield;
261 if (cct->_conf->rgw_beast_enable_async) {
262 y = optional_yield{context, yield};
263 }
264 int http_ret = 0;
265 string user = "-";
266 const auto started = ceph::coarse_real_clock::now();
267 ceph::coarse_real_clock::duration latency{};
268 process_request(env.store, env.rest, &req, env.uri_prefix,
269 *env.auth_registry, &client, env.olog, y,
270 scheduler, &user, &latency,
271 env.ratelimiting->get_active(),
272 &http_ret);
273
274 if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
275 // access log line elements begin per Apache Combined Log Format with additions following
276 ldout(cct, 1) << "beast: " << std::hex << &req << std::dec << ": "
277 << remote_endpoint.address() << " - " << user << " [" << log_apache_time{started} << "] \""
278 << message.method_string() << ' ' << message.target() << ' '
279 << http_version{message.version()} << "\" " << http_ret << ' '
280 << client.get_bytes_sent() + client.get_bytes_received() << ' '
281 << log_header{message, http::field::referer, "\""} << ' '
282 << log_header{message, http::field::user_agent, "\""} << ' '
283 << log_header{message, http::field::range} << " latency="
284 << latency << dendl;
285 }
286 }
287
288 if (!parser.keep_alive()) {
289 return;
290 }
291
292 // if we failed before reading the entire message, discard any remaining
293 // bytes before reading the next
294 while (!parser.is_done()) {
295 static std::array<char, 1024> discard_buffer;
296
297 auto& body = parser.get().body();
298 body.size = discard_buffer.size();
299 body.data = discard_buffer.data();
300
301 timeout.start();
302 http::async_read_some(stream, buffer, parser, yield[ec]);
303 timeout.cancel();
304 if (ec == http::error::need_buffer) {
305 continue;
306 }
307 if (ec == boost::asio::error::connection_reset) {
308 return;
309 }
310 if (ec) {
311 ldout(cct, 5) << "failed to discard unread message: "
312 << ec.message() << dendl;
313 return;
314 }
315 }
316 }
317 }
318
319 // timeout support requires that connections are reference-counted, because the
320 // timeout_handler can outlive the coroutine
321 struct Connection : boost::intrusive::list_base_hook<>,
322 boost::intrusive_ref_counter<Connection>
323 {
324 tcp_socket socket;
325 parse_buffer buffer;
326
327 explicit Connection(tcp_socket&& socket) noexcept
328 : socket(std::move(socket)) {}
329
330 void close(boost::system::error_code& ec) {
331 socket.close(ec);
332 }
333 };
334
335 class ConnectionList {
336 using List = boost::intrusive::list<Connection>;
337 List connections;
338 std::mutex mutex;
339
340 void remove(Connection& c) {
341 std::lock_guard lock{mutex};
342 if (c.is_linked()) {
343 connections.erase(List::s_iterator_to(c));
344 }
345 }
346 public:
347 class Guard {
348 ConnectionList *list;
349 Connection *conn;
350 public:
351 Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
352 ~Guard() { list->remove(*conn); }
353 };
354 [[nodiscard]] Guard add(Connection& conn) {
355 std::lock_guard lock{mutex};
356 connections.push_back(conn);
357 return Guard{this, &conn};
358 }
359 void close(boost::system::error_code& ec) {
360 std::lock_guard lock{mutex};
361 for (auto& conn : connections) {
362 conn.socket.close(ec);
363 }
364 connections.clear();
365 }
366 };
367
368 namespace dmc = rgw::dmclock;
369 class AsioFrontend {
370 RGWProcessEnv env;
371 RGWFrontendConfig* conf;
372 boost::asio::io_context context;
373 ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
374 size_t header_limit = 16384;
375 #ifdef WITH_RADOSGW_BEAST_OPENSSL
376 boost::optional<ssl::context> ssl_context;
377 int get_config_key_val(string name,
378 const string& type,
379 bufferlist *pbl);
380 int ssl_set_private_key(const string& name, bool is_ssl_cert);
381 int ssl_set_certificate_chain(const string& name);
382 int init_ssl();
383 #endif
384 SharedMutex pause_mutex;
385 std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
386
387 struct Listener {
388 tcp::endpoint endpoint;
389 tcp::acceptor acceptor;
390 tcp_socket socket;
391 bool use_ssl = false;
392 bool use_nodelay = false;
393
394 explicit Listener(boost::asio::io_context& context)
395 : acceptor(context), socket(context) {}
396 };
397 std::vector<Listener> listeners;
398
399 ConnectionList connections;
400
401 // work guard to keep run() threads busy while listeners are paused
402 using Executor = boost::asio::io_context::executor_type;
403 std::optional<boost::asio::executor_work_guard<Executor>> work;
404
405 std::vector<std::thread> threads;
406 std::atomic<bool> going_down{false};
407
408 CephContext* ctx() const { return env.store->ctx(); }
409 std::optional<dmc::ClientCounters> client_counters;
410 std::unique_ptr<dmc::ClientConfig> client_config;
411 void accept(Listener& listener, boost::system::error_code ec);
412
413 public:
414 AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
415 dmc::SchedulerCtx& sched_ctx)
416 : env(env), conf(conf), pause_mutex(context.get_executor())
417 {
418 auto sched_t = dmc::get_scheduler_t(ctx());
419 switch(sched_t){
420 case dmc::scheduler_t::dmclock:
421 scheduler.reset(new dmc::AsyncScheduler(ctx(),
422 context,
423 std::ref(sched_ctx.get_dmc_client_counters()),
424 sched_ctx.get_dmc_client_config(),
425 *sched_ctx.get_dmc_client_config(),
426 dmc::AtLimit::Reject));
427 break;
428 case dmc::scheduler_t::none:
429 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
430 [[fallthrough]];
431 case dmc::scheduler_t::throttler:
432 scheduler.reset(new dmc::SimpleThrottler(ctx()));
433
434 }
435 }
436
437 int init();
438 int run();
439 void stop();
440 void join();
441 void pause();
442 void unpause(rgw::sal::Store* store, rgw_auth_registry_ptr_t);
443 };
444
445 unsigned short parse_port(const char *input, boost::system::error_code& ec)
446 {
447 char *end = nullptr;
448 auto port = std::strtoul(input, &end, 10);
449 if (port > std::numeric_limits<unsigned short>::max()) {
450 ec.assign(ERANGE, boost::system::system_category());
451 } else if (port == 0 && end == input) {
452 ec.assign(EINVAL, boost::system::system_category());
453 }
454 return port;
455 }
456
457 tcp::endpoint parse_endpoint(boost::asio::string_view input,
458 unsigned short default_port,
459 boost::system::error_code& ec)
460 {
461 tcp::endpoint endpoint;
462
463 if (input.empty()) {
464 ec = boost::asio::error::invalid_argument;
465 return endpoint;
466 }
467
468 if (input[0] == '[') { // ipv6
469 const size_t addr_begin = 1;
470 const size_t addr_end = input.find(']');
471 if (addr_end == input.npos) { // no matching ]
472 ec = boost::asio::error::invalid_argument;
473 return endpoint;
474 }
475 if (addr_end + 1 < input.size()) {
476 // :port must must follow [ipv6]
477 if (input[addr_end + 1] != ':') {
478 ec = boost::asio::error::invalid_argument;
479 return endpoint;
480 } else {
481 auto port_str = input.substr(addr_end + 2);
482 endpoint.port(parse_port(port_str.data(), ec));
483 }
484 } else {
485 endpoint.port(default_port);
486 }
487 auto addr = input.substr(addr_begin, addr_end - addr_begin);
488 endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
489 } else { // ipv4
490 auto colon = input.find(':');
491 if (colon != input.npos) {
492 auto port_str = input.substr(colon + 1);
493 endpoint.port(parse_port(port_str.data(), ec));
494 if (ec) {
495 return endpoint;
496 }
497 } else {
498 endpoint.port(default_port);
499 }
500 auto addr = input.substr(0, colon);
501 endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
502 }
503 return endpoint;
504 }
505
506 static int drop_privileges(CephContext *ctx)
507 {
508 uid_t uid = ctx->get_set_uid();
509 gid_t gid = ctx->get_set_gid();
510 std::string uid_string = ctx->get_set_uid_string();
511 std::string gid_string = ctx->get_set_gid_string();
512 if (gid && setgid(gid) != 0) {
513 int err = errno;
514 ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
515 return -err;
516 }
517 if (uid && setuid(uid) != 0) {
518 int err = errno;
519 ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
520 return -err;
521 }
522 if (uid && gid) {
523 ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
524 << " (" << uid_string << ":" << gid_string << ")" << dendl;
525 }
526 return 0;
527 }
528
529 int AsioFrontend::init()
530 {
531 boost::system::error_code ec;
532 auto& config = conf->get_config_map();
533
534 // Setting global timeout
535 auto timeout = config.find("request_timeout_ms");
536 if (timeout != config.end()) {
537 auto timeout_number = ceph::parse<uint64_t>(timeout->second);
538 if (timeout_number) {
539 request_timeout = std::chrono::milliseconds(*timeout_number);
540 } else {
541 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
542 << timeout->second << " setting it to the default value: "
543 << REQUEST_TIMEOUT << dendl;
544 }
545 }
546
547 auto max_header_size = config.find("max_header_size");
548 if (max_header_size != config.end()) {
549 auto limit = ceph::parse<uint64_t>(max_header_size->second);
550 if (!limit) {
551 lderr(ctx()) << "WARNING: invalid value for max_header_size: "
552 << max_header_size->second << ", using the default value: "
553 << header_limit << dendl;
554 } else if (*limit > parse_buffer_size) { // can't exceed parse buffer size
555 header_limit = parse_buffer_size;
556 lderr(ctx()) << "WARNING: max_header_size " << max_header_size->second
557 << " capped at maximum value " << header_limit << dendl;
558 } else {
559 header_limit = *limit;
560 }
561 }
562
563 #ifdef WITH_RADOSGW_BEAST_OPENSSL
564 int r = init_ssl();
565 if (r < 0) {
566 return r;
567 }
568 #endif
569
570 // parse endpoints
571 auto ports = config.equal_range("port");
572 for (auto i = ports.first; i != ports.second; ++i) {
573 auto port = parse_port(i->second.c_str(), ec);
574 if (ec) {
575 lderr(ctx()) << "failed to parse port=" << i->second << dendl;
576 return -ec.value();
577 }
578 listeners.emplace_back(context);
579 listeners.back().endpoint.port(port);
580
581 listeners.emplace_back(context);
582 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
583 }
584
585 auto endpoints = config.equal_range("endpoint");
586 for (auto i = endpoints.first; i != endpoints.second; ++i) {
587 auto endpoint = parse_endpoint(i->second, 80, ec);
588 if (ec) {
589 lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
590 return -ec.value();
591 }
592 listeners.emplace_back(context);
593 listeners.back().endpoint = endpoint;
594 }
595 // parse tcp nodelay
596 auto nodelay = config.find("tcp_nodelay");
597 if (nodelay != config.end()) {
598 for (auto& l : listeners) {
599 l.use_nodelay = (nodelay->second == "1");
600 }
601 }
602
603
604 bool socket_bound = false;
605 // start listeners
606 for (auto& l : listeners) {
607 l.acceptor.open(l.endpoint.protocol(), ec);
608 if (ec) {
609 if (ec == boost::asio::error::address_family_not_supported) {
610 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
611 << ", " << ec.message() << dendl;
612 continue;
613 }
614
615 lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
616 return -ec.value();
617 }
618
619 if (l.endpoint.protocol() == tcp::v6()) {
620 l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
621 if (ec) {
622 lderr(ctx()) << "failed to set v6_only socket option: "
623 << ec.message() << dendl;
624 return -ec.value();
625 }
626 }
627
628 l.acceptor.set_option(tcp::acceptor::reuse_address(true));
629 l.acceptor.bind(l.endpoint, ec);
630 if (ec) {
631 lderr(ctx()) << "failed to bind address " << l.endpoint
632 << ": " << ec.message() << dendl;
633 return -ec.value();
634 }
635
636 auto it = config.find("max_connection_backlog");
637 auto max_connection_backlog = boost::asio::socket_base::max_listen_connections;
638 if (it != config.end()) {
639 string err;
640 max_connection_backlog = strict_strtol(it->second.c_str(), 10, &err);
641 if (!err.empty()) {
642 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it->second << dendl;
643 max_connection_backlog = boost::asio::socket_base::max_listen_connections;
644 }
645 }
646 l.acceptor.listen(max_connection_backlog);
647 l.acceptor.async_accept(l.socket,
648 [this, &l] (boost::system::error_code ec) {
649 accept(l, ec);
650 });
651
652 ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
653 socket_bound = true;
654 }
655 if (!socket_bound) {
656 lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
657 return -EINVAL;
658 }
659
660 return drop_privileges(ctx());
661 }
662
663 #ifdef WITH_RADOSGW_BEAST_OPENSSL
664
// Prefix marking an ssl_certificate/ssl_private_key value as the name of a
// config-key read from the store, rather than a file path. It is never
// modified, so it is const.
static const std::string config_val_prefix = "config://";
666
667 namespace {
668
669 class ExpandMetaVar {
670 map<string, string> meta_map;
671
672 public:
673 ExpandMetaVar(rgw::sal::Zone* zone_svc) {
674 meta_map["realm"] = zone_svc->get_realm().get_name();
675 meta_map["realm_id"] = zone_svc->get_realm().get_id();
676 meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
677 meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
678 meta_map["zone"] = zone_svc->get_name();
679 meta_map["zone_id"] = zone_svc->get_id().id;
680 }
681
682 string process_str(const string& in);
683 };
684
685 string ExpandMetaVar::process_str(const string& in)
686 {
687 if (meta_map.empty()) {
688 return in;
689 }
690
691 auto pos = in.find('$');
692 if (pos == std::string::npos) {
693 return in;
694 }
695
696 string out;
697 decltype(pos) last_pos = 0;
698
699 while (pos != std::string::npos) {
700 if (pos > last_pos) {
701 out += in.substr(last_pos, pos - last_pos);
702 }
703
704 string var;
705 const char *valid_chars = "abcdefghijklmnopqrstuvwxyz_";
706
707 size_t endpos = 0;
708 if (in[pos+1] == '{') {
709 // ...${foo_bar}...
710 endpos = in.find_first_not_of(valid_chars, pos + 2);
711 if (endpos != std::string::npos &&
712 in[endpos] == '}') {
713 var = in.substr(pos + 2, endpos - pos - 2);
714 endpos++;
715 }
716 } else {
717 // ...$foo...
718 endpos = in.find_first_not_of(valid_chars, pos + 1);
719 if (endpos != std::string::npos)
720 var = in.substr(pos + 1, endpos - pos - 1);
721 else
722 var = in.substr(pos + 1);
723 }
724 string var_source = in.substr(pos, endpos - pos);
725 last_pos = endpos;
726
727 auto iter = meta_map.find(var);
728 if (iter != meta_map.end()) {
729 out += iter->second;
730 } else {
731 out += var_source;
732 }
733 pos = in.find('$', last_pos);
734 }
735 if (last_pos != std::string::npos) {
736 out += in.substr(last_pos);
737 }
738
739 return out;
740 }
741
742 } /* anonymous namespace */
743
744 int AsioFrontend::get_config_key_val(string name,
745 const string& type,
746 bufferlist *pbl)
747 {
748 if (name.empty()) {
749 lderr(ctx()) << "bad " << type << " config value" << dendl;
750 return -EINVAL;
751 }
752
753 int r = env.store->get_config_key_val(name, pbl);
754 if (r < 0) {
755 lderr(ctx()) << type << " was not found: " << name << dendl;
756 return r;
757 }
758 return 0;
759 }
760
761 int AsioFrontend::ssl_set_private_key(const string& name, bool is_ssl_certificate)
762 {
763 boost::system::error_code ec;
764
765 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
766 ssl_context->use_private_key_file(name, ssl::context::pem, ec);
767 } else {
768 bufferlist bl;
769 int r = get_config_key_val(name.substr(config_val_prefix.size()),
770 "ssl_private_key",
771 &bl);
772 if (r < 0) {
773 return r;
774 }
775 ssl_context->use_private_key(boost::asio::buffer(bl.c_str(), bl.length()),
776 ssl::context::pem, ec);
777 }
778
779 if (ec) {
780 if (!is_ssl_certificate) {
781 lderr(ctx()) << "failed to add ssl_private_key=" << name
782 << ": " << ec.message() << dendl;
783 } else {
784 lderr(ctx()) << "failed to use ssl_certificate=" << name
785 << " as a private key: " << ec.message() << dendl;
786 }
787 return -ec.value();
788 }
789
790 return 0;
791 }
792
793 int AsioFrontend::ssl_set_certificate_chain(const string& name)
794 {
795 boost::system::error_code ec;
796
797 if (!boost::algorithm::starts_with(name, config_val_prefix)) {
798 ssl_context->use_certificate_chain_file(name, ec);
799 } else {
800 bufferlist bl;
801 int r = get_config_key_val(name.substr(config_val_prefix.size()),
802 "ssl_certificate",
803 &bl);
804 if (r < 0) {
805 return r;
806 }
807 ssl_context->use_certificate_chain(boost::asio::buffer(bl.c_str(), bl.length()),
808 ec);
809 }
810
811 if (ec) {
812 lderr(ctx()) << "failed to use ssl_certificate=" << name
813 << ": " << ec.message() << dendl;
814 return -ec.value();
815 }
816
817 return 0;
818 }
819
820 int AsioFrontend::init_ssl()
821 {
822 boost::system::error_code ec;
823 auto& config = conf->get_config_map();
824
825 // ssl configuration
826 std::optional<string> cert = conf->get_val("ssl_certificate");
827 if (cert) {
828 // only initialize the ssl context if it's going to be used
829 ssl_context = boost::in_place(ssl::context::tls);
830 }
831
832 std::optional<string> key = conf->get_val("ssl_private_key");
833 bool have_cert = false;
834
835 if (key && !cert) {
836 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
837 return -EINVAL;
838 }
839
840 std::optional<string> options = conf->get_val("ssl_options");
841 if (options) {
842 if (!cert) {
843 lderr(ctx()) << "no ssl_certificate configured for ssl_options" << dendl;
844 return -EINVAL;
845 }
846 } else if (cert) {
847 options = "no_sslv2:no_sslv3:no_tlsv1:no_tlsv1_1";
848 }
849
850 if (options) {
851 for (auto &option : ceph::split(*options, ":")) {
852 if (option == "default_workarounds") {
853 ssl_context->set_options(ssl::context::default_workarounds);
854 } else if (option == "no_compression") {
855 ssl_context->set_options(ssl::context::no_compression);
856 } else if (option == "no_sslv2") {
857 ssl_context->set_options(ssl::context::no_sslv2);
858 } else if (option == "no_sslv3") {
859 ssl_context->set_options(ssl::context::no_sslv3);
860 } else if (option == "no_tlsv1") {
861 ssl_context->set_options(ssl::context::no_tlsv1);
862 } else if (option == "no_tlsv1_1") {
863 ssl_context->set_options(ssl::context::no_tlsv1_1);
864 } else if (option == "no_tlsv1_2") {
865 ssl_context->set_options(ssl::context::no_tlsv1_2);
866 } else if (option == "single_dh_use") {
867 ssl_context->set_options(ssl::context::single_dh_use);
868 } else {
869 lderr(ctx()) << "ignoring unknown ssl option '" << option << "'" << dendl;
870 }
871 }
872 }
873
874 std::optional<string> ciphers = conf->get_val("ssl_ciphers");
875 if (ciphers) {
876 if (!cert) {
877 lderr(ctx()) << "no ssl_certificate configured for ssl_ciphers" << dendl;
878 return -EINVAL;
879 }
880
881 int r = SSL_CTX_set_cipher_list(ssl_context->native_handle(),
882 ciphers->c_str());
883 if (r == 0) {
884 lderr(ctx()) << "no cipher could be selected from ssl_ciphers: "
885 << *ciphers << dendl;
886 return -EINVAL;
887 }
888 }
889
890 auto ports = config.equal_range("ssl_port");
891 auto endpoints = config.equal_range("ssl_endpoint");
892
893 /*
894 * don't try to config certificate if frontend isn't configured for ssl
895 */
896 if (ports.first == ports.second &&
897 endpoints.first == endpoints.second) {
898 return 0;
899 }
900
901 bool key_is_cert = false;
902
903 if (cert) {
904 if (!key) {
905 key = cert;
906 key_is_cert = true;
907 }
908
909 ExpandMetaVar emv(env.store->get_zone());
910
911 cert = emv.process_str(*cert);
912 key = emv.process_str(*key);
913
914 int r = ssl_set_private_key(*key, key_is_cert);
915 bool have_private_key = (r >= 0);
916 if (r < 0) {
917 if (!key_is_cert) {
918 r = ssl_set_private_key(*cert, true);
919 have_private_key = (r >= 0);
920 }
921 }
922
923 if (have_private_key) {
924 int r = ssl_set_certificate_chain(*cert);
925 have_cert = (r >= 0);
926 }
927 }
928
929 // parse ssl endpoints
930 for (auto i = ports.first; i != ports.second; ++i) {
931 if (!have_cert) {
932 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
933 return -EINVAL;
934 }
935 auto port = parse_port(i->second.c_str(), ec);
936 if (ec) {
937 lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
938 return -ec.value();
939 }
940 listeners.emplace_back(context);
941 listeners.back().endpoint.port(port);
942 listeners.back().use_ssl = true;
943
944 listeners.emplace_back(context);
945 listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
946 listeners.back().use_ssl = true;
947 }
948
949 for (auto i = endpoints.first; i != endpoints.second; ++i) {
950 if (!have_cert) {
951 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
952 return -EINVAL;
953 }
954 auto endpoint = parse_endpoint(i->second, 443, ec);
955 if (ec) {
956 lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
957 return -ec.value();
958 }
959 listeners.emplace_back(context);
960 listeners.back().endpoint = endpoint;
961 listeners.back().use_ssl = true;
962 }
963 return 0;
964 }
965 #endif // WITH_RADOSGW_BEAST_OPENSSL
966
967 void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
968 {
969 if (!l.acceptor.is_open()) {
970 return;
971 } else if (ec == boost::asio::error::operation_aborted) {
972 return;
973 } else if (ec) {
974 ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
975 return;
976 }
977 auto stream = std::move(l.socket);
978 stream.set_option(tcp::no_delay(l.use_nodelay), ec);
979 l.acceptor.async_accept(l.socket,
980 [this, &l] (boost::system::error_code ec) {
981 accept(l, ec);
982 });
983
984 // spawn a coroutine to handle the connection
985 #ifdef WITH_RADOSGW_BEAST_OPENSSL
986 if (l.use_ssl) {
987 spawn::spawn(context,
988 [this, s=std::move(stream)] (yield_context yield) mutable {
989 auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
990 auto c = connections.add(*conn);
991 // wrap the tcp stream in an ssl stream
992 boost::asio::ssl::stream<tcp_socket&> stream{conn->socket, *ssl_context};
993 auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
994 // do ssl handshake
995 boost::system::error_code ec;
996 timeout.start();
997 auto bytes = stream.async_handshake(ssl::stream_base::server,
998 conn->buffer.data(), yield[ec]);
999 timeout.cancel();
1000 if (ec) {
1001 ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
1002 return;
1003 }
1004 conn->buffer.consume(bytes);
1005 handle_connection(context, env, stream, timeout, header_limit,
1006 conn->buffer, true, pause_mutex, scheduler.get(),
1007 ec, yield);
1008 if (!ec) {
1009 // ssl shutdown (ignoring errors)
1010 stream.async_shutdown(yield[ec]);
1011 }
1012 conn->socket.shutdown(tcp::socket::shutdown_both, ec);
1013 }, make_stack_allocator());
1014 } else {
1015 #else
1016 {
1017 #endif // WITH_RADOSGW_BEAST_OPENSSL
1018 spawn::spawn(context,
1019 [this, s=std::move(stream)] (yield_context yield) mutable {
1020 auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
1021 auto c = connections.add(*conn);
1022 auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
1023 boost::system::error_code ec;
1024 handle_connection(context, env, conn->socket, timeout, header_limit,
1025 conn->buffer, false, pause_mutex, scheduler.get(),
1026 ec, yield);
1027 conn->socket.shutdown(tcp_socket::shutdown_both, ec);
1028 }, make_stack_allocator());
1029 }
1030 }
1031
1032 int AsioFrontend::run()
1033 {
1034 auto cct = ctx();
1035 const int thread_count = cct->_conf->rgw_thread_pool_size;
1036 threads.reserve(thread_count);
1037
1038 ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
1039
1040 // the worker threads call io_context::run(), which will return when there's
1041 // no work left. hold a work guard to keep these threads going until join()
1042 work.emplace(boost::asio::make_work_guard(context));
1043
1044 for (int i = 0; i < thread_count; i++) {
1045 threads.emplace_back([=]() noexcept {
1046 // request warnings on synchronous librados calls in this thread
1047 is_asio_thread = true;
1048 // Have uncaught exceptions kill the process and give a
1049 // stacktrace, not be swallowed.
1050 context.run();
1051 });
1052 }
1053 return 0;
1054 }
1055
1056 void AsioFrontend::stop()
1057 {
1058 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl;
1059
1060 going_down = true;
1061
1062 boost::system::error_code ec;
1063 // close all listeners
1064 for (auto& listener : listeners) {
1065 listener.acceptor.close(ec);
1066 }
1067 // close all connections
1068 connections.close(ec);
1069 pause_mutex.cancel();
1070 }
1071
1072 void AsioFrontend::join()
1073 {
1074 if (!going_down) {
1075 stop();
1076 }
1077 work.reset();
1078
1079 ldout(ctx(), 4) << "frontend joining threads..." << dendl;
1080 for (auto& thread : threads) {
1081 thread.join();
1082 }
1083 ldout(ctx(), 4) << "frontend done" << dendl;
1084 }
1085
1086 void AsioFrontend::pause()
1087 {
1088 ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
1089
1090 // cancel pending calls to accept(), but don't close the sockets
1091 boost::system::error_code ec;
1092 for (auto& l : listeners) {
1093 l.acceptor.cancel(ec);
1094 }
1095
1096 // pause and wait for outstanding requests to complete
1097 pause_mutex.lock(ec);
1098
1099 if (ec) {
1100 ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
1101 } else {
1102 ldout(ctx(), 4) << "frontend paused" << dendl;
1103 }
1104 }
1105
1106 void AsioFrontend::unpause(rgw::sal::Store* const store,
1107 rgw_auth_registry_ptr_t auth_registry)
1108 {
1109 env.store = store;
1110 env.auth_registry = std::move(auth_registry);
1111
1112 // unpause to unblock connections
1113 pause_mutex.unlock();
1114
1115 // start accepting connections again
1116 for (auto& l : listeners) {
1117 l.acceptor.async_accept(l.socket,
1118 [this, &l] (boost::system::error_code ec) {
1119 accept(l, ec);
1120 });
1121 }
1122
1123 ldout(ctx(), 4) << "frontend unpaused" << dendl;
1124 }
1125
1126 } // anonymous namespace
1127
1128 class RGWAsioFrontend::Impl : public AsioFrontend {
1129 public:
1130 Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
1131 rgw::dmclock::SchedulerCtx& sched_ctx)
1132 : AsioFrontend(env, conf, sched_ctx) {}
1133 };
1134
1135 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
1136 RGWFrontendConfig* conf,
1137 rgw::dmclock::SchedulerCtx& sched_ctx)
1138 : impl(new Impl(env, conf, sched_ctx))
1139 {
1140 }
1141
1142 RGWAsioFrontend::~RGWAsioFrontend() = default;
1143
1144 int RGWAsioFrontend::init()
1145 {
1146 return impl->init();
1147 }
1148
1149 int RGWAsioFrontend::run()
1150 {
1151 return impl->run();
1152 }
1153
1154 void RGWAsioFrontend::stop()
1155 {
1156 impl->stop();
1157 }
1158
1159 void RGWAsioFrontend::join()
1160 {
1161 impl->join();
1162 }
1163
1164 void RGWAsioFrontend::pause_for_new_config()
1165 {
1166 impl->pause();
1167 }
1168
1169 void RGWAsioFrontend::unpause_with_new_config(
1170 rgw::sal::Store* const store,
1171 rgw_auth_registry_ptr_t auth_registry
1172 ) {
1173 impl->unpause(store, std::move(auth_registry));
1174 }