using param_pair_t = pair<string, string>;
using param_vec_t = vector<param_pair_t>;
+void rgw_http_client_init(CephContext *cct);
+void rgw_http_client_cleanup();
+
struct rgw_http_req_data;
+class RGWHTTPManager;
+
+class RGWIOIDProvider
+{
+ std::atomic<int64_t> max = {0};
+
+public:
+ RGWIOIDProvider() {}
+ int64_t get_next() {
+ return ++max;
+ }
+};
+
+struct rgw_io_id {
+ int64_t id{0};
+ int channels{0};
+
+ rgw_io_id() {}
+ rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {}
+
+ bool intersects(const rgw_io_id& rhs) {
+ return (id == rhs.id && ((channels | rhs.channels) != 0));
+ }
+
+ bool operator<(const rgw_io_id& rhs) const {
+ if (id < rhs.id) {
+ return true;
+ }
+ return (id == rhs.id &&
+ channels < rhs.channels);
+ }
+};
+
+class RGWIOProvider
+{
+ int64_t id{-1};
-class RGWHTTPClient
+public:
+ RGWIOProvider() {}
+ virtual ~RGWIOProvider() = default;
+
+ void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1);
+ rgw_io_id get_io_id(int io_type) {
+ return rgw_io_id{id, io_type};
+ }
+
+ virtual void set_io_user_info(void *_user_info) = 0;
+ virtual void *get_io_user_info() = 0;
+};
+
+class RGWHTTPClient : public RGWIOProvider
{
friend class RGWHTTPManager;
bufferlist send_bl;
bufferlist::iterator send_iter;
- size_t send_len;
bool has_send_len;
long http_status;
+ bool send_data_hint{false};
+ size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
+ due to being paused */
- rgw_http_req_data *req_data;
+ void *user_info{nullptr};
- void *user_info;
+ rgw_http_req_data *req_data;
- string last_method;
- string last_url;
bool verify_ssl; // Do not validate self signed certificates, default to false
std::atomic<unsigned> stopped { 0 };
+
protected:
CephContext *cct;
+
+ string method;
+ string url;
+
+ size_t send_len{0};
+
param_vec_t headers;
- int init_request(const char *method,
- const char *url,
- rgw_http_req_data *req_data,
- bool send_data_hint = false);
+ RGWHTTPManager *get_manager();
+
+ int init_request(rgw_http_req_data *req_data);
virtual int receive_header(void *ptr, size_t len) {
return 0;
}
- virtual int receive_data(void *ptr, size_t len) {
+ virtual int receive_data(void *ptr, size_t len, bool *pause) {
return 0;
}
- virtual int send_data(void *ptr, size_t len) {
+
+ virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
return 0;
}
/* Callbacks for libcurl. */
- static size_t simple_receive_http_header(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t receive_http_header(void *ptr,
size_t size,
size_t nmemb,
void *_info);
- static size_t simple_receive_http_data(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t receive_http_data(void *ptr,
size_t size,
size_t nmemb,
void *_info);
- static size_t simple_send_http_data(void *ptr,
- size_t size,
- size_t nmemb,
- void *_info);
static size_t send_http_data(void *ptr,
size_t size,
size_t nmemb,
void *_info);
+
+ Mutex& get_req_lock();
+
+ /* needs to be called under req_lock() */
+ void _set_write_paused(bool pause);
+ void _set_read_paused(bool pause);
public:
static const long HTTP_STATUS_NOSTATUS = 0;
static const long HTTP_STATUS_UNAUTHORIZED = 401;
static const long HTTP_STATUS_NOTFOUND = 404;
+ static constexpr int HTTPCLIENT_IO_READ = 0x1;
+ static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
+ static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
virtual ~RGWHTTPClient();
- explicit RGWHTTPClient(CephContext *cct)
- : send_len(0),
- has_send_len(false),
+ explicit RGWHTTPClient(CephContext *cct,
+ const string& _method,
+ const string& _url)
+ : has_send_len(false),
http_status(HTTP_STATUS_NOSTATUS),
req_data(nullptr),
- user_info(nullptr),
verify_ssl(cct->_conf->rgw_verify_ssl),
- cct(cct) {
- }
-
- void set_user_info(void *info) {
- user_info = info;
- }
-
- void *get_user_info() {
- return user_info;
+ cct(cct),
+ method(_method),
+ url(_url) {
}
void append_header(const string& name, const string& val) {
has_send_len = true;
}
+ void set_send_data_hint(bool hint) {
+ send_data_hint = hint;
+ }
long get_http_status() const {
return http_status;
verify_ssl = flag;
}
- int process(const char *method, const char *url);
- int process(const char *url) { return process("GET", url); }
+ int process();
int wait();
+ void cancel();
+ bool is_done();
+
rgw_http_req_data *get_req_data() { return req_data; }
string to_str();
int get_req_retcode();
+
+ void set_url(const string& _url) {
+ url = _url;
+ }
+
+ void set_method(const string& _method) {
+ method = _method;
+ }
+
+ void set_io_user_info(void *_user_info) override {
+ user_info = _user_info;
+ }
+
+ void *get_io_user_info() override {
+ return user_info;
+ }
};
typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
RGWHTTPHeadersCollector(CephContext * const cct,
- const header_spec_t relevant_headers)
- : RGWHTTPClient(cct),
+ const string& method,
+ const string& url,
+ const header_spec_t &relevant_headers)
+ : RGWHTTPClient(cct, method, url),
relevant_headers(relevant_headers) {
}
protected:
int receive_header(void *ptr, size_t len) override;
- int receive_data(void *ptr, size_t len) override {
- return 0;
- }
-
- int send_data(void *ptr, size_t len) override {
- return 0;
- }
-
private:
const std::set<header_name_t, ltstr_nocase> relevant_headers;
std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
public:
RGWHTTPTransceiver(CephContext * const cct,
+ const string& method,
+ const string& url,
bufferlist * const read_bl,
const header_spec_t intercept_headers = {})
- : RGWHTTPHeadersCollector(cct, intercept_headers),
+ : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
read_bl(read_bl),
post_data_index(0) {
}
RGWHTTPTransceiver(CephContext * const cct,
+ const string& method,
+ const string& url,
bufferlist * const read_bl,
const bool verify_ssl,
const header_spec_t intercept_headers = {})
- : RGWHTTPHeadersCollector(cct, intercept_headers),
+ : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
read_bl(read_bl),
post_data_index(0) {
set_verify_ssl(verify_ssl);
}
protected:
- int send_data(void* ptr, size_t len) override;
+ int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
- int receive_data(void *ptr, size_t len) override {
+ int receive_data(void *ptr, size_t len, bool *pause) override {
read_bl->append((char *)ptr, len);
return 0;
}
class RGWCompletionManager;
+enum RGWHTTPRequestSetState {
+ SET_NOP = 0,
+ SET_WRITE_PAUSED = 1,
+ SET_WRITE_RESUME = 2,
+ SET_READ_PAUSED = 3,
+ SET_READ_RESUME = 4,
+};
+
class RGWHTTPManager {
+ struct set_state {
+ rgw_http_req_data *req;
+ int bitmask;
+
+ set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
+ };
CephContext *cct;
RGWCompletionManager *completion_mgr;
void *multi_handle;
- bool is_threaded;
+ bool is_started;
std::atomic<unsigned> going_down { 0 };
std::atomic<unsigned> is_stopped { 0 };
RWLock reqs_lock;
map<uint64_t, rgw_http_req_data *> reqs;
list<rgw_http_req_data *> unregistered_reqs;
+ list<set_state> reqs_change_state;
map<uint64_t, rgw_http_req_data *> complete_reqs;
int64_t num_reqs;
int64_t max_threaded_req;
void register_request(rgw_http_req_data *req_data);
void complete_request(rgw_http_req_data *req_data);
void _complete_request(rgw_http_req_data *req_data);
- void unregister_request(rgw_http_req_data *req_data);
+ bool unregister_request(rgw_http_req_data *req_data);
void _unlink_request(rgw_http_req_data *req_data);
void unlink_request(rgw_http_req_data *req_data);
void finish_request(rgw_http_req_data *req_data, int r);
void _finish_request(rgw_http_req_data *req_data, int r);
+ void _set_req_state(set_state& ss);
int link_request(rgw_http_req_data *req_data);
void manage_pending_requests();
RGWHTTPManager *manager;
public:
- ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
+ explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
void *entry() override;
};
RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
~RGWHTTPManager();
- int set_threaded();
+ int start();
void stop();
- int add_request(RGWHTTPClient *client, const char *method, const char *url,
- bool send_data_hint = false);
+ int add_request(RGWHTTPClient *client);
int remove_request(RGWHTTPClient *client);
-
- /* only for non threaded case */
- int process_requests(bool wait_for_data, bool *done);
-
- int complete_requests();
+ int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
};
+class RGWHTTP
+{
+public:
+ static int send(RGWHTTPClient *req);
+ static int process(RGWHTTPClient *req);
+};
#endif