1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
7 #include "common/async/yield_context.h"
8 #include "common/Cond.h"
9 #include "rgw_common.h"
10 #include "rgw_string.h"
11 #include "rgw_http_client_types.h"
// One (first, second) pair of strings.
using param_pair_t = std::pair<std::string, std::string>;

// An ordered sequence of param_pair_t entries.
using param_vec_t = std::vector<param_pair_t>;
18 void rgw_http_client_init(CephContext
*cct
);
19 void rgw_http_client_cleanup();
21 struct rgw_http_req_data
;
24 class RGWHTTPClient
: public RGWIOProvider
,
27 friend class RGWHTTPManager
;
30 bufferlist::iterator send_iter
;
33 bool send_data_hint
{false};
34 size_t receive_pause_skip
{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
37 void *user_info
{nullptr};
39 rgw_http_req_data
*req_data
;
41 bool verify_ssl
; // Do not validate self signed certificates, default to false
45 std::string client_cert
;
47 std::string client_key
;
49 std::atomic
<unsigned> stopped
{ 0 };
60 std::string resource_prefix
;
70 RGWHTTPManager
*get_manager();
72 int init_request(rgw_http_req_data
*req_data
);
74 virtual int receive_header(void *ptr
, size_t len
) {
77 virtual int receive_data(void *ptr
, size_t len
, bool *pause
) {
81 virtual int send_data(void *ptr
, size_t len
, bool *pause
=nullptr) {
85 /* Callbacks for libcurl. */
86 static size_t receive_http_header(void *ptr
,
91 static size_t receive_http_data(void *ptr
,
96 static size_t send_http_data(void *ptr
,
101 ceph::mutex
& get_req_lock();
103 /* needs to be called under req_lock() */
104 void _set_write_paused(bool pause
);
105 void _set_read_paused(bool pause
);
/* HTTP status values used with get_http_status()/set_http_status().
   HTTP_STATUS_NOSTATUS is 0, which is not a valid HTTP status code.
   Declared constexpr (like the I/O flags below) so that odr-using one of
   them, e.g. binding it to a const reference, needs no out-of-class
   definition under C++17. */
static constexpr long HTTP_STATUS_NOSTATUS = 0;
static constexpr long HTTP_STATUS_UNAUTHORIZED = 401;
static constexpr long HTTP_STATUS_NOTFOUND = 404;

/* Distinct single-bit values (0x1, 0x2, 0x4), so they can be OR-ed together.
   NOTE(review): presumably combined into an I/O-type bitmask -- confirm
   against the users of these flags. */
static constexpr int HTTPCLIENT_IO_READ = 0x1;
static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
115 virtual ~RGWHTTPClient();
116 explicit RGWHTTPClient(CephContext
*cct
,
117 const std::string
& _method
,
118 const std::string
& _url
);
120 std::ostream
& gen_prefix(std::ostream
& out
) const override
;
123 void append_header(const std::string
& name
, const std::string
& val
) {
124 headers
.push_back(std::pair
<std::string
, std::string
>(name
, val
));
127 void set_send_length(size_t len
) {
132 void set_send_data_hint(bool hint
) {
133 send_data_hint
= hint
;
136 long get_http_status() const {
140 void set_http_status(long _http_status
) {
141 http_status
= _http_status
;
144 void set_verify_ssl(bool flag
) {
// set request timeout in seconds
// zero (default) means that the request will never time out
150 void set_req_timeout(long timeout
) {
151 req_timeout
= timeout
;
154 int process(optional_yield y
);
156 int wait(optional_yield y
);
160 rgw_http_req_data
*get_req_data() { return req_data
; }
162 std::string
to_str();
164 int get_req_retcode();
166 void set_url(const std::string
& _url
) {
170 void set_method(const std::string
& _method
) {
174 void set_io_user_info(void *_user_info
) override
{
175 user_info
= _user_info
;
178 void *get_io_user_info() override
{
182 void set_ca_path(const std::string
& _ca_path
) {
186 void set_client_cert(const std::string
& _client_cert
) {
187 client_cert
= _client_cert
;
190 void set_client_key(const std::string
& _client_key
) {
191 client_key
= _client_key
;
196 class RGWHTTPHeadersCollector
: public RGWHTTPClient
{
198 typedef std::string header_name_t
;
199 typedef std::string header_value_t
;
200 typedef std::set
<header_name_t
, ltstr_nocase
> header_spec_t
;
202 RGWHTTPHeadersCollector(CephContext
* const cct
,
203 const std::string
& method
,
204 const std::string
& url
,
205 const header_spec_t
&relevant_headers
)
206 : RGWHTTPClient(cct
, method
, url
),
207 relevant_headers(relevant_headers
) {
210 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> get_headers() const {
211 return found_headers
;
214 /* Throws std::out_of_range */
215 const header_value_t
& get_header_value(const header_name_t
& name
) const {
216 return found_headers
.at(name
);
220 int receive_header(void *ptr
, size_t len
) override
;
223 const std::set
<header_name_t
, ltstr_nocase
> relevant_headers
;
224 std::map
<header_name_t
, header_value_t
, ltstr_nocase
> found_headers
;
228 class RGWHTTPTransceiver
: public RGWHTTPHeadersCollector
{
229 bufferlist
* const read_bl
;
230 std::string post_data
;
231 size_t post_data_index
;
234 RGWHTTPTransceiver(CephContext
* const cct
,
235 const std::string
& method
,
236 const std::string
& url
,
237 bufferlist
* const read_bl
,
238 const header_spec_t intercept_headers
= {})
239 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
244 RGWHTTPTransceiver(CephContext
* const cct
,
245 const std::string
& method
,
246 const std::string
& url
,
247 bufferlist
* const read_bl
,
248 const bool verify_ssl
,
249 const header_spec_t intercept_headers
= {})
250 : RGWHTTPHeadersCollector(cct
, method
, url
, intercept_headers
),
253 set_verify_ssl(verify_ssl
);
256 void set_post_data(const std::string
& _post_data
) {
257 this->post_data
= _post_data
;
261 int send_data(void* ptr
, size_t len
, bool *pause
=nullptr) override
;
263 int receive_data(void *ptr
, size_t len
, bool *pause
) override
{
264 read_bl
->append((char *)ptr
, len
);
269 typedef RGWHTTPTransceiver RGWPostHTTPData
;
272 class RGWCompletionManager
;
274 enum RGWHTTPRequestSetState
{
276 SET_WRITE_PAUSED
= 1,
277 SET_WRITE_RESUME
= 2,
282 class RGWHTTPManager
{
284 rgw_http_req_data
*req
;
287 set_state(rgw_http_req_data
*_req
, int _bitmask
) : req(_req
), bitmask(_bitmask
) {}
290 RGWCompletionManager
*completion_mgr
;
292 bool is_started
= false;
293 std::atomic
<unsigned> going_down
{ 0 };
294 std::atomic
<unsigned> is_stopped
{ 0 };
296 ceph::shared_mutex reqs_lock
= ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
297 std::map
<uint64_t, rgw_http_req_data
*> reqs
;
298 std::list
<rgw_http_req_data
*> unregistered_reqs
;
299 std::list
<set_state
> reqs_change_state
;
300 std::map
<uint64_t, rgw_http_req_data
*> complete_reqs
;
301 int64_t num_reqs
= 0;
302 int64_t max_threaded_req
= 0;
305 void register_request(rgw_http_req_data
*req_data
);
306 void complete_request(rgw_http_req_data
*req_data
);
307 void _complete_request(rgw_http_req_data
*req_data
);
308 bool unregister_request(rgw_http_req_data
*req_data
);
309 void _unlink_request(rgw_http_req_data
*req_data
);
310 void unlink_request(rgw_http_req_data
*req_data
);
311 void finish_request(rgw_http_req_data
*req_data
, int r
, long http_status
= -1);
312 void _finish_request(rgw_http_req_data
*req_data
, int r
);
313 void _set_req_state(set_state
& ss
);
314 int link_request(rgw_http_req_data
*req_data
);
316 void manage_pending_requests();
318 class ReqsThread
: public Thread
{
319 RGWHTTPManager
*manager
;
322 explicit ReqsThread(RGWHTTPManager
*_m
) : manager(_m
) {}
323 void *entry() override
;
326 ReqsThread
*reqs_thread
= nullptr;
328 void *reqs_thread_entry();
333 RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*completion_mgr
= NULL
);
339 int add_request(RGWHTTPClient
*client
);
340 int remove_request(RGWHTTPClient
*client
);
341 int set_request_state(RGWHTTPClient
*client
, RGWHTTPRequestSetState state
);
347 static int send(RGWHTTPClient
*req
);
348 static int process(RGWHTTPClient
*req
, optional_yield y
);