]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_asio_frontend.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
index a265f184856eb4e8955ba893a18863a901e0ad8f..d6001a67f16ca26aa12dc34c770d1854297b8997 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <boost/asio.hpp>
 #include <boost/intrusive/list.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
 
 #include <boost/context/protected_fixedsize_stack.hpp>
 #include <spawn/spawn.hpp>
@@ -21,7 +22,7 @@
 
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
 #include <boost/asio/ssl.hpp>
-#include <boost/beast/ssl/ssl_stream.hpp>
+#endif
 
 #include "common/split.h"
 
@@ -30,8 +31,7 @@
 
 #include "rgw_zone.h"
 
-#endif
-
+#include "rgw_asio_frontend_timer.h"
 #include "rgw_dmclock_async_scheduler.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -44,44 +44,56 @@ namespace http = boost::beast::http;
 namespace ssl = boost::asio::ssl;
 #endif
 
-using parse_buffer = boost::beast::flat_static_buffer<65536>;
+struct Connection;
+
+// use explicit executor types instead of the type-erased boost::asio::executor
+using executor_type = boost::asio::io_context::executor_type;
+
+using tcp_socket = boost::asio::basic_stream_socket<tcp, executor_type>;
+using tcp_stream = boost::beast::basic_stream<tcp, executor_type>;
+
+using timeout_timer = rgw::basic_timeout_timer<ceph::coarse_mono_clock,
+      executor_type, Connection>;
+
+static constexpr size_t parse_buffer_size = 65536;
+using parse_buffer = boost::beast::flat_static_buffer<parse_buffer_size>;
 
 // use mmap/mprotect to allocate 512k coroutine stacks
 auto make_stack_allocator() {
   return boost::context::protected_fixedsize_stack{512*1024};
 }
 
+using namespace std;
+
 template <typename Stream>
 class StreamIO : public rgw::asio::ClientIO {
   CephContext* const cct;
   Stream& stream;
-  spawn::yield_context yield;
+  timeout_timer& timeout;
+  yield_context yield;
   parse_buffer& buffer;
-  ceph::timespan request_timeout;
  public:
-  StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
-           spawn::yield_context yield,
+  StreamIO(CephContext *cct, Stream& stream, timeout_timer& timeout,
+           rgw::asio::parser_type& parser, yield_context yield,
            parse_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
-           const tcp::endpoint& remote_endpoint,
-           ceph::timespan request_timeout)
+           const tcp::endpoint& remote_endpoint)
       : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
-        cct(cct), stream(stream), yield(yield), buffer(buffer), request_timeout(request_timeout)
+        cct(cct), stream(stream), timeout(timeout), yield(yield),
+        buffer(buffer)
   {}
 
   size_t write_data(const char* buf, size_t len) override {
     boost::system::error_code ec;
-    auto& timeout = get_lowest_layer(stream);
-    if (request_timeout.count()) {
-      timeout.expires_after(request_timeout);
-    }
+    timeout.start();
     auto bytes = boost::asio::async_write(stream, boost::asio::buffer(buf, len),
                                           yield[ec]);
+    timeout.cancel();
     if (ec) {
       ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
-      if (ec==boost::asio::error::broken_pipe) {
+      if (ec == boost::asio::error::broken_pipe) {
         boost::system::error_code ec_ignored;
-        timeout.socket().shutdown(tcp::socket::shutdown_both, ec_ignored);
+        stream.lowest_layer().shutdown(tcp_socket::shutdown_both, ec_ignored);
       }
       throw rgw::io::Exception(ec.value(), std::system_category());
     }
@@ -89,7 +101,6 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 
   size_t recv_body(char* buf, size_t max) override {
-    auto& timeout = get_lowest_layer(stream);
     auto& message = parser.get();
     auto& body_remaining = message.body();
     body_remaining.data = buf;
@@ -97,10 +108,9 @@ class StreamIO : public rgw::asio::ClientIO {
 
     while (body_remaining.size && !parser.is_done()) {
       boost::system::error_code ec;
-      if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
-      }
+      timeout.start();
       http::async_read_some(stream, buffer, parser, yield[ec]);
+      timeout.cancel();
       if (ec == http::error::need_buffer) {
         break;
       }
@@ -169,15 +179,13 @@ using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_t
 template <typename Stream>
 void handle_connection(boost::asio::io_context& context,
                        RGWProcessEnv& env, Stream& stream,
+                       timeout_timer& timeout, size_t header_limit,
                        parse_buffer& buffer, bool is_ssl,
                        SharedMutex& pause_mutex,
                        rgw::dmclock::Scheduler *scheduler,
                        boost::system::error_code& ec,
-                       spawn::yield_context yield,
-                       ceph::timespan request_timeout)
+                       yield_context yield)
 {
-  // limit header to 4k, since we read it all into a single flat_buffer
-  static constexpr size_t header_limit = 4096;
   // don't impose a limit on the body, since we read it in pieces
   static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
 
@@ -189,12 +197,10 @@ void handle_connection(boost::asio::io_context& context,
     rgw::asio::parser_type parser;
     parser.header_limit(header_limit);
     parser.body_limit(body_limit);
-    auto& timeout = get_lowest_layer(stream);
-    if (request_timeout.count()) {
-      timeout.expires_after(request_timeout);
-    }
+    timeout.start();
     // parse the header
     http::async_read_header(stream, buffer, parser, yield[ec]);
+    timeout.cancel();
     if (ec == boost::asio::error::connection_reset ||
         ec == boost::asio::error::bad_descriptor ||
         ec == boost::asio::error::operation_aborted ||
@@ -212,10 +218,9 @@ void handle_connection(boost::asio::io_context& context,
       response.result(http::status::bad_request);
       response.version(message.version() == 10 ? 10 : 11);
       response.prepare_payload();
-      if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
-      }
+      timeout.start();
       http::async_write(stream, response, yield[ec]);
+      timeout.cancel();
       if (ec) {
         ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
       }
@@ -233,18 +238,18 @@ void handle_connection(boost::asio::io_context& context,
       }
 
       // process the request
-      RGWRequest req{env.store->getRados()->get_new_req_id()};
+      RGWRequest req{env.store->get_new_req_id()};
 
-      auto& socket = get_lowest_layer(stream).socket();
+      auto& socket = stream.lowest_layer();
       const auto& remote_endpoint = socket.remote_endpoint(ec);
       if (ec) {
         ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
         return;
       }
 
-      StreamIO real_client{cct, stream, parser, yield, buffer, is_ssl,
-                           socket.local_endpoint(),
-                           remote_endpoint,request_timeout};
+      StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
+                           is_ssl, socket.local_endpoint(),
+                           remote_endpoint};
 
       auto real_client_io = rgw::io::add_reordering(
                               rgw::io::add_buffering(cct,
@@ -252,15 +257,19 @@ void handle_connection(boost::asio::io_context& context,
                                   rgw::io::add_conlen_controlling(
                                     &real_client))));
       RGWRestfulIO client(cct, &real_client_io);
-      auto y = optional_yield{context, yield};
+      optional_yield y = null_yield;
+      if (cct->_conf->rgw_beast_enable_async) {
+        y = optional_yield{context, yield};
+      }
       int http_ret = 0;
       string user = "-";
       const auto started = ceph::coarse_real_clock::now();
       ceph::coarse_real_clock::duration latency{};
-
       process_request(env.store, env.rest, &req, env.uri_prefix,
                       *env.auth_registry, &client, env.olog, y,
-                      scheduler, &user, &latency, &http_ret);
+                      scheduler, &user, &latency,
+                      env.ratelimiting->get_active(),
+                      &http_ret);
 
       if (cct->_conf->subsys.should_gather(dout_subsys, 1)) {
         // access log line elements begin per Apache Combined Log Format with additions following
@@ -289,10 +298,9 @@ void handle_connection(boost::asio::io_context& context,
       body.size = discard_buffer.size();
       body.data = discard_buffer.data();
 
-      if (request_timeout.count()) {
-        timeout.expires_after(request_timeout);
-      }
+      timeout.start();
       http::async_read_some(stream, buffer, parser, yield[ec]);
+      timeout.cancel();
       if (ec == http::error::need_buffer) {
         continue;
       }
@@ -308,9 +316,20 @@ void handle_connection(boost::asio::io_context& context,
   }
 }
 
-struct Connection : boost::intrusive::list_base_hook<> {
-  tcp::socket& socket;
-  Connection(tcp::socket& socket) : socket(socket) {}
+// timeout support requires that connections are reference-counted, because the
+// timeout_handler can outlive the coroutine
+struct Connection : boost::intrusive::list_base_hook<>,
+                    boost::intrusive_ref_counter<Connection>
+{
+  tcp_socket socket;
+  parse_buffer buffer;
+
+  explicit Connection(tcp_socket&& socket) noexcept
+      : socket(std::move(socket)) {}
+
+  void close(boost::system::error_code& ec) {
+    socket.close(ec);
+  }
 };
 
 class ConnectionList {
@@ -352,6 +371,7 @@ class AsioFrontend {
   RGWFrontendConfig* conf;
   boost::asio::io_context context;
   ceph::timespan request_timeout = std::chrono::milliseconds(REQUEST_TIMEOUT);
+  size_t header_limit = 16384;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   boost::optional<ssl::context> ssl_context;
   int get_config_key_val(string name,
@@ -367,7 +387,7 @@ class AsioFrontend {
   struct Listener {
     tcp::endpoint endpoint;
     tcp::acceptor acceptor;
-    tcp::socket socket;
+    tcp_socket socket;
     bool use_ssl = false;
     bool use_nodelay = false;
 
@@ -419,7 +439,7 @@ class AsioFrontend {
   void stop();
   void join();
   void pause();
-  void unpause(rgw::sal::RGWRadosStore* store, rgw_auth_registry_ptr_t);
+  void unpause(rgw::sal::Store* store, rgw_auth_registry_ptr_t);
 };
 
 unsigned short parse_port(const char *input, boost::system::error_code& ec)
@@ -514,15 +534,32 @@ int AsioFrontend::init()
 // Setting global timeout
   auto timeout = config.find("request_timeout_ms");
   if (timeout != config.end()) {
-    auto timeout_number = ceph::parse<uint64_t>(timeout->second.data());
+    auto timeout_number = ceph::parse<uint64_t>(timeout->second);
     if (timeout_number) {
       request_timeout =  std::chrono::milliseconds(*timeout_number);
     } else {
       lderr(ctx()) << "WARNING: invalid value for request_timeout_ms: "
-      << timeout->second.data() << " setting it to the default value: "
+      << timeout->second << " setting it to the default value: "
       << REQUEST_TIMEOUT << dendl;
     }
-  } 
+  }
+
+  auto max_header_size = config.find("max_header_size");
+  if (max_header_size != config.end()) {
+    auto limit = ceph::parse<uint64_t>(max_header_size->second);
+    if (!limit) {
+      lderr(ctx()) << "WARNING: invalid value for max_header_size: "
+          << max_header_size->second << ", using the default value: "
+          << header_limit << dendl;
+    } else if (*limit > parse_buffer_size) { // can't exceed parse buffer size
+      header_limit = parse_buffer_size;
+      lderr(ctx()) << "WARNING: max_header_size " << max_header_size->second
+          << " capped at maximum value " << header_limit << dendl;
+    } else {
+      header_limit = *limit;
+    }
+  }
+
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   int r = init_ssl();
   if (r < 0) {
@@ -633,13 +670,13 @@ class ExpandMetaVar {
   map<string, string> meta_map;
 
 public:
-  ExpandMetaVar(RGWSI_Zone *zone_svc) {
+  ExpandMetaVar(rgw::sal::Zone* zone_svc) {
     meta_map["realm"] = zone_svc->get_realm().get_name();
     meta_map["realm_id"] = zone_svc->get_realm().get_id();
     meta_map["zonegroup"] = zone_svc->get_zonegroup().get_name();
     meta_map["zonegroup_id"] = zone_svc->get_zonegroup().get_id();
-    meta_map["zone"] = zone_svc->zone_name();
-    meta_map["zone_id"] = zone_svc->zone_id().id;
+    meta_map["zone"] = zone_svc->get_name();
+    meta_map["zone_id"] = zone_svc->get_id().id;
   }
 
   string process_str(const string& in);
@@ -713,8 +750,7 @@ int AsioFrontend::get_config_key_val(string name,
     return -EINVAL;
   }
 
-  auto svc = env.store->svc()->config_key;
-  int r = svc->get(name, true, pbl);
+  int r = env.store->get_config_key_val(name, pbl);
   if (r < 0) {
     lderr(ctx()) << type << " was not found: " << name << dendl;
     return r;
@@ -870,7 +906,7 @@ int AsioFrontend::init_ssl()
       key_is_cert = true;
     }
 
-    ExpandMetaVar emv(env.store->svc()->zone);
+    ExpandMetaVar emv(env.store->get_zone());
 
     cert = emv.process_str(*cert);
     key = emv.process_str(*key);
@@ -938,58 +974,57 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
     ldout(ctx(), 1) << "accept failed: " << ec.message() << dendl;
     return;
   }
-  auto socket = std::move(l.socket);
-  tcp::no_delay options(l.use_nodelay);
-  socket.set_option(options,ec);
+  auto stream = std::move(l.socket);
+  stream.set_option(tcp::no_delay(l.use_nodelay), ec);
   l.acceptor.async_accept(l.socket,
                           [this, &l] (boost::system::error_code ec) {
                             accept(l, ec);
                           });
   
-  boost::beast::tcp_stream stream(std::move(socket));
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
     spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
-        Connection conn{s.socket()};
-        auto c = connections.add(conn);
-        // wrap the tcp_stream in an ssl stream
-        boost::beast::ssl_stream<boost::beast::tcp_stream&> stream{s, *ssl_context};
-        auto buffer = std::make_unique<parse_buffer>();
+      [this, s=std::move(stream)] (yield_context yield) mutable {
+        auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
+        auto c = connections.add(*conn);
+        // wrap the tcp stream in an ssl stream
+        boost::asio::ssl::stream<tcp_socket&> stream{conn->socket, *ssl_context};
+        auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
         // do ssl handshake
         boost::system::error_code ec;
-        if (request_timeout.count()) {
-          get_lowest_layer(stream).expires_after(request_timeout);
-        }
+        timeout.start();
         auto bytes = stream.async_handshake(ssl::stream_base::server,
-                                            buffer->data(), yield[ec]);
+                                            conn->buffer.data(), yield[ec]);
+        timeout.cancel();
         if (ec) {
           ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
           return;
         }
-        buffer->consume(bytes);
-        handle_connection(context, env, stream, *buffer, true, pause_mutex,
-                          scheduler.get(), ec, yield, request_timeout);
+        conn->buffer.consume(bytes);
+        handle_connection(context, env, stream, timeout, header_limit,
+                          conn->buffer, true, pause_mutex, scheduler.get(),
+                          ec, yield);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
         }
-        s.socket().shutdown(tcp::socket::shutdown_both, ec);
+        conn->socket.shutdown(tcp::socket::shutdown_both, ec);
       }, make_stack_allocator());
   } else {
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
     spawn::spawn(context,
-      [this, s=std::move(stream)] (spawn::yield_context yield) mutable {
-        Connection conn{s.socket()};
-        auto c = connections.add(conn);
-        auto buffer = std::make_unique<parse_buffer>();
+      [this, s=std::move(stream)] (yield_context yield) mutable {
+        auto conn = boost::intrusive_ptr{new Connection(std::move(s))};
+        auto c = connections.add(*conn);
+        auto timeout = timeout_timer{context.get_executor(), request_timeout, conn};
         boost::system::error_code ec;
-        handle_connection(context, env, s, *buffer, false, pause_mutex,
-                          scheduler.get(), ec, yield, request_timeout);
-        s.socket().shutdown(tcp::socket::shutdown_both, ec);
+        handle_connection(context, env, conn->socket, timeout, header_limit,
+                          conn->buffer, false, pause_mutex, scheduler.get(),
+                          ec, yield);
+        conn->socket.shutdown(tcp_socket::shutdown_both, ec);
       }, make_stack_allocator());
   }
 }
@@ -1007,11 +1042,12 @@ int AsioFrontend::run()
   work.emplace(boost::asio::make_work_guard(context));
 
   for (int i = 0; i < thread_count; i++) {
-    threads.emplace_back([=] {
+    threads.emplace_back([=]() noexcept {
       // request warnings on synchronous librados calls in this thread
       is_asio_thread = true;
-      boost::system::error_code ec;
-      context.run(ec);
+      // Have uncaught exceptions kill the process and give a
+      // stacktrace, not be swallowed.
+      context.run();
     });
   }
   return 0;
@@ -1067,7 +1103,7 @@ void AsioFrontend::pause()
   }
 }
 
-void AsioFrontend::unpause(rgw::sal::RGWRadosStore* const store,
+void AsioFrontend::unpause(rgw::sal::Store* const store,
                            rgw_auth_registry_ptr_t auth_registry)
 {
   env.store = store;
@@ -1131,7 +1167,7 @@ void RGWAsioFrontend::pause_for_new_config()
 }
 
 void RGWAsioFrontend::unpause_with_new_config(
-  rgw::sal::RGWRadosStore* const store,
+  rgw::sal::Store* const store,
   rgw_auth_registry_ptr_t auth_registry
 ) {
   impl->unpause(store, std::move(auth_registry));