]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include "include/compat.h" | |
91327a77 | 5 | #include "common/errno.h" |
7c673cae | 6 | |
7c673cae FG |
7 | |
8 | #include <curl/curl.h> | |
9 | #include <curl/easy.h> | |
10 | #include <curl/multi.h> | |
11 | ||
12 | #include "rgw_common.h" | |
13 | #include "rgw_http_client.h" | |
14 | #include "rgw_http_errors.h" | |
eafe8130 | 15 | #include "common/async/completion.h" |
7c673cae FG |
16 | #include "common/RefCountedObj.h" |
17 | ||
18 | #include "rgw_coroutine.h" | |
9f95a23c | 19 | #include "rgw_tools.h" |
7c673cae FG |
20 | |
21 | #include <atomic> | |
f67539c2 | 22 | #include <string_view> |
7c673cae FG |
23 | |
24 | #define dout_context g_ceph_context | |
25 | #define dout_subsys ceph_subsys_rgw | |
26 | ||
20effc67 TL |
27 | using namespace std; |
28 | ||
11fdf7f2 TL |
29 | RGWHTTPManager *rgw_http_manager; |
30 | ||
31 | struct RGWCurlHandle; | |
32 | ||
33 | static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle); | |
34 | ||
7c673cae | 35 | struct rgw_http_req_data : public RefCountedObject { |
11fdf7f2 TL |
36 | RGWCurlHandle *curl_handle{nullptr}; |
37 | curl_slist *h{nullptr}; | |
7c673cae | 38 | uint64_t id; |
11fdf7f2 | 39 | int ret{0}; |
7c673cae | 40 | std::atomic<bool> done = { false }; |
11fdf7f2 TL |
41 | RGWHTTPClient *client{nullptr}; |
42 | rgw_io_id control_io_id; | |
43 | void *user_info{nullptr}; | |
44 | bool registered{false}; | |
45 | RGWHTTPManager *mgr{nullptr}; | |
7c673cae | 46 | char error_buf[CURL_ERROR_SIZE]; |
11fdf7f2 TL |
47 | bool write_paused{false}; |
48 | bool read_paused{false}; | |
7c673cae | 49 | |
9f95a23c TL |
50 | optional<int> user_ret; |
51 | ||
52 | ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock"); | |
53 | ceph::condition_variable cond; | |
7c673cae | 54 | |
eafe8130 TL |
55 | using Signature = void(boost::system::error_code); |
56 | using Completion = ceph::async::Completion<Signature>; | |
57 | std::unique_ptr<Completion> completion; | |
58 | ||
9f95a23c | 59 | rgw_http_req_data() : id(-1) { |
92f5a8d4 | 60 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae FG |
61 | memset(error_buf, 0, sizeof(error_buf)); |
62 | } | |
63 | ||
eafe8130 TL |
64 | template <typename ExecutionContext, typename CompletionToken> |
65 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
66 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
67 | auto& handler = init.completion_handler; | |
68 | { | |
69 | std::unique_lock l{lock}; | |
70 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
71 | } | |
72 | return init.result.get(); | |
73 | } | |
9f95a23c | 74 | |
eafe8130 | 75 | int wait(optional_yield y) { |
11fdf7f2 TL |
76 | if (done) { |
77 | return ret; | |
78 | } | |
eafe8130 TL |
79 | if (y) { |
80 | auto& context = y.get_io_context(); | |
81 | auto& yield = y.get_yield_context(); | |
82 | boost::system::error_code ec; | |
83 | async_wait(context, yield[ec]); | |
84 | return -ec.value(); | |
85 | } | |
9f95a23c TL |
86 | // work on asio threads should be asynchronous, so warn when they block |
87 | if (is_asio_thread) { | |
88 | dout(20) << "WARNING: blocking http request" << dendl; | |
89 | } | |
9f95a23c | 90 | std::unique_lock l{lock}; |
f67539c2 | 91 | cond.wait(l, [this]{return done==true;}); |
7c673cae FG |
92 | return ret; |
93 | } | |
94 | ||
11fdf7f2 | 95 | void set_state(int bitmask); |
7c673cae | 96 | |
9f95a23c TL |
97 | void finish(int r, long http_status = -1) { |
98 | std::lock_guard l{lock}; | |
99 | if (http_status != -1) { | |
100 | if (client) { | |
101 | client->set_http_status(http_status); | |
102 | } | |
103 | } | |
7c673cae | 104 | ret = r; |
11fdf7f2 TL |
105 | if (curl_handle) |
106 | do_curl_easy_cleanup(curl_handle); | |
7c673cae FG |
107 | |
108 | if (h) | |
109 | curl_slist_free_all(h); | |
110 | ||
11fdf7f2 | 111 | curl_handle = NULL; |
7c673cae FG |
112 | h = NULL; |
113 | done = true; | |
eafe8130 TL |
114 | if (completion) { |
115 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
116 | Completion::post(std::move(completion), ec); | |
117 | } else { | |
9f95a23c | 118 | cond.notify_all(); |
eafe8130 | 119 | } |
11fdf7f2 TL |
120 | } |
121 | ||
7c673cae FG |
122 | bool is_done() { |
123 | return done; | |
124 | } | |
125 | ||
126 | int get_retcode() { | |
9f95a23c | 127 | std::lock_guard l{lock}; |
7c673cae FG |
128 | return ret; |
129 | } | |
9f95a23c | 130 | |
7c673cae | 131 | RGWHTTPManager *get_manager() { |
9f95a23c | 132 | std::lock_guard l{lock}; |
7c673cae FG |
133 | return mgr; |
134 | } | |
11fdf7f2 TL |
135 | |
136 | CURL *get_easy_handle() const; | |
7c673cae FG |
137 | }; |
138 | ||
94b18763 FG |
139 | struct RGWCurlHandle { |
140 | int uses; | |
141 | mono_time lastuse; | |
142 | CURL* h; | |
143 | ||
11fdf7f2 | 144 | explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {}; |
94b18763 FG |
145 | CURL* operator*() { |
146 | return this->h; | |
147 | } | |
148 | }; | |
149 | ||
11fdf7f2 TL |
150 | void rgw_http_req_data::set_state(int bitmask) { |
151 | /* no need to lock here, moreover curl_easy_pause() might trigger | |
152 | * the data receive callback :/ | |
153 | */ | |
154 | CURLcode rc = curl_easy_pause(**curl_handle, bitmask); | |
155 | if (rc != CURLE_OK) { | |
156 | dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; | |
157 | } | |
158 | } | |
159 | ||
94b18763 FG |
160 | #define MAXIDLE 5 |
161 | class RGWCurlHandles : public Thread { | |
162 | public: | |
9f95a23c TL |
163 | ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock"); |
164 | std::vector<RGWCurlHandle*> saved_curl; | |
94b18763 | 165 | int cleaner_shutdown; |
9f95a23c | 166 | ceph::condition_variable cleaner_cond; |
94b18763 FG |
167 | |
168 | RGWCurlHandles() : | |
94b18763 FG |
169 | cleaner_shutdown{0} { |
170 | } | |
171 | ||
172 | RGWCurlHandle* get_curl_handle(); | |
173 | void release_curl_handle_now(RGWCurlHandle* curl); | |
174 | void release_curl_handle(RGWCurlHandle* curl); | |
175 | void flush_curl_handles(); | |
176 | void* entry(); | |
177 | void stop(); | |
178 | }; | |
179 | ||
180 | RGWCurlHandle* RGWCurlHandles::get_curl_handle() { | |
181 | RGWCurlHandle* curl = 0; | |
182 | CURL* h; | |
183 | { | |
9f95a23c | 184 | std::lock_guard lock{cleaner_lock}; |
94b18763 FG |
185 | if (!saved_curl.empty()) { |
186 | curl = *saved_curl.begin(); | |
187 | saved_curl.erase(saved_curl.begin()); | |
188 | } | |
189 | } | |
190 | if (curl) { | |
191 | } else if ((h = curl_easy_init())) { | |
192 | curl = new RGWCurlHandle{h}; | |
193 | } else { | |
194 | // curl = 0; | |
195 | } | |
196 | return curl; | |
197 | } | |
198 | ||
199 | void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl) | |
200 | { | |
201 | curl_easy_cleanup(**curl); | |
202 | delete curl; | |
203 | } | |
204 | ||
205 | void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) | |
206 | { | |
207 | if (cleaner_shutdown) { | |
208 | release_curl_handle_now(curl); | |
209 | } else { | |
210 | curl_easy_reset(**curl); | |
9f95a23c | 211 | std::lock_guard lock{cleaner_lock}; |
94b18763 FG |
212 | curl->lastuse = mono_clock::now(); |
213 | saved_curl.insert(saved_curl.begin(), 1, curl); | |
214 | } | |
215 | } | |
216 | ||
217 | void* RGWCurlHandles::entry() | |
218 | { | |
219 | RGWCurlHandle* curl; | |
9f95a23c | 220 | std::unique_lock lock{cleaner_lock}; |
94b18763 FG |
221 | |
222 | for (;;) { | |
223 | if (cleaner_shutdown) { | |
224 | if (saved_curl.empty()) | |
225 | break; | |
226 | } else { | |
9f95a23c | 227 | cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE)); |
94b18763 FG |
228 | } |
229 | mono_time now = mono_clock::now(); | |
230 | while (!saved_curl.empty()) { | |
231 | auto cend = saved_curl.end(); | |
232 | --cend; | |
233 | curl = *cend; | |
234 | if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE)) | |
235 | break; | |
236 | saved_curl.erase(cend); | |
237 | release_curl_handle_now(curl); | |
238 | } | |
239 | } | |
240 | return nullptr; | |
241 | } | |
242 | ||
243 | void RGWCurlHandles::stop() | |
244 | { | |
9f95a23c | 245 | std::lock_guard lock{cleaner_lock}; |
94b18763 | 246 | cleaner_shutdown = 1; |
9f95a23c | 247 | cleaner_cond.notify_all(); |
94b18763 FG |
248 | } |
249 | ||
250 | void RGWCurlHandles::flush_curl_handles() | |
251 | { | |
252 | stop(); | |
253 | join(); | |
254 | if (!saved_curl.empty()) { | |
255 | dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl; | |
256 | } | |
257 | saved_curl.shrink_to_fit(); | |
258 | } | |
259 | ||
11fdf7f2 TL |
260 | CURL *rgw_http_req_data::get_easy_handle() const |
261 | { | |
262 | return **curl_handle; | |
263 | } | |
264 | ||
94b18763 | 265 | static RGWCurlHandles *handles; |
11fdf7f2 TL |
266 | |
267 | static RGWCurlHandle *do_curl_easy_init() | |
268 | { | |
269 | return handles->get_curl_handle(); | |
270 | } | |
271 | ||
272 | static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle) | |
273 | { | |
274 | handles->release_curl_handle(curl_handle); | |
275 | } | |
276 | ||
94b18763 FG |
// XXX make this part of the token cache? (but that's swift-only;
// and this especially needs to integrate with s3...)
279 | ||
280 | void rgw_setup_saved_curl_handles() | |
281 | { | |
282 | handles = new RGWCurlHandles(); | |
283 | handles->create("rgw_curl"); | |
284 | } | |
285 | ||
286 | void rgw_release_all_curl_handles() | |
287 | { | |
288 | handles->flush_curl_handles(); | |
289 | delete handles; | |
290 | } | |
291 | ||
11fdf7f2 | 292 | void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type) |
7c673cae | 293 | { |
11fdf7f2 TL |
294 | if (id == 0) { |
295 | id = io_id_provider.get_next(); | |
7c673cae | 296 | } |
7c673cae FG |
297 | } |
298 | ||
20effc67 TL |
299 | RGWHTTPClient::RGWHTTPClient(CephContext *cct, |
300 | const string& _method, | |
301 | const string& _url) | |
302 | : NoDoutPrefix(cct, dout_subsys), | |
303 | has_send_len(false), | |
304 | http_status(HTTP_STATUS_NOSTATUS), | |
305 | req_data(nullptr), | |
306 | verify_ssl(cct->_conf->rgw_verify_ssl), | |
307 | cct(cct), | |
308 | method(_method), | |
309 | url(_url) { | |
310 | init(); | |
311 | } | |
312 | ||
313 | std::ostream& RGWHTTPClient::gen_prefix(std::ostream& out) const | |
314 | { | |
315 | out << "http_client[" << method << "/" << url << "]"; | |
316 | return out; | |
317 | } | |
318 | ||
319 | void RGWHTTPClient::init() | |
320 | { | |
321 | auto pos = url.find("://"); | |
322 | if (pos == string::npos) { | |
323 | host = url; | |
324 | return; | |
325 | } | |
326 | ||
327 | protocol = url.substr(0, pos); | |
328 | ||
329 | pos += 3; | |
330 | ||
331 | auto host_end_pos = url.find("/", pos); | |
332 | if (host_end_pos == string::npos) { | |
333 | host = url.substr(pos); | |
334 | return; | |
335 | } | |
336 | ||
337 | host = url.substr(pos, host_end_pos - pos); | |
338 | resource_prefix = url.substr(host_end_pos + 1); | |
339 | if (resource_prefix.size() > 0 && resource_prefix[resource_prefix.size() - 1] != '/') { | |
340 | resource_prefix.append("/"); | |
341 | } | |
342 | } | |
343 | ||
7c673cae FG |
344 | /* |
345 | * the following set of callbacks will be called either on RGWHTTPManager::process(), | |
346 | * or via the RGWHTTPManager async processing. | |
347 | */ | |
348 | size_t RGWHTTPClient::receive_http_header(void * const ptr, | |
349 | const size_t size, | |
350 | const size_t nmemb, | |
351 | void * const _info) | |
352 | { | |
353 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
354 | size_t len = size * nmemb; | |
355 | ||
9f95a23c | 356 | std::lock_guard l{req_data->lock}; |
7c673cae FG |
357 | |
358 | if (!req_data->registered) { | |
359 | return len; | |
360 | } | |
361 | ||
362 | int ret = req_data->client->receive_header(ptr, size * nmemb); | |
363 | if (ret < 0) { | |
9f95a23c TL |
364 | dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl; |
365 | req_data->user_ret = ret; | |
366 | return CURLE_WRITE_ERROR; | |
7c673cae FG |
367 | } |
368 | ||
369 | return len; | |
370 | } | |
371 | ||
372 | size_t RGWHTTPClient::receive_http_data(void * const ptr, | |
373 | const size_t size, | |
374 | const size_t nmemb, | |
375 | void * const _info) | |
376 | { | |
377 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
378 | size_t len = size * nmemb; | |
379 | ||
11fdf7f2 TL |
380 | bool pause = false; |
381 | ||
382 | RGWHTTPClient *client; | |
383 | ||
384 | { | |
9f95a23c | 385 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
386 | if (!req_data->registered) { |
387 | return len; | |
388 | } | |
389 | ||
390 | client = req_data->client; | |
391 | } | |
392 | ||
393 | size_t& skip_bytes = client->receive_pause_skip; | |
394 | ||
395 | if (skip_bytes >= len) { | |
396 | skip_bytes -= len; | |
7c673cae FG |
397 | return len; |
398 | } | |
11fdf7f2 TL |
399 | |
400 | int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause); | |
7c673cae | 401 | if (ret < 0) { |
9f95a23c TL |
402 | dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl; |
403 | req_data->user_ret = ret; | |
404 | return CURLE_WRITE_ERROR; | |
7c673cae FG |
405 | } |
406 | ||
11fdf7f2 TL |
407 | if (pause) { |
408 | dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl; | |
409 | skip_bytes = len; | |
9f95a23c | 410 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
411 | req_data->read_paused = true; |
412 | return CURL_WRITEFUNC_PAUSE; | |
413 | } | |
414 | ||
415 | skip_bytes = 0; | |
416 | ||
7c673cae FG |
417 | return len; |
418 | } | |
419 | ||
420 | size_t RGWHTTPClient::send_http_data(void * const ptr, | |
421 | const size_t size, | |
422 | const size_t nmemb, | |
423 | void * const _info) | |
424 | { | |
425 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
426 | ||
11fdf7f2 TL |
427 | RGWHTTPClient *client; |
428 | ||
429 | { | |
9f95a23c | 430 | std::lock_guard l{req_data->lock}; |
7c673cae | 431 | |
11fdf7f2 TL |
432 | if (!req_data->registered) { |
433 | return 0; | |
434 | } | |
435 | ||
436 | client = req_data->client; | |
7c673cae FG |
437 | } |
438 | ||
11fdf7f2 TL |
439 | bool pause = false; |
440 | ||
441 | int ret = client->send_data(ptr, size * nmemb, &pause); | |
7c673cae | 442 | if (ret < 0) { |
9f95a23c TL |
443 | dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl; |
444 | req_data->user_ret = ret; | |
445 | return CURLE_READ_ERROR; | |
7c673cae FG |
446 | } |
447 | ||
11fdf7f2 TL |
448 | if (ret == 0 && |
449 | pause) { | |
9f95a23c | 450 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
451 | req_data->write_paused = true; |
452 | return CURL_READFUNC_PAUSE; | |
453 | } | |
454 | ||
7c673cae FG |
455 | return ret; |
456 | } | |
457 | ||
9f95a23c | 458 | ceph::mutex& RGWHTTPClient::get_req_lock() |
11fdf7f2 TL |
459 | { |
460 | return req_data->lock; | |
461 | } | |
462 | ||
463 | void RGWHTTPClient::_set_write_paused(bool pause) | |
464 | { | |
9f95a23c | 465 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
11fdf7f2 TL |
466 | |
467 | RGWHTTPManager *mgr = req_data->mgr; | |
468 | if (pause == req_data->write_paused) { | |
469 | return; | |
470 | } | |
471 | if (pause) { | |
472 | mgr->set_request_state(this, SET_WRITE_PAUSED); | |
473 | } else { | |
474 | mgr->set_request_state(this, SET_WRITE_RESUME); | |
475 | } | |
476 | } | |
477 | ||
478 | void RGWHTTPClient::_set_read_paused(bool pause) | |
479 | { | |
9f95a23c | 480 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
11fdf7f2 TL |
481 | |
482 | RGWHTTPManager *mgr = req_data->mgr; | |
483 | if (pause == req_data->read_paused) { | |
484 | return; | |
485 | } | |
486 | if (pause) { | |
487 | mgr->set_request_state(this, SET_READ_PAUSED); | |
488 | } else { | |
489 | mgr->set_request_state(this, SET_READ_RESUME); | |
490 | } | |
491 | } | |
492 | ||
7c673cae FG |
493 | static curl_slist *headers_to_slist(param_vec_t& headers) |
494 | { | |
495 | curl_slist *h = NULL; | |
496 | ||
497 | param_vec_t::iterator iter; | |
498 | for (iter = headers.begin(); iter != headers.end(); ++iter) { | |
499 | pair<string, string>& p = *iter; | |
500 | string val = p.first; | |
501 | ||
502 | if (strncmp(val.c_str(), "HTTP_", 5) == 0) { | |
503 | val = val.substr(5); | |
504 | } | |
505 | ||
506 | /* we need to convert all underscores into dashes as some web servers forbid them | |
507 | * in the http header field names | |
508 | */ | |
509 | for (size_t i = 0; i < val.size(); i++) { | |
510 | if (val[i] == '_') { | |
511 | val[i] = '-'; | |
512 | } | |
513 | } | |
11fdf7f2 TL |
514 | |
515 | val = camelcase_dash_http_attr(val); | |
7c673cae | 516 | |
28e407b8 AA |
517 | // curl won't send headers with empty values unless it ends with a ; instead |
518 | if (p.second.empty()) { | |
519 | val.append(1, ';'); | |
520 | } else { | |
521 | val.append(": "); | |
522 | val.append(p.second); | |
523 | } | |
7c673cae FG |
524 | h = curl_slist_append(h, val.c_str()); |
525 | } | |
526 | ||
527 | return h; | |
528 | } | |
529 | ||
/*
 * True for the methods that carry a request body: "POST" and "PUT"
 * (exact, case-sensitive match).
 */
static bool is_upload_request(const std::string& method)
{
  if (method == "POST") {
    return true;
  }
  return method == "PUT";
}
534 | ||
535 | /* | |
11fdf7f2 | 536 | * process a single simple one off request |
7c673cae | 537 | */ |
eafe8130 | 538 | int RGWHTTPClient::process(optional_yield y) |
7c673cae | 539 | { |
eafe8130 | 540 | return RGWHTTP::process(this, y); |
7c673cae FG |
541 | } |
542 | ||
543 | string RGWHTTPClient::to_str() | |
544 | { | |
11fdf7f2 TL |
545 | string method_str = (method.empty() ? "<no-method>" : method); |
546 | string url_str = (url.empty() ? "<no-url>" : url); | |
7c673cae FG |
547 | return method_str + " " + url_str; |
548 | } | |
549 | ||
550 | int RGWHTTPClient::get_req_retcode() | |
551 | { | |
552 | if (!req_data) { | |
553 | return -EINVAL; | |
554 | } | |
555 | ||
556 | return req_data->get_retcode(); | |
557 | } | |
558 | ||
559 | /* | |
560 | * init request, will be used later with RGWHTTPManager | |
561 | */ | |
11fdf7f2 | 562 | int RGWHTTPClient::init_request(rgw_http_req_data *_req_data) |
7c673cae | 563 | { |
11fdf7f2 | 564 | ceph_assert(!req_data); |
7c673cae FG |
565 | _req_data->get(); |
566 | req_data = _req_data; | |
567 | ||
11fdf7f2 | 568 | req_data->curl_handle = do_curl_easy_init(); |
7c673cae | 569 | |
11fdf7f2 | 570 | CURL *easy_handle = req_data->get_easy_handle(); |
7c673cae FG |
571 | |
572 | dout(20) << "sending request to " << url << dendl; | |
573 | ||
574 | curl_slist *h = headers_to_slist(headers); | |
575 | ||
576 | req_data->h = h; | |
577 | ||
11fdf7f2 TL |
578 | curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str()); |
579 | curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str()); | |
7c673cae FG |
580 | curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); |
581 | curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); | |
582 | curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); | |
583 | curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); | |
584 | curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); | |
585 | curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); | |
586 | curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); | |
1adf2230 AA |
587 | curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time); |
588 | curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit); | |
1e59de90 | 589 | curl_easy_setopt(easy_handle, CURLOPT_TCP_KEEPALIVE, cct->_conf->rgw_curl_tcp_keepalive); |
7c673cae FG |
590 | curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); |
591 | curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); | |
f67539c2 | 592 | curl_easy_setopt(easy_handle, CURLOPT_BUFFERSIZE, cct->_conf->rgw_curl_buffersize); |
31f18b77 | 593 | if (send_data_hint || is_upload_request(method)) { |
7c673cae FG |
594 | curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); |
595 | } | |
596 | if (has_send_len) { | |
f67539c2 TL |
597 | // TODO: prevent overflow by using curl_off_t |
598 | // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE | |
599 | const long size = send_len; | |
600 | curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size); | |
601 | if (method == "POST") { | |
602 | curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size); | |
603 | // TODO: set to size smaller than 1MB should prevent the "Expect" field | |
604 | // from being sent. So explicit removal is not needed | |
605 | h = curl_slist_append(h, "Expect:"); | |
606 | } | |
607 | } | |
20effc67 TL |
608 | |
609 | if (method == "HEAD") { | |
610 | curl_easy_setopt(easy_handle, CURLOPT_NOBODY, 1L); | |
611 | } | |
612 | ||
f67539c2 TL |
613 | if (h) { |
614 | curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); | |
7c673cae | 615 | } |
31f18b77 FG |
616 | if (!verify_ssl) { |
617 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
618 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
619 | dout(20) << "ssl verification is set to off" << dendl; | |
522d829b TL |
620 | } else { |
621 | if (!ca_path.empty()) { | |
622 | curl_easy_setopt(easy_handle, CURLOPT_CAINFO, ca_path.c_str()); | |
623 | dout(20) << "using custom ca cert "<< ca_path.c_str() << " for ssl" << dendl; | |
624 | } | |
625 | if (!client_cert.empty()) { | |
626 | if (!client_key.empty()) { | |
627 | curl_easy_setopt(easy_handle, CURLOPT_SSLCERT, client_cert.c_str()); | |
628 | curl_easy_setopt(easy_handle, CURLOPT_SSLKEY, client_key.c_str()); | |
629 | dout(20) << "using custom client cert " << client_cert.c_str() | |
630 | << " and private key " << client_key.c_str() << dendl; | |
631 | } else { | |
632 | dout(5) << "private key is missing for client certificate" << dendl; | |
633 | } | |
634 | } | |
31f18b77 | 635 | } |
7c673cae | 636 | curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); |
f67539c2 | 637 | curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout); |
7c673cae FG |
638 | |
639 | return 0; | |
640 | } | |
641 | ||
11fdf7f2 TL |
642 | bool RGWHTTPClient::is_done() |
643 | { | |
644 | return req_data->is_done(); | |
645 | } | |
646 | ||
7c673cae FG |
647 | /* |
648 | * wait for async request to complete | |
649 | */ | |
eafe8130 | 650 | int RGWHTTPClient::wait(optional_yield y) |
7c673cae | 651 | { |
eafe8130 | 652 | return req_data->wait(y); |
7c673cae FG |
653 | } |
654 | ||
11fdf7f2 | 655 | void RGWHTTPClient::cancel() |
7c673cae FG |
656 | { |
657 | if (req_data) { | |
11fdf7f2 | 658 | RGWHTTPManager *http_manager = req_data->mgr; |
7c673cae FG |
659 | if (http_manager) { |
660 | http_manager->remove_request(this); | |
661 | } | |
11fdf7f2 TL |
662 | } |
663 | } | |
7c673cae | 664 | |
11fdf7f2 TL |
665 | RGWHTTPClient::~RGWHTTPClient() |
666 | { | |
667 | cancel(); | |
668 | if (req_data) { | |
7c673cae FG |
669 | req_data->put(); |
670 | } | |
671 | } | |
672 | ||
673 | ||
674 | int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len) | |
675 | { | |
f67539c2 | 676 | const std::string_view header_line(static_cast<const char *>(ptr), len); |
7c673cae FG |
677 | |
678 | /* We're tokening the line that way due to backward compatibility. */ | |
679 | const size_t sep_loc = header_line.find_first_of(" \t:"); | |
680 | ||
f67539c2 | 681 | if (std::string_view::npos == sep_loc) { |
7c673cae FG |
682 | /* Wrongly formatted header? Just skip it. */ |
683 | return 0; | |
684 | } | |
685 | ||
686 | header_name_t name(header_line.substr(0, sep_loc)); | |
687 | if (0 == relevant_headers.count(name)) { | |
688 | /* Not interested in this particular header. */ | |
689 | return 0; | |
690 | } | |
691 | ||
692 | const auto value_part = header_line.substr(sep_loc + 1); | |
693 | ||
694 | /* Skip spaces and tabs after the separator. */ | |
695 | const size_t val_loc_s = value_part.find_first_not_of(' '); | |
696 | const size_t val_loc_e = value_part.find_first_of("\r\n"); | |
697 | ||
f67539c2 TL |
698 | if (std::string_view::npos == val_loc_s || |
699 | std::string_view::npos == val_loc_e) { | |
7c673cae FG |
700 | /* Empty value case. */ |
701 | found_headers.emplace(name, header_value_t()); | |
702 | } else { | |
703 | found_headers.emplace(name, header_value_t( | |
704 | value_part.substr(val_loc_s, val_loc_e - val_loc_s))); | |
705 | } | |
706 | ||
707 | return 0; | |
708 | } | |
709 | ||
11fdf7f2 | 710 | int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause) |
7c673cae FG |
711 | { |
712 | int length_to_copy = 0; | |
713 | if (post_data_index < post_data.length()) { | |
714 | length_to_copy = min(post_data.length() - post_data_index, len); | |
715 | memcpy(ptr, post_data.data() + post_data_index, length_to_copy); | |
716 | post_data_index += length_to_copy; | |
717 | } | |
718 | return length_to_copy; | |
719 | } | |
720 | ||
721 | ||
/*
 * Drain pending wake-up bytes from `fd`.
 *
 * Returns 0 when the read succeeds or fails with EAGAIN (nothing to read),
 * and -errno for any other read failure.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups
  std::array<char, 256> scratch{};
  const auto nread = ::read(fd, scratch.data(), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = errno;
  return err == EAGAIN ? 0 : -err; // clear EAGAIN
}
734 | ||
7c673cae FG |
735 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) |
736 | { | |
737 | int num_fds; | |
738 | struct curl_waitfd wait_fd; | |
739 | ||
740 | wait_fd.fd = signal_fd; | |
741 | wait_fd.events = CURL_WAIT_POLLIN; | |
742 | wait_fd.revents = 0; | |
743 | ||
744 | int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); | |
745 | if (ret) { | |
746 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
747 | return -EIO; | |
748 | } | |
749 | ||
1e59de90 | 750 | if (wait_fd.revents > 0) { |
7c673cae FG |
751 | ret = clear_signal(signal_fd); |
752 | if (ret < 0) { | |
753 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
754 | return ret; | |
755 | } | |
756 | } | |
7c673cae FG |
757 | return 0; |
758 | } | |
759 | ||
7c673cae FG |
760 | void *RGWHTTPManager::ReqsThread::entry() |
761 | { | |
762 | manager->reqs_thread_entry(); | |
763 | return NULL; | |
764 | } | |
765 | ||
766 | /* | |
767 | * RGWHTTPManager has two modes of operation: threaded and non-threaded. | |
768 | */ | |
769 | RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), | |
9f95a23c | 770 | completion_mgr(_cm) |
7c673cae FG |
771 | { |
772 | multi_handle = (void *)curl_multi_init(); | |
773 | thread_pipe[0] = -1; | |
774 | thread_pipe[1] = -1; | |
775 | } | |
776 | ||
777 | RGWHTTPManager::~RGWHTTPManager() { | |
778 | stop(); | |
779 | if (multi_handle) | |
780 | curl_multi_cleanup((CURLM *)multi_handle); | |
781 | } | |
782 | ||
783 | void RGWHTTPManager::register_request(rgw_http_req_data *req_data) | |
784 | { | |
9f95a23c | 785 | std::unique_lock rl{reqs_lock}; |
7c673cae FG |
786 | req_data->id = num_reqs; |
787 | req_data->registered = true; | |
788 | reqs[num_reqs] = req_data; | |
789 | num_reqs++; | |
11fdf7f2 | 790 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
7c673cae FG |
791 | } |
792 | ||
11fdf7f2 | 793 | bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) |
7c673cae | 794 | { |
9f95a23c | 795 | std::unique_lock rl{reqs_lock}; |
11fdf7f2 TL |
796 | if (!req_data->registered) { |
797 | return false; | |
798 | } | |
7c673cae FG |
799 | req_data->get(); |
800 | req_data->registered = false; | |
801 | unregistered_reqs.push_back(req_data); | |
11fdf7f2 TL |
802 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
803 | return true; | |
7c673cae FG |
804 | } |
805 | ||
806 | void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) | |
807 | { | |
9f95a23c | 808 | std::unique_lock rl{reqs_lock}; |
7c673cae FG |
809 | _complete_request(req_data); |
810 | } | |
811 | ||
812 | void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) | |
813 | { | |
814 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id); | |
815 | if (iter != reqs.end()) { | |
816 | reqs.erase(iter); | |
817 | } | |
818 | { | |
9f95a23c | 819 | std::lock_guard l{req_data->lock}; |
7c673cae FG |
820 | req_data->mgr = nullptr; |
821 | } | |
822 | if (completion_mgr) { | |
11fdf7f2 | 823 | completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info); |
7c673cae FG |
824 | } |
825 | ||
826 | req_data->put(); | |
827 | } | |
828 | ||
9f95a23c | 829 | void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status) |
7c673cae | 830 | { |
9f95a23c | 831 | req_data->finish(ret, http_status); |
7c673cae FG |
832 | complete_request(req_data); |
833 | } | |
834 | ||
835 | void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) | |
836 | { | |
837 | req_data->finish(ret); | |
838 | _complete_request(req_data); | |
839 | } | |
840 | ||
11fdf7f2 TL |
841 | void RGWHTTPManager::_set_req_state(set_state& ss) |
842 | { | |
843 | ss.req->set_state(ss.bitmask); | |
844 | } | |
7c673cae FG |
845 | /* |
846 | * hook request to the curl multi handle | |
847 | */ | |
848 | int RGWHTTPManager::link_request(rgw_http_req_data *req_data) | |
849 | { | |
11fdf7f2 TL |
850 | ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
851 | CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle()); | |
7c673cae FG |
852 | if (mstatus) { |
853 | dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; | |
854 | return -EIO; | |
855 | } | |
856 | return 0; | |
857 | } | |
858 | ||
859 | /* | |
860 | * unhook request from the curl multi handle, and finish request if it wasn't finished yet as | |
861 | * there will be no more processing on this request | |
862 | */ | |
863 | void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) | |
864 | { | |
11fdf7f2 TL |
865 | if (req_data->curl_handle) { |
866 | curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle()); | |
7c673cae | 867 | } |
eafe8130 | 868 | if (!req_data->is_done()) { |
7c673cae FG |
869 | _finish_request(req_data, -ECANCELED); |
870 | } | |
871 | } | |
872 | ||
873 | void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) | |
874 | { | |
9f95a23c | 875 | std::unique_lock wl{reqs_lock}; |
7c673cae FG |
876 | _unlink_request(req_data); |
877 | } | |
878 | ||
879 | void RGWHTTPManager::manage_pending_requests() | |
880 | { | |
9f95a23c | 881 | reqs_lock.lock_shared(); |
11fdf7f2 TL |
882 | if (max_threaded_req == num_reqs && |
883 | unregistered_reqs.empty() && | |
884 | reqs_change_state.empty()) { | |
9f95a23c | 885 | reqs_lock.unlock_shared(); |
7c673cae FG |
886 | return; |
887 | } | |
9f95a23c | 888 | reqs_lock.unlock_shared(); |
7c673cae | 889 | |
9f95a23c | 890 | std::unique_lock wl{reqs_lock}; |
7c673cae | 891 | |
f6b5b4d7 TL |
892 | if (!reqs_change_state.empty()) { |
893 | for (auto siter : reqs_change_state) { | |
894 | _set_req_state(siter); | |
895 | } | |
896 | reqs_change_state.clear(); | |
897 | } | |
898 | ||
7c673cae FG |
899 | if (!unregistered_reqs.empty()) { |
900 | for (auto& r : unregistered_reqs) { | |
901 | _unlink_request(r); | |
902 | r->put(); | |
903 | } | |
904 | ||
905 | unregistered_reqs.clear(); | |
906 | } | |
907 | ||
908 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req); | |
909 | ||
910 | list<std::pair<rgw_http_req_data *, int> > remove_reqs; | |
911 | ||
912 | for (; iter != reqs.end(); ++iter) { | |
913 | rgw_http_req_data *req_data = iter->second; | |
914 | int r = link_request(req_data); | |
915 | if (r < 0) { | |
916 | ldout(cct, 0) << "ERROR: failed to link http request" << dendl; | |
917 | remove_reqs.push_back(std::make_pair(iter->second, r)); | |
918 | } else { | |
919 | max_threaded_req = iter->first + 1; | |
920 | } | |
921 | } | |
922 | ||
923 | for (auto piter : remove_reqs) { | |
924 | rgw_http_req_data *req_data = piter.first; | |
925 | int r = piter.second; | |
926 | ||
927 | _finish_request(req_data, r); | |
928 | } | |
929 | } | |
930 | ||
11fdf7f2 | 931 | int RGWHTTPManager::add_request(RGWHTTPClient *client) |
7c673cae FG |
932 | { |
933 | rgw_http_req_data *req_data = new rgw_http_req_data; | |
934 | ||
11fdf7f2 | 935 | int ret = client->init_request(req_data); |
7c673cae FG |
936 | if (ret < 0) { |
937 | req_data->put(); | |
938 | req_data = NULL; | |
939 | return ret; | |
940 | } | |
941 | ||
942 | req_data->mgr = this; | |
943 | req_data->client = client; | |
11fdf7f2 TL |
944 | req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL); |
945 | req_data->user_info = client->get_io_user_info(); | |
7c673cae FG |
946 | |
947 | register_request(req_data); | |
948 | ||
11fdf7f2 | 949 | if (!is_started) { |
7c673cae FG |
950 | ret = link_request(req_data); |
951 | if (ret < 0) { | |
952 | req_data->put(); | |
953 | req_data = NULL; | |
954 | } | |
955 | return ret; | |
956 | } | |
957 | ret = signal_thread(); | |
958 | if (ret < 0) { | |
959 | finish_request(req_data, ret); | |
960 | } | |
961 | ||
962 | return ret; | |
963 | } | |
964 | ||
965 | int RGWHTTPManager::remove_request(RGWHTTPClient *client) | |
966 | { | |
967 | rgw_http_req_data *req_data = client->get_req_data(); | |
968 | ||
11fdf7f2 | 969 | if (!is_started) { |
7c673cae FG |
970 | unlink_request(req_data); |
971 | return 0; | |
972 | } | |
11fdf7f2 TL |
973 | if (!unregister_request(req_data)) { |
974 | return 0; | |
975 | } | |
7c673cae FG |
976 | int ret = signal_thread(); |
977 | if (ret < 0) { | |
978 | return ret; | |
979 | } | |
980 | ||
981 | return 0; | |
982 | } | |
983 | ||
11fdf7f2 | 984 | int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state) |
7c673cae | 985 | { |
11fdf7f2 | 986 | rgw_http_req_data *req_data = client->get_req_data(); |
7c673cae | 987 | |
9f95a23c | 988 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
7c673cae | 989 | |
11fdf7f2 TL |
990 | /* can only do that if threaded */ |
991 | if (!is_started) { | |
992 | return -EINVAL; | |
993 | } | |
7c673cae | 994 | |
11fdf7f2 TL |
995 | bool suggested_wr_paused = req_data->write_paused; |
996 | bool suggested_rd_paused = req_data->read_paused; | |
997 | ||
998 | switch (state) { | |
999 | case SET_WRITE_PAUSED: | |
1000 | suggested_wr_paused = true; | |
1001 | break; | |
1002 | case SET_WRITE_RESUME: | |
1003 | suggested_wr_paused = false; | |
1004 | break; | |
1005 | case SET_READ_PAUSED: | |
1006 | suggested_rd_paused = true; | |
1007 | break; | |
1008 | case SET_READ_RESUME: | |
1009 | suggested_rd_paused = false; | |
1010 | break; | |
1011 | default: | |
1012 | /* shouldn't really be here */ | |
1013 | return -EIO; | |
1014 | } | |
1015 | if (suggested_wr_paused == req_data->write_paused && | |
1016 | suggested_rd_paused == req_data->read_paused) { | |
1017 | return 0; | |
1018 | } | |
7c673cae | 1019 | |
11fdf7f2 TL |
1020 | req_data->write_paused = suggested_wr_paused; |
1021 | req_data->read_paused = suggested_rd_paused; | |
7c673cae | 1022 | |
11fdf7f2 | 1023 | int bitmask = CURLPAUSE_CONT; |
7c673cae | 1024 | |
11fdf7f2 TL |
1025 | if (req_data->write_paused) { |
1026 | bitmask |= CURLPAUSE_SEND; | |
1027 | } | |
7c673cae | 1028 | |
11fdf7f2 TL |
1029 | if (req_data->read_paused) { |
1030 | bitmask |= CURLPAUSE_RECV; | |
1031 | } | |
7c673cae | 1032 | |
11fdf7f2 TL |
1033 | reqs_change_state.push_back(set_state(req_data, bitmask)); |
1034 | int ret = signal_thread(); | |
1035 | if (ret < 0) { | |
1036 | return ret; | |
1037 | } | |
7c673cae | 1038 | |
11fdf7f2 | 1039 | return 0; |
7c673cae FG |
1040 | } |
1041 | ||
11fdf7f2 | 1042 | int RGWHTTPManager::start() |
7c673cae | 1043 | { |
9f95a23c | 1044 | if (pipe_cloexec(thread_pipe, 0) < 0) { |
91327a77 AA |
1045 | int e = errno; |
1046 | ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl; | |
1047 | return -e; | |
7c673cae FG |
1048 | } |
1049 | ||
1050 | // enable non-blocking reads | |
91327a77 AA |
1051 | if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) { |
1052 | int e = errno; | |
1053 | ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl; | |
7c673cae FG |
1054 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); |
1055 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
91327a77 | 1056 | return -e; |
7c673cae FG |
1057 | } |
1058 | ||
11fdf7f2 | 1059 | is_started = true; |
7c673cae FG |
1060 | reqs_thread = new ReqsThread(this); |
1061 | reqs_thread->create("http_manager"); | |
1062 | return 0; | |
1063 | } | |
1064 | ||
1065 | void RGWHTTPManager::stop() | |
1066 | { | |
1067 | if (is_stopped) { | |
1068 | return; | |
1069 | } | |
1070 | ||
1071 | is_stopped = true; | |
1072 | ||
11fdf7f2 | 1073 | if (is_started) { |
7c673cae FG |
1074 | going_down = true; |
1075 | signal_thread(); | |
1076 | reqs_thread->join(); | |
1077 | delete reqs_thread; | |
1078 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1079 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1080 | } | |
1081 | } | |
1082 | ||
1083 | int RGWHTTPManager::signal_thread() | |
1084 | { | |
1085 | uint32_t buf = 0; | |
1086 | int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); | |
1087 | if (ret < 0) { | |
1088 | ret = -errno; | |
1089 | ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; | |
1090 | return ret; | |
1091 | } | |
1092 | return 0; | |
1093 | } | |
1094 | ||
1095 | void *RGWHTTPManager::reqs_thread_entry() | |
1096 | { | |
1097 | int still_running; | |
1098 | int mstatus; | |
1099 | ||
1100 | ldout(cct, 20) << __func__ << ": start" << dendl; | |
1101 | ||
1102 | while (!going_down) { | |
1103 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); | |
1104 | if (ret < 0) { | |
1105 | dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; | |
1106 | return NULL; | |
1107 | } | |
1108 | ||
1109 | manage_pending_requests(); | |
1110 | ||
1111 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
1112 | switch (mstatus) { | |
1113 | case CURLM_OK: | |
1114 | case CURLM_CALL_MULTI_PERFORM: | |
1115 | break; | |
1116 | default: | |
1117 | dout(10) << "curl_multi_perform returned: " << mstatus << dendl; | |
1118 | break; | |
1119 | } | |
1120 | int msgs_left; | |
1121 | CURLMsg *msg; | |
1122 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
1123 | if (msg->msg == CURLMSG_DONE) { | |
1124 | int result = msg->data.result; | |
1125 | CURL *e = msg->easy_handle; | |
1126 | rgw_http_req_data *req_data; | |
1127 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
1128 | curl_multi_remove_handle((CURLM *)multi_handle, e); | |
1129 | ||
1130 | long http_status; | |
9f95a23c TL |
1131 | int status; |
1132 | if (!req_data->user_ret) { | |
1133 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
1134 | ||
1135 | status = rgw_http_error_to_errno(http_status); | |
1136 | if (result != CURLE_OK && status == 0) { | |
1137 | dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl; | |
1138 | status = -EAGAIN; | |
1139 | } | |
1140 | } else { | |
1141 | status = *req_data->user_ret; | |
1142 | rgw_err err; | |
1143 | set_req_state_err(err, status, 0); | |
1144 | http_status = err.http_ret; | |
7c673cae FG |
1145 | } |
1146 | int id = req_data->id; | |
9f95a23c | 1147 | finish_request(req_data, status, http_status); |
7c673cae FG |
1148 | switch (result) { |
1149 | case CURLE_OK: | |
1150 | break; | |
1adf2230 AA |
1151 | case CURLE_OPERATION_TIMEDOUT: |
1152 | dout(0) << "WARNING: curl operation timed out, network average transfer speed less than " | |
1153 | << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl; | |
7c673cae FG |
1154 | default: |
1155 | dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; | |
522d829b | 1156 | dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << " req_data->error_buf=" << req_data->error_buf << dendl; |
7c673cae FG |
1157 | break; |
1158 | } | |
1159 | } | |
1160 | } | |
1161 | } | |
1162 | ||
1163 | ||
9f95a23c | 1164 | std::unique_lock rl{reqs_lock}; |
7c673cae | 1165 | for (auto r : unregistered_reqs) { |
91327a77 | 1166 | _unlink_request(r); |
7c673cae FG |
1167 | } |
1168 | ||
1169 | unregistered_reqs.clear(); | |
1170 | ||
1171 | auto all_reqs = std::move(reqs); | |
1172 | for (auto iter : all_reqs) { | |
91327a77 | 1173 | _unlink_request(iter.second); |
7c673cae FG |
1174 | } |
1175 | ||
1176 | reqs.clear(); | |
1177 | ||
1178 | if (completion_mgr) { | |
1179 | completion_mgr->go_down(); | |
1180 | } | |
1181 | ||
1182 | return 0; | |
1183 | } | |
1184 | ||
11fdf7f2 TL |
1185 | void rgw_http_client_init(CephContext *cct) |
1186 | { | |
1187 | curl_global_init(CURL_GLOBAL_ALL); | |
1188 | rgw_http_manager = new RGWHTTPManager(cct); | |
1189 | rgw_http_manager->start(); | |
1190 | } | |
1191 | ||
1192 | void rgw_http_client_cleanup() | |
1193 | { | |
1194 | rgw_http_manager->stop(); | |
1195 | delete rgw_http_manager; | |
1196 | curl_global_cleanup(); | |
1197 | } | |
1198 | ||
1199 | ||
1200 | int RGWHTTP::send(RGWHTTPClient *req) { | |
1201 | if (!req) { | |
1202 | return 0; | |
1203 | } | |
1204 | int r = rgw_http_manager->add_request(req); | |
1205 | if (r < 0) { | |
1206 | return r; | |
1207 | } | |
1208 | ||
1209 | return 0; | |
1210 | } | |
1211 | ||
eafe8130 | 1212 | int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) { |
11fdf7f2 TL |
1213 | if (!req) { |
1214 | return 0; | |
1215 | } | |
1216 | int r = send(req); | |
1217 | if (r < 0) { | |
1218 | return r; | |
1219 | } | |
1220 | ||
eafe8130 | 1221 | return req->wait(y); |
11fdf7f2 | 1222 | } |
7c673cae | 1223 |