1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #include <boost/asio.hpp>
9 #include <boost/intrusive/list.hpp>
11 #include <boost/context/protected_fixedsize_stack.hpp>
12 #include <spawn/spawn.hpp>
14 #include "common/async/shared_mutex.h"
15 #include "common/errno.h"
16 #include "common/strtol.h"
18 #include "rgw_asio_client.h"
19 #include "rgw_asio_frontend.h"
21 #ifdef WITH_RADOSGW_BEAST_OPENSSL
22 #include <boost/asio/ssl.hpp>
24 #include "services/svc_config_key.h"
25 #include "services/svc_zone.h"
31 #include "rgw_dmclock_async_scheduler.h"
33 #define dout_subsys ceph_subsys_rgw
37 using tcp
= boost::asio::ip::tcp
;
38 namespace http
= boost::beast::http
;
39 #ifdef WITH_RADOSGW_BEAST_OPENSSL
40 namespace ssl
= boost::asio::ssl
;
43 using parse_buffer
= boost::beast::flat_static_buffer
<65536>;
45 // use mmap/mprotect to allocate 512k coroutine stacks
46 auto make_stack_allocator() {
47 return boost::context::protected_fixedsize_stack
{512*1024};
50 template <typename Stream
>
51 class StreamIO
: public rgw::asio::ClientIO
{
52 CephContext
* const cct
;
54 spawn::yield_context yield
;
57 StreamIO(CephContext
*cct
, Stream
& stream
, rgw::asio::parser_type
& parser
,
58 spawn::yield_context yield
,
59 parse_buffer
& buffer
, bool is_ssl
,
60 const tcp::endpoint
& local_endpoint
,
61 const tcp::endpoint
& remote_endpoint
)
62 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
63 cct(cct
), stream(stream
), yield(yield
), buffer(buffer
)
66 size_t write_data(const char* buf
, size_t len
) override
{
67 boost::system::error_code ec
;
68 auto bytes
= boost::asio::async_write(stream
, boost::asio::buffer(buf
, len
),
71 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
72 throw rgw::io::Exception(ec
.value(), std::system_category());
77 size_t recv_body(char* buf
, size_t max
) override
{
78 auto& message
= parser
.get();
79 auto& body_remaining
= message
.body();
80 body_remaining
.data
= buf
;
81 body_remaining
.size
= max
;
83 while (body_remaining
.size
&& !parser
.is_done()) {
84 boost::system::error_code ec
;
85 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
86 if (ec
== http::error::need_buffer
) {
90 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
91 throw rgw::io::Exception(ec
.value(), std::system_category());
94 return max
- body_remaining
.size
;
98 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
100 template <typename Stream
>
101 void handle_connection(boost::asio::io_context
& context
,
102 RGWProcessEnv
& env
, Stream
& stream
,
103 parse_buffer
& buffer
, bool is_ssl
,
104 SharedMutex
& pause_mutex
,
105 rgw::dmclock::Scheduler
*scheduler
,
106 boost::system::error_code
& ec
,
107 spawn::yield_context yield
)
109 // limit header to 4k, since we read it all into a single flat_buffer
110 static constexpr size_t header_limit
= 4096;
111 // don't impose a limit on the body, since we read it in pieces
112 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
114 auto cct
= env
.store
->ctx();
116 // read messages from the stream until eof
118 // configure the parser
119 rgw::asio::parser_type parser
;
120 parser
.header_limit(header_limit
);
121 parser
.body_limit(body_limit
);
124 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
125 if (ec
== boost::asio::error::connection_reset
||
126 ec
== boost::asio::error::bad_descriptor
||
127 ec
== boost::asio::error::operation_aborted
||
128 #ifdef WITH_RADOSGW_BEAST_OPENSSL
129 ec
== ssl::error::stream_truncated
||
131 ec
== http::error::end_of_stream
) {
132 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
136 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
137 auto& message
= parser
.get();
138 http::response
<http::empty_body
> response
;
139 response
.result(http::status::bad_request
);
140 response
.version(message
.version() == 10 ? 10 : 11);
141 response
.prepare_payload();
142 http::async_write(stream
, response
, yield
[ec
]);
144 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
146 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
151 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
152 if (ec
== boost::asio::error::operation_aborted
) {
155 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
159 // process the request
160 RGWRequest req
{env
.store
->getRados()->get_new_req_id()};
162 auto& socket
= stream
.lowest_layer();
163 const auto& remote_endpoint
= socket
.remote_endpoint(ec
);
165 ldout(cct
, 1) << "failed to connect client: " << ec
.message() << dendl
;
169 StreamIO real_client
{cct
, stream
, parser
, yield
, buffer
, is_ssl
,
170 socket
.local_endpoint(),
173 auto real_client_io
= rgw::io::add_reordering(
174 rgw::io::add_buffering(cct
,
175 rgw::io::add_chunking(
176 rgw::io::add_conlen_controlling(
178 RGWRestfulIO
client(cct
, &real_client_io
);
179 auto y
= optional_yield
{context
, yield
};
180 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
181 *env
.auth_registry
, &client
, env
.olog
, y
, scheduler
);
184 if (!parser
.keep_alive()) {
188 // if we failed before reading the entire message, discard any remaining
189 // bytes before reading the next
190 while (!parser
.is_done()) {
191 static std::array
<char, 1024> discard_buffer
;
193 auto& body
= parser
.get().body();
194 body
.size
= discard_buffer
.size();
195 body
.data
= discard_buffer
.data();
197 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
198 if (ec
== http::error::need_buffer
) {
201 if (ec
== boost::asio::error::connection_reset
) {
205 ldout(cct
, 5) << "failed to discard unread message: "
206 << ec
.message() << dendl
;
213 struct Connection
: boost::intrusive::list_base_hook
<> {
215 Connection(tcp::socket
& socket
) : socket(socket
) {}
218 class ConnectionList
{
219 using List
= boost::intrusive::list
<Connection
>;
223 void remove(Connection
& c
) {
224 std::lock_guard lock
{mutex
};
226 connections
.erase(List::s_iterator_to(c
));
231 ConnectionList
*list
;
234 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
235 ~Guard() { list
->remove(*conn
); }
237 [[nodiscard
]] Guard
add(Connection
& conn
) {
238 std::lock_guard lock
{mutex
};
239 connections
.push_back(conn
);
240 return Guard
{this, &conn
};
242 void close(boost::system::error_code
& ec
) {
243 std::lock_guard lock
{mutex
};
244 for (auto& conn
: connections
) {
245 conn
.socket
.close(ec
);
251 namespace dmc
= rgw::dmclock
;
254 RGWFrontendConfig
* conf
;
255 boost::asio::io_context context
;
256 #ifdef WITH_RADOSGW_BEAST_OPENSSL
257 boost::optional
<ssl::context
> ssl_context
;
258 int get_config_key_val(string name
,
261 int ssl_set_private_key(const string
& name
, bool is_ssl_cert
);
262 int ssl_set_certificate_chain(const string
& name
);
265 SharedMutex pause_mutex
;
266 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
269 tcp::endpoint endpoint
;
270 tcp::acceptor acceptor
;
272 bool use_ssl
= false;
273 bool use_nodelay
= false;
275 explicit Listener(boost::asio::io_context
& context
)
276 : acceptor(context
), socket(context
) {}
278 std::vector
<Listener
> listeners
;
280 ConnectionList connections
;
282 // work guard to keep run() threads busy while listeners are paused
283 using Executor
= boost::asio::io_context::executor_type
;
284 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
286 std::vector
<std::thread
> threads
;
287 std::atomic
<bool> going_down
{false};
289 CephContext
* ctx() const { return env
.store
->ctx(); }
290 std::optional
<dmc::ClientCounters
> client_counters
;
291 std::unique_ptr
<dmc::ClientConfig
> client_config
;
292 void accept(Listener
& listener
, boost::system::error_code ec
);
295 AsioFrontend(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
296 dmc::SchedulerCtx
& sched_ctx
)
297 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
299 auto sched_t
= dmc::get_scheduler_t(ctx());
301 case dmc::scheduler_t::dmclock
:
302 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
304 std::ref(sched_ctx
.get_dmc_client_counters()),
305 sched_ctx
.get_dmc_client_config(),
306 *sched_ctx
.get_dmc_client_config(),
307 dmc::AtLimit::Reject
));
309 case dmc::scheduler_t::none
:
310 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
312 case dmc::scheduler_t::throttler
:
313 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
323 void unpause(rgw::sal::RGWRadosStore
* store
, rgw_auth_registry_ptr_t
);
326 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
329 auto port
= std::strtoul(input
, &end
, 10);
330 if (port
> std::numeric_limits
<unsigned short>::max()) {
331 ec
.assign(ERANGE
, boost::system::system_category());
332 } else if (port
== 0 && end
== input
) {
333 ec
.assign(EINVAL
, boost::system::system_category());
338 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
339 unsigned short default_port
,
340 boost::system::error_code
& ec
)
342 tcp::endpoint endpoint
;
345 ec
= boost::asio::error::invalid_argument
;
349 if (input
[0] == '[') { // ipv6
350 const size_t addr_begin
= 1;
351 const size_t addr_end
= input
.find(']');
352 if (addr_end
== input
.npos
) { // no matching ]
353 ec
= boost::asio::error::invalid_argument
;
356 if (addr_end
+ 1 < input
.size()) {
357 // :port must must follow [ipv6]
358 if (input
[addr_end
+ 1] != ':') {
359 ec
= boost::asio::error::invalid_argument
;
362 auto port_str
= input
.substr(addr_end
+ 2);
363 endpoint
.port(parse_port(port_str
.data(), ec
));
366 endpoint
.port(default_port
);
368 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
369 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
371 auto colon
= input
.find(':');
372 if (colon
!= input
.npos
) {
373 auto port_str
= input
.substr(colon
+ 1);
374 endpoint
.port(parse_port(port_str
.data(), ec
));
379 endpoint
.port(default_port
);
381 auto addr
= input
.substr(0, colon
);
382 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
387 static int drop_privileges(CephContext
*ctx
)
389 uid_t uid
= ctx
->get_set_uid();
390 gid_t gid
= ctx
->get_set_gid();
391 std::string uid_string
= ctx
->get_set_uid_string();
392 std::string gid_string
= ctx
->get_set_gid_string();
393 if (gid
&& setgid(gid
) != 0) {
395 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
398 if (uid
&& setuid(uid
) != 0) {
400 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
404 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
405 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
410 int AsioFrontend::init()
412 boost::system::error_code ec
;
413 auto& config
= conf
->get_config_map();
415 #ifdef WITH_RADOSGW_BEAST_OPENSSL
423 auto ports
= config
.equal_range("port");
424 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
425 auto port
= parse_port(i
->second
.c_str(), ec
);
427 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
430 listeners
.emplace_back(context
);
431 listeners
.back().endpoint
.port(port
);
433 listeners
.emplace_back(context
);
434 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
437 auto endpoints
= config
.equal_range("endpoint");
438 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
439 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
441 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
444 listeners
.emplace_back(context
);
445 listeners
.back().endpoint
= endpoint
;
448 auto nodelay
= config
.find("tcp_nodelay");
449 if (nodelay
!= config
.end()) {
450 for (auto& l
: listeners
) {
451 l
.use_nodelay
= (nodelay
->second
== "1");
456 bool socket_bound
= false;
458 for (auto& l
: listeners
) {
459 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
461 if (ec
== boost::asio::error::address_family_not_supported
) {
462 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
463 << ", " << ec
.message() << dendl
;
467 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
471 if (l
.endpoint
.protocol() == tcp::v6()) {
472 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
474 lderr(ctx()) << "failed to set v6_only socket option: "
475 << ec
.message() << dendl
;
480 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
481 l
.acceptor
.bind(l
.endpoint
, ec
);
483 lderr(ctx()) << "failed to bind address " << l
.endpoint
484 << ": " << ec
.message() << dendl
;
488 auto it
= config
.find("max_connection_backlog");
489 auto max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
490 if (it
!= config
.end()) {
492 max_connection_backlog
= strict_strtol(it
->second
.c_str(), 10, &err
);
494 ldout(ctx(), 0) << "WARNING: invalid value for max_connection_backlog=" << it
->second
<< dendl
;
495 max_connection_backlog
= boost::asio::socket_base::max_listen_connections
;
498 l
.acceptor
.listen(max_connection_backlog
);
499 l
.acceptor
.async_accept(l
.socket
,
500 [this, &l
] (boost::system::error_code ec
) {
504 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
508 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
512 return drop_privileges(ctx());
515 #ifdef WITH_RADOSGW_BEAST_OPENSSL
517 static string config_val_prefix
= "config://";
521 class ExpandMetaVar
{
522 map
<string
, string
> meta_map
;
525 ExpandMetaVar(RGWSI_Zone
*zone_svc
) {
526 meta_map
["realm"] = zone_svc
->get_realm().get_name();
527 meta_map
["realm_id"] = zone_svc
->get_realm().get_id();
528 meta_map
["zonegroup"] = zone_svc
->get_zonegroup().get_name();
529 meta_map
["zonegroup_id"] = zone_svc
->get_zonegroup().get_id();
530 meta_map
["zone"] = zone_svc
->zone_name();
531 meta_map
["zone_id"] = zone_svc
->zone_id().id
;
534 string
process_str(const string
& in
);
537 string
ExpandMetaVar::process_str(const string
& in
)
539 if (meta_map
.empty()) {
543 auto pos
= in
.find('$');
544 if (pos
== std::string::npos
) {
549 decltype(pos
) last_pos
= 0;
551 while (pos
!= std::string::npos
) {
552 if (pos
> last_pos
) {
553 out
+= in
.substr(last_pos
, pos
- last_pos
);
557 const char *valid_chars
= "abcdefghijklmnopqrstuvwxyz_";
560 if (in
[pos
+1] == '{') {
562 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 2);
563 if (endpos
!= std::string::npos
&&
565 var
= in
.substr(pos
+ 2, endpos
- pos
- 2);
570 endpos
= in
.find_first_not_of(valid_chars
, pos
+ 1);
571 if (endpos
!= std::string::npos
)
572 var
= in
.substr(pos
+ 1, endpos
- pos
- 1);
574 var
= in
.substr(pos
+ 1);
576 string var_source
= in
.substr(pos
, endpos
- pos
);
579 auto iter
= meta_map
.find(var
);
580 if (iter
!= meta_map
.end()) {
585 pos
= in
.find('$', last_pos
);
587 if (last_pos
!= std::string::npos
) {
588 out
+= in
.substr(last_pos
);
594 } /* anonymous namespace */
596 int AsioFrontend::get_config_key_val(string name
,
601 lderr(ctx()) << "bad " << type
<< " config value" << dendl
;
605 auto svc
= env
.store
->svc()->config_key
;
606 int r
= svc
->get(name
, true, pbl
);
608 lderr(ctx()) << type
<< " was not found: " << name
<< dendl
;
614 int AsioFrontend::ssl_set_private_key(const string
& name
, bool is_ssl_certificate
)
616 boost::system::error_code ec
;
618 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
619 ssl_context
->use_private_key_file(name
, ssl::context::pem
, ec
);
622 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
628 ssl_context
->use_private_key(boost::asio::buffer(bl
.c_str(), bl
.length()),
629 ssl::context::pem
, ec
);
633 if (!is_ssl_certificate
) {
634 lderr(ctx()) << "failed to add ssl_private_key=" << name
635 << ": " << ec
.message() << dendl
;
637 lderr(ctx()) << "failed to use ssl_certificate=" << name
638 << " as a private key: " << ec
.message() << dendl
;
646 int AsioFrontend::ssl_set_certificate_chain(const string
& name
)
648 boost::system::error_code ec
;
650 if (!boost::algorithm::starts_with(name
, config_val_prefix
)) {
651 ssl_context
->use_certificate_chain_file(name
, ec
);
654 int r
= get_config_key_val(name
.substr(config_val_prefix
.size()),
660 ssl_context
->use_certificate_chain(boost::asio::buffer(bl
.c_str(), bl
.length()),
665 lderr(ctx()) << "failed to use ssl_certificate=" << name
666 << ": " << ec
.message() << dendl
;
673 int AsioFrontend::init_ssl()
675 boost::system::error_code ec
;
676 auto& config
= conf
->get_config_map();
679 std::optional
<string
> cert
= conf
->get_val("ssl_certificate");
681 // only initialize the ssl context if it's going to be used
682 ssl_context
= boost::in_place(ssl::context::tls
);
685 std::optional
<string
> key
= conf
->get_val("ssl_private_key");
686 bool have_cert
= false;
689 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
693 auto ports
= config
.equal_range("ssl_port");
694 auto endpoints
= config
.equal_range("ssl_endpoint");
697 * don't try to config certificate if frontend isn't configured for ssl
699 if (ports
.first
== ports
.second
&&
700 endpoints
.first
== endpoints
.second
) {
704 bool key_is_cert
= false;
712 ExpandMetaVar
emv(env
.store
->svc()->zone
);
714 cert
= emv
.process_str(*cert
);
715 key
= emv
.process_str(*key
);
717 int r
= ssl_set_private_key(*key
, key_is_cert
);
718 bool have_private_key
= (r
>= 0);
721 r
= ssl_set_private_key(*cert
, true);
722 have_private_key
= (r
>= 0);
726 if (have_private_key
) {
727 int r
= ssl_set_certificate_chain(*cert
);
728 have_cert
= (r
>= 0);
732 // parse ssl endpoints
733 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
735 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
738 auto port
= parse_port(i
->second
.c_str(), ec
);
740 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
743 listeners
.emplace_back(context
);
744 listeners
.back().endpoint
.port(port
);
745 listeners
.back().use_ssl
= true;
747 listeners
.emplace_back(context
);
748 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
749 listeners
.back().use_ssl
= true;
752 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
754 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
757 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
759 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
762 listeners
.emplace_back(context
);
763 listeners
.back().endpoint
= endpoint
;
764 listeners
.back().use_ssl
= true;
768 #endif // WITH_RADOSGW_BEAST_OPENSSL
770 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
772 if (!l
.acceptor
.is_open()) {
774 } else if (ec
== boost::asio::error::operation_aborted
) {
777 ldout(ctx(), 1) << "accept failed: " << ec
.message() << dendl
;
780 auto socket
= std::move(l
.socket
);
781 tcp::no_delay
options(l
.use_nodelay
);
782 socket
.set_option(options
,ec
);
783 l
.acceptor
.async_accept(l
.socket
,
784 [this, &l
] (boost::system::error_code ec
) {
788 // spawn a coroutine to handle the connection
789 #ifdef WITH_RADOSGW_BEAST_OPENSSL
791 spawn::spawn(context
,
792 [this, s
=std::move(socket
)] (spawn::yield_context yield
) mutable {
794 auto c
= connections
.add(conn
);
795 // wrap the socket in an ssl stream
796 ssl::stream
<tcp::socket
&> stream
{s
, *ssl_context
};
797 auto buffer
= std::make_unique
<parse_buffer
>();
799 boost::system::error_code ec
;
800 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
801 buffer
->data(), yield
[ec
]);
803 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
806 buffer
->consume(bytes
);
807 handle_connection(context
, env
, stream
, *buffer
, true, pause_mutex
,
808 scheduler
.get(), ec
, yield
);
810 // ssl shutdown (ignoring errors)
811 stream
.async_shutdown(yield
[ec
]);
813 s
.shutdown(tcp::socket::shutdown_both
, ec
);
814 }, make_stack_allocator());
818 #endif // WITH_RADOSGW_BEAST_OPENSSL
819 spawn::spawn(context
,
820 [this, s
=std::move(socket
)] (spawn::yield_context yield
) mutable {
822 auto c
= connections
.add(conn
);
823 auto buffer
= std::make_unique
<parse_buffer
>();
824 boost::system::error_code ec
;
825 handle_connection(context
, env
, s
, *buffer
, false, pause_mutex
,
826 scheduler
.get(), ec
, yield
);
827 s
.shutdown(tcp::socket::shutdown_both
, ec
);
828 }, make_stack_allocator());
832 int AsioFrontend::run()
835 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
836 threads
.reserve(thread_count
);
838 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
840 // the worker threads call io_context::run(), which will return when there's
841 // no work left. hold a work guard to keep these threads going until join()
842 work
.emplace(boost::asio::make_work_guard(context
));
844 for (int i
= 0; i
< thread_count
; i
++) {
845 threads
.emplace_back([=] {
846 // request warnings on synchronous librados calls in this thread
847 is_asio_thread
= true;
848 boost::system::error_code ec
;
855 void AsioFrontend::stop()
857 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
861 boost::system::error_code ec
;
862 // close all listeners
863 for (auto& listener
: listeners
) {
864 listener
.acceptor
.close(ec
);
866 // close all connections
867 connections
.close(ec
);
868 pause_mutex
.cancel();
871 void AsioFrontend::join()
878 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
879 for (auto& thread
: threads
) {
882 ldout(ctx(), 4) << "frontend done" << dendl
;
885 void AsioFrontend::pause()
887 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
889 // cancel pending calls to accept(), but don't close the sockets
890 boost::system::error_code ec
;
891 for (auto& l
: listeners
) {
892 l
.acceptor
.cancel(ec
);
895 // pause and wait for outstanding requests to complete
896 pause_mutex
.lock(ec
);
899 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
901 ldout(ctx(), 4) << "frontend paused" << dendl
;
905 void AsioFrontend::unpause(rgw::sal::RGWRadosStore
* const store
,
906 rgw_auth_registry_ptr_t auth_registry
)
909 env
.auth_registry
= std::move(auth_registry
);
911 // unpause to unblock connections
912 pause_mutex
.unlock();
914 // start accepting connections again
915 for (auto& l
: listeners
) {
916 l
.acceptor
.async_accept(l
.socket
,
917 [this, &l
] (boost::system::error_code ec
) {
922 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
925 } // anonymous namespace
927 class RGWAsioFrontend::Impl
: public AsioFrontend
{
929 Impl(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
930 rgw::dmclock::SchedulerCtx
& sched_ctx
)
931 : AsioFrontend(env
, conf
, sched_ctx
) {}
934 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
,
935 RGWFrontendConfig
* conf
,
936 rgw::dmclock::SchedulerCtx
& sched_ctx
)
937 : impl(new Impl(env
, conf
, sched_ctx
))
941 RGWAsioFrontend::~RGWAsioFrontend() = default;
943 int RGWAsioFrontend::init()
948 int RGWAsioFrontend::run()
953 void RGWAsioFrontend::stop()
958 void RGWAsioFrontend::join()
963 void RGWAsioFrontend::pause_for_new_config()
968 void RGWAsioFrontend::unpause_with_new_config(
969 rgw::sal::RGWRadosStore
* const store
,
970 rgw_auth_registry_ptr_t auth_registry
972 impl
->unpause(store
, std::move(auth_registry
));