1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/errno.h"
7 #include <boost/utility/string_ref.hpp>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/RefCountedObj.h"
18 #include "rgw_coroutine.h"
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
25 RGWHTTPManager
*rgw_http_manager
;
29 static void do_curl_easy_cleanup(RGWCurlHandle
*curl_handle
);
31 struct rgw_http_req_data
: public RefCountedObject
{
32 RGWCurlHandle
*curl_handle
{nullptr};
33 curl_slist
*h
{nullptr};
36 std::atomic
<bool> done
= { false };
37 RGWHTTPClient
*client
{nullptr};
38 rgw_io_id control_io_id
;
39 void *user_info
{nullptr};
40 bool registered
{false};
41 RGWHTTPManager
*mgr
{nullptr};
42 char error_buf
[CURL_ERROR_SIZE
];
43 bool write_paused
{false};
44 bool read_paused
{false};
49 rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
50 memset(error_buf
, 0, sizeof(error_buf
));
54 Mutex::Locker
l(lock
);
62 void set_state(int bitmask
);
65 Mutex::Locker
l(lock
);
68 do_curl_easy_cleanup(curl_handle
);
71 curl_slist_free_all(h
);
84 Mutex::Locker
l(lock
);
89 Mutex::Locker
l(lock
);
93 RGWHTTPManager
*get_manager() {
94 Mutex::Locker
l(lock
);
98 CURL
*get_easy_handle() const;
101 struct RGWCurlHandle
{
106 explicit RGWCurlHandle(CURL
* h
) : uses(0), h(h
) {};
112 void rgw_http_req_data::set_state(int bitmask
) {
113 /* no need to lock here, moreover curl_easy_pause() might trigger
114 * the data receive callback :/
116 CURLcode rc
= curl_easy_pause(**curl_handle
, bitmask
);
117 if (rc
!= CURLE_OK
) {
118 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc
<< dendl
;
123 class RGWCurlHandles
: public Thread
{
126 std::vector
<RGWCurlHandle
*>saved_curl
;
127 int cleaner_shutdown
;
131 cleaner_lock
{"RGWCurlHandles::cleaner_lock"},
132 cleaner_shutdown
{0} {
135 RGWCurlHandle
* get_curl_handle();
136 void release_curl_handle_now(RGWCurlHandle
* curl
);
137 void release_curl_handle(RGWCurlHandle
* curl
);
138 void flush_curl_handles();
143 RGWCurlHandle
* RGWCurlHandles::get_curl_handle() {
144 RGWCurlHandle
* curl
= 0;
147 Mutex::Locker
lock(cleaner_lock
);
148 if (!saved_curl
.empty()) {
149 curl
= *saved_curl
.begin();
150 saved_curl
.erase(saved_curl
.begin());
154 } else if ((h
= curl_easy_init())) {
155 curl
= new RGWCurlHandle
{h
};
162 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle
* curl
)
164 curl_easy_cleanup(**curl
);
168 void RGWCurlHandles::release_curl_handle(RGWCurlHandle
* curl
)
170 if (cleaner_shutdown
) {
171 release_curl_handle_now(curl
);
173 curl_easy_reset(**curl
);
174 Mutex::Locker
lock(cleaner_lock
);
175 curl
->lastuse
= mono_clock::now();
176 saved_curl
.insert(saved_curl
.begin(), 1, curl
);
180 void* RGWCurlHandles::entry()
183 Mutex::Locker
lock(cleaner_lock
);
186 if (cleaner_shutdown
) {
187 if (saved_curl
.empty())
190 utime_t release
= ceph_clock_now() + utime_t(MAXIDLE
,0);
191 cleaner_cond
.WaitUntil(cleaner_lock
, release
);
193 mono_time now
= mono_clock::now();
194 while (!saved_curl
.empty()) {
195 auto cend
= saved_curl
.end();
198 if (!cleaner_shutdown
&& now
- curl
->lastuse
< std::chrono::seconds(MAXIDLE
))
200 saved_curl
.erase(cend
);
201 release_curl_handle_now(curl
);
207 void RGWCurlHandles::stop()
209 Mutex::Locker
lock(cleaner_lock
);
210 cleaner_shutdown
= 1;
211 cleaner_cond
.Signal();
214 void RGWCurlHandles::flush_curl_handles()
218 if (!saved_curl
.empty()) {
219 dout(0) << "ERROR: " << __func__
<< " failed final cleanup" << dendl
;
221 saved_curl
.shrink_to_fit();
224 CURL
*rgw_http_req_data::get_easy_handle() const
226 return **curl_handle
;
229 static RGWCurlHandles
*handles
;
231 static RGWCurlHandle
*do_curl_easy_init()
233 return handles
->get_curl_handle();
236 static void do_curl_easy_cleanup(RGWCurlHandle
*curl_handle
)
238 handles
->release_curl_handle(curl_handle
);
241 // XXX make this part of the token cache? (but that's swift-only;
242 // and this especially needs to integrates with s3...)
244 void rgw_setup_saved_curl_handles()
246 handles
= new RGWCurlHandles();
247 handles
->create("rgw_curl");
250 void rgw_release_all_curl_handles()
252 handles
->flush_curl_handles();
256 void RGWIOProvider::assign_io(RGWIOIDProvider
& io_id_provider
, int io_type
)
259 id
= io_id_provider
.get_next();
264 * the following set of callbacks will be called either on RGWHTTPManager::process(),
265 * or via the RGWHTTPManager async processing.
267 size_t RGWHTTPClient::receive_http_header(void * const ptr
,
272 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
273 size_t len
= size
* nmemb
;
275 Mutex::Locker
l(req_data
->lock
);
277 if (!req_data
->registered
) {
281 int ret
= req_data
->client
->receive_header(ptr
, size
* nmemb
);
283 dout(0) << "WARNING: client->receive_header() returned ret=" << ret
<< dendl
;
289 size_t RGWHTTPClient::receive_http_data(void * const ptr
,
294 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
295 size_t len
= size
* nmemb
;
299 RGWHTTPClient
*client
;
302 Mutex::Locker
l(req_data
->lock
);
303 if (!req_data
->registered
) {
307 client
= req_data
->client
;
310 size_t& skip_bytes
= client
->receive_pause_skip
;
312 if (skip_bytes
>= len
) {
317 int ret
= client
->receive_data((char *)ptr
+ skip_bytes
, len
- skip_bytes
, &pause
);
319 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
323 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl
;
325 Mutex::Locker
l(req_data
->lock
);
326 req_data
->read_paused
= true;
327 return CURL_WRITEFUNC_PAUSE
;
335 size_t RGWHTTPClient::send_http_data(void * const ptr
,
340 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
342 RGWHTTPClient
*client
;
345 Mutex::Locker
l(req_data
->lock
);
347 if (!req_data
->registered
) {
351 client
= req_data
->client
;
356 int ret
= client
->send_data(ptr
, size
* nmemb
, &pause
);
358 dout(0) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
363 Mutex::Locker
l(req_data
->lock
);
364 req_data
->write_paused
= true;
365 return CURL_READFUNC_PAUSE
;
371 Mutex
& RGWHTTPClient::get_req_lock()
373 return req_data
->lock
;
376 void RGWHTTPClient::_set_write_paused(bool pause
)
378 ceph_assert(req_data
->lock
.is_locked());
380 RGWHTTPManager
*mgr
= req_data
->mgr
;
381 if (pause
== req_data
->write_paused
) {
385 mgr
->set_request_state(this, SET_WRITE_PAUSED
);
387 mgr
->set_request_state(this, SET_WRITE_RESUME
);
391 void RGWHTTPClient::_set_read_paused(bool pause
)
393 ceph_assert(req_data
->lock
.is_locked());
395 RGWHTTPManager
*mgr
= req_data
->mgr
;
396 if (pause
== req_data
->read_paused
) {
400 mgr
->set_request_state(this, SET_READ_PAUSED
);
402 mgr
->set_request_state(this, SET_READ_RESUME
);
406 static curl_slist
*headers_to_slist(param_vec_t
& headers
)
408 curl_slist
*h
= NULL
;
410 param_vec_t::iterator iter
;
411 for (iter
= headers
.begin(); iter
!= headers
.end(); ++iter
) {
412 pair
<string
, string
>& p
= *iter
;
413 string val
= p
.first
;
415 if (strncmp(val
.c_str(), "HTTP_", 5) == 0) {
419 /* we need to convert all underscores into dashes as some web servers forbid them
420 * in the http header field names
422 for (size_t i
= 0; i
< val
.size(); i
++) {
428 val
= camelcase_dash_http_attr(val
);
430 // curl won't send headers with empty values unless it ends with a ; instead
431 if (p
.second
.empty()) {
435 val
.append(p
.second
);
437 h
= curl_slist_append(h
, val
.c_str());
443 static bool is_upload_request(const string
& method
)
445 return method
== "POST" || method
== "PUT";
449 * process a single simple one off request
451 int RGWHTTPClient::process()
453 return RGWHTTP::process(this);
456 string
RGWHTTPClient::to_str()
458 string method_str
= (method
.empty() ? "<no-method>" : method
);
459 string url_str
= (url
.empty() ? "<no-url>" : url
);
460 return method_str
+ " " + url_str
;
463 int RGWHTTPClient::get_req_retcode()
469 return req_data
->get_retcode();
473 * init request, will be used later with RGWHTTPManager
475 int RGWHTTPClient::init_request(rgw_http_req_data
*_req_data
)
477 ceph_assert(!req_data
);
479 req_data
= _req_data
;
481 req_data
->curl_handle
= do_curl_easy_init();
483 CURL
*easy_handle
= req_data
->get_easy_handle();
485 dout(20) << "sending request to " << url
<< dendl
;
487 curl_slist
*h
= headers_to_slist(headers
);
491 curl_easy_setopt(easy_handle
, CURLOPT_CUSTOMREQUEST
, method
.c_str());
492 curl_easy_setopt(easy_handle
, CURLOPT_URL
, url
.c_str());
493 curl_easy_setopt(easy_handle
, CURLOPT_NOPROGRESS
, 1L);
494 curl_easy_setopt(easy_handle
, CURLOPT_NOSIGNAL
, 1L);
495 curl_easy_setopt(easy_handle
, CURLOPT_HEADERFUNCTION
, receive_http_header
);
496 curl_easy_setopt(easy_handle
, CURLOPT_WRITEHEADER
, (void *)req_data
);
497 curl_easy_setopt(easy_handle
, CURLOPT_WRITEFUNCTION
, receive_http_data
);
498 curl_easy_setopt(easy_handle
, CURLOPT_WRITEDATA
, (void *)req_data
);
499 curl_easy_setopt(easy_handle
, CURLOPT_ERRORBUFFER
, (void *)req_data
->error_buf
);
500 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_TIME
, cct
->_conf
->rgw_curl_low_speed_time
);
501 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_LIMIT
, cct
->_conf
->rgw_curl_low_speed_limit
);
503 curl_easy_setopt(easy_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
505 curl_easy_setopt(easy_handle
, CURLOPT_READFUNCTION
, send_http_data
);
506 curl_easy_setopt(easy_handle
, CURLOPT_READDATA
, (void *)req_data
);
507 if (send_data_hint
|| is_upload_request(method
)) {
508 curl_easy_setopt(easy_handle
, CURLOPT_UPLOAD
, 1L);
511 curl_easy_setopt(easy_handle
, CURLOPT_INFILESIZE
, (void *)send_len
);
514 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYPEER
, 0L);
515 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYHOST
, 0L);
516 dout(20) << "ssl verification is set to off" << dendl
;
518 curl_easy_setopt(easy_handle
, CURLOPT_PRIVATE
, (void *)req_data
);
523 bool RGWHTTPClient::is_done()
525 return req_data
->is_done();
529 * wait for async request to complete
531 int RGWHTTPClient::wait()
533 return req_data
->wait();
536 void RGWHTTPClient::cancel()
539 RGWHTTPManager
*http_manager
= req_data
->mgr
;
541 http_manager
->remove_request(this);
546 RGWHTTPClient::~RGWHTTPClient()
555 int RGWHTTPHeadersCollector::receive_header(void * const ptr
, const size_t len
)
557 const boost::string_ref
header_line(static_cast<const char *>(ptr
), len
);
559 /* We're tokening the line that way due to backward compatibility. */
560 const size_t sep_loc
= header_line
.find_first_of(" \t:");
562 if (boost::string_ref::npos
== sep_loc
) {
563 /* Wrongly formatted header? Just skip it. */
567 header_name_t
name(header_line
.substr(0, sep_loc
));
568 if (0 == relevant_headers
.count(name
)) {
569 /* Not interested in this particular header. */
573 const auto value_part
= header_line
.substr(sep_loc
+ 1);
575 /* Skip spaces and tabs after the separator. */
576 const size_t val_loc_s
= value_part
.find_first_not_of(' ');
577 const size_t val_loc_e
= value_part
.find_first_of("\r\n");
579 if (boost::string_ref::npos
== val_loc_s
||
580 boost::string_ref::npos
== val_loc_e
) {
581 /* Empty value case. */
582 found_headers
.emplace(name
, header_value_t());
584 found_headers
.emplace(name
, header_value_t(
585 value_part
.substr(val_loc_s
, val_loc_e
- val_loc_s
)));
591 int RGWHTTPTransceiver::send_data(void* ptr
, size_t len
, bool* pause
)
593 int length_to_copy
= 0;
594 if (post_data_index
< post_data
.length()) {
595 length_to_copy
= min(post_data
.length() - post_data_index
, len
);
596 memcpy(ptr
, post_data
.data() + post_data_index
, length_to_copy
);
597 post_data_index
+= length_to_copy
;
599 return length_to_copy
;
603 static int clear_signal(int fd
)
605 // since we're in non-blocking mode, we can try to read a lot more than
606 // one signal from signal_thread() to avoid later wakeups. non-blocking reads
607 // are also required to support the curl_multi_wait bug workaround
608 std::array
<char, 256> buf
;
609 int ret
= ::read(fd
, (void *)buf
.data(), buf
.size());
612 return ret
== -EAGAIN
? 0 : ret
; // clear EAGAIN
617 #if HAVE_CURL_MULTI_WAIT
619 static std::once_flag detect_flag
;
620 static bool curl_multi_wait_bug_present
= false;
622 static int detect_curl_multi_wait_bug(CephContext
*cct
, CURLM
*handle
,
623 int write_fd
, int read_fd
)
627 // write to write_fd so that read_fd becomes readable
629 ret
= ::write(write_fd
, &buf
, sizeof(buf
));
632 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): write() returned " << ret
<< dendl
;
636 // pass read_fd in extra_fds for curl_multi_wait()
638 struct curl_waitfd wait_fd
;
640 wait_fd
.fd
= read_fd
;
641 wait_fd
.events
= CURL_WAIT_POLLIN
;
644 ret
= curl_multi_wait(handle
, &wait_fd
, 1, 0, &num_fds
);
645 if (ret
!= CURLM_OK
) {
646 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
650 // curl_multi_wait should flag revents when extra_fd is readable. if it
651 // doesn't, the bug is present and we can't rely on revents
652 if (wait_fd
.revents
== 0) {
653 curl_multi_wait_bug_present
= true;
654 ldout(cct
, 0) << "WARNING: detected a version of libcurl which contains a "
655 "bug in curl_multi_wait(). enabling a workaround that may degrade "
656 "performance slightly." << dendl
;
659 return clear_signal(read_fd
);
662 static bool is_signaled(const curl_waitfd
& wait_fd
)
664 if (wait_fd
.fd
< 0) {
669 if (curl_multi_wait_bug_present
) {
670 // we can't rely on revents, so we always return true if a wait_fd is given.
671 // this means we'll be trying a non-blocking read on this fd every time that
672 // curl_multi_wait() wakes up
676 return wait_fd
.revents
> 0;
679 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
682 struct curl_waitfd wait_fd
;
684 wait_fd
.fd
= signal_fd
;
685 wait_fd
.events
= CURL_WAIT_POLLIN
;
688 int ret
= curl_multi_wait(handle
, &wait_fd
, 1, cct
->_conf
->rgw_curl_wait_timeout_ms
, &num_fds
);
690 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
694 if (is_signaled(wait_fd
)) {
695 ret
= clear_signal(signal_fd
);
697 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
706 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
717 /* get file descriptors from the transfers */
718 int ret
= curl_multi_fdset(handle
, &fdread
, &fdwrite
, &fdexcep
, &maxfd
);
720 ldout(cct
, 0) << "ERROR: curl_multi_fdset returned " << ret
<< dendl
;
725 FD_SET(signal_fd
, &fdread
);
726 if (signal_fd
>= maxfd
) {
727 maxfd
= signal_fd
+ 1;
731 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
732 uint64_t to
= cct
->_conf
->rgw_curl_wait_timeout_ms
;
733 #define RGW_CURL_TIMEOUT 1000
735 to
= RGW_CURL_TIMEOUT
;
736 struct timeval timeout
;
737 timeout
.tv_sec
= to
/ 1000;
738 timeout
.tv_usec
= to
% 1000;
740 ret
= select(maxfd
+1, &fdread
, &fdwrite
, &fdexcep
, &timeout
);
743 ldout(cct
, 0) << "ERROR: select returned " << ret
<< dendl
;
747 if (signal_fd
> 0 && FD_ISSET(signal_fd
, &fdread
)) {
748 ret
= clear_signal(signal_fd
);
750 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
760 void *RGWHTTPManager::ReqsThread::entry()
762 manager
->reqs_thread_entry();
767 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
769 RGWHTTPManager::RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*_cm
) : cct(_cct
),
770 completion_mgr(_cm
), is_started(false),
771 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
774 multi_handle
= (void *)curl_multi_init();
779 RGWHTTPManager::~RGWHTTPManager() {
782 curl_multi_cleanup((CURLM
*)multi_handle
);
785 void RGWHTTPManager::register_request(rgw_http_req_data
*req_data
)
787 RWLock::WLocker
rl(reqs_lock
);
788 req_data
->id
= num_reqs
;
789 req_data
->registered
= true;
790 reqs
[num_reqs
] = req_data
;
792 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
795 bool RGWHTTPManager::unregister_request(rgw_http_req_data
*req_data
)
797 RWLock::WLocker
rl(reqs_lock
);
798 if (!req_data
->registered
) {
802 req_data
->registered
= false;
803 unregistered_reqs
.push_back(req_data
);
804 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
808 void RGWHTTPManager::complete_request(rgw_http_req_data
*req_data
)
810 RWLock::WLocker
rl(reqs_lock
);
811 _complete_request(req_data
);
814 void RGWHTTPManager::_complete_request(rgw_http_req_data
*req_data
)
816 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(req_data
->id
);
817 if (iter
!= reqs
.end()) {
821 Mutex::Locker
l(req_data
->lock
);
822 req_data
->mgr
= nullptr;
824 if (completion_mgr
) {
825 completion_mgr
->complete(NULL
, req_data
->control_io_id
, req_data
->user_info
);
831 void RGWHTTPManager::finish_request(rgw_http_req_data
*req_data
, int ret
)
833 req_data
->finish(ret
);
834 complete_request(req_data
);
837 void RGWHTTPManager::_finish_request(rgw_http_req_data
*req_data
, int ret
)
839 req_data
->finish(ret
);
840 _complete_request(req_data
);
843 void RGWHTTPManager::_set_req_state(set_state
& ss
)
845 ss
.req
->set_state(ss
.bitmask
);
848 * hook request to the curl multi handle
850 int RGWHTTPManager::link_request(rgw_http_req_data
*req_data
)
852 ldout(cct
, 20) << __func__
<< " req_data=" << req_data
<< " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
853 CURLMcode mstatus
= curl_multi_add_handle((CURLM
*)multi_handle
, req_data
->get_easy_handle());
855 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus
<< dendl
;
862 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
863 * there will be no more processing on this request
865 void RGWHTTPManager::_unlink_request(rgw_http_req_data
*req_data
)
867 if (req_data
->curl_handle
) {
868 curl_multi_remove_handle((CURLM
*)multi_handle
, req_data
->get_easy_handle());
870 if (!req_data
->_is_done()) {
871 _finish_request(req_data
, -ECANCELED
);
875 void RGWHTTPManager::unlink_request(rgw_http_req_data
*req_data
)
877 RWLock::WLocker
wl(reqs_lock
);
878 _unlink_request(req_data
);
881 void RGWHTTPManager::manage_pending_requests()
883 reqs_lock
.get_read();
884 if (max_threaded_req
== num_reqs
&&
885 unregistered_reqs
.empty() &&
886 reqs_change_state
.empty()) {
892 RWLock::WLocker
wl(reqs_lock
);
894 if (!unregistered_reqs
.empty()) {
895 for (auto& r
: unregistered_reqs
) {
900 unregistered_reqs
.clear();
903 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(max_threaded_req
);
905 list
<std::pair
<rgw_http_req_data
*, int> > remove_reqs
;
907 for (; iter
!= reqs
.end(); ++iter
) {
908 rgw_http_req_data
*req_data
= iter
->second
;
909 int r
= link_request(req_data
);
911 ldout(cct
, 0) << "ERROR: failed to link http request" << dendl
;
912 remove_reqs
.push_back(std::make_pair(iter
->second
, r
));
914 max_threaded_req
= iter
->first
+ 1;
918 if (!reqs_change_state
.empty()) {
919 for (auto siter
: reqs_change_state
) {
920 _set_req_state(siter
);
922 reqs_change_state
.clear();
925 for (auto piter
: remove_reqs
) {
926 rgw_http_req_data
*req_data
= piter
.first
;
927 int r
= piter
.second
;
929 _finish_request(req_data
, r
);
933 int RGWHTTPManager::add_request(RGWHTTPClient
*client
)
935 rgw_http_req_data
*req_data
= new rgw_http_req_data
;
937 int ret
= client
->init_request(req_data
);
944 req_data
->mgr
= this;
945 req_data
->client
= client
;
946 req_data
->control_io_id
= client
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL
);
947 req_data
->user_info
= client
->get_io_user_info();
949 register_request(req_data
);
952 ret
= link_request(req_data
);
959 ret
= signal_thread();
961 finish_request(req_data
, ret
);
967 int RGWHTTPManager::remove_request(RGWHTTPClient
*client
)
969 rgw_http_req_data
*req_data
= client
->get_req_data();
972 unlink_request(req_data
);
975 if (!unregister_request(req_data
)) {
978 int ret
= signal_thread();
986 int RGWHTTPManager::set_request_state(RGWHTTPClient
*client
, RGWHTTPRequestSetState state
)
988 rgw_http_req_data
*req_data
= client
->get_req_data();
990 ceph_assert(req_data
->lock
.is_locked());
992 /* can only do that if threaded */
997 bool suggested_wr_paused
= req_data
->write_paused
;
998 bool suggested_rd_paused
= req_data
->read_paused
;
1001 case SET_WRITE_PAUSED
:
1002 suggested_wr_paused
= true;
1004 case SET_WRITE_RESUME
:
1005 suggested_wr_paused
= false;
1007 case SET_READ_PAUSED
:
1008 suggested_rd_paused
= true;
1010 case SET_READ_RESUME
:
1011 suggested_rd_paused
= false;
1014 /* shouldn't really be here */
1017 if (suggested_wr_paused
== req_data
->write_paused
&&
1018 suggested_rd_paused
== req_data
->read_paused
) {
1022 req_data
->write_paused
= suggested_wr_paused
;
1023 req_data
->read_paused
= suggested_rd_paused
;
1025 int bitmask
= CURLPAUSE_CONT
;
1027 if (req_data
->write_paused
) {
1028 bitmask
|= CURLPAUSE_SEND
;
1031 if (req_data
->read_paused
) {
1032 bitmask
|= CURLPAUSE_RECV
;
1035 reqs_change_state
.push_back(set_state(req_data
, bitmask
));
1036 int ret
= signal_thread();
1044 int RGWHTTPManager::start()
1046 if (pipe_cloexec(thread_pipe
) < 0) {
1048 ldout(cct
, 0) << "ERROR: pipe(): " << cpp_strerror(e
) << dendl
;
1052 // enable non-blocking reads
1053 if (::fcntl(thread_pipe
[0], F_SETFL
, O_NONBLOCK
) < 0) {
1055 ldout(cct
, 0) << "ERROR: fcntl(): " << cpp_strerror(e
) << dendl
;
1056 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1057 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1061 #ifdef HAVE_CURL_MULTI_WAIT
1062 // on first initialization, use this pipe to detect whether we're using a
1063 // buggy version of libcurl
1064 std::call_once(detect_flag
, detect_curl_multi_wait_bug
, cct
,
1065 static_cast<CURLM
*>(multi_handle
),
1066 thread_pipe
[1], thread_pipe
[0]);
1070 reqs_thread
= new ReqsThread(this);
1071 reqs_thread
->create("http_manager");
1075 void RGWHTTPManager::stop()
1086 reqs_thread
->join();
1088 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1089 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1093 int RGWHTTPManager::signal_thread()
1096 int ret
= write(thread_pipe
[1], (void *)&buf
, sizeof(buf
));
1099 ldout(cct
, 0) << "ERROR: " << __func__
<< ": write() returned ret=" << ret
<< dendl
;
1105 void *RGWHTTPManager::reqs_thread_entry()
1110 ldout(cct
, 20) << __func__
<< ": start" << dendl
;
1112 while (!going_down
) {
1113 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, thread_pipe
[0]);
1115 dout(0) << "ERROR: do_curl_wait() returned: " << ret
<< dendl
;
1119 manage_pending_requests();
1121 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
1124 case CURLM_CALL_MULTI_PERFORM
:
1127 dout(10) << "curl_multi_perform returned: " << mstatus
<< dendl
;
1132 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
1133 if (msg
->msg
== CURLMSG_DONE
) {
1134 int result
= msg
->data
.result
;
1135 CURL
*e
= msg
->easy_handle
;
1136 rgw_http_req_data
*req_data
;
1137 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
1138 curl_multi_remove_handle((CURLM
*)multi_handle
, e
);
1141 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
1143 int status
= rgw_http_error_to_errno(http_status
);
1144 if (result
!= CURLE_OK
&& status
== 0) {
1145 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode
)result
) << ", maybe network unstable" << dendl
;
1148 int id
= req_data
->id
;
1149 finish_request(req_data
, status
);
1153 case CURLE_OPERATION_TIMEDOUT
:
1154 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1155 << cct
->_conf
->rgw_curl_low_speed_limit
<< " Bytes per second during " << cct
->_conf
->rgw_curl_low_speed_time
<< " seconds." << dendl
;
1157 dout(20) << "ERROR: msg->data.result=" << result
<< " req_data->id=" << id
<< " http_status=" << http_status
<< dendl
;
1158 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode
)result
) << dendl
;
1166 RWLock::WLocker
rl(reqs_lock
);
1167 for (auto r
: unregistered_reqs
) {
1171 unregistered_reqs
.clear();
1173 auto all_reqs
= std::move(reqs
);
1174 for (auto iter
: all_reqs
) {
1175 _unlink_request(iter
.second
);
1180 if (completion_mgr
) {
1181 completion_mgr
->go_down();
1187 void rgw_http_client_init(CephContext
*cct
)
1189 curl_global_init(CURL_GLOBAL_ALL
);
1190 rgw_http_manager
= new RGWHTTPManager(cct
);
1191 rgw_http_manager
->start();
1194 void rgw_http_client_cleanup()
1196 rgw_http_manager
->stop();
1197 delete rgw_http_manager
;
1198 curl_global_cleanup();
1202 int RGWHTTP::send(RGWHTTPClient
*req
) {
1206 int r
= rgw_http_manager
->add_request(req
);
1214 int RGWHTTP::process(RGWHTTPClient
*req
) {