1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
9 #include <boost/asio.hpp>
10 #include <boost/intrusive/list.hpp>
11 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
13 #include <boost/context/protected_fixedsize_stack.hpp>
14 #include <spawn/spawn.hpp>
16 #include "common/async/shared_mutex.h"
17 #include "common/errno.h"
18 #include "common/strtol.h"
20 #include "rgw_asio_client.h"
21 #include "rgw_asio_frontend.h"
23 #ifdef WITH_RADOSGW_BEAST_OPENSSL
24 #include <boost/asio/ssl.hpp>
27 #include "common/split.h"
29 #include "services/svc_config_key.h"
30 #include "services/svc_zone.h"
34 #include "rgw_asio_frontend_timer.h"
35 #include "rgw_dmclock_async_scheduler.h"
37 #define dout_subsys ceph_subsys_rgw
41 using tcp
= boost::asio::ip::tcp
;
42 namespace http
= boost::beast::http
;
43 #ifdef WITH_RADOSGW_BEAST_OPENSSL
44 namespace ssl
= boost::asio::ssl
;
49 // use explicit executor types instead of the type-erased boost::asio::executor
50 using executor_type
= boost::asio::io_context::executor_type
;
52 using tcp_socket
= boost::asio::basic_stream_socket
<tcp
, executor_type
>;
53 using tcp_stream
= boost::beast::basic_stream
<tcp
, executor_type
>;
55 using timeout_timer
= rgw::basic_timeout_timer
<ceph::coarse_mono_clock
,
56 executor_type
, Connection
>;
58 static constexpr size_t parse_buffer_size
= 65536;
59 using parse_buffer
= boost::beast::flat_static_buffer
<parse_buffer_size
>;
61 // use mmap/mprotect to allocate 512k coroutine stacks
62 auto make_stack_allocator() {
63 return boost::context::protected_fixedsize_stack
{512*1024};
68 template <typename Stream
>
69 class StreamIO
: public rgw::asio::ClientIO
{
70 CephContext
* const cct
;
72 timeout_timer
& timeout
;
75 boost::system::error_code fatal_ec
;
77 StreamIO(CephContext
*cct
, Stream
& stream
, timeout_timer
& timeout
,
78 rgw::asio::parser_type
& parser
, yield_context yield
,
79 parse_buffer
& buffer
, bool is_ssl
,
80 const tcp::endpoint
& local_endpoint
,
81 const tcp::endpoint
& remote_endpoint
)
82 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
83 cct(cct
), stream(stream
), timeout(timeout
), yield(yield
),
87 boost::system::error_code
get_fatal_error_code() const { return fatal_ec
; }
89 size_t write_data(const char* buf
, size_t len
) override
{
90 boost::system::error_code ec
;
92 auto bytes
= boost::asio::async_write(stream
, boost::asio::buffer(buf
, len
),
96 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
97 if (ec
== boost::asio::error::broken_pipe
) {
98 boost::system::error_code ec_ignored
;
99 stream
.lowest_layer().shutdown(tcp_socket::shutdown_both
, ec_ignored
);
104 throw rgw::io::Exception(ec
.value(), std::system_category());
109 size_t recv_body(char* buf
, size_t max
) override
{
110 auto& message
= parser
.get();
111 auto& body_remaining
= message
.body();
112 body_remaining
.data
= buf
;
113 body_remaining
.size
= max
;
115 while (body_remaining
.size
&& !parser
.is_done()) {
116 boost::system::error_code ec
;
118 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
120 if (ec
== http::error::need_buffer
) {
124 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
128 throw rgw::io::Exception(ec
.value(), std::system_category());
131 return max
- body_remaining
.size
;
135 // output the http version as a string, ie 'HTTP/1.1'
136 struct http_version
{
139 explicit http_version(unsigned version
)
140 : major_ver(version
/ 10), minor_ver(version
% 10) {}
142 std::ostream
& operator<<(std::ostream
& out
, const http_version
& v
) {
143 return out
<< "HTTP/" << v
.major_ver
<< '.' << v
.minor_ver
;
146 // log an http header value or '-' if it's missing
148 const http::fields
& fields
;
150 std::string_view quote
;
151 log_header(const http::fields
& fields
, http::field field
,
152 std::string_view quote
= "")
153 : fields(fields
), field(field
), quote(quote
) {}
155 std::ostream
& operator<<(std::ostream
& out
, const log_header
& h
) {
156 auto p
= h
.fields
.find(h
.field
);
157 if (p
== h
.fields
.end()) {
160 return out
<< h
.quote
<< p
->value() << h
.quote
;
163 // log fractional seconds in milliseconds
164 struct log_ms_remainder
{
165 ceph::coarse_real_time t
;
166 log_ms_remainder(ceph::coarse_real_time t
) : t(t
) {}
168 std::ostream
& operator<<(std::ostream
& out
, const log_ms_remainder
& m
) {
169 using namespace std::chrono
;
170 return out
<< std::setfill('0') << std::setw(3)
171 << duration_cast
<milliseconds
>(m
.t
.time_since_epoch()).count() % 1000;
174 // log time in apache format: day/month/year:hour:minute:second zone
175 struct log_apache_time
{
176 ceph::coarse_real_time t
;
177 log_apache_time(ceph::coarse_real_time t
) : t(t
) {}
179 std::ostream
& operator<<(std::ostream
& out
, const log_apache_time
& a
) {
180 const auto t
= ceph::coarse_real_clock::to_time_t(a
.t
);
181 const auto local
= std::localtime(&t
);
182 return out
<< std::put_time(local
, "%d/%b/%Y:%T.") << log_ms_remainder
{a
.t
}
183 << std::put_time(local
, " %z");
186 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
188 template <typename Stream
>
189 void handle_connection(boost::asio::io_context
& context
,
190 RGWProcessEnv
& env
, Stream
& stream
,
191 timeout_timer
& timeout
, size_t header_limit
,
192 parse_buffer
& buffer
, bool is_ssl
,
193 SharedMutex
& pause_mutex
,
194 rgw::dmclock::Scheduler
*scheduler
,
195 const std::string
& uri_prefix
,
196 boost::system::error_code
& ec
,
199 // don't impose a limit on the body, since we read it in pieces
200 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
202 auto cct
= env
.driver
->ctx();
204 // read messages from the stream until eof
206 // configure the parser
207 rgw::asio::parser_type parser
;
208 parser
.header_limit(header_limit
);
209 parser
.body_limit(body_limit
);
212 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
214 if (ec
== boost::asio::error::connection_reset
||
215 ec
== boost::asio::error::bad_descriptor
||
216 ec
== boost::asio::error::operation_aborted
||
217 #ifdef WITH_RADOSGW_BEAST_OPENSSL
218 ec
== ssl::error::stream_truncated
||
220 ec
== http::error::end_of_stream
) {
221 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
224 auto& message
= parser
.get();
226 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
227 http::response
<http::empty_body
> response
;
228 response
.result(http::status::bad_request
);
229 response
.version(message
.version() == 10 ? 10 : 11);
230 response
.prepare_payload();
232 http::async_write(stream
, response
, yield
[ec
]);
235 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
237 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
241 bool expect_continue
= (message
[http::field::expect
] == "100-continue");
244 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
245 if (ec
== boost::asio::error::operation_aborted
) {
248 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
252 // process the request
253 RGWRequest req
{env
.driver
->get_new_req_id()};
255 auto& socket
= stream
.lowest_layer();
256 const auto& remote_endpoint
= socket
.remote_endpoint(ec
);
258 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
261 const auto& local_endpoint
= socket
.local_endpoint(ec
);
263 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
267 StreamIO real_client
{cct
, stream
, timeout
, parser
, yield
, buffer
,
268 is_ssl
, local_endpoint
, remote_endpoint
};
270 auto real_client_io
= rgw::io::add_reordering(
271 rgw::io::add_buffering(cct
,
272 rgw::io::add_chunking(
273 rgw::io::add_conlen_controlling(
275 RGWRestfulIO
client(cct
, &real_client_io
);
276 optional_yield y
= null_yield
;
277 if (cct
->_conf
->rgw_beast_enable_async
) {
278 y
= optional_yield
{context
, yield
};
282 const auto started
= ceph::coarse_real_clock::now();
283 ceph::coarse_real_clock::duration latency
{};
284 process_request(env
, &req
, uri_prefix
, &client
, y
,
285 scheduler
, &user
, &latency
, &http_ret
);
287 if (cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw_access
, 1)) {
288 // access log line elements begin per Apache Combined Log Format with additions following
289 lsubdout(cct
, rgw_access
, 1) << "beast: " << std::hex
<< &req
<< std::dec
<< ": "
290 << remote_endpoint
.address() << " - " << user
<< " [" << log_apache_time
{started
} << "] \""
291 << message
.method_string() << ' ' << message
.target() << ' '
292 << http_version
{message
.version()} << "\" " << http_ret
<< ' '
293 << client
.get_bytes_sent() + client
.get_bytes_received() << ' '
294 << log_header
{message
, http::field::referer
, "\""} << ' '
295 << log_header
{message
, http::field::user_agent
, "\""} << ' '
296 << log_header
{message
, http::field::range
} << " latency="
300 // process_request() can't distinguish between connection errors and
301 // http/s3 errors, so check StreamIO for fatal connection errors
302 ec
= real_client
.get_fatal_error_code();
307 if (real_client
.sent_100_continue()) {
308 expect_continue
= false;
312 if (!parser
.keep_alive()) {
316 // if we failed before reading the entire message, discard any remaining
317 // bytes before reading the next
318 while (!expect_continue
&& !parser
.is_done()) {
319 static std::array
<char, 1024> discard_buffer
;
321 auto& body
= parser
.get().body();
322 body
.size
= discard_buffer
.size();
323 body
.data
= discard_buffer
.data();
326 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
328 if (ec
== http::error::need_buffer
) {
331 if (ec
== boost::asio::error::connection_reset
) {
335 ldout(cct
, 5) << "failed to discard unread message: "
336 << ec
.message() << dendl
;
343 // timeout support requires that connections are reference-counted, because the
344 // timeout_handler can outlive the coroutine
345 struct Connection
: boost::intrusive::list_base_hook
<>,
346 boost::intrusive_ref_counter
<Connection
>
351 explicit Connection(tcp_socket
&& socket
) noexcept
352 : socket(std::move(socket
)) {}
354 void close(boost::system::error_code
& ec
) {
358 tcp_socket
& get_socket() { return socket
; }
361 class ConnectionList
{
362 using List
= boost::intrusive::list
<Connection
>;
366 void remove(Connection
& c
) {
367 std::lock_guard lock
{mutex
};
369 connections
.erase(List::s_iterator_to(c
));
374 ConnectionList
*list
;
377 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
378 ~Guard() { list
->remove(*conn
); }
380 [[nodiscard
]] Guard
add(Connection
& conn
) {
381 std::lock_guard lock
{mutex
};
382 connections
.push_back(conn
);
383 return Guard
{this, &conn
};
385 void close(boost::system::error_code
& ec
) {
386 std::lock_guard lock
{mutex
};
387 for (auto& conn
: connections
) {
388 conn
.socket
.close(ec
);
394 namespace dmc
= rgw::dmclock
;
397 RGWFrontendConfig
* conf
;
398 boost::asio::io_context context
;
399 std::string uri_prefix
;
400 ceph::timespan request_timeout
= std::chrono::milliseconds(REQUEST_TIMEOUT
);
401 size_t header_limit
= 16384;
402 #ifdef WITH_RADOSGW_BEAST_OPENSSL
403 boost::optional
<ssl::context
> ssl_context
;
404 int get_config_key_val(string name
,
407 int ssl_set_private_key(const string
& name
, bool is_ssl_cert
);
408 int ssl_set_certificate_chain(const string
& name
);
411 SharedMutex pause_mutex
;
412 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
415 tcp::endpoint endpoint
;
416 tcp::acceptor acceptor
;
418 bool use_ssl
= false;
419 bool use_nodelay
= false;
421 explicit Listener(boost::asio::io_context
& context
)
422 : acceptor(context
), socket(context
) {}
424 std::vector
<Listener
> listeners
;
426 ConnectionList connections
;
428 // work guard to keep run() threads busy while listeners are paused
429 using Executor
= boost::asio::io_context::executor_type
;
430 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
432 std::vector
<std::thread
> threads
;
433 std::atomic
<bool> going_down
{false};
435 CephContext
* ctx() const { return env
.driver
->ctx(); }
436 std::optional
<dmc::ClientCounters
> client_counters
;
437 std::unique_ptr
<dmc::ClientConfig
> client_config
;
438 void accept(Listener
& listener
, boost::system::error_code ec
);
441 AsioFrontend(RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
442 dmc::SchedulerCtx
& sched_ctx
)
443 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
445 auto sched_t
= dmc::get_scheduler_t(ctx());
447 case dmc::scheduler_t::dmclock
:
448 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
450 std::ref(sched_ctx
.get_dmc_client_counters()),
451 sched_ctx
.get_dmc_client_config(),
452 *sched_ctx
.get_dmc_client_config(),
453 dmc::AtLimit::Reject
));
455 case dmc::scheduler_t::none
:
456 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
458 case dmc::scheduler_t::throttler
:
459 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
472 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
475 auto port
= std::strtoul(input
, &end
, 10);
476 if (port
> std::numeric_limits
<unsigned short>::max()) {
477 ec
.assign(ERANGE
, boost::system::system_category());
478 } else if (port
== 0 && end
== input
) {
479 ec
.assign(EINVAL
, boost::system::system_category());
484 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
485 unsigned short default_port
,
486 boost::system::error_code
& ec
)
488 tcp::endpoint endpoint
;
491 ec
= boost::asio::error::invalid_argument
;
495 if (input
[0] == '[') { // ipv6
496 const size_t addr_begin
= 1;
497 const size_t addr_end
= input
.find(']');
498 if (addr_end
== input
.npos
) { // no matching ]
499 ec
= boost::asio::error::invalid_argument
;
502 if (addr_end
+ 1 < input
.size()) {
503 // :port must follow [ipv6]
504 if (input
[addr_end
+ 1] != ':') {
505 ec
= boost::asio::error::invalid_argument
;
508 auto port_str
= input
.substr(addr_end
+ 2);
509 endpoint
.port(parse_port(port_str
.data(), ec
));
512 endpoint
.port(default_port
);
514 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
515 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
517 auto colon
= input
.find(':');
518 if (colon
!= input
.npos
) {
519 auto port_str
= input
.substr(colon
+ 1);
520 endpoint
.port(parse_port(port_str
.data(), ec
));
525 endpoint
.port(default_port
);
527 auto addr
= input
.substr(0, colon
);
528 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
533 static int drop_privileges(CephContext
*ctx
)
535 uid_t uid
= ctx
->get_set_uid();
536 gid_t gid
= ctx
->get_set_gid();
537 std::string uid_string
= ctx
->get_set_uid_string();
538 std::string gid_string
= ctx
->get_set_gid_string();
539 if (gid
&& setgid(gid
) != 0) {
541 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
544 if (uid
&& setuid(uid
) != 0) {
546 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
550 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
551 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
556 int AsioFrontend::init()
558 boost::system::error_code ec
;
559 auto& config
= conf
->get_config_map();
561 if (auto i
= config
.find("prefix"); i
!= config
.end()) {
562 uri_prefix
= i
->second
;
565 // Setting global timeout
566 auto timeout
= config
.find("request_timeout_ms");
567 if (timeout
!= config
.end()) {
568 auto timeout_number
= ceph::parse
<uint64_t>(timeout
->second
);
569 if (timeout_number
) {
570 request_timeout
= std::chrono::milliseconds(*timeout_number
);
572 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
573 << timeout
->second
<< " setting it to the default value: "
574 << REQUEST_TIMEOUT
<< dendl
;
578 auto max_header_size
= config
.find("max_header_size");
579 if (max_header_size
!= config
.end()) {
580 auto limit
= ceph::parse
<uint64_t>(max_header_size
->second
);
582 lderr(ctx()) << "WARNING: invalid value for max_header_size: "
583 << max_header_size
->second
<< ", using the default value: "
584 << header_limit
<< dendl
;
585 } else if (*limit
> parse_buffer_size
) { // can't exceed parse buffer size
586 header_limit
= parse_buffer_size
;
587 lderr(ctx()) << "WARNING: max_header_size " << max_header_size
->second
588 << " capped at maximum value " << header_limit
<< dendl
;
590 header_limit
= *limit
;
594 #ifdef WITH_RADOSGW_BEAST_OPENSSL
602 auto ports
= config
.equal_range("port");
603 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
604 auto port
= parse_port(i
->second
.c_str(), ec
);
606 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
609 listeners
.emplace_back(context
);
610 listeners
.back().endpoint
.port(port
);
612 listeners
.emplace_back(context
);
613 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
616 auto endpoints
= config
.equal_range("endpoint");
617 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
618 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
620 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
623 listeners
.emplace_back(context
);
624 listeners
.back().endpoint
= endpoint
;
627 auto nodelay
= config
.find("tcp_nodelay");
628 if (nodelay
!= config
.end()) {
629 for (auto& l
: listeners
) {
630 l
.use_nodelay
= (nodelay
->second
== "1");
635 bool socket_bound
= false;
637 for (auto& l
: listeners
) {
638 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
640 if (ec
== boost::asio::error::address_family_not_supported
) {
641 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
642 << ", " << ec
.message() << dendl
;
646 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
650 if (l
.endpoint
.protocol() == tcp::v6()) {
651 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
653 lderr(ctx()) << "failed to set v6_only socket option: "
654 << ec
.message() << dendl
;
659 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
660 l
.acceptor
.bind(l
.endpoint
, ec
);
662 lderr(ctx()) << "failed to bind address " << l
.endpoint
663 << ": " << ec
.message() << dendl
;
667 auto it
= config
.find("max_connection_backlog");
668 auto max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
669 if (it
!= config
.end()) {
671 max_connection_backlog
= strict_strtol(it
->second
.c_str(), 10, &err
);
673 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it
->second
<< dendl
;
674 max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
677 l
.acceptor
.listen(max_connection_backlog
);
678 l
.acceptor
.async_accept(l
.socket
,
679 [this, &l
] (boost::system::error_code ec
) {
683 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
687 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
691 return drop_privileges(ctx());
694 #ifdef WITH_RADOSGW_BEAST_OPENSSL
696 static string config_val_prefix
= "config://";
700 class ExpandMetaVar
{
701 map
<string
, string
> meta_map
;
704 ExpandMetaVar(rgw::sal::Zone
* zone_svc
) {
705 meta_map
["realm"] = zone_svc
->get_realm_name();
706 meta_map
["realm_id"] = zone_svc
->get_realm_id();
707 meta_map
["zonegroup"] = zone_svc
->get_zonegroup().get_name();
708 meta_map
["zonegroup_id"] = zone_svc
->get_zonegroup().get_id();
709 meta_map
["zone"] = zone_svc
->get_name();
710 meta_map
["zone_id"] = zone_svc
->get_id();
713 string
process_str(const string
& in
);
716 string
ExpandMetaVar::process_str(const string
& in
)
718 if (meta_map
.empty()) {
722 auto pos
= in
.find('$');
723 if (pos
== std::string::npos
) {
728 decltype(pos
) last_pos
= 0;
730 while (pos
!= std::string::npos
) {
731 if (pos
> last_pos
) {
732 out
+= in
.substr(last_pos
, pos
- last_pos
);
736 const char *valid_chars
= "abcdefghijklmnopqrstuvwxyz_";
739 if (in
[pos
+1] == '{') {
741 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 2);
742 if (endpos
!= std::string::npos
&&
744 var
= in
.substr(pos
+ 2, endpos
- pos
- 2);
749 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 1);
750 if (endpos
!= std::string::npos
)
751 var
= in
.substr(pos
+ 1, endpos
- pos
- 1);
753 var
= in
.substr(pos
+ 1);
755 string var_source
= in
.substr(pos
, endpos
- pos
);
758 auto iter
= meta_map
.find(var
);
759 if (iter
!= meta_map
.end()) {
764 pos
= in
.find('$', last_pos
);
766 if (last_pos
!= std::string::npos
) {
767 out
+= in
.substr(last_pos
);
773 } /* anonymous namespace */
775 int AsioFrontend::get_config_key_val(string name
,
780 lderr(ctx()) << "bad " << type
<< " config value" << dendl
;
784 int r
= env
.driver
->get_config_key_val(name
, pbl
);
786 lderr(ctx()) << type
<< " was not found: " << name
<< dendl
;
792 int AsioFrontend::ssl_set_private_key(const string
& name
, bool is_ssl_certificate
)
794 boost::system::error_code ec
;
796 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
797 ssl_context
->use_private_key_file(name
, ssl::context::pem
, ec
);
800 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
806 ssl_context
->use_private_key(boost::asio::buffer(bl
.c_str(), bl
.length()),
807 ssl::context::pem
, ec
);
811 if (!is_ssl_certificate
) {
812 lderr(ctx()) << "failed to add ssl_private_key=" << name
813 << ": " << ec
.message() << dendl
;
815 lderr(ctx()) << "failed to use ssl_certificate=" << name
816 << " as a private key: " << ec
.message() << dendl
;
824 int AsioFrontend::ssl_set_certificate_chain(const string
& name
)
826 boost::system::error_code ec
;
828 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
829 ssl_context
->use_certificate_chain_file(name
, ec
);
832 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
838 ssl_context
->use_certificate_chain(boost::asio::buffer(bl
.c_str(), bl
.length()),
843 lderr(ctx()) << "failed to use ssl_certificate=" << name
844 << ": " << ec
.message() << dendl
;
851 int AsioFrontend::init_ssl()
853 boost::system::error_code ec
;
854 auto& config
= conf
->get_config_map();
857 std::optional
<string
> cert
= conf
->get_val("ssl_certificate");
859 // only initialize the ssl context if it's going to be used
860 ssl_context
= boost::in_place(ssl::context::tls
);
863 std::optional
<string
> key
= conf
->get_val("ssl_private_key");
864 bool have_cert
= false;
867 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
871 std::optional
<string
> options
= conf
->get_val("ssl_options");
874 lderr(ctx()) << "no ssl_certificate configured for ssl_options" << dendl
;
878 options
= "no_sslv2:no_sslv3:no_tlsv1:no_tlsv1_1";
882 for (auto &option
: ceph::split(*options
, ":")) {
883 if (option
== "default_workarounds") {
884 ssl_context
->set_options(ssl::context::default_workarounds
);
885 } else if (option
== "no_compression") {
886 ssl_context
->set_options(ssl::context::no_compression
);
887 } else if (option
== "no_sslv2") {
888 ssl_context
->set_options(ssl::context::no_sslv2
);
889 } else if (option
== "no_sslv3") {
890 ssl_context
->set_options(ssl::context::no_sslv3
);
891 } else if (option
== "no_tlsv1") {
892 ssl_context
->set_options(ssl::context::no_tlsv1
);
893 } else if (option
== "no_tlsv1_1") {
894 ssl_context
->set_options(ssl::context::no_tlsv1_1
);
895 } else if (option
== "no_tlsv1_2") {
896 ssl_context
->set_options(ssl::context::no_tlsv1_2
);
897 } else if (option
== "single_dh_use") {
898 ssl_context
->set_options(ssl::context::single_dh_use
);
900 lderr(ctx()) << "ignoring unknown ssl option '" << option
<< "'" << dendl
;
905 std::optional
<string
> ciphers
= conf
->get_val("ssl_ciphers");
908 lderr(ctx()) << "no ssl_certificate configured for ssl_ciphers" << dendl
;
912 int r
= SSL_CTX_set_cipher_list(ssl_context
->native_handle(),
915 lderr(ctx()) << "no cipher could be selected from ssl_ciphers: "
916 << *ciphers
<< dendl
;
921 auto ports
= config
.equal_range("ssl_port");
922 auto endpoints
= config
.equal_range("ssl_endpoint");
925 * don't try to config certificate if frontend isn't configured for ssl
927 if (ports
.first
== ports
.second
&&
928 endpoints
.first
== endpoints
.second
) {
932 bool key_is_cert
= false;
940 ExpandMetaVar
emv(env
.driver
->get_zone());
942 cert
= emv
.process_str(*cert
);
943 key
= emv
.process_str(*key
);
945 int r
= ssl_set_private_key(*key
, key_is_cert
);
946 bool have_private_key
= (r
>= 0);
949 r
= ssl_set_private_key(*cert
, true);
950 have_private_key
= (r
>= 0);
954 if (have_private_key
) {
955 int r
= ssl_set_certificate_chain(*cert
);
956 have_cert
= (r
>= 0);
960 // parse ssl endpoints
961 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
963 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
966 auto port
= parse_port(i
->second
.c_str(), ec
);
968 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
971 listeners
.emplace_back(context
);
972 listeners
.back().endpoint
.port(port
);
973 listeners
.back().use_ssl
= true;
975 listeners
.emplace_back(context
);
976 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
977 listeners
.back().use_ssl
= true;
980 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
982 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
985 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
987 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
990 listeners
.emplace_back(context
);
991 listeners
.back().endpoint
= endpoint
;
992 listeners
.back().use_ssl
= true;
996 #endif // WITH_RADOSGW_BEAST_OPENSSL
998 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
1000 if (!l
.acceptor
.is_open()) {
1002 } else if (ec
== boost::asio::error::operation_aborted
) {
1005 ldout(ctx(), 1) << "accept failed: " << ec
.message() << dendl
;
1008 auto stream
= std::move(l
.socket
);
1009 stream
.set_option(tcp::no_delay(l
.use_nodelay
), ec
);
1010 l
.acceptor
.async_accept(l
.socket
,
1011 [this, &l
] (boost::system::error_code ec
) {
1015 // spawn a coroutine to handle the connection
1016 #ifdef WITH_RADOSGW_BEAST_OPENSSL
1018 spawn::spawn(context
,
1019 [this, s
=std::move(stream
)] (yield_context yield
) mutable {
1020 auto conn
= boost::intrusive_ptr
{new Connection(std::move(s
))};
1021 auto c
= connections
.add(*conn
);
1022 // wrap the tcp stream in an ssl stream
1023 boost::asio::ssl::stream
<tcp_socket
&> stream
{conn
->socket
, *ssl_context
};
1024 auto timeout
= timeout_timer
{context
.get_executor(), request_timeout
, conn
};
1026 boost::system::error_code ec
;
1028 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
1029 conn
->buffer
.data(), yield
[ec
]);
1032 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
1035 conn
->buffer
.consume(bytes
);
1036 handle_connection(context
, env
, stream
, timeout
, header_limit
,
1037 conn
->buffer
, true, pause_mutex
, scheduler
.get(),
1038 uri_prefix
, ec
, yield
);
1040 // ssl shutdown (ignoring errors)
1041 stream
.async_shutdown(yield
[ec
]);
1043 conn
->socket
.shutdown(tcp::socket::shutdown_both
, ec
);
1044 }, make_stack_allocator());
1048 #endif // WITH_RADOSGW_BEAST_OPENSSL
1049 spawn::spawn(context
,
1050 [this, s
=std::move(stream
)] (yield_context yield
) mutable {
1051 auto conn
= boost::intrusive_ptr
{new Connection(std::move(s
))};
1052 auto c
= connections
.add(*conn
);
1053 auto timeout
= timeout_timer
{context
.get_executor(), request_timeout
, conn
};
1054 boost::system::error_code ec
;
1055 handle_connection(context
, env
, conn
->socket
, timeout
, header_limit
,
1056 conn
->buffer
, false, pause_mutex
, scheduler
.get(),
1057 uri_prefix
, ec
, yield
);
1058 conn
->socket
.shutdown(tcp_socket::shutdown_both
, ec
);
1059 }, make_stack_allocator());
1063 int AsioFrontend::run()
1066 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
1067 threads
.reserve(thread_count
);
1069 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
1071 // the worker threads call io_context::run(), which will return when there's
1072 // no work left. hold a work guard to keep these threads going until join()
1073 work
.emplace(boost::asio::make_work_guard(context
));
1075 for (int i
= 0; i
< thread_count
; i
++) {
1076 threads
.emplace_back([this]() noexcept
{
1077 // request warnings on synchronous librados calls in this thread
1078 is_asio_thread
= true;
1079 // Have uncaught exceptions kill the process and give a
1080 // stacktrace, not be swallowed.
1087 void AsioFrontend::stop()
1089 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
1093 boost::system::error_code ec
;
1094 // close all listeners
1095 for (auto& listener
: listeners
) {
1096 listener
.acceptor
.close(ec
);
1098 // close all connections
1099 connections
.close(ec
);
1100 pause_mutex
.cancel();
1103 void AsioFrontend::join()
1110 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
1111 for (auto& thread
: threads
) {
1114 ldout(ctx(), 4) << "frontend done" << dendl
;
1117 void AsioFrontend::pause()
1119 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
1121 // cancel pending calls to accept(), but don't close the sockets
1122 boost::system::error_code ec
;
1123 for (auto& l
: listeners
) {
1124 l
.acceptor
.cancel(ec
);
1127 // pause and wait for outstanding requests to complete
1128 pause_mutex
.lock(ec
);
1131 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
1133 ldout(ctx(), 4) << "frontend paused" << dendl
;
1137 void AsioFrontend::unpause()
1139 // unpause to unblock connections
1140 pause_mutex
.unlock();
1142 // start accepting connections again
1143 for (auto& l
: listeners
) {
1144 l
.acceptor
.async_accept(l
.socket
,
1145 [this, &l
] (boost::system::error_code ec
) {
1150 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
1153 } // anonymous namespace
1155 class RGWAsioFrontend::Impl
: public AsioFrontend
{
1157 Impl(RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
1158 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1159 : AsioFrontend(env
, conf
, sched_ctx
) {}
1162 RGWAsioFrontend::RGWAsioFrontend(RGWProcessEnv
& env
,
1163 RGWFrontendConfig
* conf
,
1164 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1165 : impl(new Impl(env
, conf
, sched_ctx
))
1169 RGWAsioFrontend::~RGWAsioFrontend() = default;
1171 int RGWAsioFrontend::init()
1173 return impl
->init();
1176 int RGWAsioFrontend::run()
1181 void RGWAsioFrontend::stop()
1186 void RGWAsioFrontend::join()
1191 void RGWAsioFrontend::pause_for_new_config()
1196 void RGWAsioFrontend::unpause_with_new_config()