1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
24 #include "services/svc_config_key.h"
25 #include "services/svc_zone.h"
31 #include "rgw_dmclock_async_scheduler.h"
33 #define dout_subsys ceph_subsys_rgw
37 using tcp
= boost::asio::ip::tcp
;
38 namespace http
= boost::beast::http
;
39 #ifdef WITH_RADOSGW_BEAST_OPENSSL
40 namespace ssl
= boost::asio::ssl
;
43 using parse_buffer
= boost::beast::flat_static_buffer
<65536>;
45 // use mmap/mprotect to allocate 512k coroutine stacks
46 auto make_stack_allocator() {
47 return boost::context::protected_fixedsize_stack
{512*1024};
50 template <typename Stream
>
51 class StreamIO
: public rgw::asio::ClientIO
{
52 CephContext
* const cct
;
54 spawn::yield_context yield
;
57 StreamIO(CephContext
*cct
, Stream
& stream
, rgw::asio::parser_type
& parser
,
58 spawn::yield_context yield
,
59 parse_buffer
& buffer
, bool is_ssl
,
60 const tcp::endpoint
& local_endpoint
,
61 const tcp::endpoint
& remote_endpoint
)
62 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
63 cct(cct
), stream(stream
), yield(yield
), buffer(buffer
)
66 size_t write_data(const char* buf
, size_t len
) override
{
67 boost::system::error_code ec
;
68 auto bytes
= boost::asio::async_write(stream
, boost::asio::buffer(buf
, len
),
71 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
72 if (ec
==boost::asio::error::broken_pipe
) {
73 boost::system::error_code ec_ignored
;
74 stream
.lowest_layer().shutdown(tcp::socket::shutdown_both
, ec_ignored
);
76 throw rgw::io::Exception(ec
.value(), std::system_category());
81 size_t recv_body(char* buf
, size_t max
) override
{
82 auto& message
= parser
.get();
83 auto& body_remaining
= message
.body();
84 body_remaining
.data
= buf
;
85 body_remaining
.size
= max
;
87 while (body_remaining
.size
&& !parser
.is_done()) {
88 boost::system::error_code ec
;
89 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
90 if (ec
== http::error::need_buffer
) {
94 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
95 throw rgw::io::Exception(ec
.value(), std::system_category());
98 return max
- body_remaining
.size
;
// Stream helper that prints an http version number as text, e.g. 'HTTP/1.1'.
// The version is given as major * 10 + minor (so 11 means 1.1) and is split
// into its two parts at construction.
struct http_version {
  unsigned major_ver;
  unsigned minor_ver;

  explicit http_version(unsigned version)
    : major_ver{version / 10},
      minor_ver{version % 10}
  {}
};

// Writes "HTTP/<major>.<minor>" to the stream and returns the stream.
std::ostream& operator<<(std::ostream& out, const http_version& v) {
  out << "HTTP/" << v.major_ver;
  out << '.' << v.minor_ver;
  return out;
}
113 // log an http header value or '-' if it's missing
115 const http::fields
& fields
;
117 std::string_view quote
;
118 log_header(const http::fields
& fields
, http::field field
,
119 std::string_view quote
= "")
120 : fields(fields
), field(field
), quote(quote
) {}
122 std::ostream
& operator<<(std::ostream
& out
, const log_header
& h
) {
123 auto p
= h
.fields
.find(h
.field
);
124 if (p
== h
.fields
.end()) {
127 return out
<< h
.quote
<< p
->value() << h
.quote
;
130 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
132 template <typename Stream
>
133 void handle_connection(boost::asio::io_context
& context
,
134 RGWProcessEnv
& env
, Stream
& stream
,
135 parse_buffer
& buffer
, bool is_ssl
,
136 SharedMutex
& pause_mutex
,
137 rgw::dmclock::Scheduler
*scheduler
,
138 boost::system::error_code
& ec
,
139 spawn::yield_context yield
)
141 // limit header to 4k, since we read it all into a single flat_buffer
142 static constexpr size_t header_limit
= 4096;
143 // don't impose a limit on the body, since we read it in pieces
144 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
146 auto cct
= env
.store
->ctx();
148 // read messages from the stream until eof
150 // configure the parser
151 rgw::asio::parser_type parser
;
152 parser
.header_limit(header_limit
);
153 parser
.body_limit(body_limit
);
156 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
157 if (ec
== boost::asio::error::connection_reset
||
158 ec
== boost::asio::error::bad_descriptor
||
159 ec
== boost::asio::error::operation_aborted
||
160 #ifdef WITH_RADOSGW_BEAST_OPENSSL
161 ec
== ssl::error::stream_truncated
||
163 ec
== http::error::end_of_stream
) {
164 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
167 auto& message
= parser
.get();
169 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
170 http::response
<http::empty_body
> response
;
171 response
.result(http::status::bad_request
);
172 response
.version(message
.version() == 10 ? 10 : 11);
173 response
.prepare_payload();
174 http::async_write(stream
, response
, yield
[ec
]);
176 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
178 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
183 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
184 if (ec
== boost::asio::error::operation_aborted
) {
187 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
191 // process the request
192 RGWRequest req
{env
.store
->getRados()->get_new_req_id()};
194 auto& socket
= stream
.lowest_layer();
195 const auto& remote_endpoint
= socket
.remote_endpoint(ec
);
197 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
201 StreamIO real_client
{cct
, stream
, parser
, yield
, buffer
, is_ssl
,
202 socket
.local_endpoint(),
205 auto real_client_io
= rgw::io::add_reordering(
206 rgw::io::add_buffering(cct
,
207 rgw::io::add_chunking(
208 rgw::io::add_conlen_controlling(
210 RGWRestfulIO
client(cct
, &real_client_io
);
211 auto y
= optional_yield
{context
, yield
};
213 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
214 *env
.auth_registry
, &client
, env
.olog
, y
,
215 scheduler
, &http_ret
);
217 if (cct
->_conf
->subsys
.should_gather(dout_subsys
, 1)) {
218 // access log line elements begin per Apache Combined Log Format with additions following
219 const auto now
= ceph::coarse_real_clock::now();
220 using ceph::operator<<; // for coarse_real_time
221 ldout(cct
, 1) << "beast: " << hex
<< &req
<< dec
<< ": "
222 << remote_endpoint
.address() << " - - [" << now
<< "] \""
223 << message
.method_string() << ' ' << message
.target() << ' '
224 << http_version
{message
.version()} << "\" " << http_ret
<< ' '
225 << client
.get_bytes_sent() + client
.get_bytes_received() << ' '
226 << log_header
{message
, http::field::referer
, "\""} << ' '
227 << log_header
{message
, http::field::user_agent
, "\""} << ' '
228 << log_header
{message
, http::field::range
} << dendl
;
232 if (!parser
.keep_alive()) {
236 // if we failed before reading the entire message, discard any remaining
237 // bytes before reading the next
238 while (!parser
.is_done()) {
239 static std::array
<char, 1024> discard_buffer
;
241 auto& body
= parser
.get().body();
242 body
.size
= discard_buffer
.size();
243 body
.data
= discard_buffer
.data();
245 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
246 if (ec
== http::error::need_buffer
) {
249 if (ec
== boost::asio::error::connection_reset
) {
253 ldout(cct
, 5) << "failed to discard unread message: "
254 << ec
.message() << dendl
;
261 struct Connection
: boost::intrusive::list_base_hook
<> {
263 Connection(tcp::socket
& socket
) : socket(socket
) {}
266 class ConnectionList
{
267 using List
= boost::intrusive::list
<Connection
>;
271 void remove(Connection
& c
) {
272 std::lock_guard lock
{mutex
};
274 connections
.erase(List::s_iterator_to(c
));
279 ConnectionList
*list
;
282 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
283 ~Guard() { list
->remove(*conn
); }
285 [[nodiscard
]] Guard
add(Connection
& conn
) {
286 std::lock_guard lock
{mutex
};
287 connections
.push_back(conn
);
288 return Guard
{this, &conn
};
290 void close(boost::system::error_code
& ec
) {
291 std::lock_guard lock
{mutex
};
292 for (auto& conn
: connections
) {
293 conn
.socket
.close(ec
);
299 namespace dmc
= rgw::dmclock
;
302 RGWFrontendConfig
* conf
;
303 boost::asio::io_context context
;
304 #ifdef WITH_RADOSGW_BEAST_OPENSSL
305 boost::optional
<ssl::context
> ssl_context
;
306 int get_config_key_val(string name
,
309 int ssl_set_private_key(const string
& name
, bool is_ssl_cert
);
310 int ssl_set_certificate_chain(const string
& name
);
313 SharedMutex pause_mutex
;
314 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
317 tcp::endpoint endpoint
;
318 tcp::acceptor acceptor
;
320 bool use_ssl
= false;
321 bool use_nodelay
= false;
323 explicit Listener(boost::asio::io_context
& context
)
324 : acceptor(context
), socket(context
) {}
326 std::vector
<Listener
> listeners
;
328 ConnectionList connections
;
330 // work guard to keep run() threads busy while listeners are paused
331 using Executor
= boost::asio::io_context::executor_type
;
332 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
334 std::vector
<std::thread
> threads
;
335 std::atomic
<bool> going_down
{false};
337 CephContext
* ctx() const { return env
.store
->ctx(); }
338 std::optional
<dmc::ClientCounters
> client_counters
;
339 std::unique_ptr
<dmc::ClientConfig
> client_config
;
340 void accept(Listener
& listener
, boost::system::error_code ec
);
343 AsioFrontend(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
344 dmc::SchedulerCtx
& sched_ctx
)
345 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
347 auto sched_t
= dmc::get_scheduler_t(ctx());
349 case dmc::scheduler_t::dmclock
:
350 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
352 std::ref(sched_ctx
.get_dmc_client_counters()),
353 sched_ctx
.get_dmc_client_config(),
354 *sched_ctx
.get_dmc_client_config(),
355 dmc::AtLimit::Reject
));
357 case dmc::scheduler_t::none
:
358 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
360 case dmc::scheduler_t::throttler
:
361 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
371 void unpause(rgw::sal::RGWRadosStore
* store
, rgw_auth_registry_ptr_t
);
374 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
377 auto port
= std::strtoul(input
, &end
, 10);
378 if (port
> std::numeric_limits
<unsigned short>::max()) {
379 ec
.assign(ERANGE
, boost::system::system_category());
380 } else if (port
== 0 && end
== input
) {
381 ec
.assign(EINVAL
, boost::system::system_category());
386 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
387 unsigned short default_port
,
388 boost::system::error_code
& ec
)
390 tcp::endpoint endpoint
;
393 ec
= boost::asio::error::invalid_argument
;
397 if (input
[0] == '[') { // ipv6
398 const size_t addr_begin
= 1;
399 const size_t addr_end
= input
.find(']');
400 if (addr_end
== input
.npos
) { // no matching ]
401 ec
= boost::asio::error::invalid_argument
;
404 if (addr_end
+ 1 < input
.size()) {
405 // :port must follow [ipv6]
406 if (input
[addr_end
+ 1] != ':') {
407 ec
= boost::asio::error::invalid_argument
;
410 auto port_str
= input
.substr(addr_end
+ 2);
411 endpoint
.port(parse_port(port_str
.data(), ec
));
414 endpoint
.port(default_port
);
416 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
417 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
419 auto colon
= input
.find(':');
420 if (colon
!= input
.npos
) {
421 auto port_str
= input
.substr(colon
+ 1);
422 endpoint
.port(parse_port(port_str
.data(), ec
));
427 endpoint
.port(default_port
);
429 auto addr
= input
.substr(0, colon
);
430 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
// Switches the process to the uid/gid configured on the CephContext.
// setgid() is attempted before setuid(), and each call is skipped when the
// configured id is 0. A failed call is logged together with
// cpp_strerror(err); on the way out the numeric ids and their string forms
// are logged as "set uid:gid to ...".
// NOTE(review): the declaration of `err`, the return statements and any
// condition guarding the final log line are not visible in this listing --
// confirm the exact return values against the full file.
435 static int drop_privileges(CephContext
*ctx
)
437 uid_t uid
= ctx
->get_set_uid();
438 gid_t gid
= ctx
->get_set_gid();
439 std::string uid_string
= ctx
->get_set_uid_string();
440 std::string gid_string
= ctx
->get_set_gid_string();
// change the group first, then the user
441 if (gid
&& setgid(gid
) != 0) {
443 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
446 if (uid
&& setuid(uid
) != 0) {
448 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
452 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
453 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
458 int AsioFrontend::init()
460 boost::system::error_code ec
;
461 auto& config
= conf
->get_config_map();
463 #ifdef WITH_RADOSGW_BEAST_OPENSSL
471 auto ports
= config
.equal_range("port");
472 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
473 auto port
= parse_port(i
->second
.c_str(), ec
);
475 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
478 listeners
.emplace_back(context
);
479 listeners
.back().endpoint
.port(port
);
481 listeners
.emplace_back(context
);
482 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
485 auto endpoints
= config
.equal_range("endpoint");
486 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
487 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
489 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
492 listeners
.emplace_back(context
);
493 listeners
.back().endpoint
= endpoint
;
496 auto nodelay
= config
.find("tcp_nodelay");
497 if (nodelay
!= config
.end()) {
498 for (auto& l
: listeners
) {
499 l
.use_nodelay
= (nodelay
->second
== "1");
504 bool socket_bound
= false;
506 for (auto& l
: listeners
) {
507 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
509 if (ec
== boost::asio::error::address_family_not_supported
) {
510 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
511 << ", " << ec
.message() << dendl
;
515 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
519 if (l
.endpoint
.protocol() == tcp::v6()) {
520 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
522 lderr(ctx()) << "failed to set v6_only socket option: "
523 << ec
.message() << dendl
;
528 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
529 l
.acceptor
.bind(l
.endpoint
, ec
);
531 lderr(ctx()) << "failed to bind address " << l
.endpoint
532 << ": " << ec
.message() << dendl
;
536 auto it
= config
.find("max_connection_backlog");
537 auto max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
538 if (it
!= config
.end()) {
540 max_connection_backlog
= strict_strtol(it
->second
.c_str(), 10, &err
);
542 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it
->second
<< dendl
;
543 max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
546 l
.acceptor
.listen(max_connection_backlog
);
547 l
.acceptor
.async_accept(l
.socket
,
548 [this, &l
] (boost::system::error_code ec
) {
552 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
556 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
560 return drop_privileges(ctx());
563 #ifdef WITH_RADOSGW_BEAST_OPENSSL
565 static string config_val_prefix
= "config://";
569 class ExpandMetaVar
{
570 map
<string
, string
> meta_map
;
573 ExpandMetaVar(RGWSI_Zone
*zone_svc
) {
574 meta_map
["realm"] = zone_svc
->get_realm().get_name();
575 meta_map
["realm_id"] = zone_svc
->get_realm().get_id();
576 meta_map
["zonegroup"] = zone_svc
->get_zonegroup().get_name();
577 meta_map
["zonegroup_id"] = zone_svc
->get_zonegroup().get_id();
578 meta_map
["zone"] = zone_svc
->zone_name();
579 meta_map
["zone_id"] = zone_svc
->zone_id().id
;
582 string
process_str(const string
& in
);
585 string
ExpandMetaVar::process_str(const string
& in
)
587 if (meta_map
.empty()) {
591 auto pos
= in
.find('$');
592 if (pos
== std::string::npos
) {
597 decltype(pos
) last_pos
= 0;
599 while (pos
!= std::string::npos
) {
600 if (pos
> last_pos
) {
601 out
+= in
.substr(last_pos
, pos
- last_pos
);
605 const char *valid_chars
= "abcdefghijklmnopqrstuvwxyz_";
608 if (in
[pos
+1] == '{') {
610 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 2);
611 if (endpos
!= std::string::npos
&&
613 var
= in
.substr(pos
+ 2, endpos
- pos
- 2);
618 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 1);
619 if (endpos
!= std::string::npos
)
620 var
= in
.substr(pos
+ 1, endpos
- pos
- 1);
622 var
= in
.substr(pos
+ 1);
624 string var_source
= in
.substr(pos
, endpos
- pos
);
627 auto iter
= meta_map
.find(var
);
628 if (iter
!= meta_map
.end()) {
633 pos
= in
.find('$', last_pos
);
635 if (last_pos
!= std::string::npos
) {
636 out
+= in
.substr(last_pos
);
642 } /* anonymous namespace */
644 int AsioFrontend::get_config_key_val(string name
,
649 lderr(ctx()) << "bad " << type
<< " config value" << dendl
;
653 auto svc
= env
.store
->svc()->config_key
;
654 int r
= svc
->get(name
, true, pbl
);
656 lderr(ctx()) << type
<< " was not found: " << name
<< dendl
;
662 int AsioFrontend::ssl_set_private_key(const string
& name
, bool is_ssl_certificate
)
664 boost::system::error_code ec
;
666 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
667 ssl_context
->use_private_key_file(name
, ssl::context::pem
, ec
);
670 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
676 ssl_context
->use_private_key(boost::asio::buffer(bl
.c_str(), bl
.length()),
677 ssl::context::pem
, ec
);
681 if (!is_ssl_certificate
) {
682 lderr(ctx()) << "failed to add ssl_private_key=" << name
683 << ": " << ec
.message() << dendl
;
685 lderr(ctx()) << "failed to use ssl_certificate=" << name
686 << " as a private key: " << ec
.message() << dendl
;
694 int AsioFrontend::ssl_set_certificate_chain(const string
& name
)
696 boost::system::error_code ec
;
698 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
699 ssl_context
->use_certificate_chain_file(name
, ec
);
702 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
708 ssl_context
->use_certificate_chain(boost::asio::buffer(bl
.c_str(), bl
.length()),
713 lderr(ctx()) << "failed to use ssl_certificate=" << name
714 << ": " << ec
.message() << dendl
;
721 int AsioFrontend::init_ssl()
723 boost::system::error_code ec
;
724 auto& config
= conf
->get_config_map();
727 std::optional
<string
> cert
= conf
->get_val("ssl_certificate");
729 // only initialize the ssl context if it's going to be used
730 ssl_context
= boost::in_place(ssl::context::tls
);
733 std::optional
<string
> key
= conf
->get_val("ssl_private_key");
734 bool have_cert
= false;
737 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
741 auto ports
= config
.equal_range("ssl_port");
742 auto endpoints
= config
.equal_range("ssl_endpoint");
745 * don't try to config certificate if frontend isn't configured for ssl
747 if (ports
.first
== ports
.second
&&
748 endpoints
.first
== endpoints
.second
) {
752 bool key_is_cert
= false;
760 ExpandMetaVar
emv(env
.store
->svc()->zone
);
762 cert
= emv
.process_str(*cert
);
763 key
= emv
.process_str(*key
);
765 int r
= ssl_set_private_key(*key
, key_is_cert
);
766 bool have_private_key
= (r
>= 0);
769 r
= ssl_set_private_key(*cert
, true);
770 have_private_key
= (r
>= 0);
774 if (have_private_key
) {
775 int r
= ssl_set_certificate_chain(*cert
);
776 have_cert
= (r
>= 0);
780 // parse ssl endpoints
781 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
783 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
786 auto port
= parse_port(i
->second
.c_str(), ec
);
788 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
791 listeners
.emplace_back(context
);
792 listeners
.back().endpoint
.port(port
);
793 listeners
.back().use_ssl
= true;
795 listeners
.emplace_back(context
);
796 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
797 listeners
.back().use_ssl
= true;
800 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
802 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
805 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
807 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
810 listeners
.emplace_back(context
);
811 listeners
.back().endpoint
= endpoint
;
812 listeners
.back().use_ssl
= true;
816 #endif // WITH_RADOSGW_BEAST_OPENSSL
818 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
820 if (!l
.acceptor
.is_open()) {
822 } else if (ec
== boost::asio::error::operation_aborted
) {
825 ldout(ctx(), 1) << "accept failed: " << ec
.message() << dendl
;
828 auto socket
= std::move(l
.socket
);
829 tcp::no_delay
options(l
.use_nodelay
);
830 socket
.set_option(options
,ec
);
831 l
.acceptor
.async_accept(l
.socket
,
832 [this, &l
] (boost::system::error_code ec
) {
836 // spawn a coroutine to handle the connection
837 #ifdef WITH_RADOSGW_BEAST_OPENSSL
839 spawn::spawn(context
,
840 [this, s
=std::move(socket
)] (spawn::yield_context yield
) mutable {
842 auto c
= connections
.add(conn
);
843 // wrap the socket in an ssl stream
844 ssl::stream
<tcp::socket
&> stream
{s
, *ssl_context
};
845 auto buffer
= std::make_unique
<parse_buffer
>();
847 boost::system::error_code ec
;
848 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
849 buffer
->data(), yield
[ec
]);
851 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
854 buffer
->consume(bytes
);
855 handle_connection(context
, env
, stream
, *buffer
, true, pause_mutex
,
856 scheduler
.get(), ec
, yield
);
858 // ssl shutdown (ignoring errors)
859 stream
.async_shutdown(yield
[ec
]);
861 s
.shutdown(tcp::socket::shutdown_both
, ec
);
862 }, make_stack_allocator());
866 #endif // WITH_RADOSGW_BEAST_OPENSSL
867 spawn::spawn(context
,
868 [this, s
=std::move(socket
)] (spawn::yield_context yield
) mutable {
870 auto c
= connections
.add(conn
);
871 auto buffer
= std::make_unique
<parse_buffer
>();
872 boost::system::error_code ec
;
873 handle_connection(context
, env
, s
, *buffer
, false, pause_mutex
,
874 scheduler
.get(), ec
, yield
);
875 s
.shutdown(tcp::socket::shutdown_both
, ec
);
876 }, make_stack_allocator());
880 int AsioFrontend::run()
883 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
884 threads
.reserve(thread_count
);
886 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
888 // the worker threads call io_context::run(), which will return when there's
889 // no work left. hold a work guard to keep these threads going until join()
890 work
.emplace(boost::asio::make_work_guard(context
));
892 for (int i
= 0; i
< thread_count
; i
++) {
893 threads
.emplace_back([=] {
894 // request warnings on synchronous librados calls in this thread
895 is_asio_thread
= true;
896 boost::system::error_code ec
;
// Begins frontend shutdown: closes the acceptor of every listener, closes
// the sockets of all tracked connections, and cancels pause_mutex.
// Errors from the close() calls are written to a local error_code that the
// visible lines never inspect.
// NOTE(review): some lines of this function are not visible in this listing
// (e.g. between the first log line and the error_code declaration) --
// confirm against the full file.
903 void AsioFrontend::stop()
905 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
909 boost::system::error_code ec
;
910 // close all listeners
911 for (auto& listener
: listeners
) {
912 listener
.acceptor
.close(ec
);
914 // close all connections
915 connections
.close(ec
);
916 pause_mutex
.cancel();
919 void AsioFrontend::join()
926 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
927 for (auto& thread
: threads
) {
930 ldout(ctx(), 4) << "frontend done" << dendl
;
// Pauses the frontend: cancels the pending accept on every listener without
// closing the listening sockets, then locks pause_mutex, which waits for
// outstanding requests to complete. One of two messages is logged afterwards:
// "frontend failed to pause" (with ec.message()) or "frontend paused".
// NOTE(review): the branch that selects between the two log lines is not
// visible in this listing -- presumably it tests `ec` after lock(); confirm.
933 void AsioFrontend::pause()
935 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
937 // cancel pending calls to accept(), but don't close the sockets
938 boost::system::error_code ec
;
939 for (auto& l
: listeners
) {
940 l
.acceptor
.cancel(ec
);
943 // pause and wait for outstanding requests to complete
944 pause_mutex
.lock(ec
);
947 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
949 ldout(ctx(), 4) << "frontend paused" << dendl
;
// Resumes the frontend after pause(): installs the new auth registry in
// `env`, unlocks pause_mutex so blocked connections can proceed, and issues
// a fresh async_accept on every listener.
// NOTE(review): what is done with `store` and the body of the accept
// completion handler are not visible in this listing -- confirm against the
// full file.
953 void AsioFrontend::unpause(rgw::sal::RGWRadosStore
* const store
,
954 rgw_auth_registry_ptr_t auth_registry
)
957 env
.auth_registry
= std::move(auth_registry
);
959 // unpause to unblock connections
960 pause_mutex
.unlock();
962 // start accepting connections again
963 for (auto& l
: listeners
) {
964 l
.acceptor
.async_accept(l
.socket
,
965 [this, &l
] (boost::system::error_code ec
) {
970 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
973 } // anonymous namespace
975 class RGWAsioFrontend::Impl
: public AsioFrontend
{
977 Impl(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
978 rgw::dmclock::SchedulerCtx
& sched_ctx
)
979 : AsioFrontend(env
, conf
, sched_ctx
) {}
// Constructs the frontend by allocating its Impl, forwarding the process
// environment, the frontend configuration and the scheduler context.
// NOTE(review): the declared type of `impl` is not visible in this listing.
982 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
,
983 RGWFrontendConfig
* conf
,
984 rgw::dmclock::SchedulerCtx
& sched_ctx
)
985 : impl(new Impl(env
, conf
, sched_ctx
))
989 RGWAsioFrontend::~RGWAsioFrontend() = default;
991 int RGWAsioFrontend::init()
996 int RGWAsioFrontend::run()
1001 void RGWAsioFrontend::stop()
1006 void RGWAsioFrontend::join()
1011 void RGWAsioFrontend::pause_for_new_config()
// Forwards to Impl::unpause(), passing the store pointer through and moving
// the auth registry into the call.
1016 void RGWAsioFrontend::unpause_with_new_config(
1017 rgw::sal::RGWRadosStore
* const store
,
1018 rgw_auth_registry_ptr_t auth_registry
1020 impl
->unpause(store
, std::move(auth_registry
));