// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
-#include <boost/asio/spawn.hpp>
-#include <beast/core/placeholders.hpp>
-#include <beast/http/read.hpp>
-#include <beast/http/string_body.hpp>
-#include <beast/http/write.hpp>
-
-#include "rgw_asio_frontend.h"
#include "rgw_asio_client.h"
+#include "rgw_asio_frontend.h"
#define dout_subsys ceph_subsys_rgw
-#undef dout_prefix
-#define dout_prefix (*_dout << "asio: ")
-
namespace {
class Pauser {
}
using tcp = boost::asio::ip::tcp;
+namespace beast = boost::beast;
-// coroutine to handle a client connection to completion
-static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
- boost::asio::yield_context yield)
-{
- auto cct = env.store->ctx();
- boost::system::error_code ec;
+class Connection {
+ RGWProcessEnv& env;
+ boost::asio::io_service::strand strand;
+ tcp::socket socket;
+
+ // references are bound to callbacks for async operations. if a callback
+ // function returns without issuing another operation, the reference is
+ // dropped and the Connection is deleted/closed
+ std::atomic<int> nref{0};
+ using Ref = boost::intrusive_ptr<Connection>;
- beast::flat_streambuf buffer{1024};
+ // limit header to 4k, since we read it all into a single flat_buffer
+ static constexpr size_t header_limit = 4096;
+ // don't impose a limit on the body, since we read it in pieces
+ static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
+
+ beast::flat_buffer buffer;
+ boost::optional<rgw::asio::parser_type> parser;
+
+ using bad_response_type = beast::http::response<beast::http::empty_body>;
+ boost::optional<bad_response_type> response;
+
+ CephContext* ctx() const { return env.store->ctx(); }
+
+ void read_header() {
+ // configure the parser
+ parser.emplace();
+ parser->header_limit(header_limit);
+ parser->body_limit(body_limit);
- // read messages from the socket until eof
- for (;;) {
// parse the header
- rgw::asio::parser_type parser;
- do {
- auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
- buffer.consume(bytes);
- } while (!ec && !parser.got_header());
+ beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
+ std::bind(&Connection::on_header, Ref{this},
+ std::placeholders::_1)));
+ }
+
+ void discard_unread_message() {
+ if (parser->is_done()) {
+ // nothing left to discard, start reading the next message
+ read_header();
+ return;
+ }
+
+ // read the rest of the request into a static buffer. multiple clients could
+ // write at the same time, but this is okay because we never read it back
+ static std::array<char, 1024> discard_buffer;
+
+ auto& body = parser->get().body();
+ body.size = discard_buffer.size();
+ body.data = discard_buffer.data();
+
+ beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
+ std::bind(&Connection::on_discard_unread, Ref{this},
+ std::placeholders::_1)));
+ }
+
+ void on_discard_unread(boost::system::error_code ec) {
+ if (ec == boost::asio::error::connection_reset) {
+ return;
+ }
+ if (ec) {
+ ldout(ctx(), 5) << "discard_unread_message failed: "
+ << ec.message() << dendl;
+ return;
+ }
+ discard_unread_message();
+ }
+ void on_write_error(boost::system::error_code ec) {
+ if (ec) {
+ ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
+ }
+ }
+
+ void on_header(boost::system::error_code ec) {
if (ec == boost::asio::error::connection_reset ||
- ec == boost::asio::error::eof) {
+ ec == beast::http::error::end_of_stream) {
return;
}
if (ec) {
- auto& message = parser.get();
- ldout(cct, 1) << "read failed: " << ec.message() << dendl;
- ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
- beast::http::response<beast::http::string_body> response;
- response.status = 400;
- response.reason = "Bad Request";
- response.version = message.version == 10 ? 10 : 11;
- beast::http::prepare(response);
- beast::http::async_write(socket, std::move(response), yield[ec]);
- // ignore ec
+ auto& message = parser->get();
+ ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
+ ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
+ response.emplace();
+ response->result(beast::http::status::bad_request);
+ response->version(message.version() == 10 ? 10 : 11);
+ response->prepare_payload();
+ beast::http::async_write(socket, *response, strand.wrap(
+ std::bind(&Connection::on_write_error, Ref{this},
+ std::placeholders::_1)));
return;
}
// process the request
RGWRequest req{env.store->get_new_req_id()};
- rgw::asio::ClientIO real_client{socket, parser, buffer};
+ rgw::asio::ClientIO real_client{socket, *parser, buffer};
auto real_client_io = rgw::io::add_reordering(
- rgw::io::add_buffering(
+ rgw::io::add_buffering(ctx(),
rgw::io::add_chunking(
rgw::io::add_conlen_controlling(
&real_client))));
- RGWRestfulIO client(&real_client_io);
+ RGWRestfulIO client(ctx(), &real_client_io);
process_request(env.store, env.rest, &req, env.uri_prefix,
*env.auth_registry, &client, env.olog);
- if (real_client.get_conn_close()) {
- return;
+ if (parser->keep_alive()) {
+ // parse any unread bytes from the previous message (in case we replied
+ // before reading the entire body) before reading the next
+ discard_unread_message();
}
}
-}
+
+ public:
+ Connection(RGWProcessEnv& env, tcp::socket&& socket)
+ : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
+
+ void on_connect() {
+ read_header();
+ }
+
+ void get() { ++nref; }
+ void put() { if (nref.fetch_sub(1) == 1) { delete this; } }
+
+ friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
+ friend void intrusive_ptr_release(Connection *c) { c->put(); }
+};
+
class AsioFrontend {
RGWProcessEnv env;
+ RGWFrontendConfig* conf;
boost::asio::io_service service;
tcp::acceptor acceptor;
void accept(boost::system::error_code ec);
public:
- AsioFrontend(const RGWProcessEnv& env)
- : env(env), acceptor(service), peer_socket(service) {}
+ AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
+ : env(env), conf(conf), acceptor(service), peer_socket(service) {}
int init();
int run();
int AsioFrontend::init()
{
- auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
- ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
+ std::string port_str;
+ conf->get_val("port", "80", &port_str);
+ unsigned short port;
+ boost::asio::ip::address addr; // default to 'any'
boost::system::error_code ec;
+
+ auto colon = port_str.find(':');
+ if (colon != port_str.npos) {
+ addr = boost::asio::ip::make_address(port_str.substr(0, colon), ec);
+ if (ec) {
+ lderr(ctx()) << "failed to parse address '" << port_str << "': " << ec.message() << dendl;
+ return -ec.value();
+ }
+ port = std::stoul(port_str.substr(colon + 1), nullptr, 0);
+ } else {
+ port = std::stoul(port_str, nullptr, 0);
+ }
+
+ tcp::endpoint ep = {addr, port};
+ ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
+
acceptor.open(ep.protocol(), ec);
if (ec) {
lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
throw ec;
}
auto socket = std::move(peer_socket);
- // spawn a coroutine to handle the connection
- boost::asio::spawn(service,
- [&] (boost::asio::yield_context yield) {
- handle_connection(env, std::move(socket), yield);
- });
acceptor.async_accept(peer_socket,
[this] (boost::system::error_code ec) {
return accept(ec);
});
+
+ boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
+ conn->on_connect();
+ // reference drops here, but on_connect() takes another
}
int AsioFrontend::run()
going_down = true;
boost::system::error_code ec;
- acceptor.close(ec); // unblock the run() threads
+ acceptor.close(ec);
+
+ // unblock the run() threads
+ service.stop();
}
void AsioFrontend::join()
{
ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
pauser.pause(threads.size(), [=] {
- // stop accepting but leave the port open
- boost::system::error_code ec;
- acceptor.cancel(ec);
+ // unblock the run() threads
+ service.stop();
});
ldout(ctx(), 4) << "frontend paused" << dendl;
}
env.auth_registry = std::move(auth_registry);
ldout(ctx(), 4) << "frontend unpaused" << dendl;
service.reset();
- acceptor.async_accept(peer_socket,
- [this] (boost::system::error_code ec) {
- return accept(ec);
- });
pauser.unpause();
}
class RGWAsioFrontend::Impl : public AsioFrontend {
public:
- Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
+ Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
};
-RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
- : impl(new Impl(env))
+RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
+ RGWFrontendConfig* conf)
+ : impl(new Impl(env, conf))
{
}