1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
6 #include <boost/utility/string_ref.hpp>
10 #include <curl/multi.h>
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/RefCountedObj.h"
17 #include "rgw_coroutine.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
24 struct rgw_http_req_data
: public RefCountedObject
{
29 std::atomic
<bool> done
= { false };
30 RGWHTTPClient
*client
;
34 char error_buf
[CURL_ERROR_SIZE
];
39 rgw_http_req_data() : easy_handle(NULL
), h(NULL
), id(-1), ret(0),
40 client(nullptr), user_info(nullptr), registered(false),
41 mgr(NULL
), lock("rgw_http_req_data::lock") {
42 memset(error_buf
, 0, sizeof(error_buf
));
46 Mutex::Locker
l(lock
);
53 Mutex::Locker
l(lock
);
56 curl_easy_cleanup(easy_handle
);
59 curl_slist_free_all(h
);
72 Mutex::Locker
l(lock
);
76 RGWHTTPManager
*get_manager() {
77 Mutex::Locker
l(lock
);
83 * the simple set of callbacks will be called on RGWHTTPClient::process()
85 /* Static methods - callbacks for libcurl. */
86 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr
,
91 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
92 const size_t len
= size
* nmemb
;
93 int ret
= client
->receive_header(ptr
, size
* nmemb
);
95 dout(0) << "WARNING: client->receive_header() returned ret="
102 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr
,
107 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
108 const size_t len
= size
* nmemb
;
109 int ret
= client
->receive_data(ptr
, size
* nmemb
);
111 dout(0) << "WARNING: client->receive_data() returned ret="
118 size_t RGWHTTPClient::simple_send_http_data(void * const ptr
,
123 RGWHTTPClient
*client
= static_cast<RGWHTTPClient
*>(_info
);
124 int ret
= client
->send_data(ptr
, size
* nmemb
);
126 dout(0) << "WARNING: client->send_data() returned ret="
134 * the following set of callbacks will be called either on RGWHTTPManager::process(),
135 * or via the RGWHTTPManager async processing.
137 size_t RGWHTTPClient::receive_http_header(void * const ptr
,
142 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
143 size_t len
= size
* nmemb
;
145 Mutex::Locker
l(req_data
->lock
);
147 if (!req_data
->registered
) {
151 int ret
= req_data
->client
->receive_header(ptr
, size
* nmemb
);
153 dout(0) << "WARNING: client->receive_header() returned ret=" << ret
<< dendl
;
159 size_t RGWHTTPClient::receive_http_data(void * const ptr
,
164 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
165 size_t len
= size
* nmemb
;
167 Mutex::Locker
l(req_data
->lock
);
169 if (!req_data
->registered
) {
173 int ret
= req_data
->client
->receive_data(ptr
, size
* nmemb
);
175 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
181 size_t RGWHTTPClient::send_http_data(void * const ptr
,
186 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
188 Mutex::Locker
l(req_data
->lock
);
190 if (!req_data
->registered
) {
194 int ret
= req_data
->client
->send_data(ptr
, size
* nmemb
);
196 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
202 static curl_slist
*headers_to_slist(param_vec_t
& headers
)
204 curl_slist
*h
= NULL
;
206 param_vec_t::iterator iter
;
207 for (iter
= headers
.begin(); iter
!= headers
.end(); ++iter
) {
208 pair
<string
, string
>& p
= *iter
;
209 string val
= p
.first
;
211 if (strncmp(val
.c_str(), "HTTP_", 5) == 0) {
215 /* we need to convert all underscores into dashes as some web servers forbid them
216 * in the http header field names
218 for (size_t i
= 0; i
< val
.size(); i
++) {
225 val
.append(p
.second
);
226 h
= curl_slist_append(h
, val
.c_str());
232 static bool is_upload_request(const char *method
)
234 if (method
== nullptr) {
237 return strcmp(method
, "POST") == 0 || strcmp(method
, "PUT") == 0;
241 * process a single simple one off request, not going through RGWHTTPManager. Not using
244 int RGWHTTPClient::process(const char *method
, const char *url
)
249 char error_buf
[CURL_ERROR_SIZE
];
251 last_method
= (method
? method
: "");
252 last_url
= (url
? url
: "");
254 curl_handle
= curl_easy_init();
256 dout(20) << "sending request to " << url
<< dendl
;
258 curl_slist
*h
= headers_to_slist(headers
);
260 curl_easy_setopt(curl_handle
, CURLOPT_CUSTOMREQUEST
, method
);
261 curl_easy_setopt(curl_handle
, CURLOPT_URL
, url
);
262 curl_easy_setopt(curl_handle
, CURLOPT_NOPROGRESS
, 1L);
263 curl_easy_setopt(curl_handle
, CURLOPT_NOSIGNAL
, 1L);
264 curl_easy_setopt(curl_handle
, CURLOPT_HEADERFUNCTION
, simple_receive_http_header
);
265 curl_easy_setopt(curl_handle
, CURLOPT_WRITEHEADER
, (void *)this);
266 curl_easy_setopt(curl_handle
, CURLOPT_WRITEFUNCTION
, simple_receive_http_data
);
267 curl_easy_setopt(curl_handle
, CURLOPT_WRITEDATA
, (void *)this);
268 curl_easy_setopt(curl_handle
, CURLOPT_ERRORBUFFER
, (void *)error_buf
);
270 curl_easy_setopt(curl_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
272 curl_easy_setopt(curl_handle
, CURLOPT_READFUNCTION
, simple_send_http_data
);
273 curl_easy_setopt(curl_handle
, CURLOPT_READDATA
, (void *)this);
274 if (is_upload_request(method
)) {
275 curl_easy_setopt(curl_handle
, CURLOPT_UPLOAD
, 1L);
278 curl_easy_setopt(curl_handle
, CURLOPT_INFILESIZE
, (void *)send_len
);
281 curl_easy_setopt(curl_handle
, CURLOPT_SSL_VERIFYPEER
, 0L);
282 curl_easy_setopt(curl_handle
, CURLOPT_SSL_VERIFYHOST
, 0L);
283 dout(20) << "ssl verification is set to off" << dendl
;
286 CURLcode status
= curl_easy_perform(curl_handle
);
288 dout(0) << "curl_easy_perform returned status " << status
<< " error: " << error_buf
<< dendl
;
291 curl_easy_getinfo(curl_handle
, CURLINFO_RESPONSE_CODE
, &http_status
);
292 curl_easy_cleanup(curl_handle
);
293 curl_slist_free_all(h
);
298 string
RGWHTTPClient::to_str()
300 string method_str
= (last_method
.empty() ? "<no-method>" : last_method
);
301 string url_str
= (last_url
.empty() ? "<no-url>" : last_url
);
302 return method_str
+ " " + url_str
;
305 int RGWHTTPClient::get_req_retcode()
311 return req_data
->get_retcode();
315 * init request, will be used later with RGWHTTPManager
317 int RGWHTTPClient::init_request(const char *method
, const char *url
, rgw_http_req_data
*_req_data
)
321 req_data
= _req_data
;
325 easy_handle
= curl_easy_init();
327 req_data
->easy_handle
= easy_handle
;
329 dout(20) << "sending request to " << url
<< dendl
;
331 curl_slist
*h
= headers_to_slist(headers
);
335 last_method
= (method
? method
: "");
336 last_url
= (url
? url
: "");
338 curl_easy_setopt(easy_handle
, CURLOPT_CUSTOMREQUEST
, method
);
339 curl_easy_setopt(easy_handle
, CURLOPT_URL
, url
);
340 curl_easy_setopt(easy_handle
, CURLOPT_NOPROGRESS
, 1L);
341 curl_easy_setopt(easy_handle
, CURLOPT_NOSIGNAL
, 1L);
342 curl_easy_setopt(easy_handle
, CURLOPT_HEADERFUNCTION
, receive_http_header
);
343 curl_easy_setopt(easy_handle
, CURLOPT_WRITEHEADER
, (void *)req_data
);
344 curl_easy_setopt(easy_handle
, CURLOPT_WRITEFUNCTION
, receive_http_data
);
345 curl_easy_setopt(easy_handle
, CURLOPT_WRITEDATA
, (void *)req_data
);
346 curl_easy_setopt(easy_handle
, CURLOPT_ERRORBUFFER
, (void *)req_data
->error_buf
);
348 curl_easy_setopt(easy_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
350 curl_easy_setopt(easy_handle
, CURLOPT_READFUNCTION
, send_http_data
);
351 curl_easy_setopt(easy_handle
, CURLOPT_READDATA
, (void *)req_data
);
352 if (is_upload_request(method
)) {
353 curl_easy_setopt(easy_handle
, CURLOPT_UPLOAD
, 1L);
356 curl_easy_setopt(easy_handle
, CURLOPT_INFILESIZE
, (void *)send_len
);
358 curl_easy_setopt(easy_handle
, CURLOPT_PRIVATE
, (void *)req_data
);
364 * wait for async request to complete
366 int RGWHTTPClient::wait()
368 if (!req_data
->is_done()) {
369 return req_data
->wait();
372 return req_data
->ret
;
375 RGWHTTPClient::~RGWHTTPClient()
378 RGWHTTPManager
*http_manager
= req_data
->get_manager();
380 http_manager
->remove_request(this);
388 int RGWHTTPHeadersCollector::receive_header(void * const ptr
, const size_t len
)
390 const boost::string_ref
header_line(static_cast<const char * const>(ptr
), len
);
392 /* We're tokening the line that way due to backward compatibility. */
393 const size_t sep_loc
= header_line
.find_first_of(" \t:");
395 if (boost::string_ref::npos
== sep_loc
) {
396 /* Wrongly formatted header? Just skip it. */
400 header_name_t
name(header_line
.substr(0, sep_loc
));
401 if (0 == relevant_headers
.count(name
)) {
402 /* Not interested in this particular header. */
406 const auto value_part
= header_line
.substr(sep_loc
+ 1);
408 /* Skip spaces and tabs after the separator. */
409 const size_t val_loc_s
= value_part
.find_first_not_of(' ');
410 const size_t val_loc_e
= value_part
.find_first_of("\r\n");
412 if (boost::string_ref::npos
== val_loc_s
||
413 boost::string_ref::npos
== val_loc_e
) {
414 /* Empty value case. */
415 found_headers
.emplace(name
, header_value_t());
417 found_headers
.emplace(name
, header_value_t(
418 value_part
.substr(val_loc_s
, val_loc_e
- val_loc_s
)));
424 int RGWHTTPTransceiver::send_data(void* ptr
, size_t len
)
426 int length_to_copy
= 0;
427 if (post_data_index
< post_data
.length()) {
428 length_to_copy
= min(post_data
.length() - post_data_index
, len
);
429 memcpy(ptr
, post_data
.data() + post_data_index
, length_to_copy
);
430 post_data_index
+= length_to_copy
;
432 return length_to_copy
;
436 static int clear_signal(int fd
)
438 // since we're in non-blocking mode, we can try to read a lot more than
439 // one signal from signal_thread() to avoid later wakeups. non-blocking reads
440 // are also required to support the curl_multi_wait bug workaround
441 std::array
<char, 256> buf
;
442 int ret
= ::read(fd
, (void *)buf
.data(), buf
.size());
445 return ret
== -EAGAIN
? 0 : ret
; // clear EAGAIN
450 #if HAVE_CURL_MULTI_WAIT
452 static std::once_flag detect_flag
;
453 static bool curl_multi_wait_bug_present
= false;
455 static int detect_curl_multi_wait_bug(CephContext
*cct
, CURLM
*handle
,
456 int write_fd
, int read_fd
)
460 // write to write_fd so that read_fd becomes readable
462 ret
= ::write(write_fd
, &buf
, sizeof(buf
));
465 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): write() returned " << ret
<< dendl
;
469 // pass read_fd in extra_fds for curl_multi_wait()
471 struct curl_waitfd wait_fd
;
473 wait_fd
.fd
= read_fd
;
474 wait_fd
.events
= CURL_WAIT_POLLIN
;
477 ret
= curl_multi_wait(handle
, &wait_fd
, 1, 0, &num_fds
);
478 if (ret
!= CURLM_OK
) {
479 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
483 // curl_multi_wait should flag revents when extra_fd is readable. if it
484 // doesn't, the bug is present and we can't rely on revents
485 if (wait_fd
.revents
== 0) {
486 curl_multi_wait_bug_present
= true;
487 ldout(cct
, 0) << "WARNING: detected a version of libcurl which contains a "
488 "bug in curl_multi_wait(). enabling a workaround that may degrade "
489 "performance slightly." << dendl
;
492 return clear_signal(read_fd
);
495 static bool is_signaled(const curl_waitfd
& wait_fd
)
497 if (wait_fd
.fd
< 0) {
502 if (curl_multi_wait_bug_present
) {
503 // we can't rely on revents, so we always return true if a wait_fd is given.
504 // this means we'll be trying a non-blocking read on this fd every time that
505 // curl_multi_wait() wakes up
509 return wait_fd
.revents
> 0;
512 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
515 struct curl_waitfd wait_fd
;
517 wait_fd
.fd
= signal_fd
;
518 wait_fd
.events
= CURL_WAIT_POLLIN
;
521 int ret
= curl_multi_wait(handle
, &wait_fd
, 1, cct
->_conf
->rgw_curl_wait_timeout_ms
, &num_fds
);
523 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
527 if (is_signaled(wait_fd
)) {
528 ret
= clear_signal(signal_fd
);
530 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
539 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
550 /* get file descriptors from the transfers */
551 int ret
= curl_multi_fdset(handle
, &fdread
, &fdwrite
, &fdexcep
, &maxfd
);
553 ldout(cct
, 0) << "ERROR: curl_multi_fdset returned " << ret
<< dendl
;
558 FD_SET(signal_fd
, &fdread
);
559 if (signal_fd
>= maxfd
) {
560 maxfd
= signal_fd
+ 1;
564 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
565 uint64_t to
= cct
->_conf
->rgw_curl_wait_timeout_ms
;
566 #define RGW_CURL_TIMEOUT 1000
568 to
= RGW_CURL_TIMEOUT
;
569 struct timeval timeout
;
570 timeout
.tv_sec
= to
/ 1000;
571 timeout
.tv_usec
= to
% 1000;
573 ret
= select(maxfd
+1, &fdread
, &fdwrite
, &fdexcep
, &timeout
);
576 ldout(cct
, 0) << "ERROR: select returned " << ret
<< dendl
;
580 if (signal_fd
> 0 && FD_ISSET(signal_fd
, &fdread
)) {
581 ret
= clear_signal(signal_fd
);
583 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
593 void *RGWHTTPManager::ReqsThread::entry()
595 manager
->reqs_thread_entry();
600 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
602 RGWHTTPManager::RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*_cm
) : cct(_cct
),
603 completion_mgr(_cm
), is_threaded(false),
604 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
607 multi_handle
= (void *)curl_multi_init();
612 RGWHTTPManager::~RGWHTTPManager() {
615 curl_multi_cleanup((CURLM
*)multi_handle
);
618 void RGWHTTPManager::register_request(rgw_http_req_data
*req_data
)
620 RWLock::WLocker
rl(reqs_lock
);
621 req_data
->id
= num_reqs
;
622 req_data
->registered
= true;
623 reqs
[num_reqs
] = req_data
;
625 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
628 void RGWHTTPManager::unregister_request(rgw_http_req_data
*req_data
)
630 RWLock::WLocker
rl(reqs_lock
);
632 req_data
->registered
= false;
633 unregistered_reqs
.push_back(req_data
);
634 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
637 void RGWHTTPManager::complete_request(rgw_http_req_data
*req_data
)
639 RWLock::WLocker
rl(reqs_lock
);
640 _complete_request(req_data
);
643 void RGWHTTPManager::_complete_request(rgw_http_req_data
*req_data
)
645 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(req_data
->id
);
646 if (iter
!= reqs
.end()) {
650 Mutex::Locker
l(req_data
->lock
);
651 req_data
->mgr
= nullptr;
653 if (completion_mgr
) {
654 completion_mgr
->complete(NULL
, req_data
->user_info
);
660 void RGWHTTPManager::finish_request(rgw_http_req_data
*req_data
, int ret
)
662 req_data
->finish(ret
);
663 complete_request(req_data
);
666 void RGWHTTPManager::_finish_request(rgw_http_req_data
*req_data
, int ret
)
668 req_data
->finish(ret
);
669 _complete_request(req_data
);
673 * hook request to the curl multi handle
675 int RGWHTTPManager::link_request(rgw_http_req_data
*req_data
)
677 ldout(cct
, 20) << __func__
<< " req_data=" << req_data
<< " req_data->id=" << req_data
->id
<< ", easy_handle=" << req_data
->easy_handle
<< dendl
;
678 CURLMcode mstatus
= curl_multi_add_handle((CURLM
*)multi_handle
, req_data
->easy_handle
);
680 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus
<< dendl
;
687 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
688 * there will be no more processing on this request
690 void RGWHTTPManager::_unlink_request(rgw_http_req_data
*req_data
)
692 if (req_data
->easy_handle
) {
693 curl_multi_remove_handle((CURLM
*)multi_handle
, req_data
->easy_handle
);
695 if (!req_data
->is_done()) {
696 _finish_request(req_data
, -ECANCELED
);
700 void RGWHTTPManager::unlink_request(rgw_http_req_data
*req_data
)
702 RWLock::WLocker
wl(reqs_lock
);
703 _unlink_request(req_data
);
706 void RGWHTTPManager::manage_pending_requests()
708 reqs_lock
.get_read();
709 if (max_threaded_req
== num_reqs
&& unregistered_reqs
.empty()) {
715 RWLock::WLocker
wl(reqs_lock
);
717 if (!unregistered_reqs
.empty()) {
718 for (auto& r
: unregistered_reqs
) {
723 unregistered_reqs
.clear();
726 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(max_threaded_req
);
728 list
<std::pair
<rgw_http_req_data
*, int> > remove_reqs
;
730 for (; iter
!= reqs
.end(); ++iter
) {
731 rgw_http_req_data
*req_data
= iter
->second
;
732 int r
= link_request(req_data
);
734 ldout(cct
, 0) << "ERROR: failed to link http request" << dendl
;
735 remove_reqs
.push_back(std::make_pair(iter
->second
, r
));
737 max_threaded_req
= iter
->first
+ 1;
741 for (auto piter
: remove_reqs
) {
742 rgw_http_req_data
*req_data
= piter
.first
;
743 int r
= piter
.second
;
745 _finish_request(req_data
, r
);
749 int RGWHTTPManager::add_request(RGWHTTPClient
*client
, const char *method
, const char *url
)
751 rgw_http_req_data
*req_data
= new rgw_http_req_data
;
753 int ret
= client
->init_request(method
, url
, req_data
);
760 req_data
->mgr
= this;
761 req_data
->client
= client
;
762 req_data
->user_info
= client
->get_user_info();
764 register_request(req_data
);
767 ret
= link_request(req_data
);
774 ret
= signal_thread();
776 finish_request(req_data
, ret
);
782 int RGWHTTPManager::remove_request(RGWHTTPClient
*client
)
784 rgw_http_req_data
*req_data
= client
->get_req_data();
787 unlink_request(req_data
);
790 unregister_request(req_data
);
791 int ret
= signal_thread();
800 * the synchronous, non-threaded request processing method.
802 int RGWHTTPManager::process_requests(bool wait_for_data
, bool *done
)
804 assert(!is_threaded
);
811 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, -1);
817 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
820 case CURLM_CALL_MULTI_PERFORM
:
823 dout(20) << "curl_multi_perform returned: " << mstatus
<< dendl
;
828 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
829 if (msg
->msg
== CURLMSG_DONE
) {
830 CURL
*e
= msg
->easy_handle
;
831 rgw_http_req_data
*req_data
;
832 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
835 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
837 int status
= rgw_http_error_to_errno(http_status
);
838 int result
= msg
->data
.result
;
839 finish_request(req_data
, status
);
844 dout(20) << "ERROR: msg->data.result=" << result
<< dendl
;
849 } while (mstatus
== CURLM_CALL_MULTI_PERFORM
);
851 *done
= (still_running
== 0);
857 * the synchronous, non-threaded request processing completion method.
859 int RGWHTTPManager::complete_requests()
864 ret
= process_requests(true, &done
);
865 } while (!done
&& !ret
);
870 int RGWHTTPManager::set_threaded()
872 int r
= pipe(thread_pipe
);
875 ldout(cct
, 0) << "ERROR: pipe() returned errno=" << r
<< dendl
;
879 // enable non-blocking reads
880 r
= ::fcntl(thread_pipe
[0], F_SETFL
, O_NONBLOCK
);
883 ldout(cct
, 0) << "ERROR: fcntl() returned errno=" << r
<< dendl
;
884 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
885 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
889 #ifdef HAVE_CURL_MULTI_WAIT
890 // on first initialization, use this pipe to detect whether we're using a
891 // buggy version of libcurl
892 std::call_once(detect_flag
, detect_curl_multi_wait_bug
, cct
,
893 static_cast<CURLM
*>(multi_handle
),
894 thread_pipe
[1], thread_pipe
[0]);
898 reqs_thread
= new ReqsThread(this);
899 reqs_thread
->create("http_manager");
903 void RGWHTTPManager::stop()
916 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
917 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
921 int RGWHTTPManager::signal_thread()
924 int ret
= write(thread_pipe
[1], (void *)&buf
, sizeof(buf
));
927 ldout(cct
, 0) << "ERROR: " << __func__
<< ": write() returned ret=" << ret
<< dendl
;
933 void *RGWHTTPManager::reqs_thread_entry()
938 ldout(cct
, 20) << __func__
<< ": start" << dendl
;
940 while (!going_down
) {
941 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, thread_pipe
[0]);
943 dout(0) << "ERROR: do_curl_wait() returned: " << ret
<< dendl
;
947 manage_pending_requests();
949 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
952 case CURLM_CALL_MULTI_PERFORM
:
955 dout(10) << "curl_multi_perform returned: " << mstatus
<< dendl
;
960 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
961 if (msg
->msg
== CURLMSG_DONE
) {
962 int result
= msg
->data
.result
;
963 CURL
*e
= msg
->easy_handle
;
964 rgw_http_req_data
*req_data
;
965 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
966 curl_multi_remove_handle((CURLM
*)multi_handle
, e
);
969 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
971 int status
= rgw_http_error_to_errno(http_status
);
972 if (result
!= CURLE_OK
&& http_status
== 0) {
975 int id
= req_data
->id
;
976 finish_request(req_data
, status
);
981 dout(20) << "ERROR: msg->data.result=" << result
<< " req_data->id=" << id
<< " http_status=" << http_status
<< dendl
;
989 RWLock::WLocker
rl(reqs_lock
);
990 for (auto r
: unregistered_reqs
) {
991 _finish_request(r
, -ECANCELED
);
994 unregistered_reqs
.clear();
996 auto all_reqs
= std::move(reqs
);
997 for (auto iter
: all_reqs
) {
998 _finish_request(iter
.second
, -ECANCELED
);
1003 if (completion_mgr
) {
1004 completion_mgr
->go_down();