// vim: ts=8 sw=2 smarttab
#include "include/compat.h"
+#include "common/errno.h"
#include <boost/utility/string_ref.hpp>
#include "rgw_common.h"
#include "rgw_http_client.h"
#include "rgw_http_errors.h"
+#include "common/async/completion.h"
#include "common/RefCountedObj.h"
#include "rgw_coroutine.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
+RGWHTTPManager *rgw_http_manager;
+
+struct RGWCurlHandle;
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
+
struct rgw_http_req_data : public RefCountedObject {
- CURL *easy_handle;
- curl_slist *h;
+ RGWCurlHandle *curl_handle{nullptr};
+ curl_slist *h{nullptr};
uint64_t id;
- int ret;
+ int ret{0};
std::atomic<bool> done = { false };
- RGWHTTPClient *client;
- void *user_info;
- bool registered;
- RGWHTTPManager *mgr;
+ RGWHTTPClient *client{nullptr};
+ rgw_io_id control_io_id;
+ void *user_info{nullptr};
+ bool registered{false};
+ RGWHTTPManager *mgr{nullptr};
char error_buf[CURL_ERROR_SIZE];
+ bool write_paused{false};
+ bool read_paused{false};
Mutex lock;
Cond cond;
- rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
- client(nullptr), user_info(nullptr), registered(false),
- mgr(NULL), lock("rgw_http_req_data::lock") {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ std::unique_ptr<Completion> completion;
+
+ rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
+ // FIPS zeroization audit 20191115: this memset is not security related.
memset(error_buf, 0, sizeof(error_buf));
}
- int wait() {
+ template <typename ExecutionContext, typename CompletionToken>
+ auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto& handler = init.completion_handler;
+ {
+ std::unique_lock l{lock};
+ completion = Completion::create(ctx.get_executor(), std::move(handler));
+ }
+ return init.result.get();
+ }
+ int wait(optional_yield y) {
+ if (done) {
+ return ret;
+ }
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ async_wait(context, yield[ec]);
+ return -ec.value();
+ }
+#endif
Mutex::Locker l(lock);
cond.Wait(lock);
return ret;
}
+ void set_state(int bitmask);
void finish(int r) {
Mutex::Locker l(lock);
ret = r;
- if (easy_handle)
- curl_easy_cleanup(easy_handle);
+ if (curl_handle)
+ do_curl_easy_cleanup(curl_handle);
if (h)
curl_slist_free_all(h);
- easy_handle = NULL;
+ curl_handle = NULL;
h = NULL;
done = true;
- cond.Signal();
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.Signal();
+ }
}
bool is_done() {
Mutex::Locker l(lock);
return mgr;
}
+
+ CURL *get_easy_handle() const;
};
struct RGWCurlHandle {
mono_time lastuse;
CURL* h;
- RGWCurlHandle(CURL* h) : uses(0), h(h) {};
+ explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
CURL* operator*() {
return this->h;
}
};
+void rgw_http_req_data::set_state(int bitmask) {
+ /* no need to lock here, moreover curl_easy_pause() might trigger
+ * the data receive callback :/
+ */
+ CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
+ if (rc != CURLE_OK) {
+ dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
+ }
+}
+
#define MAXIDLE 5
class RGWCurlHandles : public Thread {
public:
saved_curl.shrink_to_fit();
}
+CURL *rgw_http_req_data::get_easy_handle() const
+{
+ return **curl_handle;
+}
+
static RGWCurlHandles *handles;
+
+static RGWCurlHandle *do_curl_easy_init()
+{
+ return handles->get_curl_handle();
+}
+
+static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
+{
+ handles->release_curl_handle(curl_handle);
+}
+
// XXX make this part of the token cache? (but that's swift-only;
// and this especially needs to integrates with s3...)
delete handles;
}
-/*
- * the simple set of callbacks will be called on RGWHTTPClient::process()
- */
-/* Static methods - callbacks for libcurl. */
-size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
-{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- const size_t len = size * nmemb;
- int ret = client->receive_header(ptr, size * nmemb);
- if (ret < 0) {
- dout(0) << "WARNING: client->receive_header() returned ret="
- << ret << dendl;
- }
-
- return len;
-}
-
-size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
-{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- const size_t len = size * nmemb;
- int ret = client->receive_data(ptr, size * nmemb);
- if (ret < 0) {
- dout(0) << "WARNING: client->receive_data() returned ret="
- << ret << dendl;
- }
-
- return len;
-}
-
-size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
- const size_t size,
- const size_t nmemb,
- void * const _info)
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
{
- RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
- int ret = client->send_data(ptr, size * nmemb);
- if (ret < 0) {
- dout(0) << "WARNING: client->send_data() returned ret="
- << ret << dendl;
+ if (id == 0) {
+ id = io_id_provider.get_next();
}
-
- return ret;
}
/*
rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
size_t len = size * nmemb;
- Mutex::Locker l(req_data->lock);
-
- if (!req_data->registered) {
+ bool pause = false;
+
+ RGWHTTPClient *client;
+
+ {
+ Mutex::Locker l(req_data->lock);
+ if (!req_data->registered) {
+ return len;
+ }
+
+ client = req_data->client;
+ }
+
+ size_t& skip_bytes = client->receive_pause_skip;
+
+ if (skip_bytes >= len) {
+ skip_bytes -= len;
return len;
}
-
- int ret = req_data->client->receive_data(ptr, size * nmemb);
+
+ int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
if (ret < 0) {
dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
}
+ if (pause) {
+ dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
+ skip_bytes = len;
+ Mutex::Locker l(req_data->lock);
+ req_data->read_paused = true;
+ return CURL_WRITEFUNC_PAUSE;
+ }
+
+ skip_bytes = 0;
+
return len;
}
{
rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
- Mutex::Locker l(req_data->lock);
+ RGWHTTPClient *client;
+
+ {
+ Mutex::Locker l(req_data->lock);
- if (!req_data->registered) {
- return 0;
+ if (!req_data->registered) {
+ return 0;
+ }
+
+ client = req_data->client;
}
- int ret = req_data->client->send_data(ptr, size * nmemb);
+ bool pause = false;
+
+ int ret = client->send_data(ptr, size * nmemb, &pause);
if (ret < 0) {
dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
}
+ if (ret == 0 &&
+ pause) {
+ Mutex::Locker l(req_data->lock);
+ req_data->write_paused = true;
+ return CURL_READFUNC_PAUSE;
+ }
+
return ret;
}
+Mutex& RGWHTTPClient::get_req_lock()
+{
+ return req_data->lock;
+}
+
+void RGWHTTPClient::_set_write_paused(bool pause)
+{
+ ceph_assert(req_data->lock.is_locked());
+
+ RGWHTTPManager *mgr = req_data->mgr;
+ if (pause == req_data->write_paused) {
+ return;
+ }
+ if (pause) {
+ mgr->set_request_state(this, SET_WRITE_PAUSED);
+ } else {
+ mgr->set_request_state(this, SET_WRITE_RESUME);
+ }
+}
+
+void RGWHTTPClient::_set_read_paused(bool pause)
+{
+ ceph_assert(req_data->lock.is_locked());
+
+ RGWHTTPManager *mgr = req_data->mgr;
+ if (pause == req_data->read_paused) {
+ return;
+ }
+ if (pause) {
+ mgr->set_request_state(this, SET_READ_PAUSED);
+ } else {
+ mgr->set_request_state(this, SET_READ_RESUME);
+ }
+}
+
static curl_slist *headers_to_slist(param_vec_t& headers)
{
curl_slist *h = NULL;
val[i] = '-';
}
}
+
+ val = camelcase_dash_http_attr(val);
// curl won't send headers with empty values unless it ends with a ; instead
if (p.second.empty()) {
return h;
}
-static bool is_upload_request(const char *method)
+static bool is_upload_request(const string& method)
{
- if (method == nullptr) {
- return false;
- }
- return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
+ return method == "POST" || method == "PUT";
}
/*
- * process a single simple one off request, not going through RGWHTTPManager. Not using
- * req_data.
+ * process a single simple one off request
*/
-int RGWHTTPClient::process(const char *method, const char *url)
+int RGWHTTPClient::process(optional_yield y)
{
- int ret = 0;
- CURL *curl_handle;
-
- char error_buf[CURL_ERROR_SIZE];
-
- last_method = (method ? method : "");
- last_url = (url ? url : "");
-
- auto ca = handles->get_curl_handle();
- curl_handle = **ca;
-
- dout(20) << "sending request to " << url << dendl;
-
- curl_slist *h = headers_to_slist(headers);
-
- curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
- curl_easy_setopt(curl_handle, CURLOPT_URL, url);
- curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
- curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
- curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
- curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
- curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
- if (h) {
- curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
- }
- curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
- curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
- if (is_upload_request(method)) {
- curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
- }
- if (has_send_len) {
- curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
- }
- if (!verify_ssl) {
- curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
- curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
- dout(20) << "ssl verification is set to off" << dendl;
- }
-
- CURLcode status = curl_easy_perform(curl_handle);
- if (status) {
- dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
- ret = -EINVAL;
- }
- curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
- handles->release_curl_handle(ca);
- curl_slist_free_all(h);
-
- return ret;
+ return RGWHTTP::process(this, y);
}
string RGWHTTPClient::to_str()
{
- string method_str = (last_method.empty() ? "<no-method>" : last_method);
- string url_str = (last_url.empty() ? "<no-url>" : last_url);
+ string method_str = (method.empty() ? "<no-method>" : method);
+ string url_str = (url.empty() ? "<no-url>" : url);
return method_str + " " + url_str;
}
/*
* init request, will be used later with RGWHTTPManager
*/
-int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
+int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
{
- assert(!req_data);
+ ceph_assert(!req_data);
_req_data->get();
req_data = _req_data;
- CURL *easy_handle;
-
- easy_handle = curl_easy_init();
+ req_data->curl_handle = do_curl_easy_init();
- req_data->easy_handle = easy_handle;
+ CURL *easy_handle = req_data->get_easy_handle();
dout(20) << "sending request to " << url << dendl;
req_data->h = h;
- last_method = (method ? method : "");
- last_url = (url ? url : "");
-
- curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
- curl_easy_setopt(easy_handle, CURLOPT_URL, url);
+ curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
+ curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
+ curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
+ curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
if (h) {
curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
}
return 0;
}
+bool RGWHTTPClient::is_done()
+{
+ return req_data->is_done();
+}
+
/*
* wait for async request to complete
*/
-int RGWHTTPClient::wait()
+int RGWHTTPClient::wait(optional_yield y)
{
- if (!req_data->is_done()) {
- return req_data->wait();
- }
-
- return req_data->ret;
+ return req_data->wait(y);
}
-RGWHTTPClient::~RGWHTTPClient()
+void RGWHTTPClient::cancel()
{
if (req_data) {
- RGWHTTPManager *http_manager = req_data->get_manager();
+ RGWHTTPManager *http_manager = req_data->mgr;
if (http_manager) {
http_manager->remove_request(this);
}
+ }
+}
+RGWHTTPClient::~RGWHTTPClient()
+{
+ cancel();
+ if (req_data) {
req_data->put();
}
}
int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
{
- const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
+ const boost::string_ref header_line(static_cast<const char *>(ptr), len);
/* We're tokening the line that way due to backward compatibility. */
const size_t sep_loc = header_line.find_first_of(" \t:");
return 0;
}
-int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
+int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
{
int length_to_copy = 0;
if (post_data_index < post_data.length()) {
* RGWHTTPManager has two modes of operation: threaded and non-threaded.
*/
RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
- completion_mgr(_cm), is_threaded(false),
+ completion_mgr(_cm), is_started(false),
reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
reqs_thread(NULL)
{
req_data->registered = true;
reqs[num_reqs] = req_data;
num_reqs++;
- ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+ ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
}
-void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
+bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
{
RWLock::WLocker rl(reqs_lock);
+ if (!req_data->registered) {
+ return false;
+ }
req_data->get();
req_data->registered = false;
unregistered_reqs.push_back(req_data);
- ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
+ ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+ return true;
}
void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
req_data->mgr = nullptr;
}
if (completion_mgr) {
- completion_mgr->complete(NULL, req_data->user_info);
+ completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
}
req_data->put();
_complete_request(req_data);
}
+void RGWHTTPManager::_set_req_state(set_state& ss)
+{
+ ss.req->set_state(ss.bitmask);
+}
/*
* hook request to the curl multi handle
*/
int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
{
- ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
- CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
+ ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
+ CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
if (mstatus) {
dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
return -EIO;
*/
void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
{
- if (req_data->easy_handle) {
- curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
+ if (req_data->curl_handle) {
+ curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
}
if (!req_data->is_done()) {
_finish_request(req_data, -ECANCELED);
void RGWHTTPManager::manage_pending_requests()
{
reqs_lock.get_read();
- if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
+ if (max_threaded_req == num_reqs &&
+ unregistered_reqs.empty() &&
+ reqs_change_state.empty()) {
reqs_lock.unlock();
return;
}
}
}
+ if (!reqs_change_state.empty()) {
+ for (auto siter : reqs_change_state) {
+ _set_req_state(siter);
+ }
+ reqs_change_state.clear();
+ }
+
for (auto piter : remove_reqs) {
rgw_http_req_data *req_data = piter.first;
int r = piter.second;
}
}
-int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
+int RGWHTTPManager::add_request(RGWHTTPClient *client)
{
rgw_http_req_data *req_data = new rgw_http_req_data;
- int ret = client->init_request(method, url, req_data, send_data_hint);
+ int ret = client->init_request(req_data);
if (ret < 0) {
req_data->put();
req_data = NULL;
req_data->mgr = this;
req_data->client = client;
- req_data->user_info = client->get_user_info();
+ req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
+ req_data->user_info = client->get_io_user_info();
register_request(req_data);
- if (!is_threaded) {
+ if (!is_started) {
ret = link_request(req_data);
if (ret < 0) {
req_data->put();
{
rgw_http_req_data *req_data = client->get_req_data();
- if (!is_threaded) {
+ if (!is_started) {
unlink_request(req_data);
return 0;
}
- unregister_request(req_data);
+ if (!unregister_request(req_data)) {
+ return 0;
+ }
int ret = signal_thread();
if (ret < 0) {
return ret;
return 0;
}
-/*
- * the synchronous, non-threaded request processing method.
- */
-int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
+int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
{
- assert(!is_threaded);
+ rgw_http_req_data *req_data = client->get_req_data();
- int still_running;
- int mstatus;
+ ceph_assert(req_data->lock.is_locked());
- do {
- if (wait_for_data) {
- int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
- if (ret < 0) {
- return ret;
- }
- }
+ /* can only do that if threaded */
+ if (!is_started) {
+ return -EINVAL;
+ }
- mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
- switch (mstatus) {
- case CURLM_OK:
- case CURLM_CALL_MULTI_PERFORM:
- break;
- default:
- dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
- return -EINVAL;
- }
- int msgs_left;
- CURLMsg *msg;
- while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
- if (msg->msg == CURLMSG_DONE) {
- CURL *e = msg->easy_handle;
- rgw_http_req_data *req_data;
- curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
+ bool suggested_wr_paused = req_data->write_paused;
+ bool suggested_rd_paused = req_data->read_paused;
+
+ switch (state) {
+ case SET_WRITE_PAUSED:
+ suggested_wr_paused = true;
+ break;
+ case SET_WRITE_RESUME:
+ suggested_wr_paused = false;
+ break;
+ case SET_READ_PAUSED:
+ suggested_rd_paused = true;
+ break;
+ case SET_READ_RESUME:
+ suggested_rd_paused = false;
+ break;
+ default:
+ /* shouldn't really be here */
+ return -EIO;
+ }
+ if (suggested_wr_paused == req_data->write_paused &&
+ suggested_rd_paused == req_data->read_paused) {
+ return 0;
+ }
- long http_status;
- curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
+ req_data->write_paused = suggested_wr_paused;
+ req_data->read_paused = suggested_rd_paused;
- int status = rgw_http_error_to_errno(http_status);
- int result = msg->data.result;
- finish_request(req_data, status);
- switch (result) {
- case CURLE_OK:
- break;
- default:
- dout(20) << "ERROR: msg->data.result=" << result << dendl;
- return -EIO;
- }
- }
- }
- } while (mstatus == CURLM_CALL_MULTI_PERFORM);
+ int bitmask = CURLPAUSE_CONT;
- *done = (still_running == 0);
+ if (req_data->write_paused) {
+ bitmask |= CURLPAUSE_SEND;
+ }
- return 0;
-}
+ if (req_data->read_paused) {
+ bitmask |= CURLPAUSE_RECV;
+ }
-/*
- * the synchronous, non-threaded request processing completion method.
- */
-int RGWHTTPManager::complete_requests()
-{
- bool done = false;
- int ret;
- do {
- ret = process_requests(true, &done);
- } while (!done && !ret);
+ reqs_change_state.push_back(set_state(req_data, bitmask));
+ int ret = signal_thread();
+ if (ret < 0) {
+ return ret;
+ }
- return ret;
+ return 0;
}
-int RGWHTTPManager::set_threaded()
+int RGWHTTPManager::start()
{
- int r = pipe(thread_pipe);
- if (r < 0) {
- r = -errno;
- ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
- return r;
+ if (pipe_cloexec(thread_pipe) < 0) {
+ int e = errno;
+ ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
+ return -e;
}
// enable non-blocking reads
- r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
- if (r < 0) {
- r = -errno;
- ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
+ if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
+ int e = errno;
+ ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
- return r;
+ return -e;
}
#ifdef HAVE_CURL_MULTI_WAIT
thread_pipe[1], thread_pipe[0]);
#endif
- is_threaded = true;
+ is_started = true;
reqs_thread = new ReqsThread(this);
reqs_thread->create("http_manager");
return 0;
is_stopped = true;
- if (is_threaded) {
+ if (is_started) {
going_down = true;
signal_thread();
reqs_thread->join();
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
int status = rgw_http_error_to_errno(http_status);
- if (result != CURLE_OK && http_status == 0) {
+ if (result != CURLE_OK && status == 0) {
+ dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
status = -EAGAIN;
}
int id = req_data->id;
switch (result) {
case CURLE_OK:
break;
+ case CURLE_OPERATION_TIMEDOUT:
+ dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
+ << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
default:
dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
+ dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
break;
}
}
RWLock::WLocker rl(reqs_lock);
for (auto r : unregistered_reqs) {
- _finish_request(r, -ECANCELED);
+ _unlink_request(r);
}
unregistered_reqs.clear();
auto all_reqs = std::move(reqs);
for (auto iter : all_reqs) {
- _finish_request(iter.second, -ECANCELED);
+ _unlink_request(iter.second);
}
reqs.clear();
return 0;
}
+void rgw_http_client_init(CephContext *cct)
+{
+ curl_global_init(CURL_GLOBAL_ALL);
+ rgw_http_manager = new RGWHTTPManager(cct);
+ rgw_http_manager->start();
+}
+
+void rgw_http_client_cleanup()
+{
+ rgw_http_manager->stop();
+ delete rgw_http_manager;
+ curl_global_cleanup();
+}
+
+
+int RGWHTTP::send(RGWHTTPClient *req) {
+ if (!req) {
+ return 0;
+ }
+ int r = rgw_http_manager->add_request(req);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
+ if (!req) {
+ return 0;
+ }
+ int r = send(req);
+ if (r < 0) {
+ return r;
+ }
+
+ return req->wait(y);
+}