]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/async/completion.h"
16 #include "common/RefCountedObj.h"
17
18 #include "rgw_coroutine.h"
19 #include "rgw_tools.h"
20
21 #include <atomic>
22 #include <string_view>
23
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rgw
26
27 using namespace std;
28
// Global RGWHTTPManager pointer. It is only defined here; nothing in this
// portion of the file assigns it (NOTE(review): presumably set up by the
// process' init code elsewhere -- confirm).
RGWHTTPManager *rgw_http_manager;
30
31 struct RGWCurlHandle;
32
33 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
34
/*
 * Per-request state shared between an RGWHTTPClient, the RGWHTTPManager that
 * drives it and the libcurl callbacks. Reference counted: the client and the
 * manager each hold their own reference.
 */
struct rgw_http_req_data : public RefCountedObject {
  RGWCurlHandle *curl_handle{nullptr};  // easy handle wrapper, returned to the handle cache in finish()
  curl_slist *h{nullptr};               // request header list, freed in finish()
  uint64_t id;                          // set by RGWHTTPManager::register_request()
  int ret{0};                           // final result, valid once 'done' is set
  std::atomic<bool> done = { false };   // set by finish()
  RGWHTTPClient *client{nullptr};
  rgw_io_id control_io_id;
  void *user_info{nullptr};
  bool registered{false};               // callbacks ignore the request while this is false
  RGWHTTPManager *mgr{nullptr};
  char error_buf[CURL_ERROR_SIZE];      // handed to curl via CURLOPT_ERRORBUFFER
  bool write_paused{false};
  bool read_paused{false};

  optional<int> user_ret;               // error returned by a client callback, if any

  ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
  ceph::condition_variable cond;        // signalled by finish() for blocking waiters

  using Signature = void(boost::system::error_code);
  using Completion = ceph::async::Completion<Signature>;
  std::unique_ptr<Completion> completion; // pending asynchronous waiter, if any

  rgw_http_req_data() : id(-1) {
    // FIPS zeroization audit 20191115: this memset is not security related.
    memset(error_buf, 0, sizeof(error_buf));
  }

  /*
   * Register an asynchronous waiter: stores a completion bound to ctx's
   * executor (under 'lock') which finish() posts once the request is done.
   */
  template <typename ExecutionContext, typename CompletionToken>
  auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, Signature> init(token);
    auto& handler = init.completion_handler;
    {
      std::unique_lock l{lock};
      completion = Completion::create(ctx.get_executor(), std::move(handler));
    }
    return init.result.get();
  }

  /*
   * Wait for the request to finish and return its result.
   * With a yield context the wait is asynchronous and the result is derived
   * from the posted error code; otherwise the calling thread blocks on 'cond'.
   */
  int wait(optional_yield y) {
    if (done) {
      return ret;
    }
    if (y) {
      auto& context = y.get_io_context();
      auto& yield = y.get_yield_context();
      boost::system::error_code ec;
      async_wait(context, yield[ec]);
      // finish() posts ec built from -ret, so negate it back
      return -ec.value();
    }
    // work on asio threads should be asynchronous, so warn when they block
    if (is_asio_thread) {
      dout(20) << "WARNING: blocking http request" << dendl;
    }
    std::unique_lock l{lock};
    cond.wait(l, [this]{return done==true;});
    return ret;
  }

  /* apply a CURLPAUSE_* bitmask to the easy handle (defined below) */
  void set_state(int bitmask);

  /*
   * Mark the request as finished with result r. Stores http_status in the
   * client (when it is not -1 and a client is attached), releases the curl
   * handle and header list, then wakes whoever is waiting: the stored
   * completion if there is one, otherwise blocking waiters on 'cond'.
   */
  void finish(int r, long http_status = -1) {
    std::lock_guard l{lock};
    if (http_status != -1) {
      if (client) {
        client->set_http_status(http_status);
      }
    }
    ret = r;
    if (curl_handle)
      do_curl_easy_cleanup(curl_handle);

    if (h)
      curl_slist_free_all(h);

    curl_handle = NULL;
    h = NULL;
    done = true;
    if (completion) {
      boost::system::error_code ec(-ret, boost::system::system_category());
      Completion::post(std::move(completion), ec);
    } else {
      cond.notify_all();
    }
  }

  /* lock-free check of the atomic 'done' flag */
  bool is_done() {
    return done;
  }

  /* read the result under 'lock' */
  int get_retcode() {
    std::lock_guard l{lock};
    return ret;
  }

  /* read the owning manager under 'lock' */
  RGWHTTPManager *get_manager() {
    std::lock_guard l{lock};
    return mgr;
  }

  /* raw CURL easy handle behind curl_handle; curl_handle must not be null */
  CURL *get_easy_handle() const;
};
138
139 struct RGWCurlHandle {
140 int uses;
141 mono_time lastuse;
142 CURL* h;
143
144 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
145 CURL* operator*() {
146 return this->h;
147 }
148 };
149
150 void rgw_http_req_data::set_state(int bitmask) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
153 */
154 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
155 if (rc != CURLE_OK) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
157 }
158 }
159
160 #define MAXIDLE 5
161 class RGWCurlHandles : public Thread {
162 public:
163 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector<RGWCurlHandle*> saved_curl;
165 int cleaner_shutdown;
166 ceph::condition_variable cleaner_cond;
167
168 RGWCurlHandles() :
169 cleaner_shutdown{0} {
170 }
171
172 RGWCurlHandle* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle* curl);
174 void release_curl_handle(RGWCurlHandle* curl);
175 void flush_curl_handles();
176 void* entry();
177 void stop();
178 };
179
180 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle* curl = 0;
182 CURL* h;
183 {
184 std::lock_guard lock{cleaner_lock};
185 if (!saved_curl.empty()) {
186 curl = *saved_curl.begin();
187 saved_curl.erase(saved_curl.begin());
188 }
189 }
190 if (curl) {
191 } else if ((h = curl_easy_init())) {
192 curl = new RGWCurlHandle{h};
193 } else {
194 // curl = 0;
195 }
196 return curl;
197 }
198
199 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
200 {
201 curl_easy_cleanup(**curl);
202 delete curl;
203 }
204
205 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
206 {
207 if (cleaner_shutdown) {
208 release_curl_handle_now(curl);
209 } else {
210 curl_easy_reset(**curl);
211 std::lock_guard lock{cleaner_lock};
212 curl->lastuse = mono_clock::now();
213 saved_curl.insert(saved_curl.begin(), 1, curl);
214 }
215 }
216
217 void* RGWCurlHandles::entry()
218 {
219 RGWCurlHandle* curl;
220 std::unique_lock lock{cleaner_lock};
221
222 for (;;) {
223 if (cleaner_shutdown) {
224 if (saved_curl.empty())
225 break;
226 } else {
227 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
228 }
229 mono_time now = mono_clock::now();
230 while (!saved_curl.empty()) {
231 auto cend = saved_curl.end();
232 --cend;
233 curl = *cend;
234 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
235 break;
236 saved_curl.erase(cend);
237 release_curl_handle_now(curl);
238 }
239 }
240 return nullptr;
241 }
242
243 void RGWCurlHandles::stop()
244 {
245 std::lock_guard lock{cleaner_lock};
246 cleaner_shutdown = 1;
247 cleaner_cond.notify_all();
248 }
249
250 void RGWCurlHandles::flush_curl_handles()
251 {
252 stop();
253 join();
254 if (!saved_curl.empty()) {
255 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
256 }
257 saved_curl.shrink_to_fit();
258 }
259
260 CURL *rgw_http_req_data::get_easy_handle() const
261 {
262 return **curl_handle;
263 }
264
// File-local curl handle cache: allocated by rgw_setup_saved_curl_handles()
// and deleted by rgw_release_all_curl_handles().
static RGWCurlHandles *handles;
266
267 static RGWCurlHandle *do_curl_easy_init()
268 {
269 return handles->get_curl_handle();
270 }
271
272 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
273 {
274 handles->release_curl_handle(curl_handle);
275 }
276
// XXX make this part of the token cache? (but that's swift-only,
// and this especially needs to integrate with s3...)
279
280 void rgw_setup_saved_curl_handles()
281 {
282 handles = new RGWCurlHandles();
283 handles->create("rgw_curl");
284 }
285
286 void rgw_release_all_curl_handles()
287 {
288 handles->flush_curl_handles();
289 delete handles;
290 }
291
292 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
293 {
294 if (id == 0) {
295 id = io_id_provider.get_next();
296 }
297 }
298
299 RGWHTTPClient::RGWHTTPClient(CephContext *cct,
300 const string& _method,
301 const string& _url)
302 : NoDoutPrefix(cct, dout_subsys),
303 has_send_len(false),
304 http_status(HTTP_STATUS_NOSTATUS),
305 req_data(nullptr),
306 verify_ssl(cct->_conf->rgw_verify_ssl),
307 cct(cct),
308 method(_method),
309 url(_url) {
310 init();
311 }
312
313 std::ostream& RGWHTTPClient::gen_prefix(std::ostream& out) const
314 {
315 out << "http_client[" << method << "/" << url << "]";
316 return out;
317 }
318
319 void RGWHTTPClient::init()
320 {
321 auto pos = url.find("://");
322 if (pos == string::npos) {
323 host = url;
324 return;
325 }
326
327 protocol = url.substr(0, pos);
328
329 pos += 3;
330
331 auto host_end_pos = url.find("/", pos);
332 if (host_end_pos == string::npos) {
333 host = url.substr(pos);
334 return;
335 }
336
337 host = url.substr(pos, host_end_pos - pos);
338 resource_prefix = url.substr(host_end_pos + 1);
339 if (resource_prefix.size() > 0 && resource_prefix[resource_prefix.size() - 1] != '/') {
340 resource_prefix.append("/");
341 }
342 }
343
344 /*
345 * the following set of callbacks will be called either on RGWHTTPManager::process(),
346 * or via the RGWHTTPManager async processing.
347 */
348 size_t RGWHTTPClient::receive_http_header(void * const ptr,
349 const size_t size,
350 const size_t nmemb,
351 void * const _info)
352 {
353 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
354 size_t len = size * nmemb;
355
356 std::lock_guard l{req_data->lock};
357
358 if (!req_data->registered) {
359 return len;
360 }
361
362 int ret = req_data->client->receive_header(ptr, size * nmemb);
363 if (ret < 0) {
364 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
365 req_data->user_ret = ret;
366 return CURLE_WRITE_ERROR;
367 }
368
369 return len;
370 }
371
372 size_t RGWHTTPClient::receive_http_data(void * const ptr,
373 const size_t size,
374 const size_t nmemb,
375 void * const _info)
376 {
377 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
378 size_t len = size * nmemb;
379
380 bool pause = false;
381
382 RGWHTTPClient *client;
383
384 {
385 std::lock_guard l{req_data->lock};
386 if (!req_data->registered) {
387 return len;
388 }
389
390 client = req_data->client;
391 }
392
393 size_t& skip_bytes = client->receive_pause_skip;
394
395 if (skip_bytes >= len) {
396 skip_bytes -= len;
397 return len;
398 }
399
400 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
401 if (ret < 0) {
402 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
403 req_data->user_ret = ret;
404 return CURLE_WRITE_ERROR;
405 }
406
407 if (pause) {
408 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
409 skip_bytes = len;
410 std::lock_guard l{req_data->lock};
411 req_data->read_paused = true;
412 return CURL_WRITEFUNC_PAUSE;
413 }
414
415 skip_bytes = 0;
416
417 return len;
418 }
419
420 size_t RGWHTTPClient::send_http_data(void * const ptr,
421 const size_t size,
422 const size_t nmemb,
423 void * const _info)
424 {
425 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
426
427 RGWHTTPClient *client;
428
429 {
430 std::lock_guard l{req_data->lock};
431
432 if (!req_data->registered) {
433 return 0;
434 }
435
436 client = req_data->client;
437 }
438
439 bool pause = false;
440
441 int ret = client->send_data(ptr, size * nmemb, &pause);
442 if (ret < 0) {
443 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
444 req_data->user_ret = ret;
445 return CURLE_READ_ERROR;
446 }
447
448 if (ret == 0 &&
449 pause) {
450 std::lock_guard l{req_data->lock};
451 req_data->write_paused = true;
452 return CURL_READFUNC_PAUSE;
453 }
454
455 return ret;
456 }
457
458 ceph::mutex& RGWHTTPClient::get_req_lock()
459 {
460 return req_data->lock;
461 }
462
463 void RGWHTTPClient::_set_write_paused(bool pause)
464 {
465 ceph_assert(ceph_mutex_is_locked(req_data->lock));
466
467 RGWHTTPManager *mgr = req_data->mgr;
468 if (pause == req_data->write_paused) {
469 return;
470 }
471 if (pause) {
472 mgr->set_request_state(this, SET_WRITE_PAUSED);
473 } else {
474 mgr->set_request_state(this, SET_WRITE_RESUME);
475 }
476 }
477
478 void RGWHTTPClient::_set_read_paused(bool pause)
479 {
480 ceph_assert(ceph_mutex_is_locked(req_data->lock));
481
482 RGWHTTPManager *mgr = req_data->mgr;
483 if (pause == req_data->read_paused) {
484 return;
485 }
486 if (pause) {
487 mgr->set_request_state(this, SET_READ_PAUSED);
488 } else {
489 mgr->set_request_state(this, SET_READ_RESUME);
490 }
491 }
492
493 static curl_slist *headers_to_slist(param_vec_t& headers)
494 {
495 curl_slist *h = NULL;
496
497 param_vec_t::iterator iter;
498 for (iter = headers.begin(); iter != headers.end(); ++iter) {
499 pair<string, string>& p = *iter;
500 string val = p.first;
501
502 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
503 val = val.substr(5);
504 }
505
506 /* we need to convert all underscores into dashes as some web servers forbid them
507 * in the http header field names
508 */
509 for (size_t i = 0; i < val.size(); i++) {
510 if (val[i] == '_') {
511 val[i] = '-';
512 }
513 }
514
515 val = camelcase_dash_http_attr(val);
516
517 // curl won't send headers with empty values unless it ends with a ; instead
518 if (p.second.empty()) {
519 val.append(1, ';');
520 } else {
521 val.append(": ");
522 val.append(p.second);
523 }
524 h = curl_slist_append(h, val.c_str());
525 }
526
527 return h;
528 }
529
/* True for the methods that carry a request body: "POST" and "PUT". */
static bool is_upload_request(const string& method)
{
  if (method == "POST") {
    return true;
  }
  return method == "PUT";
}
534
535 /*
536 * process a single simple one off request
537 */
538 int RGWHTTPClient::process(optional_yield y)
539 {
540 return RGWHTTP::process(this, y);
541 }
542
543 string RGWHTTPClient::to_str()
544 {
545 string method_str = (method.empty() ? "<no-method>" : method);
546 string url_str = (url.empty() ? "<no-url>" : url);
547 return method_str + " " + url_str;
548 }
549
550 int RGWHTTPClient::get_req_retcode()
551 {
552 if (!req_data) {
553 return -EINVAL;
554 }
555
556 return req_data->get_retcode();
557 }
558
559 /*
560 * init request, will be used later with RGWHTTPManager
561 */
562 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
563 {
564 ceph_assert(!req_data);
565 _req_data->get();
566 req_data = _req_data;
567
568 req_data->curl_handle = do_curl_easy_init();
569
570 CURL *easy_handle = req_data->get_easy_handle();
571
572 dout(20) << "sending request to " << url << dendl;
573
574 curl_slist *h = headers_to_slist(headers);
575
576 req_data->h = h;
577
578 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
579 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
580 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
581 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
582 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
583 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
584 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
585 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
586 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
587 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
588 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
589 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
590 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
591 curl_easy_setopt(easy_handle, CURLOPT_BUFFERSIZE, cct->_conf->rgw_curl_buffersize);
592 if (send_data_hint || is_upload_request(method)) {
593 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
594 }
595 if (has_send_len) {
596 // TODO: prevent overflow by using curl_off_t
597 // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
598 const long size = send_len;
599 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
600 if (method == "POST") {
601 curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size);
602 // TODO: set to size smaller than 1MB should prevent the "Expect" field
603 // from being sent. So explicit removal is not needed
604 h = curl_slist_append(h, "Expect:");
605 }
606 }
607
608 if (method == "HEAD") {
609 curl_easy_setopt(easy_handle, CURLOPT_NOBODY, 1L);
610 }
611
612 if (h) {
613 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
614 }
615 if (!verify_ssl) {
616 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
617 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
618 dout(20) << "ssl verification is set to off" << dendl;
619 } else {
620 if (!ca_path.empty()) {
621 curl_easy_setopt(easy_handle, CURLOPT_CAINFO, ca_path.c_str());
622 dout(20) << "using custom ca cert "<< ca_path.c_str() << " for ssl" << dendl;
623 }
624 if (!client_cert.empty()) {
625 if (!client_key.empty()) {
626 curl_easy_setopt(easy_handle, CURLOPT_SSLCERT, client_cert.c_str());
627 curl_easy_setopt(easy_handle, CURLOPT_SSLKEY, client_key.c_str());
628 dout(20) << "using custom client cert " << client_cert.c_str()
629 << " and private key " << client_key.c_str() << dendl;
630 } else {
631 dout(5) << "private key is missing for client certificate" << dendl;
632 }
633 }
634 }
635 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
636 curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
637
638 return 0;
639 }
640
641 bool RGWHTTPClient::is_done()
642 {
643 return req_data->is_done();
644 }
645
646 /*
647 * wait for async request to complete
648 */
649 int RGWHTTPClient::wait(optional_yield y)
650 {
651 return req_data->wait(y);
652 }
653
654 void RGWHTTPClient::cancel()
655 {
656 if (req_data) {
657 RGWHTTPManager *http_manager = req_data->mgr;
658 if (http_manager) {
659 http_manager->remove_request(this);
660 }
661 }
662 }
663
664 RGWHTTPClient::~RGWHTTPClient()
665 {
666 cancel();
667 if (req_data) {
668 req_data->put();
669 }
670 }
671
672
673 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
674 {
675 const std::string_view header_line(static_cast<const char *>(ptr), len);
676
677 /* We're tokening the line that way due to backward compatibility. */
678 const size_t sep_loc = header_line.find_first_of(" \t:");
679
680 if (std::string_view::npos == sep_loc) {
681 /* Wrongly formatted header? Just skip it. */
682 return 0;
683 }
684
685 header_name_t name(header_line.substr(0, sep_loc));
686 if (0 == relevant_headers.count(name)) {
687 /* Not interested in this particular header. */
688 return 0;
689 }
690
691 const auto value_part = header_line.substr(sep_loc + 1);
692
693 /* Skip spaces and tabs after the separator. */
694 const size_t val_loc_s = value_part.find_first_not_of(' ');
695 const size_t val_loc_e = value_part.find_first_of("\r\n");
696
697 if (std::string_view::npos == val_loc_s ||
698 std::string_view::npos == val_loc_e) {
699 /* Empty value case. */
700 found_headers.emplace(name, header_value_t());
701 } else {
702 found_headers.emplace(name, header_value_t(
703 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
704 }
705
706 return 0;
707 }
708
709 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
710 {
711 int length_to_copy = 0;
712 if (post_data_index < post_data.length()) {
713 length_to_copy = min(post_data.length() - post_data_index, len);
714 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
715 post_data_index += length_to_copy;
716 }
717 return length_to_copy;
718 }
719
720
/*
 * Drain pending wakeup bytes from fd with a single read of up to 256 bytes.
 * Returns 0 on success or when nothing was available (EAGAIN), otherwise the
 * negated errno of the failed read.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> drain{};
  int ret = ::read(fd, (void *)drain.data(), drain.size());
  if (ret >= 0) {
    return 0;
  }
  const int err = -errno;
  return (err == -EAGAIN) ? 0 : err; // clear EAGAIN
}
734
735 #if HAVE_CURL_MULTI_WAIT
736
737 static std::once_flag detect_flag;
738 static bool curl_multi_wait_bug_present = false;
739
740 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
741 int write_fd, int read_fd)
742 {
743 int ret = 0;
744
745 // write to write_fd so that read_fd becomes readable
746 uint32_t buf = 0;
747 ret = ::write(write_fd, &buf, sizeof(buf));
748 if (ret < 0) {
749 ret = -errno;
750 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
751 return ret;
752 }
753
754 // pass read_fd in extra_fds for curl_multi_wait()
755 int num_fds;
756 struct curl_waitfd wait_fd;
757
758 wait_fd.fd = read_fd;
759 wait_fd.events = CURL_WAIT_POLLIN;
760 wait_fd.revents = 0;
761
762 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
763 if (ret != CURLM_OK) {
764 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
765 return -EIO;
766 }
767
768 // curl_multi_wait should flag revents when extra_fd is readable. if it
769 // doesn't, the bug is present and we can't rely on revents
770 if (wait_fd.revents == 0) {
771 curl_multi_wait_bug_present = true;
772 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
773 "bug in curl_multi_wait(). enabling a workaround that may degrade "
774 "performance slightly." << dendl;
775 }
776
777 return clear_signal(read_fd);
778 }
779
780 static bool is_signaled(const curl_waitfd& wait_fd)
781 {
782 if (wait_fd.fd < 0) {
783 // no fd to signal
784 return false;
785 }
786
787 if (curl_multi_wait_bug_present) {
788 // we can't rely on revents, so we always return true if a wait_fd is given.
789 // this means we'll be trying a non-blocking read on this fd every time that
790 // curl_multi_wait() wakes up
791 return true;
792 }
793
794 return wait_fd.revents > 0;
795 }
796
797 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
798 {
799 int num_fds;
800 struct curl_waitfd wait_fd;
801
802 wait_fd.fd = signal_fd;
803 wait_fd.events = CURL_WAIT_POLLIN;
804 wait_fd.revents = 0;
805
806 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
807 if (ret) {
808 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
809 return -EIO;
810 }
811
812 if (is_signaled(wait_fd)) {
813 ret = clear_signal(signal_fd);
814 if (ret < 0) {
815 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
816 return ret;
817 }
818 }
819 return 0;
820 }
821
822 #else
823
824 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
825 {
826 fd_set fdread;
827 fd_set fdwrite;
828 fd_set fdexcep;
829 int maxfd = -1;
830
831 FD_ZERO(&fdread);
832 FD_ZERO(&fdwrite);
833 FD_ZERO(&fdexcep);
834
835 /* get file descriptors from the transfers */
836 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
837 if (ret) {
838 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
839 return -EIO;
840 }
841
842 if (signal_fd > 0) {
843 FD_SET(signal_fd, &fdread);
844 if (signal_fd >= maxfd) {
845 maxfd = signal_fd + 1;
846 }
847 }
848
849 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
850 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
851 #define RGW_CURL_TIMEOUT 1000
852 if (!to)
853 to = RGW_CURL_TIMEOUT;
854 struct timeval timeout;
855 timeout.tv_sec = to / 1000;
856 timeout.tv_usec = to % 1000;
857
858 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
859 if (ret < 0) {
860 ret = -errno;
861 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
862 return ret;
863 }
864
865 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
866 ret = clear_signal(signal_fd);
867 if (ret < 0) {
868 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
869 return ret;
870 }
871 }
872
873 return 0;
874 }
875
876 #endif
877
878 void *RGWHTTPManager::ReqsThread::entry()
879 {
880 manager->reqs_thread_entry();
881 return NULL;
882 }
883
884 /*
885 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
886 */
887 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
888 completion_mgr(_cm)
889 {
890 multi_handle = (void *)curl_multi_init();
891 thread_pipe[0] = -1;
892 thread_pipe[1] = -1;
893 }
894
895 RGWHTTPManager::~RGWHTTPManager() {
896 stop();
897 if (multi_handle)
898 curl_multi_cleanup((CURLM *)multi_handle);
899 }
900
901 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
902 {
903 std::unique_lock rl{reqs_lock};
904 req_data->id = num_reqs;
905 req_data->registered = true;
906 reqs[num_reqs] = req_data;
907 num_reqs++;
908 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
909 }
910
911 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
912 {
913 std::unique_lock rl{reqs_lock};
914 if (!req_data->registered) {
915 return false;
916 }
917 req_data->get();
918 req_data->registered = false;
919 unregistered_reqs.push_back(req_data);
920 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
921 return true;
922 }
923
924 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
925 {
926 std::unique_lock rl{reqs_lock};
927 _complete_request(req_data);
928 }
929
930 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
931 {
932 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
933 if (iter != reqs.end()) {
934 reqs.erase(iter);
935 }
936 {
937 std::lock_guard l{req_data->lock};
938 req_data->mgr = nullptr;
939 }
940 if (completion_mgr) {
941 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
942 }
943
944 req_data->put();
945 }
946
947 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
948 {
949 req_data->finish(ret, http_status);
950 complete_request(req_data);
951 }
952
953 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
954 {
955 req_data->finish(ret);
956 _complete_request(req_data);
957 }
958
959 void RGWHTTPManager::_set_req_state(set_state& ss)
960 {
961 ss.req->set_state(ss.bitmask);
962 }
963 /*
964 * hook request to the curl multi handle
965 */
966 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
967 {
968 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
969 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
970 if (mstatus) {
971 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
972 return -EIO;
973 }
974 return 0;
975 }
976
977 /*
978 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
979 * there will be no more processing on this request
980 */
981 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
982 {
983 if (req_data->curl_handle) {
984 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
985 }
986 if (!req_data->is_done()) {
987 _finish_request(req_data, -ECANCELED);
988 }
989 }
990
991 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
992 {
993 std::unique_lock wl{reqs_lock};
994 _unlink_request(req_data);
995 }
996
/*
 * Apply everything queued since the last pass of the request loop:
 * pause/resume state changes, requests waiting to be unlinked, and newly
 * registered requests that still have to be added to the multi handle.
 */
void RGWHTTPManager::manage_pending_requests()
{
  // cheap check under the shared lock: nothing new registered and nothing
  // queued means there is no reason to take the exclusive lock
  reqs_lock.lock_shared();
  if (max_threaded_req == num_reqs &&
      unregistered_reqs.empty() &&
      reqs_change_state.empty()) {
    reqs_lock.unlock_shared();
    return;
  }
  reqs_lock.unlock_shared();

  std::unique_lock wl{reqs_lock};

  // 1. queued pause/resume requests
  if (!reqs_change_state.empty()) {
    for (auto siter : reqs_change_state) {
      _set_req_state(siter);
    }
    reqs_change_state.clear();
  }

  // 2. requests removed by their clients; put() drops the reference that
  //    unregister_request() took for the queue entry
  if (!unregistered_reqs.empty()) {
    for (auto& r : unregistered_reqs) {
      _unlink_request(r);
      r->put();
    }

    unregistered_reqs.clear();
  }

  // 3. link the requests registered since the last pass, starting at the
  //    first id that was not linked yet
  map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);

  // failures are collected and finished after the loop, because finishing a
  // request erases it from 'reqs' while we are iterating over it
  list<std::pair<rgw_http_req_data *, int> > remove_reqs;

  for (; iter != reqs.end(); ++iter) {
    rgw_http_req_data *req_data = iter->second;
    int r = link_request(req_data);
    if (r < 0) {
      ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
      remove_reqs.push_back(std::make_pair(iter->second, r));
    } else {
      max_threaded_req = iter->first + 1;
    }
  }

  for (auto piter : remove_reqs) {
    rgw_http_req_data *req_data = piter.first;
    int r = piter.second;

    _finish_request(req_data, r);
  }
}
1048
1049 int RGWHTTPManager::add_request(RGWHTTPClient *client)
1050 {
1051 rgw_http_req_data *req_data = new rgw_http_req_data;
1052
1053 int ret = client->init_request(req_data);
1054 if (ret < 0) {
1055 req_data->put();
1056 req_data = NULL;
1057 return ret;
1058 }
1059
1060 req_data->mgr = this;
1061 req_data->client = client;
1062 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
1063 req_data->user_info = client->get_io_user_info();
1064
1065 register_request(req_data);
1066
1067 if (!is_started) {
1068 ret = link_request(req_data);
1069 if (ret < 0) {
1070 req_data->put();
1071 req_data = NULL;
1072 }
1073 return ret;
1074 }
1075 ret = signal_thread();
1076 if (ret < 0) {
1077 finish_request(req_data, ret);
1078 }
1079
1080 return ret;
1081 }
1082
1083 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
1084 {
1085 rgw_http_req_data *req_data = client->get_req_data();
1086
1087 if (!is_started) {
1088 unlink_request(req_data);
1089 return 0;
1090 }
1091 if (!unregister_request(req_data)) {
1092 return 0;
1093 }
1094 int ret = signal_thread();
1095 if (ret < 0) {
1096 return ret;
1097 }
1098
1099 return 0;
1100 }
1101
1102 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
1103 {
1104 rgw_http_req_data *req_data = client->get_req_data();
1105
1106 ceph_assert(ceph_mutex_is_locked(req_data->lock));
1107
1108 /* can only do that if threaded */
1109 if (!is_started) {
1110 return -EINVAL;
1111 }
1112
1113 bool suggested_wr_paused = req_data->write_paused;
1114 bool suggested_rd_paused = req_data->read_paused;
1115
1116 switch (state) {
1117 case SET_WRITE_PAUSED:
1118 suggested_wr_paused = true;
1119 break;
1120 case SET_WRITE_RESUME:
1121 suggested_wr_paused = false;
1122 break;
1123 case SET_READ_PAUSED:
1124 suggested_rd_paused = true;
1125 break;
1126 case SET_READ_RESUME:
1127 suggested_rd_paused = false;
1128 break;
1129 default:
1130 /* shouldn't really be here */
1131 return -EIO;
1132 }
1133 if (suggested_wr_paused == req_data->write_paused &&
1134 suggested_rd_paused == req_data->read_paused) {
1135 return 0;
1136 }
1137
1138 req_data->write_paused = suggested_wr_paused;
1139 req_data->read_paused = suggested_rd_paused;
1140
1141 int bitmask = CURLPAUSE_CONT;
1142
1143 if (req_data->write_paused) {
1144 bitmask |= CURLPAUSE_SEND;
1145 }
1146
1147 if (req_data->read_paused) {
1148 bitmask |= CURLPAUSE_RECV;
1149 }
1150
1151 reqs_change_state.push_back(set_state(req_data, bitmask));
1152 int ret = signal_thread();
1153 if (ret < 0) {
1154 return ret;
1155 }
1156
1157 return 0;
1158 }
1159
1160 int RGWHTTPManager::start()
1161 {
1162 if (pipe_cloexec(thread_pipe, 0) < 0) {
1163 int e = errno;
1164 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1165 return -e;
1166 }
1167
1168 // enable non-blocking reads
1169 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1170 int e = errno;
1171 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1172 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1173 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1174 return -e;
1175 }
1176
1177 #ifdef HAVE_CURL_MULTI_WAIT
1178 // on first initialization, use this pipe to detect whether we're using a
1179 // buggy version of libcurl
1180 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1181 static_cast<CURLM*>(multi_handle),
1182 thread_pipe[1], thread_pipe[0]);
1183 #endif
1184
1185 is_started = true;
1186 reqs_thread = new ReqsThread(this);
1187 reqs_thread->create("http_manager");
1188 return 0;
1189 }
1190
1191 void RGWHTTPManager::stop()
1192 {
1193 if (is_stopped) {
1194 return;
1195 }
1196
1197 is_stopped = true;
1198
1199 if (is_started) {
1200 going_down = true;
1201 signal_thread();
1202 reqs_thread->join();
1203 delete reqs_thread;
1204 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1205 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1206 }
1207 }
1208
1209 int RGWHTTPManager::signal_thread()
1210 {
1211 uint32_t buf = 0;
1212 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1213 if (ret < 0) {
1214 ret = -errno;
1215 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1216 return ret;
1217 }
1218 return 0;
1219 }
1220
1221 void *RGWHTTPManager::reqs_thread_entry()
1222 {
1223 int still_running;
1224 int mstatus;
1225
1226 ldout(cct, 20) << __func__ << ": start" << dendl;
1227
1228 while (!going_down) {
1229 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1230 if (ret < 0) {
1231 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1232 return NULL;
1233 }
1234
1235 manage_pending_requests();
1236
1237 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1238 switch (mstatus) {
1239 case CURLM_OK:
1240 case CURLM_CALL_MULTI_PERFORM:
1241 break;
1242 default:
1243 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1244 break;
1245 }
1246 int msgs_left;
1247 CURLMsg *msg;
1248 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1249 if (msg->msg == CURLMSG_DONE) {
1250 int result = msg->data.result;
1251 CURL *e = msg->easy_handle;
1252 rgw_http_req_data *req_data;
1253 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1254 curl_multi_remove_handle((CURLM *)multi_handle, e);
1255
1256 long http_status;
1257 int status;
1258 if (!req_data->user_ret) {
1259 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1260
1261 status = rgw_http_error_to_errno(http_status);
1262 if (result != CURLE_OK && status == 0) {
1263 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1264 status = -EAGAIN;
1265 }
1266 } else {
1267 status = *req_data->user_ret;
1268 rgw_err err;
1269 set_req_state_err(err, status, 0);
1270 http_status = err.http_ret;
1271 }
1272 int id = req_data->id;
1273 finish_request(req_data, status, http_status);
1274 switch (result) {
1275 case CURLE_OK:
1276 break;
1277 case CURLE_OPERATION_TIMEDOUT:
1278 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1279 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1280 default:
1281 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1282 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << " req_data->error_buf=" << req_data->error_buf << dendl;
1283 break;
1284 }
1285 }
1286 }
1287 }
1288
1289
1290 std::unique_lock rl{reqs_lock};
1291 for (auto r : unregistered_reqs) {
1292 _unlink_request(r);
1293 }
1294
1295 unregistered_reqs.clear();
1296
1297 auto all_reqs = std::move(reqs);
1298 for (auto iter : all_reqs) {
1299 _unlink_request(iter.second);
1300 }
1301
1302 reqs.clear();
1303
1304 if (completion_mgr) {
1305 completion_mgr->go_down();
1306 }
1307
1308 return 0;
1309 }
1310
1311 void rgw_http_client_init(CephContext *cct)
1312 {
1313 curl_global_init(CURL_GLOBAL_ALL);
1314 rgw_http_manager = new RGWHTTPManager(cct);
1315 rgw_http_manager->start();
1316 }
1317
1318 void rgw_http_client_cleanup()
1319 {
1320 rgw_http_manager->stop();
1321 delete rgw_http_manager;
1322 curl_global_cleanup();
1323 }
1324
1325
1326 int RGWHTTP::send(RGWHTTPClient *req) {
1327 if (!req) {
1328 return 0;
1329 }
1330 int r = rgw_http_manager->add_request(req);
1331 if (r < 0) {
1332 return r;
1333 }
1334
1335 return 0;
1336 }
1337
1338 int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
1339 if (!req) {
1340 return 0;
1341 }
1342 int r = send(req);
1343 if (r < 0) {
1344 return r;
1345 }
1346
1347 return req->wait(y);
1348 }
1349