]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_http_client.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_http_client.h
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#ifndef CEPH_RGW_HTTP_CLIENT_H
5#define CEPH_RGW_HTTP_CLIENT_H
6
eafe8130 7#include "common/async/yield_context.h"
7c673cae
FG
8#include "common/RWLock.h"
9#include "common/Cond.h"
10#include "rgw_common.h"
11#include "rgw_string.h"
9f95a23c 12#include "rgw_http_client_types.h"
7c673cae
FG
13
14#include <atomic>
15
16using param_pair_t = pair<string, string>;
17using param_vec_t = vector<param_pair_t>;
18
11fdf7f2
TL
19void rgw_http_client_init(CephContext *cct);
20void rgw_http_client_cleanup();
21
7c673cae 22struct rgw_http_req_data;
11fdf7f2
TL
23class RGWHTTPManager;
24
11fdf7f2 25class RGWHTTPClient : public RGWIOProvider
7c673cae
FG
26{
27 friend class RGWHTTPManager;
28
29 bufferlist send_bl;
30 bufferlist::iterator send_iter;
7c673cae
FG
31 bool has_send_len;
32 long http_status;
11fdf7f2
TL
33 bool send_data_hint{false};
34 size_t receive_pause_skip{0}; /* how many bytes to skip next time receive_data is called
35 due to being paused */
7c673cae 36
11fdf7f2 37 void *user_info{nullptr};
7c673cae 38
11fdf7f2 39 rgw_http_req_data *req_data;
7c673cae 40
7c673cae
FG
41 bool verify_ssl; // Do not validate self signed certificates, default to false
42
43 std::atomic<unsigned> stopped { 0 };
44
11fdf7f2 45
7c673cae
FG
46protected:
47 CephContext *cct;
11fdf7f2
TL
48
49 string method;
50 string url;
51
52 size_t send_len{0};
53
7c673cae
FG
54 param_vec_t headers;
55
11fdf7f2
TL
56 RGWHTTPManager *get_manager();
57
58 int init_request(rgw_http_req_data *req_data);
7c673cae
FG
59
60 virtual int receive_header(void *ptr, size_t len) {
61 return 0;
62 }
11fdf7f2 63 virtual int receive_data(void *ptr, size_t len, bool *pause) {
7c673cae
FG
64 return 0;
65 }
11fdf7f2
TL
66
67 virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
7c673cae
FG
68 return 0;
69 }
70
71 /* Callbacks for libcurl. */
7c673cae
FG
72 static size_t receive_http_header(void *ptr,
73 size_t size,
74 size_t nmemb,
75 void *_info);
76
7c673cae
FG
77 static size_t receive_http_data(void *ptr,
78 size_t size,
79 size_t nmemb,
80 void *_info);
81
7c673cae
FG
82 static size_t send_http_data(void *ptr,
83 size_t size,
84 size_t nmemb,
85 void *_info);
11fdf7f2 86
9f95a23c 87 ceph::mutex& get_req_lock();
11fdf7f2
TL
88
89 /* needs to be called under req_lock() */
90 void _set_write_paused(bool pause);
91 void _set_read_paused(bool pause);
7c673cae
FG
92public:
93 static const long HTTP_STATUS_NOSTATUS = 0;
94 static const long HTTP_STATUS_UNAUTHORIZED = 401;
95 static const long HTTP_STATUS_NOTFOUND = 404;
96
11fdf7f2
TL
97 static constexpr int HTTPCLIENT_IO_READ = 0x1;
98 static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
99 static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
100
7c673cae 101 virtual ~RGWHTTPClient();
11fdf7f2
TL
102 explicit RGWHTTPClient(CephContext *cct,
103 const string& _method,
104 const string& _url)
105 : has_send_len(false),
7c673cae
FG
106 http_status(HTTP_STATUS_NOSTATUS),
107 req_data(nullptr),
31f18b77 108 verify_ssl(cct->_conf->rgw_verify_ssl),
11fdf7f2
TL
109 cct(cct),
110 method(_method),
111 url(_url) {
7c673cae
FG
112 }
113
114 void append_header(const string& name, const string& val) {
115 headers.push_back(pair<string, string>(name, val));
116 }
117
118 void set_send_length(size_t len) {
119 send_len = len;
120 has_send_len = true;
121 }
122
11fdf7f2
TL
123 void set_send_data_hint(bool hint) {
124 send_data_hint = hint;
125 }
7c673cae
FG
126
127 long get_http_status() const {
128 return http_status;
129 }
130
9f95a23c
TL
131 void set_http_status(long _http_status) {
132 http_status = _http_status;
133 }
134
7c673cae
FG
135 void set_verify_ssl(bool flag) {
136 verify_ssl = flag;
137 }
138
9f95a23c 139 int process(optional_yield y);
7c673cae 140
9f95a23c 141 int wait(optional_yield y);
11fdf7f2
TL
142 void cancel();
143 bool is_done();
144
7c673cae
FG
145 rgw_http_req_data *get_req_data() { return req_data; }
146
147 string to_str();
148
149 int get_req_retcode();
11fdf7f2
TL
150
151 void set_url(const string& _url) {
152 url = _url;
153 }
154
155 void set_method(const string& _method) {
156 method = _method;
157 }
158
159 void set_io_user_info(void *_user_info) override {
160 user_info = _user_info;
161 }
162
163 void *get_io_user_info() override {
164 return user_info;
165 }
7c673cae
FG
166};
167
168
169class RGWHTTPHeadersCollector : public RGWHTTPClient {
170public:
171 typedef std::string header_name_t;
172 typedef std::string header_value_t;
173 typedef std::set<header_name_t, ltstr_nocase> header_spec_t;
174
175 RGWHTTPHeadersCollector(CephContext * const cct,
11fdf7f2
TL
176 const string& method,
177 const string& url,
178 const header_spec_t &relevant_headers)
179 : RGWHTTPClient(cct, method, url),
7c673cae
FG
180 relevant_headers(relevant_headers) {
181 }
182
183 std::map<header_name_t, header_value_t, ltstr_nocase> get_headers() const {
184 return found_headers;
185 }
186
187 /* Throws std::out_of_range */
188 const header_value_t& get_header_value(const header_name_t& name) const {
189 return found_headers.at(name);
190 }
191
192protected:
193 int receive_header(void *ptr, size_t len) override;
194
7c673cae
FG
195private:
196 const std::set<header_name_t, ltstr_nocase> relevant_headers;
197 std::map<header_name_t, header_value_t, ltstr_nocase> found_headers;
198};
199
200
201class RGWHTTPTransceiver : public RGWHTTPHeadersCollector {
202 bufferlist * const read_bl;
203 std::string post_data;
204 size_t post_data_index;
205
206public:
207 RGWHTTPTransceiver(CephContext * const cct,
11fdf7f2
TL
208 const string& method,
209 const string& url,
7c673cae
FG
210 bufferlist * const read_bl,
211 const header_spec_t intercept_headers = {})
11fdf7f2 212 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
7c673cae
FG
213 read_bl(read_bl),
214 post_data_index(0) {
215 }
216
217 RGWHTTPTransceiver(CephContext * const cct,
11fdf7f2
TL
218 const string& method,
219 const string& url,
7c673cae
FG
220 bufferlist * const read_bl,
221 const bool verify_ssl,
222 const header_spec_t intercept_headers = {})
11fdf7f2 223 : RGWHTTPHeadersCollector(cct, method, url, intercept_headers),
7c673cae
FG
224 read_bl(read_bl),
225 post_data_index(0) {
226 set_verify_ssl(verify_ssl);
227 }
228
229 void set_post_data(const std::string& _post_data) {
230 this->post_data = _post_data;
231 }
232
233protected:
11fdf7f2 234 int send_data(void* ptr, size_t len, bool *pause=nullptr) override;
7c673cae 235
11fdf7f2 236 int receive_data(void *ptr, size_t len, bool *pause) override {
7c673cae
FG
237 read_bl->append((char *)ptr, len);
238 return 0;
239 }
240};
241
242typedef RGWHTTPTransceiver RGWPostHTTPData;
243
244
245class RGWCompletionManager;
246
11fdf7f2
TL
247enum RGWHTTPRequestSetState {
248 SET_NOP = 0,
249 SET_WRITE_PAUSED = 1,
250 SET_WRITE_RESUME = 2,
251 SET_READ_PAUSED = 3,
252 SET_READ_RESUME = 4,
253};
254
7c673cae 255class RGWHTTPManager {
11fdf7f2
TL
256 struct set_state {
257 rgw_http_req_data *req;
258 int bitmask;
259
260 set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
261 };
7c673cae
FG
262 CephContext *cct;
263 RGWCompletionManager *completion_mgr;
264 void *multi_handle;
9f95a23c 265 bool is_started = false;
7c673cae
FG
266 std::atomic<unsigned> going_down { 0 };
267 std::atomic<unsigned> is_stopped { 0 };
268
9f95a23c 269 ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
7c673cae
FG
270 map<uint64_t, rgw_http_req_data *> reqs;
271 list<rgw_http_req_data *> unregistered_reqs;
11fdf7f2 272 list<set_state> reqs_change_state;
7c673cae 273 map<uint64_t, rgw_http_req_data *> complete_reqs;
9f95a23c
TL
274 int64_t num_reqs = 0;
275 int64_t max_threaded_req = 0;
7c673cae
FG
276 int thread_pipe[2];
277
278 void register_request(rgw_http_req_data *req_data);
279 void complete_request(rgw_http_req_data *req_data);
280 void _complete_request(rgw_http_req_data *req_data);
11fdf7f2 281 bool unregister_request(rgw_http_req_data *req_data);
7c673cae
FG
282 void _unlink_request(rgw_http_req_data *req_data);
283 void unlink_request(rgw_http_req_data *req_data);
9f95a23c 284 void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
7c673cae 285 void _finish_request(rgw_http_req_data *req_data, int r);
11fdf7f2 286 void _set_req_state(set_state& ss);
7c673cae
FG
287 int link_request(rgw_http_req_data *req_data);
288
289 void manage_pending_requests();
290
291 class ReqsThread : public Thread {
292 RGWHTTPManager *manager;
293
294 public:
11fdf7f2 295 explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
7c673cae
FG
296 void *entry() override;
297 };
298
9f95a23c 299 ReqsThread *reqs_thread = nullptr;
7c673cae
FG
300
301 void *reqs_thread_entry();
302
303 int signal_thread();
304
305public:
306 RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
307 ~RGWHTTPManager();
308
11fdf7f2 309 int start();
7c673cae
FG
310 void stop();
311
11fdf7f2 312 int add_request(RGWHTTPClient *client);
7c673cae 313 int remove_request(RGWHTTPClient *client);
11fdf7f2 314 int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
7c673cae
FG
315};
316
11fdf7f2
TL
317class RGWHTTP
318{
319public:
320 static int send(RGWHTTPClient *req);
9f95a23c 321 static int process(RGWHTTPClient *req, optional_yield y);
11fdf7f2 322};
7c673cae 323#endif