1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/errno.h"
7 #include <boost/utility/string_ref.hpp>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/RefCountedObj.h"
18 #include "rgw_coroutine.h"
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
25 struct rgw_http_req_data
: public RefCountedObject
{
30 std::atomic
<bool> done
= { false };
31 RGWHTTPClient
*client
;
35 char error_buf
[CURL_ERROR_SIZE
];
40 rgw_http_req_data() : easy_handle(NULL
), h(NULL
), id(-1), ret(0),
41 client(nullptr), user_info(nullptr), registered(false),
42 mgr(NULL
), lock("rgw_http_req_data::lock") {
43 memset(error_buf
, 0, sizeof(error_buf
));
47 Mutex::Locker
l(lock
);
54 Mutex::Locker
l(lock
);
57 curl_easy_cleanup(easy_handle
);
60 curl_slist_free_all(h
);
73 Mutex::Locker
l(lock
);
77 RGWHTTPManager
*get_manager() {
78 Mutex::Locker
l(lock
);
83 struct RGWCurlHandle
{
88 RGWCurlHandle(CURL
* h
) : uses(0), h(h
) {};
95 class RGWCurlHandles
: public Thread
{
98 std::vector
<RGWCurlHandle
*>saved_curl
;
103 cleaner_lock
{"RGWCurlHandles::cleaner_lock"},
104 cleaner_shutdown
{0} {
107 RGWCurlHandle
* get_curl_handle();
108 void release_curl_handle_now(RGWCurlHandle
* curl
);
109 void release_curl_handle(RGWCurlHandle
* curl
);
110 void flush_curl_handles();
115 RGWCurlHandle
* RGWCurlHandles::get_curl_handle() {
116 RGWCurlHandle
* curl
= 0;
119 Mutex::Locker
lock(cleaner_lock
);
120 if (!saved_curl
.empty()) {
121 curl
= *saved_curl
.begin();
122 saved_curl
.erase(saved_curl
.begin());
126 } else if ((h
= curl_easy_init())) {
127 curl
= new RGWCurlHandle
{h
};
134 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle
* curl
)
136 curl_easy_cleanup(**curl
);
140 void RGWCurlHandles::release_curl_handle(RGWCurlHandle
* curl
)
142 if (cleaner_shutdown
) {
143 release_curl_handle_now(curl
);
145 curl_easy_reset(**curl
);
146 Mutex::Locker
lock(cleaner_lock
);
147 curl
->lastuse
= mono_clock::now();
148 saved_curl
.insert(saved_curl
.begin(), 1, curl
);
152 void* RGWCurlHandles::entry()
155 Mutex::Locker
lock(cleaner_lock
);
158 if (cleaner_shutdown
) {
159 if (saved_curl
.empty())
162 utime_t release
= ceph_clock_now() + utime_t(MAXIDLE
,0);
163 cleaner_cond
.WaitUntil(cleaner_lock
, release
);
165 mono_time now
= mono_clock::now();
166 while (!saved_curl
.empty()) {
167 auto cend
= saved_curl
.end();
170 if (!cleaner_shutdown
&& now
- curl
->lastuse
< std::chrono::seconds(MAXIDLE
))
172 saved_curl
.erase(cend
);
173 release_curl_handle_now(curl
);
179 void RGWCurlHandles::stop()
181 Mutex::Locker
lock(cleaner_lock
);
182 cleaner_shutdown
= 1;
183 cleaner_cond
.Signal();
186 void RGWCurlHandles::flush_curl_handles()
190 if (!saved_curl
.empty()) {
191 dout(0) << "ERROR: " << __func__
<< " failed final cleanup" << dendl
;
193 saved_curl
.shrink_to_fit();
196 static RGWCurlHandles
*handles
;
197 // XXX make this part of the token cache? (but that's swift-only;
198 // and this especially needs to integrates with s3...)
200 void rgw_setup_saved_curl_handles()
202 handles
= new RGWCurlHandles();
203 handles
->create("rgw_curl");
206 void rgw_release_all_curl_handles()
208 handles
->flush_curl_handles();
213 * the simple set of callbacks will be called on RGWHTTPClient::process()
215 /* Static methods - callbacks for libcurl. */
216 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr
,
221 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
222 const size_t len
= size
* nmemb
;
223 int ret
= client
->receive_header(ptr
, size
* nmemb
);
225 dout(0) << "WARNING: client->receive_header() returned ret="
232 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr
,
237 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
238 const size_t len
= size
* nmemb
;
239 int ret
= client
->receive_data(ptr
, size
* nmemb
);
241 dout(0) << "WARNING: client->receive_data() returned ret="
248 size_t RGWHTTPClient::simple_send_http_data(void * const ptr
,
253 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
254 int ret
= client
->send_data(ptr
, size
* nmemb
);
256 dout(0) << "WARNING: client->send_data() returned ret="
264 * the following set of callbacks will be called either on RGWHTTPManager::process(),
265 * or via the RGWHTTPManager async processing.
267 size_t RGWHTTPClient::receive_http_header(void * const ptr
,
272 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
273 size_t len
= size
* nmemb
;
275 Mutex::Locker
l(req_data
->lock
);
277 if (!req_data
->registered
) {
281 int ret
= req_data
->client
->receive_header(ptr
, size
* nmemb
);
283 dout(0) << "WARNING: client->receive_header() returned ret=" << ret
<< dendl
;
289 size_t RGWHTTPClient::receive_http_data(void * const ptr
,
294 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
295 size_t len
= size
* nmemb
;
297 Mutex::Locker
l(req_data
->lock
);
299 if (!req_data
->registered
) {
303 int ret
= req_data
->client
->receive_data(ptr
, size
* nmemb
);
305 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
311 size_t RGWHTTPClient::send_http_data(void * const ptr
,
316 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
318 Mutex::Locker
l(req_data
->lock
);
320 if (!req_data
->registered
) {
324 int ret
= req_data
->client
->send_data(ptr
, size
* nmemb
);
326 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
332 static curl_slist
*headers_to_slist(param_vec_t
& headers
)
334 curl_slist
*h
= NULL
;
336 param_vec_t::iterator iter
;
337 for (iter
= headers
.begin(); iter
!= headers
.end(); ++iter
) {
338 pair
<string
, string
>& p
= *iter
;
339 string val
= p
.first
;
341 if (strncmp(val
.c_str(), "HTTP_", 5) == 0) {
345 /* we need to convert all underscores into dashes as some web servers forbid them
346 * in the http header field names
348 for (size_t i
= 0; i
< val
.size(); i
++) {
354 // curl won't send headers with empty values unless it ends with a ; instead
355 if (p
.second
.empty()) {
359 val
.append(p
.second
);
361 h
= curl_slist_append(h
, val
.c_str());
367 static bool is_upload_request(const char *method
)
369 if (method
== nullptr) {
372 return strcmp(method
, "POST") == 0 || strcmp(method
, "PUT") == 0;
376 * process a single simple one off request, not going through RGWHTTPManager. Not using
379 int RGWHTTPClient::process(const char *method
, const char *url
)
384 char error_buf
[CURL_ERROR_SIZE
];
386 last_method
= (method
? method
: "");
387 last_url
= (url
? url
: "");
389 auto ca
= handles
->get_curl_handle();
392 dout(20) << "sending request to " << url
<< dendl
;
394 curl_slist
*h
= headers_to_slist(headers
);
396 curl_easy_setopt(curl_handle
, CURLOPT_CUSTOMREQUEST
, method
);
397 curl_easy_setopt(curl_handle
, CURLOPT_URL
, url
);
398 curl_easy_setopt(curl_handle
, CURLOPT_NOPROGRESS
, 1L);
399 curl_easy_setopt(curl_handle
, CURLOPT_NOSIGNAL
, 1L);
400 curl_easy_setopt(curl_handle
, CURLOPT_HEADERFUNCTION
, simple_receive_http_header
);
401 curl_easy_setopt(curl_handle
, CURLOPT_WRITEHEADER
, (void *)this);
402 curl_easy_setopt(curl_handle
, CURLOPT_WRITEFUNCTION
, simple_receive_http_data
);
403 curl_easy_setopt(curl_handle
, CURLOPT_WRITEDATA
, (void *)this);
404 curl_easy_setopt(curl_handle
, CURLOPT_ERRORBUFFER
, (void *)error_buf
);
405 curl_easy_setopt(curl_handle
, CURLOPT_LOW_SPEED_TIME
, cct
->_conf
->rgw_curl_low_speed_time
);
406 curl_easy_setopt(curl_handle
, CURLOPT_LOW_SPEED_LIMIT
, cct
->_conf
->rgw_curl_low_speed_limit
);
408 curl_easy_setopt(curl_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
410 curl_easy_setopt(curl_handle
, CURLOPT_READFUNCTION
, simple_send_http_data
);
411 curl_easy_setopt(curl_handle
, CURLOPT_READDATA
, (void *)this);
412 if (is_upload_request(method
)) {
413 curl_easy_setopt(curl_handle
, CURLOPT_UPLOAD
, 1L);
416 curl_easy_setopt(curl_handle
, CURLOPT_INFILESIZE
, (void *)send_len
);
419 curl_easy_setopt(curl_handle
, CURLOPT_SSL_VERIFYPEER
, 0L);
420 curl_easy_setopt(curl_handle
, CURLOPT_SSL_VERIFYHOST
, 0L);
421 dout(20) << "ssl verification is set to off" << dendl
;
424 CURLcode status
= curl_easy_perform(curl_handle
);
426 dout(0) << "curl_easy_perform returned status " << status
<< " error: " << error_buf
<< dendl
;
429 curl_easy_getinfo(curl_handle
, CURLINFO_RESPONSE_CODE
, &http_status
);
430 handles
->release_curl_handle(ca
);
431 curl_slist_free_all(h
);
436 string
RGWHTTPClient::to_str()
438 string method_str
= (last_method
.empty() ? "<no-method>" : last_method
);
439 string url_str
= (last_url
.empty() ? "<no-url>" : last_url
);
440 return method_str
+ " " + url_str
;
443 int RGWHTTPClient::get_req_retcode()
449 return req_data
->get_retcode();
453 * init request, will be used later with RGWHTTPManager
455 int RGWHTTPClient::init_request(const char *method
, const char *url
, rgw_http_req_data
*_req_data
, bool send_data_hint
)
459 req_data
= _req_data
;
463 easy_handle
= curl_easy_init();
465 req_data
->easy_handle
= easy_handle
;
467 dout(20) << "sending request to " << url
<< dendl
;
469 curl_slist
*h
= headers_to_slist(headers
);
473 last_method
= (method
? method
: "");
474 last_url
= (url
? url
: "");
476 curl_easy_setopt(easy_handle
, CURLOPT_CUSTOMREQUEST
, method
);
477 curl_easy_setopt(easy_handle
, CURLOPT_URL
, url
);
478 curl_easy_setopt(easy_handle
, CURLOPT_NOPROGRESS
, 1L);
479 curl_easy_setopt(easy_handle
, CURLOPT_NOSIGNAL
, 1L);
480 curl_easy_setopt(easy_handle
, CURLOPT_HEADERFUNCTION
, receive_http_header
);
481 curl_easy_setopt(easy_handle
, CURLOPT_WRITEHEADER
, (void *)req_data
);
482 curl_easy_setopt(easy_handle
, CURLOPT_WRITEFUNCTION
, receive_http_data
);
483 curl_easy_setopt(easy_handle
, CURLOPT_WRITEDATA
, (void *)req_data
);
484 curl_easy_setopt(easy_handle
, CURLOPT_ERRORBUFFER
, (void *)req_data
->error_buf
);
485 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_TIME
, cct
->_conf
->rgw_curl_low_speed_time
);
486 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_LIMIT
, cct
->_conf
->rgw_curl_low_speed_limit
);
488 curl_easy_setopt(easy_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
490 curl_easy_setopt(easy_handle
, CURLOPT_READFUNCTION
, send_http_data
);
491 curl_easy_setopt(easy_handle
, CURLOPT_READDATA
, (void *)req_data
);
492 if (send_data_hint
|| is_upload_request(method
)) {
493 curl_easy_setopt(easy_handle
, CURLOPT_UPLOAD
, 1L);
496 curl_easy_setopt(easy_handle
, CURLOPT_INFILESIZE
, (void *)send_len
);
499 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYPEER
, 0L);
500 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYHOST
, 0L);
501 dout(20) << "ssl verification is set to off" << dendl
;
503 curl_easy_setopt(easy_handle
, CURLOPT_PRIVATE
, (void *)req_data
);
509 * wait for async request to complete
511 int RGWHTTPClient::wait()
513 if (!req_data
->is_done()) {
514 return req_data
->wait();
517 return req_data
->ret
;
520 RGWHTTPClient::~RGWHTTPClient()
523 RGWHTTPManager
*http_manager
= req_data
->get_manager();
525 http_manager
->remove_request(this);
533 int RGWHTTPHeadersCollector::receive_header(void * const ptr
, const size_t len
)
535 const boost::string_ref
header_line(static_cast<const char * const>(ptr
), len
);
537 /* We're tokening the line that way due to backward compatibility. */
538 const size_t sep_loc
= header_line
.find_first_of(" \t:");
540 if (boost::string_ref::npos
== sep_loc
) {
541 /* Wrongly formatted header? Just skip it. */
545 header_name_t
name(header_line
.substr(0, sep_loc
));
546 if (0 == relevant_headers
.count(name
)) {
547 /* Not interested in this particular header. */
551 const auto value_part
= header_line
.substr(sep_loc
+ 1);
553 /* Skip spaces and tabs after the separator. */
554 const size_t val_loc_s
= value_part
.find_first_not_of(' ');
555 const size_t val_loc_e
= value_part
.find_first_of("\r\n");
557 if (boost::string_ref::npos
== val_loc_s
||
558 boost::string_ref::npos
== val_loc_e
) {
559 /* Empty value case. */
560 found_headers
.emplace(name
, header_value_t());
562 found_headers
.emplace(name
, header_value_t(
563 value_part
.substr(val_loc_s
, val_loc_e
- val_loc_s
)));
569 int RGWHTTPTransceiver::send_data(void* ptr
, size_t len
)
571 int length_to_copy
= 0;
572 if (post_data_index
< post_data
.length()) {
573 length_to_copy
= min(post_data
.length() - post_data_index
, len
);
574 memcpy(ptr
, post_data
.data() + post_data_index
, length_to_copy
);
575 post_data_index
+= length_to_copy
;
577 return length_to_copy
;
581 static int clear_signal(int fd
)
583 // since we're in non-blocking mode, we can try to read a lot more than
584 // one signal from signal_thread() to avoid later wakeups. non-blocking reads
585 // are also required to support the curl_multi_wait bug workaround
586 std::array
<char, 256> buf
;
587 int ret
= ::read(fd
, (void *)buf
.data(), buf
.size());
590 return ret
== -EAGAIN
? 0 : ret
; // clear EAGAIN
595 #if HAVE_CURL_MULTI_WAIT
597 static std::once_flag detect_flag
;
598 static bool curl_multi_wait_bug_present
= false;
600 static int detect_curl_multi_wait_bug(CephContext
*cct
, CURLM
*handle
,
601 int write_fd
, int read_fd
)
605 // write to write_fd so that read_fd becomes readable
607 ret
= ::write(write_fd
, &buf
, sizeof(buf
));
610 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): write() returned " << ret
<< dendl
;
614 // pass read_fd in extra_fds for curl_multi_wait()
616 struct curl_waitfd wait_fd
;
618 wait_fd
.fd
= read_fd
;
619 wait_fd
.events
= CURL_WAIT_POLLIN
;
622 ret
= curl_multi_wait(handle
, &wait_fd
, 1, 0, &num_fds
);
623 if (ret
!= CURLM_OK
) {
624 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
628 // curl_multi_wait should flag revents when extra_fd is readable. if it
629 // doesn't, the bug is present and we can't rely on revents
630 if (wait_fd
.revents
== 0) {
631 curl_multi_wait_bug_present
= true;
632 ldout(cct
, 0) << "WARNING: detected a version of libcurl which contains a "
633 "bug in curl_multi_wait(). enabling a workaround that may degrade "
634 "performance slightly." << dendl
;
637 return clear_signal(read_fd
);
640 static bool is_signaled(const curl_waitfd
& wait_fd
)
642 if (wait_fd
.fd
< 0) {
647 if (curl_multi_wait_bug_present
) {
648 // we can't rely on revents, so we always return true if a wait_fd is given.
649 // this means we'll be trying a non-blocking read on this fd every time that
650 // curl_multi_wait() wakes up
654 return wait_fd
.revents
> 0;
657 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
660 struct curl_waitfd wait_fd
;
662 wait_fd
.fd
= signal_fd
;
663 wait_fd
.events
= CURL_WAIT_POLLIN
;
666 int ret
= curl_multi_wait(handle
, &wait_fd
, 1, cct
->_conf
->rgw_curl_wait_timeout_ms
, &num_fds
);
668 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
672 if (is_signaled(wait_fd
)) {
673 ret
= clear_signal(signal_fd
);
675 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
684 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
695 /* get file descriptors from the transfers */
696 int ret
= curl_multi_fdset(handle
, &fdread
, &fdwrite
, &fdexcep
, &maxfd
);
698 ldout(cct
, 0) << "ERROR: curl_multi_fdset returned " << ret
<< dendl
;
703 FD_SET(signal_fd
, &fdread
);
704 if (signal_fd
>= maxfd
) {
705 maxfd
= signal_fd
+ 1;
709 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
710 uint64_t to
= cct
->_conf
->rgw_curl_wait_timeout_ms
;
711 #define RGW_CURL_TIMEOUT 1000
713 to
= RGW_CURL_TIMEOUT
;
714 struct timeval timeout
;
715 timeout
.tv_sec
= to
/ 1000;
716 timeout
.tv_usec
= to
% 1000;
718 ret
= select(maxfd
+1, &fdread
, &fdwrite
, &fdexcep
, &timeout
);
721 ldout(cct
, 0) << "ERROR: select returned " << ret
<< dendl
;
725 if (signal_fd
> 0 && FD_ISSET(signal_fd
, &fdread
)) {
726 ret
= clear_signal(signal_fd
);
728 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
738 void *RGWHTTPManager::ReqsThread::entry()
740 manager
->reqs_thread_entry();
745 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
747 RGWHTTPManager::RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*_cm
) : cct(_cct
),
748 completion_mgr(_cm
), is_threaded(false),
749 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
752 multi_handle
= (void *)curl_multi_init();
757 RGWHTTPManager::~RGWHTTPManager() {
760 curl_multi_cleanup((CURLM
*)multi_handle
);
763 void RGWHTTPManager::register_request(rgw_http_req_data
*req_data
)
765 RWLock::WLocker
rl(reqs_lock
);
766 req_data
->id
= num_reqs
;
767 req_data
->registered
= true;
768 reqs
[num_reqs
] = req_data
;
770 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
773 void RGWHTTPManager::unregister_request(rgw_http_req_data
*req_data
)
775 RWLock::WLocker
rl(reqs_lock
);
777 req_data
->registered
= false;
778 unregistered_reqs
.push_back(req_data
);
779 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
782 void RGWHTTPManager::complete_request(rgw_http_req_data
*req_data
)
784 RWLock::WLocker
rl(reqs_lock
);
785 _complete_request(req_data
);
788 void RGWHTTPManager::_complete_request(rgw_http_req_data
*req_data
)
790 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(req_data
->id
);
791 if (iter
!= reqs
.end()) {
795 Mutex::Locker
l(req_data
->lock
);
796 req_data
->mgr
= nullptr;
798 if (completion_mgr
) {
799 completion_mgr
->complete(NULL
, req_data
->user_info
);
805 void RGWHTTPManager::finish_request(rgw_http_req_data
*req_data
, int ret
)
807 req_data
->finish(ret
);
808 complete_request(req_data
);
811 void RGWHTTPManager::_finish_request(rgw_http_req_data
*req_data
, int ret
)
813 req_data
->finish(ret
);
814 _complete_request(req_data
);
818 * hook request to the curl multi handle
820 int RGWHTTPManager::link_request(rgw_http_req_data
*req_data
)
822 ldout(cct
, 20) << __func__
<< " req_data=" << req_data
<< " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
823 CURLMcode mstatus
= curl_multi_add_handle((CURLM
*)multi_handle
, req_data
->easy_handle
);
825 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus
<< dendl
;
832 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
833 * there will be no more processing on this request
835 void RGWHTTPManager::_unlink_request(rgw_http_req_data
*req_data
)
837 if (req_data
->easy_handle
) {
838 curl_multi_remove_handle((CURLM
*)multi_handle
, req_data
->easy_handle
);
840 if (!req_data
->is_done()) {
841 _finish_request(req_data
, -ECANCELED
);
845 void RGWHTTPManager::unlink_request(rgw_http_req_data
*req_data
)
847 RWLock::WLocker
wl(reqs_lock
);
848 _unlink_request(req_data
);
851 void RGWHTTPManager::manage_pending_requests()
853 reqs_lock
.get_read();
854 if (max_threaded_req
== num_reqs
&& unregistered_reqs
.empty()) {
860 RWLock::WLocker
wl(reqs_lock
);
862 if (!unregistered_reqs
.empty()) {
863 for (auto& r
: unregistered_reqs
) {
868 unregistered_reqs
.clear();
871 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(max_threaded_req
);
873 list
<std::pair
<rgw_http_req_data
*, int> > remove_reqs
;
875 for (; iter
!= reqs
.end(); ++iter
) {
876 rgw_http_req_data
*req_data
= iter
->second
;
877 int r
= link_request(req_data
);
879 ldout(cct
, 0) << "ERROR: failed to link http request" << dendl
;
880 remove_reqs
.push_back(std::make_pair(iter
->second
, r
));
882 max_threaded_req
= iter
->first
+ 1;
886 for (auto piter
: remove_reqs
) {
887 rgw_http_req_data
*req_data
= piter
.first
;
888 int r
= piter
.second
;
890 _finish_request(req_data
, r
);
894 int RGWHTTPManager::add_request(RGWHTTPClient
*client
, const char *method
, const char *url
, bool send_data_hint
)
896 rgw_http_req_data
*req_data
= new rgw_http_req_data
;
898 int ret
= client
->init_request(method
, url
, req_data
, send_data_hint
);
905 req_data
->mgr
= this;
906 req_data
->client
= client
;
907 req_data
->user_info
= client
->get_user_info();
909 register_request(req_data
);
912 ret
= link_request(req_data
);
919 ret
= signal_thread();
921 finish_request(req_data
, ret
);
927 int RGWHTTPManager::remove_request(RGWHTTPClient
*client
)
929 rgw_http_req_data
*req_data
= client
->get_req_data();
932 unlink_request(req_data
);
935 unregister_request(req_data
);
936 int ret
= signal_thread();
945 * the synchronous, non-threaded request processing method.
947 int RGWHTTPManager::process_requests(bool wait_for_data
, bool *done
)
949 assert(!is_threaded
);
956 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, -1);
962 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
965 case CURLM_CALL_MULTI_PERFORM
:
968 dout(20) << "curl_multi_perform returned: " << mstatus
<< dendl
;
973 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
974 if (msg
->msg
== CURLMSG_DONE
) {
975 CURL
*e
= msg
->easy_handle
;
976 rgw_http_req_data
*req_data
;
977 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
980 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
982 int status
= rgw_http_error_to_errno(http_status
);
983 int result
= msg
->data
.result
;
984 finish_request(req_data
, status
);
989 dout(20) << "ERROR: msg->data.result=" << result
<< dendl
;
994 } while (mstatus
== CURLM_CALL_MULTI_PERFORM
);
996 *done
= (still_running
== 0);
1002 * the synchronous, non-threaded request processing completion method.
1004 int RGWHTTPManager::complete_requests()
1009 ret
= process_requests(true, &done
);
1010 } while (!done
&& !ret
);
1015 int RGWHTTPManager::set_threaded()
1017 if (pipe_cloexec(thread_pipe
) < 0) {
1019 ldout(cct
, 0) << "ERROR: pipe(): " << cpp_strerror(e
) << dendl
;
1023 // enable non-blocking reads
1024 if (::fcntl(thread_pipe
[0], F_SETFL
, O_NONBLOCK
) < 0) {
1026 ldout(cct
, 0) << "ERROR: fcntl(): " << cpp_strerror(e
) << dendl
;
1027 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1028 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1032 #ifdef HAVE_CURL_MULTI_WAIT
1033 // on first initialization, use this pipe to detect whether we're using a
1034 // buggy version of libcurl
1035 std::call_once(detect_flag
, detect_curl_multi_wait_bug
, cct
,
1036 static_cast<CURLM
*>(multi_handle
),
1037 thread_pipe
[1], thread_pipe
[0]);
1041 reqs_thread
= new ReqsThread(this);
1042 reqs_thread
->create("http_manager");
1046 void RGWHTTPManager::stop()
1057 reqs_thread
->join();
1059 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1060 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1064 int RGWHTTPManager::signal_thread()
1067 int ret
= write(thread_pipe
[1], (void *)&buf
, sizeof(buf
));
1070 ldout(cct
, 0) << "ERROR: " << __func__
<< ": write() returned ret=" << ret
<< dendl
;
1076 void *RGWHTTPManager::reqs_thread_entry()
1081 ldout(cct
, 20) << __func__
<< ": start" << dendl
;
1083 while (!going_down
) {
1084 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, thread_pipe
[0]);
1086 dout(0) << "ERROR: do_curl_wait() returned: " << ret
<< dendl
;
1090 manage_pending_requests();
1092 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
1095 case CURLM_CALL_MULTI_PERFORM
:
1098 dout(10) << "curl_multi_perform returned: " << mstatus
<< dendl
;
1103 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
1104 if (msg
->msg
== CURLMSG_DONE
) {
1105 int result
= msg
->data
.result
;
1106 CURL
*e
= msg
->easy_handle
;
1107 rgw_http_req_data
*req_data
;
1108 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
1109 curl_multi_remove_handle((CURLM
*)multi_handle
, e
);
1112 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
1114 int status
= rgw_http_error_to_errno(http_status
);
1115 if (result
!= CURLE_OK
&& http_status
== 0) {
1118 int id
= req_data
->id
;
1119 finish_request(req_data
, status
);
1123 case CURLE_OPERATION_TIMEDOUT
:
1124 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1125 << cct
->_conf
->rgw_curl_low_speed_limit
<< " Bytes per second during " << cct
->_conf
->rgw_curl_low_speed_time
<< " seconds." << dendl
;
1127 dout(20) << "ERROR: msg->data.result=" << result
<< " req_data->id=" << id
<< " http_status=" << http_status
<< dendl
;
1135 RWLock::WLocker
rl(reqs_lock
);
1136 for (auto r
: unregistered_reqs
) {
1140 unregistered_reqs
.clear();
1142 auto all_reqs
= std::move(reqs
);
1143 for (auto iter
: all_reqs
) {
1144 _unlink_request(iter
.second
);
1149 if (completion_mgr
) {
1150 completion_mgr
->go_down();