]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7 #include <boost/utility/string_ref.hpp>
8
9 #include <curl/curl.h>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
12
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/async/completion.h"
17 #include "common/RefCountedObj.h"
18
19 #include "rgw_coroutine.h"
20 #include "rgw_tools.h"
21
22 #include <atomic>
23
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rgw
26
27 RGWHTTPManager *rgw_http_manager;
28
29 struct RGWCurlHandle;
30
31 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
32
33 struct rgw_http_req_data : public RefCountedObject {
34 RGWCurlHandle *curl_handle{nullptr};
35 curl_slist *h{nullptr};
36 uint64_t id;
37 int ret{0};
38 std::atomic<bool> done = { false };
39 RGWHTTPClient *client{nullptr};
40 rgw_io_id control_io_id;
41 void *user_info{nullptr};
42 bool registered{false};
43 RGWHTTPManager *mgr{nullptr};
44 char error_buf[CURL_ERROR_SIZE];
45 bool write_paused{false};
46 bool read_paused{false};
47
48 optional<int> user_ret;
49
50 ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
51 ceph::condition_variable cond;
52
53 using Signature = void(boost::system::error_code);
54 using Completion = ceph::async::Completion<Signature>;
55 std::unique_ptr<Completion> completion;
56
57 rgw_http_req_data() : id(-1) {
58 // FIPS zeroization audit 20191115: this memset is not security related.
59 memset(error_buf, 0, sizeof(error_buf));
60 }
61
62 template <typename ExecutionContext, typename CompletionToken>
63 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
64 boost::asio::async_completion<CompletionToken, Signature> init(token);
65 auto& handler = init.completion_handler;
66 {
67 std::unique_lock l{lock};
68 completion = Completion::create(ctx.get_executor(), std::move(handler));
69 }
70 return init.result.get();
71 }
72
73 int wait(optional_yield y) {
74 if (done) {
75 return ret;
76 }
77 #ifdef HAVE_BOOST_CONTEXT
78 if (y) {
79 auto& context = y.get_io_context();
80 auto& yield = y.get_yield_context();
81 boost::system::error_code ec;
82 async_wait(context, yield[ec]);
83 return -ec.value();
84 }
85 // work on asio threads should be asynchronous, so warn when they block
86 if (is_asio_thread) {
87 dout(20) << "WARNING: blocking http request" << dendl;
88 }
89 #endif
90 std::unique_lock l{lock};
91 cond.wait(l);
92 return ret;
93 }
94
95 void set_state(int bitmask);
96
97 void finish(int r, long http_status = -1) {
98 std::lock_guard l{lock};
99 if (http_status != -1) {
100 if (client) {
101 client->set_http_status(http_status);
102 }
103 }
104 ret = r;
105 if (curl_handle)
106 do_curl_easy_cleanup(curl_handle);
107
108 if (h)
109 curl_slist_free_all(h);
110
111 curl_handle = NULL;
112 h = NULL;
113 done = true;
114 if (completion) {
115 boost::system::error_code ec(-ret, boost::system::system_category());
116 Completion::post(std::move(completion), ec);
117 } else {
118 cond.notify_all();
119 }
120 }
121
122 bool is_done() {
123 return done;
124 }
125
126 int get_retcode() {
127 std::lock_guard l{lock};
128 return ret;
129 }
130
131 RGWHTTPManager *get_manager() {
132 std::lock_guard l{lock};
133 return mgr;
134 }
135
136 CURL *get_easy_handle() const;
137 };
138
139 struct RGWCurlHandle {
140 int uses;
141 mono_time lastuse;
142 CURL* h;
143
144 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
145 CURL* operator*() {
146 return this->h;
147 }
148 };
149
150 void rgw_http_req_data::set_state(int bitmask) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
153 */
154 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
155 if (rc != CURLE_OK) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
157 }
158 }
159
160 #define MAXIDLE 5
161 class RGWCurlHandles : public Thread {
162 public:
163 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector<RGWCurlHandle*> saved_curl;
165 int cleaner_shutdown;
166 ceph::condition_variable cleaner_cond;
167
168 RGWCurlHandles() :
169 cleaner_shutdown{0} {
170 }
171
172 RGWCurlHandle* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle* curl);
174 void release_curl_handle(RGWCurlHandle* curl);
175 void flush_curl_handles();
176 void* entry();
177 void stop();
178 };
179
180 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle* curl = 0;
182 CURL* h;
183 {
184 std::lock_guard lock{cleaner_lock};
185 if (!saved_curl.empty()) {
186 curl = *saved_curl.begin();
187 saved_curl.erase(saved_curl.begin());
188 }
189 }
190 if (curl) {
191 } else if ((h = curl_easy_init())) {
192 curl = new RGWCurlHandle{h};
193 } else {
194 // curl = 0;
195 }
196 return curl;
197 }
198
199 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
200 {
201 curl_easy_cleanup(**curl);
202 delete curl;
203 }
204
205 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
206 {
207 if (cleaner_shutdown) {
208 release_curl_handle_now(curl);
209 } else {
210 curl_easy_reset(**curl);
211 std::lock_guard lock{cleaner_lock};
212 curl->lastuse = mono_clock::now();
213 saved_curl.insert(saved_curl.begin(), 1, curl);
214 }
215 }
216
217 void* RGWCurlHandles::entry()
218 {
219 RGWCurlHandle* curl;
220 std::unique_lock lock{cleaner_lock};
221
222 for (;;) {
223 if (cleaner_shutdown) {
224 if (saved_curl.empty())
225 break;
226 } else {
227 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
228 }
229 mono_time now = mono_clock::now();
230 while (!saved_curl.empty()) {
231 auto cend = saved_curl.end();
232 --cend;
233 curl = *cend;
234 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
235 break;
236 saved_curl.erase(cend);
237 release_curl_handle_now(curl);
238 }
239 }
240 return nullptr;
241 }
242
243 void RGWCurlHandles::stop()
244 {
245 std::lock_guard lock{cleaner_lock};
246 cleaner_shutdown = 1;
247 cleaner_cond.notify_all();
248 }
249
250 void RGWCurlHandles::flush_curl_handles()
251 {
252 stop();
253 join();
254 if (!saved_curl.empty()) {
255 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
256 }
257 saved_curl.shrink_to_fit();
258 }
259
260 CURL *rgw_http_req_data::get_easy_handle() const
261 {
262 return **curl_handle;
263 }
264
265 static RGWCurlHandles *handles;
266
267 static RGWCurlHandle *do_curl_easy_init()
268 {
269 return handles->get_curl_handle();
270 }
271
272 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
273 {
274 handles->release_curl_handle(curl_handle);
275 }
276
277 // XXX make this part of the token cache? (but that's swift-only;
278 // and this especially needs to integrates with s3...)
279
280 void rgw_setup_saved_curl_handles()
281 {
282 handles = new RGWCurlHandles();
283 handles->create("rgw_curl");
284 }
285
286 void rgw_release_all_curl_handles()
287 {
288 handles->flush_curl_handles();
289 delete handles;
290 }
291
292 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
293 {
294 if (id == 0) {
295 id = io_id_provider.get_next();
296 }
297 }
298
299 /*
300 * the following set of callbacks will be called either on RGWHTTPManager::process(),
301 * or via the RGWHTTPManager async processing.
302 */
303 size_t RGWHTTPClient::receive_http_header(void * const ptr,
304 const size_t size,
305 const size_t nmemb,
306 void * const _info)
307 {
308 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
309 size_t len = size * nmemb;
310
311 std::lock_guard l{req_data->lock};
312
313 if (!req_data->registered) {
314 return len;
315 }
316
317 int ret = req_data->client->receive_header(ptr, size * nmemb);
318 if (ret < 0) {
319 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
320 req_data->user_ret = ret;
321 return CURLE_WRITE_ERROR;
322 }
323
324 return len;
325 }
326
327 size_t RGWHTTPClient::receive_http_data(void * const ptr,
328 const size_t size,
329 const size_t nmemb,
330 void * const _info)
331 {
332 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
333 size_t len = size * nmemb;
334
335 bool pause = false;
336
337 RGWHTTPClient *client;
338
339 {
340 std::lock_guard l{req_data->lock};
341 if (!req_data->registered) {
342 return len;
343 }
344
345 client = req_data->client;
346 }
347
348 size_t& skip_bytes = client->receive_pause_skip;
349
350 if (skip_bytes >= len) {
351 skip_bytes -= len;
352 return len;
353 }
354
355 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
356 if (ret < 0) {
357 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
358 req_data->user_ret = ret;
359 return CURLE_WRITE_ERROR;
360 }
361
362 if (pause) {
363 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
364 skip_bytes = len;
365 std::lock_guard l{req_data->lock};
366 req_data->read_paused = true;
367 return CURL_WRITEFUNC_PAUSE;
368 }
369
370 skip_bytes = 0;
371
372 return len;
373 }
374
375 size_t RGWHTTPClient::send_http_data(void * const ptr,
376 const size_t size,
377 const size_t nmemb,
378 void * const _info)
379 {
380 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
381
382 RGWHTTPClient *client;
383
384 {
385 std::lock_guard l{req_data->lock};
386
387 if (!req_data->registered) {
388 return 0;
389 }
390
391 client = req_data->client;
392 }
393
394 bool pause = false;
395
396 int ret = client->send_data(ptr, size * nmemb, &pause);
397 if (ret < 0) {
398 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
399 req_data->user_ret = ret;
400 return CURLE_READ_ERROR;
401 }
402
403 if (ret == 0 &&
404 pause) {
405 std::lock_guard l{req_data->lock};
406 req_data->write_paused = true;
407 return CURL_READFUNC_PAUSE;
408 }
409
410 return ret;
411 }
412
413 ceph::mutex& RGWHTTPClient::get_req_lock()
414 {
415 return req_data->lock;
416 }
417
418 void RGWHTTPClient::_set_write_paused(bool pause)
419 {
420 ceph_assert(ceph_mutex_is_locked(req_data->lock));
421
422 RGWHTTPManager *mgr = req_data->mgr;
423 if (pause == req_data->write_paused) {
424 return;
425 }
426 if (pause) {
427 mgr->set_request_state(this, SET_WRITE_PAUSED);
428 } else {
429 mgr->set_request_state(this, SET_WRITE_RESUME);
430 }
431 }
432
433 void RGWHTTPClient::_set_read_paused(bool pause)
434 {
435 ceph_assert(ceph_mutex_is_locked(req_data->lock));
436
437 RGWHTTPManager *mgr = req_data->mgr;
438 if (pause == req_data->read_paused) {
439 return;
440 }
441 if (pause) {
442 mgr->set_request_state(this, SET_READ_PAUSED);
443 } else {
444 mgr->set_request_state(this, SET_READ_RESUME);
445 }
446 }
447
448 static curl_slist *headers_to_slist(param_vec_t& headers)
449 {
450 curl_slist *h = NULL;
451
452 param_vec_t::iterator iter;
453 for (iter = headers.begin(); iter != headers.end(); ++iter) {
454 pair<string, string>& p = *iter;
455 string val = p.first;
456
457 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
458 val = val.substr(5);
459 }
460
461 /* we need to convert all underscores into dashes as some web servers forbid them
462 * in the http header field names
463 */
464 for (size_t i = 0; i < val.size(); i++) {
465 if (val[i] == '_') {
466 val[i] = '-';
467 }
468 }
469
470 val = camelcase_dash_http_attr(val);
471
472 // curl won't send headers with empty values unless it ends with a ; instead
473 if (p.second.empty()) {
474 val.append(1, ';');
475 } else {
476 val.append(": ");
477 val.append(p.second);
478 }
479 h = curl_slist_append(h, val.c_str());
480 }
481
482 return h;
483 }
484
/* True for HTTP methods that carry a request body (POST and PUT). */
static bool is_upload_request(const string& method)
{
  if (method == "POST") {
    return true;
  }
  return method == "PUT";
}
489
490 /*
491 * process a single simple one off request
492 */
493 int RGWHTTPClient::process(optional_yield y)
494 {
495 return RGWHTTP::process(this, y);
496 }
497
498 string RGWHTTPClient::to_str()
499 {
500 string method_str = (method.empty() ? "<no-method>" : method);
501 string url_str = (url.empty() ? "<no-url>" : url);
502 return method_str + " " + url_str;
503 }
504
505 int RGWHTTPClient::get_req_retcode()
506 {
507 if (!req_data) {
508 return -EINVAL;
509 }
510
511 return req_data->get_retcode();
512 }
513
514 /*
515 * init request, will be used later with RGWHTTPManager
516 */
517 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
518 {
519 ceph_assert(!req_data);
520 _req_data->get();
521 req_data = _req_data;
522
523 req_data->curl_handle = do_curl_easy_init();
524
525 CURL *easy_handle = req_data->get_easy_handle();
526
527 dout(20) << "sending request to " << url << dendl;
528
529 curl_slist *h = headers_to_slist(headers);
530
531 req_data->h = h;
532
533 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
534 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
535 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
536 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
537 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
538 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
539 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
540 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
541 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
542 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
543 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
544 if (h) {
545 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
546 }
547 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
548 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
549 if (send_data_hint || is_upload_request(method)) {
550 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
551 }
552 if (has_send_len) {
553 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
554 }
555 if (!verify_ssl) {
556 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
557 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
558 dout(20) << "ssl verification is set to off" << dendl;
559 }
560 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
561
562 return 0;
563 }
564
565 bool RGWHTTPClient::is_done()
566 {
567 return req_data->is_done();
568 }
569
570 /*
571 * wait for async request to complete
572 */
573 int RGWHTTPClient::wait(optional_yield y)
574 {
575 return req_data->wait(y);
576 }
577
578 void RGWHTTPClient::cancel()
579 {
580 if (req_data) {
581 RGWHTTPManager *http_manager = req_data->mgr;
582 if (http_manager) {
583 http_manager->remove_request(this);
584 }
585 }
586 }
587
588 RGWHTTPClient::~RGWHTTPClient()
589 {
590 cancel();
591 if (req_data) {
592 req_data->put();
593 }
594 }
595
596
597 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
598 {
599 const boost::string_ref header_line(static_cast<const char *>(ptr), len);
600
601 /* We're tokening the line that way due to backward compatibility. */
602 const size_t sep_loc = header_line.find_first_of(" \t:");
603
604 if (boost::string_ref::npos == sep_loc) {
605 /* Wrongly formatted header? Just skip it. */
606 return 0;
607 }
608
609 header_name_t name(header_line.substr(0, sep_loc));
610 if (0 == relevant_headers.count(name)) {
611 /* Not interested in this particular header. */
612 return 0;
613 }
614
615 const auto value_part = header_line.substr(sep_loc + 1);
616
617 /* Skip spaces and tabs after the separator. */
618 const size_t val_loc_s = value_part.find_first_not_of(' ');
619 const size_t val_loc_e = value_part.find_first_of("\r\n");
620
621 if (boost::string_ref::npos == val_loc_s ||
622 boost::string_ref::npos == val_loc_e) {
623 /* Empty value case. */
624 found_headers.emplace(name, header_value_t());
625 } else {
626 found_headers.emplace(name, header_value_t(
627 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
628 }
629
630 return 0;
631 }
632
633 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
634 {
635 int length_to_copy = 0;
636 if (post_data_index < post_data.length()) {
637 length_to_copy = min(post_data.length() - post_data_index, len);
638 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
639 post_data_index += length_to_copy;
640 }
641 return length_to_copy;
642 }
643
644
/*
 * Drain wake-up tokens written by RGWHTTPManager::signal_thread().
 *
 * The fd is non-blocking, so a single read of up to 256 bytes swallows
 * several pending signals at once (avoiding extra wakeups). An empty pipe
 * (EAGAIN) is not an error; non-blocking reads are also what the
 * curl_multi_wait bug workaround relies on.
 *
 * Returns 0, or -errno for any other read failure.
 */
static int clear_signal(int fd)
{
  std::array<char, 256> scratch;
  const int nread = ::read(fd, static_cast<void *>(scratch.data()), scratch.size());
  if (nread >= 0) {
    return 0;
  }
  const int err = errno;
  return err == EAGAIN ? 0 : -err;
}
658
659 #if HAVE_CURL_MULTI_WAIT
660
661 static std::once_flag detect_flag;
662 static bool curl_multi_wait_bug_present = false;
663
664 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
665 int write_fd, int read_fd)
666 {
667 int ret = 0;
668
669 // write to write_fd so that read_fd becomes readable
670 uint32_t buf = 0;
671 ret = ::write(write_fd, &buf, sizeof(buf));
672 if (ret < 0) {
673 ret = -errno;
674 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
675 return ret;
676 }
677
678 // pass read_fd in extra_fds for curl_multi_wait()
679 int num_fds;
680 struct curl_waitfd wait_fd;
681
682 wait_fd.fd = read_fd;
683 wait_fd.events = CURL_WAIT_POLLIN;
684 wait_fd.revents = 0;
685
686 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
687 if (ret != CURLM_OK) {
688 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
689 return -EIO;
690 }
691
692 // curl_multi_wait should flag revents when extra_fd is readable. if it
693 // doesn't, the bug is present and we can't rely on revents
694 if (wait_fd.revents == 0) {
695 curl_multi_wait_bug_present = true;
696 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
697 "bug in curl_multi_wait(). enabling a workaround that may degrade "
698 "performance slightly." << dendl;
699 }
700
701 return clear_signal(read_fd);
702 }
703
704 static bool is_signaled(const curl_waitfd& wait_fd)
705 {
706 if (wait_fd.fd < 0) {
707 // no fd to signal
708 return false;
709 }
710
711 if (curl_multi_wait_bug_present) {
712 // we can't rely on revents, so we always return true if a wait_fd is given.
713 // this means we'll be trying a non-blocking read on this fd every time that
714 // curl_multi_wait() wakes up
715 return true;
716 }
717
718 return wait_fd.revents > 0;
719 }
720
721 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
722 {
723 int num_fds;
724 struct curl_waitfd wait_fd;
725
726 wait_fd.fd = signal_fd;
727 wait_fd.events = CURL_WAIT_POLLIN;
728 wait_fd.revents = 0;
729
730 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
731 if (ret) {
732 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
733 return -EIO;
734 }
735
736 if (is_signaled(wait_fd)) {
737 ret = clear_signal(signal_fd);
738 if (ret < 0) {
739 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
740 return ret;
741 }
742 }
743 return 0;
744 }
745
746 #else
747
748 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
749 {
750 fd_set fdread;
751 fd_set fdwrite;
752 fd_set fdexcep;
753 int maxfd = -1;
754
755 FD_ZERO(&fdread);
756 FD_ZERO(&fdwrite);
757 FD_ZERO(&fdexcep);
758
759 /* get file descriptors from the transfers */
760 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
761 if (ret) {
762 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
763 return -EIO;
764 }
765
766 if (signal_fd > 0) {
767 FD_SET(signal_fd, &fdread);
768 if (signal_fd >= maxfd) {
769 maxfd = signal_fd + 1;
770 }
771 }
772
773 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
774 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
775 #define RGW_CURL_TIMEOUT 1000
776 if (!to)
777 to = RGW_CURL_TIMEOUT;
778 struct timeval timeout;
779 timeout.tv_sec = to / 1000;
780 timeout.tv_usec = to % 1000;
781
782 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
783 if (ret < 0) {
784 ret = -errno;
785 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
786 return ret;
787 }
788
789 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
790 ret = clear_signal(signal_fd);
791 if (ret < 0) {
792 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
793 return ret;
794 }
795 }
796
797 return 0;
798 }
799
800 #endif
801
802 void *RGWHTTPManager::ReqsThread::entry()
803 {
804 manager->reqs_thread_entry();
805 return NULL;
806 }
807
808 /*
809 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
810 */
811 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
812 completion_mgr(_cm)
813 {
814 multi_handle = (void *)curl_multi_init();
815 thread_pipe[0] = -1;
816 thread_pipe[1] = -1;
817 }
818
819 RGWHTTPManager::~RGWHTTPManager() {
820 stop();
821 if (multi_handle)
822 curl_multi_cleanup((CURLM *)multi_handle);
823 }
824
825 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
826 {
827 std::unique_lock rl{reqs_lock};
828 req_data->id = num_reqs;
829 req_data->registered = true;
830 reqs[num_reqs] = req_data;
831 num_reqs++;
832 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
833 }
834
835 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
836 {
837 std::unique_lock rl{reqs_lock};
838 if (!req_data->registered) {
839 return false;
840 }
841 req_data->get();
842 req_data->registered = false;
843 unregistered_reqs.push_back(req_data);
844 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
845 return true;
846 }
847
848 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
849 {
850 std::unique_lock rl{reqs_lock};
851 _complete_request(req_data);
852 }
853
854 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
855 {
856 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
857 if (iter != reqs.end()) {
858 reqs.erase(iter);
859 }
860 {
861 std::lock_guard l{req_data->lock};
862 req_data->mgr = nullptr;
863 }
864 if (completion_mgr) {
865 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
866 }
867
868 req_data->put();
869 }
870
871 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
872 {
873 req_data->finish(ret, http_status);
874 complete_request(req_data);
875 }
876
877 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
878 {
879 req_data->finish(ret);
880 _complete_request(req_data);
881 }
882
883 void RGWHTTPManager::_set_req_state(set_state& ss)
884 {
885 ss.req->set_state(ss.bitmask);
886 }
887 /*
888 * hook request to the curl multi handle
889 */
890 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
891 {
892 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
893 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
894 if (mstatus) {
895 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
896 return -EIO;
897 }
898 return 0;
899 }
900
901 /*
902 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
903 * there will be no more processing on this request
904 */
905 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
906 {
907 if (req_data->curl_handle) {
908 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
909 }
910 if (!req_data->is_done()) {
911 _finish_request(req_data, -ECANCELED);
912 }
913 }
914
915 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
916 {
917 std::unique_lock wl{reqs_lock};
918 _unlink_request(req_data);
919 }
920
921 void RGWHTTPManager::manage_pending_requests()
922 {
923 reqs_lock.lock_shared();
924 if (max_threaded_req == num_reqs &&
925 unregistered_reqs.empty() &&
926 reqs_change_state.empty()) {
927 reqs_lock.unlock_shared();
928 return;
929 }
930 reqs_lock.unlock_shared();
931
932 std::unique_lock wl{reqs_lock};
933
934 if (!reqs_change_state.empty()) {
935 for (auto siter : reqs_change_state) {
936 _set_req_state(siter);
937 }
938 reqs_change_state.clear();
939 }
940
941 if (!unregistered_reqs.empty()) {
942 for (auto& r : unregistered_reqs) {
943 _unlink_request(r);
944 r->put();
945 }
946
947 unregistered_reqs.clear();
948 }
949
950 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
951
952 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
953
954 for (; iter != reqs.end(); ++iter) {
955 rgw_http_req_data *req_data = iter->second;
956 int r = link_request(req_data);
957 if (r < 0) {
958 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
959 remove_reqs.push_back(std::make_pair(iter->second, r));
960 } else {
961 max_threaded_req = iter->first + 1;
962 }
963 }
964
965 for (auto piter : remove_reqs) {
966 rgw_http_req_data *req_data = piter.first;
967 int r = piter.second;
968
969 _finish_request(req_data, r);
970 }
971 }
972
973 int RGWHTTPManager::add_request(RGWHTTPClient *client)
974 {
975 rgw_http_req_data *req_data = new rgw_http_req_data;
976
977 int ret = client->init_request(req_data);
978 if (ret < 0) {
979 req_data->put();
980 req_data = NULL;
981 return ret;
982 }
983
984 req_data->mgr = this;
985 req_data->client = client;
986 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
987 req_data->user_info = client->get_io_user_info();
988
989 register_request(req_data);
990
991 if (!is_started) {
992 ret = link_request(req_data);
993 if (ret < 0) {
994 req_data->put();
995 req_data = NULL;
996 }
997 return ret;
998 }
999 ret = signal_thread();
1000 if (ret < 0) {
1001 finish_request(req_data, ret);
1002 }
1003
1004 return ret;
1005 }
1006
1007 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
1008 {
1009 rgw_http_req_data *req_data = client->get_req_data();
1010
1011 if (!is_started) {
1012 unlink_request(req_data);
1013 return 0;
1014 }
1015 if (!unregister_request(req_data)) {
1016 return 0;
1017 }
1018 int ret = signal_thread();
1019 if (ret < 0) {
1020 return ret;
1021 }
1022
1023 return 0;
1024 }
1025
1026 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
1027 {
1028 rgw_http_req_data *req_data = client->get_req_data();
1029
1030 ceph_assert(ceph_mutex_is_locked(req_data->lock));
1031
1032 /* can only do that if threaded */
1033 if (!is_started) {
1034 return -EINVAL;
1035 }
1036
1037 bool suggested_wr_paused = req_data->write_paused;
1038 bool suggested_rd_paused = req_data->read_paused;
1039
1040 switch (state) {
1041 case SET_WRITE_PAUSED:
1042 suggested_wr_paused = true;
1043 break;
1044 case SET_WRITE_RESUME:
1045 suggested_wr_paused = false;
1046 break;
1047 case SET_READ_PAUSED:
1048 suggested_rd_paused = true;
1049 break;
1050 case SET_READ_RESUME:
1051 suggested_rd_paused = false;
1052 break;
1053 default:
1054 /* shouldn't really be here */
1055 return -EIO;
1056 }
1057 if (suggested_wr_paused == req_data->write_paused &&
1058 suggested_rd_paused == req_data->read_paused) {
1059 return 0;
1060 }
1061
1062 req_data->write_paused = suggested_wr_paused;
1063 req_data->read_paused = suggested_rd_paused;
1064
1065 int bitmask = CURLPAUSE_CONT;
1066
1067 if (req_data->write_paused) {
1068 bitmask |= CURLPAUSE_SEND;
1069 }
1070
1071 if (req_data->read_paused) {
1072 bitmask |= CURLPAUSE_RECV;
1073 }
1074
1075 reqs_change_state.push_back(set_state(req_data, bitmask));
1076 int ret = signal_thread();
1077 if (ret < 0) {
1078 return ret;
1079 }
1080
1081 return 0;
1082 }
1083
1084 int RGWHTTPManager::start()
1085 {
1086 if (pipe_cloexec(thread_pipe, 0) < 0) {
1087 int e = errno;
1088 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1089 return -e;
1090 }
1091
1092 // enable non-blocking reads
1093 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1094 int e = errno;
1095 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1096 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1097 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1098 return -e;
1099 }
1100
1101 #ifdef HAVE_CURL_MULTI_WAIT
1102 // on first initialization, use this pipe to detect whether we're using a
1103 // buggy version of libcurl
1104 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1105 static_cast<CURLM*>(multi_handle),
1106 thread_pipe[1], thread_pipe[0]);
1107 #endif
1108
1109 is_started = true;
1110 reqs_thread = new ReqsThread(this);
1111 reqs_thread->create("http_manager");
1112 return 0;
1113 }
1114
1115 void RGWHTTPManager::stop()
1116 {
1117 if (is_stopped) {
1118 return;
1119 }
1120
1121 is_stopped = true;
1122
1123 if (is_started) {
1124 going_down = true;
1125 signal_thread();
1126 reqs_thread->join();
1127 delete reqs_thread;
1128 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1129 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1130 }
1131 }
1132
1133 int RGWHTTPManager::signal_thread()
1134 {
1135 uint32_t buf = 0;
1136 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1137 if (ret < 0) {
1138 ret = -errno;
1139 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1140 return ret;
1141 }
1142 return 0;
1143 }
1144
1145 void *RGWHTTPManager::reqs_thread_entry()
1146 {
1147 int still_running;
1148 int mstatus;
1149
1150 ldout(cct, 20) << __func__ << ": start" << dendl;
1151
1152 while (!going_down) {
1153 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1154 if (ret < 0) {
1155 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1156 return NULL;
1157 }
1158
1159 manage_pending_requests();
1160
1161 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1162 switch (mstatus) {
1163 case CURLM_OK:
1164 case CURLM_CALL_MULTI_PERFORM:
1165 break;
1166 default:
1167 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1168 break;
1169 }
1170 int msgs_left;
1171 CURLMsg *msg;
1172 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1173 if (msg->msg == CURLMSG_DONE) {
1174 int result = msg->data.result;
1175 CURL *e = msg->easy_handle;
1176 rgw_http_req_data *req_data;
1177 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1178 curl_multi_remove_handle((CURLM *)multi_handle, e);
1179
1180 long http_status;
1181 int status;
1182 if (!req_data->user_ret) {
1183 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1184
1185 status = rgw_http_error_to_errno(http_status);
1186 if (result != CURLE_OK && status == 0) {
1187 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1188 status = -EAGAIN;
1189 }
1190 } else {
1191 status = *req_data->user_ret;
1192 rgw_err err;
1193 set_req_state_err(err, status, 0);
1194 http_status = err.http_ret;
1195 }
1196 int id = req_data->id;
1197 finish_request(req_data, status, http_status);
1198 switch (result) {
1199 case CURLE_OK:
1200 break;
1201 case CURLE_OPERATION_TIMEDOUT:
1202 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1203 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1204 default:
1205 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1206 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
1207 break;
1208 }
1209 }
1210 }
1211 }
1212
1213
1214 std::unique_lock rl{reqs_lock};
1215 for (auto r : unregistered_reqs) {
1216 _unlink_request(r);
1217 }
1218
1219 unregistered_reqs.clear();
1220
1221 auto all_reqs = std::move(reqs);
1222 for (auto iter : all_reqs) {
1223 _unlink_request(iter.second);
1224 }
1225
1226 reqs.clear();
1227
1228 if (completion_mgr) {
1229 completion_mgr->go_down();
1230 }
1231
1232 return 0;
1233 }
1234
1235 void rgw_http_client_init(CephContext *cct)
1236 {
1237 curl_global_init(CURL_GLOBAL_ALL);
1238 rgw_http_manager = new RGWHTTPManager(cct);
1239 rgw_http_manager->start();
1240 }
1241
1242 void rgw_http_client_cleanup()
1243 {
1244 rgw_http_manager->stop();
1245 delete rgw_http_manager;
1246 curl_global_cleanup();
1247 }
1248
1249
1250 int RGWHTTP::send(RGWHTTPClient *req) {
1251 if (!req) {
1252 return 0;
1253 }
1254 int r = rgw_http_manager->add_request(req);
1255 if (r < 0) {
1256 return r;
1257 }
1258
1259 return 0;
1260 }
1261
1262 int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
1263 if (!req) {
1264 return 0;
1265 }
1266 int r = send(req);
1267 if (r < 0) {
1268 return r;
1269 }
1270
1271 return req->wait(y);
1272 }
1273