// git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.h
// import quincy beta 17.1.0
// [ceph.git] / ceph / src / rgw / rgw_http_client.h
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
6
7 #include "common/async/yield_context.h"
8 #include "common/Cond.h"
9 #include "rgw_common.h"
10 #include "rgw_string.h"
11 #include "rgw_http_client_types.h"
12
13 #include <atomic>
14
// A single (name, value) pair of strings. RGWHTTPClient stores its request
// headers as a sequence of these.
using param_pair_t = std::pair<std::string, std::string>;

// An ordered sequence of (name, value) pairs.
using param_vec_t = std::vector<param_pair_t>;
17
18 void rgw_http_client_init(CephContext *cct);
19 void rgw_http_client_cleanup();
20
// Forward declarations. rgw_http_req_data is only handled through pointers
// in this header; RGWHTTPManager is named (friend declaration, get_manager())
// by RGWHTTPClient before its own definition further down.
struct rgw_http_req_data;
class RGWHTTPManager;
23
24 class RGWHTTPClient : public RGWIOProvider,
25 public NoDoutPrefix
26 {
27 friend class RGWHTTPManager;
28
29 bufferlist send_bl;
30 bufferlist::iterator send_iter;
31 bool has_send_len;
32 long http_status;
33 bool send_data_hint{false};
34 size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
36
37 void *user_info{nullptr};
38
39 rgw_http_req_data *req_data;
40
41 bool verify_ssl; // Do not validate self signed certificates, default to false
42
43 std::string ca_path;
44
45 std::string client_cert;
46
47 std::string client_key;
48
49 std::atomic<unsigned> stopped { 0 };
50
51
52 protected:
53 CephContext *cct;
54
55 std::string method;
56 std::string url;
57
58 std::string protocol;
59 std::string host;
60 std::string resource_prefix;
61
62 size_t send_len{0};
63
64 param_vec_t headers;
65
66 long req_timeout{0L};
67
68 void init();
69
70 RGWHTTPManager *get_manager();
71
72 int init_request(rgw_http_req_data *req_data);
73
74 virtual int receive_header(void *ptr, size_t len) {
75 return 0;
76 }
77 virtual int receive_data(void *ptr, size_t len, bool *pause) {
78 return 0;
79 }
80
81 virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
82 return 0;
83 }
84
85 /* Callbacks for libcurl. */
86 static size_t receive_http_header(void *ptr,
87 size_t size,
88 size_t nmemb,
89 void *_info);
90
91 static size_t receive_http_data(void *ptr,
92 size_t size,
93 size_t nmemb,
94 void *_info);
95
96 static size_t send_http_data(void *ptr,
97 size_t size,
98 size_t nmemb,
99 void *_info);
100
101 ceph::mutex& get_req_lock();
102
103 /* needs to be called under req_lock() */
104 void _set_write_paused(bool pause);
105 void _set_read_paused(bool pause);
106 public:
107 static const long HTTP_STATUS_NOSTATUS = 0;
108 static const long HTTP_STATUS_UNAUTHORIZED = 401;
109 static const long HTTP_STATUS_NOTFOUND = 404;
110
111 static constexpr int HTTPCLIENT_IO_READ = 0x1;
112 static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
113 static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
114
115 virtual ~RGWHTTPClient();
116 explicit RGWHTTPClient(CephContext *cct,
117 const std::string& _method,
118 const std::string& _url);
119
120 std::ostream& gen_prefix(std::ostream& out) const override;
121
122
123 void append_header(const std::string& name, const std::string& val) {
124 headers.push_back(std::pair<std::string, std::string>(name, val));
125 }
126
127 void set_send_length(size_t len) {
128 send_len = len;
129 has_send_len = true;
130 }
131
132 void set_send_data_hint(bool hint) {
133 send_data_hint = hint;
134 }
135
136 long get_http_status() const {
137 return http_status;
138 }
139
140 void set_http_status(long _http_status) {
141 http_status = _http_status;
142 }
143
144 void set_verify_ssl(bool flag) {
145 verify_ssl = flag;
146 }
147
148 // set request timeout in seconds
149 // zero (default) mean that request will never timeout
150 void set_req_timeout(long timeout) {
151 req_timeout = timeout;
152 }
153
154 int process(optional_yield y);
155
156 int wait(optional_yield y);
157 void cancel();
158 bool is_done();
159
160 rgw_http_req_data *get_req_data() { return req_data; }
161
162 std::string to_str();
163
164 int get_req_retcode();
165
166 void set_url(const std::string& _url) {
167 url = _url;
168 }
169
170 void set_method(const std::string& _method) {
171 method = _method;
172 }
173
174 void set_io_user_info(void *_user_info) override {
175 user_info = _user_info;
176 }
177
178 void *get_io_user_info() override {
179 return user_info;
180 }
181
182 void set_ca_path(const std::string& _ca_path) {
183 ca_path = _ca_path;
184 }
185
186 void set_client_cert(const std::string& _client_cert) {
187 client_cert = _client_cert;
188 }
189
190 void set_client_key(const std::string& _client_key) {
191 client_key = _client_key;
192 }
193 };
194
195
196 class RGWHTTPHeadersCollector : public RGWHTTPClient {
197 public:
198 typedef std::string header_name_t;
199 typedef std::string header_value_t;
200 typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
201
202 RGWHTTPHeadersCollector(CephContext * const cct,
203 const std::string& method,
204 const std::string& url,
205 const header_spec_t &relevant_headers)
206 : RGWHTTPClient(cct, method, url),
207 relevant_headers(relevant_headers) {
208 }
209
210 std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
211 return found_headers;
212 }
213
214 /* Throws std::out_of_range */
215 const header_value_t& get_header_value(const header_name_t& name) const {
216 return found_headers.at(name);
217 }
218
219 protected:
220 int receive_header(void *ptr, size_t len) override;
221
222 private:
223 const std::set<header_name_t, ltstr_nocase> relevant_headers;
224 std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
225 };
226
227
228 class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
229 bufferlist * const read_bl;
230 std::string post_data;
231 size_t post_data_index;
232
233 public:
234 RGWHTTPTransceiver(CephContext * const cct,
235 const std::string& method,
236 const std::string& url,
237 bufferlist * const read_bl,
238 const header_spec_t intercept_headers = {})
239 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
240 read_bl(read_bl),
241 post_data_index(0) {
242 }
243
244 RGWHTTPTransceiver(CephContext * const cct,
245 const std::string& method,
246 const std::string& url,
247 bufferlist * const read_bl,
248 const bool verify_ssl,
249 const header_spec_t intercept_headers = {})
250 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
251 read_bl(read_bl),
252 post_data_index(0) {
253 set_verify_ssl(verify_ssl);
254 }
255
256 void set_post_data(const std::string& _post_data) {
257 this->post_data = _post_data;
258 }
259
260 protected:
261 int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
262
263 int receive_data(void *ptr, size_t len, bool *pause) override {
264 read_bl->append((char *)ptr, len);
265 return 0;
266 }
267 };
268
269 typedef RGWHTTPTransceiver RGWPostHTTPData;
270
271
// Forward declaration; this header only refers to it through pointers.
class RGWCompletionManager;
273
/*
 * State-change requests accepted by RGWHTTPManager::set_request_state().
 * NOTE(review): presumably the WRITE values pause/resume the sending side of
 * a request and the READ values the receiving side -- confirm against the
 * implementation. The numeric values are spelled out explicitly; keep them
 * stable.
 */
enum RGWHTTPRequestSetState {
  SET_NOP = 0,
  SET_WRITE_PAUSED = 1,
  SET_WRITE_RESUME = 2,
  SET_READ_PAUSED = 3,
  SET_READ_RESUME = 4,
};
281
282 class RGWHTTPManager {
283 struct set_state {
284 rgw_http_req_data *req;
285 int bitmask;
286
287 set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
288 };
289 CephContext *cct;
290 RGWCompletionManager *completion_mgr;
291 void *multi_handle;
292 bool is_started = false;
293 std::atomic<unsigned> going_down { 0 };
294 std::atomic<unsigned> is_stopped { 0 };
295
296 ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
297 std::map<uint64_t, rgw_http_req_data *> reqs;
298 std::list<rgw_http_req_data *> unregistered_reqs;
299 std::list<set_state> reqs_change_state;
300 std::map<uint64_t, rgw_http_req_data *> complete_reqs;
301 int64_t num_reqs = 0;
302 int64_t max_threaded_req = 0;
303 int thread_pipe[2];
304
305 void register_request(rgw_http_req_data *req_data);
306 void complete_request(rgw_http_req_data *req_data);
307 void _complete_request(rgw_http_req_data *req_data);
308 bool unregister_request(rgw_http_req_data *req_data);
309 void _unlink_request(rgw_http_req_data *req_data);
310 void unlink_request(rgw_http_req_data *req_data);
311 void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
312 void _finish_request(rgw_http_req_data *req_data, int r);
313 void _set_req_state(set_state& ss);
314 int link_request(rgw_http_req_data *req_data);
315
316 void manage_pending_requests();
317
318 class ReqsThread : public Thread {
319 RGWHTTPManager *manager;
320
321 public:
322 explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
323 void *entry() override;
324 };
325
326 ReqsThread *reqs_thread = nullptr;
327
328 void *reqs_thread_entry();
329
330 int signal_thread();
331
332 public:
333 RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
334 ~RGWHTTPManager();
335
336 int start();
337 void stop();
338
339 int add_request(RGWHTTPClient *client);
340 int remove_request(RGWHTTPClient *client);
341 int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
342 };
343
344 class RGWHTTP
345 {
346 public:
347 static int send(RGWHTTPClient *req);
348 static int process(RGWHTTPClient *req, optional_yield y);
349 };
350 #endif