]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_http_client.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
index e11deb7847bf41a1f0953f400174ef729a474c48..50fe7915099faf64d85e16bd526050a942e0c095 100644 (file)
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
+RGWHTTPManager *rgw_http_manager;
+
+struct RGWCurlHandle;
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
+
 struct rgw_http_req_data : public RefCountedObject {
-  CURL *easy_handle;
-  curl_slist *h;
+  RGWCurlHandle *curl_handle{nullptr};
+  curl_slist *h{nullptr};
   uint64_t id;
-  int ret;
+  int ret{0};
   std::atomic<bool> done = { false };
-  RGWHTTPClient *client;
-  void *user_info;
-  bool registered;
-  RGWHTTPManager *mgr;
+  RGWHTTPClient *client{nullptr};
+  rgw_io_id control_io_id;
+  void *user_info{nullptr};
+  bool registered{false};
+  RGWHTTPManager *mgr{nullptr};
   char error_buf[CURL_ERROR_SIZE];
+  bool write_paused{false};
+  bool read_paused{false};
 
   Mutex lock;
   Cond cond;
 
-  rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
-                        client(nullptr), user_info(nullptr), registered(false),
-                        mgr(NULL), lock("rgw_http_req_data::lock") {
+  rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
     memset(error_buf, 0, sizeof(error_buf));
   }
 
   int wait() {
     Mutex::Locker l(lock);
+    if (done) {
+      return ret;
+    }
     cond.Wait(lock);
     return ret;
   }
 
+  void set_state(int bitmask);
 
   void finish(int r) {
     Mutex::Locker l(lock);
     ret = r;
-    if (easy_handle)
-      curl_easy_cleanup(easy_handle);
+    if (curl_handle)
+      do_curl_easy_cleanup(curl_handle);
 
     if (h)
       curl_slist_free_all(h);
 
-    easy_handle = NULL;
+    curl_handle = NULL;
     h = NULL;
     done = true;
     cond.Signal();
   }
 
+  bool _is_done() {
+    return done;
+  }
+
   bool is_done() {
+    Mutex::Locker l(lock);
     return done;
   }
 
@@ -78,6 +94,8 @@ struct rgw_http_req_data : public RefCountedObject {
     Mutex::Locker l(lock);
     return mgr;
   }
+
+  CURL *get_easy_handle() const;
 };
 
 struct RGWCurlHandle {
@@ -85,12 +103,22 @@ struct RGWCurlHandle {
   mono_time lastuse;
   CURL* h;
 
-  RGWCurlHandle(CURL* h) : uses(0), h(h) {};
+  explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
   CURL* operator*() {
     return this->h;
   }
 };
 
+void rgw_http_req_data::set_state(int bitmask) {
+  /* no need to lock here, moreover curl_easy_pause() might trigger
+   * the data receive callback :/
+   */
+  CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
+  if (rc != CURLE_OK) {
+    dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+  }
+}
+
 #define MAXIDLE 5
 class RGWCurlHandles : public Thread {
 public:
@@ -193,7 +221,23 @@ void RGWCurlHandles::flush_curl_handles()
   saved_curl.shrink_to_fit();
 }
 
+CURL *rgw_http_req_data::get_easy_handle() const
+{
+  return **curl_handle;
+}
+
 static RGWCurlHandles *handles;
+
+static RGWCurlHandle *do_curl_easy_init()
+{
+  return handles->get_curl_handle();
+}
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
+{
+  handles->release_curl_handle(curl_handle);
+}
+
 // XXX make this part of the token cache?  (but that's swift-only;
 //     and this especially needs to integrates with s3...)
 
@@ -209,55 +253,11 @@ void rgw_release_all_curl_handles()
   delete handles;
 }
 
-/*
- * the simple set of callbacks will be called on RGWHTTPClient::process()
- */
-/* Static methods - callbacks for libcurl. */
-size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
-                                                 const size_t size,
-                                                 const size_t nmemb,
-                                                 void * const _info)
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
 {
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_header(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_header() returned ret="
-            << ret << dendl;
+  if (id == 0) {
+    id = io_id_provider.get_next();
   }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
-                                               const size_t size,
-                                               const size_t nmemb,
-                                               void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  const size_t len = size * nmemb;
-  int ret = client->receive_data(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->receive_data() returned ret="
-            << ret << dendl;
-  }
-
-  return len;
-}
-
-size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
-                                            const size_t size,
-                                            const size_t nmemb,
-                                            void * const _info)
-{
-  RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
-  int ret = client->send_data(ptr, size * nmemb);
-  if (ret < 0) {
-    dout(0) << "WARNING: client->send_data() returned ret="
-            << ret << dendl;
-  }
-
-  return ret;
 }
 
 /*
@@ -294,17 +294,41 @@ size_t RGWHTTPClient::receive_http_data(void * const ptr,
   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
   size_t len = size * nmemb;
 
-  Mutex::Locker l(req_data->lock);
-  
-  if (!req_data->registered) {
+  bool pause = false;
+
+  RGWHTTPClient *client;
+
+  {
+    Mutex::Locker l(req_data->lock);
+    if (!req_data->registered) {
+      return len;
+    }
+
+    client = req_data->client;
+  }
+
+  size_t& skip_bytes = client->receive_pause_skip;
+
+  if (skip_bytes >= len) {
+    skip_bytes -= len;
     return len;
   }
-  
-  int ret = req_data->client->receive_data(ptr, size * nmemb);
+
+  int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  if (pause) {
+    dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
+    skip_bytes = len;
+    Mutex::Locker l(req_data->lock);
+    req_data->read_paused = true;
+    return CURL_WRITEFUNC_PAUSE;
+  }
+
+  skip_bytes = 0;
+
   return len;
 }
 
@@ -315,20 +339,70 @@ size_t RGWHTTPClient::send_http_data(void * const ptr,
 {
   rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
 
-  Mutex::Locker l(req_data->lock);
+  RGWHTTPClient *client;
+
+  {
+    Mutex::Locker l(req_data->lock);
   
-  if (!req_data->registered) {
-    return 0;
+    if (!req_data->registered) {
+      return 0;
+    }
+
+    client = req_data->client;
   }
 
-  int ret = req_data->client->send_data(ptr, size * nmemb);
+  bool pause = false;
+
+  int ret = client->send_data(ptr, size * nmemb, &pause);
   if (ret < 0) {
     dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
   }
 
+  if (ret == 0 &&
+      pause) {
+    Mutex::Locker l(req_data->lock);
+    req_data->write_paused = true;
+    return CURL_READFUNC_PAUSE;
+  }
+
   return ret;
 }
 
+Mutex& RGWHTTPClient::get_req_lock()
+{
+  return req_data->lock;
+}
+
+void RGWHTTPClient::_set_write_paused(bool pause)
+{
+  ceph_assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->write_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_WRITE_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_WRITE_RESUME);
+  }
+}
+
+void RGWHTTPClient::_set_read_paused(bool pause)
+{
+  ceph_assert(req_data->lock.is_locked());
+  
+  RGWHTTPManager *mgr = req_data->mgr;
+  if (pause == req_data->read_paused) {
+    return;
+  }
+  if (pause) {
+    mgr->set_request_state(this, SET_READ_PAUSED);
+  } else {
+    mgr->set_request_state(this, SET_READ_RESUME);
+  }
+}
+
 static curl_slist *headers_to_slist(param_vec_t& headers)
 {
   curl_slist *h = NULL;
@@ -350,6 +424,8 @@ static curl_slist *headers_to_slist(param_vec_t& headers)
         val[i] = '-';
       }
     }
+    
+    val = camelcase_dash_http_attr(val);
 
     // curl won't send headers with empty values unless it ends with a ; instead
     if (p.second.empty()) {
@@ -364,79 +440,23 @@ static curl_slist *headers_to_slist(param_vec_t& headers)
   return h;
 }
 
-static bool is_upload_request(const char *method)
+static bool is_upload_request(const string& method)
 {
-  if (method == nullptr) {
-    return false;
-  }
-  return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
+  return method == "POST" || method == "PUT";
 }
 
 /*
- * process a single simple one off request, not going through RGWHTTPManager. Not using
- * req_data.
+ * process a single simple one off request
  */
-int RGWHTTPClient::process(const char *method, const char *url)
+int RGWHTTPClient::process()
 {
-  int ret = 0;
-  CURL *curl_handle;
-
-  char error_buf[CURL_ERROR_SIZE];
-
-  last_method = (method ? method : "");
-  last_url = (url ? url : "");
-
-  auto ca = handles->get_curl_handle();
-  curl_handle = **ca;
-
-  dout(20) << "sending request to " << url << dendl;
-
-  curl_slist *h = headers_to_slist(headers);
-
-  curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
-  curl_easy_setopt(curl_handle, CURLOPT_URL, url);
-  curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
-  curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
-  curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
-  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
-  curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
-  if (h) {
-    curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
-  }
-  curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
-  curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
-  if (is_upload_request(method)) {
-    curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
-  }
-  if (has_send_len) {
-    curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); 
-  }
-  if (!verify_ssl) {
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
-    curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
-    dout(20) << "ssl verification is set to off" << dendl;
-  }
-
-  CURLcode status = curl_easy_perform(curl_handle);
-  if (status) {
-    dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
-    ret = -EINVAL;
-  }
-  curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
-  handles->release_curl_handle(ca);
-  curl_slist_free_all(h);
-
-  return ret;
+  return RGWHTTP::process(this);
 }
 
 string RGWHTTPClient::to_str()
 {
-  string method_str = (last_method.empty() ? "<no-method>" : last_method);
-  string url_str = (last_url.empty() ? "<no-url>" : last_url);
+  string method_str = (method.empty() ? "<no-method>" : method);
+  string url_str = (url.empty() ? "<no-url>" : url);
   return method_str + " " + url_str;
 }
 
@@ -452,17 +472,15 @@ int RGWHTTPClient::get_req_retcode()
 /*
  * init request, will be used later with RGWHTTPManager
  */
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
+int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
 {
-  assert(!req_data);
+  ceph_assert(!req_data);
   _req_data->get();
   req_data = _req_data;
 
-  CURL *easy_handle;
-
-  easy_handle = curl_easy_init();
+  req_data->curl_handle = do_curl_easy_init();
 
-  req_data->easy_handle = easy_handle;
+  CURL *easy_handle = req_data->get_easy_handle();
 
   dout(20) << "sending request to " << url << dendl;
 
@@ -470,11 +488,8 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
 
   req_data->h = h;
 
-  last_method = (method ? method : "");
-  last_url = (url ? url : "");
-
-  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
-  curl_easy_setopt(easy_handle, CURLOPT_URL, url);
+  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
+  curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
   curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
   curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
@@ -505,26 +520,33 @@ int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_re
   return 0;
 }
 
+bool RGWHTTPClient::is_done()
+{
+  return req_data->is_done();
+}
+
 /*
  * wait for async request to complete
  */
 int RGWHTTPClient::wait()
 {
-  if (!req_data->is_done()) {
-    return req_data->wait();
-  }
-
-  return req_data->ret;
+  return req_data->wait();
 }
 
-RGWHTTPClient::~RGWHTTPClient()
+void RGWHTTPClient::cancel()
 {
   if (req_data) {
-    RGWHTTPManager *http_manager = req_data->get_manager();
+    RGWHTTPManager *http_manager = req_data->mgr;
     if (http_manager) {
       http_manager->remove_request(this);
     }
+  }
+}
 
+RGWHTTPClient::~RGWHTTPClient()
+{
+  cancel();
+  if (req_data) {
     req_data->put();
   }
 }
@@ -532,7 +554,7 @@ RGWHTTPClient::~RGWHTTPClient()
 
 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
 {
-  const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
+  const boost::string_ref header_line(static_cast<const char *>(ptr), len);
 
   /* We're tokening the line that way due to backward compatibility. */
   const size_t sep_loc = header_line.find_first_of(" \t:");
@@ -566,7 +588,7 @@ int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
   return 0;
 }
 
-int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
+int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
 {
   int length_to_copy = 0;
   if (post_data_index < post_data.length()) {
@@ -745,7 +767,7 @@ void *RGWHTTPManager::ReqsThread::entry()
  * RGWHTTPManager has two modes of operation: threaded and non-threaded.
  */
 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
-                                                    completion_mgr(_cm), is_threaded(false),
+                                                    completion_mgr(_cm), is_started(false),
                                                     reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
                                                     reqs_thread(NULL)
 {
@@ -767,16 +789,20 @@ void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
   req_data->registered = true;
   reqs[num_reqs] = req_data;
   num_reqs++;
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
 }
 
-void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
+bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
 {
   RWLock::WLocker rl(reqs_lock);
+  if (!req_data->registered) {
+    return false;
+  }
   req_data->get();
   req_data->registered = false;
   unregistered_reqs.push_back(req_data);
-  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+  ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+  return true;
 }
 
 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
@@ -796,7 +822,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
     req_data->mgr = nullptr;
   }
   if (completion_mgr) {
-    completion_mgr->complete(NULL, req_data->user_info);
+    completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
   }
 
   req_data->put();
@@ -814,13 +840,17 @@ void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
   _complete_request(req_data);
 }
 
+void RGWHTTPManager::_set_req_state(set_state& ss)
+{
+  ss.req->set_state(ss.bitmask);
+}
 /*
  * hook request to the curl multi handle
  */
 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
 {
-  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
-  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+  ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+  CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   if (mstatus) {
     dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
     return -EIO;
@@ -834,10 +864,10 @@ int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
  */
 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
 {
-  if (req_data->easy_handle) {
-    curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+  if (req_data->curl_handle) {
+    curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
   }
-  if (!req_data->is_done()) {
+  if (!req_data->_is_done()) {
     _finish_request(req_data, -ECANCELED);
   }
 }
@@ -851,7 +881,9 @@ void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
 void RGWHTTPManager::manage_pending_requests()
 {
   reqs_lock.get_read();
-  if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
+  if (max_threaded_req == num_reqs &&
+      unregistered_reqs.empty() &&
+      reqs_change_state.empty()) {
     reqs_lock.unlock();
     return;
   }
@@ -883,6 +915,13 @@ void RGWHTTPManager::manage_pending_requests()
     }
   }
 
+  if (!reqs_change_state.empty()) {
+    for (auto siter : reqs_change_state) {
+      _set_req_state(siter);
+    }
+    reqs_change_state.clear();
+  }
+
   for (auto piter : remove_reqs) {
     rgw_http_req_data *req_data = piter.first;
     int r = piter.second;
@@ -891,11 +930,11 @@ void RGWHTTPManager::manage_pending_requests()
   }
 }
 
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
+int RGWHTTPManager::add_request(RGWHTTPClient *client)
 {
   rgw_http_req_data *req_data = new rgw_http_req_data;
 
-  int ret = client->init_request(method, url, req_data, send_data_hint);
+  int ret = client->init_request(req_data);
   if (ret < 0) {
     req_data->put();
     req_data = NULL;
@@ -904,11 +943,12 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const
 
   req_data->mgr = this;
   req_data->client = client;
-  req_data->user_info = client->get_user_info();
+  req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
+  req_data->user_info = client->get_io_user_info();
 
   register_request(req_data);
 
-  if (!is_threaded) {
+  if (!is_started) {
     ret = link_request(req_data);
     if (ret < 0) {
       req_data->put();
@@ -928,11 +968,13 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client)
 {
   rgw_http_req_data *req_data = client->get_req_data();
 
-  if (!is_threaded) {
+  if (!is_started) {
     unlink_request(req_data);
     return 0;
   }
-  unregister_request(req_data);
+  if (!unregister_request(req_data)) {
+    return 0;
+  }
   int ret = signal_thread();
   if (ret < 0) {
     return ret;
@@ -941,78 +983,65 @@ int RGWHTTPManager::remove_request(RGWHTTPClient *client)
   return 0;
 }
 
-/*
- * the synchronous, non-threaded request processing method.
- */
-int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
+int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
 {
-  assert(!is_threaded);
+  rgw_http_req_data *req_data = client->get_req_data();
 
-  int still_running;
-  int mstatus;
+  ceph_assert(req_data->lock.is_locked());
 
-  do {
-    if (wait_for_data) {
-      int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
-      if (ret < 0) {
-        return ret;
-      }
-    }
+  /* can only do that if threaded */
+  if (!is_started) {
+    return -EINVAL;
+  }
 
-    mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
-    switch (mstatus) {
-      case CURLM_OK:
-      case CURLM_CALL_MULTI_PERFORM:
-        break;
-      default:
-        dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
-        return -EINVAL;
-    }
-    int msgs_left;
-    CURLMsg *msg;
-    while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
-      if (msg->msg == CURLMSG_DONE) {
-       CURL *e = msg->easy_handle;
-       rgw_http_req_data *req_data;
-       curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
+  bool suggested_wr_paused = req_data->write_paused;
+  bool suggested_rd_paused = req_data->read_paused;
+
+  switch (state) {
+    case SET_WRITE_PAUSED:
+      suggested_wr_paused = true;
+      break;
+    case SET_WRITE_RESUME:
+      suggested_wr_paused = false;
+      break;
+    case SET_READ_PAUSED:
+      suggested_rd_paused = true;
+      break;
+    case SET_READ_RESUME:
+      suggested_rd_paused = false;
+      break;
+    default:
+      /* shouldn't really be here */
+      return -EIO;
+  }
+  if (suggested_wr_paused == req_data->write_paused &&
+      suggested_rd_paused == req_data->read_paused) {
+    return 0;
+  }
 
-       long http_status;
-       curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
+  req_data->write_paused = suggested_wr_paused;
+  req_data->read_paused = suggested_rd_paused;
 
-       int status = rgw_http_error_to_errno(http_status);
-       int result = msg->data.result;
-       finish_request(req_data, status);
-        switch (result) {
-          case CURLE_OK:
-            break;
-          default:
-            dout(20) << "ERROR: msg->data.result=" << result << dendl;
-            return -EIO;
-        }
-      }
-    }
-  } while (mstatus == CURLM_CALL_MULTI_PERFORM);
+  int bitmask = CURLPAUSE_CONT;
 
-  *done = (still_running == 0);
+  if (req_data->write_paused) {
+    bitmask |= CURLPAUSE_SEND;
+  }
 
-  return 0;
-}
+  if (req_data->read_paused) {
+    bitmask |= CURLPAUSE_RECV;
+  }
 
-/*
- * the synchronous, non-threaded request processing completion method.
- */
-int RGWHTTPManager::complete_requests()
-{
-  bool done = false;
-  int ret;
-  do {
-    ret = process_requests(true, &done);
-  } while (!done && !ret);
+  reqs_change_state.push_back(set_state(req_data, bitmask));
+  int ret = signal_thread();
+  if (ret < 0) {
+    return ret;
+  }
 
-  return ret;
+  return 0;
 }
 
-int RGWHTTPManager::set_threaded()
+int RGWHTTPManager::start()
 {
   if (pipe_cloexec(thread_pipe) < 0) {
     int e = errno;
@@ -1037,7 +1066,7 @@ int RGWHTTPManager::set_threaded()
                  thread_pipe[1], thread_pipe[0]);
 #endif
 
-  is_threaded = true;
+  is_started = true;
   reqs_thread = new ReqsThread(this);
   reqs_thread->create("http_manager");
   return 0;
@@ -1051,7 +1080,7 @@ void RGWHTTPManager::stop()
 
   is_stopped = true;
 
-  if (is_threaded) {
+  if (is_started) {
     going_down = true;
     signal_thread();
     reqs_thread->join();
@@ -1125,6 +1154,7 @@ void *RGWHTTPManager::reqs_thread_entry()
               << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
           default:
             dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
+            dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
            break;
         }
       }
@@ -1153,4 +1183,42 @@ void *RGWHTTPManager::reqs_thread_entry()
   return 0;
 }
 
+void rgw_http_client_init(CephContext *cct)
+{
+  curl_global_init(CURL_GLOBAL_ALL);
+  rgw_http_manager = new RGWHTTPManager(cct);
+  rgw_http_manager->start();
+}
+
+void rgw_http_client_cleanup()
+{
+  rgw_http_manager->stop();
+  delete rgw_http_manager;
+  curl_global_cleanup();
+}
+
+
+int RGWHTTP::send(RGWHTTPClient *req) {
+  if (!req) {
+    return 0;
+  }
+  int r = rgw_http_manager->add_request(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWHTTP::process(RGWHTTPClient *req) {
+  if (!req) {
+    return 0;
+  }
+  int r = send(req);
+  if (r < 0) {
+    return r;
+  }
+
+  return req->wait();
+}