1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
23 #include <boost/beast/ssl/ssl_stream.hpp>
25 #include "services/svc_config_key.h"
26 #include "services/svc_zone.h"
32 #include "rgw_dmclock_async_scheduler.h"
34 #define dout_subsys ceph_subsys_rgw
38 using tcp
= boost::asio::ip::tcp
;
39 namespace http
= boost::beast::http
;
40 #ifdef WITH_RADOSGW_BEAST_OPENSSL
41 namespace ssl
= boost::asio::ssl
;
44 using parse_buffer
= boost::beast::flat_static_buffer
<65536>;
46 // use mmap/mprotect to allocate 512k coroutine stacks
47 auto make_stack_allocator() {
48 return boost::context::protected_fixedsize_stack
{512*1024};
51 template <typename Stream
>
52 class StreamIO
: public rgw::asio::ClientIO
{
53 CephContext
* const cct
;
55 spawn::yield_context yield
;
57 ceph::timespan request_timeout
;
59 StreamIO(CephContext
*cct
, Stream
& stream
, rgw::asio::parser_type
& parser
,
60 spawn::yield_context yield
,
61 parse_buffer
& buffer
, bool is_ssl
,
62 const tcp::endpoint
& local_endpoint
,
63 const tcp::endpoint
& remote_endpoint
,
64 ceph::timespan request_timeout
)
65 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
66 cct(cct
), stream(stream
), yield(yield
), buffer(buffer
), request_timeout(request_timeout
)
69 size_t write_data(const char* buf
, size_t len
) override
{
70 boost::system::error_code ec
;
71 auto& timeout
= get_lowest_layer(stream
);
72 if (request_timeout
.count()) {
73 timeout
.expires_after(request_timeout
);
75 auto bytes
= boost::asio::async_write(stream
, boost::asio::buffer(buf
, len
),
78 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
79 if (ec
==boost::asio::error::broken_pipe
) {
80 boost::system::error_code ec_ignored
;
81 timeout
.socket().shutdown(tcp::socket::shutdown_both
, ec_ignored
);
83 throw rgw::io::Exception(ec
.value(), std::system_category());
88 size_t recv_body(char* buf
, size_t max
) override
{
89 auto& timeout
= get_lowest_layer(stream
);
90 auto& message
= parser
.get();
91 auto& body_remaining
= message
.body();
92 body_remaining
.data
= buf
;
93 body_remaining
.size
= max
;
95 while (body_remaining
.size
&& !parser
.is_done()) {
96 boost::system::error_code ec
;
97 if (request_timeout
.count()) {
98 timeout
.expires_after(request_timeout
);
100 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
101 if (ec
== http::error::need_buffer
) {
105 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
106 throw rgw::io::Exception(ec
.value(), std::system_category());
109 return max
- body_remaining
.size
;
113 // output the http version as a string, ie 'HTTP/1.1'
114 struct http_version
{
117 explicit http_version(unsigned version
)
118 : major_ver(version
/ 10), minor_ver(version
% 10) {}
120 std::ostream
& operator<<(std::ostream
& out
, const http_version
& v
) {
121 return out
<< "HTTP/" << v
.major_ver
<< '.' << v
.minor_ver
;
124 // log an http header value or '-' if it's missing
126 const http::fields
& fields
;
128 std::string_view quote
;
129 log_header(const http::fields
& fields
, http::field field
,
130 std::string_view quote
= "")
131 : fields(fields
), field(field
), quote(quote
) {}
133 std::ostream
& operator<<(std::ostream
& out
, const log_header
& h
) {
134 auto p
= h
.fields
.find(h
.field
);
135 if (p
== h
.fields
.end()) {
138 return out
<< h
.quote
<< p
->value() << h
.quote
;
141 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
143 template <typename Stream
>
144 void handle_connection(boost::asio::io_context
& context
,
145 RGWProcessEnv
& env
, Stream
& stream
,
146 parse_buffer
& buffer
, bool is_ssl
,
147 SharedMutex
& pause_mutex
,
148 rgw::dmclock::Scheduler
*scheduler
,
149 boost::system::error_code
& ec
,
150 spawn::yield_context yield
,
151 ceph::timespan request_timeout
)
153 // limit header to 4k, since we read it all into a single flat_buffer
154 static constexpr size_t header_limit
= 4096;
155 // don't impose a limit on the body, since we read it in pieces
156 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
158 auto cct
= env
.store
->ctx();
160 // read messages from the stream until eof
162 // configure the parser
163 rgw::asio::parser_type parser
;
164 parser
.header_limit(header_limit
);
165 parser
.body_limit(body_limit
);
166 auto& timeout
= get_lowest_layer(stream
);
167 if (request_timeout
.count()) {
168 timeout
.expires_after(request_timeout
);
171 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
172 if (ec
== boost::asio::error::connection_reset
||
173 ec
== boost::asio::error::bad_descriptor
||
174 ec
== boost::asio::error::operation_aborted
||
175 #ifdef WITH_RADOSGW_BEAST_OPENSSL
176 ec
== ssl::error::stream_truncated
||
178 ec
== http::error::end_of_stream
) {
179 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
182 auto& message
= parser
.get();
184 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
185 http::response
<http::empty_body
> response
;
186 response
.result(http::status::bad_request
);
187 response
.version(message
.version() == 10 ? 10 : 11);
188 response
.prepare_payload();
189 if (request_timeout
.count()) {
190 timeout
.expires_after(request_timeout
);
192 http::async_write(stream
, response
, yield
[ec
]);
194 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
196 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
201 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
202 if (ec
== boost::asio::error::operation_aborted
) {
205 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
209 // process the request
210 RGWRequest req
{env
.store
->getRados()->get_new_req_id()};
212 auto& socket
= get_lowest_layer(stream
).socket();
213 const auto& remote_endpoint
= socket
.remote_endpoint(ec
);
215 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
219 StreamIO real_client
{cct
, stream
, parser
, yield
, buffer
, is_ssl
,
220 socket
.local_endpoint(),
221 remote_endpoint
,request_timeout
};
223 auto real_client_io
= rgw::io::add_reordering(
224 rgw::io::add_buffering(cct
,
225 rgw::io::add_chunking(
226 rgw::io::add_conlen_controlling(
228 RGWRestfulIO
client(cct
, &real_client_io
);
229 auto y
= optional_yield
{context
, yield
};
231 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
232 *env
.auth_registry
, &client
, env
.olog
, y
,
233 scheduler
, &http_ret
);
235 if (cct
->_conf
->subsys
.should_gather(dout_subsys
, 1)) {
236 // access log line elements begin per Apache Combined Log Format with additions following
237 const auto now
= ceph::coarse_real_clock::now();
238 using ceph::operator<<; // for coarse_real_time
239 ldout(cct
, 1) << "beast: " << hex
<< &req
<< dec
<< ": "
240 << remote_endpoint
.address() << " - - [" << now
<< "] \""
241 << message
.method_string() << ' ' << message
.target() << ' '
242 << http_version
{message
.version()} << "\" " << http_ret
<< ' '
243 << client
.get_bytes_sent() + client
.get_bytes_received() << ' '
244 << log_header
{message
, http::field::referer
, "\""} << ' '
245 << log_header
{message
, http::field::user_agent
, "\""} << ' '
246 << log_header
{message
, http::field::range
} << dendl
;
250 if (!parser
.keep_alive()) {
254 // if we failed before reading the entire message, discard any remaining
255 // bytes before reading the next
256 while (!parser
.is_done()) {
257 static std::array
<char, 1024> discard_buffer
;
259 auto& body
= parser
.get().body();
260 body
.size
= discard_buffer
.size();
261 body
.data
= discard_buffer
.data();
263 if (request_timeout
.count()) {
264 timeout
.expires_after(request_timeout
);
266 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
267 if (ec
== http::error::need_buffer
) {
270 if (ec
== boost::asio::error::connection_reset
) {
274 ldout(cct
, 5) << "failed to discard unread message: "
275 << ec
.message() << dendl
;
282 struct Connection
: boost::intrusive::list_base_hook
<> {
284 Connection(tcp::socket
& socket
) : socket(socket
) {}
287 class ConnectionList
{
288 using List
= boost::intrusive::list
<Connection
>;
292 void remove(Connection
& c
) {
293 std::lock_guard lock
{mutex
};
295 connections
.erase(List::s_iterator_to(c
));
300 ConnectionList
*list
;
303 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
304 ~Guard() { list
->remove(*conn
); }
306 [[nodiscard
]] Guard
add(Connection
& conn
) {
307 std::lock_guard lock
{mutex
};
308 connections
.push_back(conn
);
309 return Guard
{this, &conn
};
311 void close(boost::system::error_code
& ec
) {
312 std::lock_guard lock
{mutex
};
313 for (auto& conn
: connections
) {
314 conn
.socket
.close(ec
);
320 namespace dmc
= rgw::dmclock
;
323 RGWFrontendConfig
* conf
;
324 boost::asio::io_context context
;
325 ceph::timespan request_timeout
= std::chrono::milliseconds(REQUEST_TIMEOUT
);
326 #ifdef WITH_RADOSGW_BEAST_OPENSSL
327 boost::optional
<ssl::context
> ssl_context
;
328 int get_config_key_val(string name
,
331 int ssl_set_private_key(const string
& name
, bool is_ssl_cert
);
332 int ssl_set_certificate_chain(const string
& name
);
335 SharedMutex pause_mutex
;
336 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
339 tcp::endpoint endpoint
;
340 tcp::acceptor acceptor
;
342 bool use_ssl
= false;
343 bool use_nodelay
= false;
345 explicit Listener(boost::asio::io_context
& context
)
346 : acceptor(context
), socket(context
) {}
348 std::vector
<Listener
> listeners
;
350 ConnectionList connections
;
352 // work guard to keep run() threads busy while listeners are paused
353 using Executor
= boost::asio::io_context::executor_type
;
354 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
356 std::vector
<std::thread
> threads
;
357 std::atomic
<bool> going_down
{false};
359 CephContext
* ctx() const { return env
.store
->ctx(); }
360 std::optional
<dmc::ClientCounters
> client_counters
;
361 std::unique_ptr
<dmc::ClientConfig
> client_config
;
362 void accept(Listener
& listener
, boost::system::error_code ec
);
365 AsioFrontend(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
366 dmc::SchedulerCtx
& sched_ctx
)
367 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
369 auto sched_t
= dmc::get_scheduler_t(ctx());
371 case dmc::scheduler_t::dmclock
:
372 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
374 std::ref(sched_ctx
.get_dmc_client_counters()),
375 sched_ctx
.get_dmc_client_config(),
376 *sched_ctx
.get_dmc_client_config(),
377 dmc::AtLimit::Reject
));
379 case dmc::scheduler_t::none
:
380 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
382 case dmc::scheduler_t::throttler
:
383 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
393 void unpause(rgw::sal::RGWRadosStore
* store
, rgw_auth_registry_ptr_t
);
396 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
399 auto port
= std::strtoul(input
, &end
, 10);
400 if (port
> std::numeric_limits
<unsigned short>::max()) {
401 ec
.assign(ERANGE
, boost::system::system_category());
402 } else if (port
== 0 && end
== input
) {
403 ec
.assign(EINVAL
, boost::system::system_category());
408 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
409 unsigned short default_port
,
410 boost::system::error_code
& ec
)
412 tcp::endpoint endpoint
;
415 ec
= boost::asio::error::invalid_argument
;
419 if (input
[0] == '[') { // ipv6
420 const size_t addr_begin
= 1;
421 const size_t addr_end
= input
.find(']');
422 if (addr_end
== input
.npos
) { // no matching ]
423 ec
= boost::asio::error::invalid_argument
;
426 if (addr_end
+ 1 < input
.size()) {
427 // :port must must follow [ipv6]
428 if (input
[addr_end
+ 1] != ':') {
429 ec
= boost::asio::error::invalid_argument
;
432 auto port_str
= input
.substr(addr_end
+ 2);
433 endpoint
.port(parse_port(port_str
.data(), ec
));
436 endpoint
.port(default_port
);
438 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
439 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
441 auto colon
= input
.find(':');
442 if (colon
!= input
.npos
) {
443 auto port_str
= input
.substr(colon
+ 1);
444 endpoint
.port(parse_port(port_str
.data(), ec
));
449 endpoint
.port(default_port
);
451 auto addr
= input
.substr(0, colon
);
452 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
457 static int drop_privileges(CephContext
*ctx
)
459 uid_t uid
= ctx
->get_set_uid();
460 gid_t gid
= ctx
->get_set_gid();
461 std::string uid_string
= ctx
->get_set_uid_string();
462 std::string gid_string
= ctx
->get_set_gid_string();
463 if (gid
&& setgid(gid
) != 0) {
465 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
468 if (uid
&& setuid(uid
) != 0) {
470 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
474 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
475 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
480 int AsioFrontend::init()
482 boost::system::error_code ec
;
483 auto& config
= conf
->get_config_map();
485 // Setting global timeout
486 auto timeout
= config
.find("request_timeout_ms");
487 if (timeout
!= config
.end()) {
488 auto timeout_number
= ceph::parse
<uint64_t>(timeout
->second
.data());
489 if (timeout_number
) {
490 request_timeout
= std::chrono::milliseconds(*timeout_number
);
492 lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
493 << timeout
->second
.data() << " setting it to the default value: "
494 << REQUEST_TIMEOUT
<< dendl
;
497 #ifdef WITH_RADOSGW_BEAST_OPENSSL
505 auto ports
= config
.equal_range("port");
506 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
507 auto port
= parse_port(i
->second
.c_str(), ec
);
509 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
512 listeners
.emplace_back(context
);
513 listeners
.back().endpoint
.port(port
);
515 listeners
.emplace_back(context
);
516 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
519 auto endpoints
= config
.equal_range("endpoint");
520 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
521 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
523 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
526 listeners
.emplace_back(context
);
527 listeners
.back().endpoint
= endpoint
;
530 auto nodelay
= config
.find("tcp_nodelay");
531 if (nodelay
!= config
.end()) {
532 for (auto& l
: listeners
) {
533 l
.use_nodelay
= (nodelay
->second
== "1");
538 bool socket_bound
= false;
540 for (auto& l
: listeners
) {
541 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
543 if (ec
== boost::asio::error::address_family_not_supported
) {
544 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
545 << ", " << ec
.message() << dendl
;
549 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
553 if (l
.endpoint
.protocol() == tcp::v6()) {
554 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
556 lderr(ctx()) << "failed to set v6_only socket option: "
557 << ec
.message() << dendl
;
562 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
563 l
.acceptor
.bind(l
.endpoint
, ec
);
565 lderr(ctx()) << "failed to bind address " << l
.endpoint
566 << ": " << ec
.message() << dendl
;
570 auto it
= config
.find("max_connection_backlog");
571 auto max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
572 if (it
!= config
.end()) {
574 max_connection_backlog
= strict_strtol(it
->second
.c_str(), 10, &err
);
576 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it
->second
<< dendl
;
577 max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
580 l
.acceptor
.listen(max_connection_backlog
);
581 l
.acceptor
.async_accept(l
.socket
,
582 [this, &l
] (boost::system::error_code ec
) {
586 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
590 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
594 return drop_privileges(ctx());
597 #ifdef WITH_RADOSGW_BEAST_OPENSSL
599 static string config_val_prefix
= "config://";
603 class ExpandMetaVar
{
604 map
<string
, string
> meta_map
;
607 ExpandMetaVar(RGWSI_Zone
*zone_svc
) {
608 meta_map
["realm"] = zone_svc
->get_realm().get_name();
609 meta_map
["realm_id"] = zone_svc
->get_realm().get_id();
610 meta_map
["zonegroup"] = zone_svc
->get_zonegroup().get_name();
611 meta_map
["zonegroup_id"] = zone_svc
->get_zonegroup().get_id();
612 meta_map
["zone"] = zone_svc
->zone_name();
613 meta_map
["zone_id"] = zone_svc
->zone_id().id
;
616 string
process_str(const string
& in
);
619 string
ExpandMetaVar::process_str(const string
& in
)
621 if (meta_map
.empty()) {
625 auto pos
= in
.find('$');
626 if (pos
== std::string::npos
) {
631 decltype(pos
) last_pos
= 0;
633 while (pos
!= std::string::npos
) {
634 if (pos
> last_pos
) {
635 out
+= in
.substr(last_pos
, pos
- last_pos
);
639 const char *valid_chars
= "abcdefghijklmnopqrstuvwxyz_";
642 if (in
[pos
+1] == '{') {
644 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 2);
645 if (endpos
!= std::string::npos
&&
647 var
= in
.substr(pos
+ 2, endpos
- pos
- 2);
652 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 1);
653 if (endpos
!= std::string::npos
)
654 var
= in
.substr(pos
+ 1, endpos
- pos
- 1);
656 var
= in
.substr(pos
+ 1);
658 string var_source
= in
.substr(pos
, endpos
- pos
);
661 auto iter
= meta_map
.find(var
);
662 if (iter
!= meta_map
.end()) {
667 pos
= in
.find('$', last_pos
);
669 if (last_pos
!= std::string::npos
) {
670 out
+= in
.substr(last_pos
);
676 } /* anonymous namespace */
678 int AsioFrontend::get_config_key_val(string name
,
683 lderr(ctx()) << "bad " << type
<< " config value" << dendl
;
687 auto svc
= env
.store
->svc()->config_key
;
688 int r
= svc
->get(name
, true, pbl
);
690 lderr(ctx()) << type
<< " was not found: " << name
<< dendl
;
696 int AsioFrontend::ssl_set_private_key(const string
& name
, bool is_ssl_certificate
)
698 boost::system::error_code ec
;
700 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
701 ssl_context
->use_private_key_file(name
, ssl::context::pem
, ec
);
704 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
710 ssl_context
->use_private_key(boost::asio::buffer(bl
.c_str(), bl
.length()),
711 ssl::context::pem
, ec
);
715 if (!is_ssl_certificate
) {
716 lderr(ctx()) << "failed to add ssl_private_key=" << name
717 << ": " << ec
.message() << dendl
;
719 lderr(ctx()) << "failed to use ssl_certificate=" << name
720 << " as a private key: " << ec
.message() << dendl
;
728 int AsioFrontend::ssl_set_certificate_chain(const string
& name
)
730 boost::system::error_code ec
;
732 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
733 ssl_context
->use_certificate_chain_file(name
, ec
);
736 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
742 ssl_context
->use_certificate_chain(boost::asio::buffer(bl
.c_str(), bl
.length()),
747 lderr(ctx()) << "failed to use ssl_certificate=" << name
748 << ": " << ec
.message() << dendl
;
755 int AsioFrontend::init_ssl()
757 boost::system::error_code ec
;
758 auto& config
= conf
->get_config_map();
761 std::optional
<string
> cert
= conf
->get_val("ssl_certificate");
763 // only initialize the ssl context if it's going to be used
764 ssl_context
= boost::in_place(ssl::context::tls
);
767 std::optional
<string
> key
= conf
->get_val("ssl_private_key");
768 bool have_cert
= false;
771 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
775 auto ports
= config
.equal_range("ssl_port");
776 auto endpoints
= config
.equal_range("ssl_endpoint");
779 * don't try to config certificate if frontend isn't configured for ssl
781 if (ports
.first
== ports
.second
&&
782 endpoints
.first
== endpoints
.second
) {
786 bool key_is_cert
= false;
794 ExpandMetaVar
emv(env
.store
->svc()->zone
);
796 cert
= emv
.process_str(*cert
);
797 key
= emv
.process_str(*key
);
799 int r
= ssl_set_private_key(*key
, key_is_cert
);
800 bool have_private_key
= (r
>= 0);
803 r
= ssl_set_private_key(*cert
, true);
804 have_private_key
= (r
>= 0);
808 if (have_private_key
) {
809 int r
= ssl_set_certificate_chain(*cert
);
810 have_cert
= (r
>= 0);
814 // parse ssl endpoints
815 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
817 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
820 auto port
= parse_port(i
->second
.c_str(), ec
);
822 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
825 listeners
.emplace_back(context
);
826 listeners
.back().endpoint
.port(port
);
827 listeners
.back().use_ssl
= true;
829 listeners
.emplace_back(context
);
830 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
831 listeners
.back().use_ssl
= true;
834 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
836 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
839 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
841 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
844 listeners
.emplace_back(context
);
845 listeners
.back().endpoint
= endpoint
;
846 listeners
.back().use_ssl
= true;
850 #endif // WITH_RADOSGW_BEAST_OPENSSL
852 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
854 if (!l
.acceptor
.is_open()) {
856 } else if (ec
== boost::asio::error::operation_aborted
) {
859 ldout(ctx(), 1) << "accept failed: " << ec
.message() << dendl
;
862 auto socket
= std::move(l
.socket
);
863 tcp::no_delay
options(l
.use_nodelay
);
864 socket
.set_option(options
,ec
);
865 l
.acceptor
.async_accept(l
.socket
,
866 [this, &l
] (boost::system::error_code ec
) {
870 boost::beast::tcp_stream
stream(std::move(socket
));
871 // spawn a coroutine to handle the connection
872 #ifdef WITH_RADOSGW_BEAST_OPENSSL
874 spawn::spawn(context
,
875 [this, s
=std::move(stream
)] (spawn::yield_context yield
) mutable {
876 Connection conn
{s
.socket()};
877 auto c
= connections
.add(conn
);
878 // wrap the tcp_stream in an ssl stream
879 boost::beast::ssl_stream
<boost::beast::tcp_stream
&> stream
{s
, *ssl_context
};
880 auto buffer
= std::make_unique
<parse_buffer
>();
882 boost::system::error_code ec
;
883 if (request_timeout
.count()) {
884 get_lowest_layer(stream
).expires_after(request_timeout
);
886 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
887 buffer
->data(), yield
[ec
]);
889 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
892 buffer
->consume(bytes
);
893 handle_connection(context
, env
, stream
, *buffer
, true, pause_mutex
,
894 scheduler
.get(), ec
, yield
, request_timeout
);
896 // ssl shutdown (ignoring errors)
897 stream
.async_shutdown(yield
[ec
]);
899 s
.socket().shutdown(tcp::socket::shutdown_both
, ec
);
900 }, make_stack_allocator());
904 #endif // WITH_RADOSGW_BEAST_OPENSSL
905 spawn::spawn(context
,
906 [this, s
=std::move(stream
)] (spawn::yield_context yield
) mutable {
907 Connection conn
{s
.socket()};
908 auto c
= connections
.add(conn
);
909 auto buffer
= std::make_unique
<parse_buffer
>();
910 boost::system::error_code ec
;
911 handle_connection(context
, env
, s
, *buffer
, false, pause_mutex
,
912 scheduler
.get(), ec
, yield
, request_timeout
);
913 s
.socket().shutdown(tcp::socket::shutdown_both
, ec
);
914 }, make_stack_allocator());
918 int AsioFrontend::run()
921 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
922 threads
.reserve(thread_count
);
924 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
926 // the worker threads call io_context::run(), which will return when there's
927 // no work left. hold a work guard to keep these threads going until join()
928 work
.emplace(boost::asio::make_work_guard(context
));
930 for (int i
= 0; i
< thread_count
; i
++) {
931 threads
.emplace_back([=] {
932 // request warnings on synchronous librados calls in this thread
933 is_asio_thread
= true;
934 boost::system::error_code ec
;
941 void AsioFrontend::stop()
943 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
947 boost::system::error_code ec
;
948 // close all listeners
949 for (auto& listener
: listeners
) {
950 listener
.acceptor
.close(ec
);
952 // close all connections
953 connections
.close(ec
);
954 pause_mutex
.cancel();
957 void AsioFrontend::join()
964 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
965 for (auto& thread
: threads
) {
968 ldout(ctx(), 4) << "frontend done" << dendl
;
971 void AsioFrontend::pause()
973 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
975 // cancel pending calls to accept(), but don't close the sockets
976 boost::system::error_code ec
;
977 for (auto& l
: listeners
) {
978 l
.acceptor
.cancel(ec
);
981 // pause and wait for outstanding requests to complete
982 pause_mutex
.lock(ec
);
985 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
987 ldout(ctx(), 4) << "frontend paused" << dendl
;
991 void AsioFrontend::unpause(rgw::sal::RGWRadosStore
* const store
,
992 rgw_auth_registry_ptr_t auth_registry
)
995 env
.auth_registry
= std::move(auth_registry
);
997 // unpause to unblock connections
998 pause_mutex
.unlock();
1000 // start accepting connections again
1001 for (auto& l
: listeners
) {
1002 l
.acceptor
.async_accept(l
.socket
,
1003 [this, &l
] (boost::system::error_code ec
) {
1008 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
1011 } // anonymous namespace
1013 class RGWAsioFrontend::Impl
: public AsioFrontend
{
1015 Impl(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
1016 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1017 : AsioFrontend(env
, conf
, sched_ctx
) {}
1020 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
,
1021 RGWFrontendConfig
* conf
,
1022 rgw::dmclock::SchedulerCtx
& sched_ctx
)
1023 : impl(new Impl(env
, conf
, sched_ctx
))
1027 RGWAsioFrontend::~RGWAsioFrontend() = default;
1029 int RGWAsioFrontend::init()
1031 return impl
->init();
1034 int RGWAsioFrontend::run()
1039 void RGWAsioFrontend::stop()
1044 void RGWAsioFrontend::join()
1049 void RGWAsioFrontend::pause_for_new_config()
1054 void RGWAsioFrontend::unpause_with_new_config(
1055 rgw::sal::RGWRadosStore
* const store
,
1056 rgw_auth_registry_ptr_t auth_registry
1058 impl
->unpause(store
, std::move(auth_registry
));