]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.h
import ceph 16.2.6
[ceph.git] / ceph / src / rgw / rgw_http_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
6
7 #include "common/async/yield_context.h"
8 #include "common/RWLock.h"
9 #include "common/Cond.h"
10 #include "rgw_common.h"
11 #include "rgw_string.h"
12 #include "rgw_http_client_types.h"
13
14 #include <atomic>
15
/* One (name, value) pair of strings; RGWHTTPClient stores its request
 * headers as a sequence of these. */
using param_pair_t = std::pair<std::string, std::string>;
/* Ordered list of (name, value) pairs. */
using param_vec_t = std::vector<param_pair_t>;
18
19 void rgw_http_client_init(CephContext *cct);
20 void rgw_http_client_cleanup();
21
22 struct rgw_http_req_data;
23 class RGWHTTPManager;
24
25 class RGWHTTPClient : public RGWIOProvider
26 {
27 friend class RGWHTTPManager;
28
29 bufferlist send_bl;
30 bufferlist::iterator send_iter;
31 bool has_send_len;
32 long http_status;
33 bool send_data_hint{false};
34 size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
36
37 void *user_info{nullptr};
38
39 rgw_http_req_data *req_data;
40
41 bool verify_ssl; // Do not validate self signed certificates, default to false
42
43 string ca_path;
44
45 string client_cert;
46
47 string client_key;
48
49 std::atomic<unsigned> stopped { 0 };
50
51
52 protected:
53 CephContext *cct;
54
55 string method;
56 string url;
57
58 size_t send_len{0};
59
60 param_vec_t headers;
61
62 long req_timeout{0L};
63
64 RGWHTTPManager *get_manager();
65
66 int init_request(rgw_http_req_data *req_data);
67
68 virtual int receive_header(void *ptr, size_t len) {
69 return 0;
70 }
71 virtual int receive_data(void *ptr, size_t len, bool *pause) {
72 return 0;
73 }
74
75 virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
76 return 0;
77 }
78
79 /* Callbacks for libcurl. */
80 static size_t receive_http_header(void *ptr,
81 size_t size,
82 size_t nmemb,
83 void *_info);
84
85 static size_t receive_http_data(void *ptr,
86 size_t size,
87 size_t nmemb,
88 void *_info);
89
90 static size_t send_http_data(void *ptr,
91 size_t size,
92 size_t nmemb,
93 void *_info);
94
95 ceph::mutex& get_req_lock();
96
97 /* needs to be called under req_lock() */
98 void _set_write_paused(bool pause);
99 void _set_read_paused(bool pause);
100 public:
101 static const long HTTP_STATUS_NOSTATUS = 0;
102 static const long HTTP_STATUS_UNAUTHORIZED = 401;
103 static const long HTTP_STATUS_NOTFOUND = 404;
104
105 static constexpr int HTTPCLIENT_IO_READ = 0x1;
106 static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
107 static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
108
109 virtual ~RGWHTTPClient();
110 explicit RGWHTTPClient(CephContext *cct,
111 const string& _method,
112 const string& _url)
113 : has_send_len(false),
114 http_status(HTTP_STATUS_NOSTATUS),
115 req_data(nullptr),
116 verify_ssl(cct->_conf->rgw_verify_ssl),
117 cct(cct),
118 method(_method),
119 url(_url) {
120 }
121
122 void append_header(const string& name, const string& val) {
123 headers.push_back(pair<string, string>(name, val));
124 }
125
126 void set_send_length(size_t len) {
127 send_len = len;
128 has_send_len = true;
129 }
130
131 void set_send_data_hint(bool hint) {
132 send_data_hint = hint;
133 }
134
135 long get_http_status() const {
136 return http_status;
137 }
138
139 void set_http_status(long _http_status) {
140 http_status = _http_status;
141 }
142
143 void set_verify_ssl(bool flag) {
144 verify_ssl = flag;
145 }
146
147 // set request timeout in seconds
148 // zero (default) mean that request will never timeout
149 void set_req_timeout(long timeout) {
150 req_timeout = timeout;
151 }
152
153 int process(optional_yield y);
154
155 int wait(optional_yield y);
156 void cancel();
157 bool is_done();
158
159 rgw_http_req_data *get_req_data() { return req_data; }
160
161 string to_str();
162
163 int get_req_retcode();
164
165 void set_url(const string& _url) {
166 url = _url;
167 }
168
169 void set_method(const string& _method) {
170 method = _method;
171 }
172
173 void set_io_user_info(void *_user_info) override {
174 user_info = _user_info;
175 }
176
177 void *get_io_user_info() override {
178 return user_info;
179 }
180
181 void set_ca_path(const string& _ca_path) {
182 ca_path = _ca_path;
183 }
184
185 void set_client_cert(const string& _client_cert) {
186 client_cert = _client_cert;
187 }
188
189 void set_client_key(const string& _client_key) {
190 client_key = _client_key;
191 }
192 };
193
194
195 class RGWHTTPHeadersCollector : public RGWHTTPClient {
196 public:
197 typedef std::string header_name_t;
198 typedef std::string header_value_t;
199 typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
200
201 RGWHTTPHeadersCollector(CephContext * const cct,
202 const string& method,
203 const string& url,
204 const header_spec_t &relevant_headers)
205 : RGWHTTPClient(cct, method, url),
206 relevant_headers(relevant_headers) {
207 }
208
209 std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
210 return found_headers;
211 }
212
213 /* Throws std::out_of_range */
214 const header_value_t& get_header_value(const header_name_t& name) const {
215 return found_headers.at(name);
216 }
217
218 protected:
219 int receive_header(void *ptr, size_t len) override;
220
221 private:
222 const std::set<header_name_t, ltstr_nocase> relevant_headers;
223 std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
224 };
225
226
227 class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
228 bufferlist * const read_bl;
229 std::string post_data;
230 size_t post_data_index;
231
232 public:
233 RGWHTTPTransceiver(CephContext * const cct,
234 const string& method,
235 const string& url,
236 bufferlist * const read_bl,
237 const header_spec_t intercept_headers = {})
238 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
239 read_bl(read_bl),
240 post_data_index(0) {
241 }
242
243 RGWHTTPTransceiver(CephContext * const cct,
244 const string& method,
245 const string& url,
246 bufferlist * const read_bl,
247 const bool verify_ssl,
248 const header_spec_t intercept_headers = {})
249 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
250 read_bl(read_bl),
251 post_data_index(0) {
252 set_verify_ssl(verify_ssl);
253 }
254
255 void set_post_data(const std::string& _post_data) {
256 this->post_data = _post_data;
257 }
258
259 protected:
260 int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
261
262 int receive_data(void *ptr, size_t len, bool *pause) override {
263 read_bl->append((char *)ptr, len);
264 return 0;
265 }
266 };
267
268 typedef RGWHTTPTransceiver RGWPostHTTPData;
269
270
271 class RGWCompletionManager;
272
/*
 * Values accepted by RGWHTTPManager::set_request_state().
 * Judging by the names they select pausing or resuming of the write and read
 * directions of a request; the actual handling is not part of this header.
 */
enum RGWHTTPRequestSetState {
  SET_NOP          = 0,
  SET_WRITE_PAUSED = 1,
  SET_WRITE_RESUME = 2,
  SET_READ_PAUSED  = 3,
  SET_READ_RESUME  = 4,
};
280
281 class RGWHTTPManager {
282 struct set_state {
283 rgw_http_req_data *req;
284 int bitmask;
285
286 set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
287 };
288 CephContext *cct;
289 RGWCompletionManager *completion_mgr;
290 void *multi_handle;
291 bool is_started = false;
292 std::atomic<unsigned> going_down { 0 };
293 std::atomic<unsigned> is_stopped { 0 };
294
295 ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
296 map<uint64_t, rgw_http_req_data *> reqs;
297 list<rgw_http_req_data *> unregistered_reqs;
298 list<set_state> reqs_change_state;
299 map<uint64_t, rgw_http_req_data *> complete_reqs;
300 int64_t num_reqs = 0;
301 int64_t max_threaded_req = 0;
302 int thread_pipe[2];
303
304 void register_request(rgw_http_req_data *req_data);
305 void complete_request(rgw_http_req_data *req_data);
306 void _complete_request(rgw_http_req_data *req_data);
307 bool unregister_request(rgw_http_req_data *req_data);
308 void _unlink_request(rgw_http_req_data *req_data);
309 void unlink_request(rgw_http_req_data *req_data);
310 void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
311 void _finish_request(rgw_http_req_data *req_data, int r);
312 void _set_req_state(set_state& ss);
313 int link_request(rgw_http_req_data *req_data);
314
315 void manage_pending_requests();
316
317 class ReqsThread : public Thread {
318 RGWHTTPManager *manager;
319
320 public:
321 explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
322 void *entry() override;
323 };
324
325 ReqsThread *reqs_thread = nullptr;
326
327 void *reqs_thread_entry();
328
329 int signal_thread();
330
331 public:
332 RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
333 ~RGWHTTPManager();
334
335 int start();
336 void stop();
337
338 int add_request(RGWHTTPClient *client);
339 int remove_request(RGWHTTPClient *client);
340 int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
341 };
342
343 class RGWHTTP
344 {
345 public:
346 static int send(RGWHTTPClient *req);
347 static int process(RGWHTTPClient *req, optional_yield y);
348 };
349 #endif