1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
8 #include <boost/asio.hpp>
9 #define BOOST_COROUTINES_NO_DEPRECATION_WARNING
10 #include <boost/asio/spawn.hpp>
11 #include <boost/intrusive/list.hpp>
13 #include "common/async/shared_mutex.h"
14 #include "common/errno.h"
16 #include "rgw_asio_client.h"
17 #include "rgw_asio_frontend.h"
19 #ifdef WITH_RADOSGW_BEAST_OPENSSL
20 #include <boost/asio/ssl.hpp>
23 #include "rgw_dmclock_async_scheduler.h"
25 #define dout_subsys ceph_subsys_rgw
29 using tcp
= boost::asio::ip::tcp
;
30 namespace http
= boost::beast::http
;
31 #ifdef WITH_RADOSGW_BEAST_OPENSSL
32 namespace ssl
= boost::asio::ssl
;
35 template <typename Stream
>
36 class StreamIO
: public rgw::asio::ClientIO
{
37 CephContext
* const cct
;
39 boost::beast::flat_buffer
& buffer
;
41 StreamIO(CephContext
*cct
, Stream
& stream
, rgw::asio::parser_type
& parser
,
42 boost::beast::flat_buffer
& buffer
, bool is_ssl
,
43 const tcp::endpoint
& local_endpoint
,
44 const tcp::endpoint
& remote_endpoint
)
45 : ClientIO(parser
, is_ssl
, local_endpoint
, remote_endpoint
),
46 cct(cct
), stream(stream
), buffer(buffer
)
49 size_t write_data(const char* buf
, size_t len
) override
{
50 boost::system::error_code ec
;
51 auto bytes
= boost::asio::write(stream
, boost::asio::buffer(buf
, len
), ec
);
53 ldout(cct
, 4) << "write_data failed: " << ec
.message() << dendl
;
54 throw rgw::io::Exception(ec
.value(), std::system_category());
59 size_t recv_body(char* buf
, size_t max
) override
{
60 auto& message
= parser
.get();
61 auto& body_remaining
= message
.body();
62 body_remaining
.data
= buf
;
63 body_remaining
.size
= max
;
65 while (body_remaining
.size
&& !parser
.is_done()) {
66 boost::system::error_code ec
;
67 http::read_some(stream
, buffer
, parser
, ec
);
68 if (ec
== http::error::partial_message
||
69 ec
== http::error::need_buffer
) {
73 ldout(cct
, 4) << "failed to read body: " << ec
.message() << dendl
;
74 throw rgw::io::Exception(ec
.value(), std::system_category());
77 return max
- body_remaining
.size
;
81 using SharedMutex
= ceph::async::SharedMutex
<boost::asio::io_context::executor_type
>;
83 template <typename Stream
>
84 void handle_connection(RGWProcessEnv
& env
, Stream
& stream
,
85 boost::beast::flat_buffer
& buffer
, bool is_ssl
,
86 SharedMutex
& pause_mutex
,
87 rgw::dmclock::Scheduler
*scheduler
,
88 boost::system::error_code
& ec
,
89 boost::asio::yield_context yield
)
91 // limit header to 4k, since we read it all into a single flat_buffer
92 static constexpr size_t header_limit
= 4096;
93 // don't impose a limit on the body, since we read it in pieces
94 static constexpr size_t body_limit
= std::numeric_limits
<size_t>::max();
96 auto cct
= env
.store
->ctx();
98 // read messages from the stream until eof
100 // configure the parser
101 rgw::asio::parser_type parser
;
102 parser
.header_limit(header_limit
);
103 parser
.body_limit(body_limit
);
106 http::async_read_header(stream
, buffer
, parser
, yield
[ec
]);
107 if (ec
== boost::asio::error::connection_reset
||
108 ec
== boost::asio::error::bad_descriptor
||
109 ec
== boost::asio::error::operation_aborted
||
110 #ifdef WITH_RADOSGW_BEAST_OPENSSL
111 ec
== ssl::error::stream_truncated
||
113 ec
== http::error::end_of_stream
) {
114 ldout(cct
, 20) << "failed to read header: " << ec
.message() << dendl
;
118 ldout(cct
, 1) << "failed to read header: " << ec
.message() << dendl
;
119 auto& message
= parser
.get();
120 http::response
<http::empty_body
> response
;
121 response
.result(http::status::bad_request
);
122 response
.version(message
.version() == 10 ? 10 : 11);
123 response
.prepare_payload();
124 http::async_write(stream
, response
, yield
[ec
]);
126 ldout(cct
, 5) << "failed to write response: " << ec
.message() << dendl
;
128 ldout(cct
, 1) << "====== req done http_status=400 ======" << dendl
;
133 auto lock
= pause_mutex
.async_lock_shared(yield
[ec
]);
134 if (ec
== boost::asio::error::operation_aborted
) {
137 ldout(cct
, 1) << "failed to lock: " << ec
.message() << dendl
;
141 // process the request
142 RGWRequest req
{env
.store
->get_new_req_id()};
144 auto& socket
= stream
.lowest_layer();
145 StreamIO real_client
{cct
, stream
, parser
, buffer
, is_ssl
,
146 socket
.local_endpoint(),
147 socket
.remote_endpoint()};
149 auto real_client_io
= rgw::io::add_reordering(
150 rgw::io::add_buffering(cct
,
151 rgw::io::add_chunking(
152 rgw::io::add_conlen_controlling(
154 RGWRestfulIO
client(cct
, &real_client_io
);
155 auto y
= optional_yield
{socket
.get_io_context(), yield
};
156 process_request(env
.store
, env
.rest
, &req
, env
.uri_prefix
,
157 *env
.auth_registry
, &client
, env
.olog
, y
, scheduler
);
160 if (!parser
.keep_alive()) {
164 // if we failed before reading the entire message, discard any remaining
165 // bytes before reading the next
166 while (!parser
.is_done()) {
167 static std::array
<char, 1024> discard_buffer
;
169 auto& body
= parser
.get().body();
170 body
.size
= discard_buffer
.size();
171 body
.data
= discard_buffer
.data();
173 http::async_read_some(stream
, buffer
, parser
, yield
[ec
]);
174 if (ec
== boost::asio::error::connection_reset
) {
178 ldout(cct
, 5) << "failed to discard unread message: "
179 << ec
.message() << dendl
;
186 struct Connection
: boost::intrusive::list_base_hook
<> {
// Builds a Connection around the caller's tcp::socket: the `socket` member is
// initialized directly from the argument.
// NOTE(review): the member declaration is not visible in this excerpt;
// presumably it is a reference, since the parameter is taken by non-const
// reference -- confirm.
188 Connection(tcp::socket
& socket
) : socket(socket
) {}
191 class ConnectionList
{
192 using List
= boost::intrusive::list
<Connection
>;
196 void remove(Connection
& c
) {
197 std::lock_guard lock
{mutex
};
199 connections
.erase(List::s_iterator_to(c
));
204 ConnectionList
*list
;
// Records the ConnectionList and the Connection this guard is responsible
// for. Both pointers are stored as given; they are dereferenced later by
// ~Guard(), so they must stay valid for the guard's lifetime.
207 Guard(ConnectionList
*list
, Connection
*conn
) : list(list
), conn(conn
) {}
// On destruction, unregisters the tracked connection by calling
// list->remove(*conn).
// NOTE(review): no copy/move declarations for Guard are visible in this
// excerpt; if Guard is copyable, a copy would call remove() again for the
// same connection -- confirm how copies are prevented or tolerated.
208 ~Guard() { list
->remove(*conn
); }
210 [[nodiscard
]] Guard
add(Connection
& conn
) {
211 std::lock_guard lock
{mutex
};
212 connections
.push_back(conn
);
213 return Guard
{this, &conn
};
215 void close(boost::system::error_code
& ec
) {
216 std::lock_guard lock
{mutex
};
217 for (auto& conn
: connections
) {
218 conn
.socket
.close(ec
);
224 namespace dmc
= rgw::dmclock
;
227 RGWFrontendConfig
* conf
;
228 boost::asio::io_context context
;
229 #ifdef WITH_RADOSGW_BEAST_OPENSSL
230 boost::optional
<ssl::context
> ssl_context
;
233 SharedMutex pause_mutex
;
234 std::unique_ptr
<rgw::dmclock::Scheduler
> scheduler
;
237 tcp::endpoint endpoint
;
238 tcp::acceptor acceptor
;
240 bool use_ssl
= false;
241 bool use_nodelay
= false;
// Constructs the listener's acceptor and socket on the given io_context.
// The endpoint is left default-constructed and use_ssl/use_nodelay keep
// their in-class default of false; the init code fills them in afterwards.
243 explicit Listener(boost::asio::io_context
& context
)
244 : acceptor(context
), socket(context
) {}
246 std::vector
<Listener
> listeners
;
248 ConnectionList connections
;
250 // work guard to keep run() threads busy while listeners are paused
251 using Executor
= boost::asio::io_context::executor_type
;
252 std::optional
<boost::asio::executor_work_guard
<Executor
>> work
;
254 std::vector
<std::thread
> threads
;
255 std::atomic
<bool> going_down
{false};
// Convenience accessor: returns the CephContext obtained from
// env.store->ctx(). The frontend's ldout()/lderr() log statements pass this
// as their context argument.
257 CephContext
* ctx() const { return env
.store
->ctx(); }
258 std::optional
<dmc::ClientCounters
> client_counters
;
259 std::unique_ptr
<dmc::ClientConfig
> client_config
;
260 void accept(Listener
& listener
, boost::system::error_code ec
);
263 AsioFrontend(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
264 dmc::SchedulerCtx
& sched_ctx
)
265 : env(env
), conf(conf
), pause_mutex(context
.get_executor())
267 auto sched_t
= dmc::get_scheduler_t(ctx());
269 case dmc::scheduler_t::dmclock
:
270 scheduler
.reset(new dmc::AsyncScheduler(ctx(),
272 std::ref(sched_ctx
.get_dmc_client_counters()),
273 sched_ctx
.get_dmc_client_config(),
274 *sched_ctx
.get_dmc_client_config(),
275 dmc::AtLimit::Reject
));
277 case dmc::scheduler_t::none
:
278 lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl
;
280 case dmc::scheduler_t::throttler
:
281 scheduler
.reset(new dmc::SimpleThrottler(ctx()));
291 void unpause(RGWRados
* store
, rgw_auth_registry_ptr_t
);
294 unsigned short parse_port(const char *input
, boost::system::error_code
& ec
)
297 auto port
= std::strtoul(input
, &end
, 10);
298 if (port
> std::numeric_limits
<unsigned short>::max()) {
299 ec
.assign(ERANGE
, boost::system::system_category());
300 } else if (port
== 0 && end
== input
) {
301 ec
.assign(EINVAL
, boost::system::system_category());
306 tcp::endpoint
parse_endpoint(boost::asio::string_view input
,
307 unsigned short default_port
,
308 boost::system::error_code
& ec
)
310 tcp::endpoint endpoint
;
313 ec
= boost::asio::error::invalid_argument
;
317 if (input
[0] == '[') { // ipv6
318 const size_t addr_begin
= 1;
319 const size_t addr_end
= input
.find(']');
320 if (addr_end
== input
.npos
) { // no matching ]
321 ec
= boost::asio::error::invalid_argument
;
324 if (addr_end
+ 1 < input
.size()) {
325 // :port must follow [ipv6]
326 if (input
[addr_end
+ 1] != ':') {
327 ec
= boost::asio::error::invalid_argument
;
330 auto port_str
= input
.substr(addr_end
+ 2);
331 endpoint
.port(parse_port(port_str
.data(), ec
));
334 endpoint
.port(default_port
);
336 auto addr
= input
.substr(addr_begin
, addr_end
- addr_begin
);
337 endpoint
.address(boost::asio::ip::make_address_v6(addr
, ec
));
339 auto colon
= input
.find(':');
340 if (colon
!= input
.npos
) {
341 auto port_str
= input
.substr(colon
+ 1);
342 endpoint
.port(parse_port(port_str
.data(), ec
));
347 endpoint
.port(default_port
);
349 auto addr
= input
.substr(0, colon
);
350 endpoint
.address(boost::asio::ip::make_address_v4(addr
, ec
));
355 static int drop_privileges(CephContext
*ctx
)
357 uid_t uid
= ctx
->get_set_uid();
358 gid_t gid
= ctx
->get_set_gid();
359 std::string uid_string
= ctx
->get_set_uid_string();
360 std::string gid_string
= ctx
->get_set_gid_string();
361 if (gid
&& setgid(gid
) != 0) {
363 ldout(ctx
, -1) << "unable to setgid " << gid
<< ": " << cpp_strerror(err
) << dendl
;
366 if (uid
&& setuid(uid
) != 0) {
368 ldout(ctx
, -1) << "unable to setuid " << uid
<< ": " << cpp_strerror(err
) << dendl
;
372 ldout(ctx
, 0) << "set uid:gid to " << uid
<< ":" << gid
373 << " (" << uid_string
<< ":" << gid_string
<< ")" << dendl
;
378 int AsioFrontend::init()
380 boost::system::error_code ec
;
381 auto& config
= conf
->get_config_map();
383 #ifdef WITH_RADOSGW_BEAST_OPENSSL
391 auto ports
= config
.equal_range("port");
392 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
393 auto port
= parse_port(i
->second
.c_str(), ec
);
395 lderr(ctx()) << "failed to parse port=" << i
->second
<< dendl
;
398 listeners
.emplace_back(context
);
399 listeners
.back().endpoint
.port(port
);
401 listeners
.emplace_back(context
);
402 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
405 auto endpoints
= config
.equal_range("endpoint");
406 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
407 auto endpoint
= parse_endpoint(i
->second
, 80, ec
);
409 lderr(ctx()) << "failed to parse endpoint=" << i
->second
<< dendl
;
412 listeners
.emplace_back(context
);
413 listeners
.back().endpoint
= endpoint
;
416 auto nodelay
= config
.find("tcp_nodelay");
417 if (nodelay
!= config
.end()) {
418 for (auto& l
: listeners
) {
419 l
.use_nodelay
= (nodelay
->second
== "1");
424 bool socket_bound
= false;
426 for (auto& l
: listeners
) {
427 l
.acceptor
.open(l
.endpoint
.protocol(), ec
);
429 if (ec
== boost::asio::error::address_family_not_supported
) {
430 ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l
.endpoint
431 << ", " << ec
.message() << dendl
;
435 lderr(ctx()) << "failed to open socket: " << ec
.message() << dendl
;
439 if (l
.endpoint
.protocol() == tcp::v6()) {
440 l
.acceptor
.set_option(boost::asio::ip::v6_only(true), ec
);
442 lderr(ctx()) << "failed to set v6_only socket option: "
443 << ec
.message() << dendl
;
448 l
.acceptor
.set_option(tcp::acceptor::reuse_address(true));
449 l
.acceptor
.bind(l
.endpoint
, ec
);
451 lderr(ctx()) << "failed to bind address " << l
.endpoint
452 << ": " << ec
.message() << dendl
;
456 l
.acceptor
.listen(boost::asio::socket_base::max_connections
);
457 l
.acceptor
.async_accept(l
.socket
,
458 [this, &l
] (boost::system::error_code ec
) {
462 ldout(ctx(), 4) << "frontend listening on " << l
.endpoint
<< dendl
;
466 lderr(ctx()) << "Unable to listen at any endpoints" << dendl
;
470 return drop_privileges(ctx());
473 #ifdef WITH_RADOSGW_BEAST_OPENSSL
474 int AsioFrontend::init_ssl()
476 boost::system::error_code ec
;
477 auto& config
= conf
->get_config_map();
480 auto cert
= config
.find("ssl_certificate");
481 const bool have_cert
= cert
!= config
.end();
483 // only initialize the ssl context if it's going to be used
484 ssl_context
= boost::in_place(ssl::context::tls
);
487 auto key
= config
.find("ssl_private_key");
488 const bool have_private_key
= key
!= config
.end();
489 if (have_private_key
) {
491 lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl
;
494 ssl_context
->use_private_key_file(key
->second
, ssl::context::pem
, ec
);
496 lderr(ctx()) << "failed to add ssl_private_key=" << key
->second
497 << ": " << ec
.message() << dendl
;
502 ssl_context
->use_certificate_chain_file(cert
->second
, ec
);
504 lderr(ctx()) << "failed to use ssl_certificate=" << cert
->second
505 << ": " << ec
.message() << dendl
;
508 if (!have_private_key
) {
509 // attempt to use it as a private key if a separate one wasn't provided
510 ssl_context
->use_private_key_file(cert
->second
, ssl::context::pem
, ec
);
512 lderr(ctx()) << "failed to use ssl_certificate=" << cert
->second
513 << " as a private key: " << ec
.message() << dendl
;
519 // parse ssl endpoints
520 auto ports
= config
.equal_range("ssl_port");
521 for (auto i
= ports
.first
; i
!= ports
.second
; ++i
) {
523 lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl
;
526 auto port
= parse_port(i
->second
.c_str(), ec
);
528 lderr(ctx()) << "failed to parse ssl_port=" << i
->second
<< dendl
;
531 listeners
.emplace_back(context
);
532 listeners
.back().endpoint
.port(port
);
533 listeners
.back().use_ssl
= true;
535 listeners
.emplace_back(context
);
536 listeners
.back().endpoint
= tcp::endpoint(tcp::v6(), port
);
537 listeners
.back().use_ssl
= true;
540 auto endpoints
= config
.equal_range("ssl_endpoint");
541 for (auto i
= endpoints
.first
; i
!= endpoints
.second
; ++i
) {
543 lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl
;
546 auto endpoint
= parse_endpoint(i
->second
, 443, ec
);
548 lderr(ctx()) << "failed to parse ssl_endpoint=" << i
->second
<< dendl
;
551 listeners
.emplace_back(context
);
552 listeners
.back().endpoint
= endpoint
;
553 listeners
.back().use_ssl
= true;
557 #endif // WITH_RADOSGW_BEAST_OPENSSL
559 void AsioFrontend::accept(Listener
& l
, boost::system::error_code ec
)
561 if (!l
.acceptor
.is_open()) {
563 } else if (ec
== boost::asio::error::operation_aborted
) {
568 auto socket
= std::move(l
.socket
);
569 tcp::no_delay
options(l
.use_nodelay
);
570 socket
.set_option(options
,ec
);
571 l
.acceptor
.async_accept(l
.socket
,
572 [this, &l
] (boost::system::error_code ec
) {
576 // spawn a coroutine to handle the connection
577 #ifdef WITH_RADOSGW_BEAST_OPENSSL
579 boost::asio::spawn(context
,
580 [this, s
=std::move(socket
)] (boost::asio::yield_context yield
) mutable {
582 auto c
= connections
.add(conn
);
583 // wrap the socket in an ssl stream
584 ssl::stream
<tcp::socket
&> stream
{s
, *ssl_context
};
585 boost::beast::flat_buffer buffer
;
587 boost::system::error_code ec
;
588 auto bytes
= stream
.async_handshake(ssl::stream_base::server
,
589 buffer
.data(), yield
[ec
]);
591 ldout(ctx(), 1) << "ssl handshake failed: " << ec
.message() << dendl
;
594 buffer
.consume(bytes
);
595 handle_connection(env
, stream
, buffer
, true, pause_mutex
,
596 scheduler
.get(), ec
, yield
);
598 // ssl shutdown (ignoring errors)
599 stream
.async_shutdown(yield
[ec
]);
601 s
.shutdown(tcp::socket::shutdown_both
, ec
);
606 #endif // WITH_RADOSGW_BEAST_OPENSSL
607 boost::asio::spawn(context
,
608 [this, s
=std::move(socket
)] (boost::asio::yield_context yield
) mutable {
610 auto c
= connections
.add(conn
);
611 boost::beast::flat_buffer buffer
;
612 boost::system::error_code ec
;
613 handle_connection(env
, s
, buffer
, false, pause_mutex
,
614 scheduler
.get(), ec
, yield
);
615 s
.shutdown(tcp::socket::shutdown_both
, ec
);
620 int AsioFrontend::run()
623 const int thread_count
= cct
->_conf
->rgw_thread_pool_size
;
624 threads
.reserve(thread_count
);
626 ldout(cct
, 4) << "frontend spawning " << thread_count
<< " threads" << dendl
;
628 // the worker threads call io_context::run(), which will return when there's
629 // no work left. hold a work guard to keep these threads going until join()
630 work
.emplace(boost::asio::make_work_guard(context
));
632 for (int i
= 0; i
< thread_count
; i
++) {
633 threads
.emplace_back([=] {
634 // request warnings on synchronous librados calls in this thread
635 is_asio_thread
= true;
636 boost::system::error_code ec
;
643 void AsioFrontend::stop()
645 ldout(ctx(), 4) << "frontend initiating shutdown..." << dendl
;
649 boost::system::error_code ec
;
650 // close all listeners
651 for (auto& listener
: listeners
) {
652 listener
.acceptor
.close(ec
);
654 // close all connections
655 connections
.close(ec
);
656 pause_mutex
.cancel();
659 void AsioFrontend::join()
666 ldout(ctx(), 4) << "frontend joining threads..." << dendl
;
667 for (auto& thread
: threads
) {
670 ldout(ctx(), 4) << "frontend done" << dendl
;
673 void AsioFrontend::pause()
675 ldout(ctx(), 4) << "frontend pausing connections..." << dendl
;
677 // cancel pending calls to accept(), but don't close the sockets
678 boost::system::error_code ec
;
679 for (auto& l
: listeners
) {
680 l
.acceptor
.cancel(ec
);
683 // pause and wait for outstanding requests to complete
684 pause_mutex
.lock(ec
);
687 ldout(ctx(), 1) << "frontend failed to pause: " << ec
.message() << dendl
;
689 ldout(ctx(), 4) << "frontend paused" << dendl
;
693 void AsioFrontend::unpause(RGWRados
* const store
,
694 rgw_auth_registry_ptr_t auth_registry
)
697 env
.auth_registry
= std::move(auth_registry
);
699 // unpause to unblock connections
700 pause_mutex
.unlock();
702 // start accepting connections again
703 for (auto& l
: listeners
) {
704 l
.acceptor
.async_accept(l
.socket
,
705 [this, &l
] (boost::system::error_code ec
) {
710 ldout(ctx(), 4) << "frontend unpaused" << dendl
;
713 } // anonymous namespace
715 class RGWAsioFrontend::Impl
: public AsioFrontend
{
// Forwards env, conf and sched_ctx unchanged to the AsioFrontend base
// constructor; the visible code gives Impl no additional state of its own.
717 Impl(const RGWProcessEnv
& env
, RGWFrontendConfig
* conf
,
718 rgw::dmclock::SchedulerCtx
& sched_ctx
)
719 : AsioFrontend(env
, conf
, sched_ctx
) {}
722 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv
& env
,
723 RGWFrontendConfig
* conf
,
724 rgw::dmclock::SchedulerCtx
& sched_ctx
)
725 : impl(new Impl(env
, conf
, sched_ctx
))
// Defaulted destructor, defined out of line.
// NOTE(review): presumably placed here, after the definition of Impl, so
// that whatever owns `impl` destroys a complete type -- confirm against the
// class declaration in the header.
729 RGWAsioFrontend::~RGWAsioFrontend() = default;
731 int RGWAsioFrontend::init()
736 int RGWAsioFrontend::run()
741 void RGWAsioFrontend::stop()
746 void RGWAsioFrontend::join()
751 void RGWAsioFrontend::pause_for_new_config()
756 void RGWAsioFrontend::unpause_with_new_config(
757 RGWRados
* const store
,
758 rgw_auth_registry_ptr_t auth_registry
760 impl
->unpause(store
, std::move(auth_registry
));