#include <boost/asio.hpp>
#include <boost/intrusive/list.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <spawn/spawn.hpp>
#ifdef WITH_RADOSGW_BEAST_OPENSSL
#include <boost/asio/ssl.hpp>
-#include <boost/beast/ssl/ssl_stream.hpp>
+#endif
#include "common/split.h"
#include "rgw_zone.h"
-#endif
-
+#include "rgw_asio_frontend_timer.h"
#include "rgw_dmclock_async_scheduler.h"
#define dout_subsys ceph_subsys_rgw
namespace ssl = boost::asio::ssl;
#endif
-using parse_buffer = boost::beast::flat_static_buffer<65536>;
+struct Connection;
+
+// use explicit executor types instead of the type-erased boost::asio::executor
+using executor_type = boost::asio::io_context::executor_type;
+
+using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
+using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
+
+using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
+ executor_type, Connection>;
+
+static constexpr size_t parse_buffer_size = 65536;
+using parse_buffer = boost::beast::flat_static_buffer<parse_buffer_size>;
// use mmap/mprotect to allocate 512k coroutine stacks
auto make_stack_allocator() {
return boost::context::protected_fixedsize_stack{512*1024};
}
+using namespace std;
+
template <typename Stream>
class StreamIO : public rgw::asio::ClientIO {
CephContext* const cct;
Stream& stream;
- spawn::yield_context yield;
+ timeout_timer& timeout;
+ yield_context yield;
parse_buffer& buffer;
- ceph::timespan request_timeout;
public:
- StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
- spawn::yield_context yield,
+ StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
+ rgw::asio::parser_type& parser, yield_context yield,
parse_buffer& buffer, bool is_ssl,
const tcp::endpoint& local_endpoint,
- const tcp::endpoint& remote_endpoint,
- ceph::timespan request_timeout)
+ const tcp::endpoint& remote_endpoint)
: ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
- cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
+ cct(cct), stream(stream), timeout(timeout), yield(yield),
+ buffer(buffer)
{}
size_t write_data(const char* buf, size_t len) override {
boost::system::error_code ec;
- auto& timeout = get_lowest_layer(stream);
- if (request_timeout.count()) {
- timeout.expires_after(request_timeout);
- }
+ timeout.start();
auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
yield[ec]);
+ timeout.cancel();
if (ec) {
ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
- if (ec==boost::asio::error::broken_pipe) {
+ if (ec == boost::asio::error::broken_pipe) {
boost::system::error_code ec_ignored;
- timeout.socket().shutdown(tcp::socket::shutdown_both, ec_ignored);
+ stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
}
throw rgw::io::Exception(ec.value(), std::system_category());
}
}
size_t recv_body(char* buf, size_t max) override {
- auto& timeout = get_lowest_layer(stream);
auto& message = parser.get();
auto& body_remaining = message.body();
body_remaining.data = buf;
while (body_remaining.size && !parser.is_done()) {
boost::system::error_code ec;
- if (request_timeout.count()) {
- timeout.expires_after(request_timeout);
- }
+ timeout.start();
http::async_read_some(stream, buffer, parser, yield[ec]);
+ timeout.cancel();
if (ec == http::error::need_buffer) {
break;
}
template <typename Stream>
void handle_connection(boost::asio::io_context& context,
RGWProcessEnv& env, Stream& stream,
+ timeout_timer& timeout, size_t header_limit,
parse_buffer& buffer, bool is_ssl,
SharedMutex& pause_mutex,
rgw::dmclock::Scheduler *scheduler,
boost::system::error_code& ec,
- spawn::yield_context yield,
- ceph::timespan request_timeout)
+ yield_context yield)
{
- // limit header to 4k, since we read it all into a single flat_buffer
- static constexpr size_t header_limit = 4096;
// don't impose a limit on the body, since we read it in pieces
static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
rgw::asio::parser_type parser;
parser.header_limit(header_limit);
parser.body_limit(body_limit);
- auto& timeout = get_lowest_layer(stream);
- if (request_timeout.count()) {
- timeout.expires_after(request_timeout);
- }
+ timeout.start();
// parse the header
http::async_read_header(stream, buffer, parser, yield[ec]);
+ timeout.cancel();
if (ec == boost::asio::error::connection_reset ||
ec == boost::asio::error::bad_descriptor ||
ec == boost::asio::error::operation_aborted ||
response.result(http::status::bad_request);
response.version(message.version() == 10 ? 10 : 11);
response.prepare_payload();
- if (request_timeout.count()) {
- timeout.expires_after(request_timeout);
- }
+ timeout.start();
http::async_write(stream, response, yield[ec]);
+ timeout.cancel();
if (ec) {
ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
}
}
// process the request
- RGWRequest req{env.store->getRados()->get_new_req_id()};
+ RGWRequest req{env.store->get_new_req_id()};
- auto& socket = get_lowest_layer(stream).socket();
+ auto& socket = stream.lowest_layer();
const auto& remote_endpoint = socket.remote_endpoint(ec);
if (ec) {
ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
return;
}
- StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
- socket.local_endpoint(),
- remote_endpoint,request_timeout};
+ StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
+ is_ssl, socket.local_endpoint(),
+ remote_endpoint};
auto real_client_io = rgw::io::add_reordering(
rgw::io::add_buffering(cct,
rgw::io::add_conlen_controlling(
&real_client))));
RGWRestfulIO client(cct, &real_client_io);
- auto y = optional_yield{context, yield};
+ optional_yield y = null_yield;
+ if (cct->_conf->rgw_beast_enable_async) {
+ y = optional_yield{context, yield};
+ }
int http_ret = 0;
string user = "-";
const auto started = ceph::coarse_real_clock::now();
ceph::coarse_real_clock::duration latency{};
-
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog, y,
- scheduler, &user, &latency, &http_ret);
+ scheduler, &user, &latency,
+ env.ratelimiting->get_active(),
+ &http_ret);
if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
// access log line elements begin per Apache Combined Log Format with additions following
body.size = discard_buffer.size();
body.data = discard_buffer.data();
- if (request_timeout.count()) {
- timeout.expires_after(request_timeout);
- }
+ timeout.start();
http::async_read_some(stream, buffer, parser, yield[ec]);
+ timeout.cancel();
if (ec == http::error::need_buffer) {
continue;
}
}
}
-struct Connection : boost::intrusive::list_base_hook<> {
- tcp::socket& socket;
- Connection(tcp::socket& socket) : socket(socket) {}
+// timeout support requires that connections are reference-counted, because the
+// timeout_handler can outlive the coroutine
+struct Connection : boost::intrusive::list_base_hook<>,
+ boost::intrusive_ref_counter<Connection>
+{
+ tcp_socket socket;
+ parse_buffer buffer;
+
+ explicit Connection(tcp_socket&& socket) noexcept
+ : socket(std::move(socket)) {}
+
+ void close(boost::system::error_code& ec) {
+ socket.close(ec);
+ }
};
class ConnectionList {
RGWFrontendConfig* conf;
boost::asio::io_context context;
ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
+ size_t header_limit = 16384;
#ifdef WITH_RADOSGW_BEAST_OPENSSL
boost::optional<ssl::context> ssl_context;
int get_config_key_val(string name,
struct Listener {
tcp::endpoint endpoint;
tcp::acceptor acceptor;
- tcp::socket socket;
+ tcp_socket socket;
bool use_ssl = false;
bool use_nodelay = false;
void stop();
void join();
void pause();
- void unpause(rgw::sal::RGWRadosStore* store, rgw_auth_registry_ptr_t);
+ void unpause(rgw::sal::Store* store, rgw_auth_registry_ptr_t);
};
unsigned short parse_port(const char *input, boost::system::error_code& ec)
// Setting global timeout
auto timeout = config.find("request_timeout_ms");
if (timeout != config.end()) {
- auto timeout_number = ceph::parse<uint64_t>(timeout->second.data());
+ auto timeout_number = ceph::parse<uint64_t>(timeout->second);
if (timeout_number) {
request_timeout = std::chrono::milliseconds(*timeout_number);
} else {
lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
- << timeout->second.data() << " setting it to the default value: "
+ << timeout->second << " setting it to the default value: "
<< REQUEST_TIMEOUT << dendl;
}
- }
+ }
+
+ auto max_header_size = config.find("max_header_size");
+ if (max_header_size != config.end()) {
+ auto limit = ceph::parse<uint64_t>(max_header_size->second);
+ if (!limit) {
+ lderr(ctx()) << "WARNING: invalid value for max_header_size: "
+ << max_header_size->second << ", using the default value: "
+ << header_limit << dendl;
+ } else if (*limit > parse_buffer_size) { // can't exceed parse buffer size
+ header_limit = parse_buffer_size;
+ lderr(ctx()) << "WARNING: max_header_size " << max_header_size->second
+ << " capped at maximum value " << header_limit << dendl;
+ } else {
+ header_limit = *limit;
+ }
+ }
+
#ifdef WITH_RADOSGW_BEAST_OPENSSL
int r = init_ssl();
if (r < 0) {
map<string, string> meta_map;
public:
- ExpandMetaVar(RGWSI_Zone *zone_svc) {
+ ExpandMetaVar(rgw::sal::Zone* zone_svc) {
meta_map["realm"] = zone_svc->get_realm().get_name();
meta_map["realm_id"] = zone_svc->get_realm().get_id();
meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
- meta_map["zone"] = zone_svc->zone_name();
- meta_map["zone_id"] = zone_svc->zone_id().id;
+ meta_map["zone"] = zone_svc->get_name();
+ meta_map["zone_id"] = zone_svc->get_id().id;
}
string process_str(const string& in);
return -EINVAL;
}
- auto svc = env.store->svc()->config_key;
- int r = svc->get(name, true, pbl);
+ int r = env.store->get_config_key_val(name, pbl);
if (r < 0) {
lderr(ctx()) << type << " was not found: " << name << dendl;
return r;
key_is_cert = true;
}
- ExpandMetaVar emv(env.store->svc()->zone);
+ ExpandMetaVar emv(env.store->get_zone());
cert = emv.process_str(*cert);
key = emv.process_str(*key);
ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
return;
}
- auto socket = std::move(l.socket);
- tcp::no_delay options(l.use_nodelay);
- socket.set_option(options,ec);
+ auto stream = std::move(l.socket);
+ stream.set_option(tcp::no_delay(l.use_nodelay), ec);
l.acceptor.async_accept(l.socket,
[this, &l] (boost::system::error_code ec) {
accept(l, ec);
});
- boost::beast::tcp_stream stream(std::move(socket));
// spawn a coroutine to handle the connection
#ifdef WITH_RADOSGW_BEAST_OPENSSL
if (l.use_ssl) {
spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
- Connection conn{s.socket()};
- auto c = connections.add(conn);
- // wrap the tcp_stream in an ssl stream
- boost::beast::ssl_stream<boost::beast::tcp_stream&> stream{s, *ssl_context};
- auto buffer = std::make_unique<parse_buffer>();
+ [this, s=std::move(stream)] (yield_context yield) mutable {
+ auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
+ auto c = connections.add(*conn);
+ // wrap the tcp stream in an ssl stream
+ boost::asio::ssl::stream<tcp_socket&> stream{conn->socket, *ssl_context};
+ auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
// do ssl handshake
boost::system::error_code ec;
- if (request_timeout.count()) {
- get_lowest_layer(stream).expires_after(request_timeout);
- }
+ timeout.start();
auto bytes = stream.async_handshake(ssl::stream_base::server,
- buffer->data(), yield[ec]);
+ conn->buffer.data(), yield[ec]);
+ timeout.cancel();
if (ec) {
ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
return;
}
- buffer->consume(bytes);
- handle_connection(context, env, stream, *buffer, true, pause_mutex,
- scheduler.get(), ec, yield, request_timeout);
+ conn->buffer.consume(bytes);
+ handle_connection(context, env, stream, timeout, header_limit,
+ conn->buffer, true, pause_mutex, scheduler.get(),
+ ec, yield);
if (!ec) {
// ssl shutdown (ignoring errors)
stream.async_shutdown(yield[ec]);
}
- s.socket().shutdown(tcp::socket::shutdown_both, ec);
+ conn->socket.shutdown(tcp::socket::shutdown_both, ec);
}, make_stack_allocator());
} else {
#else
{
#endif // WITH_RADOSGW_BEAST_OPENSSL
spawn::spawn(context,
- [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
- Connection conn{s.socket()};
- auto c = connections.add(conn);
- auto buffer = std::make_unique<parse_buffer>();
+ [this, s=std::move(stream)] (yield_context yield) mutable {
+ auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
+ auto c = connections.add(*conn);
+ auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
boost::system::error_code ec;
- handle_connection(context, env, s, *buffer, false, pause_mutex,
- scheduler.get(), ec, yield, request_timeout);
- s.socket().shutdown(tcp::socket::shutdown_both, ec);
+ handle_connection(context, env, conn->socket, timeout, header_limit,
+ conn->buffer, false, pause_mutex, scheduler.get(),
+ ec, yield);
+ conn->socket.shutdown(tcp_socket::shutdown_both, ec);
}, make_stack_allocator());
}
}
work.emplace(boost::asio::make_work_guard(context));
for (int i = 0; i < thread_count; i++) {
- threads.emplace_back([=] {
+ threads.emplace_back([=]() noexcept {
// request warnings on synchronous librados calls in this thread
is_asio_thread = true;
- boost::system::error_code ec;
- context.run(ec);
+ // Have uncaught exceptions kill the process and give a
+ // stacktrace, not be swallowed.
+ context.run();
});
}
return 0;
}
}
-void AsioFrontend::unpause(rgw::sal::RGWRadosStore* const store,
+void AsioFrontend::unpause(rgw::sal::Store* const store,
rgw_auth_registry_ptr_t auth_registry)
{
env.store = store;
}
void RGWAsioFrontend::unpause_with_new_config(
- rgw::sal::RGWRadosStore* const store,
+ rgw::sal::Store* const store,
rgw_auth_registry_ptr_t auth_registry
) {
impl->unpause(store, std::move(auth_registry));