1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "include/compat.h"
5 #include "common/errno.h"
10 #include <curl/multi.h>
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/async/completion.h"
16 #include "common/RefCountedObj.h"
18 #include "rgw_coroutine.h"
19 #include "rgw_tools.h"
22 #include <string_view>
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rgw
29 RGWHTTPManager
*rgw_http_manager
;
33 static void do_curl_easy_cleanup(RGWCurlHandle
*curl_handle
);
35 struct rgw_http_req_data
: public RefCountedObject
{
36 RGWCurlHandle
*curl_handle
{nullptr};
37 curl_slist
*h
{nullptr};
40 std::atomic
<bool> done
= { false };
41 RGWHTTPClient
*client
{nullptr};
42 rgw_io_id control_io_id
;
43 void *user_info
{nullptr};
44 bool registered
{false};
45 RGWHTTPManager
*mgr
{nullptr};
46 char error_buf
[CURL_ERROR_SIZE
];
47 bool write_paused
{false};
48 bool read_paused
{false};
50 optional
<int> user_ret
;
52 ceph::mutex lock
= ceph::make_mutex("rgw_http_req_data::lock");
53 ceph::condition_variable cond
;
55 using Signature
= void(boost::system::error_code
);
56 using Completion
= ceph::async::Completion
<Signature
>;
57 std::unique_ptr
<Completion
> completion
;
59 rgw_http_req_data() : id(-1) {
60 // FIPS zeroization audit 20191115: this memset is not security related.
61 memset(error_buf
, 0, sizeof(error_buf
));
64 template <typename ExecutionContext
, typename CompletionToken
>
65 auto async_wait(ExecutionContext
& ctx
, CompletionToken
&& token
) {
66 boost::asio::async_completion
<CompletionToken
, Signature
> init(token
);
67 auto& handler
= init
.completion_handler
;
69 std::unique_lock l
{lock
};
70 completion
= Completion::create(ctx
.get_executor(), std::move(handler
));
72 return init
.result
.get();
75 int wait(optional_yield y
) {
80 auto& context
= y
.get_io_context();
81 auto& yield
= y
.get_yield_context();
82 boost::system::error_code ec
;
83 async_wait(context
, yield
[ec
]);
86 // work on asio threads should be asynchronous, so warn when they block
88 dout(20) << "WARNING: blocking http request" << dendl
;
90 std::unique_lock l
{lock
};
91 cond
.wait(l
, [this]{return done
==true;});
95 void set_state(int bitmask
);
97 void finish(int r
, long http_status
= -1) {
98 std::lock_guard l
{lock
};
99 if (http_status
!= -1) {
101 client
->set_http_status(http_status
);
106 do_curl_easy_cleanup(curl_handle
);
109 curl_slist_free_all(h
);
115 boost::system::error_code
ec(-ret
, boost::system::system_category());
116 Completion::post(std::move(completion
), ec
);
127 std::lock_guard l
{lock
};
131 RGWHTTPManager
*get_manager() {
132 std::lock_guard l
{lock
};
136 CURL
*get_easy_handle() const;
139 struct RGWCurlHandle
{
144 explicit RGWCurlHandle(CURL
* h
) : uses(0), h(h
) {};
150 void rgw_http_req_data::set_state(int bitmask
) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
154 CURLcode rc
= curl_easy_pause(**curl_handle
, bitmask
);
155 if (rc
!= CURLE_OK
) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc
<< dendl
;
161 class RGWCurlHandles
: public Thread
{
163 ceph::mutex cleaner_lock
= ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector
<RGWCurlHandle
*> saved_curl
;
165 int cleaner_shutdown
;
166 ceph::condition_variable cleaner_cond
;
169 cleaner_shutdown
{0} {
172 RGWCurlHandle
* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle
* curl
);
174 void release_curl_handle(RGWCurlHandle
* curl
);
175 void flush_curl_handles();
180 RGWCurlHandle
* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle
* curl
= 0;
184 std::lock_guard lock
{cleaner_lock
};
185 if (!saved_curl
.empty()) {
186 curl
= *saved_curl
.begin();
187 saved_curl
.erase(saved_curl
.begin());
191 } else if ((h
= curl_easy_init())) {
192 curl
= new RGWCurlHandle
{h
};
199 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle
* curl
)
201 curl_easy_cleanup(**curl
);
205 void RGWCurlHandles::release_curl_handle(RGWCurlHandle
* curl
)
207 if (cleaner_shutdown
) {
208 release_curl_handle_now(curl
);
210 curl_easy_reset(**curl
);
211 std::lock_guard lock
{cleaner_lock
};
212 curl
->lastuse
= mono_clock::now();
213 saved_curl
.insert(saved_curl
.begin(), 1, curl
);
217 void* RGWCurlHandles::entry()
220 std::unique_lock lock
{cleaner_lock
};
223 if (cleaner_shutdown
) {
224 if (saved_curl
.empty())
227 cleaner_cond
.wait_for(lock
, std::chrono::seconds(MAXIDLE
));
229 mono_time now
= mono_clock::now();
230 while (!saved_curl
.empty()) {
231 auto cend
= saved_curl
.end();
234 if (!cleaner_shutdown
&& now
- curl
->lastuse
< std::chrono::seconds(MAXIDLE
))
236 saved_curl
.erase(cend
);
237 release_curl_handle_now(curl
);
243 void RGWCurlHandles::stop()
245 std::lock_guard lock
{cleaner_lock
};
246 cleaner_shutdown
= 1;
247 cleaner_cond
.notify_all();
250 void RGWCurlHandles::flush_curl_handles()
254 if (!saved_curl
.empty()) {
255 dout(0) << "ERROR: " << __func__
<< " failed final cleanup" << dendl
;
257 saved_curl
.shrink_to_fit();
260 CURL
*rgw_http_req_data::get_easy_handle() const
262 return **curl_handle
;
265 static RGWCurlHandles
*handles
;
267 static RGWCurlHandle
*do_curl_easy_init()
269 return handles
->get_curl_handle();
272 static void do_curl_easy_cleanup(RGWCurlHandle
*curl_handle
)
274 handles
->release_curl_handle(curl_handle
);
277 // XXX make this part of the token cache? (but that's swift-only;
278 // and this especially needs to integrates with s3...)
280 void rgw_setup_saved_curl_handles()
282 handles
= new RGWCurlHandles();
283 handles
->create("rgw_curl");
286 void rgw_release_all_curl_handles()
288 handles
->flush_curl_handles();
292 void RGWIOProvider::assign_io(RGWIOIDProvider
& io_id_provider
, int io_type
)
295 id
= io_id_provider
.get_next();
299 RGWHTTPClient::RGWHTTPClient(CephContext
*cct
,
300 const string
& _method
,
302 : NoDoutPrefix(cct
, dout_subsys
),
304 http_status(HTTP_STATUS_NOSTATUS
),
306 verify_ssl(cct
->_conf
->rgw_verify_ssl
),
313 std::ostream
& RGWHTTPClient::gen_prefix(std::ostream
& out
) const
315 out
<< "http_client[" << method
<< "/" << url
<< "]";
319 void RGWHTTPClient::init()
321 auto pos
= url
.find("://");
322 if (pos
== string::npos
) {
327 protocol
= url
.substr(0, pos
);
331 auto host_end_pos
= url
.find("/", pos
);
332 if (host_end_pos
== string::npos
) {
333 host
= url
.substr(pos
);
337 host
= url
.substr(pos
, host_end_pos
- pos
);
338 resource_prefix
= url
.substr(host_end_pos
+ 1);
339 if (resource_prefix
.size() > 0 && resource_prefix
[resource_prefix
.size() - 1] != '/') {
340 resource_prefix
.append("/");
345 * the following set of callbacks will be called either on RGWHTTPManager::process(),
346 * or via the RGWHTTPManager async processing.
348 size_t RGWHTTPClient::receive_http_header(void * const ptr
,
353 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
354 size_t len
= size
* nmemb
;
356 std::lock_guard l
{req_data
->lock
};
358 if (!req_data
->registered
) {
362 int ret
= req_data
->client
->receive_header(ptr
, size
* nmemb
);
364 dout(5) << "WARNING: client->receive_header() returned ret=" << ret
<< dendl
;
365 req_data
->user_ret
= ret
;
366 return CURLE_WRITE_ERROR
;
372 size_t RGWHTTPClient::receive_http_data(void * const ptr
,
377 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
378 size_t len
= size
* nmemb
;
382 RGWHTTPClient
*client
;
385 std::lock_guard l
{req_data
->lock
};
386 if (!req_data
->registered
) {
390 client
= req_data
->client
;
393 size_t& skip_bytes
= client
->receive_pause_skip
;
395 if (skip_bytes
>= len
) {
400 int ret
= client
->receive_data((char *)ptr
+ skip_bytes
, len
- skip_bytes
, &pause
);
402 dout(5) << "WARNING: client->receive_data() returned ret=" << ret
<< dendl
;
403 req_data
->user_ret
= ret
;
404 return CURLE_WRITE_ERROR
;
408 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl
;
410 std::lock_guard l
{req_data
->lock
};
411 req_data
->read_paused
= true;
412 return CURL_WRITEFUNC_PAUSE
;
420 size_t RGWHTTPClient::send_http_data(void * const ptr
,
425 rgw_http_req_data
*req_data
= static_cast<rgw_http_req_data
*>(_info
);
427 RGWHTTPClient
*client
;
430 std::lock_guard l
{req_data
->lock
};
432 if (!req_data
->registered
) {
436 client
= req_data
->client
;
441 int ret
= client
->send_data(ptr
, size
* nmemb
, &pause
);
443 dout(5) << "WARNING: client->send_data() returned ret=" << ret
<< dendl
;
444 req_data
->user_ret
= ret
;
445 return CURLE_READ_ERROR
;
450 std::lock_guard l
{req_data
->lock
};
451 req_data
->write_paused
= true;
452 return CURL_READFUNC_PAUSE
;
458 ceph::mutex
& RGWHTTPClient::get_req_lock()
460 return req_data
->lock
;
463 void RGWHTTPClient::_set_write_paused(bool pause
)
465 ceph_assert(ceph_mutex_is_locked(req_data
->lock
));
467 RGWHTTPManager
*mgr
= req_data
->mgr
;
468 if (pause
== req_data
->write_paused
) {
472 mgr
->set_request_state(this, SET_WRITE_PAUSED
);
474 mgr
->set_request_state(this, SET_WRITE_RESUME
);
478 void RGWHTTPClient::_set_read_paused(bool pause
)
480 ceph_assert(ceph_mutex_is_locked(req_data
->lock
));
482 RGWHTTPManager
*mgr
= req_data
->mgr
;
483 if (pause
== req_data
->read_paused
) {
487 mgr
->set_request_state(this, SET_READ_PAUSED
);
489 mgr
->set_request_state(this, SET_READ_RESUME
);
493 static curl_slist
*headers_to_slist(param_vec_t
& headers
)
495 curl_slist
*h
= NULL
;
497 param_vec_t::iterator iter
;
498 for (iter
= headers
.begin(); iter
!= headers
.end(); ++iter
) {
499 pair
<string
, string
>& p
= *iter
;
500 string val
= p
.first
;
502 if (strncmp(val
.c_str(), "HTTP_", 5) == 0) {
506 /* we need to convert all underscores into dashes as some web servers forbid them
507 * in the http header field names
509 for (size_t i
= 0; i
< val
.size(); i
++) {
515 val
= camelcase_dash_http_attr(val
);
517 // curl won't send headers with empty values unless it ends with a ; instead
518 if (p
.second
.empty()) {
522 val
.append(p
.second
);
524 h
= curl_slist_append(h
, val
.c_str());
530 static bool is_upload_request(const string
& method
)
532 return method
== "POST" || method
== "PUT";
536 * process a single simple one off request
538 int RGWHTTPClient::process(optional_yield y
)
540 return RGWHTTP::process(this, y
);
543 string
RGWHTTPClient::to_str()
545 string method_str
= (method
.empty() ? "<no-method>" : method
);
546 string url_str
= (url
.empty() ? "<no-url>" : url
);
547 return method_str
+ " " + url_str
;
550 int RGWHTTPClient::get_req_retcode()
556 return req_data
->get_retcode();
560 * init request, will be used later with RGWHTTPManager
562 int RGWHTTPClient::init_request(rgw_http_req_data
*_req_data
)
564 ceph_assert(!req_data
);
566 req_data
= _req_data
;
568 req_data
->curl_handle
= do_curl_easy_init();
570 CURL
*easy_handle
= req_data
->get_easy_handle();
572 dout(20) << "sending request to " << url
<< dendl
;
574 curl_slist
*h
= headers_to_slist(headers
);
578 curl_easy_setopt(easy_handle
, CURLOPT_CUSTOMREQUEST
, method
.c_str());
579 curl_easy_setopt(easy_handle
, CURLOPT_URL
, url
.c_str());
580 curl_easy_setopt(easy_handle
, CURLOPT_NOPROGRESS
, 1L);
581 curl_easy_setopt(easy_handle
, CURLOPT_NOSIGNAL
, 1L);
582 curl_easy_setopt(easy_handle
, CURLOPT_HEADERFUNCTION
, receive_http_header
);
583 curl_easy_setopt(easy_handle
, CURLOPT_WRITEHEADER
, (void *)req_data
);
584 curl_easy_setopt(easy_handle
, CURLOPT_WRITEFUNCTION
, receive_http_data
);
585 curl_easy_setopt(easy_handle
, CURLOPT_WRITEDATA
, (void *)req_data
);
586 curl_easy_setopt(easy_handle
, CURLOPT_ERRORBUFFER
, (void *)req_data
->error_buf
);
587 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_TIME
, cct
->_conf
->rgw_curl_low_speed_time
);
588 curl_easy_setopt(easy_handle
, CURLOPT_LOW_SPEED_LIMIT
, cct
->_conf
->rgw_curl_low_speed_limit
);
589 curl_easy_setopt(easy_handle
, CURLOPT_TCP_KEEPALIVE
, cct
->_conf
->rgw_curl_tcp_keepalive
);
590 curl_easy_setopt(easy_handle
, CURLOPT_READFUNCTION
, send_http_data
);
591 curl_easy_setopt(easy_handle
, CURLOPT_READDATA
, (void *)req_data
);
592 curl_easy_setopt(easy_handle
, CURLOPT_BUFFERSIZE
, cct
->_conf
->rgw_curl_buffersize
);
593 if (send_data_hint
|| is_upload_request(method
)) {
594 curl_easy_setopt(easy_handle
, CURLOPT_UPLOAD
, 1L);
597 // TODO: prevent overflow by using curl_off_t
598 // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
599 const long size
= send_len
;
600 curl_easy_setopt(easy_handle
, CURLOPT_INFILESIZE
, size
);
601 if (method
== "POST") {
602 curl_easy_setopt(easy_handle
, CURLOPT_POSTFIELDSIZE
, size
);
603 // TODO: set to size smaller than 1MB should prevent the "Expect" field
604 // from being sent. So explicit removal is not needed
605 h
= curl_slist_append(h
, "Expect:");
609 if (method
== "HEAD") {
610 curl_easy_setopt(easy_handle
, CURLOPT_NOBODY
, 1L);
614 curl_easy_setopt(easy_handle
, CURLOPT_HTTPHEADER
, (void *)h
);
617 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYPEER
, 0L);
618 curl_easy_setopt(easy_handle
, CURLOPT_SSL_VERIFYHOST
, 0L);
619 dout(20) << "ssl verification is set to off" << dendl
;
621 if (!ca_path
.empty()) {
622 curl_easy_setopt(easy_handle
, CURLOPT_CAINFO
, ca_path
.c_str());
623 dout(20) << "using custom ca cert "<< ca_path
.c_str() << " for ssl" << dendl
;
625 if (!client_cert
.empty()) {
626 if (!client_key
.empty()) {
627 curl_easy_setopt(easy_handle
, CURLOPT_SSLCERT
, client_cert
.c_str());
628 curl_easy_setopt(easy_handle
, CURLOPT_SSLKEY
, client_key
.c_str());
629 dout(20) << "using custom client cert " << client_cert
.c_str()
630 << " and private key " << client_key
.c_str() << dendl
;
632 dout(5) << "private key is missing for client certificate" << dendl
;
636 curl_easy_setopt(easy_handle
, CURLOPT_PRIVATE
, (void *)req_data
);
637 curl_easy_setopt(easy_handle
, CURLOPT_TIMEOUT
, req_timeout
);
642 bool RGWHTTPClient::is_done()
644 return req_data
->is_done();
648 * wait for async request to complete
650 int RGWHTTPClient::wait(optional_yield y
)
652 return req_data
->wait(y
);
655 void RGWHTTPClient::cancel()
658 RGWHTTPManager
*http_manager
= req_data
->mgr
;
660 http_manager
->remove_request(this);
665 RGWHTTPClient::~RGWHTTPClient()
674 int RGWHTTPHeadersCollector::receive_header(void * const ptr
, const size_t len
)
676 const std::string_view
header_line(static_cast<const char *>(ptr
), len
);
678 /* We're tokening the line that way due to backward compatibility. */
679 const size_t sep_loc
= header_line
.find_first_of(" \t:");
681 if (std::string_view::npos
== sep_loc
) {
682 /* Wrongly formatted header? Just skip it. */
686 header_name_t
name(header_line
.substr(0, sep_loc
));
687 if (0 == relevant_headers
.count(name
)) {
688 /* Not interested in this particular header. */
692 const auto value_part
= header_line
.substr(sep_loc
+ 1);
694 /* Skip spaces and tabs after the separator. */
695 const size_t val_loc_s
= value_part
.find_first_not_of(' ');
696 const size_t val_loc_e
= value_part
.find_first_of("\r\n");
698 if (std::string_view::npos
== val_loc_s
||
699 std::string_view::npos
== val_loc_e
) {
700 /* Empty value case. */
701 found_headers
.emplace(name
, header_value_t());
703 found_headers
.emplace(name
, header_value_t(
704 value_part
.substr(val_loc_s
, val_loc_e
- val_loc_s
)));
710 int RGWHTTPTransceiver::send_data(void* ptr
, size_t len
, bool* pause
)
712 int length_to_copy
= 0;
713 if (post_data_index
< post_data
.length()) {
714 length_to_copy
= min(post_data
.length() - post_data_index
, len
);
715 memcpy(ptr
, post_data
.data() + post_data_index
, length_to_copy
);
716 post_data_index
+= length_to_copy
;
718 return length_to_copy
;
722 static int clear_signal(int fd
)
724 // since we're in non-blocking mode, we can try to read a lot more than
725 // one signal from signal_thread() to avoid later wakeups
726 std::array
<char, 256> buf
{};
727 int ret
= ::read(fd
, (void *)buf
.data(), buf
.size());
730 return ret
== -EAGAIN
? 0 : ret
; // clear EAGAIN
735 static int do_curl_wait(CephContext
*cct
, CURLM
*handle
, int signal_fd
)
738 struct curl_waitfd wait_fd
;
740 wait_fd
.fd
= signal_fd
;
741 wait_fd
.events
= CURL_WAIT_POLLIN
;
744 int ret
= curl_multi_wait(handle
, &wait_fd
, 1, cct
->_conf
->rgw_curl_wait_timeout_ms
, &num_fds
);
746 ldout(cct
, 0) << "ERROR: curl_multi_wait() returned " << ret
<< dendl
;
750 if (wait_fd
.revents
> 0) {
751 ret
= clear_signal(signal_fd
);
753 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): read() returned " << ret
<< dendl
;
760 void *RGWHTTPManager::ReqsThread::entry()
762 manager
->reqs_thread_entry();
767 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
769 RGWHTTPManager::RGWHTTPManager(CephContext
*_cct
, RGWCompletionManager
*_cm
) : cct(_cct
),
772 multi_handle
= (void *)curl_multi_init();
777 RGWHTTPManager::~RGWHTTPManager() {
780 curl_multi_cleanup((CURLM
*)multi_handle
);
783 void RGWHTTPManager::register_request(rgw_http_req_data
*req_data
)
785 std::unique_lock rl
{reqs_lock
};
786 req_data
->id
= num_reqs
;
787 req_data
->registered
= true;
788 reqs
[num_reqs
] = req_data
;
790 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
793 bool RGWHTTPManager::unregister_request(rgw_http_req_data
*req_data
)
795 std::unique_lock rl
{reqs_lock
};
796 if (!req_data
->registered
) {
800 req_data
->registered
= false;
801 unregistered_reqs
.push_back(req_data
);
802 ldout(cct
, 20) << __func__
<< " mgr=" << this << " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
806 void RGWHTTPManager::complete_request(rgw_http_req_data
*req_data
)
808 std::unique_lock rl
{reqs_lock
};
809 _complete_request(req_data
);
812 void RGWHTTPManager::_complete_request(rgw_http_req_data
*req_data
)
814 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(req_data
->id
);
815 if (iter
!= reqs
.end()) {
819 std::lock_guard l
{req_data
->lock
};
820 req_data
->mgr
= nullptr;
822 if (completion_mgr
) {
823 completion_mgr
->complete(NULL
, req_data
->control_io_id
, req_data
->user_info
);
829 void RGWHTTPManager::finish_request(rgw_http_req_data
*req_data
, int ret
, long http_status
)
831 req_data
->finish(ret
, http_status
);
832 complete_request(req_data
);
835 void RGWHTTPManager::_finish_request(rgw_http_req_data
*req_data
, int ret
)
837 req_data
->finish(ret
);
838 _complete_request(req_data
);
841 void RGWHTTPManager::_set_req_state(set_state
& ss
)
843 ss
.req
->set_state(ss
.bitmask
);
846 * hook request to the curl multi handle
848 int RGWHTTPManager::link_request(rgw_http_req_data
*req_data
)
850 ldout(cct
, 20) << __func__
<< " req_data=" << req_data
<< " req_data->id=" << req_data
->id
<< ", curl_handle=" << req_data
->curl_handle
<< dendl
;
851 CURLMcode mstatus
= curl_multi_add_handle((CURLM
*)multi_handle
, req_data
->get_easy_handle());
853 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus
<< dendl
;
860 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
861 * there will be no more processing on this request
863 void RGWHTTPManager::_unlink_request(rgw_http_req_data
*req_data
)
865 if (req_data
->curl_handle
) {
866 curl_multi_remove_handle((CURLM
*)multi_handle
, req_data
->get_easy_handle());
868 if (!req_data
->is_done()) {
869 _finish_request(req_data
, -ECANCELED
);
873 void RGWHTTPManager::unlink_request(rgw_http_req_data
*req_data
)
875 std::unique_lock wl
{reqs_lock
};
876 _unlink_request(req_data
);
879 void RGWHTTPManager::manage_pending_requests()
881 reqs_lock
.lock_shared();
882 if (max_threaded_req
== num_reqs
&&
883 unregistered_reqs
.empty() &&
884 reqs_change_state
.empty()) {
885 reqs_lock
.unlock_shared();
888 reqs_lock
.unlock_shared();
890 std::unique_lock wl
{reqs_lock
};
892 if (!reqs_change_state
.empty()) {
893 for (auto siter
: reqs_change_state
) {
894 _set_req_state(siter
);
896 reqs_change_state
.clear();
899 if (!unregistered_reqs
.empty()) {
900 for (auto& r
: unregistered_reqs
) {
905 unregistered_reqs
.clear();
908 map
<uint64_t, rgw_http_req_data
*>::iterator iter
= reqs
.find(max_threaded_req
);
910 list
<std::pair
<rgw_http_req_data
*, int> > remove_reqs
;
912 for (; iter
!= reqs
.end(); ++iter
) {
913 rgw_http_req_data
*req_data
= iter
->second
;
914 int r
= link_request(req_data
);
916 ldout(cct
, 0) << "ERROR: failed to link http request" << dendl
;
917 remove_reqs
.push_back(std::make_pair(iter
->second
, r
));
919 max_threaded_req
= iter
->first
+ 1;
923 for (auto piter
: remove_reqs
) {
924 rgw_http_req_data
*req_data
= piter
.first
;
925 int r
= piter
.second
;
927 _finish_request(req_data
, r
);
931 int RGWHTTPManager::add_request(RGWHTTPClient
*client
)
933 rgw_http_req_data
*req_data
= new rgw_http_req_data
;
935 int ret
= client
->init_request(req_data
);
942 req_data
->mgr
= this;
943 req_data
->client
= client
;
944 req_data
->control_io_id
= client
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL
);
945 req_data
->user_info
= client
->get_io_user_info();
947 register_request(req_data
);
950 ret
= link_request(req_data
);
957 ret
= signal_thread();
959 finish_request(req_data
, ret
);
965 int RGWHTTPManager::remove_request(RGWHTTPClient
*client
)
967 rgw_http_req_data
*req_data
= client
->get_req_data();
970 unlink_request(req_data
);
973 if (!unregister_request(req_data
)) {
976 int ret
= signal_thread();
984 int RGWHTTPManager::set_request_state(RGWHTTPClient
*client
, RGWHTTPRequestSetState state
)
986 rgw_http_req_data
*req_data
= client
->get_req_data();
988 ceph_assert(ceph_mutex_is_locked(req_data
->lock
));
990 /* can only do that if threaded */
995 bool suggested_wr_paused
= req_data
->write_paused
;
996 bool suggested_rd_paused
= req_data
->read_paused
;
999 case SET_WRITE_PAUSED
:
1000 suggested_wr_paused
= true;
1002 case SET_WRITE_RESUME
:
1003 suggested_wr_paused
= false;
1005 case SET_READ_PAUSED
:
1006 suggested_rd_paused
= true;
1008 case SET_READ_RESUME
:
1009 suggested_rd_paused
= false;
1012 /* shouldn't really be here */
1015 if (suggested_wr_paused
== req_data
->write_paused
&&
1016 suggested_rd_paused
== req_data
->read_paused
) {
1020 req_data
->write_paused
= suggested_wr_paused
;
1021 req_data
->read_paused
= suggested_rd_paused
;
1023 int bitmask
= CURLPAUSE_CONT
;
1025 if (req_data
->write_paused
) {
1026 bitmask
|= CURLPAUSE_SEND
;
1029 if (req_data
->read_paused
) {
1030 bitmask
|= CURLPAUSE_RECV
;
1033 reqs_change_state
.push_back(set_state(req_data
, bitmask
));
1034 int ret
= signal_thread();
1042 int RGWHTTPManager::start()
1044 if (pipe_cloexec(thread_pipe
, 0) < 0) {
1046 ldout(cct
, 0) << "ERROR: pipe(): " << cpp_strerror(e
) << dendl
;
1050 // enable non-blocking reads
1051 if (::fcntl(thread_pipe
[0], F_SETFL
, O_NONBLOCK
) < 0) {
1053 ldout(cct
, 0) << "ERROR: fcntl(): " << cpp_strerror(e
) << dendl
;
1054 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1055 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1060 reqs_thread
= new ReqsThread(this);
1061 reqs_thread
->create("http_manager");
1065 void RGWHTTPManager::stop()
1076 reqs_thread
->join();
1078 TEMP_FAILURE_RETRY(::close(thread_pipe
[1]));
1079 TEMP_FAILURE_RETRY(::close(thread_pipe
[0]));
1083 int RGWHTTPManager::signal_thread()
1086 int ret
= write(thread_pipe
[1], (void *)&buf
, sizeof(buf
));
1089 ldout(cct
, 0) << "ERROR: " << __func__
<< ": write() returned ret=" << ret
<< dendl
;
1095 void *RGWHTTPManager::reqs_thread_entry()
1100 ldout(cct
, 20) << __func__
<< ": start" << dendl
;
1102 while (!going_down
) {
1103 int ret
= do_curl_wait(cct
, (CURLM
*)multi_handle
, thread_pipe
[0]);
1105 dout(0) << "ERROR: do_curl_wait() returned: " << ret
<< dendl
;
1109 manage_pending_requests();
1111 mstatus
= curl_multi_perform((CURLM
*)multi_handle
, &still_running
);
1114 case CURLM_CALL_MULTI_PERFORM
:
1117 dout(10) << "curl_multi_perform returned: " << mstatus
<< dendl
;
1122 while ((msg
= curl_multi_info_read((CURLM
*)multi_handle
, &msgs_left
))) {
1123 if (msg
->msg
== CURLMSG_DONE
) {
1124 int result
= msg
->data
.result
;
1125 CURL
*e
= msg
->easy_handle
;
1126 rgw_http_req_data
*req_data
;
1127 curl_easy_getinfo(e
, CURLINFO_PRIVATE
, (void **)&req_data
);
1128 curl_multi_remove_handle((CURLM
*)multi_handle
, e
);
1132 if (!req_data
->user_ret
) {
1133 curl_easy_getinfo(e
, CURLINFO_RESPONSE_CODE
, (void **)&http_status
);
1135 status
= rgw_http_error_to_errno(http_status
);
1136 if (result
!= CURLE_OK
&& status
== 0) {
1137 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode
)result
) << ", maybe network unstable" << dendl
;
1141 status
= *req_data
->user_ret
;
1143 set_req_state_err(err
, status
, 0);
1144 http_status
= err
.http_ret
;
1146 int id
= req_data
->id
;
1147 finish_request(req_data
, status
, http_status
);
1151 case CURLE_OPERATION_TIMEDOUT
:
1152 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1153 << cct
->_conf
->rgw_curl_low_speed_limit
<< " Bytes per second during " << cct
->_conf
->rgw_curl_low_speed_time
<< " seconds." << dendl
;
1155 dout(20) << "ERROR: msg->data.result=" << result
<< " req_data->id=" << id
<< " http_status=" << http_status
<< dendl
;
1156 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode
)result
) << " req_data->error_buf=" << req_data
->error_buf
<< dendl
;
1164 std::unique_lock rl
{reqs_lock
};
1165 for (auto r
: unregistered_reqs
) {
1169 unregistered_reqs
.clear();
1171 auto all_reqs
= std::move(reqs
);
1172 for (auto iter
: all_reqs
) {
1173 _unlink_request(iter
.second
);
1178 if (completion_mgr
) {
1179 completion_mgr
->go_down();
1185 void rgw_http_client_init(CephContext
*cct
)
1187 curl_global_init(CURL_GLOBAL_ALL
);
1188 rgw_http_manager
= new RGWHTTPManager(cct
);
1189 rgw_http_manager
->start();
1192 void rgw_http_client_cleanup()
1194 rgw_http_manager
->stop();
1195 delete rgw_http_manager
;
1196 curl_global_cleanup();
1200 int RGWHTTP::send(RGWHTTPClient
*req
) {
1204 int r
= rgw_http_manager
->add_request(req
);
1212 int RGWHTTP::process(RGWHTTPClient
*req
, optional_yield y
) {
1221 return req
->wait(y
);