]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_asio_frontend.cc
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
index ee6be6256a5ba69b588c612a517b4d190eeee5db..61e011c7c4b5f91e08a2acefff89730692ffde42 100644 (file)
@@ -1,27 +1,19 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
+#include <atomic>
 #include <condition_variable>
 #include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
-#include <boost/asio/spawn.hpp>
 
-#include <beast/core/placeholders.hpp>
-#include <beast/http/read.hpp>
-#include <beast/http/string_body.hpp>
-#include <beast/http/write.hpp>
-
-#include "rgw_asio_frontend.h"
 #include "rgw_asio_client.h"
+#include "rgw_asio_frontend.h"
 
 #define dout_subsys ceph_subsys_rgw
 
-#undef dout_prefix
-#define dout_prefix (*_dout << "asio: ")
-
 namespace {
 
 class Pauser {
@@ -68,62 +60,137 @@ void Pauser::wait()
 }
 
 using tcp = boost::asio::ip::tcp;
+namespace beast = boost::beast;
 
-// coroutine to handle a client connection to completion
-static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
-                              boost::asio::yield_context yield)
-{
-  auto cct = env.store->ctx();
-  boost::system::error_code ec;
+class Connection {
+  RGWProcessEnv& env;
+  boost::asio::io_service::strand strand;
+  tcp::socket socket;
+
+  // references are bound to callbacks for async operations. if a callback
+  // function returns without issuing another operation, the reference is
+  // dropped and the Connection is deleted/closed
+  std::atomic<int> nref{0};
+  using Ref = boost::intrusive_ptr<Connection>;
+
+  // limit header to 4k, since we read it all into a single flat_buffer
+  static constexpr size_t header_limit = 4096;
+  // don't impose a limit on the body, since we read it in pieces
+  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
+
+  beast::flat_buffer buffer;
+  boost::optional<rgw::asio::parser_type> parser;
+
+  using bad_response_type = beast::http::response<beast::http::empty_body>;
+  boost::optional<bad_response_type> response;
+
+  CephContext* ctx() const { return env.store->ctx(); }
 
-  beast::flat_streambuf buffer{1024};
+  void read_header() {
+    // configure the parser
+    parser.emplace();
+    parser->header_limit(header_limit);
+    parser->body_limit(body_limit);
 
-  // read messages from the socket until eof
-  for (;;) {
     // parse the header
-    rgw::asio::parser_type parser;
-    do {
-      auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
-      buffer.consume(bytes);
-    } while (!ec && !parser.got_header());
+    beast::http::async_read_header(socket, buffer, *parser, strand.wrap(
+            std::bind(&Connection::on_header, Ref{this},
+                      std::placeholders::_1)));
+  }
+
+  void discard_unread_message() {
+    if (parser->is_done()) {
+      // nothing left to discard, start reading the next message
+      read_header();
+      return;
+    }
+
+    // read the rest of the request into a static buffer. multiple clients could
+    // write at the same time, but this is okay because we never read it back
+    static std::array<char, 1024> discard_buffer;
+
+    auto& body = parser->get().body();
+    body.size = discard_buffer.size();
+    body.data = discard_buffer.data();
 
+    beast::http::async_read_some(socket, buffer, *parser, strand.wrap(
+            std::bind(&Connection::on_discard_unread, Ref{this},
+                      std::placeholders::_1)));
+  }
+
+  void on_discard_unread(boost::system::error_code ec) {
+    if (ec == boost::asio::error::connection_reset) {
+      return;
+    }
+    if (ec) {
+      ldout(ctx(), 5) << "discard_unread_message failed: "
+          << ec.message() << dendl;
+      return;
+    }
+    discard_unread_message();
+  }
+
+  void on_write_error(boost::system::error_code ec) {
+    if (ec) {
+      ldout(ctx(), 5) << "failed to write response: " << ec.message() << dendl;
+    }
+  }
+
+  void on_header(boost::system::error_code ec) {
     if (ec == boost::asio::error::connection_reset ||
-        ec == boost::asio::error::eof) {
+        ec == beast::http::error::end_of_stream) {
       return;
     }
     if (ec) {
-      auto& message = parser.get();
-      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
-      ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
-      beast::http::response<beast::http::string_body> response;
-      response.status = 400;
-      response.reason = "Bad Request";
-      response.version = message.version == 10 ? 10 : 11;
-      beast::http::prepare(response);
-      beast::http::async_write(socket, std::move(response), yield[ec]);
-      // ignore ec
+      auto& message = parser->get();
+      ldout(ctx(), 1) << "failed to read header: " << ec.message() << dendl;
+      ldout(ctx(), 1) << "====== req done http_status=400 ======" << dendl;
+      response.emplace();
+      response->result(beast::http::status::bad_request);
+      response->version(message.version() == 10 ? 10 : 11);
+      response->prepare_payload();
+      beast::http::async_write(socket, *response, strand.wrap(
+            std::bind(&Connection::on_write_error, Ref{this},
+                      std::placeholders::_1)));
       return;
     }
 
     // process the request
     RGWRequest req{env.store->get_new_req_id()};
 
-    rgw::asio::ClientIO real_client{socket, parser, buffer};
+    rgw::asio::ClientIO real_client{socket, *parser, buffer};
 
     auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
+                            rgw::io::add_buffering(ctx(),
                               rgw::io::add_chunking(
                                 rgw::io::add_conlen_controlling(
                                   &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
+    RGWRestfulIO client(ctx(), &real_client_io);
     process_request(env.store, env.rest, &req, env.uri_prefix,
                     *env.auth_registry, &client, env.olog);
 
-    if (real_client.get_conn_close()) {
-      return;
+    if (parser->keep_alive()) {
+      // parse any unread bytes from the previous message (in case we replied
+      // before reading the entire body) before reading the next
+      discard_unread_message();
     }
   }
-}
+
+ public:
+  Connection(RGWProcessEnv& env, tcp::socket&& socket)
+    : env(env), strand(socket.get_io_service()), socket(std::move(socket)) {}
+
+  void on_connect() {
+    read_header();
+  }
+
+  void get() { ++nref; }
+  void put() { if (nref.fetch_sub(1) == 1) { delete this; } }
+
+  friend void intrusive_ptr_add_ref(Connection *c) { c->get(); }
+  friend void intrusive_ptr_release(Connection *c) { c->put(); }
+};
+
 
 class AsioFrontend {
   RGWProcessEnv env;
@@ -188,15 +255,14 @@ void AsioFrontend::accept(boost::system::error_code ec)
     throw ec;
   }
   auto socket = std::move(peer_socket);
-  // spawn a coroutine to handle the connection
-  boost::asio::spawn(service,
-                     [&] (boost::asio::yield_context yield) {
-                       handle_connection(env, std::move(socket), yield);
-                     });
   acceptor.async_accept(peer_socket,
                         [this] (boost::system::error_code ec) {
                           return accept(ec);
                         });
+
+  boost::intrusive_ptr<Connection> conn{new Connection(env, std::move(socket))};
+  conn->on_connect();
+  // reference drops here, but on_connect() takes another
 }
 
 int AsioFrontend::run()
@@ -228,7 +294,10 @@ void AsioFrontend::stop()
   going_down = true;
 
   boost::system::error_code ec;
-  acceptor.close(ec); // unblock the run() threads
+  acceptor.close(ec);
+
+  // unblock the run() threads
+  service.stop();
 }
 
 void AsioFrontend::join()
@@ -247,9 +316,8 @@ void AsioFrontend::pause()
 {
   ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
   pauser.pause(threads.size(), [=] {
-    // stop accepting but leave the port open
-    boost::system::error_code ec;
-    acceptor.cancel(ec);
+    // unblock the run() threads
+    service.stop();
   });
   ldout(ctx(), 4) << "frontend paused" << dendl;
 }
@@ -261,10 +329,6 @@ void AsioFrontend::unpause(RGWRados* const store,
   env.auth_registry = std::move(auth_registry);
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
   service.reset();
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
   pauser.unpause();
 }