]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_http_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
6
7 #include "common/async/yield_context.h"
8 #include "common/RWLock.h"
9 #include "common/Cond.h"
10 #include "rgw_common.h"
11 #include "rgw_string.h"
12 #include "rgw_http_client_types.h"
13
14 #include <atomic>
15
/* A single name/value pair, e.g. one HTTP header (see RGWHTTPClient::headers). */
using param_pair_t = std::pair<std::string, std::string>;
/* An ordered list of name/value pairs; insertion order is preserved. */
using param_vec_t = std::vector<param_pair_t>;
18
19 void rgw_http_client_init(CephContext *cct);
20 void rgw_http_client_cleanup();
21
22 struct rgw_http_req_data;
23 class RGWHTTPManager;
24
25 class RGWHTTPClient : public RGWIOProvider
26 {
27 friend class RGWHTTPManager;
28
29 bufferlist send_bl;
30 bufferlist::iterator send_iter;
31 bool has_send_len;
32 long http_status;
33 bool send_data_hint{false};
34 size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
36
37 void *user_info{nullptr};
38
39 rgw_http_req_data *req_data;
40
41 bool verify_ssl; // Do not validate self signed certificates, default to false
42
43 std::atomic<unsigned> stopped { 0 };
44
45
46 protected:
47 CephContext *cct;
48
49 string method;
50 string url;
51
52 size_t send_len{0};
53
54 param_vec_t headers;
55
56 long req_timeout{0L};
57
58 RGWHTTPManager *get_manager();
59
60 int init_request(rgw_http_req_data *req_data);
61
62 virtual int receive_header(void *ptr, size_t len) {
63 return 0;
64 }
65 virtual int receive_data(void *ptr, size_t len, bool *pause) {
66 return 0;
67 }
68
69 virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
70 return 0;
71 }
72
73 /* Callbacks for libcurl. */
74 static size_t receive_http_header(void *ptr,
75 size_t size,
76 size_t nmemb,
77 void *_info);
78
79 static size_t receive_http_data(void *ptr,
80 size_t size,
81 size_t nmemb,
82 void *_info);
83
84 static size_t send_http_data(void *ptr,
85 size_t size,
86 size_t nmemb,
87 void *_info);
88
89 ceph::mutex& get_req_lock();
90
91 /* needs to be called under req_lock() */
92 void _set_write_paused(bool pause);
93 void _set_read_paused(bool pause);
94 public:
95 static const long HTTP_STATUS_NOSTATUS = 0;
96 static const long HTTP_STATUS_UNAUTHORIZED = 401;
97 static const long HTTP_STATUS_NOTFOUND = 404;
98
99 static constexpr int HTTPCLIENT_IO_READ = 0x1;
100 static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
101 static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
102
103 virtual ~RGWHTTPClient();
104 explicit RGWHTTPClient(CephContext *cct,
105 const string& _method,
106 const string& _url)
107 : has_send_len(false),
108 http_status(HTTP_STATUS_NOSTATUS),
109 req_data(nullptr),
110 verify_ssl(cct->_conf->rgw_verify_ssl),
111 cct(cct),
112 method(_method),
113 url(_url) {
114 }
115
116 void append_header(const string& name, const string& val) {
117 headers.push_back(pair<string, string>(name, val));
118 }
119
120 void set_send_length(size_t len) {
121 send_len = len;
122 has_send_len = true;
123 }
124
125 void set_send_data_hint(bool hint) {
126 send_data_hint = hint;
127 }
128
129 long get_http_status() const {
130 return http_status;
131 }
132
133 void set_http_status(long _http_status) {
134 http_status = _http_status;
135 }
136
137 void set_verify_ssl(bool flag) {
138 verify_ssl = flag;
139 }
140
141 // set request timeout in seconds
142 // zero (default) mean that request will never timeout
143 void set_req_timeout(long timeout) {
144 req_timeout = timeout;
145 }
146
147 int process(optional_yield y);
148
149 int wait(optional_yield y);
150 void cancel();
151 bool is_done();
152
153 rgw_http_req_data *get_req_data() { return req_data; }
154
155 string to_str();
156
157 int get_req_retcode();
158
159 void set_url(const string& _url) {
160 url = _url;
161 }
162
163 void set_method(const string& _method) {
164 method = _method;
165 }
166
167 void set_io_user_info(void *_user_info) override {
168 user_info = _user_info;
169 }
170
171 void *get_io_user_info() override {
172 return user_info;
173 }
174 };
175
176
177 class RGWHTTPHeadersCollector : public RGWHTTPClient {
178 public:
179 typedef std::string header_name_t;
180 typedef std::string header_value_t;
181 typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
182
183 RGWHTTPHeadersCollector(CephContext * const cct,
184 const string& method,
185 const string& url,
186 const header_spec_t &relevant_headers)
187 : RGWHTTPClient(cct, method, url),
188 relevant_headers(relevant_headers) {
189 }
190
191 std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
192 return found_headers;
193 }
194
195 /* Throws std::out_of_range */
196 const header_value_t& get_header_value(const header_name_t& name) const {
197 return found_headers.at(name);
198 }
199
200 protected:
201 int receive_header(void *ptr, size_t len) override;
202
203 private:
204 const std::set<header_name_t, ltstr_nocase> relevant_headers;
205 std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
206 };
207
208
209 class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
210 bufferlist * const read_bl;
211 std::string post_data;
212 size_t post_data_index;
213
214 public:
215 RGWHTTPTransceiver(CephContext * const cct,
216 const string& method,
217 const string& url,
218 bufferlist * const read_bl,
219 const header_spec_t intercept_headers = {})
220 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
221 read_bl(read_bl),
222 post_data_index(0) {
223 }
224
225 RGWHTTPTransceiver(CephContext * const cct,
226 const string& method,
227 const string& url,
228 bufferlist * const read_bl,
229 const bool verify_ssl,
230 const header_spec_t intercept_headers = {})
231 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
232 read_bl(read_bl),
233 post_data_index(0) {
234 set_verify_ssl(verify_ssl);
235 }
236
237 void set_post_data(const std::string& _post_data) {
238 this->post_data = _post_data;
239 }
240
241 protected:
242 int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
243
244 int receive_data(void *ptr, size_t len, bool *pause) override {
245 read_bl->append((char *)ptr, len);
246 return 0;
247 }
248 };
249
250 typedef RGWHTTPTransceiver RGWPostHTTPData;
251
252
253 class RGWCompletionManager;
254
/*
 * State-change commands accepted by RGWHTTPManager::set_request_state().
 * NOTE(review): the effect of each value is implemented outside this header;
 * the per-value notes below are read off the enumerator names only.
 */
enum RGWHTTPRequestSetState {
  SET_NOP          = 0,  /* no change requested */
  SET_WRITE_PAUSED = 1,  /* pause the write (send) side */
  SET_WRITE_RESUME = 2,  /* resume the write (send) side */
  SET_READ_PAUSED  = 3,  /* pause the read (receive) side */
  SET_READ_RESUME  = 4,  /* resume the read (receive) side */
};
262
263 class RGWHTTPManager {
264 struct set_state {
265 rgw_http_req_data *req;
266 int bitmask;
267
268 set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
269 };
270 CephContext *cct;
271 RGWCompletionManager *completion_mgr;
272 void *multi_handle;
273 bool is_started = false;
274 std::atomic<unsigned> going_down { 0 };
275 std::atomic<unsigned> is_stopped { 0 };
276
277 ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
278 map<uint64_t, rgw_http_req_data *> reqs;
279 list<rgw_http_req_data *> unregistered_reqs;
280 list<set_state> reqs_change_state;
281 map<uint64_t, rgw_http_req_data *> complete_reqs;
282 int64_t num_reqs = 0;
283 int64_t max_threaded_req = 0;
284 int thread_pipe[2];
285
286 void register_request(rgw_http_req_data *req_data);
287 void complete_request(rgw_http_req_data *req_data);
288 void _complete_request(rgw_http_req_data *req_data);
289 bool unregister_request(rgw_http_req_data *req_data);
290 void _unlink_request(rgw_http_req_data *req_data);
291 void unlink_request(rgw_http_req_data *req_data);
292 void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
293 void _finish_request(rgw_http_req_data *req_data, int r);
294 void _set_req_state(set_state& ss);
295 int link_request(rgw_http_req_data *req_data);
296
297 void manage_pending_requests();
298
299 class ReqsThread : public Thread {
300 RGWHTTPManager *manager;
301
302 public:
303 explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
304 void *entry() override;
305 };
306
307 ReqsThread *reqs_thread = nullptr;
308
309 void *reqs_thread_entry();
310
311 int signal_thread();
312
313 public:
314 RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
315 ~RGWHTTPManager();
316
317 int start();
318 void stop();
319
320 int add_request(RGWHTTPClient *client);
321 int remove_request(RGWHTTPClient *client);
322 int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
323 };
324
325 class RGWHTTP
326 {
327 public:
328 static int send(RGWHTTPClient *req);
329 static int process(RGWHTTPClient *req, optional_yield y);
330 };
331 #endif