1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
9 #include <boost/asio.hpp>
10 #include <boost/intrusive/list.hpp>
11 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
13 #include <boost/context/protected_fixedsize_stack.hpp>
14 #include <spawn/spawn.hpp>
16 #include "common/async/shared_mutex.h"
17 #include "common/errno.h"
18 #include "common/strtol.h"
20 #include "rgw_asio_client.h"
21 #include "rgw_asio_frontend.h"
23 #ifdef WITH_RADOSGW_BEAST_OPENSSL
24 #include <boost/asio/ssl.hpp>
27 #include "common/split.h"
29 #include "services/svc_config_key.h"
30 #include "services/svc_zone.h"
34 #include "rgw_asio_frontend_timer.h"
35 #include "rgw_dmclock_async_scheduler.h"
37 #define dout_subsys ceph_subsys_rgw
41 using tcp
= boost::asio::ip::tcp
;
42 namespace http
= boost::beast::http
;
43 #ifdef WITH_RADOSGW_BEAST_OPENSSL
44 namespace ssl
= boost::asio::ssl
;
49 // use explicit executor types instead of the type-erased boost::asio::executor
50 using executor_type
= boost::asio::io_context::executor_type
;
52 using tcp_socket
= boost::asio::basic_stream_socket
<tcp
, executor_type
>;
53 using tcp_stream
= boost::beast::basic_stream
<tcp
, executor_type
>;
55 using timeout_timer
= rgw::basic_timeout_timer
<ceph::coarse_mono_clock
,
56 executor_type
, Connection
>;
58 static constexpr size_t parse_buffer_size
= 65536;
59 using parse_buffer
= boost::beast::flat_static_buffer
<parse_buffer_size
>;
61 // use mmap/mprotect to allocate 512k coroutine stacks
62 auto make_stack_allocator() {
63 return boost::context::protected_fixedsize_stack
{512*1024};
68 template <typename Stream
>
69 class StreamIO
: public rgw::asio::ClientIO
{
70 CephContext
* const cct
;
72 timeout_timer
& timeout
;
76 StreamIO(CephContext
*cct
, Stream
& stream
, timeout_timer
& timeout
,
77 rgw::asio::parser_type
& parser
, yield_context yield
,
78 parse_buffer
& buffer
, bool is_ssl
,
79 const tcp::endpoint
& local_endpoint
,
80 const tcp::endpoint
& remote_endpoint
)
81 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
82 cct(cct
), stream(stream
), timeout(timeout
), yield(yield
),
86 size_t write_data(const char* buf
, size_t len
) override
{
87 boost::system::error_code ec
;
89 auto bytes
= boost::asio::async_write(stream
, boost::asio::buffer(buf
, len
),
93 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
94 if (ec
== boost::asio::error::broken_pipe
) {
95 boost::system::error_code ec_ignored
;
96 stream
.lowest_layer().shutdown(tcp_socket::shutdown_both
, ec_ignored
);
98 throw rgw::io::Exception(ec
.value(), std::system_category());
103 size_t recv_body(char* buf
, size_t max
) override
{
104 auto& message
= parser
.get();
105 auto& body_remaining
= message
.body();
106 body_remaining
.data
= buf
;
107 body_remaining
.size
= max
;
109 while (body_remaining
.size
&& !parser
.is_done()) {
110 boost::system::error_code ec
;
112 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
114 if (ec
== http::error::need_buffer
) {
118 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
119 throw rgw::io::Exception(ec
.value(), std::system_category());
122 return max
- body_remaining
.size
;
126 // output the http version as a string, e.g. 'HTTP/1.1'
127 struct http_version
{
130 explicit http_version(unsigned version
)
131 : major_ver(version
/ 10), minor_ver(version
% 10) {}
133 std::ostream
& operator<<(std::ostream
& out
, const http_version
& v
) {
134 return out
<< "HTTP/" << v
.major_ver
<< '.' << v
.minor_ver
;
137 // log an http header value or '-' if it's missing
139 const http::fields
& fields
;
141 std::string_view quote
;
142 log_header(const http::fields
& fields
, http::field field
,
143 std::string_view quote
= "")
144 : fields(fields
), field(field
), quote(quote
) {}
146 std::ostream
& operator<<(std::ostream
& out
, const log_header
& h
) {
147 auto p
= h
.fields
.find(h
.field
);
148 if (p
== h
.fields
.end()) {
151 return out
<< h
.quote
<< p
->value() << h
.quote
;
154 // log fractional seconds in milliseconds
155 struct log_ms_remainder
{
156 ceph::coarse_real_time t
;
157 log_ms_remainder(ceph::coarse_real_time t
) : t(t
) {}
159 std::ostream
& operator<<(std::ostream
& out
, const log_ms_remainder
& m
) {
160 using namespace std::chrono
;
161 return out
<< std::setfill('0') << std::setw(3)
162 << duration_cast
<milliseconds
>(m
.t
.time_since_epoch()).count() % 1000;
165 // log time in apache format: day/month/year:hour:minute:second zone
166 struct log_apache_time
{
167 ceph::coarse_real_time t
;
168 log_apache_time(ceph::coarse_real_time t
) : t(t
) {}
170 std::ostream
& operator<<(std::ostream
& out
, const log_apache_time
& a
) {
171 const auto t
= ceph::coarse_real_clock::to_time_t(a
.t
);
172 const auto local
= std::localtime(&t
);
173 return out
<< std::put_time(local
, "%d/%b/%Y:%T.") << log_ms_remainder
{a
.t
}
174 << std::put_time(local
, " %z");
177 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
179 template <typename Stream
>
180 void handle_connection(boost::asio::io_context
& context
,
181 RGWProcessEnv
& env
, Stream
& stream
,
182 timeout_timer
& timeout
, size_t header_limit
,
183 parse_buffer
& buffer
, bool is_ssl
,
184 SharedMutex
& pause_mutex
,
185 rgw::dmclock::Scheduler
*scheduler
,
186 boost::system::error_code
& ec
,
189 // don't impose a limit on the body, since we read it in pieces
190 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
192 auto cct
= env
.store
->ctx();
194 // read messages from the stream until eof
196 // configure the parser
197 rgw::asio::parser_type parser
;
198 parser
.header_limit(header_limit
);
199 parser
.body_limit(body_limit
);
202 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
204 if (ec
== boost::asio::error::connection_reset
||
205 ec
== boost::asio::error::bad_descriptor
||
206 ec
== boost::asio::error::operation_aborted
||
207 #ifdef WITH_RADOSGW_BEAST_OPENSSL
208 ec
== ssl::error::stream_truncated
||
210 ec
== http::error::end_of_stream
) {
211 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
214 auto& message
= parser
.get();
216 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
217 http::response
<http::empty_body
> response
;
218 response
.result(http::status::bad_request
);
219 response
.version(message
.version() == 10 ? 10 : 11);
220 response
.prepare_payload();
222 http::async_write(stream
, response
, yield
[ec
]);
225 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
227 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
232 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
233 if (ec
== boost::asio::error::operation_aborted
) {
236 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
240 // process the request
241 RGWRequest req
{env
.store
->get_new_req_id()};
243 auto& socket
= stream
.lowest_layer();
244 const auto& remote_endpoint
= socket
.remote_endpoint(ec
);
246 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
250 StreamIO real_client
{cct
, stream
, timeout
, parser
, yield
, buffer
,
251 is_ssl
, socket
.local_endpoint(),
254 auto real_client_io
= rgw::io::add_reordering(
255 rgw::io::add_buffering(cct
,
256 rgw::io::add_chunking(
257 rgw::io::add_conlen_controlling(
259 RGWRestfulIO
client(cct
, &real_client_io
);
260 optional_yield y
= null_yield
;
261 if (cct
->_conf
->rgw_beast_enable_async
) {
262 y
= optional_yield
{context
, yield
};
266 const auto started
= ceph::coarse_real_clock::now();
267 ceph::coarse_real_clock::duration latency
{};
268 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
269 *env
.auth_registry
, &client
, env
.olog
, y
,
270 scheduler
, &user
, &latency
,
271 env
.ratelimiting
->get_active(),
274 if (cct
->_conf
->subsys
.should_gather(dout_subsys
, 1)) {
275 // access log line elements begin per Apache Combined Log Format with additions following
276 ldout(cct
, 1) << "beast: " << std::hex
<< &req
<< std::dec
<< ": "
277 << remote_endpoint
.address() << " - " << user
<< " [" << log_apache_time
{started
} << "] \""
278 << message
.method_string() << ' ' << message
.target() << ' '
279 << http_version
{message
.version()} << "\" " << http_ret
<< ' '
280 << client
.get_bytes_sent() + client
.get_bytes_received() << ' '
281 << log_header
{message
, http::field::referer
, "\""} << ' '
282 << log_header
{message
, http::field::user_agent
, "\""} << ' '
283 << log_header
{message
, http::field::range
} << " latency="
288 if (!parser
.keep_alive()) {
292 // if we failed before reading the entire message, discard any remaining
293 // bytes before reading the next
294 while (!parser
.is_done()) {
295 static std::array
<char, 1024> discard_buffer
;
297 auto& body
= parser
.get().body();
298 body
.size
= discard_buffer
.size();
299 body
.data
= discard_buffer
.data();
302 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
304 if (ec
== http::error::need_buffer
) {
307 if (ec
== boost::asio::error::connection_reset
) {
311 ldout(cct
, 5) << "failed to discard unread message: "
312 << ec
.message() << dendl
;
319 // timeout support requires that connections are reference-counted, because the
320 // timeout_handler can outlive the coroutine
321 struct Connection
: boost::intrusive::list_base_hook
<>,
322 boost::intrusive_ref_counter
<Connection
>
327 explicit Connection(tcp_socket
&& socket
) noexcept
328 : socket(std::move(socket
)) {}
330 void close(boost::system::error_code
& ec
) {
335 class ConnectionList
{
336 using List
= boost::intrusive::list
<Connection
>;
340 void remove(Connection
& c
) {
341 std::lock_guard lock
{mutex
};
343 connections
.erase(List::s_iterator_to(c
));
348 ConnectionList
*list
;
351 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
352 ~Guard() { list
->remove(*conn
); }
354 [[nodiscard
]] Guard
add(Connection
& conn
) {
355 std::lock_guard lock
{mutex
};
356 connections
.push_back(conn
);
357 return Guard
{this, &conn
};
359 void close(boost::system::error_code
& ec
) {
360 std::lock_guard lock
{mutex
};
361 for (auto& conn
: connections
) {
362 conn
.socket
.close(ec
);
368 namespace dmc
= rgw::dmclock
;
371 RGWFrontendConfig
* conf
;
372 boost::asio::io_context context
;
373 ceph::timespan request_timeout
= std::chrono::milliseconds(REQUEST_TIMEOUT
);
374 size_t header_limit
= 16384;
375 #ifdef WITH_RADOSGW_BEAST_OPENSSL
376 boost::optional
<ssl::context
> ssl_context
;
377 int get_config_key_val(string name
,
380 int ssl_set_private_key(const string
& name
, bool is_ssl_cert
);
381 int ssl_set_certificate_chain(const string
& name
);
384 SharedMutex pause_mutex
;
385 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
388 tcp::endpoint endpoint
;
389 tcp::acceptor acceptor
;
391 bool use_ssl
= false;
392 bool use_nodelay
= false;
394 explicit Listener(boost::asio::io_context
& context
)
395 : acceptor(context
), socket(context
) {}
397 std::vector
<Listener
> listeners
;
399 ConnectionList connections
;
401 // work guard to keep run() threads busy while listeners are paused
402 using Executor
= boost::asio::io_context::executor_type
;
403 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
405 std::vector
<std::thread
> threads
;
406 std::atomic
<bool> going_down
{false};
408 CephContext
* ctx() const { return env
.store
->ctx(); }
409 std::optional
<dmc::ClientCounters
> client_counters
;
410 std::unique_ptr
<dmc::ClientConfig
> client_config
;
411 void accept(Listener
& listener
, boost::system::error_code ec
);
414 AsioFrontend(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
415 dmc::SchedulerCtx
& sched_ctx
)
416 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
418 auto sched_t
= dmc::get_scheduler_t(ctx());
420 case dmc::scheduler_t::dmclock
:
421 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
423 std::ref(sched_ctx
.get_dmc_client_counters()),
424 sched_ctx
.get_dmc_client_config(),
425 *sched_ctx
.get_dmc_client_config(),
426 dmc::AtLimit::Reject
));
428 case dmc::scheduler_t::none
:
429 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
431 case dmc::scheduler_t::throttler
:
432 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
442 void unpause(rgw::sal::Store
* store
, rgw_auth_registry_ptr_t
);
445 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
448 auto port
= std::strtoul(input
, &end
, 10);
449 if (port
> std::numeric_limits
<unsigned short>::max()) {
450 ec
.assign(ERANGE
, boost::system::system_category());
451 } else if (port
== 0 && end
== input
) {
452 ec
.assign(EINVAL
, boost::system::system_category());
457 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
458 unsigned short default_port
,
459 boost::system::error_code
& ec
)
461 tcp::endpoint endpoint
;
464 ec
= boost::asio::error::invalid_argument
;
468 if (input
[0] == '[') { // ipv6
469 const size_t addr_begin
= 1;
470 const size_t addr_end
= input
.find(']');
471 if (addr_end
== input
.npos
) { // no matching ]
472 ec
= boost::asio::error::invalid_argument
;
475 if (addr_end
+ 1 < input
.size()) {
476 // :port must follow [ipv6]
477 if (input
[addr_end
+ 1] != ':') {
478 ec
= boost::asio::error::invalid_argument
;
481 auto port_str
= input
.substr(addr_end
+ 2);
482 endpoint
.port(parse_port(port_str
.data(), ec
));
485 endpoint
.port(default_port
);
487 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
488 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
490 auto colon
= input
.find(':');
491 if (colon
!= input
.npos
) {
492 auto port_str
= input
.substr(colon
+ 1);
493 endpoint
.port(parse_port(port_str
.data(), ec
));
498 endpoint
.port(default_port
);
500 auto addr
= input
.substr(0, colon
);
501 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
506 static int drop_privileges(CephContext
*ctx
)
508 uid_t uid
= ctx
->get_set_uid();
509 gid_t gid
= ctx
->get_set_gid();
510 std::string uid_string
= ctx
->get_set_uid_string();
511 std::string gid_string
= ctx
->get_set_gid_string();
512 if (gid
&& setgid(gid
) != 0) {
514 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
517 if (uid
&& setuid(uid
) != 0) {
519 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
523 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
524 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
529 int AsioFrontend::init()
531 boost::system::error_code ec
;
532 auto& config
= conf
->get_config_map();
534 // Setting global timeout
535 auto timeout
= config
.find("request_timeout_ms");
536 if (timeout
!= config
.end()) {
537 auto timeout_number
= ceph::parse
<uint64_t>(timeout
->second
);
538 if (timeout_number
) {
539 request_timeout
= std::chrono::milliseconds(*timeout_number
);
541 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
542 << timeout
->second
<< " setting it to the default value: "
543 << REQUEST_TIMEOUT
<< dendl
;
547 auto max_header_size
= config
.find("max_header_size");
548 if (max_header_size
!= config
.end()) {
549 auto limit
= ceph::parse
<uint64_t>(max_header_size
->second
);
551 lderr(ctx()) << "WARNING: invalid value for max_header_size: "
552 << max_header_size
->second
<< ", using the default value: "
553 << header_limit
<< dendl
;
554 } else if (*limit
> parse_buffer_size
) { // can't exceed parse buffer size
555 header_limit
= parse_buffer_size
;
556 lderr(ctx()) << "WARNING: max_header_size " << max_header_size
->second
557 << " capped at maximum value " << header_limit
<< dendl
;
559 header_limit
= *limit
;
563 #ifdef WITH_RADOSGW_BEAST_OPENSSL
571 auto ports
= config
.equal_range("port");
572 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
573 auto port
= parse_port(i
->second
.c_str(), ec
);
575 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
578 listeners
.emplace_back(context
);
579 listeners
.back().endpoint
.port(port
);
581 listeners
.emplace_back(context
);
582 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
585 auto endpoints
= config
.equal_range("endpoint");
586 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
587 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
589 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
592 listeners
.emplace_back(context
);
593 listeners
.back().endpoint
= endpoint
;
596 auto nodelay
= config
.find("tcp_nodelay");
597 if (nodelay
!= config
.end()) {
598 for (auto& l
: listeners
) {
599 l
.use_nodelay
= (nodelay
->second
== "1");
604 bool socket_bound
= false;
606 for (auto& l
: listeners
) {
607 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
609 if (ec
== boost::asio::error::address_family_not_supported
) {
610 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
611 << ", " << ec
.message() << dendl
;
615 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
619 if (l
.endpoint
.protocol() == tcp::v6()) {
620 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
622 lderr(ctx()) << "failed to set v6_only socket option: "
623 << ec
.message() << dendl
;
628 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
629 l
.acceptor
.bind(l
.endpoint
, ec
);
631 lderr(ctx()) << "failed to bind address " << l
.endpoint
632 << ": " << ec
.message() << dendl
;
636 auto it
= config
.find("max_connection_backlog");
637 auto max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
638 if (it
!= config
.end()) {
640 max_connection_backlog
= strict_strtol(it
->second
.c_str(), 10, &err
);
642 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it
->second
<< dendl
;
643 max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
646 l
.acceptor
.listen(max_connection_backlog
);
647 l
.acceptor
.async_accept(l
.socket
,
648 [this, &l
] (boost::system::error_code ec
) {
652 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
656 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
660 return drop_privileges(ctx());
663 #ifdef WITH_RADOSGW_BEAST_OPENSSL
665 static string config_val_prefix
= "config://";
669 class ExpandMetaVar
{
670 map
<string
, string
> meta_map
;
673 ExpandMetaVar(rgw::sal::Zone
* zone_svc
) {
674 meta_map
["realm"] = zone_svc
->get_realm().get_name();
675 meta_map
["realm_id"] = zone_svc
->get_realm().get_id();
676 meta_map
["zonegroup"] = zone_svc
->get_zonegroup().get_name();
677 meta_map
["zonegroup_id"] = zone_svc
->get_zonegroup().get_id();
678 meta_map
["zone"] = zone_svc
->get_name();
679 meta_map
["zone_id"] = zone_svc
->get_id().id
;
682 string
process_str(const string
& in
);
685 string
ExpandMetaVar::process_str(const string
& in
)
687 if (meta_map
.empty()) {
691 auto pos
= in
.find('$');
692 if (pos
== std::string::npos
) {
697 decltype(pos
) last_pos
= 0;
699 while (pos
!= std::string::npos
) {
700 if (pos
> last_pos
) {
701 out
+= in
.substr(last_pos
, pos
- last_pos
);
705 const char *valid_chars
= "abcdefghijklmnopqrstuvwxyz_";
708 if (in
[pos
+1] == '{') {
710 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 2);
711 if (endpos
!= std::string::npos
&&
713 var
= in
.substr(pos
+ 2, endpos
- pos
- 2);
718 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 1);
719 if (endpos
!= std::string::npos
)
720 var
= in
.substr(pos
+ 1, endpos
- pos
- 1);
722 var
= in
.substr(pos
+ 1);
724 string var_source
= in
.substr(pos
, endpos
- pos
);
727 auto iter
= meta_map
.find(var
);
728 if (iter
!= meta_map
.end()) {
733 pos
= in
.find('$', last_pos
);
735 if (last_pos
!= std::string::npos
) {
736 out
+= in
.substr(last_pos
);
742 } /* anonymous namespace */
744 int AsioFrontend::get_config_key_val(string name
,
749 lderr(ctx()) << "bad " << type
<< " config value" << dendl
;
753 int r
= env
.store
->get_config_key_val(name
, pbl
);
755 lderr(ctx()) << type
<< " was not found: " << name
<< dendl
;
761 int AsioFrontend::ssl_set_private_key(const string
& name
, bool is_ssl_certificate
)
763 boost::system::error_code ec
;
765 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
766 ssl_context
->use_private_key_file(name
, ssl::context::pem
, ec
);
769 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
775 ssl_context
->use_private_key(boost::asio::buffer(bl
.c_str(), bl
.length()),
776 ssl::context::pem
, ec
);
780 if (!is_ssl_certificate
) {
781 lderr(ctx()) << "failed to add ssl_private_key=" << name
782 << ": " << ec
.message() << dendl
;
784 lderr(ctx()) << "failed to use ssl_certificate=" << name
785 << " as a private key: " << ec
.message() << dendl
;
793 int AsioFrontend::ssl_set_certificate_chain(const string
& name
)
795 boost::system::error_code ec
;
797 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
798 ssl_context
->use_certificate_chain_file(name
, ec
);
801 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
807 ssl_context
->use_certificate_chain(boost::asio::buffer(bl
.c_str(), bl
.length()),
812 lderr(ctx()) << "failed to use ssl_certificate=" << name
813 << ": " << ec
.message() << dendl
;
820 int AsioFrontend::init_ssl()
822 boost::system::error_code ec
;
823 auto& config
= conf
->get_config_map();
826 std::optional
<string
> cert
= conf
->get_val("ssl_certificate");
828 // only initialize the ssl context if it's going to be used
829 ssl_context
= boost::in_place(ssl::context::tls
);
832 std::optional
<string
> key
= conf
->get_val("ssl_private_key");
833 bool have_cert
= false;
836 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
840 std::optional
<string
> options
= conf
->get_val("ssl_options");
843 lderr(ctx()) << "no ssl_certificate configured for ssl_options" << dendl
;
847 options
= "no_sslv2:no_sslv3:no_tlsv1:no_tlsv1_1";
851 for (auto &option
: ceph::split(*options
, ":")) {
852 if (option
== "default_workarounds") {
853 ssl_context
->set_options(ssl::context::default_workarounds
);
854 } else if (option
== "no_compression") {
855 ssl_context
->set_options(ssl::context::no_compression
);
856 } else if (option
== "no_sslv2") {
857 ssl_context
->set_options(ssl::context::no_sslv2
);
858 } else if (option
== "no_sslv3") {
859 ssl_context
->set_options(ssl::context::no_sslv3
);
860 } else if (option
== "no_tlsv1") {
861 ssl_context
->set_options(ssl::context::no_tlsv1
);
862 } else if (option
== "no_tlsv1_1") {
863 ssl_context
->set_options(ssl::context::no_tlsv1_1
);
864 } else if (option
== "no_tlsv1_2") {
865 ssl_context
->set_options(ssl::context::no_tlsv1_2
);
866 } else if (option
== "single_dh_use") {
867 ssl_context
->set_options(ssl::context::single_dh_use
);
869 lderr(ctx()) << "ignoring unknown ssl option '" << option
<< "'" << dendl
;
874 std::optional
<string
> ciphers
= conf
->get_val("ssl_ciphers");
877 lderr(ctx()) << "no ssl_certificate configured for ssl_ciphers" << dendl
;
881 int r
= SSL_CTX_set_cipher_list(ssl_context
->native_handle(),
884 lderr(ctx()) << "no cipher could be selected from ssl_ciphers: "
885 << *ciphers
<< dendl
;
890 auto ports
= config
.equal_range("ssl_port");
891 auto endpoints
= config
.equal_range("ssl_endpoint");
894 * don't try to config certificate if frontend isn't configured for ssl
896 if (ports
.first
== ports
.second
&&
897 endpoints
.first
== endpoints
.second
) {
901 bool key_is_cert
= false;
909 ExpandMetaVar
emv(env
.store
->get_zone());
911 cert
= emv
.process_str(*cert
);
912 key
= emv
.process_str(*key
);
914 int r
= ssl_set_private_key(*key
, key_is_cert
);
915 bool have_private_key
= (r
>= 0);
918 r
= ssl_set_private_key(*cert
, true);
919 have_private_key
= (r
>= 0);
923 if (have_private_key
) {
924 int r
= ssl_set_certificate_chain(*cert
);
925 have_cert
= (r
>= 0);
929 // parse ssl endpoints
930 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
932 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
935 auto port
= parse_port(i
->second
.c_str(), ec
);
937 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
940 listeners
.emplace_back(context
);
941 listeners
.back().endpoint
.port(port
);
942 listeners
.back().use_ssl
= true;
944 listeners
.emplace_back(context
);
945 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
946 listeners
.back().use_ssl
= true;
949 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
951 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
954 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
956 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
959 listeners
.emplace_back(context
);
960 listeners
.back().endpoint
= endpoint
;
961 listeners
.back().use_ssl
= true;
965 #endif // WITH_RADOSGW_BEAST_OPENSSL
967 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
969 if (!l
.acceptor
.is_open()) {
971 } else if (ec
== boost::asio::error::operation_aborted
) {
974 ldout(ctx(), 1) << "accept failed: " << ec
.message() << dendl
;
977 auto stream
= std::move(l
.socket
);
978 stream
.set_option(tcp::no_delay(l
.use_nodelay
), ec
);
979 l
.acceptor
.async_accept(l
.socket
,
980 [this, &l
] (boost::system::error_code ec
) {
984 // spawn a coroutine to handle the connection
985 #ifdef WITH_RADOSGW_BEAST_OPENSSL
987 spawn::spawn(context
,
988 [this, s
=std::move(stream
)] (yield_context yield
) mutable {
989 auto conn
= boost::intrusive_ptr
{new Connection(std::move(s
))};
990 auto c
= connections
.add(*conn
);
991 // wrap the tcp stream in an ssl stream
992 boost::asio::ssl::stream
<tcp_socket
&> stream
{conn
->socket
, *ssl_context
};
993 auto timeout
= timeout_timer
{context
.get_executor(), request_timeout
, conn
};
995 boost::system::error_code ec
;
997 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
998 conn
->buffer
.data(), yield
[ec
]);
1001 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
1004 conn
->buffer
.consume(bytes
);
1005 handle_connection(context
, env
, stream
, timeout
, header_limit
,
1006 conn
->buffer
, true, pause_mutex
, scheduler
.get(),
1009 // ssl shutdown (ignoring errors)
1010 stream
.async_shutdown(yield
[ec
]);
1012 conn
->socket
.shutdown(tcp::socket::shutdown_both
, ec
);
1013 }, make_stack_allocator());
1017 #endif // WITH_RADOSGW_BEAST_OPENSSL
1018 spawn::spawn(context
,
1019 [this, s
=std::move(stream
)] (yield_context yield
) mutable {
1020 auto conn
= boost::intrusive_ptr
{new Connection(std::move(s
))};
1021 auto c
= connections
.add(*conn
);
1022 auto timeout
= timeout_timer
{context
.get_executor(), request_timeout
, conn
};
1023 boost::system::error_code ec
;
1024 handle_connection(context
, env
, conn
->socket
, timeout
, header_limit
,
1025 conn
->buffer
, false, pause_mutex
, scheduler
.get(),
1027 conn
->socket
.shutdown(tcp_socket::shutdown_both
, ec
);
1028 }, make_stack_allocator());
1032 int AsioFrontend::run()
1035 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
1036 threads
.reserve(thread_count
);
1038 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
1040 // the worker threads call io_context::run(), which will return when there's
1041 // no work left. hold a work guard to keep these threads going until join()
1042 work
.emplace(boost::asio::make_work_guard(context
));
1044 for (int i
= 0; i
< thread_count
; i
++) {
1045 threads
.emplace_back([=]() noexcept
{
1046 // request warnings on synchronous librados calls in this thread
1047 is_asio_thread
= true;
1048 // Have uncaught exceptions kill the process and give a
1049 // stacktrace, not be swallowed.
1056 void AsioFrontend::stop()
1058 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
1062 boost::system::error_code ec
;
1063 // close all listeners
1064 for (auto& listener
: listeners
) {
1065 listener
.acceptor
.close(ec
);
1067 // close all connections
1068 connections
.close(ec
);
1069 pause_mutex
.cancel();
1072 void AsioFrontend::join()
1079 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
1080 for (auto& thread
: threads
) {
1083 ldout(ctx(), 4) << "frontend done" << dendl
;
1086 void AsioFrontend::pause()
1088 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
1090 // cancel pending calls to accept(), but don't close the sockets
1091 boost::system::error_code ec
;
1092 for (auto& l
: listeners
) {
1093 l
.acceptor
.cancel(ec
);
1096 // pause and wait for outstanding requests to complete
1097 pause_mutex
.lock(ec
);
1100 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
1102 ldout(ctx(), 4) << "frontend paused" << dendl
;
1106 void AsioFrontend::unpause(rgw::sal::Store
* const store
,
1107 rgw_auth_registry_ptr_t auth_registry
)
1110 env
.auth_registry
= std::move(auth_registry
);
1112 // unpause to unblock connections
1113 pause_mutex
.unlock();
1115 // start accepting connections again
1116 for (auto& l
: listeners
) {
1117 l
.acceptor
.async_accept(l
.socket
,
1118 [this, &l
] (boost::system::error_code ec
) {
1123 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
1126 } // anonymous namespace
1128 class RGWAsioFrontend::Impl
: public AsioFrontend
{
1130 Impl(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
1131 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1132 : AsioFrontend(env
, conf
, sched_ctx
) {}
1135 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
,
1136 RGWFrontendConfig
* conf
,
1137 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1138 : impl(new Impl(env
, conf
, sched_ctx
))
1142 RGWAsioFrontend::~RGWAsioFrontend() = default;
1144 int RGWAsioFrontend::init()
1146 return impl
->init();
1149 int RGWAsioFrontend::run()
1154 void RGWAsioFrontend::stop()
1159 void RGWAsioFrontend::join()
1164 void RGWAsioFrontend::pause_for_new_config()
1169 void RGWAsioFrontend::unpause_with_new_config(
1170 rgw::sal::Store
* const store
,
1171 rgw_auth_registry_ptr_t auth_registry
1173 impl
->unpause(store
, std::move(auth_registry
));