1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
7 #include "common/async/yield_context.h"
8 #include "common/RWLock.h"
9 #include "common/Cond.h"
10 #include "rgw_common.h"
11 #include "rgw_string.h"
12 #include "rgw_http_client_types.h"
16 using param_pair_t
= pair
<string
, string
>;
17 using param_vec_t
= vector
<param_pair_t
>;
19 void rgw_http_client_init(CephContext
*cct
);
20 void rgw_http_client_cleanup();
22 struct rgw_http_req_data
;
25 class RGWHTTPClient
: public RGWIOProvider
27 friend class RGWHTTPManager
;
30 bufferlist::iterator send_iter
;
33 bool send_data_hint
{false};
34 size_t receive_pause_skip
{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
37 void *user_info
{nullptr};
39 rgw_http_req_data
*req_data
;
41 bool verify_ssl
; // Do not validate self signed certificates, default to false
43 std::atomic
<unsigned> stopped
{ 0 };
56 RGWHTTPManager
*get_manager();
58 int init_request(rgw_http_req_data
*req_data
);
60 virtual int receive_header(void *ptr
, size_t len
) {
63 virtual int receive_data(void *ptr
, size_t len
, bool *pause
) {
67 virtual int send_data(void *ptr
, size_t len
, bool *pause
=nullptr) {
71 /* Callbacks for libcurl. */
72 static size_t receive_http_header(void *ptr
,
77 static size_t receive_http_data(void *ptr
,
82 static size_t send_http_data(void *ptr
,
87 ceph::mutex
& get_req_lock();
89 /* needs to be called under req_lock() */
90 void _set_write_paused(bool pause
);
91 void _set_read_paused(bool pause
);
93 static const long HTTP_STATUS_NOSTATUS
= 0;
94 static const long HTTP_STATUS_UNAUTHORIZED
= 401;
95 static const long HTTP_STATUS_NOTFOUND
= 404;
97 static constexpr int HTTPCLIENT_IO_READ
= 0x1;
98 static constexpr int HTTPCLIENT_IO_WRITE
= 0x2;
99 static constexpr int HTTPCLIENT_IO_CONTROL
= 0x4;
101 virtual ~RGWHTTPClient();
102 explicit RGWHTTPClient(CephContext
*cct
,
103 const string
& _method
,
105 : has_send_len(false),
106 http_status(HTTP_STATUS_NOSTATUS
),
108 verify_ssl(cct
->_conf
->rgw_verify_ssl
),
114 void append_header(const string
& name
, const string
& val
) {
115 headers
.push_back(pair
<string
, string
>(name
, val
));
118 void set_send_length(size_t len
) {
123 void set_send_data_hint(bool hint
) {
124 send_data_hint
= hint
;
127 long get_http_status() const {
131 void set_http_status(long _http_status
) {
132 http_status
= _http_status
;
135 void set_verify_ssl(bool flag
) {
139 int process(optional_yield y
);
141 int wait(optional_yield y
);
145 rgw_http_req_data
*get_req_data() { return req_data
; }
149 int get_req_retcode();
151 void set_url(const string
& _url
) {
155 void set_method(const string
& _method
) {
159 void set_io_user_info(void *_user_info
) override
{
160 user_info
= _user_info
;
163 void *get_io_user_info() override
{
169 class RGWHTTPHeadersCollector
: public RGWHTTPClient
{
171 typedef std::string header_name_t
;
172 typedef std::string header_value_t
;
173 typedef std::set
<header_name_t
, ltstr_nocase
> header_spec_t
;
175 RGWHTTPHeadersCollector(CephContext
* const cct
,
176 const string
& method
,
178 const header_spec_t
&relevant_headers
)
179 : RGWHTTPClient(cct
, method
, url
),
180 relevant_headers(relevant_headers
) {
183 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> get_headers() const {
184 return found_headers
;
187 /* Throws std::out_of_range */
188 const header_value_t
& get_header_value(const header_name_t
& name
) const {
189 return found_headers
.at(name
);
193 int receive_header(void *ptr
, size_t len
) override
;
196 const std::set
<header_name_t
, ltstr_nocase
> relevant_headers
;
197 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> found_headers
;
201 class RGWHTTPTransceiver
: public RGWHTTPHeadersCollector
{
202 bufferlist
* const read_bl
;
203 std::string post_data
;
204 size_t post_data_index
;
207 RGWHTTPTransceiver(CephContext
* const cct
,
208 const string
& method
,
210 bufferlist
* const read_bl
,
211 const header_spec_t intercept_headers
= {})
212 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
217 RGWHTTPTransceiver(CephContext
* const cct
,
218 const string
& method
,
220 bufferlist
* const read_bl
,
221 const bool verify_ssl
,
222 const header_spec_t intercept_headers
= {})
223 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
226 set_verify_ssl(verify_ssl
);
229 void set_post_data(const std::string
& _post_data
) {
230 this->post_data
= _post_data
;
234 int send_data(void* ptr
, size_t len
, bool *pause
=nullptr) override
;
236 int receive_data(void *ptr
, size_t len
, bool *pause
) override
{
237 read_bl
->append((char *)ptr
, len
);
242 typedef RGWHTTPTransceiver RGWPostHTTPData
;
245 class RGWCompletionManager
;
247 enum RGWHTTPRequestSetState
{
249 SET_WRITE_PAUSED
= 1,
250 SET_WRITE_RESUME
= 2,
255 class RGWHTTPManager
{
257 rgw_http_req_data
*req
;
260 set_state(rgw_http_req_data
*_req
, int _bitmask
) : req(_req
), bitmask(_bitmask
) {}
263 RGWCompletionManager
*completion_mgr
;
265 bool is_started
= false;
266 std::atomic
<unsigned> going_down
{ 0 };
267 std::atomic
<unsigned> is_stopped
{ 0 };
269 ceph::shared_mutex reqs_lock
= ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
270 map
<uint64_t, rgw_http_req_data
*> reqs
;
271 list
<rgw_http_req_data
*> unregistered_reqs
;
272 list
<set_state
> reqs_change_state
;
273 map
<uint64_t, rgw_http_req_data
*> complete_reqs
;
274 int64_t num_reqs
= 0;
275 int64_t max_threaded_req
= 0;
278 void register_request(rgw_http_req_data
*req_data
);
279 void complete_request(rgw_http_req_data
*req_data
);
280 void _complete_request(rgw_http_req_data
*req_data
);
281 bool unregister_request(rgw_http_req_data
*req_data
);
282 void _unlink_request(rgw_http_req_data
*req_data
);
283 void unlink_request(rgw_http_req_data
*req_data
);
284 void finish_request(rgw_http_req_data
*req_data
, int r
, long http_status
= -1);
285 void _finish_request(rgw_http_req_data
*req_data
, int r
);
286 void _set_req_state(set_state
& ss
);
287 int link_request(rgw_http_req_data
*req_data
);
289 void manage_pending_requests();
291 class ReqsThread
: public Thread
{
292 RGWHTTPManager
*manager
;
295 explicit ReqsThread(RGWHTTPManager
*_m
) : manager(_m
) {}
296 void *entry() override
;
299 ReqsThread
*reqs_thread
= nullptr;
301 void *reqs_thread_entry();
306 RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*completion_mgr
= NULL
);
312 int add_request(RGWHTTPClient
*client
);
313 int remove_request(RGWHTTPClient
*client
);
314 int set_request_state(RGWHTTPClient
*client
, RGWHTTPRequestSetState state
);
320 static int send(RGWHTTPClient
*req
);
321 static int process(RGWHTTPClient
*req
, optional_yield y
);