]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_http_client.cc
update sources to v12.2.5
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/compat.h"
5
6#include <boost/utility/string_ref.hpp>
7
8#include <curl/curl.h>
9#include <curl/easy.h>
10#include <curl/multi.h>
11
12#include "rgw_common.h"
13#include "rgw_http_client.h"
14#include "rgw_http_errors.h"
15#include "common/RefCountedObj.h"
16
17#include "rgw_coroutine.h"
18
19#include <atomic>
20
21#define dout_context g_ceph_context
22#define dout_subsys ceph_subsys_rgw
23
24struct rgw_http_req_data : public RefCountedObject {
25 CURL *easy_handle;
26 curl_slist *h;
27 uint64_t id;
28 int ret;
29 std::atomic<bool> done = { false };
30 RGWHTTPClient *client;
31 void *user_info;
32 bool registered;
33 RGWHTTPManager *mgr;
34 char error_buf[CURL_ERROR_SIZE];
35
36 Mutex lock;
37 Cond cond;
38
39 rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
40 client(nullptr), user_info(nullptr), registered(false),
41 mgr(NULL), lock("rgw_http_req_data::lock") {
42 memset(error_buf, 0, sizeof(error_buf));
43 }
44
45 int wait() {
46 Mutex::Locker l(lock);
47 cond.Wait(lock);
48 return ret;
49 }
50
51
52 void finish(int r) {
53 Mutex::Locker l(lock);
54 ret = r;
55 if (easy_handle)
56 curl_easy_cleanup(easy_handle);
57
58 if (h)
59 curl_slist_free_all(h);
60
61 easy_handle = NULL;
62 h = NULL;
63 done = true;
64 cond.Signal();
65 }
66
67 bool is_done() {
68 return done;
69 }
70
71 int get_retcode() {
72 Mutex::Locker l(lock);
73 return ret;
74 }
75
76 RGWHTTPManager *get_manager() {
77 Mutex::Locker l(lock);
78 return mgr;
79 }
80};
81
94b18763
FG
82struct RGWCurlHandle {
83 int uses;
84 mono_time lastuse;
85 CURL* h;
86
87 RGWCurlHandle(CURL* h) : uses(0), h(h) {};
88 CURL* operator*() {
89 return this->h;
90 }
91};
92
93#define MAXIDLE 5
94class RGWCurlHandles : public Thread {
95public:
96 Mutex cleaner_lock;
97 std::vector<RGWCurlHandle*>saved_curl;
98 int cleaner_shutdown;
99 Cond cleaner_cond;
100
101 RGWCurlHandles() :
102 cleaner_lock{"RGWCurlHandles::cleaner_lock"},
103 cleaner_shutdown{0} {
104 }
105
106 RGWCurlHandle* get_curl_handle();
107 void release_curl_handle_now(RGWCurlHandle* curl);
108 void release_curl_handle(RGWCurlHandle* curl);
109 void flush_curl_handles();
110 void* entry();
111 void stop();
112};
113
114RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
115 RGWCurlHandle* curl = 0;
116 CURL* h;
117 {
118 Mutex::Locker lock(cleaner_lock);
119 if (!saved_curl.empty()) {
120 curl = *saved_curl.begin();
121 saved_curl.erase(saved_curl.begin());
122 }
123 }
124 if (curl) {
125 } else if ((h = curl_easy_init())) {
126 curl = new RGWCurlHandle{h};
127 } else {
128 // curl = 0;
129 }
130 return curl;
131}
132
133void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
134{
135 curl_easy_cleanup(**curl);
136 delete curl;
137}
138
139void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
140{
141 if (cleaner_shutdown) {
142 release_curl_handle_now(curl);
143 } else {
144 curl_easy_reset(**curl);
145 Mutex::Locker lock(cleaner_lock);
146 curl->lastuse = mono_clock::now();
147 saved_curl.insert(saved_curl.begin(), 1, curl);
148 }
149}
150
151void* RGWCurlHandles::entry()
152{
153 RGWCurlHandle* curl;
154 Mutex::Locker lock(cleaner_lock);
155
156 for (;;) {
157 if (cleaner_shutdown) {
158 if (saved_curl.empty())
159 break;
160 } else {
161 utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
162 cleaner_cond.WaitUntil(cleaner_lock, release);
163 }
164 mono_time now = mono_clock::now();
165 while (!saved_curl.empty()) {
166 auto cend = saved_curl.end();
167 --cend;
168 curl = *cend;
169 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
170 break;
171 saved_curl.erase(cend);
172 release_curl_handle_now(curl);
173 }
174 }
175 return nullptr;
176}
177
178void RGWCurlHandles::stop()
179{
180 Mutex::Locker lock(cleaner_lock);
181 cleaner_shutdown = 1;
182 cleaner_cond.Signal();
183}
184
185void RGWCurlHandles::flush_curl_handles()
186{
187 stop();
188 join();
189 if (!saved_curl.empty()) {
190 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
191 }
192 saved_curl.shrink_to_fit();
193}
194
195static RGWCurlHandles *handles;
// XXX make this part of the token cache? (but that's swift-only,
// and this especially needs to integrate with s3...)
198
199void rgw_setup_saved_curl_handles()
200{
201 handles = new RGWCurlHandles();
202 handles->create("rgw_curl");
203}
204
205void rgw_release_all_curl_handles()
206{
207 handles->flush_curl_handles();
208 delete handles;
209}
210
7c673cae
FG
211/*
212 * the simple set of callbacks will be called on RGWHTTPClient::process()
213 */
214/* Static methods - callbacks for libcurl. */
215size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
216 const size_t size,
217 const size_t nmemb,
218 void * const _info)
219{
220 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
221 const size_t len = size * nmemb;
222 int ret = client->receive_header(ptr, size * nmemb);
223 if (ret < 0) {
224 dout(0) << "WARNING: client->receive_header() returned ret="
225 << ret << dendl;
226 }
227
228 return len;
229}
230
231size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
232 const size_t size,
233 const size_t nmemb,
234 void * const _info)
235{
236 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
237 const size_t len = size * nmemb;
238 int ret = client->receive_data(ptr, size * nmemb);
239 if (ret < 0) {
240 dout(0) << "WARNING: client->receive_data() returned ret="
241 << ret << dendl;
242 }
243
244 return len;
245}
246
247size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
248 const size_t size,
249 const size_t nmemb,
250 void * const _info)
251{
252 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
253 int ret = client->send_data(ptr, size * nmemb);
254 if (ret < 0) {
255 dout(0) << "WARNING: client->send_data() returned ret="
256 << ret << dendl;
257 }
258
259 return ret;
260}
261
262/*
263 * the following set of callbacks will be called either on RGWHTTPManager::process(),
264 * or via the RGWHTTPManager async processing.
265 */
266size_t RGWHTTPClient::receive_http_header(void * const ptr,
267 const size_t size,
268 const size_t nmemb,
269 void * const _info)
270{
271 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
272 size_t len = size * nmemb;
273
274 Mutex::Locker l(req_data->lock);
275
276 if (!req_data->registered) {
277 return len;
278 }
279
280 int ret = req_data->client->receive_header(ptr, size * nmemb);
281 if (ret < 0) {
282 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
283 }
284
285 return len;
286}
287
288size_t RGWHTTPClient::receive_http_data(void * const ptr,
289 const size_t size,
290 const size_t nmemb,
291 void * const _info)
292{
293 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
294 size_t len = size * nmemb;
295
296 Mutex::Locker l(req_data->lock);
297
298 if (!req_data->registered) {
299 return len;
300 }
301
302 int ret = req_data->client->receive_data(ptr, size * nmemb);
303 if (ret < 0) {
304 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
305 }
306
307 return len;
308}
309
310size_t RGWHTTPClient::send_http_data(void * const ptr,
311 const size_t size,
312 const size_t nmemb,
313 void * const _info)
314{
315 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
316
317 Mutex::Locker l(req_data->lock);
318
319 if (!req_data->registered) {
320 return 0;
321 }
322
323 int ret = req_data->client->send_data(ptr, size * nmemb);
324 if (ret < 0) {
325 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
326 }
327
328 return ret;
329}
330
331static curl_slist *headers_to_slist(param_vec_t& headers)
332{
333 curl_slist *h = NULL;
334
335 param_vec_t::iterator iter;
336 for (iter = headers.begin(); iter != headers.end(); ++iter) {
337 pair<string, string>& p = *iter;
338 string val = p.first;
339
340 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
341 val = val.substr(5);
342 }
343
344 /* we need to convert all underscores into dashes as some web servers forbid them
345 * in the http header field names
346 */
347 for (size_t i = 0; i < val.size(); i++) {
348 if (val[i] == '_') {
349 val[i] = '-';
350 }
351 }
352
353 val.append(": ");
354 val.append(p.second);
355 h = curl_slist_append(h, val.c_str());
356 }
357
358 return h;
359}
360
/* True only for the exact method names "POST" and "PUT"; null is false. */
static bool is_upload_request(const char *method)
{
  if (!method) {
    return false;
  }
  const bool is_post = (strcmp(method, "POST") == 0);
  return is_post || strcmp(method, "PUT") == 0;
}
368
369/*
370 * process a single simple one off request, not going through RGWHTTPManager. Not using
371 * req_data.
372 */
373int RGWHTTPClient::process(const char *method, const char *url)
374{
375 int ret = 0;
376 CURL *curl_handle;
377
378 char error_buf[CURL_ERROR_SIZE];
379
380 last_method = (method ? method : "");
381 last_url = (url ? url : "");
382
94b18763
FG
383 auto ca = handles->get_curl_handle();
384 curl_handle = **ca;
7c673cae
FG
385
386 dout(20) << "sending request to " << url << dendl;
387
388 curl_slist *h = headers_to_slist(headers);
389
390 curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
391 curl_easy_setopt(curl_handle, CURLOPT_URL, url);
392 curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
393 curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
394 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
395 curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
396 curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
397 curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
398 curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
399 if (h) {
400 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
401 }
402 curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
403 curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
404 if (is_upload_request(method)) {
405 curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
406 }
407 if (has_send_len) {
408 curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
409 }
410 if (!verify_ssl) {
411 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
412 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
413 dout(20) << "ssl verification is set to off" << dendl;
414 }
415
416 CURLcode status = curl_easy_perform(curl_handle);
417 if (status) {
418 dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
419 ret = -EINVAL;
420 }
421 curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
94b18763 422 handles->release_curl_handle(ca);
7c673cae
FG
423 curl_slist_free_all(h);
424
425 return ret;
426}
427
428string RGWHTTPClient::to_str()
429{
430 string method_str = (last_method.empty() ? "<no-method>" : last_method);
431 string url_str = (last_url.empty() ? "<no-url>" : last_url);
432 return method_str + " " + url_str;
433}
434
435int RGWHTTPClient::get_req_retcode()
436{
437 if (!req_data) {
438 return -EINVAL;
439 }
440
441 return req_data->get_retcode();
442}
443
444/*
445 * init request, will be used later with RGWHTTPManager
446 */
31f18b77 447int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
7c673cae
FG
448{
449 assert(!req_data);
450 _req_data->get();
451 req_data = _req_data;
452
453 CURL *easy_handle;
454
455 easy_handle = curl_easy_init();
456
457 req_data->easy_handle = easy_handle;
458
459 dout(20) << "sending request to " << url << dendl;
460
461 curl_slist *h = headers_to_slist(headers);
462
463 req_data->h = h;
464
465 last_method = (method ? method : "");
466 last_url = (url ? url : "");
467
468 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
469 curl_easy_setopt(easy_handle, CURLOPT_URL, url);
470 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
471 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
472 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
473 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
474 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
475 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
476 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
477 if (h) {
478 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
479 }
480 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
481 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
31f18b77 482 if (send_data_hint || is_upload_request(method)) {
7c673cae
FG
483 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
484 }
485 if (has_send_len) {
486 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
487 }
31f18b77
FG
488 if (!verify_ssl) {
489 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
490 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
491 dout(20) << "ssl verification is set to off" << dendl;
492 }
7c673cae
FG
493 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
494
495 return 0;
496}
497
498/*
499 * wait for async request to complete
500 */
501int RGWHTTPClient::wait()
502{
503 if (!req_data->is_done()) {
504 return req_data->wait();
505 }
506
507 return req_data->ret;
508}
509
510RGWHTTPClient::~RGWHTTPClient()
511{
512 if (req_data) {
513 RGWHTTPManager *http_manager = req_data->get_manager();
514 if (http_manager) {
515 http_manager->remove_request(this);
516 }
517
518 req_data->put();
519 }
520}
521
522
523int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
524{
525 const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
526
527 /* We're tokening the line that way due to backward compatibility. */
528 const size_t sep_loc = header_line.find_first_of(" \t:");
529
530 if (boost::string_ref::npos == sep_loc) {
531 /* Wrongly formatted header? Just skip it. */
532 return 0;
533 }
534
535 header_name_t name(header_line.substr(0, sep_loc));
536 if (0 == relevant_headers.count(name)) {
537 /* Not interested in this particular header. */
538 return 0;
539 }
540
541 const auto value_part = header_line.substr(sep_loc + 1);
542
543 /* Skip spaces and tabs after the separator. */
544 const size_t val_loc_s = value_part.find_first_not_of(' ');
545 const size_t val_loc_e = value_part.find_first_of("\r\n");
546
547 if (boost::string_ref::npos == val_loc_s ||
548 boost::string_ref::npos == val_loc_e) {
549 /* Empty value case. */
550 found_headers.emplace(name, header_value_t());
551 } else {
552 found_headers.emplace(name, header_value_t(
553 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
554 }
555
556 return 0;
557}
558
559int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
560{
561 int length_to_copy = 0;
562 if (post_data_index < post_data.length()) {
563 length_to_copy = min(post_data.length() - post_data_index, len);
564 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
565 post_data_index += length_to_copy;
566 }
567 return length_to_copy;
568}
569
570
/*
 * Drain pending wakeup bytes from `fd` with a single read of up to 256
 * bytes. Returns 0 on success or when the read fails with EAGAIN (nothing
 * to read), otherwise the negated errno.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, (void *)scratch.data(), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  return (err == -EAGAIN) ? 0 : err; // clear EAGAIN
}
584
585#if HAVE_CURL_MULTI_WAIT
586
587static std::once_flag detect_flag;
588static bool curl_multi_wait_bug_present = false;
589
590static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
591 int write_fd, int read_fd)
592{
593 int ret = 0;
594
595 // write to write_fd so that read_fd becomes readable
596 uint32_t buf = 0;
597 ret = ::write(write_fd, &buf, sizeof(buf));
598 if (ret < 0) {
599 ret = -errno;
600 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
601 return ret;
602 }
603
604 // pass read_fd in extra_fds for curl_multi_wait()
605 int num_fds;
606 struct curl_waitfd wait_fd;
607
608 wait_fd.fd = read_fd;
609 wait_fd.events = CURL_WAIT_POLLIN;
610 wait_fd.revents = 0;
611
612 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
613 if (ret != CURLM_OK) {
614 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
615 return -EIO;
616 }
617
618 // curl_multi_wait should flag revents when extra_fd is readable. if it
619 // doesn't, the bug is present and we can't rely on revents
620 if (wait_fd.revents == 0) {
621 curl_multi_wait_bug_present = true;
622 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
623 "bug in curl_multi_wait(). enabling a workaround that may degrade "
624 "performance slightly." << dendl;
625 }
626
627 return clear_signal(read_fd);
628}
629
630static bool is_signaled(const curl_waitfd& wait_fd)
631{
632 if (wait_fd.fd < 0) {
633 // no fd to signal
634 return false;
635 }
636
637 if (curl_multi_wait_bug_present) {
638 // we can't rely on revents, so we always return true if a wait_fd is given.
639 // this means we'll be trying a non-blocking read on this fd every time that
640 // curl_multi_wait() wakes up
641 return true;
642 }
643
644 return wait_fd.revents > 0;
645}
646
647static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
648{
649 int num_fds;
650 struct curl_waitfd wait_fd;
651
652 wait_fd.fd = signal_fd;
653 wait_fd.events = CURL_WAIT_POLLIN;
654 wait_fd.revents = 0;
655
656 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
657 if (ret) {
658 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
659 return -EIO;
660 }
661
662 if (is_signaled(wait_fd)) {
663 ret = clear_signal(signal_fd);
664 if (ret < 0) {
665 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
666 return ret;
667 }
668 }
669 return 0;
670}
671
672#else
673
674static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
675{
676 fd_set fdread;
677 fd_set fdwrite;
678 fd_set fdexcep;
679 int maxfd = -1;
680
681 FD_ZERO(&fdread);
682 FD_ZERO(&fdwrite);
683 FD_ZERO(&fdexcep);
684
685 /* get file descriptors from the transfers */
686 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
687 if (ret) {
688 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
689 return -EIO;
690 }
691
692 if (signal_fd > 0) {
693 FD_SET(signal_fd, &fdread);
694 if (signal_fd >= maxfd) {
695 maxfd = signal_fd + 1;
696 }
697 }
698
699 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
700 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
701#define RGW_CURL_TIMEOUT 1000
702 if (!to)
703 to = RGW_CURL_TIMEOUT;
704 struct timeval timeout;
705 timeout.tv_sec = to / 1000;
706 timeout.tv_usec = to % 1000;
707
708 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
709 if (ret < 0) {
710 ret = -errno;
711 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
712 return ret;
713 }
714
715 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
716 ret = clear_signal(signal_fd);
717 if (ret < 0) {
718 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
719 return ret;
720 }
721 }
722
723 return 0;
724}
725
726#endif
727
728void *RGWHTTPManager::ReqsThread::entry()
729{
730 manager->reqs_thread_entry();
731 return NULL;
732}
733
734/*
735 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
736 */
737RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
738 completion_mgr(_cm), is_threaded(false),
739 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
740 reqs_thread(NULL)
741{
742 multi_handle = (void *)curl_multi_init();
743 thread_pipe[0] = -1;
744 thread_pipe[1] = -1;
745}
746
747RGWHTTPManager::~RGWHTTPManager() {
748 stop();
749 if (multi_handle)
750 curl_multi_cleanup((CURLM *)multi_handle);
751}
752
753void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
754{
755 RWLock::WLocker rl(reqs_lock);
756 req_data->id = num_reqs;
757 req_data->registered = true;
758 reqs[num_reqs] = req_data;
759 num_reqs++;
760 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
761}
762
763void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
764{
765 RWLock::WLocker rl(reqs_lock);
766 req_data->get();
767 req_data->registered = false;
768 unregistered_reqs.push_back(req_data);
769 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
770}
771
772void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
773{
774 RWLock::WLocker rl(reqs_lock);
775 _complete_request(req_data);
776}
777
778void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
779{
780 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
781 if (iter != reqs.end()) {
782 reqs.erase(iter);
783 }
784 {
785 Mutex::Locker l(req_data->lock);
786 req_data->mgr = nullptr;
787 }
788 if (completion_mgr) {
789 completion_mgr->complete(NULL, req_data->user_info);
790 }
791
792 req_data->put();
793}
794
795void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
796{
797 req_data->finish(ret);
798 complete_request(req_data);
799}
800
801void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
802{
803 req_data->finish(ret);
804 _complete_request(req_data);
805}
806
807/*
808 * hook request to the curl multi handle
809 */
810int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
811{
812 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
813 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
814 if (mstatus) {
815 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
816 return -EIO;
817 }
818 return 0;
819}
820
821/*
822 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
823 * there will be no more processing on this request
824 */
825void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
826{
827 if (req_data->easy_handle) {
828 curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
829 }
830 if (!req_data->is_done()) {
831 _finish_request(req_data, -ECANCELED);
832 }
833}
834
835void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
836{
837 RWLock::WLocker wl(reqs_lock);
838 _unlink_request(req_data);
839}
840
841void RGWHTTPManager::manage_pending_requests()
842{
843 reqs_lock.get_read();
844 if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
845 reqs_lock.unlock();
846 return;
847 }
848 reqs_lock.unlock();
849
850 RWLock::WLocker wl(reqs_lock);
851
852 if (!unregistered_reqs.empty()) {
853 for (auto& r : unregistered_reqs) {
854 _unlink_request(r);
855 r->put();
856 }
857
858 unregistered_reqs.clear();
859 }
860
861 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
862
863 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
864
865 for (; iter != reqs.end(); ++iter) {
866 rgw_http_req_data *req_data = iter->second;
867 int r = link_request(req_data);
868 if (r < 0) {
869 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
870 remove_reqs.push_back(std::make_pair(iter->second, r));
871 } else {
872 max_threaded_req = iter->first + 1;
873 }
874 }
875
876 for (auto piter : remove_reqs) {
877 rgw_http_req_data *req_data = piter.first;
878 int r = piter.second;
879
880 _finish_request(req_data, r);
881 }
882}
883
31f18b77 884int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
7c673cae
FG
885{
886 rgw_http_req_data *req_data = new rgw_http_req_data;
887
31f18b77 888 int ret = client->init_request(method, url, req_data, send_data_hint);
7c673cae
FG
889 if (ret < 0) {
890 req_data->put();
891 req_data = NULL;
892 return ret;
893 }
894
895 req_data->mgr = this;
896 req_data->client = client;
897 req_data->user_info = client->get_user_info();
898
899 register_request(req_data);
900
901 if (!is_threaded) {
902 ret = link_request(req_data);
903 if (ret < 0) {
904 req_data->put();
905 req_data = NULL;
906 }
907 return ret;
908 }
909 ret = signal_thread();
910 if (ret < 0) {
911 finish_request(req_data, ret);
912 }
913
914 return ret;
915}
916
917int RGWHTTPManager::remove_request(RGWHTTPClient *client)
918{
919 rgw_http_req_data *req_data = client->get_req_data();
920
921 if (!is_threaded) {
922 unlink_request(req_data);
923 return 0;
924 }
925 unregister_request(req_data);
926 int ret = signal_thread();
927 if (ret < 0) {
928 return ret;
929 }
930
931 return 0;
932}
933
934/*
935 * the synchronous, non-threaded request processing method.
936 */
937int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
938{
939 assert(!is_threaded);
940
941 int still_running;
942 int mstatus;
943
944 do {
945 if (wait_for_data) {
946 int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
947 if (ret < 0) {
948 return ret;
949 }
950 }
951
952 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
953 switch (mstatus) {
954 case CURLM_OK:
955 case CURLM_CALL_MULTI_PERFORM:
956 break;
957 default:
958 dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
959 return -EINVAL;
960 }
961 int msgs_left;
962 CURLMsg *msg;
963 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
964 if (msg->msg == CURLMSG_DONE) {
965 CURL *e = msg->easy_handle;
966 rgw_http_req_data *req_data;
967 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
968
969 long http_status;
970 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
971
972 int status = rgw_http_error_to_errno(http_status);
973 int result = msg->data.result;
974 finish_request(req_data, status);
975 switch (result) {
976 case CURLE_OK:
977 break;
978 default:
979 dout(20) << "ERROR: msg->data.result=" << result << dendl;
980 return -EIO;
981 }
982 }
983 }
984 } while (mstatus == CURLM_CALL_MULTI_PERFORM);
985
986 *done = (still_running == 0);
987
988 return 0;
989}
990
991/*
992 * the synchronous, non-threaded request processing completion method.
993 */
994int RGWHTTPManager::complete_requests()
995{
996 bool done = false;
997 int ret;
998 do {
999 ret = process_requests(true, &done);
1000 } while (!done && !ret);
1001
1002 return ret;
1003}
1004
1005int RGWHTTPManager::set_threaded()
1006{
1007 int r = pipe(thread_pipe);
1008 if (r < 0) {
1009 r = -errno;
1010 ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
1011 return r;
1012 }
1013
1014 // enable non-blocking reads
1015 r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
1016 if (r < 0) {
1017 r = -errno;
1018 ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
1019 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1020 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1021 return r;
1022 }
1023
1024#ifdef HAVE_CURL_MULTI_WAIT
1025 // on first initialization, use this pipe to detect whether we're using a
1026 // buggy version of libcurl
1027 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1028 static_cast<CURLM*>(multi_handle),
1029 thread_pipe[1], thread_pipe[0]);
1030#endif
1031
1032 is_threaded = true;
1033 reqs_thread = new ReqsThread(this);
1034 reqs_thread->create("http_manager");
1035 return 0;
1036}
1037
1038void RGWHTTPManager::stop()
1039{
1040 if (is_stopped) {
1041 return;
1042 }
1043
1044 is_stopped = true;
1045
1046 if (is_threaded) {
1047 going_down = true;
1048 signal_thread();
1049 reqs_thread->join();
1050 delete reqs_thread;
1051 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1052 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1053 }
1054}
1055
1056int RGWHTTPManager::signal_thread()
1057{
1058 uint32_t buf = 0;
1059 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1060 if (ret < 0) {
1061 ret = -errno;
1062 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1063 return ret;
1064 }
1065 return 0;
1066}
1067
1068void *RGWHTTPManager::reqs_thread_entry()
1069{
1070 int still_running;
1071 int mstatus;
1072
1073 ldout(cct, 20) << __func__ << ": start" << dendl;
1074
1075 while (!going_down) {
1076 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1077 if (ret < 0) {
1078 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1079 return NULL;
1080 }
1081
1082 manage_pending_requests();
1083
1084 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1085 switch (mstatus) {
1086 case CURLM_OK:
1087 case CURLM_CALL_MULTI_PERFORM:
1088 break;
1089 default:
1090 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1091 break;
1092 }
1093 int msgs_left;
1094 CURLMsg *msg;
1095 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1096 if (msg->msg == CURLMSG_DONE) {
1097 int result = msg->data.result;
1098 CURL *e = msg->easy_handle;
1099 rgw_http_req_data *req_data;
1100 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1101 curl_multi_remove_handle((CURLM *)multi_handle, e);
1102
1103 long http_status;
1104 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1105
1106 int status = rgw_http_error_to_errno(http_status);
1107 if (result != CURLE_OK && http_status == 0) {
1108 status = -EAGAIN;
1109 }
1110 int id = req_data->id;
1111 finish_request(req_data, status);
1112 switch (result) {
1113 case CURLE_OK:
1114 break;
1115 default:
1116 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1117 break;
1118 }
1119 }
1120 }
1121 }
1122
1123
1124 RWLock::WLocker rl(reqs_lock);
1125 for (auto r : unregistered_reqs) {
1126 _finish_request(r, -ECANCELED);
1127 }
1128
1129 unregistered_reqs.clear();
1130
1131 auto all_reqs = std::move(reqs);
1132 for (auto iter : all_reqs) {
1133 _finish_request(iter.second, -ECANCELED);
1134 }
1135
1136 reqs.clear();
1137
1138 if (completion_mgr) {
1139 completion_mgr->go_down();
1140 }
1141
1142 return 0;
1143}
1144
1145