1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
7 #include "common/async/yield_context.h"
8 #include "common/RWLock.h"
9 #include "common/Cond.h"
10 #include "rgw_common.h"
11 #include "rgw_string.h"
12 #include "rgw_http_client_types.h"
16 using param_pair_t
= pair
<string
, string
>;
17 using param_vec_t
= vector
<param_pair_t
>;
19 void rgw_http_client_init(CephContext
*cct
);
20 void rgw_http_client_cleanup();
22 struct rgw_http_req_data
;
25 class RGWHTTPClient
: public RGWIOProvider
27 friend class RGWHTTPManager
;
30 bufferlist::iterator send_iter
;
33 bool send_data_hint
{false};
34 size_t receive_pause_skip
{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
37 void *user_info
{nullptr};
39 rgw_http_req_data
*req_data
;
41 bool verify_ssl
; // Do not validate self signed certificates, default to false
43 std::atomic
<unsigned> stopped
{ 0 };
58 RGWHTTPManager
*get_manager();
60 int init_request(rgw_http_req_data
*req_data
);
62 virtual int receive_header(void *ptr
, size_t len
) {
65 virtual int receive_data(void *ptr
, size_t len
, bool *pause
) {
69 virtual int send_data(void *ptr
, size_t len
, bool *pause
=nullptr) {
73 /* Callbacks for libcurl. */
74 static size_t receive_http_header(void *ptr
,
79 static size_t receive_http_data(void *ptr
,
84 static size_t send_http_data(void *ptr
,
89 ceph::mutex
& get_req_lock();
91 /* needs to be called under req_lock() */
92 void _set_write_paused(bool pause
);
93 void _set_read_paused(bool pause
);
95 static const long HTTP_STATUS_NOSTATUS
= 0;
96 static const long HTTP_STATUS_UNAUTHORIZED
= 401;
97 static const long HTTP_STATUS_NOTFOUND
= 404;
99 static constexpr int HTTPCLIENT_IO_READ
= 0x1;
100 static constexpr int HTTPCLIENT_IO_WRITE
= 0x2;
101 static constexpr int HTTPCLIENT_IO_CONTROL
= 0x4;
103 virtual ~RGWHTTPClient();
104 explicit RGWHTTPClient(CephContext
*cct
,
105 const string
& _method
,
107 : has_send_len(false),
108 http_status(HTTP_STATUS_NOSTATUS
),
110 verify_ssl(cct
->_conf
->rgw_verify_ssl
),
116 void append_header(const string
& name
, const string
& val
) {
117 headers
.push_back(pair
<string
, string
>(name
, val
));
120 void set_send_length(size_t len
) {
125 void set_send_data_hint(bool hint
) {
126 send_data_hint
= hint
;
129 long get_http_status() const {
133 void set_http_status(long _http_status
) {
134 http_status
= _http_status
;
137 void set_verify_ssl(bool flag
) {
141 // set request timeout in seconds
142 // zero (default) mean that request will never timeout
143 void set_req_timeout(long timeout
) {
144 req_timeout
= timeout
;
147 int process(optional_yield y
);
149 int wait(optional_yield y
);
153 rgw_http_req_data
*get_req_data() { return req_data
; }
157 int get_req_retcode();
159 void set_url(const string
& _url
) {
163 void set_method(const string
& _method
) {
167 void set_io_user_info(void *_user_info
) override
{
168 user_info
= _user_info
;
171 void *get_io_user_info() override
{
177 class RGWHTTPHeadersCollector
: public RGWHTTPClient
{
179 typedef std::string header_name_t
;
180 typedef std::string header_value_t
;
181 typedef std::set
<header_name_t
, ltstr_nocase
> header_spec_t
;
183 RGWHTTPHeadersCollector(CephContext
* const cct
,
184 const string
& method
,
186 const header_spec_t
&relevant_headers
)
187 : RGWHTTPClient(cct
, method
, url
),
188 relevant_headers(relevant_headers
) {
191 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> get_headers() const {
192 return found_headers
;
195 /* Throws std::out_of_range */
196 const header_value_t
& get_header_value(const header_name_t
& name
) const {
197 return found_headers
.at(name
);
201 int receive_header(void *ptr
, size_t len
) override
;
204 const std::set
<header_name_t
, ltstr_nocase
> relevant_headers
;
205 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> found_headers
;
209 class RGWHTTPTransceiver
: public RGWHTTPHeadersCollector
{
210 bufferlist
* const read_bl
;
211 std::string post_data
;
212 size_t post_data_index
;
215 RGWHTTPTransceiver(CephContext
* const cct
,
216 const string
& method
,
218 bufferlist
* const read_bl
,
219 const header_spec_t intercept_headers
= {})
220 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
225 RGWHTTPTransceiver(CephContext
* const cct
,
226 const string
& method
,
228 bufferlist
* const read_bl
,
229 const bool verify_ssl
,
230 const header_spec_t intercept_headers
= {})
231 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
234 set_verify_ssl(verify_ssl
);
237 void set_post_data(const std::string
& _post_data
) {
238 this->post_data
= _post_data
;
242 int send_data(void* ptr
, size_t len
, bool *pause
=nullptr) override
;
244 int receive_data(void *ptr
, size_t len
, bool *pause
) override
{
245 read_bl
->append((char *)ptr
, len
);
250 typedef RGWHTTPTransceiver RGWPostHTTPData
;
253 class RGWCompletionManager
;
255 enum RGWHTTPRequestSetState
{
257 SET_WRITE_PAUSED
= 1,
258 SET_WRITE_RESUME
= 2,
263 class RGWHTTPManager
{
265 rgw_http_req_data
*req
;
268 set_state(rgw_http_req_data
*_req
, int _bitmask
) : req(_req
), bitmask(_bitmask
) {}
271 RGWCompletionManager
*completion_mgr
;
273 bool is_started
= false;
274 std::atomic
<unsigned> going_down
{ 0 };
275 std::atomic
<unsigned> is_stopped
{ 0 };
277 ceph::shared_mutex reqs_lock
= ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
278 map
<uint64_t, rgw_http_req_data
*> reqs
;
279 list
<rgw_http_req_data
*> unregistered_reqs
;
280 list
<set_state
> reqs_change_state
;
281 map
<uint64_t, rgw_http_req_data
*> complete_reqs
;
282 int64_t num_reqs
= 0;
283 int64_t max_threaded_req
= 0;
286 void register_request(rgw_http_req_data
*req_data
);
287 void complete_request(rgw_http_req_data
*req_data
);
288 void _complete_request(rgw_http_req_data
*req_data
);
289 bool unregister_request(rgw_http_req_data
*req_data
);
290 void _unlink_request(rgw_http_req_data
*req_data
);
291 void unlink_request(rgw_http_req_data
*req_data
);
292 void finish_request(rgw_http_req_data
*req_data
, int r
, long http_status
= -1);
293 void _finish_request(rgw_http_req_data
*req_data
, int r
);
294 void _set_req_state(set_state
& ss
);
295 int link_request(rgw_http_req_data
*req_data
);
297 void manage_pending_requests();
299 class ReqsThread
: public Thread
{
300 RGWHTTPManager
*manager
;
303 explicit ReqsThread(RGWHTTPManager
*_m
) : manager(_m
) {}
304 void *entry() override
;
307 ReqsThread
*reqs_thread
= nullptr;
309 void *reqs_thread_entry();
314 RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*completion_mgr
= NULL
);
320 int add_request(RGWHTTPClient
*client
);
321 int remove_request(RGWHTTPClient
*client
);
322 int set_request_state(RGWHTTPClient
*client
, RGWHTTPRequestSetState state
);
328 static int send(RGWHTTPClient
*req
);
329 static int process(RGWHTTPClient
*req
, optional_yield y
);