]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7 #include <boost/utility/string_ref.hpp>
8
9 #include <curl/curl.h>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
12
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/async/completion.h"
17 #include "common/RefCountedObj.h"
18
19 #include "rgw_coroutine.h"
20
21 #include <atomic>
22
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_rgw
25
26 RGWHTTPManager *rgw_http_manager;
27
28 struct RGWCurlHandle;
29
30 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
31
32 struct rgw_http_req_data : public RefCountedObject {
33 RGWCurlHandle *curl_handle{nullptr};
34 curl_slist *h{nullptr};
35 uint64_t id;
36 int ret{0};
37 std::atomic<bool> done = { false };
38 RGWHTTPClient *client{nullptr};
39 rgw_io_id control_io_id;
40 void *user_info{nullptr};
41 bool registered{false};
42 RGWHTTPManager *mgr{nullptr};
43 char error_buf[CURL_ERROR_SIZE];
44 bool write_paused{false};
45 bool read_paused{false};
46
47 Mutex lock;
48 Cond cond;
49
50 using Signature = void(boost::system::error_code);
51 using Completion = ceph::async::Completion<Signature>;
52 std::unique_ptr<Completion> completion;
53
54 rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
55 // FIPS zeroization audit 20191115: this memset is not security related.
56 memset(error_buf, 0, sizeof(error_buf));
57 }
58
59 template <typename ExecutionContext, typename CompletionToken>
60 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
61 boost::asio::async_completion<CompletionToken, Signature> init(token);
62 auto& handler = init.completion_handler;
63 {
64 std::unique_lock l{lock};
65 completion = Completion::create(ctx.get_executor(), std::move(handler));
66 }
67 return init.result.get();
68 }
69 int wait(optional_yield y) {
70 if (done) {
71 return ret;
72 }
73 #ifdef HAVE_BOOST_CONTEXT
74 if (y) {
75 auto& context = y.get_io_context();
76 auto& yield = y.get_yield_context();
77 boost::system::error_code ec;
78 async_wait(context, yield[ec]);
79 return -ec.value();
80 }
81 #endif
82 Mutex::Locker l(lock);
83 cond.Wait(lock);
84 return ret;
85 }
86
87 void set_state(int bitmask);
88
89 void finish(int r) {
90 Mutex::Locker l(lock);
91 ret = r;
92 if (curl_handle)
93 do_curl_easy_cleanup(curl_handle);
94
95 if (h)
96 curl_slist_free_all(h);
97
98 curl_handle = NULL;
99 h = NULL;
100 done = true;
101 if (completion) {
102 boost::system::error_code ec(-ret, boost::system::system_category());
103 Completion::post(std::move(completion), ec);
104 } else {
105 cond.Signal();
106 }
107 }
108
109 bool is_done() {
110 return done;
111 }
112
113 int get_retcode() {
114 Mutex::Locker l(lock);
115 return ret;
116 }
117
118 RGWHTTPManager *get_manager() {
119 Mutex::Locker l(lock);
120 return mgr;
121 }
122
123 CURL *get_easy_handle() const;
124 };
125
126 struct RGWCurlHandle {
127 int uses;
128 mono_time lastuse;
129 CURL* h;
130
131 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
132 CURL* operator*() {
133 return this->h;
134 }
135 };
136
137 void rgw_http_req_data::set_state(int bitmask) {
138 /* no need to lock here, moreover curl_easy_pause() might trigger
139 * the data receive callback :/
140 */
141 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
142 if (rc != CURLE_OK) {
143 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
144 }
145 }
146
147 #define MAXIDLE 5
148 class RGWCurlHandles : public Thread {
149 public:
150 Mutex cleaner_lock;
151 std::vector<RGWCurlHandle*>saved_curl;
152 int cleaner_shutdown;
153 Cond cleaner_cond;
154
155 RGWCurlHandles() :
156 cleaner_lock{"RGWCurlHandles::cleaner_lock"},
157 cleaner_shutdown{0} {
158 }
159
160 RGWCurlHandle* get_curl_handle();
161 void release_curl_handle_now(RGWCurlHandle* curl);
162 void release_curl_handle(RGWCurlHandle* curl);
163 void flush_curl_handles();
164 void* entry();
165 void stop();
166 };
167
168 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
169 RGWCurlHandle* curl = 0;
170 CURL* h;
171 {
172 Mutex::Locker lock(cleaner_lock);
173 if (!saved_curl.empty()) {
174 curl = *saved_curl.begin();
175 saved_curl.erase(saved_curl.begin());
176 }
177 }
178 if (curl) {
179 } else if ((h = curl_easy_init())) {
180 curl = new RGWCurlHandle{h};
181 } else {
182 // curl = 0;
183 }
184 return curl;
185 }
186
187 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
188 {
189 curl_easy_cleanup(**curl);
190 delete curl;
191 }
192
193 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
194 {
195 if (cleaner_shutdown) {
196 release_curl_handle_now(curl);
197 } else {
198 curl_easy_reset(**curl);
199 Mutex::Locker lock(cleaner_lock);
200 curl->lastuse = mono_clock::now();
201 saved_curl.insert(saved_curl.begin(), 1, curl);
202 }
203 }
204
205 void* RGWCurlHandles::entry()
206 {
207 RGWCurlHandle* curl;
208 Mutex::Locker lock(cleaner_lock);
209
210 for (;;) {
211 if (cleaner_shutdown) {
212 if (saved_curl.empty())
213 break;
214 } else {
215 utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
216 cleaner_cond.WaitUntil(cleaner_lock, release);
217 }
218 mono_time now = mono_clock::now();
219 while (!saved_curl.empty()) {
220 auto cend = saved_curl.end();
221 --cend;
222 curl = *cend;
223 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
224 break;
225 saved_curl.erase(cend);
226 release_curl_handle_now(curl);
227 }
228 }
229 return nullptr;
230 }
231
232 void RGWCurlHandles::stop()
233 {
234 Mutex::Locker lock(cleaner_lock);
235 cleaner_shutdown = 1;
236 cleaner_cond.Signal();
237 }
238
239 void RGWCurlHandles::flush_curl_handles()
240 {
241 stop();
242 join();
243 if (!saved_curl.empty()) {
244 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
245 }
246 saved_curl.shrink_to_fit();
247 }
248
249 CURL *rgw_http_req_data::get_easy_handle() const
250 {
251 return **curl_handle;
252 }
253
254 static RGWCurlHandles *handles;
255
256 static RGWCurlHandle *do_curl_easy_init()
257 {
258 return handles->get_curl_handle();
259 }
260
261 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
262 {
263 handles->release_curl_handle(curl_handle);
264 }
265
266 // XXX make this part of the token cache? (but that's swift-only;
267 // and this especially needs to integrate with s3...)
268
269 void rgw_setup_saved_curl_handles()
270 {
271 handles = new RGWCurlHandles();
272 handles->create("rgw_curl");
273 }
274
275 void rgw_release_all_curl_handles()
276 {
277 handles->flush_curl_handles();
278 delete handles;
279 }
280
281 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
282 {
283 if (id == 0) {
284 id = io_id_provider.get_next();
285 }
286 }
287
288 /*
289 * the following set of callbacks will be called either on RGWHTTPManager::process(),
290 * or via the RGWHTTPManager async processing.
291 */
292 size_t RGWHTTPClient::receive_http_header(void * const ptr,
293 const size_t size,
294 const size_t nmemb,
295 void * const _info)
296 {
297 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
298 size_t len = size * nmemb;
299
300 Mutex::Locker l(req_data->lock);
301
302 if (!req_data->registered) {
303 return len;
304 }
305
306 int ret = req_data->client->receive_header(ptr, size * nmemb);
307 if (ret < 0) {
308 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
309 }
310
311 return len;
312 }
313
314 size_t RGWHTTPClient::receive_http_data(void * const ptr,
315 const size_t size,
316 const size_t nmemb,
317 void * const _info)
318 {
319 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
320 size_t len = size * nmemb;
321
322 bool pause = false;
323
324 RGWHTTPClient *client;
325
326 {
327 Mutex::Locker l(req_data->lock);
328 if (!req_data->registered) {
329 return len;
330 }
331
332 client = req_data->client;
333 }
334
335 size_t& skip_bytes = client->receive_pause_skip;
336
337 if (skip_bytes >= len) {
338 skip_bytes -= len;
339 return len;
340 }
341
342 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
343 if (ret < 0) {
344 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
345 }
346
347 if (pause) {
348 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
349 skip_bytes = len;
350 Mutex::Locker l(req_data->lock);
351 req_data->read_paused = true;
352 return CURL_WRITEFUNC_PAUSE;
353 }
354
355 skip_bytes = 0;
356
357 return len;
358 }
359
360 size_t RGWHTTPClient::send_http_data(void * const ptr,
361 const size_t size,
362 const size_t nmemb,
363 void * const _info)
364 {
365 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
366
367 RGWHTTPClient *client;
368
369 {
370 Mutex::Locker l(req_data->lock);
371
372 if (!req_data->registered) {
373 return 0;
374 }
375
376 client = req_data->client;
377 }
378
379 bool pause = false;
380
381 int ret = client->send_data(ptr, size * nmemb, &pause);
382 if (ret < 0) {
383 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
384 }
385
386 if (ret == 0 &&
387 pause) {
388 Mutex::Locker l(req_data->lock);
389 req_data->write_paused = true;
390 return CURL_READFUNC_PAUSE;
391 }
392
393 return ret;
394 }
395
396 Mutex& RGWHTTPClient::get_req_lock()
397 {
398 return req_data->lock;
399 }
400
401 void RGWHTTPClient::_set_write_paused(bool pause)
402 {
403 ceph_assert(req_data->lock.is_locked());
404
405 RGWHTTPManager *mgr = req_data->mgr;
406 if (pause == req_data->write_paused) {
407 return;
408 }
409 if (pause) {
410 mgr->set_request_state(this, SET_WRITE_PAUSED);
411 } else {
412 mgr->set_request_state(this, SET_WRITE_RESUME);
413 }
414 }
415
416 void RGWHTTPClient::_set_read_paused(bool pause)
417 {
418 ceph_assert(req_data->lock.is_locked());
419
420 RGWHTTPManager *mgr = req_data->mgr;
421 if (pause == req_data->read_paused) {
422 return;
423 }
424 if (pause) {
425 mgr->set_request_state(this, SET_READ_PAUSED);
426 } else {
427 mgr->set_request_state(this, SET_READ_RESUME);
428 }
429 }
430
431 static curl_slist *headers_to_slist(param_vec_t& headers)
432 {
433 curl_slist *h = NULL;
434
435 param_vec_t::iterator iter;
436 for (iter = headers.begin(); iter != headers.end(); ++iter) {
437 pair<string, string>& p = *iter;
438 string val = p.first;
439
440 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
441 val = val.substr(5);
442 }
443
444 /* we need to convert all underscores into dashes as some web servers forbid them
445 * in the http header field names
446 */
447 for (size_t i = 0; i < val.size(); i++) {
448 if (val[i] == '_') {
449 val[i] = '-';
450 }
451 }
452
453 val = camelcase_dash_http_attr(val);
454
455 // curl won't send headers with empty values unless it ends with a ; instead
456 if (p.second.empty()) {
457 val.append(1, ';');
458 } else {
459 val.append(": ");
460 val.append(p.second);
461 }
462 h = curl_slist_append(h, val.c_str());
463 }
464
465 return h;
466 }
467
/* True for the HTTP methods that carry a request body: PUT and POST. */
static bool is_upload_request(const string& method)
{
  if (method == "PUT") {
    return true;
  }
  return method == "POST";
}
472
473 /*
474 * process a single simple one off request
475 */
476 int RGWHTTPClient::process(optional_yield y)
477 {
478 return RGWHTTP::process(this, y);
479 }
480
481 string RGWHTTPClient::to_str()
482 {
483 string method_str = (method.empty() ? "<no-method>" : method);
484 string url_str = (url.empty() ? "<no-url>" : url);
485 return method_str + " " + url_str;
486 }
487
488 int RGWHTTPClient::get_req_retcode()
489 {
490 if (!req_data) {
491 return -EINVAL;
492 }
493
494 return req_data->get_retcode();
495 }
496
497 /*
498 * init request, will be used later with RGWHTTPManager
499 */
500 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
501 {
502 ceph_assert(!req_data);
503 _req_data->get();
504 req_data = _req_data;
505
506 req_data->curl_handle = do_curl_easy_init();
507
508 CURL *easy_handle = req_data->get_easy_handle();
509
510 dout(20) << "sending request to " << url << dendl;
511
512 curl_slist *h = headers_to_slist(headers);
513
514 req_data->h = h;
515
516 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
517 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
518 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
519 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
520 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
521 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
522 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
523 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
524 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
525 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
526 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
527 if (h) {
528 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
529 }
530 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
531 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
532 if (send_data_hint || is_upload_request(method)) {
533 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
534 }
535 if (has_send_len) {
536 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
537 }
538 if (!verify_ssl) {
539 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
540 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
541 dout(20) << "ssl verification is set to off" << dendl;
542 }
543 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
544
545 return 0;
546 }
547
548 bool RGWHTTPClient::is_done()
549 {
550 return req_data->is_done();
551 }
552
553 /*
554 * wait for async request to complete
555 */
556 int RGWHTTPClient::wait(optional_yield y)
557 {
558 return req_data->wait(y);
559 }
560
561 void RGWHTTPClient::cancel()
562 {
563 if (req_data) {
564 RGWHTTPManager *http_manager = req_data->mgr;
565 if (http_manager) {
566 http_manager->remove_request(this);
567 }
568 }
569 }
570
571 RGWHTTPClient::~RGWHTTPClient()
572 {
573 cancel();
574 if (req_data) {
575 req_data->put();
576 }
577 }
578
579
580 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
581 {
582 const boost::string_ref header_line(static_cast<const char *>(ptr), len);
583
584 /* We're tokening the line that way due to backward compatibility. */
585 const size_t sep_loc = header_line.find_first_of(" \t:");
586
587 if (boost::string_ref::npos == sep_loc) {
588 /* Wrongly formatted header? Just skip it. */
589 return 0;
590 }
591
592 header_name_t name(header_line.substr(0, sep_loc));
593 if (0 == relevant_headers.count(name)) {
594 /* Not interested in this particular header. */
595 return 0;
596 }
597
598 const auto value_part = header_line.substr(sep_loc + 1);
599
600 /* Skip spaces and tabs after the separator. */
601 const size_t val_loc_s = value_part.find_first_not_of(' ');
602 const size_t val_loc_e = value_part.find_first_of("\r\n");
603
604 if (boost::string_ref::npos == val_loc_s ||
605 boost::string_ref::npos == val_loc_e) {
606 /* Empty value case. */
607 found_headers.emplace(name, header_value_t());
608 } else {
609 found_headers.emplace(name, header_value_t(
610 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
611 }
612
613 return 0;
614 }
615
616 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
617 {
618 int length_to_copy = 0;
619 if (post_data_index < post_data.length()) {
620 length_to_copy = min(post_data.length() - post_data_index, len);
621 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
622 post_data_index += length_to_copy;
623 }
624 return length_to_copy;
625 }
626
627
/*
 * Drain pending wake-up bytes from `fd` with a single read of up to 256
 * bytes. Returns 0 on success or when nothing was available (EAGAIN),
 * otherwise the negated errno from read().
 */
static int clear_signal(int fd)
{
  // the fd is non-blocking, so asking for more than one signal's worth of
  // data is cheap and saves later wakeups; non-blocking reads are also
  // needed by the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, static_cast<void *>(scratch.data()), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  if (err == -EAGAIN) {
    return 0;  // nothing to read is not an error
  }
  return err;
}
641
642 #if HAVE_CURL_MULTI_WAIT
643
644 static std::once_flag detect_flag;
645 static bool curl_multi_wait_bug_present = false;
646
647 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
648 int write_fd, int read_fd)
649 {
650 int ret = 0;
651
652 // write to write_fd so that read_fd becomes readable
653 uint32_t buf = 0;
654 ret = ::write(write_fd, &buf, sizeof(buf));
655 if (ret < 0) {
656 ret = -errno;
657 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
658 return ret;
659 }
660
661 // pass read_fd in extra_fds for curl_multi_wait()
662 int num_fds;
663 struct curl_waitfd wait_fd;
664
665 wait_fd.fd = read_fd;
666 wait_fd.events = CURL_WAIT_POLLIN;
667 wait_fd.revents = 0;
668
669 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
670 if (ret != CURLM_OK) {
671 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
672 return -EIO;
673 }
674
675 // curl_multi_wait should flag revents when extra_fd is readable. if it
676 // doesn't, the bug is present and we can't rely on revents
677 if (wait_fd.revents == 0) {
678 curl_multi_wait_bug_present = true;
679 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
680 "bug in curl_multi_wait(). enabling a workaround that may degrade "
681 "performance slightly." << dendl;
682 }
683
684 return clear_signal(read_fd);
685 }
686
687 static bool is_signaled(const curl_waitfd& wait_fd)
688 {
689 if (wait_fd.fd < 0) {
690 // no fd to signal
691 return false;
692 }
693
694 if (curl_multi_wait_bug_present) {
695 // we can't rely on revents, so we always return true if a wait_fd is given.
696 // this means we'll be trying a non-blocking read on this fd every time that
697 // curl_multi_wait() wakes up
698 return true;
699 }
700
701 return wait_fd.revents > 0;
702 }
703
704 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
705 {
706 int num_fds;
707 struct curl_waitfd wait_fd;
708
709 wait_fd.fd = signal_fd;
710 wait_fd.events = CURL_WAIT_POLLIN;
711 wait_fd.revents = 0;
712
713 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
714 if (ret) {
715 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
716 return -EIO;
717 }
718
719 if (is_signaled(wait_fd)) {
720 ret = clear_signal(signal_fd);
721 if (ret < 0) {
722 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
723 return ret;
724 }
725 }
726 return 0;
727 }
728
729 #else
730
731 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
732 {
733 fd_set fdread;
734 fd_set fdwrite;
735 fd_set fdexcep;
736 int maxfd = -1;
737
738 FD_ZERO(&fdread);
739 FD_ZERO(&fdwrite);
740 FD_ZERO(&fdexcep);
741
742 /* get file descriptors from the transfers */
743 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
744 if (ret) {
745 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
746 return -EIO;
747 }
748
749 if (signal_fd > 0) {
750 FD_SET(signal_fd, &fdread);
751 if (signal_fd >= maxfd) {
752 maxfd = signal_fd + 1;
753 }
754 }
755
756 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
757 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
758 #define RGW_CURL_TIMEOUT 1000
759 if (!to)
760 to = RGW_CURL_TIMEOUT;
761 struct timeval timeout;
762 timeout.tv_sec = to / 1000;
763 timeout.tv_usec = to % 1000;
764
765 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
766 if (ret < 0) {
767 ret = -errno;
768 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
769 return ret;
770 }
771
772 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
773 ret = clear_signal(signal_fd);
774 if (ret < 0) {
775 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
776 return ret;
777 }
778 }
779
780 return 0;
781 }
782
783 #endif
784
785 void *RGWHTTPManager::ReqsThread::entry()
786 {
787 manager->reqs_thread_entry();
788 return NULL;
789 }
790
791 /*
792 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
793 */
794 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
795 completion_mgr(_cm), is_started(false),
796 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
797 reqs_thread(NULL)
798 {
799 multi_handle = (void *)curl_multi_init();
800 thread_pipe[0] = -1;
801 thread_pipe[1] = -1;
802 }
803
804 RGWHTTPManager::~RGWHTTPManager() {
805 stop();
806 if (multi_handle)
807 curl_multi_cleanup((CURLM *)multi_handle);
808 }
809
810 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
811 {
812 RWLock::WLocker rl(reqs_lock);
813 req_data->id = num_reqs;
814 req_data->registered = true;
815 reqs[num_reqs] = req_data;
816 num_reqs++;
817 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
818 }
819
820 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
821 {
822 RWLock::WLocker rl(reqs_lock);
823 if (!req_data->registered) {
824 return false;
825 }
826 req_data->get();
827 req_data->registered = false;
828 unregistered_reqs.push_back(req_data);
829 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
830 return true;
831 }
832
833 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
834 {
835 RWLock::WLocker rl(reqs_lock);
836 _complete_request(req_data);
837 }
838
839 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
840 {
841 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
842 if (iter != reqs.end()) {
843 reqs.erase(iter);
844 }
845 {
846 Mutex::Locker l(req_data->lock);
847 req_data->mgr = nullptr;
848 }
849 if (completion_mgr) {
850 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
851 }
852
853 req_data->put();
854 }
855
856 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
857 {
858 req_data->finish(ret);
859 complete_request(req_data);
860 }
861
862 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
863 {
864 req_data->finish(ret);
865 _complete_request(req_data);
866 }
867
868 void RGWHTTPManager::_set_req_state(set_state& ss)
869 {
870 ss.req->set_state(ss.bitmask);
871 }
872 /*
873 * hook request to the curl multi handle
874 */
875 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
876 {
877 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
878 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
879 if (mstatus) {
880 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
881 return -EIO;
882 }
883 return 0;
884 }
885
886 /*
887 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
888 * there will be no more processing on this request
889 */
890 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
891 {
892 if (req_data->curl_handle) {
893 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
894 }
895 if (!req_data->is_done()) {
896 _finish_request(req_data, -ECANCELED);
897 }
898 }
899
900 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
901 {
902 RWLock::WLocker wl(reqs_lock);
903 _unlink_request(req_data);
904 }
905
906 void RGWHTTPManager::manage_pending_requests()
907 {
908 reqs_lock.get_read();
909 if (max_threaded_req == num_reqs &&
910 unregistered_reqs.empty() &&
911 reqs_change_state.empty()) {
912 reqs_lock.unlock();
913 return;
914 }
915 reqs_lock.unlock();
916
917 RWLock::WLocker wl(reqs_lock);
918
919 if (!unregistered_reqs.empty()) {
920 for (auto& r : unregistered_reqs) {
921 _unlink_request(r);
922 r->put();
923 }
924
925 unregistered_reqs.clear();
926 }
927
928 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
929
930 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
931
932 for (; iter != reqs.end(); ++iter) {
933 rgw_http_req_data *req_data = iter->second;
934 int r = link_request(req_data);
935 if (r < 0) {
936 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
937 remove_reqs.push_back(std::make_pair(iter->second, r));
938 } else {
939 max_threaded_req = iter->first + 1;
940 }
941 }
942
943 if (!reqs_change_state.empty()) {
944 for (auto siter : reqs_change_state) {
945 _set_req_state(siter);
946 }
947 reqs_change_state.clear();
948 }
949
950 for (auto piter : remove_reqs) {
951 rgw_http_req_data *req_data = piter.first;
952 int r = piter.second;
953
954 _finish_request(req_data, r);
955 }
956 }
957
958 int RGWHTTPManager::add_request(RGWHTTPClient *client)
959 {
960 rgw_http_req_data *req_data = new rgw_http_req_data;
961
962 int ret = client->init_request(req_data);
963 if (ret < 0) {
964 req_data->put();
965 req_data = NULL;
966 return ret;
967 }
968
969 req_data->mgr = this;
970 req_data->client = client;
971 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
972 req_data->user_info = client->get_io_user_info();
973
974 register_request(req_data);
975
976 if (!is_started) {
977 ret = link_request(req_data);
978 if (ret < 0) {
979 req_data->put();
980 req_data = NULL;
981 }
982 return ret;
983 }
984 ret = signal_thread();
985 if (ret < 0) {
986 finish_request(req_data, ret);
987 }
988
989 return ret;
990 }
991
992 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
993 {
994 rgw_http_req_data *req_data = client->get_req_data();
995
996 if (!is_started) {
997 unlink_request(req_data);
998 return 0;
999 }
1000 if (!unregister_request(req_data)) {
1001 return 0;
1002 }
1003 int ret = signal_thread();
1004 if (ret < 0) {
1005 return ret;
1006 }
1007
1008 return 0;
1009 }
1010
1011 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
1012 {
1013 rgw_http_req_data *req_data = client->get_req_data();
1014
1015 ceph_assert(req_data->lock.is_locked());
1016
1017 /* can only do that if threaded */
1018 if (!is_started) {
1019 return -EINVAL;
1020 }
1021
1022 bool suggested_wr_paused = req_data->write_paused;
1023 bool suggested_rd_paused = req_data->read_paused;
1024
1025 switch (state) {
1026 case SET_WRITE_PAUSED:
1027 suggested_wr_paused = true;
1028 break;
1029 case SET_WRITE_RESUME:
1030 suggested_wr_paused = false;
1031 break;
1032 case SET_READ_PAUSED:
1033 suggested_rd_paused = true;
1034 break;
1035 case SET_READ_RESUME:
1036 suggested_rd_paused = false;
1037 break;
1038 default:
1039 /* shouldn't really be here */
1040 return -EIO;
1041 }
1042 if (suggested_wr_paused == req_data->write_paused &&
1043 suggested_rd_paused == req_data->read_paused) {
1044 return 0;
1045 }
1046
1047 req_data->write_paused = suggested_wr_paused;
1048 req_data->read_paused = suggested_rd_paused;
1049
1050 int bitmask = CURLPAUSE_CONT;
1051
1052 if (req_data->write_paused) {
1053 bitmask |= CURLPAUSE_SEND;
1054 }
1055
1056 if (req_data->read_paused) {
1057 bitmask |= CURLPAUSE_RECV;
1058 }
1059
1060 reqs_change_state.push_back(set_state(req_data, bitmask));
1061 int ret = signal_thread();
1062 if (ret < 0) {
1063 return ret;
1064 }
1065
1066 return 0;
1067 }
1068
1069 int RGWHTTPManager::start()
1070 {
1071 if (pipe_cloexec(thread_pipe) < 0) {
1072 int e = errno;
1073 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1074 return -e;
1075 }
1076
1077 // enable non-blocking reads
1078 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1079 int e = errno;
1080 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1081 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1082 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1083 return -e;
1084 }
1085
1086 #ifdef HAVE_CURL_MULTI_WAIT
1087 // on first initialization, use this pipe to detect whether we're using a
1088 // buggy version of libcurl
1089 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1090 static_cast<CURLM*>(multi_handle),
1091 thread_pipe[1], thread_pipe[0]);
1092 #endif
1093
1094 is_started = true;
1095 reqs_thread = new ReqsThread(this);
1096 reqs_thread->create("http_manager");
1097 return 0;
1098 }
1099
1100 void RGWHTTPManager::stop()
1101 {
1102 if (is_stopped) {
1103 return;
1104 }
1105
1106 is_stopped = true;
1107
1108 if (is_started) {
1109 going_down = true;
1110 signal_thread();
1111 reqs_thread->join();
1112 delete reqs_thread;
1113 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1114 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1115 }
1116 }
1117
1118 int RGWHTTPManager::signal_thread()
1119 {
1120 uint32_t buf = 0;
1121 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1122 if (ret < 0) {
1123 ret = -errno;
1124 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1125 return ret;
1126 }
1127 return 0;
1128 }
1129
1130 void *RGWHTTPManager::reqs_thread_entry()
1131 {
1132 int still_running;
1133 int mstatus;
1134
1135 ldout(cct, 20) << __func__ << ": start" << dendl;
1136
1137 while (!going_down) {
1138 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1139 if (ret < 0) {
1140 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1141 return NULL;
1142 }
1143
1144 manage_pending_requests();
1145
1146 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1147 switch (mstatus) {
1148 case CURLM_OK:
1149 case CURLM_CALL_MULTI_PERFORM:
1150 break;
1151 default:
1152 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1153 break;
1154 }
1155 int msgs_left;
1156 CURLMsg *msg;
1157 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1158 if (msg->msg == CURLMSG_DONE) {
1159 int result = msg->data.result;
1160 CURL *e = msg->easy_handle;
1161 rgw_http_req_data *req_data;
1162 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1163 curl_multi_remove_handle((CURLM *)multi_handle, e);
1164
1165 long http_status;
1166 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1167
1168 int status = rgw_http_error_to_errno(http_status);
1169 if (result != CURLE_OK && status == 0) {
1170 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1171 status = -EAGAIN;
1172 }
1173 int id = req_data->id;
1174 finish_request(req_data, status);
1175 switch (result) {
1176 case CURLE_OK:
1177 break;
1178 case CURLE_OPERATION_TIMEDOUT:
1179 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1180 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1181 default:
1182 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1183 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
1184 break;
1185 }
1186 }
1187 }
1188 }
1189
1190
1191 RWLock::WLocker rl(reqs_lock);
1192 for (auto r : unregistered_reqs) {
1193 _unlink_request(r);
1194 }
1195
1196 unregistered_reqs.clear();
1197
1198 auto all_reqs = std::move(reqs);
1199 for (auto iter : all_reqs) {
1200 _unlink_request(iter.second);
1201 }
1202
1203 reqs.clear();
1204
1205 if (completion_mgr) {
1206 completion_mgr->go_down();
1207 }
1208
1209 return 0;
1210 }
1211
1212 void rgw_http_client_init(CephContext *cct)
1213 {
1214 curl_global_init(CURL_GLOBAL_ALL);
1215 rgw_http_manager = new RGWHTTPManager(cct);
1216 rgw_http_manager->start();
1217 }
1218
1219 void rgw_http_client_cleanup()
1220 {
1221 rgw_http_manager->stop();
1222 delete rgw_http_manager;
1223 curl_global_cleanup();
1224 }
1225
1226
1227 int RGWHTTP::send(RGWHTTPClient *req) {
1228 if (!req) {
1229 return 0;
1230 }
1231 int r = rgw_http_manager->add_request(req);
1232 if (r < 0) {
1233 return r;
1234 }
1235
1236 return 0;
1237 }
1238
1239 int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
1240 if (!req) {
1241 return 0;
1242 }
1243 int r = send(req);
1244 if (r < 0) {
1245 return r;
1246 }
1247
1248 return req->wait(y);
1249 }
1250