]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_http_client.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
index 72c8a14f1db9d06c92884aa63b8b7038b8151168..bb14e03fd3336a5010d60e10eba0469193a5b930 100644 (file)
@@ -2,6 +2,7 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "include/compat.h"
+#include "common/errno.h"
 
 #include <boost/utility/string_ref.hpp>
 
@@ -12,6 +13,7 @@
 #include "rgw_common.h"
 #include "rgw_http_client.h"
 #include "rgw_http_errors.h"
+#include "common/async/completion.h"
 #include "common/RefCountedObj.h"
 
 #include "rgw_coroutine.h"
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+RGWHTTPManager *rgw_http_manager;
+
+struct RGWCurlHandle;
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
+
 struct rgw_http_req_data : public RefCountedObject {
-  CURL *easy_handle;
-  curl_slist *h;
+  RGWCurlHandle *curl_handle{nullptr};
+  curl_slist *h{nullptr};
   uint64_t id;
-  int ret;
+  int ret{0};
   std::atomic<bool> done = { false };
-  RGWHTTPClient *client;
-  void *user_info;
-  bool registered;
-  RGWHTTPManager *mgr;
+  RGWHTTPClient *client{nullptr};
+  rgw_io_id control_io_id;
+  void *user_info{nullptr};
+  bool registered{false};
+  RGWHTTPManager *mgr{nullptr};
   char error_buf[CURL_ERROR_SIZE];
+  bool write_paused{false};
+  bool read_paused{false};
 
   Mutex lock;
   Cond cond;
 
-  rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
-                        client(nullptr), user_info(nullptr), registered(false),
-                        mgr(NULL), lock("rgw_http_req_data::lock") {
+  using Signature = void(boost::system::error_code);
+  using Completion = ceph::async::Completion<Signature>;
+  std::unique_ptr<Completion> completion;
+
+  rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
+    // FIPS zeroization audit 20191115: this memset is not security related.
     memset(error_buf, 0, sizeof(error_buf));
   }
 
-  int wait() {
+  template <typename ExecutionContext, typename CompletionToken>
+  auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+    boost::asio::async_completion<CompletionToken, Signature> init(token);
+    auto& handler = init.completion_handler;
+    {
+      std::unique_lock l{lock};
+      completion = Completion::create(ctx.get_executor(), std::move(handler));
+    }
+    return init.result.get();
+  }
+  int wait(optional_yield y) {
+    if (done) {
+      return ret;
+    }
+#ifdef HAVE_BOOST_CONTEXT
+    if (y) {
+      auto& context = y.get_io_context();
+      auto& yield = y.get_yield_context();
+      boost::system::error_code ec;
+      async_wait(context, yield[ec]);
+      return -ec.value();
+    }
+#endif
     Mutex::Locker l(lock);
     cond.Wait(lock);
     return ret;
   }
 
+  void set_state(int bitmask);
 
   void finish(int r) {
     Mutex::Locker l(lock);
     ret = r;
-    if (easy_handle)
-      curl_easy_cleanup(easy_handle);
+    if (curl_handle)
+      do_curl_easy_cleanup(curl_handle);
 
     if (h)
       curl_slist_free_all(h);
 
-    easy_handle = NULL;
+    curl_handle = NULL;
     h = NULL;
     done = true;
-    cond.Signal();
+    if (completion) {
+      boost::system::error_code ec(-ret, boost::system::system_category());
+      Completion::post(std::move(completion), ec);
+    } else {
+      cond.Signal();
+    }
   }
 
   bool is_done() {
@@ -77,6 +119,8 @@ struct rgw_http_req_data : public RefCountedObject {
     Mutex::Locker l(lock);
     return mgr;
   }
+
+  CURL *get_easy_handle() const;
 };
 
 struct RGWCurlHandle {
@@ -84,12 +128,22 @@ struct RGWCurlHandle {
   mono_time lastuse;
   CURL* h;
 
-  RGWCurlHandle(CURL* h) : uses(0), h(h) {};
+  explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
   CURL* operator*() {
     return this->h;
   }
 };
 
+void rgw_http_req_data::set_state(int bitmask) {
+  /* no need to lock here, moreover curl_easy_pause() might trigger
+   * the data receive callback :/
+   */
+  CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
+  if (rc != CURLE_OK) {
+    dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+  }
+}
+
 #define MAXIDLE 5
 class RGWCurlHandles : public Thread {
 public:
@@ -192,7 +246,23 @@ void RGWCurlHandles::flush_curl_handles()
   saved_curl.shrink_to_fit();
 }
 
+CURL *rgw_http_req_data::get_easy_handle() const
+{
+  return **curl_handle;
+}
+
 static RGWCurlHandles *handles;
+
+static RGWCurlHandle *do_curl_easy_init()
+{
+  return handles->get_curl_handle();
+}
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
+{
+  handles->release_curl_handle(curl_handle);
+}
+
 // XXX make this part of the token cache?  (but that's swift-only;
 //     and this especially needs to integrates with s3...)
 
@@ -208,55 +278,11 @@ void rgw_release_all_curl_handles()
   delete handles;
 }
 
-/*
- * the simple set of callbacks will be called on RGWHTTPClient::process()
- */
-/* Static methods - callbacks for libcurl. */
-size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
-                                                 const size_t size,
-                                                 const size_t nmemb,
-                                                 void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_header(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_header() returned ret="
-            << ret << dendl;
-  }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
-                                               const size_t size,
-                                               const size_t nmemb,
-                                               void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_data(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_data() returned ret="
-            << ret << dendl;
-  }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
-                                            const size_t size,
-                                            const size_t nmemb,
-                                            void * const _info)
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
 {
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  int ret = client->send_data(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->send_data() returned ret="
-            << ret << dendl;
+  if (id == 0) {
+    id = io_id_provider.get_next();
   }
-
-  return ret;
 }
 
 /*
@@ -293,17 +319,41 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
   size_t len = size * nmemb;
 
-  Mutex::Locker l(req_data->lock);
-  
-  if (!req_data->registered) {
+  bool pause = false;
+
+  RGWHTTPClient *client;
+
+  {
+    Mutex::Locker l(req_data->lock);
+    if (!req_data->registered) {
+      return len;
+    }
+
+    client = req_data->client;
+  }
+
+  size_t& skip_bytes = client->receive_pause_skip;
+
+  if (skip_bytes >= len) {
+    skip_bytes -= len;
     return len;
   }
-  
-  int ret = req_data->client->receive_data(ptr, size * nmemb);
+
+  int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  if (pause) {
+    dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
+    skip_bytes = len;
+    Mutex::Locker l(req_data->lock);
+    req_data->read_paused = true;
+    return CURL_WRITEFUNC_PAUSE;
+  }
+
+  skip_bytes = 0;
+
   return len;
 }
 
@@ -314,20 +364,70 @@ size_t RGWHTTPClient::send_http_data(void * const ptr,
 {
   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
 
-  Mutex::Locker l(req_data->lock);
+  RGWHTTPClient *client;
+
+  {
+    Mutex::Locker l(req_data->lock);
   
-  if (!req_data->registered) {
-    return 0;
+    if (!req_data->registered) {
+      return 0;
+    }
+
+    client = req_data->client;
   }
 
-  int ret = req_data->client->send_data(ptr, size * nmemb);
+  bool pause = false;
+
+  int ret = client->send_data(ptr, size * nmemb, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  if (ret == 0 &&
+      pause) {
+    Mutex::Locker l(req_data->lock);
+    req_data->write_paused = true;
+    return CURL_READFUNC_PAUSE;
+  }
+
   return ret;
 }
 
+Mutex& RGWHTTPClient::get_req_lock()
+{
+  return req_data->lock;
+}
+
+void RGWHTTPClient::_set_write_paused(bool pause)
+{
+  ceph_assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->write_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_WRITE_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_WRITE_RESUME);
+  }
+}
+
+void RGWHTTPClient::_set_read_paused(bool pause)
+{
+  ceph_assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->read_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_READ_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_READ_RESUME);
+  }
+}
+
 static curl_slist *headers_to_slist(param_vec_t& headers)
 {
   curl_slist *h = NULL;
@@ -349,86 +449,39 @@ static curl_slist *headers_to_slist(param_vec_t& headers)
         val[i] = '-';
       }
     }
+    
+    val = camelcase_dash_http_attr(val);
 
-    val.append(": ");
-    val.append(p.second);
+    // curl won't send headers with empty values unless it ends with a ; instead
+    if (p.second.empty()) {
+      val.append(1, ';');
+    } else {
+      val.append(": ");
+      val.append(p.second);
+    }
     h = curl_slist_append(h, val.c_str());
   }
 
   return h;
 }
 
-static bool is_upload_request(const char *method)
+static bool is_upload_request(const string& method)
 {
-  if (method == nullptr) {
-    return false;
-  }
-  return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
+  return method == "POST" || method == "PUT";
 }
 
 /*
- * process a single simple one off request, not going through RGWHTTPManager. Not using
- * req_data.
+ * process a single simple one off request
  */
-int RGWHTTPClient::process(const char *method, const char *url)
+int RGWHTTPClient::process(optional_yield y)
 {
-  int ret = 0;
-  CURL *curl_handle;
-
-  char error_buf[CURL_ERROR_SIZE];
-
-  last_method = (method ? method : "");
-  last_url = (url ? url : "");
-
-  auto ca = handles->get_curl_handle();
-  curl_handle = **ca;
-
-  dout(20) << "sending request to " << url << dendl;
-
-  curl_slist *h = headers_to_slist(headers);
-
-  curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
-  curl_easy_setopt(curl_handle, CURLOPT_URL, url);
-  curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
-  if (h) {
-    curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
-  }
-  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
-  if (is_upload_request(method)) {
-    curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
-  }
-  if (has_send_len) {
-    curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); 
-  }
-  if (!verify_ssl) {
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
-    dout(20) << "ssl verification is set to off" << dendl;
-  }
-
-  CURLcode status = curl_easy_perform(curl_handle);
-  if (status) {
-    dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
-    ret = -EINVAL;
-  }
-  curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
-  handles->release_curl_handle(ca);
-  curl_slist_free_all(h);
-
-  return ret;
+  return RGWHTTP::process(this, y);
 }
 
 string RGWHTTPClient::to_str()
 {
-  string method_str = (last_method.empty() ? "<no-method>" : last_method);
-  string url_str = (last_url.empty() ? "<no-url>" : last_url);
+  string method_str = (method.empty() ? "<no-method>" : method);
+  string url_str = (url.empty() ? "<no-url>" : url);
   return method_str + " " + url_str;
 }
 
@@ -444,17 +497,15 @@ int RGWHTTPClient::get_req_retcode()
 /*
  * init request, will be used later with RGWHTTPManager
  */
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
+int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
 {
-  assert(!req_data);
+  ceph_assert(!req_data);
   _req_data->get();
   req_data = _req_data;
 
-  CURL *easy_handle;
+  req_data->curl_handle = do_curl_easy_init();
 
-  easy_handle = curl_easy_init();
-
-  req_data->easy_handle = easy_handle;
+  CURL *easy_handle = req_data->get_easy_handle();
 
   dout(20) << "sending request to " << url << dendl;
 
@@ -462,11 +513,8 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
 
   req_data->h = h;
 
-  last_method = (method ? method : "");
-  last_url = (url ? url : "");
-
-  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
-  curl_easy_setopt(easy_handle, CURLOPT_URL, url);
+  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
+  curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
   curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
@@ -474,6 +522,8 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
   curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
   curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
+  curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
+  curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
   if (h) {
     curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
   }
@@ -495,26 +545,33 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   return 0;
 }
 
+bool RGWHTTPClient::is_done()
+{
+  return req_data->is_done();
+}
+
 /*
  * wait for async request to complete
  */
-int RGWHTTPClient::wait()
+int RGWHTTPClient::wait(optional_yield y)
 {
-  if (!req_data->is_done()) {
-    return req_data->wait();
-  }
-
-  return req_data->ret;
+  return req_data->wait(y);
 }
 
-RGWHTTPClient::~RGWHTTPClient()
+void RGWHTTPClient::cancel()
 {
   if (req_data) {
-    RGWHTTPManager *http_manager = req_data->get_manager();
+    RGWHTTPManager *http_manager = req_data->mgr;
     if (http_manager) {
       http_manager->remove_request(this);
     }
+  }
+}
 
+RGWHTTPClient::~RGWHTTPClient()
+{
+  cancel();
+  if (req_data) {
     req_data->put();
   }
 }
@@ -522,7 +579,7 @@ RGWHTTPClient::~RGWHTTPClient()
 
 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
 {
-  const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
+  const boost::string_ref header_line(static_cast<const char *>(ptr), len);
 
   /* We're tokening the line that way due to backward compatibility. */
   const size_t sep_loc = header_line.find_first_of(" \t:");
@@ -556,7 +613,7 @@ int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
   return 0;
 }
 
-int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
+int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
 {
   int length_to_copy = 0;
   if (post_data_index < post_data.length()) {
@@ -735,7 +792,7 @@ void *RGWHTTPManager::ReqsThread::entry()
  * RGWHTTPManager has two modes of operation: threaded and non-threaded.
  */
 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
-                                                    completion_mgr(_cm), is_threaded(false),
+                                                    completion_mgr(_cm), is_started(false),
                                                     reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
                                                     reqs_thread(NULL)
 {
@@ -757,16 +814,20 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
   req_data->registered = true;
   reqs[num_reqs] = req_data;
   num_reqs++;
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
 }
 
-void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
+bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
 {
   RWLock::WLocker rl(reqs_lock);
+  if (!req_data->registered) {
+    return false;
+  }
   req_data->get();
   req_data->registered = false;
   unregistered_reqs.push_back(req_data);
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+  return true;
 }
 
 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
@@ -786,7 +847,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
     req_data->mgr = nullptr;
   }
   if (completion_mgr) {
-    completion_mgr->complete(NULL, req_data->user_info);
+    completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
   }
 
   req_data->put();
@@ -804,13 +865,17 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
   _complete_request(req_data);
 }
 
+void RGWHTTPManager::_set_req_state(set_state& ss)
+{
+  ss.req->set_state(ss.bitmask);
+}
 /*
  * hook request to the curl multi handle
  */
 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
 {
-  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
-  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   if (mstatus) {
     dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
     return -EIO;
@@ -824,8 +889,8 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
  */
 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
 {
-  if (req_data->easy_handle) {
-    curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+  if (req_data->curl_handle) {
+    curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   }
   if (!req_data->is_done()) {
     _finish_request(req_data, -ECANCELED);
@@ -841,7 +906,9 @@ void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
 void RGWHTTPManager::manage_pending_requests()
 {
   reqs_lock.get_read();
-  if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
+  if (max_threaded_req == num_reqs &&
+      unregistered_reqs.empty() &&
+      reqs_change_state.empty()) {
     reqs_lock.unlock();
     return;
   }
@@ -873,6 +940,13 @@ void RGWHTTPManager::manage_pending_requests()
     }
   }
 
+  if (!reqs_change_state.empty()) {
+    for (auto siter : reqs_change_state) {
+      _set_req_state(siter);
+    }
+    reqs_change_state.clear();
+  }
+
   for (auto piter : remove_reqs) {
     rgw_http_req_data *req_data = piter.first;
     int r = piter.second;
@@ -881,11 +955,11 @@ void RGWHTTPManager::manage_pending_requests()
   }
 }
 
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
+int RGWHTTPManager::add_request(RGWHTTPClient *client)
 {
   rgw_http_req_data *req_data = new rgw_http_req_data;
 
-  int ret = client->init_request(method, url, req_data, send_data_hint);
+  int ret = client->init_request(req_data);
   if (ret < 0) {
     req_data->put();
     req_data = NULL;
@@ -894,11 +968,12 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const
 
   req_data->mgr = this;
   req_data->client = client;
-  req_data->user_info = client->get_user_info();
+  req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
+  req_data->user_info = client->get_io_user_info();
 
   register_request(req_data);
 
-  if (!is_threaded) {
+  if (!is_started) {
     ret = link_request(req_data);
     if (ret < 0) {
       req_data->put();
@@ -918,11 +993,13 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client)
 {
   rgw_http_req_data *req_data = client->get_req_data();
 
-  if (!is_threaded) {
+  if (!is_started) {
     unlink_request(req_data);
     return 0;
   }
-  unregister_request(req_data);
+  if (!unregister_request(req_data)) {
+    return 0;
+  }
   int ret = signal_thread();
   if (ret < 0) {
     return ret;
@@ -931,94 +1008,79 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client)
   return 0;
 }
 
-/*
- * the synchronous, non-threaded request processing method.
- */
-int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
+int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
 {
-  assert(!is_threaded);
+  rgw_http_req_data *req_data = client->get_req_data();
 
-  int still_running;
-  int mstatus;
+  ceph_assert(req_data->lock.is_locked());
 
-  do {
-    if (wait_for_data) {
-      int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
-      if (ret < 0) {
-        return ret;
-      }
-    }
+  /* can only do that if threaded */
+  if (!is_started) {
+    return -EINVAL;
+  }
 
-    mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
-    switch (mstatus) {
-      case CURLM_OK:
-      case CURLM_CALL_MULTI_PERFORM:
-        break;
-      default:
-        dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
-        return -EINVAL;
-    }
-    int msgs_left;
-    CURLMsg *msg;
-    while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
-      if (msg->msg == CURLMSG_DONE) {
-       CURL *e = msg->easy_handle;
-       rgw_http_req_data *req_data;
-       curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
+  bool suggested_wr_paused = req_data->write_paused;
+  bool suggested_rd_paused = req_data->read_paused;
+
+  switch (state) {
+    case SET_WRITE_PAUSED:
+      suggested_wr_paused = true;
+      break;
+    case SET_WRITE_RESUME:
+      suggested_wr_paused = false;
+      break;
+    case SET_READ_PAUSED:
+      suggested_rd_paused = true;
+      break;
+    case SET_READ_RESUME:
+      suggested_rd_paused = false;
+      break;
+    default:
+      /* shouldn't really be here */
+      return -EIO;
+  }
+  if (suggested_wr_paused == req_data->write_paused &&
+      suggested_rd_paused == req_data->read_paused) {
+    return 0;
+  }
 
-       long http_status;
-       curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
+  req_data->write_paused = suggested_wr_paused;
+  req_data->read_paused = suggested_rd_paused;
 
-       int status = rgw_http_error_to_errno(http_status);
-       int result = msg->data.result;
-       finish_request(req_data, status);
-        switch (result) {
-          case CURLE_OK:
-            break;
-          default:
-            dout(20) << "ERROR: msg->data.result=" << result << dendl;
-            return -EIO;
-        }
-      }
-    }
-  } while (mstatus == CURLM_CALL_MULTI_PERFORM);
+  int bitmask = CURLPAUSE_CONT;
 
-  *done = (still_running == 0);
+  if (req_data->write_paused) {
+    bitmask |= CURLPAUSE_SEND;
+  }
 
-  return 0;
-}
+  if (req_data->read_paused) {
+    bitmask |= CURLPAUSE_RECV;
+  }
 
-/*
- * the synchronous, non-threaded request processing completion method.
- */
-int RGWHTTPManager::complete_requests()
-{
-  bool done = false;
-  int ret;
-  do {
-    ret = process_requests(true, &done);
-  } while (!done && !ret);
+  reqs_change_state.push_back(set_state(req_data, bitmask));
+  int ret = signal_thread();
+  if (ret < 0) {
+    return ret;
+  }
 
-  return ret;
+  return 0;
 }
 
-int RGWHTTPManager::set_threaded()
+int RGWHTTPManager::start()
 {
-  int r = pipe(thread_pipe);
-  if (r < 0) {
-    r = -errno;
-    ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
-    return r;
+  if (pipe_cloexec(thread_pipe) < 0) {
+    int e = errno;
+    ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
+    return -e;
   }
 
   // enable non-blocking reads
-  r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
-  if (r < 0) {
-    r = -errno;
-    ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
+  if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
+    int e = errno;
+    ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
     TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
     TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
-    return r;
+    return -e;
   }
 
 #ifdef HAVE_CURL_MULTI_WAIT
@@ -1029,7 +1091,7 @@ int RGWHTTPManager::set_threaded()
                  thread_pipe[1], thread_pipe[0]);
 #endif
 
-  is_threaded = true;
+  is_started = true;
   reqs_thread = new ReqsThread(this);
   reqs_thread->create("http_manager");
   return 0;
@@ -1043,7 +1105,7 @@ void RGWHTTPManager::stop()
 
   is_stopped = true;
 
-  if (is_threaded) {
+  if (is_started) {
     going_down = true;
     signal_thread();
     reqs_thread->join();
@@ -1104,7 +1166,8 @@ void *RGWHTTPManager::reqs_thread_entry()
        curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
 
        int status = rgw_http_error_to_errno(http_status);
-        if (result != CURLE_OK && http_status == 0) {
+        if (result != CURLE_OK && status == 0) {
+          dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
           status = -EAGAIN;
         }
         int id = req_data->id;
@@ -1112,8 +1175,12 @@ void *RGWHTTPManager::reqs_thread_entry()
         switch (result) {
           case CURLE_OK:
             break;
+          case CURLE_OPERATION_TIMEDOUT:
+            dout(0) << "WARNING: curl operation timed out, network average transfer speed less than " 
+              << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
           default:
             dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
+            dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
            break;
         }
       }
@@ -1123,14 +1190,14 @@ void *RGWHTTPManager::reqs_thread_entry()
 
   RWLock::WLocker rl(reqs_lock);
   for (auto r : unregistered_reqs) {
-    _finish_request(r, -ECANCELED);
+    _unlink_request(r);
   }
 
   unregistered_reqs.clear();
 
   auto all_reqs = std::move(reqs);
   for (auto iter : all_reqs) {
-    _finish_request(iter.second, -ECANCELED);
+    _unlink_request(iter.second);
   }
 
   reqs.clear();
@@ -1142,4 +1209,42 @@ void *RGWHTTPManager::reqs_thread_entry()
   return 0;
 }
 
+void rgw_http_client_init(CephContext *cct)
+{
+  curl_global_init(CURL_GLOBAL_ALL);
+  rgw_http_manager = new RGWHTTPManager(cct);
+  rgw_http_manager->start();
+}
+
+void rgw_http_client_cleanup()
+{
+  rgw_http_manager->stop();
+  delete rgw_http_manager;
+  curl_global_cleanup();
+}
+
+
+int RGWHTTP::send(RGWHTTPClient *req) {
+  if (!req) {
+    return 0;
+  }
+  int r = rgw_http_manager->add_request(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
+  if (!req) {
+    return 0;
+  }
+  int r = send(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return req->wait(y);
+}