]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_asio_frontend.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_asio_frontend.cc
index ee6be6256a5ba69b588c612a517b4d190eeee5db..8431be0b89557f6e03b538866de9b05e50e8cd16 100644 (file)
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#include <condition_variable>
-#include <mutex>
+#include <atomic>
 #include <thread>
 #include <vector>
 
 #include <boost/asio.hpp>
+#define BOOST_COROUTINES_NO_DEPRECATION_WARNING
 #include <boost/asio/spawn.hpp>
+#include <boost/intrusive/list.hpp>
 
-#include <beast/core/placeholders.hpp>
-#include <beast/http/read.hpp>
-#include <beast/http/string_body.hpp>
-#include <beast/http/write.hpp>
+#include "common/async/shared_mutex.h"
+#include "common/errno.h"
 
-#include "rgw_asio_frontend.h"
 #include "rgw_asio_client.h"
+#include "rgw_asio_frontend.h"
 
-#define dout_subsys ceph_subsys_rgw
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+#include <boost/asio/ssl.hpp>
+#endif
 
-#undef dout_prefix
-#define dout_prefix (*_dout << "asio: ")
+#include "rgw_dmclock_async_scheduler.h"
+
+#define dout_subsys ceph_subsys_rgw
 
 namespace {
 
-class Pauser {
-  std::mutex mutex;
-  std::condition_variable cond_ready; // signaled on ready==true
-  std::condition_variable cond_paused; // signaled on waiters==thread_count
-  bool ready{false};
-  int waiters{0};
+using tcp = boost::asio::ip::tcp;
+namespace http = boost::beast::http;
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+namespace ssl = boost::asio::ssl;
+#endif
+
+template <typename Stream>
+class StreamIO : public rgw::asio::ClientIO {
+  CephContext* const cct;
+  Stream& stream;
+  boost::beast::flat_buffer& buffer;
  public:
-  template <typename Func>
-  void pause(int thread_count, Func&& func);
-  void unpause();
-  void wait();
-};
-
-template <typename Func>
-void Pauser::pause(int thread_count, Func&& func)
-{
-  std::unique_lock<std::mutex> lock(mutex);
-  ready = false;
-  lock.unlock();
-
-  func();
+  StreamIO(CephContext *cct, Stream& stream, rgw::asio::parser_type& parser,
+           boost::beast::flat_buffer& buffer, bool is_ssl,
+           const tcp::endpoint& local_endpoint,
+           const tcp::endpoint& remote_endpoint)
+      : ClientIO(parser, is_ssl, local_endpoint, remote_endpoint),
+        cct(cct), stream(stream), buffer(buffer)
+  {}
+
+  size_t write_data(const char* buf, size_t len) override {
+    boost::system::error_code ec;
+    auto bytes = boost::asio::write(stream, boost::asio::buffer(buf, len), ec);
+    if (ec) {
+      ldout(cct, 4) << "write_data failed: " << ec.message() << dendl;
+      throw rgw::io::Exception(ec.value(), std::system_category());
+    }
+    return bytes;
+  }
 
-  // wait for all threads to pause
-  lock.lock();
-  cond_paused.wait(lock, [=] { return waiters == thread_count; });
-}
+  size_t recv_body(char* buf, size_t max) override {
+    auto& message = parser.get();
+    auto& body_remaining = message.body();
+    body_remaining.data = buf;
+    body_remaining.size = max;
+
+    while (body_remaining.size && !parser.is_done()) {
+      boost::system::error_code ec;
+      http::read_some(stream, buffer, parser, ec);
+      if (ec == http::error::partial_message ||
+          ec == http::error::need_buffer) {
+        break;
+      }
+      if (ec) {
+        ldout(cct, 4) << "failed to read body: " << ec.message() << dendl;
+        throw rgw::io::Exception(ec.value(), std::system_category());
+      }
+    }
+    return max - body_remaining.size;
+  }
+};
 
-void Pauser::unpause()
-{
-  std::lock_guard<std::mutex> lock(mutex);
-  ready = true;
-  cond_ready.notify_all();
-}
+using SharedMutex = ceph::async::SharedMutex<boost::asio::io_context::executor_type>;
 
-void Pauser::wait()
+template <typename Stream>
+void handle_connection(RGWProcessEnv& env, Stream& stream,
+                       boost::beast::flat_buffer& buffer, bool is_ssl,
+                       SharedMutex& pause_mutex,
+                       rgw::dmclock::Scheduler *scheduler,
+                       boost::system::error_code& ec,
+                       boost::asio::yield_context yield)
 {
-  std::unique_lock<std::mutex> lock(mutex);
-  ++waiters;
-  cond_paused.notify_one(); // notify pause() that we're waiting
-  cond_ready.wait(lock, [this] { return ready; }); // wait for unpause()
-  --waiters;
-}
-
-using tcp = boost::asio::ip::tcp;
+  // limit header to 4k, since we read it all into a single flat_buffer
+  static constexpr size_t header_limit = 4096;
+  // don't impose a limit on the body, since we read it in pieces
+  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();
 
-// coroutine to handle a client connection to completion
-static void handle_connection(RGWProcessEnv& env, tcp::socket socket,
-                              boost::asio::yield_context yield)
-{
   auto cct = env.store->ctx();
-  boost::system::error_code ec;
 
-  beast::flat_streambuf buffer{1024};
-
-  // read messages from the socket until eof
+  // read messages from the stream until eof
   for (;;) {
-    // parse the header
+    // configure the parser
     rgw::asio::parser_type parser;
-    do {
-      auto bytes = beast::http::async_read_some(socket, buffer, parser, yield[ec]);
-      buffer.consume(bytes);
-    } while (!ec && !parser.got_header());
+    parser.header_limit(header_limit);
+    parser.body_limit(body_limit);
 
+    // parse the header
+    http::async_read_header(stream, buffer, parser, yield[ec]);
     if (ec == boost::asio::error::connection_reset ||
-        ec == boost::asio::error::eof) {
+        ec == boost::asio::error::bad_descriptor ||
+        ec == boost::asio::error::operation_aborted ||
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+        ec == ssl::error::stream_truncated ||
+#endif
+        ec == http::error::end_of_stream) {
+      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
       return;
     }
     if (ec) {
+      ldout(cct, 1) << "failed to read header: " << ec.message() << dendl;
       auto& message = parser.get();
-      ldout(cct, 1) << "read failed: " << ec.message() << dendl;
+      http::response<http::empty_body> response;
+      response.result(http::status::bad_request);
+      response.version(message.version() == 10 ? 10 : 11);
+      response.prepare_payload();
+      http::async_write(stream, response, yield[ec]);
+      if (ec) {
+        ldout(cct, 5) << "failed to write response: " << ec.message() << dendl;
+      }
       ldout(cct, 1) << "====== req done http_status=400 ======" << dendl;
-      beast::http::response<beast::http::string_body> response;
-      response.status = 400;
-      response.reason = "Bad Request";
-      response.version = message.version == 10 ? 10 : 11;
-      beast::http::prepare(response);
-      beast::http::async_write(socket, std::move(response), yield[ec]);
-      // ignore ec
       return;
     }
 
-    // process the request
-    RGWRequest req{env.store->get_new_req_id()};
+    {
+      auto lock = pause_mutex.async_lock_shared(yield[ec]);
+      if (ec == boost::asio::error::operation_aborted) {
+        return;
+      } else if (ec) {
+        ldout(cct, 1) << "failed to lock: " << ec.message() << dendl;
+        return;
+      }
 
-    rgw::asio::ClientIO real_client{socket, parser, buffer};
+      // process the request
+      RGWRequest req{env.store->get_new_req_id()};
+
+      auto& socket = stream.lowest_layer();
+      const auto& remote_endpoint = socket.remote_endpoint(ec);
+      if (ec) {
+        ldout(cct, 1) << "failed to connect client: " << ec.message() << dendl;
+        return;
+      }
 
-    auto real_client_io = rgw::io::add_reordering(
-                            rgw::io::add_buffering(cct,
-                              rgw::io::add_chunking(
-                                rgw::io::add_conlen_controlling(
-                                  &real_client))));
-    RGWRestfulIO client(cct, &real_client_io);
-    process_request(env.store, env.rest, &req, env.uri_prefix,
-                    *env.auth_registry, &client, env.olog);
+      StreamIO real_client{cct, stream, parser, buffer, is_ssl,
+                           socket.local_endpoint(),
+                           remote_endpoint};
+
+      auto real_client_io = rgw::io::add_reordering(
+                              rgw::io::add_buffering(cct,
+                                rgw::io::add_chunking(
+                                  rgw::io::add_conlen_controlling(
+                                    &real_client))));
+      RGWRestfulIO client(cct, &real_client_io);
+      auto y = optional_yield{socket.get_io_context(), yield};
+      process_request(env.store, env.rest, &req, env.uri_prefix,
+                      *env.auth_registry, &client, env.olog, y, scheduler);
+    }
 
-    if (real_client.get_conn_close()) {
+    if (!parser.keep_alive()) {
       return;
     }
+
+    // if we failed before reading the entire message, discard any remaining
+    // bytes before reading the next
+    while (!parser.is_done()) {
+      static std::array<char, 1024> discard_buffer;
+
+      auto& body = parser.get().body();
+      body.size = discard_buffer.size();
+      body.data = discard_buffer.data();
+
+      http::async_read_some(stream, buffer, parser, yield[ec]);
+      if (ec == boost::asio::error::connection_reset) {
+        return;
+      }
+      if (ec) {
+        ldout(cct, 5) << "failed to discard unread message: "
+            << ec.message() << dendl;
+        return;
+      }
+    }
   }
 }
 
+struct Connection : boost::intrusive::list_base_hook<> {
+  tcp::socket& socket;
+  Connection(tcp::socket& socket) : socket(socket) {}
+};
+
+class ConnectionList {
+  using List = boost::intrusive::list<Connection>;
+  List connections;
+  std::mutex mutex;
+
+  void remove(Connection& c) {
+    std::lock_guard lock{mutex};
+    if (c.is_linked()) {
+      connections.erase(List::s_iterator_to(c));
+    }
+  }
+ public:
+  class Guard {
+    ConnectionList *list;
+    Connection *conn;
+   public:
+    Guard(ConnectionList *list, Connection *conn) : list(list), conn(conn) {}
+    ~Guard() { list->remove(*conn); }
+  };
+  [[nodiscard]] Guard add(Connection& conn) {
+    std::lock_guard lock{mutex};
+    connections.push_back(conn);
+    return Guard{this, &conn};
+  }
+  void close(boost::system::error_code& ec) {
+    std::lock_guard lock{mutex};
+    for (auto& conn : connections) {
+      conn.socket.close(ec);
+    }
+    connections.clear();
+  }
+};
+
+namespace dmc = rgw::dmclock;
 class AsioFrontend {
   RGWProcessEnv env;
-  boost::asio::io_service service;
-
-  tcp::acceptor acceptor;
-  tcp::socket peer_socket;
+  RGWFrontendConfig* conf;
+  boost::asio::io_context context;
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+  boost::optional<ssl::context> ssl_context;
+  int init_ssl();
+#endif
+  SharedMutex pause_mutex;
+  std::unique_ptr<rgw::dmclock::Scheduler> scheduler;
+
+  struct Listener {
+    tcp::endpoint endpoint;
+    tcp::acceptor acceptor;
+    tcp::socket socket;
+    bool use_ssl = false;
+    bool use_nodelay = false;
+
+    explicit Listener(boost::asio::io_context& context)
+      : acceptor(context), socket(context) {}
+  };
+  std::vector<Listener> listeners;
+
+  ConnectionList connections;
+
+  // work guard to keep run() threads busy while listeners are paused
+  using Executor = boost::asio::io_context::executor_type;
+  std::optional<boost::asio::executor_work_guard<Executor>> work;
 
   std::vector<std::thread> threads;
-  Pauser pauser;
   std::atomic<bool> going_down{false};
 
   CephContext* ctx() const { return env.store->ctx(); }
-
-  void accept(boost::system::error_code ec);
+  std::optional<dmc::ClientCounters> client_counters;
+  std::unique_ptr<dmc::ClientConfig> client_config;
+  void accept(Listener& listener, boost::system::error_code ec);
 
  public:
-  AsioFrontend(const RGWProcessEnv& env)
-    : env(env), acceptor(service), peer_socket(service) {}
+  AsioFrontend(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+              dmc::SchedulerCtx& sched_ctx)
+    : env(env), conf(conf), pause_mutex(context.get_executor())
+  {
+    auto sched_t = dmc::get_scheduler_t(ctx());
+    switch(sched_t){
+    case dmc::scheduler_t::dmclock:
+      scheduler.reset(new dmc::AsyncScheduler(ctx(),
+                                              context,
+                                              std::ref(sched_ctx.get_dmc_client_counters()),
+                                              sched_ctx.get_dmc_client_config(),
+                                              *sched_ctx.get_dmc_client_config(),
+                                              dmc::AtLimit::Reject));
+      break;
+    case dmc::scheduler_t::none:
+      lderr(ctx()) << "Got invalid scheduler type for beast, defaulting to throttler" << dendl;
+      [[fallthrough]];
+    case dmc::scheduler_t::throttler:
+      scheduler.reset(new dmc::SimpleThrottler(ctx()));
+
+    }
+  }
 
   int init();
   int run();
@@ -152,51 +297,330 @@ class AsioFrontend {
   void unpause(RGWRados* store, rgw_auth_registry_ptr_t);
 };
 
+unsigned short parse_port(const char *input, boost::system::error_code& ec)
+{
+  char *end = nullptr;
+  auto port = std::strtoul(input, &end, 10);
+  if (port > std::numeric_limits<unsigned short>::max()) {
+    ec.assign(ERANGE, boost::system::system_category());
+  } else if (port == 0 && end == input) {
+    ec.assign(EINVAL, boost::system::system_category());
+  }
+  return port;
+}
+       
+tcp::endpoint parse_endpoint(boost::asio::string_view input,
+                             unsigned short default_port,
+                             boost::system::error_code& ec)
+{
+  tcp::endpoint endpoint;
+
+  if (input.empty()) {
+    ec = boost::asio::error::invalid_argument;
+    return endpoint;
+  }
+
+  if (input[0] == '[') { // ipv6
+    const size_t addr_begin = 1;
+    const size_t addr_end = input.find(']');
+    if (addr_end == input.npos) { // no matching ]
+      ec = boost::asio::error::invalid_argument;
+      return endpoint;
+    }
+    if (addr_end + 1 < input.size()) {
+      // :port must must follow [ipv6]
+      if (input[addr_end + 1] != ':') {
+        ec = boost::asio::error::invalid_argument;
+        return endpoint;
+      } else {
+        auto port_str = input.substr(addr_end + 2);
+        endpoint.port(parse_port(port_str.data(), ec));
+      }
+    } else {
+      endpoint.port(default_port);
+    }
+    auto addr = input.substr(addr_begin, addr_end - addr_begin);
+    endpoint.address(boost::asio::ip::make_address_v6(addr, ec));
+  } else { // ipv4
+    auto colon = input.find(':');
+    if (colon != input.npos) {
+      auto port_str = input.substr(colon + 1);
+      endpoint.port(parse_port(port_str.data(), ec));
+      if (ec) {
+        return endpoint;
+      }
+    } else {
+      endpoint.port(default_port);
+    }
+    auto addr = input.substr(0, colon);
+    endpoint.address(boost::asio::ip::make_address_v4(addr, ec));
+  }
+  return endpoint;
+}
+
+static int drop_privileges(CephContext *ctx)
+{
+  uid_t uid = ctx->get_set_uid();
+  gid_t gid = ctx->get_set_gid();
+  std::string uid_string = ctx->get_set_uid_string();
+  std::string gid_string = ctx->get_set_gid_string();
+  if (gid && setgid(gid) != 0) {
+    int err = errno;
+    ldout(ctx, -1) << "unable to setgid " << gid << ": " << cpp_strerror(err) << dendl;
+    return -err;
+  }
+  if (uid && setuid(uid) != 0) {
+    int err = errno;
+    ldout(ctx, -1) << "unable to setuid " << uid << ": " << cpp_strerror(err) << dendl;
+    return -err;
+  }
+  if (uid && gid) {
+    ldout(ctx, 0) << "set uid:gid to " << uid << ":" << gid
+                  << " (" << uid_string << ":" << gid_string << ")" << dendl;
+  }
+  return 0;
+}
+
 int AsioFrontend::init()
 {
-  auto ep = tcp::endpoint{tcp::v4(), static_cast<unsigned short>(env.port)};
-  ldout(ctx(), 4) << "frontend listening on " << ep << dendl;
+  boost::system::error_code ec;
+  auto& config = conf->get_config_map();
+
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+  int r = init_ssl();
+  if (r < 0) {
+    return r;
+  }
+#endif
+
+  // parse endpoints
+  auto ports = config.equal_range("port");
+  for (auto i = ports.first; i != ports.second; ++i) {
+    auto port = parse_port(i->second.c_str(), ec);
+    if (ec) {
+      lderr(ctx()) << "failed to parse port=" << i->second << dendl;
+      return -ec.value();
+    }
+    listeners.emplace_back(context);
+    listeners.back().endpoint.port(port);
+
+    listeners.emplace_back(context);
+    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
+  }
 
+  auto endpoints = config.equal_range("endpoint");
+  for (auto i = endpoints.first; i != endpoints.second; ++i) {
+    auto endpoint = parse_endpoint(i->second, 80, ec);
+    if (ec) {
+      lderr(ctx()) << "failed to parse endpoint=" << i->second << dendl;
+      return -ec.value();
+    }
+    listeners.emplace_back(context);
+    listeners.back().endpoint = endpoint;
+  }
+  // parse tcp nodelay
+  auto nodelay = config.find("tcp_nodelay");
+  if (nodelay != config.end()) {
+    for (auto& l : listeners) {
+      l.use_nodelay = (nodelay->second == "1");
+    }
+  }
+  
+
+  bool socket_bound = false;
+  // start listeners
+  for (auto& l : listeners) {
+    l.acceptor.open(l.endpoint.protocol(), ec);
+    if (ec) {
+      if (ec == boost::asio::error::address_family_not_supported) {
+       ldout(ctx(), 0) << "WARNING: cannot open socket for endpoint=" << l.endpoint
+                       << ", " << ec.message() << dendl;
+       continue;
+      }
+
+      lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
+      return -ec.value();
+    }
+
+    if (l.endpoint.protocol() == tcp::v6()) {
+      l.acceptor.set_option(boost::asio::ip::v6_only(true), ec);
+      if (ec) {
+        lderr(ctx()) << "failed to set v6_only socket option: "
+                    << ec.message() << dendl;
+       return -ec.value();
+      }
+    }
+
+    l.acceptor.set_option(tcp::acceptor::reuse_address(true));
+    l.acceptor.bind(l.endpoint, ec);
+    if (ec) {
+      lderr(ctx()) << "failed to bind address " << l.endpoint
+          << ": " << ec.message() << dendl;
+      return -ec.value();
+    }
+
+    l.acceptor.listen(boost::asio::socket_base::max_connections);
+    l.acceptor.async_accept(l.socket,
+                            [this, &l] (boost::system::error_code ec) {
+                              accept(l, ec);
+                            });
+
+    ldout(ctx(), 4) << "frontend listening on " << l.endpoint << dendl;
+    socket_bound = true;
+  }
+  if (!socket_bound) {
+    lderr(ctx()) << "Unable to listen at any endpoints" << dendl;
+    return -EINVAL;
+  }
+
+  return drop_privileges(ctx());
+}
+
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+int AsioFrontend::init_ssl()
+{
   boost::system::error_code ec;
-  acceptor.open(ep.protocol(), ec);
-  if (ec) {
-    lderr(ctx()) << "failed to open socket: " << ec.message() << dendl;
-    return -ec.value();
+  auto& config = conf->get_config_map();
+
+  // ssl configuration
+  auto cert = config.find("ssl_certificate");
+  const bool have_cert = cert != config.end();
+  if (have_cert) {
+    // only initialize the ssl context if it's going to be used
+    ssl_context = boost::in_place(ssl::context::tls);
+  }
+
+  auto key = config.find("ssl_private_key");
+  const bool have_private_key = key != config.end();
+  if (have_private_key) {
+    if (!have_cert) {
+      lderr(ctx()) << "no ssl_certificate configured for ssl_private_key" << dendl;
+      return -EINVAL;
+    }
+    ssl_context->use_private_key_file(key->second, ssl::context::pem, ec);
+    if (ec) {
+      lderr(ctx()) << "failed to add ssl_private_key=" << key->second
+          << ": " << ec.message() << dendl;
+      return -ec.value();
+    }
+  }
+  if (have_cert) {
+    ssl_context->use_certificate_chain_file(cert->second, ec);
+    if (ec) {
+      lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
+          << ": " << ec.message() << dendl;
+      return -ec.value();
+    }
+    if (!have_private_key) {
+      // attempt to use it as a private key if a separate one wasn't provided
+      ssl_context->use_private_key_file(cert->second, ssl::context::pem, ec);
+      if (ec) {
+        lderr(ctx()) << "failed to use ssl_certificate=" << cert->second
+            << " as a private key: " << ec.message() << dendl;
+        return -ec.value();
+      }
+    }
+  }
+
+  // parse ssl endpoints
+  auto ports = config.equal_range("ssl_port");
+  for (auto i = ports.first; i != ports.second; ++i) {
+    if (!have_cert) {
+      lderr(ctx()) << "no ssl_certificate configured for ssl_port" << dendl;
+      return -EINVAL;
+    }
+    auto port = parse_port(i->second.c_str(), ec);
+    if (ec) {
+      lderr(ctx()) << "failed to parse ssl_port=" << i->second << dendl;
+      return -ec.value();
+    }
+    listeners.emplace_back(context);
+    listeners.back().endpoint.port(port);
+    listeners.back().use_ssl = true;
+
+    listeners.emplace_back(context);
+    listeners.back().endpoint = tcp::endpoint(tcp::v6(), port);
+    listeners.back().use_ssl = true;
+  }
+
+  auto endpoints = config.equal_range("ssl_endpoint");
+  for (auto i = endpoints.first; i != endpoints.second; ++i) {
+    if (!have_cert) {
+      lderr(ctx()) << "no ssl_certificate configured for ssl_endpoint" << dendl;
+      return -EINVAL;
+    }
+    auto endpoint = parse_endpoint(i->second, 443, ec);
+    if (ec) {
+      lderr(ctx()) << "failed to parse ssl_endpoint=" << i->second << dendl;
+      return -ec.value();
+    }
+    listeners.emplace_back(context);
+    listeners.back().endpoint = endpoint;
+    listeners.back().use_ssl = true;
   }
-  acceptor.set_option(tcp::acceptor::reuse_address(true));
-  acceptor.bind(ep, ec);
-  if (ec) {
-    lderr(ctx()) << "failed to bind address " << ep <<
-        ": " << ec.message() << dendl;
-    return -ec.value();
-  }
-  acceptor.listen(boost::asio::socket_base::max_connections);
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
   return 0;
 }
+#endif // WITH_RADOSGW_BEAST_OPENSSL
 
-void AsioFrontend::accept(boost::system::error_code ec)
+void AsioFrontend::accept(Listener& l, boost::system::error_code ec)
 {
-  if (!acceptor.is_open()) {
+  if (!l.acceptor.is_open()) {
     return;
   } else if (ec == boost::asio::error::operation_aborted) {
     return;
   } else if (ec) {
     throw ec;
   }
-  auto socket = std::move(peer_socket);
+  auto socket = std::move(l.socket);
+  tcp::no_delay options(l.use_nodelay);
+  socket.set_option(options,ec);
+  l.acceptor.async_accept(l.socket,
+                          [this, &l] (boost::system::error_code ec) {
+                            accept(l, ec);
+                          });
+
   // spawn a coroutine to handle the connection
-  boost::asio::spawn(service,
-                     [&] (boost::asio::yield_context yield) {
-                       handle_connection(env, std::move(socket), yield);
-                     });
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
+#ifdef WITH_RADOSGW_BEAST_OPENSSL
+  if (l.use_ssl) {
+    boost::asio::spawn(context,
+      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
+        // wrap the socket in an ssl stream
+        ssl::stream<tcp::socket&> stream{s, *ssl_context};
+        boost::beast::flat_buffer buffer;
+        // do ssl handshake
+        boost::system::error_code ec;
+        auto bytes = stream.async_handshake(ssl::stream_base::server,
+                                            buffer.data(), yield[ec]);
+        if (ec) {
+          ldout(ctx(), 1) << "ssl handshake failed: " << ec.message() << dendl;
+          return;
+        }
+        buffer.consume(bytes);
+        handle_connection(env, stream, buffer, true, pause_mutex,
+                          scheduler.get(), ec, yield);
+        if (!ec) {
+          // ssl shutdown (ignoring errors)
+          stream.async_shutdown(yield[ec]);
+        }
+        s.shutdown(tcp::socket::shutdown_both, ec);
+      });
+  } else {
+#else
+  {
+#endif // WITH_RADOSGW_BEAST_OPENSSL
+    boost::asio::spawn(context,
+      [this, s=std::move(socket)] (boost::asio::yield_context yield) mutable {
+        Connection conn{s};
+        auto c = connections.add(conn);
+        boost::beast::flat_buffer buffer;
+        boost::system::error_code ec;
+        handle_connection(env, s, buffer, false, pause_mutex,
+                          scheduler.get(), ec, yield);
+        s.shutdown(tcp::socket::shutdown_both, ec);
+      });
+  }
 }
 
 int AsioFrontend::run()
@@ -207,15 +631,16 @@ int AsioFrontend::run()
 
   ldout(cct, 4) << "frontend spawning " << thread_count << " threads" << dendl;
 
+  // the worker threads call io_context::run(), which will return when there's
+  // no work left. hold a work guard to keep these threads going until join()
+  work.emplace(boost::asio::make_work_guard(context));
+
   for (int i = 0; i < thread_count; i++) {
     threads.emplace_back([=] {
-      for (;;) {
-        service.run();
-        if (going_down) {
-          break;
-        }
-        pauser.wait();
-      }
+      // request warnings on synchronous librados calls in this thread
+      is_asio_thread = true;
+      boost::system::error_code ec;
+      context.run(ec);
     });
   }
   return 0;
@@ -228,7 +653,13 @@ void AsioFrontend::stop()
   going_down = true;
 
   boost::system::error_code ec;
-  acceptor.close(ec); // unblock the run() threads
+  // close all listeners
+  for (auto& listener : listeners) {
+    listener.acceptor.close(ec);
+  }
+  // close all connections
+  connections.close(ec);
+  pause_mutex.cancel();
 }
 
 void AsioFrontend::join()
@@ -236,6 +667,8 @@ void AsioFrontend::join()
   if (!going_down) {
     stop();
   }
+  work.reset();
+
   ldout(ctx(), 4) << "frontend joining threads..." << dendl;
   for (auto& thread : threads) {
     thread.join();
@@ -245,13 +678,22 @@ void AsioFrontend::join()
 
 void AsioFrontend::pause()
 {
-  ldout(ctx(), 4) << "frontend pausing threads..." << dendl;
-  pauser.pause(threads.size(), [=] {
-    // stop accepting but leave the port open
-    boost::system::error_code ec;
-    acceptor.cancel(ec);
-  });
-  ldout(ctx(), 4) << "frontend paused" << dendl;
+  ldout(ctx(), 4) << "frontend pausing connections..." << dendl;
+
+  // cancel pending calls to accept(), but don't close the sockets
+  boost::system::error_code ec;
+  for (auto& l : listeners) {
+    l.acceptor.cancel(ec);
+  }
+
+  // pause and wait for outstanding requests to complete
+  pause_mutex.lock(ec);
+
+  if (ec) {
+    ldout(ctx(), 1) << "frontend failed to pause: " << ec.message() << dendl;
+  } else {
+    ldout(ctx(), 4) << "frontend paused" << dendl;
+  }
 }
 
 void AsioFrontend::unpause(RGWRados* const store,
@@ -259,24 +701,34 @@ void AsioFrontend::unpause(RGWRados* const store,
 {
   env.store = store;
   env.auth_registry = std::move(auth_registry);
+
+  // unpause to unblock connections
+  pause_mutex.unlock();
+
+  // start accepting connections again
+  for (auto& l : listeners) {
+    l.acceptor.async_accept(l.socket,
+                            [this, &l] (boost::system::error_code ec) {
+                              accept(l, ec);
+                            });
+  }
+
   ldout(ctx(), 4) << "frontend unpaused" << dendl;
-  service.reset();
-  acceptor.async_accept(peer_socket,
-                        [this] (boost::system::error_code ec) {
-                          return accept(ec);
-                        });
-  pauser.unpause();
 }
 
 } // anonymous namespace
 
 class RGWAsioFrontend::Impl : public AsioFrontend {
  public:
-  Impl(const RGWProcessEnv& env) : AsioFrontend(env) {}
+  Impl(const RGWProcessEnv& env, RGWFrontendConfig* conf,
+       rgw::dmclock::SchedulerCtx& sched_ctx)
+    : AsioFrontend(env, conf, sched_ctx) {}
 };
 
-RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env)
-  : impl(new Impl(env))
+RGWAsioFrontend::RGWAsioFrontend(const RGWProcessEnv& env,
+                                 RGWFrontendConfig* conf,
+                                rgw::dmclock::SchedulerCtx& sched_ctx)
+  : impl(new Impl(env, conf, sched_ctx))
 {
 }