]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_asio_frontend.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
index e974ae7bf8e0c686298a65bd0dd0b1481b250b67..8431be0b89557f6e03b538866de9b05e50e8cd16 100644 (file)
@@ -2,14 +2,15 @@
 // vim: ts=8 sw=2 smarttab
 
 #include <atomic>
-#include <condition_variable>
-#include <mutex>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
+#define BOOST_COROUTINES_NO_DEPRECATION_WARNING
 #include <boost/asio/spawn.hpp>
+#include <boost/intrusive/list.hpp>
 
+#include "common/async/shared_mutex.h"
 #include "common/errno.h"
 
 #include "rgw_asio_client.h"
 #include <boost/asio/ssl.hpp>
 #endif
 
+#include "rgw_dmclock_async_scheduler.h"
+
 #define dout_subsys ceph_subsys_rgw
 
 namespace {
 
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
- public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
-
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
-
-void Pauser::unpause()
-{
-  std::lock_guard<std::mutex> lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
-
-void Pauser::wait()
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
 using tcp = boost::asio::ip::tcp;
-namespace beast = boost::beast;
+namespace http = boost::beast::http;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
 namespace ssl = boost::asio::ssl;
 #endif
 
 template <typename Stream>
 class StreamIO : public rgw::asio::ClientIO {
+  CephContext* const cct;
   Stream& stream;
-  beast::flat_buffer& buffer;
+  boost::beast::flat_buffer& buffer;
  public:
-  StreamIO(Stream& stream, rgw::asio::parser_type& parser,
-           beast::flat_buffer& buffer, bool is_ssl,
+  StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
+           boost::beast::flat_buffer& buffer, bool is_ssl,
            const tcp::endpoint& local_endpoint,
            const tcp::endpoint& remote_endpoint)
       : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
-        stream(stream), buffer(buffer)
+        cct(cct), stream(stream), buffer(buffer)
   {}
 
   size_t write_data(const char* buf, size_t len) override {
     boost::system::error_code ec;
     auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
     if (ec) {
-      derr << "write_data failed: " << ec.message() << dendl;
+      ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
       throw rgw::io::Exception(ec.value(), std::system_category());
     }
     return bytes;
@@ -103,13 +64,13 @@ class StreamIO : public rgw::asio::ClientIO {
 
     while (body_remaining.size && !parser.is_done()) {
       boost::system::error_code ec;
-      beast::http::read_some(stream, buffer, parser, ec);
-      if (ec == beast::http::error::partial_message ||
-          ec == beast::http::error::need_buffer) {
+      http::read_some(stream, buffer, parser, ec);
+      if (ec == http::error::partial_message ||
+          ec == http::error::need_buffer) {
         break;
       }
       if (ec) {
-        derr << "failed to read body: " << ec.message() << dendl;
+        ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
         throw rgw::io::Exception(ec.value(), std::system_category());
       }
     }
@@ -117,9 +78,13 @@ class StreamIO : public rgw::asio::ClientIO {
   }
 };
 
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
+
 template <typename Stream>
 void handle_connection(RGWProcessEnv& env, Stream& stream,
-                       beast::flat_buffer& buffer, bool is_ssl,
+                       boost::beast::flat_buffer& buffer, bool is_ssl,
+                       SharedMutex& pause_mutex,
+                       rgw::dmclock::Scheduler *scheduler,
                        boost::system::error_code& ec,
                        boost::asio::yield_context yield)
 {
@@ -138,22 +103,25 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
     parser.body_limit(body_limit);
 
     // parse the header
-    beast::http::async_read_header(stream, buffer, parser, yield[ec]);
+    http::async_read_header(stream, buffer, parser, yield[ec]);
     if (ec == boost::asio::error::connection_reset ||
+        ec == boost::asio::error::bad_descriptor ||
+        ec == boost::asio::error::operation_aborted ||
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
         ec == ssl::error::stream_truncated ||
 #endif
-        ec == beast::http::error::end_of_stream) {
+        ec == http::error::end_of_stream) {
+      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
       return;
     }
     if (ec) {
       ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
       auto& message = parser.get();
-      beast::http::response<beast::http::empty_body> response;
-      response.result(beast::http::status::bad_request);
+      http::response<http::empty_body> response;
+      response.result(http::status::bad_request);
       response.version(message.version() == 10 ? 10 : 11);
       response.prepare_payload();
-      beast::http::async_write(stream, response, yield[ec]);
+      http::async_write(stream, response, yield[ec]);
       if (ec) {
         ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
       }
@@ -161,22 +129,39 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
       return;
     }
 
-    // process the request
-    RGWRequest req{env.store->get_new_req_id()};
+    {
+      auto lock = pause_mutex.async_lock_shared(yield[ec]);
+      if (ec == boost::asio::error::operation_aborted) {
+        return;
+      } else if (ec) {
+        ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+        return;
+      }
+
+      // process the request
+      RGWRequest req{env.store->get_new_req_id()};
 
-    auto& socket = stream.lowest_layer();
-    StreamIO<Stream> real_client{stream, parser, buffer, is_ssl,
-                                 socket.local_endpoint(),
-                                 socket.remote_endpoint()};
+      auto& socket = stream.lowest_layer();
+      const auto& remote_endpoint = socket.remote_endpoint(ec);
+      if (ec) {
+        ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
+        return;
+      }
 
-    auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
-                              rgw::io::add_chunking(
-                                rgw::io::add_conlen_controlling(
-                                  &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
-    process_request(env.store, env.rest, &req, env.uri_prefix,
-                    *env.auth_registry, &client, env.olog);
+      StreamIO real_client{cct, stream, parser, buffer, is_ssl,
+                           socket.local_endpoint(),
+                           remote_endpoint};
+
+      auto real_client_io = rgw::io::add_reordering(
+                              rgw::io::add_buffering(cct,
+                                rgw::io::add_chunking(
+                                  rgw::io::add_conlen_controlling(
+                                    &real_client))));
+      RGWRestfulIO client(cct, &real_client_io);
+      auto y = optional_yield{socket.get_io_context(), yield};
+      process_request(env.store, env.rest, &req, env.uri_prefix,
+                      *env.auth_registry, &client, env.olog, y, scheduler);
+    }
 
     if (!parser.keep_alive()) {
       return;
@@ -191,7 +176,7 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
       body.size = discard_buffer.size();
       body.data = discard_buffer.data();
 
-      beast::http::async_read_some(stream, buffer, parser, yield[ec]);
+      http::async_read_some(stream, buffer, parser, yield[ec]);
       if (ec == boost::asio::error::connection_reset) {
         return;
       }
@@ -204,37 +189,105 @@ void handle_connection(RGWProcessEnv& env, Stream& stream,
   }
 }
 
+struct Connection : boost::intrusive::list_base_hook<> {
+  tcp::socket& socket;
+  Connection(tcp::socket& socket) : socket(socket) {}
+};
+
+class ConnectionList {
+  using List = boost::intrusive::list<Connection>;
+  List connections;
+  std::mutex mutex;
+
+  void remove(Connection& c) {
+    std::lock_guard lock{mutex};
+    if (c.is_linked()) {
+      connections.erase(List::s_iterator_to(c));
+    }
+  }
+ public:
+  class Guard {
+    ConnectionList *list;
+    Connection *conn;
+   public:
+    Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
+    ~Guard() { list->remove(*conn); }
+  };
+  [[nodiscard]] Guard add(Connection& conn) {
+    std::lock_guard lock{mutex};
+    connections.push_back(conn);
+    return Guard{this, &conn};
+  }
+  void close(boost::system::error_code& ec) {
+    std::lock_guard lock{mutex};
+    for (auto& conn : connections) {
+      conn.socket.close(ec);
+    }
+    connections.clear();
+  }
+};
+
+namespace dmc = rgw::dmclock;
 class AsioFrontend {
   RGWProcessEnv env;
   RGWFrontendConfig* conf;
-  boost::asio::io_service service;
+  boost::asio::io_context context;
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   boost::optional<ssl::context> ssl_context;
   int init_ssl();
 #endif
+  SharedMutex pause_mutex;
+  std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
 
   struct Listener {
     tcp::endpoint endpoint;
     tcp::acceptor acceptor;
     tcp::socket socket;
     bool use_ssl = false;
+    bool use_nodelay = false;
 
-    Listener(boost::asio::io_service& service)
-      : acceptor(service), socket(service) {}
+    explicit Listener(boost::asio::io_context& context)
+      : acceptor(context), socket(context) {}
   };
   std::vector<Listener> listeners;
 
+  ConnectionList connections;
+
+  // work guard to keep run() threads busy while listeners are paused
+  using Executor = boost::asio::io_context::executor_type;
+  std::optional<boost::asio::executor_work_guard<Executor>> work;
+
   std::vector<std::thread> threads;
-  Pauser pauser;
   std::atomic<bool> going_down{false};
 
   CephContext* ctx() const { return env.store->ctx(); }
-
+  std::optional<dmc::ClientCounters> client_counters;
+  std::unique_ptr<dmc::ClientConfig> client_config;
   void accept(Listener& listener, boost::system::error_code ec);
 
  public:
-  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf)
-    : env(env), conf(conf) {}
+  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+              dmc::SchedulerCtx& sched_ctx)
+    : env(env), conf(conf), pause_mutex(context.get_executor())
+  {
+    auto sched_t = dmc::get_scheduler_t(ctx());
+    switch(sched_t){
+    case dmc::scheduler_t::dmclock:
+      scheduler.reset(new dmc::AsyncScheduler(ctx(),
+                                              context,
+                                              std::ref(sched_ctx.get_dmc_client_counters()),
+                                              sched_ctx.get_dmc_client_config(),
+                                              *sched_ctx.get_dmc_client_config(),
+                                              dmc::AtLimit::Reject));
+      break;
+    case dmc::scheduler_t::none:
+      lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
+      [[fallthrough]];
+    case dmc::scheduler_t::throttler:
+      scheduler.reset(new dmc::SimpleThrottler(ctx()));
+
+    }
+  }
 
   int init();
   int run();
@@ -255,8 +308,8 @@ unsigned short parse_port(const char *input, boost::system::error_code& ec)
   }
   return port;
 }
-
-tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
+       
+tcp::endpoint parse_endpoint(boost::asio::string_view input,
                              unsigned short default_port,
                              boost::system::error_code& ec)
 {
@@ -283,6 +336,8 @@ tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
         auto port_str = input.substr(addr_end + 2);
         endpoint.port(parse_port(port_str.data(), ec));
       }
+    } else {
+      endpoint.port(default_port);
     }
     auto addr = input.substr(addr_begin, addr_end - addr_begin);
     endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
@@ -294,6 +349,8 @@ tcp::endpoint parse_endpoint(BOOST_ASIO_STRING_VIEW_PARAM input,
       if (ec) {
         return endpoint;
       }
+    } else {
+      endpoint.port(default_port);
     }
     auto addr = input.substr(0, colon);
     endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
@@ -344,8 +401,11 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
+
+    listeners.emplace_back(context);
+    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
   }
 
   auto endpoints = config.equal_range("endpoint");
@@ -355,17 +415,42 @@ int AsioFrontend::init()
       lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
   }
+  // parse tcp nodelay
+  auto nodelay = config.find("tcp_nodelay");
+  if (nodelay != config.end()) {
+    for (auto& l : listeners) {
+      l.use_nodelay = (nodelay->second == "1");
+    }
+  }
+  
 
+  bool socket_bound = false;
   // start listeners
   for (auto& l : listeners) {
     l.acceptor.open(l.endpoint.protocol(), ec);
     if (ec) {
+      if (ec == boost::asio::error::address_family_not_supported) {
+       ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
+                       << ", " << ec.message() << dendl;
+       continue;
+      }
+
       lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
       return -ec.value();
     }
+
+    if (l.endpoint.protocol() == tcp::v6()) {
+      l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
+      if (ec) {
+        lderr(ctx()) << "failed to set v6_only socket option: "
+                    << ec.message() << dendl;
+       return -ec.value();
+      }
+    }
+
     l.acceptor.set_option(tcp::acceptor::reuse_address(true));
     l.acceptor.bind(l.endpoint, ec);
     if (ec) {
@@ -373,6 +458,7 @@ int AsioFrontend::init()
           << ": " << ec.message() << dendl;
       return -ec.value();
     }
+
     l.acceptor.listen(boost::asio::socket_base::max_connections);
     l.acceptor.async_accept(l.socket,
                             [this, &l] (boost::system::error_code ec) {
@@ -380,7 +466,13 @@ int AsioFrontend::init()
                             });
 
     ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
+    socket_bound = true;
+  }
+  if (!socket_bound) {
+    lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
+    return -EINVAL;
   }
+
   return drop_privileges(ctx());
 }
 
@@ -442,9 +534,13 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint.port(port);
     listeners.back().use_ssl = true;
+
+    listeners.emplace_back(context);
+    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
+    listeners.back().use_ssl = true;
   }
 
   auto endpoints = config.equal_range("ssl_endpoint");
@@ -458,7 +554,7 @@ int AsioFrontend::init_ssl()
       lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
       return -ec.value();
     }
-    listeners.emplace_back(service);
+    listeners.emplace_back(context);
     listeners.back().endpoint = endpoint;
     listeners.back().use_ssl = true;
   }
@@ -476,6 +572,8 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
     throw ec;
   }
   auto socket = std::move(l.socket);
+  tcp::no_delay options(l.use_nodelay);
+  socket.set_option(options,ec);
   l.acceptor.async_accept(l.socket,
                           [this, &l] (boost::system::error_code ec) {
                             accept(l, ec);
@@ -484,11 +582,13 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
   // spawn a coroutine to handle the connection
 #ifdef WITH_RADOSGW_BEAST_OPENSSL
   if (l.use_ssl) {
-    boost::asio::spawn(service, std::bind(
-      [this] (boost::asio::yield_context yield, tcp::socket& s) mutable {
+    boost::asio::spawn(context,
+      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
         // wrap the socket in an ssl stream
         ssl::stream<tcp::socket&> stream{s, *ssl_context};
-        beast::flat_buffer buffer;
+        boost::beast::flat_buffer buffer;
         // do ssl handshake
         boost::system::error_code ec;
         auto bytes = stream.async_handshake(ssl::stream_base::server,
@@ -498,24 +598,28 @@ void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
           return;
         }
         buffer.consume(bytes);
-        handle_connection(env, stream, buffer, true, ec, yield);
+        handle_connection(env, stream, buffer, true, pause_mutex,
+                          scheduler.get(), ec, yield);
         if (!ec) {
           // ssl shutdown (ignoring errors)
           stream.async_shutdown(yield[ec]);
         }
         s.shutdown(tcp::socket::shutdown_both, ec);
-      }, std::placeholders::_1, std::move(socket)));
+      });
   } else {
 #else
   {
 #endif // WITH_RADOSGW_BEAST_OPENSSL
-    boost::asio::spawn(service, std::bind(
-      [this] (boost::asio::yield_context yield, tcp::socket& s) mutable {
-        beast::flat_buffer buffer;
+    boost::asio::spawn(context,
+      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
+        boost::beast::flat_buffer buffer;
         boost::system::error_code ec;
-        handle_connection(env, s, buffer, false, ec, yield);
+        handle_connection(env, s, buffer, false, pause_mutex,
+                          scheduler.get(), ec, yield);
         s.shutdown(tcp::socket::shutdown_both, ec);
-      }, std::placeholders::_1, std::move(socket)));
+      });
   }
 }
 
@@ -527,15 +631,16 @@ int AsioFrontend::run()
 
   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
 
+  // the worker threads call io_context::run(), which will return when there's
+  // no work left. hold a work guard to keep these threads going until join()
+  work.emplace(boost::asio::make_work_guard(context));
+
   for (int i = 0; i < thread_count; i++) {
     threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
+      // request warnings on synchronous librados calls in this thread
+      is_asio_thread = true;
+      boost::system::error_code ec;
+      context.run(ec);
     });
   }
   return 0;
@@ -552,9 +657,9 @@ void AsioFrontend::stop()
   for (auto& listener : listeners) {
     listener.acceptor.close(ec);
   }
-
-  // unblock the run() threads
-  service.stop();
+  // close all connections
+  connections.close(ec);
+  pause_mutex.cancel();
 }
 
 void AsioFrontend::join()
@@ -562,6 +667,8 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
+  work.reset();
+
   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
   for (auto& thread : threads) {
     thread.join();
@@ -571,12 +678,22 @@ void AsioFrontend::join()
 
 void AsioFrontend::pause()
 {
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // unblock the run() threads
-    service.stop();
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
+  ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+  // cancel pending calls to accept(), but don't close the sockets
+  boost::system::error_code ec;
+  for (auto& l : listeners) {
+    l.acceptor.cancel(ec);
+  }
+
+  // pause and wait for outstanding requests to complete
+  pause_mutex.lock(ec);
+
+  if (ec) {
+    ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+  } else {
+    ldout(ctx(), 4) << "frontend paused" << dendl;
+  }
 }
 
 void AsioFrontend::unpause(RGWRados* const store,
@@ -584,21 +701,34 @@ void AsioFrontend::unpause(RGWRados* const store,
 {
   env.store = store;
   env.auth_registry = std::move(auth_registry);
+
+  // unpause to unblock connections
+  pause_mutex.unlock();
+
+  // start accepting connections again
+  for (auto& l : listeners) {
+    l.acceptor.async_accept(l.socket,
+                            [this, &l] (boost::system::error_code ec) {
+                              accept(l, ec);
+                            });
+  }
+
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  pauser.unpause();
 }
 
 } // anonymous namespace
 
 class RGWAsioFrontend::Impl : public AsioFrontend {
  public:
-  Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf) : AsioFrontend(env, conf) {}
+  Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+       rgw::dmclock::SchedulerCtx& sched_ctx)
+    : AsioFrontend(env, conf, sched_ctx) {}
 };
 
 RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
-                                 RGWFrontendConfig* conf)
-  : impl(new Impl(env, conf))
+                                 RGWFrontendConfig* conf,
+                                rgw::dmclock::SchedulerCtx& sched_ctx)
+  : impl(new Impl(env, conf, sched_ctx))
 {
 }