]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_http_client.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include "include/compat.h"
91327a77 5#include "common/errno.h"
7c673cae
FG
6
7#include <boost/utility/string_ref.hpp>
8
9#include <curl/curl.h>
10#include <curl/easy.h>
11#include <curl/multi.h>
12
13#include "rgw_common.h"
14#include "rgw_http_client.h"
15#include "rgw_http_errors.h"
eafe8130 16#include "common/async/completion.h"
7c673cae
FG
17#include "common/RefCountedObj.h"
18
19#include "rgw_coroutine.h"
9f95a23c 20#include "rgw_tools.h"
7c673cae
FG
21
22#include <atomic>
23
24#define dout_context g_ceph_context
25#define dout_subsys ceph_subsys_rgw
26
11fdf7f2
TL
27RGWHTTPManager *rgw_http_manager;
28
29struct RGWCurlHandle;
30
31static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
32
7c673cae 33struct rgw_http_req_data : public RefCountedObject {
11fdf7f2
TL
34 RGWCurlHandle *curl_handle{nullptr};
35 curl_slist *h{nullptr};
7c673cae 36 uint64_t id;
11fdf7f2 37 int ret{0};
7c673cae 38 std::atomic<bool> done = { false };
11fdf7f2
TL
39 RGWHTTPClient *client{nullptr};
40 rgw_io_id control_io_id;
41 void *user_info{nullptr};
42 bool registered{false};
43 RGWHTTPManager *mgr{nullptr};
7c673cae 44 char error_buf[CURL_ERROR_SIZE];
11fdf7f2
TL
45 bool write_paused{false};
46 bool read_paused{false};
7c673cae 47
9f95a23c
TL
48 optional<int> user_ret;
49
50 ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
51 ceph::condition_variable cond;
7c673cae 52
eafe8130
TL
53 using Signature = void(boost::system::error_code);
54 using Completion = ceph::async::Completion<Signature>;
55 std::unique_ptr<Completion> completion;
56
9f95a23c 57 rgw_http_req_data() : id(-1) {
92f5a8d4 58 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
59 memset(error_buf, 0, sizeof(error_buf));
60 }
61
eafe8130
TL
62 template <typename ExecutionContext, typename CompletionToken>
63 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
64 boost::asio::async_completion<CompletionToken, Signature> init(token);
65 auto& handler = init.completion_handler;
66 {
67 std::unique_lock l{lock};
68 completion = Completion::create(ctx.get_executor(), std::move(handler));
69 }
70 return init.result.get();
71 }
9f95a23c 72
eafe8130 73 int wait(optional_yield y) {
11fdf7f2
TL
74 if (done) {
75 return ret;
76 }
eafe8130
TL
77#ifdef HAVE_BOOST_CONTEXT
78 if (y) {
79 auto& context = y.get_io_context();
80 auto& yield = y.get_yield_context();
81 boost::system::error_code ec;
82 async_wait(context, yield[ec]);
83 return -ec.value();
84 }
9f95a23c
TL
85 // work on asio threads should be asynchronous, so warn when they block
86 if (is_asio_thread) {
87 dout(20) << "WARNING: blocking http request" << dendl;
88 }
eafe8130 89#endif
9f95a23c
TL
90 std::unique_lock l{lock};
91 cond.wait(l);
7c673cae
FG
92 return ret;
93 }
94
11fdf7f2 95 void set_state(int bitmask);
7c673cae 96
9f95a23c
TL
97 void finish(int r, long http_status = -1) {
98 std::lock_guard l{lock};
99 if (http_status != -1) {
100 if (client) {
101 client->set_http_status(http_status);
102 }
103 }
7c673cae 104 ret = r;
11fdf7f2
TL
105 if (curl_handle)
106 do_curl_easy_cleanup(curl_handle);
7c673cae
FG
107
108 if (h)
109 curl_slist_free_all(h);
110
11fdf7f2 111 curl_handle = NULL;
7c673cae
FG
112 h = NULL;
113 done = true;
eafe8130
TL
114 if (completion) {
115 boost::system::error_code ec(-ret, boost::system::system_category());
116 Completion::post(std::move(completion), ec);
117 } else {
9f95a23c 118 cond.notify_all();
eafe8130 119 }
11fdf7f2
TL
120 }
121
7c673cae
FG
122 bool is_done() {
123 return done;
124 }
125
126 int get_retcode() {
9f95a23c 127 std::lock_guard l{lock};
7c673cae
FG
128 return ret;
129 }
9f95a23c 130
7c673cae 131 RGWHTTPManager *get_manager() {
9f95a23c 132 std::lock_guard l{lock};
7c673cae
FG
133 return mgr;
134 }
11fdf7f2
TL
135
136 CURL *get_easy_handle() const;
7c673cae
FG
137};
138
94b18763
FG
139struct RGWCurlHandle {
140 int uses;
141 mono_time lastuse;
142 CURL* h;
143
11fdf7f2 144 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
94b18763
FG
145 CURL* operator*() {
146 return this->h;
147 }
148};
149
11fdf7f2
TL
150void rgw_http_req_data::set_state(int bitmask) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
153 */
154 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
155 if (rc != CURLE_OK) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
157 }
158}
159
94b18763
FG
160#define MAXIDLE 5
161class RGWCurlHandles : public Thread {
162public:
9f95a23c
TL
163 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector<RGWCurlHandle*> saved_curl;
94b18763 165 int cleaner_shutdown;
9f95a23c 166 ceph::condition_variable cleaner_cond;
94b18763
FG
167
168 RGWCurlHandles() :
94b18763
FG
169 cleaner_shutdown{0} {
170 }
171
172 RGWCurlHandle* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle* curl);
174 void release_curl_handle(RGWCurlHandle* curl);
175 void flush_curl_handles();
176 void* entry();
177 void stop();
178};
179
180RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle* curl = 0;
182 CURL* h;
183 {
9f95a23c 184 std::lock_guard lock{cleaner_lock};
94b18763
FG
185 if (!saved_curl.empty()) {
186 curl = *saved_curl.begin();
187 saved_curl.erase(saved_curl.begin());
188 }
189 }
190 if (curl) {
191 } else if ((h = curl_easy_init())) {
192 curl = new RGWCurlHandle{h};
193 } else {
194 // curl = 0;
195 }
196 return curl;
197}
198
199void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
200{
201 curl_easy_cleanup(**curl);
202 delete curl;
203}
204
205void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
206{
207 if (cleaner_shutdown) {
208 release_curl_handle_now(curl);
209 } else {
210 curl_easy_reset(**curl);
9f95a23c 211 std::lock_guard lock{cleaner_lock};
94b18763
FG
212 curl->lastuse = mono_clock::now();
213 saved_curl.insert(saved_curl.begin(), 1, curl);
214 }
215}
216
217void* RGWCurlHandles::entry()
218{
219 RGWCurlHandle* curl;
9f95a23c 220 std::unique_lock lock{cleaner_lock};
94b18763
FG
221
222 for (;;) {
223 if (cleaner_shutdown) {
224 if (saved_curl.empty())
225 break;
226 } else {
9f95a23c 227 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
94b18763
FG
228 }
229 mono_time now = mono_clock::now();
230 while (!saved_curl.empty()) {
231 auto cend = saved_curl.end();
232 --cend;
233 curl = *cend;
234 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
235 break;
236 saved_curl.erase(cend);
237 release_curl_handle_now(curl);
238 }
239 }
240 return nullptr;
241}
242
243void RGWCurlHandles::stop()
244{
9f95a23c 245 std::lock_guard lock{cleaner_lock};
94b18763 246 cleaner_shutdown = 1;
9f95a23c 247 cleaner_cond.notify_all();
94b18763
FG
248}
249
250void RGWCurlHandles::flush_curl_handles()
251{
252 stop();
253 join();
254 if (!saved_curl.empty()) {
255 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
256 }
257 saved_curl.shrink_to_fit();
258}
259
11fdf7f2
TL
260CURL *rgw_http_req_data::get_easy_handle() const
261{
262 return **curl_handle;
263}
264
94b18763 265static RGWCurlHandles *handles;
11fdf7f2
TL
266
267static RGWCurlHandle *do_curl_easy_init()
268{
269 return handles->get_curl_handle();
270}
271
272static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
273{
274 handles->release_curl_handle(curl_handle);
275}
276
94b18763
FG
// XXX make this part of the token cache? (but that's swift-only;
// and this especially needs to integrate with s3...)
279
280void rgw_setup_saved_curl_handles()
281{
282 handles = new RGWCurlHandles();
283 handles->create("rgw_curl");
284}
285
286void rgw_release_all_curl_handles()
287{
288 handles->flush_curl_handles();
289 delete handles;
290}
291
11fdf7f2 292void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
7c673cae 293{
11fdf7f2
TL
294 if (id == 0) {
295 id = io_id_provider.get_next();
7c673cae 296 }
7c673cae
FG
297}
298
299/*
300 * the following set of callbacks will be called either on RGWHTTPManager::process(),
301 * or via the RGWHTTPManager async processing.
302 */
303size_t RGWHTTPClient::receive_http_header(void * const ptr,
304 const size_t size,
305 const size_t nmemb,
306 void * const _info)
307{
308 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
309 size_t len = size * nmemb;
310
9f95a23c 311 std::lock_guard l{req_data->lock};
7c673cae
FG
312
313 if (!req_data->registered) {
314 return len;
315 }
316
317 int ret = req_data->client->receive_header(ptr, size * nmemb);
318 if (ret < 0) {
9f95a23c
TL
319 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
320 req_data->user_ret = ret;
321 return CURLE_WRITE_ERROR;
7c673cae
FG
322 }
323
324 return len;
325}
326
327size_t RGWHTTPClient::receive_http_data(void * const ptr,
328 const size_t size,
329 const size_t nmemb,
330 void * const _info)
331{
332 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
333 size_t len = size * nmemb;
334
11fdf7f2
TL
335 bool pause = false;
336
337 RGWHTTPClient *client;
338
339 {
9f95a23c 340 std::lock_guard l{req_data->lock};
11fdf7f2
TL
341 if (!req_data->registered) {
342 return len;
343 }
344
345 client = req_data->client;
346 }
347
348 size_t& skip_bytes = client->receive_pause_skip;
349
350 if (skip_bytes >= len) {
351 skip_bytes -= len;
7c673cae
FG
352 return len;
353 }
11fdf7f2
TL
354
355 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
7c673cae 356 if (ret < 0) {
9f95a23c
TL
357 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
358 req_data->user_ret = ret;
359 return CURLE_WRITE_ERROR;
7c673cae
FG
360 }
361
11fdf7f2
TL
362 if (pause) {
363 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
364 skip_bytes = len;
9f95a23c 365 std::lock_guard l{req_data->lock};
11fdf7f2
TL
366 req_data->read_paused = true;
367 return CURL_WRITEFUNC_PAUSE;
368 }
369
370 skip_bytes = 0;
371
7c673cae
FG
372 return len;
373}
374
375size_t RGWHTTPClient::send_http_data(void * const ptr,
376 const size_t size,
377 const size_t nmemb,
378 void * const _info)
379{
380 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
381
11fdf7f2
TL
382 RGWHTTPClient *client;
383
384 {
9f95a23c 385 std::lock_guard l{req_data->lock};
7c673cae 386
11fdf7f2
TL
387 if (!req_data->registered) {
388 return 0;
389 }
390
391 client = req_data->client;
7c673cae
FG
392 }
393
11fdf7f2
TL
394 bool pause = false;
395
396 int ret = client->send_data(ptr, size * nmemb, &pause);
7c673cae 397 if (ret < 0) {
9f95a23c
TL
398 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
399 req_data->user_ret = ret;
400 return CURLE_READ_ERROR;
7c673cae
FG
401 }
402
11fdf7f2
TL
403 if (ret == 0 &&
404 pause) {
9f95a23c 405 std::lock_guard l{req_data->lock};
11fdf7f2
TL
406 req_data->write_paused = true;
407 return CURL_READFUNC_PAUSE;
408 }
409
7c673cae
FG
410 return ret;
411}
412
9f95a23c 413ceph::mutex& RGWHTTPClient::get_req_lock()
11fdf7f2
TL
414{
415 return req_data->lock;
416}
417
418void RGWHTTPClient::_set_write_paused(bool pause)
419{
9f95a23c 420 ceph_assert(ceph_mutex_is_locked(req_data->lock));
11fdf7f2
TL
421
422 RGWHTTPManager *mgr = req_data->mgr;
423 if (pause == req_data->write_paused) {
424 return;
425 }
426 if (pause) {
427 mgr->set_request_state(this, SET_WRITE_PAUSED);
428 } else {
429 mgr->set_request_state(this, SET_WRITE_RESUME);
430 }
431}
432
433void RGWHTTPClient::_set_read_paused(bool pause)
434{
9f95a23c 435 ceph_assert(ceph_mutex_is_locked(req_data->lock));
11fdf7f2
TL
436
437 RGWHTTPManager *mgr = req_data->mgr;
438 if (pause == req_data->read_paused) {
439 return;
440 }
441 if (pause) {
442 mgr->set_request_state(this, SET_READ_PAUSED);
443 } else {
444 mgr->set_request_state(this, SET_READ_RESUME);
445 }
446}
447
7c673cae
FG
448static curl_slist *headers_to_slist(param_vec_t& headers)
449{
450 curl_slist *h = NULL;
451
452 param_vec_t::iterator iter;
453 for (iter = headers.begin(); iter != headers.end(); ++iter) {
454 pair<string, string>& p = *iter;
455 string val = p.first;
456
457 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
458 val = val.substr(5);
459 }
460
461 /* we need to convert all underscores into dashes as some web servers forbid them
462 * in the http header field names
463 */
464 for (size_t i = 0; i < val.size(); i++) {
465 if (val[i] == '_') {
466 val[i] = '-';
467 }
468 }
11fdf7f2
TL
469
470 val = camelcase_dash_http_attr(val);
7c673cae 471
28e407b8
AA
472 // curl won't send headers with empty values unless it ends with a ; instead
473 if (p.second.empty()) {
474 val.append(1, ';');
475 } else {
476 val.append(": ");
477 val.append(p.second);
478 }
7c673cae
FG
479 h = curl_slist_append(h, val.c_str());
480 }
481
482 return h;
483}
484
/* True for the HTTP methods that carry a request body: POST and PUT. */
static bool is_upload_request(const string& method)
{
  if (method == "POST") {
    return true;
  }
  if (method == "PUT") {
    return true;
  }
  return false;
}
489
490/*
11fdf7f2 491 * process a single simple one off request
7c673cae 492 */
eafe8130 493int RGWHTTPClient::process(optional_yield y)
7c673cae 494{
eafe8130 495 return RGWHTTP::process(this, y);
7c673cae
FG
496}
497
498string RGWHTTPClient::to_str()
499{
11fdf7f2
TL
500 string method_str = (method.empty() ? "<no-method>" : method);
501 string url_str = (url.empty() ? "<no-url>" : url);
7c673cae
FG
502 return method_str + " " + url_str;
503}
504
505int RGWHTTPClient::get_req_retcode()
506{
507 if (!req_data) {
508 return -EINVAL;
509 }
510
511 return req_data->get_retcode();
512}
513
514/*
515 * init request, will be used later with RGWHTTPManager
516 */
11fdf7f2 517int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
7c673cae 518{
11fdf7f2 519 ceph_assert(!req_data);
7c673cae
FG
520 _req_data->get();
521 req_data = _req_data;
522
11fdf7f2 523 req_data->curl_handle = do_curl_easy_init();
7c673cae 524
11fdf7f2 525 CURL *easy_handle = req_data->get_easy_handle();
7c673cae
FG
526
527 dout(20) << "sending request to " << url << dendl;
528
529 curl_slist *h = headers_to_slist(headers);
530
531 req_data->h = h;
532
11fdf7f2
TL
533 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
534 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
7c673cae
FG
535 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
536 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
537 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
538 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
539 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
540 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
541 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
1adf2230
AA
542 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
543 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
7c673cae
FG
544 if (h) {
545 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
546 }
547 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
548 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
31f18b77 549 if (send_data_hint || is_upload_request(method)) {
7c673cae
FG
550 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
551 }
552 if (has_send_len) {
553 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
554 }
31f18b77
FG
555 if (!verify_ssl) {
556 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
557 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
558 dout(20) << "ssl verification is set to off" << dendl;
559 }
7c673cae
FG
560 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
561
562 return 0;
563}
564
11fdf7f2
TL
565bool RGWHTTPClient::is_done()
566{
567 return req_data->is_done();
568}
569
7c673cae
FG
570/*
571 * wait for async request to complete
572 */
eafe8130 573int RGWHTTPClient::wait(optional_yield y)
7c673cae 574{
eafe8130 575 return req_data->wait(y);
7c673cae
FG
576}
577
11fdf7f2 578void RGWHTTPClient::cancel()
7c673cae
FG
579{
580 if (req_data) {
11fdf7f2 581 RGWHTTPManager *http_manager = req_data->mgr;
7c673cae
FG
582 if (http_manager) {
583 http_manager->remove_request(this);
584 }
11fdf7f2
TL
585 }
586}
7c673cae 587
11fdf7f2
TL
588RGWHTTPClient::~RGWHTTPClient()
589{
590 cancel();
591 if (req_data) {
7c673cae
FG
592 req_data->put();
593 }
594}
595
596
597int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
598{
11fdf7f2 599 const boost::string_ref header_line(static_cast<const char *>(ptr), len);
7c673cae
FG
600
601 /* We're tokening the line that way due to backward compatibility. */
602 const size_t sep_loc = header_line.find_first_of(" \t:");
603
604 if (boost::string_ref::npos == sep_loc) {
605 /* Wrongly formatted header? Just skip it. */
606 return 0;
607 }
608
609 header_name_t name(header_line.substr(0, sep_loc));
610 if (0 == relevant_headers.count(name)) {
611 /* Not interested in this particular header. */
612 return 0;
613 }
614
615 const auto value_part = header_line.substr(sep_loc + 1);
616
617 /* Skip spaces and tabs after the separator. */
618 const size_t val_loc_s = value_part.find_first_not_of(' ');
619 const size_t val_loc_e = value_part.find_first_of("\r\n");
620
621 if (boost::string_ref::npos == val_loc_s ||
622 boost::string_ref::npos == val_loc_e) {
623 /* Empty value case. */
624 found_headers.emplace(name, header_value_t());
625 } else {
626 found_headers.emplace(name, header_value_t(
627 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
628 }
629
630 return 0;
631}
632
11fdf7f2 633int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
7c673cae
FG
634{
635 int length_to_copy = 0;
636 if (post_data_index < post_data.length()) {
637 length_to_copy = min(post_data.length() - post_data_index, len);
638 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
639 post_data_index += length_to_copy;
640 }
641 return length_to_copy;
642}
643
644
/*
 * Drain pending wakeup bytes from `fd` with a single read of up to 256
 * bytes.
 *
 * Returns 0 when the read succeeded or failed with EAGAIN (nothing to read
 * on a non-blocking fd), and -errno for any other failure.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> drain;
  const int nread = ::read(fd, (void *)drain.data(), drain.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = errno;
  if (err == EAGAIN) {
    return 0; // clear EAGAIN
  }
  return -err;
}
658
659#if HAVE_CURL_MULTI_WAIT
660
661static std::once_flag detect_flag;
662static bool curl_multi_wait_bug_present = false;
663
664static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
665 int write_fd, int read_fd)
666{
667 int ret = 0;
668
669 // write to write_fd so that read_fd becomes readable
670 uint32_t buf = 0;
671 ret = ::write(write_fd, &buf, sizeof(buf));
672 if (ret < 0) {
673 ret = -errno;
674 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
675 return ret;
676 }
677
678 // pass read_fd in extra_fds for curl_multi_wait()
679 int num_fds;
680 struct curl_waitfd wait_fd;
681
682 wait_fd.fd = read_fd;
683 wait_fd.events = CURL_WAIT_POLLIN;
684 wait_fd.revents = 0;
685
686 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
687 if (ret != CURLM_OK) {
688 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
689 return -EIO;
690 }
691
692 // curl_multi_wait should flag revents when extra_fd is readable. if it
693 // doesn't, the bug is present and we can't rely on revents
694 if (wait_fd.revents == 0) {
695 curl_multi_wait_bug_present = true;
696 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
697 "bug in curl_multi_wait(). enabling a workaround that may degrade "
698 "performance slightly." << dendl;
699 }
700
701 return clear_signal(read_fd);
702}
703
704static bool is_signaled(const curl_waitfd& wait_fd)
705{
706 if (wait_fd.fd < 0) {
707 // no fd to signal
708 return false;
709 }
710
711 if (curl_multi_wait_bug_present) {
712 // we can't rely on revents, so we always return true if a wait_fd is given.
713 // this means we'll be trying a non-blocking read on this fd every time that
714 // curl_multi_wait() wakes up
715 return true;
716 }
717
718 return wait_fd.revents > 0;
719}
720
721static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
722{
723 int num_fds;
724 struct curl_waitfd wait_fd;
725
726 wait_fd.fd = signal_fd;
727 wait_fd.events = CURL_WAIT_POLLIN;
728 wait_fd.revents = 0;
729
730 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
731 if (ret) {
732 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
733 return -EIO;
734 }
735
736 if (is_signaled(wait_fd)) {
737 ret = clear_signal(signal_fd);
738 if (ret < 0) {
739 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
740 return ret;
741 }
742 }
743 return 0;
744}
745
746#else
747
748static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
749{
750 fd_set fdread;
751 fd_set fdwrite;
752 fd_set fdexcep;
753 int maxfd = -1;
754
755 FD_ZERO(&fdread);
756 FD_ZERO(&fdwrite);
757 FD_ZERO(&fdexcep);
758
759 /* get file descriptors from the transfers */
760 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
761 if (ret) {
762 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
763 return -EIO;
764 }
765
766 if (signal_fd > 0) {
767 FD_SET(signal_fd, &fdread);
768 if (signal_fd >= maxfd) {
769 maxfd = signal_fd + 1;
770 }
771 }
772
773 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
774 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
775#define RGW_CURL_TIMEOUT 1000
776 if (!to)
777 to = RGW_CURL_TIMEOUT;
778 struct timeval timeout;
779 timeout.tv_sec = to / 1000;
780 timeout.tv_usec = to % 1000;
781
782 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
783 if (ret < 0) {
784 ret = -errno;
785 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
786 return ret;
787 }
788
789 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
790 ret = clear_signal(signal_fd);
791 if (ret < 0) {
792 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
793 return ret;
794 }
795 }
796
797 return 0;
798}
799
800#endif
801
802void *RGWHTTPManager::ReqsThread::entry()
803{
804 manager->reqs_thread_entry();
805 return NULL;
806}
807
808/*
809 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
810 */
811RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
9f95a23c 812 completion_mgr(_cm)
7c673cae
FG
813{
814 multi_handle = (void *)curl_multi_init();
815 thread_pipe[0] = -1;
816 thread_pipe[1] = -1;
817}
818
819RGWHTTPManager::~RGWHTTPManager() {
820 stop();
821 if (multi_handle)
822 curl_multi_cleanup((CURLM *)multi_handle);
823}
824
825void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
826{
9f95a23c 827 std::unique_lock rl{reqs_lock};
7c673cae
FG
828 req_data->id = num_reqs;
829 req_data->registered = true;
830 reqs[num_reqs] = req_data;
831 num_reqs++;
11fdf7f2 832 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
7c673cae
FG
833}
834
11fdf7f2 835bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
7c673cae 836{
9f95a23c 837 std::unique_lock rl{reqs_lock};
11fdf7f2
TL
838 if (!req_data->registered) {
839 return false;
840 }
7c673cae
FG
841 req_data->get();
842 req_data->registered = false;
843 unregistered_reqs.push_back(req_data);
11fdf7f2
TL
844 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
845 return true;
7c673cae
FG
846}
847
848void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
849{
9f95a23c 850 std::unique_lock rl{reqs_lock};
7c673cae
FG
851 _complete_request(req_data);
852}
853
854void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
855{
856 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
857 if (iter != reqs.end()) {
858 reqs.erase(iter);
859 }
860 {
9f95a23c 861 std::lock_guard l{req_data->lock};
7c673cae
FG
862 req_data->mgr = nullptr;
863 }
864 if (completion_mgr) {
11fdf7f2 865 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
7c673cae
FG
866 }
867
868 req_data->put();
869}
870
9f95a23c 871void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
7c673cae 872{
9f95a23c 873 req_data->finish(ret, http_status);
7c673cae
FG
874 complete_request(req_data);
875}
876
877void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
878{
879 req_data->finish(ret);
880 _complete_request(req_data);
881}
882
11fdf7f2
TL
883void RGWHTTPManager::_set_req_state(set_state& ss)
884{
885 ss.req->set_state(ss.bitmask);
886}
7c673cae
FG
887/*
888 * hook request to the curl multi handle
889 */
890int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
891{
11fdf7f2
TL
892 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
893 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
7c673cae
FG
894 if (mstatus) {
895 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
896 return -EIO;
897 }
898 return 0;
899}
900
901/*
902 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
903 * there will be no more processing on this request
904 */
905void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
906{
11fdf7f2
TL
907 if (req_data->curl_handle) {
908 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
7c673cae 909 }
eafe8130 910 if (!req_data->is_done()) {
7c673cae
FG
911 _finish_request(req_data, -ECANCELED);
912 }
913}
914
915void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
916{
9f95a23c 917 std::unique_lock wl{reqs_lock};
7c673cae
FG
918 _unlink_request(req_data);
919}
920
921void RGWHTTPManager::manage_pending_requests()
922{
9f95a23c 923 reqs_lock.lock_shared();
11fdf7f2
TL
924 if (max_threaded_req == num_reqs &&
925 unregistered_reqs.empty() &&
926 reqs_change_state.empty()) {
9f95a23c 927 reqs_lock.unlock_shared();
7c673cae
FG
928 return;
929 }
9f95a23c 930 reqs_lock.unlock_shared();
7c673cae 931
9f95a23c 932 std::unique_lock wl{reqs_lock};
7c673cae 933
f6b5b4d7
TL
934 if (!reqs_change_state.empty()) {
935 for (auto siter : reqs_change_state) {
936 _set_req_state(siter);
937 }
938 reqs_change_state.clear();
939 }
940
7c673cae
FG
941 if (!unregistered_reqs.empty()) {
942 for (auto& r : unregistered_reqs) {
943 _unlink_request(r);
944 r->put();
945 }
946
947 unregistered_reqs.clear();
948 }
949
950 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
951
952 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
953
954 for (; iter != reqs.end(); ++iter) {
955 rgw_http_req_data *req_data = iter->second;
956 int r = link_request(req_data);
957 if (r < 0) {
958 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
959 remove_reqs.push_back(std::make_pair(iter->second, r));
960 } else {
961 max_threaded_req = iter->first + 1;
962 }
963 }
964
965 for (auto piter : remove_reqs) {
966 rgw_http_req_data *req_data = piter.first;
967 int r = piter.second;
968
969 _finish_request(req_data, r);
970 }
971}
972
11fdf7f2 973int RGWHTTPManager::add_request(RGWHTTPClient *client)
7c673cae
FG
974{
975 rgw_http_req_data *req_data = new rgw_http_req_data;
976
11fdf7f2 977 int ret = client->init_request(req_data);
7c673cae
FG
978 if (ret < 0) {
979 req_data->put();
980 req_data = NULL;
981 return ret;
982 }
983
984 req_data->mgr = this;
985 req_data->client = client;
11fdf7f2
TL
986 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
987 req_data->user_info = client->get_io_user_info();
7c673cae
FG
988
989 register_request(req_data);
990
11fdf7f2 991 if (!is_started) {
7c673cae
FG
992 ret = link_request(req_data);
993 if (ret < 0) {
994 req_data->put();
995 req_data = NULL;
996 }
997 return ret;
998 }
999 ret = signal_thread();
1000 if (ret < 0) {
1001 finish_request(req_data, ret);
1002 }
1003
1004 return ret;
1005}
1006
1007int RGWHTTPManager::remove_request(RGWHTTPClient *client)
1008{
1009 rgw_http_req_data *req_data = client->get_req_data();
1010
11fdf7f2 1011 if (!is_started) {
7c673cae
FG
1012 unlink_request(req_data);
1013 return 0;
1014 }
11fdf7f2
TL
1015 if (!unregister_request(req_data)) {
1016 return 0;
1017 }
7c673cae
FG
1018 int ret = signal_thread();
1019 if (ret < 0) {
1020 return ret;
1021 }
1022
1023 return 0;
1024}
1025
11fdf7f2 1026int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
7c673cae 1027{
11fdf7f2 1028 rgw_http_req_data *req_data = client->get_req_data();
7c673cae 1029
9f95a23c 1030 ceph_assert(ceph_mutex_is_locked(req_data->lock));
7c673cae 1031
11fdf7f2
TL
1032 /* can only do that if threaded */
1033 if (!is_started) {
1034 return -EINVAL;
1035 }
7c673cae 1036
11fdf7f2
TL
1037 bool suggested_wr_paused = req_data->write_paused;
1038 bool suggested_rd_paused = req_data->read_paused;
1039
1040 switch (state) {
1041 case SET_WRITE_PAUSED:
1042 suggested_wr_paused = true;
1043 break;
1044 case SET_WRITE_RESUME:
1045 suggested_wr_paused = false;
1046 break;
1047 case SET_READ_PAUSED:
1048 suggested_rd_paused = true;
1049 break;
1050 case SET_READ_RESUME:
1051 suggested_rd_paused = false;
1052 break;
1053 default:
1054 /* shouldn't really be here */
1055 return -EIO;
1056 }
1057 if (suggested_wr_paused == req_data->write_paused &&
1058 suggested_rd_paused == req_data->read_paused) {
1059 return 0;
1060 }
7c673cae 1061
11fdf7f2
TL
1062 req_data->write_paused = suggested_wr_paused;
1063 req_data->read_paused = suggested_rd_paused;
7c673cae 1064
11fdf7f2 1065 int bitmask = CURLPAUSE_CONT;
7c673cae 1066
11fdf7f2
TL
1067 if (req_data->write_paused) {
1068 bitmask |= CURLPAUSE_SEND;
1069 }
7c673cae 1070
11fdf7f2
TL
1071 if (req_data->read_paused) {
1072 bitmask |= CURLPAUSE_RECV;
1073 }
7c673cae 1074
11fdf7f2
TL
1075 reqs_change_state.push_back(set_state(req_data, bitmask));
1076 int ret = signal_thread();
1077 if (ret < 0) {
1078 return ret;
1079 }
7c673cae 1080
11fdf7f2 1081 return 0;
7c673cae
FG
1082}
1083
11fdf7f2 1084int RGWHTTPManager::start()
7c673cae 1085{
9f95a23c 1086 if (pipe_cloexec(thread_pipe, 0) < 0) {
91327a77
AA
1087 int e = errno;
1088 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1089 return -e;
7c673cae
FG
1090 }
1091
1092 // enable non-blocking reads
91327a77
AA
1093 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1094 int e = errno;
1095 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
7c673cae
FG
1096 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1097 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
91327a77 1098 return -e;
7c673cae
FG
1099 }
1100
1101#ifdef HAVE_CURL_MULTI_WAIT
1102 // on first initialization, use this pipe to detect whether we're using a
1103 // buggy version of libcurl
1104 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1105 static_cast<CURLM*>(multi_handle),
1106 thread_pipe[1], thread_pipe[0]);
1107#endif
1108
11fdf7f2 1109 is_started = true;
7c673cae
FG
1110 reqs_thread = new ReqsThread(this);
1111 reqs_thread->create("http_manager");
1112 return 0;
1113}
1114
1115void RGWHTTPManager::stop()
1116{
1117 if (is_stopped) {
1118 return;
1119 }
1120
1121 is_stopped = true;
1122
11fdf7f2 1123 if (is_started) {
7c673cae
FG
1124 going_down = true;
1125 signal_thread();
1126 reqs_thread->join();
1127 delete reqs_thread;
1128 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1129 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1130 }
1131}
1132
1133int RGWHTTPManager::signal_thread()
1134{
1135 uint32_t buf = 0;
1136 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1137 if (ret < 0) {
1138 ret = -errno;
1139 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1140 return ret;
1141 }
1142 return 0;
1143}
1144
1145void *RGWHTTPManager::reqs_thread_entry()
1146{
1147 int still_running;
1148 int mstatus;
1149
1150 ldout(cct, 20) << __func__ << ": start" << dendl;
1151
1152 while (!going_down) {
1153 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1154 if (ret < 0) {
1155 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1156 return NULL;
1157 }
1158
1159 manage_pending_requests();
1160
1161 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1162 switch (mstatus) {
1163 case CURLM_OK:
1164 case CURLM_CALL_MULTI_PERFORM:
1165 break;
1166 default:
1167 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1168 break;
1169 }
1170 int msgs_left;
1171 CURLMsg *msg;
1172 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1173 if (msg->msg == CURLMSG_DONE) {
1174 int result = msg->data.result;
1175 CURL *e = msg->easy_handle;
1176 rgw_http_req_data *req_data;
1177 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1178 curl_multi_remove_handle((CURLM *)multi_handle, e);
1179
1180 long http_status;
9f95a23c
TL
1181 int status;
1182 if (!req_data->user_ret) {
1183 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1184
1185 status = rgw_http_error_to_errno(http_status);
1186 if (result != CURLE_OK && status == 0) {
1187 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1188 status = -EAGAIN;
1189 }
1190 } else {
1191 status = *req_data->user_ret;
1192 rgw_err err;
1193 set_req_state_err(err, status, 0);
1194 http_status = err.http_ret;
7c673cae
FG
1195 }
1196 int id = req_data->id;
9f95a23c 1197 finish_request(req_data, status, http_status);
7c673cae
FG
1198 switch (result) {
1199 case CURLE_OK:
1200 break;
1adf2230
AA
1201 case CURLE_OPERATION_TIMEDOUT:
1202 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1203 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
7c673cae
FG
1204 default:
1205 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
11fdf7f2 1206 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
7c673cae
FG
1207 break;
1208 }
1209 }
1210 }
1211 }
1212
1213
9f95a23c 1214 std::unique_lock rl{reqs_lock};
7c673cae 1215 for (auto r : unregistered_reqs) {
91327a77 1216 _unlink_request(r);
7c673cae
FG
1217 }
1218
1219 unregistered_reqs.clear();
1220
1221 auto all_reqs = std::move(reqs);
1222 for (auto iter : all_reqs) {
91327a77 1223 _unlink_request(iter.second);
7c673cae
FG
1224 }
1225
1226 reqs.clear();
1227
1228 if (completion_mgr) {
1229 completion_mgr->go_down();
1230 }
1231
1232 return 0;
1233}
1234
11fdf7f2
TL
1235void rgw_http_client_init(CephContext *cct)
1236{
1237 curl_global_init(CURL_GLOBAL_ALL);
1238 rgw_http_manager = new RGWHTTPManager(cct);
1239 rgw_http_manager->start();
1240}
1241
1242void rgw_http_client_cleanup()
1243{
1244 rgw_http_manager->stop();
1245 delete rgw_http_manager;
1246 curl_global_cleanup();
1247}
1248
1249
1250int RGWHTTP::send(RGWHTTPClient *req) {
1251 if (!req) {
1252 return 0;
1253 }
1254 int r = rgw_http_manager->add_request(req);
1255 if (r < 0) {
1256 return r;
1257 }
1258
1259 return 0;
1260}
1261
eafe8130 1262int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
11fdf7f2
TL
1263 if (!req) {
1264 return 0;
1265 }
1266 int r = send(req);
1267 if (r < 0) {
1268 return r;
1269 }
1270
eafe8130 1271 return req->wait(y);
11fdf7f2 1272}
7c673cae 1273