]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_http_client.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_HTTP_CLIENT_H
5 #define CEPH_RGW_HTTP_CLIENT_H
6
7 #include "common/async/yield_context.h"
8 #include "common/RWLock.h"
9 #include "common/Cond.h"
10 #include "rgw_common.h"
11 #include "rgw_string.h"
12 #include "rgw_http_client_types.h"
13
14 #include <atomic>
15
// A single (name, value) string pair and an ordered list of such pairs.
// Within this header the list type holds the outgoing request headers
// (RGWHTTPClient::headers).
//
// The standard types are spelled with explicit std:: qualification so this
// header does not depend on a using-directive leaked by another include.
using param_pair_t = std::pair<std::string, std::string>;
using param_vec_t = std::vector<param_pair_t>;
18
19 void rgw_http_client_init(CephContext *cct);
20 void rgw_http_client_cleanup();
21
22 struct rgw_http_req_data;
23 class RGWHTTPManager;
24
25 class RGWHTTPClient : public RGWIOProvider
26 {
27 friend class RGWHTTPManager;
28
29 bufferlist send_bl;
30 bufferlist::iterator send_iter;
31 bool has_send_len;
32 long http_status;
33 bool send_data_hint{false};
34 size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
36
37 void *user_info{nullptr};
38
39 rgw_http_req_data *req_data;
40
41 bool verify_ssl; // Do not validate self signed certificates, default to false
42
43 std::atomic<unsigned> stopped { 0 };
44
45
46 protected:
47 CephContext *cct;
48
49 string method;
50 string url;
51
52 size_t send_len{0};
53
54 param_vec_t headers;
55
56 RGWHTTPManager *get_manager();
57
58 int init_request(rgw_http_req_data *req_data);
59
60 virtual int receive_header(void *ptr, size_t len) {
61 return 0;
62 }
63 virtual int receive_data(void *ptr, size_t len, bool *pause) {
64 return 0;
65 }
66
67 virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
68 return 0;
69 }
70
71 /* Callbacks for libcurl. */
72 static size_t receive_http_header(void *ptr,
73 size_t size,
74 size_t nmemb,
75 void *_info);
76
77 static size_t receive_http_data(void *ptr,
78 size_t size,
79 size_t nmemb,
80 void *_info);
81
82 static size_t send_http_data(void *ptr,
83 size_t size,
84 size_t nmemb,
85 void *_info);
86
87 ceph::mutex& get_req_lock();
88
89 /* needs to be called under req_lock() */
90 void _set_write_paused(bool pause);
91 void _set_read_paused(bool pause);
92 public:
93 static const long HTTP_STATUS_NOSTATUS = 0;
94 static const long HTTP_STATUS_UNAUTHORIZED = 401;
95 static const long HTTP_STATUS_NOTFOUND = 404;
96
97 static constexpr int HTTPCLIENT_IO_READ = 0x1;
98 static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
99 static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
100
101 virtual ~RGWHTTPClient();
102 explicit RGWHTTPClient(CephContext *cct,
103 const string& _method,
104 const string& _url)
105 : has_send_len(false),
106 http_status(HTTP_STATUS_NOSTATUS),
107 req_data(nullptr),
108 verify_ssl(cct->_conf->rgw_verify_ssl),
109 cct(cct),
110 method(_method),
111 url(_url) {
112 }
113
114 void append_header(const string& name, const string& val) {
115 headers.push_back(pair<string, string>(name, val));
116 }
117
118 void set_send_length(size_t len) {
119 send_len = len;
120 has_send_len = true;
121 }
122
123 void set_send_data_hint(bool hint) {
124 send_data_hint = hint;
125 }
126
127 long get_http_status() const {
128 return http_status;
129 }
130
131 void set_http_status(long _http_status) {
132 http_status = _http_status;
133 }
134
135 void set_verify_ssl(bool flag) {
136 verify_ssl = flag;
137 }
138
139 int process(optional_yield y);
140
141 int wait(optional_yield y);
142 void cancel();
143 bool is_done();
144
145 rgw_http_req_data *get_req_data() { return req_data; }
146
147 string to_str();
148
149 int get_req_retcode();
150
151 void set_url(const string& _url) {
152 url = _url;
153 }
154
155 void set_method(const string& _method) {
156 method = _method;
157 }
158
159 void set_io_user_info(void *_user_info) override {
160 user_info = _user_info;
161 }
162
163 void *get_io_user_info() override {
164 return user_info;
165 }
166 };
167
168
169 class RGWHTTPHeadersCollector : public RGWHTTPClient {
170 public:
171 typedef std::string header_name_t;
172 typedef std::string header_value_t;
173 typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
174
175 RGWHTTPHeadersCollector(CephContext * const cct,
176 const string& method,
177 const string& url,
178 const header_spec_t &relevant_headers)
179 : RGWHTTPClient(cct, method, url),
180 relevant_headers(relevant_headers) {
181 }
182
183 std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
184 return found_headers;
185 }
186
187 /* Throws std::out_of_range */
188 const header_value_t& get_header_value(const header_name_t& name) const {
189 return found_headers.at(name);
190 }
191
192 protected:
193 int receive_header(void *ptr, size_t len) override;
194
195 private:
196 const std::set<header_name_t, ltstr_nocase> relevant_headers;
197 std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
198 };
199
200
201 class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
202 bufferlist * const read_bl;
203 std::string post_data;
204 size_t post_data_index;
205
206 public:
207 RGWHTTPTransceiver(CephContext * const cct,
208 const string& method,
209 const string& url,
210 bufferlist * const read_bl,
211 const header_spec_t intercept_headers = {})
212 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
213 read_bl(read_bl),
214 post_data_index(0) {
215 }
216
217 RGWHTTPTransceiver(CephContext * const cct,
218 const string& method,
219 const string& url,
220 bufferlist * const read_bl,
221 const bool verify_ssl,
222 const header_spec_t intercept_headers = {})
223 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
224 read_bl(read_bl),
225 post_data_index(0) {
226 set_verify_ssl(verify_ssl);
227 }
228
229 void set_post_data(const std::string& _post_data) {
230 this->post_data = _post_data;
231 }
232
233 protected:
234 int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
235
236 int receive_data(void *ptr, size_t len, bool *pause) override {
237 read_bl->append((char *)ptr, len);
238 return 0;
239 }
240 };
241
242 typedef RGWHTTPTransceiver RGWPostHTTPData;
243
244
245 class RGWCompletionManager;
246
// State-change commands accepted by RGWHTTPManager::set_request_state().
// Enumerators are numbered consecutively starting at 0; the numeric values
// are noted alongside because they must stay stable.
enum RGWHTTPRequestSetState {
  SET_NOP = 0,
  SET_WRITE_PAUSED,   // 1
  SET_WRITE_RESUME,   // 2
  SET_READ_PAUSED,    // 3
  SET_READ_RESUME     // 4
};
254
255 class RGWHTTPManager {
256 struct set_state {
257 rgw_http_req_data *req;
258 int bitmask;
259
260 set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
261 };
262 CephContext *cct;
263 RGWCompletionManager *completion_mgr;
264 void *multi_handle;
265 bool is_started = false;
266 std::atomic<unsigned> going_down { 0 };
267 std::atomic<unsigned> is_stopped { 0 };
268
269 ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
270 map<uint64_t, rgw_http_req_data *> reqs;
271 list<rgw_http_req_data *> unregistered_reqs;
272 list<set_state> reqs_change_state;
273 map<uint64_t, rgw_http_req_data *> complete_reqs;
274 int64_t num_reqs = 0;
275 int64_t max_threaded_req = 0;
276 int thread_pipe[2];
277
278 void register_request(rgw_http_req_data *req_data);
279 void complete_request(rgw_http_req_data *req_data);
280 void _complete_request(rgw_http_req_data *req_data);
281 bool unregister_request(rgw_http_req_data *req_data);
282 void _unlink_request(rgw_http_req_data *req_data);
283 void unlink_request(rgw_http_req_data *req_data);
284 void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
285 void _finish_request(rgw_http_req_data *req_data, int r);
286 void _set_req_state(set_state& ss);
287 int link_request(rgw_http_req_data *req_data);
288
289 void manage_pending_requests();
290
291 class ReqsThread : public Thread {
292 RGWHTTPManager *manager;
293
294 public:
295 explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
296 void *entry() override;
297 };
298
299 ReqsThread *reqs_thread = nullptr;
300
301 void *reqs_thread_entry();
302
303 int signal_thread();
304
305 public:
306 RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
307 ~RGWHTTPManager();
308
309 int start();
310 void stop();
311
312 int add_request(RGWHTTPClient *client);
313 int remove_request(RGWHTTPClient *client);
314 int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
315 };
316
317 class RGWHTTP
318 {
319 public:
320 static int send(RGWHTTPClient *req);
321 static int process(RGWHTTPClient *req, optional_yield y);
322 };
323 #endif