]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7 #include <boost/utility/string_ref.hpp>
8
9 #include <curl/curl.h>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
12
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/RefCountedObj.h"
17
18 #include "rgw_coroutine.h"
19
20 #include <atomic>
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
24
25 RGWHTTPManager *rgw_http_manager;
26
27 struct RGWCurlHandle;
28
29 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
30
31 struct rgw_http_req_data : public RefCountedObject {
32 RGWCurlHandle *curl_handle{nullptr};
33 curl_slist *h{nullptr};
34 uint64_t id;
35 int ret{0};
36 std::atomic<bool> done = { false };
37 RGWHTTPClient *client{nullptr};
38 rgw_io_id control_io_id;
39 void *user_info{nullptr};
40 bool registered{false};
41 RGWHTTPManager *mgr{nullptr};
42 char error_buf[CURL_ERROR_SIZE];
43 bool write_paused{false};
44 bool read_paused{false};
45
46 Mutex lock;
47 Cond cond;
48
49 rgw_http_req_data() : id(-1), lock("rgw_http_req_data::lock") {
50 memset(error_buf, 0, sizeof(error_buf));
51 }
52
53 int wait() {
54 Mutex::Locker l(lock);
55 if (done) {
56 return ret;
57 }
58 cond.Wait(lock);
59 return ret;
60 }
61
62 void set_state(int bitmask);
63
64 void finish(int r) {
65 Mutex::Locker l(lock);
66 ret = r;
67 if (curl_handle)
68 do_curl_easy_cleanup(curl_handle);
69
70 if (h)
71 curl_slist_free_all(h);
72
73 curl_handle = NULL;
74 h = NULL;
75 done = true;
76 cond.Signal();
77 }
78
79 bool _is_done() {
80 return done;
81 }
82
83 bool is_done() {
84 Mutex::Locker l(lock);
85 return done;
86 }
87
88 int get_retcode() {
89 Mutex::Locker l(lock);
90 return ret;
91 }
92
93 RGWHTTPManager *get_manager() {
94 Mutex::Locker l(lock);
95 return mgr;
96 }
97
98 CURL *get_easy_handle() const;
99 };
100
101 struct RGWCurlHandle {
102 int uses;
103 mono_time lastuse;
104 CURL* h;
105
106 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
107 CURL* operator*() {
108 return this->h;
109 }
110 };
111
112 void rgw_http_req_data::set_state(int bitmask) {
113 /* no need to lock here, moreover curl_easy_pause() might trigger
114 * the data receive callback :/
115 */
116 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
117 if (rc != CURLE_OK) {
118 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
119 }
120 }
121
122 #define MAXIDLE 5
123 class RGWCurlHandles : public Thread {
124 public:
125 Mutex cleaner_lock;
126 std::vector<RGWCurlHandle*>saved_curl;
127 int cleaner_shutdown;
128 Cond cleaner_cond;
129
130 RGWCurlHandles() :
131 cleaner_lock{"RGWCurlHandles::cleaner_lock"},
132 cleaner_shutdown{0} {
133 }
134
135 RGWCurlHandle* get_curl_handle();
136 void release_curl_handle_now(RGWCurlHandle* curl);
137 void release_curl_handle(RGWCurlHandle* curl);
138 void flush_curl_handles();
139 void* entry();
140 void stop();
141 };
142
143 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
144 RGWCurlHandle* curl = 0;
145 CURL* h;
146 {
147 Mutex::Locker lock(cleaner_lock);
148 if (!saved_curl.empty()) {
149 curl = *saved_curl.begin();
150 saved_curl.erase(saved_curl.begin());
151 }
152 }
153 if (curl) {
154 } else if ((h = curl_easy_init())) {
155 curl = new RGWCurlHandle{h};
156 } else {
157 // curl = 0;
158 }
159 return curl;
160 }
161
162 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
163 {
164 curl_easy_cleanup(**curl);
165 delete curl;
166 }
167
168 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
169 {
170 if (cleaner_shutdown) {
171 release_curl_handle_now(curl);
172 } else {
173 curl_easy_reset(**curl);
174 Mutex::Locker lock(cleaner_lock);
175 curl->lastuse = mono_clock::now();
176 saved_curl.insert(saved_curl.begin(), 1, curl);
177 }
178 }
179
180 void* RGWCurlHandles::entry()
181 {
182 RGWCurlHandle* curl;
183 Mutex::Locker lock(cleaner_lock);
184
185 for (;;) {
186 if (cleaner_shutdown) {
187 if (saved_curl.empty())
188 break;
189 } else {
190 utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
191 cleaner_cond.WaitUntil(cleaner_lock, release);
192 }
193 mono_time now = mono_clock::now();
194 while (!saved_curl.empty()) {
195 auto cend = saved_curl.end();
196 --cend;
197 curl = *cend;
198 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
199 break;
200 saved_curl.erase(cend);
201 release_curl_handle_now(curl);
202 }
203 }
204 return nullptr;
205 }
206
207 void RGWCurlHandles::stop()
208 {
209 Mutex::Locker lock(cleaner_lock);
210 cleaner_shutdown = 1;
211 cleaner_cond.Signal();
212 }
213
214 void RGWCurlHandles::flush_curl_handles()
215 {
216 stop();
217 join();
218 if (!saved_curl.empty()) {
219 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
220 }
221 saved_curl.shrink_to_fit();
222 }
223
224 CURL *rgw_http_req_data::get_easy_handle() const
225 {
226 return **curl_handle;
227 }
228
229 static RGWCurlHandles *handles;
230
231 static RGWCurlHandle *do_curl_easy_init()
232 {
233 return handles->get_curl_handle();
234 }
235
236 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
237 {
238 handles->release_curl_handle(curl_handle);
239 }
240
// XXX make this part of the token cache? (but that's swift-only,
// and this especially needs to integrate with s3...)
243
244 void rgw_setup_saved_curl_handles()
245 {
246 handles = new RGWCurlHandles();
247 handles->create("rgw_curl");
248 }
249
250 void rgw_release_all_curl_handles()
251 {
252 handles->flush_curl_handles();
253 delete handles;
254 }
255
256 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
257 {
258 if (id == 0) {
259 id = io_id_provider.get_next();
260 }
261 }
262
263 /*
264 * the following set of callbacks will be called either on RGWHTTPManager::process(),
265 * or via the RGWHTTPManager async processing.
266 */
267 size_t RGWHTTPClient::receive_http_header(void * const ptr,
268 const size_t size,
269 const size_t nmemb,
270 void * const _info)
271 {
272 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
273 size_t len = size * nmemb;
274
275 Mutex::Locker l(req_data->lock);
276
277 if (!req_data->registered) {
278 return len;
279 }
280
281 int ret = req_data->client->receive_header(ptr, size * nmemb);
282 if (ret < 0) {
283 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
284 }
285
286 return len;
287 }
288
289 size_t RGWHTTPClient::receive_http_data(void * const ptr,
290 const size_t size,
291 const size_t nmemb,
292 void * const _info)
293 {
294 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
295 size_t len = size * nmemb;
296
297 bool pause = false;
298
299 RGWHTTPClient *client;
300
301 {
302 Mutex::Locker l(req_data->lock);
303 if (!req_data->registered) {
304 return len;
305 }
306
307 client = req_data->client;
308 }
309
310 size_t& skip_bytes = client->receive_pause_skip;
311
312 if (skip_bytes >= len) {
313 skip_bytes -= len;
314 return len;
315 }
316
317 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
318 if (ret < 0) {
319 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
320 }
321
322 if (pause) {
323 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
324 skip_bytes = len;
325 Mutex::Locker l(req_data->lock);
326 req_data->read_paused = true;
327 return CURL_WRITEFUNC_PAUSE;
328 }
329
330 skip_bytes = 0;
331
332 return len;
333 }
334
335 size_t RGWHTTPClient::send_http_data(void * const ptr,
336 const size_t size,
337 const size_t nmemb,
338 void * const _info)
339 {
340 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
341
342 RGWHTTPClient *client;
343
344 {
345 Mutex::Locker l(req_data->lock);
346
347 if (!req_data->registered) {
348 return 0;
349 }
350
351 client = req_data->client;
352 }
353
354 bool pause = false;
355
356 int ret = client->send_data(ptr, size * nmemb, &pause);
357 if (ret < 0) {
358 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
359 }
360
361 if (ret == 0 &&
362 pause) {
363 Mutex::Locker l(req_data->lock);
364 req_data->write_paused = true;
365 return CURL_READFUNC_PAUSE;
366 }
367
368 return ret;
369 }
370
371 Mutex& RGWHTTPClient::get_req_lock()
372 {
373 return req_data->lock;
374 }
375
376 void RGWHTTPClient::_set_write_paused(bool pause)
377 {
378 ceph_assert(req_data->lock.is_locked());
379
380 RGWHTTPManager *mgr = req_data->mgr;
381 if (pause == req_data->write_paused) {
382 return;
383 }
384 if (pause) {
385 mgr->set_request_state(this, SET_WRITE_PAUSED);
386 } else {
387 mgr->set_request_state(this, SET_WRITE_RESUME);
388 }
389 }
390
391 void RGWHTTPClient::_set_read_paused(bool pause)
392 {
393 ceph_assert(req_data->lock.is_locked());
394
395 RGWHTTPManager *mgr = req_data->mgr;
396 if (pause == req_data->read_paused) {
397 return;
398 }
399 if (pause) {
400 mgr->set_request_state(this, SET_READ_PAUSED);
401 } else {
402 mgr->set_request_state(this, SET_READ_RESUME);
403 }
404 }
405
406 static curl_slist *headers_to_slist(param_vec_t& headers)
407 {
408 curl_slist *h = NULL;
409
410 param_vec_t::iterator iter;
411 for (iter = headers.begin(); iter != headers.end(); ++iter) {
412 pair<string, string>& p = *iter;
413 string val = p.first;
414
415 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
416 val = val.substr(5);
417 }
418
419 /* we need to convert all underscores into dashes as some web servers forbid them
420 * in the http header field names
421 */
422 for (size_t i = 0; i < val.size(); i++) {
423 if (val[i] == '_') {
424 val[i] = '-';
425 }
426 }
427
428 val = camelcase_dash_http_attr(val);
429
430 // curl won't send headers with empty values unless it ends with a ; instead
431 if (p.second.empty()) {
432 val.append(1, ';');
433 } else {
434 val.append(": ");
435 val.append(p.second);
436 }
437 h = curl_slist_append(h, val.c_str());
438 }
439
440 return h;
441 }
442
/* True for the methods that carry a request body: POST and PUT. */
static bool is_upload_request(const string& method)
{
  if (method == "POST") {
    return true;
  }
  return method == "PUT";
}
447
448 /*
449 * process a single simple one off request
450 */
451 int RGWHTTPClient::process()
452 {
453 return RGWHTTP::process(this);
454 }
455
456 string RGWHTTPClient::to_str()
457 {
458 string method_str = (method.empty() ? "<no-method>" : method);
459 string url_str = (url.empty() ? "<no-url>" : url);
460 return method_str + " " + url_str;
461 }
462
463 int RGWHTTPClient::get_req_retcode()
464 {
465 if (!req_data) {
466 return -EINVAL;
467 }
468
469 return req_data->get_retcode();
470 }
471
472 /*
473 * init request, will be used later with RGWHTTPManager
474 */
475 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
476 {
477 ceph_assert(!req_data);
478 _req_data->get();
479 req_data = _req_data;
480
481 req_data->curl_handle = do_curl_easy_init();
482
483 CURL *easy_handle = req_data->get_easy_handle();
484
485 dout(20) << "sending request to " << url << dendl;
486
487 curl_slist *h = headers_to_slist(headers);
488
489 req_data->h = h;
490
491 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
492 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
493 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
494 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
495 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
496 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
497 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
498 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
499 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
500 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
501 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
502 if (h) {
503 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
504 }
505 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
506 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
507 if (send_data_hint || is_upload_request(method)) {
508 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
509 }
510 if (has_send_len) {
511 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
512 }
513 if (!verify_ssl) {
514 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
515 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
516 dout(20) << "ssl verification is set to off" << dendl;
517 }
518 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
519
520 return 0;
521 }
522
523 bool RGWHTTPClient::is_done()
524 {
525 return req_data->is_done();
526 }
527
528 /*
529 * wait for async request to complete
530 */
531 int RGWHTTPClient::wait()
532 {
533 return req_data->wait();
534 }
535
536 void RGWHTTPClient::cancel()
537 {
538 if (req_data) {
539 RGWHTTPManager *http_manager = req_data->mgr;
540 if (http_manager) {
541 http_manager->remove_request(this);
542 }
543 }
544 }
545
546 RGWHTTPClient::~RGWHTTPClient()
547 {
548 cancel();
549 if (req_data) {
550 req_data->put();
551 }
552 }
553
554
555 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
556 {
557 const boost::string_ref header_line(static_cast<const char *>(ptr), len);
558
559 /* We're tokening the line that way due to backward compatibility. */
560 const size_t sep_loc = header_line.find_first_of(" \t:");
561
562 if (boost::string_ref::npos == sep_loc) {
563 /* Wrongly formatted header? Just skip it. */
564 return 0;
565 }
566
567 header_name_t name(header_line.substr(0, sep_loc));
568 if (0 == relevant_headers.count(name)) {
569 /* Not interested in this particular header. */
570 return 0;
571 }
572
573 const auto value_part = header_line.substr(sep_loc + 1);
574
575 /* Skip spaces and tabs after the separator. */
576 const size_t val_loc_s = value_part.find_first_not_of(' ');
577 const size_t val_loc_e = value_part.find_first_of("\r\n");
578
579 if (boost::string_ref::npos == val_loc_s ||
580 boost::string_ref::npos == val_loc_e) {
581 /* Empty value case. */
582 found_headers.emplace(name, header_value_t());
583 } else {
584 found_headers.emplace(name, header_value_t(
585 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
586 }
587
588 return 0;
589 }
590
591 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
592 {
593 int length_to_copy = 0;
594 if (post_data_index < post_data.length()) {
595 length_to_copy = min(post_data.length() - post_data_index, len);
596 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
597 post_data_index += length_to_copy;
598 }
599 return length_to_copy;
600 }
601
602
/*
 * Drain pending wakeup bytes from fd with a single read().
 * Returns 0 on success or when there was nothing to read (EAGAIN),
 * otherwise the negated errno.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, (void *)scratch.data(), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  if (err == -EAGAIN) {
    return 0; // clear EAGAIN
  }
  return err;
}
616
617 #if HAVE_CURL_MULTI_WAIT
618
619 static std::once_flag detect_flag;
620 static bool curl_multi_wait_bug_present = false;
621
622 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
623 int write_fd, int read_fd)
624 {
625 int ret = 0;
626
627 // write to write_fd so that read_fd becomes readable
628 uint32_t buf = 0;
629 ret = ::write(write_fd, &buf, sizeof(buf));
630 if (ret < 0) {
631 ret = -errno;
632 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
633 return ret;
634 }
635
636 // pass read_fd in extra_fds for curl_multi_wait()
637 int num_fds;
638 struct curl_waitfd wait_fd;
639
640 wait_fd.fd = read_fd;
641 wait_fd.events = CURL_WAIT_POLLIN;
642 wait_fd.revents = 0;
643
644 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
645 if (ret != CURLM_OK) {
646 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
647 return -EIO;
648 }
649
650 // curl_multi_wait should flag revents when extra_fd is readable. if it
651 // doesn't, the bug is present and we can't rely on revents
652 if (wait_fd.revents == 0) {
653 curl_multi_wait_bug_present = true;
654 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
655 "bug in curl_multi_wait(). enabling a workaround that may degrade "
656 "performance slightly." << dendl;
657 }
658
659 return clear_signal(read_fd);
660 }
661
662 static bool is_signaled(const curl_waitfd& wait_fd)
663 {
664 if (wait_fd.fd < 0) {
665 // no fd to signal
666 return false;
667 }
668
669 if (curl_multi_wait_bug_present) {
670 // we can't rely on revents, so we always return true if a wait_fd is given.
671 // this means we'll be trying a non-blocking read on this fd every time that
672 // curl_multi_wait() wakes up
673 return true;
674 }
675
676 return wait_fd.revents > 0;
677 }
678
679 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
680 {
681 int num_fds;
682 struct curl_waitfd wait_fd;
683
684 wait_fd.fd = signal_fd;
685 wait_fd.events = CURL_WAIT_POLLIN;
686 wait_fd.revents = 0;
687
688 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
689 if (ret) {
690 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
691 return -EIO;
692 }
693
694 if (is_signaled(wait_fd)) {
695 ret = clear_signal(signal_fd);
696 if (ret < 0) {
697 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
698 return ret;
699 }
700 }
701 return 0;
702 }
703
704 #else
705
706 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
707 {
708 fd_set fdread;
709 fd_set fdwrite;
710 fd_set fdexcep;
711 int maxfd = -1;
712
713 FD_ZERO(&fdread);
714 FD_ZERO(&fdwrite);
715 FD_ZERO(&fdexcep);
716
717 /* get file descriptors from the transfers */
718 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
719 if (ret) {
720 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
721 return -EIO;
722 }
723
724 if (signal_fd > 0) {
725 FD_SET(signal_fd, &fdread);
726 if (signal_fd >= maxfd) {
727 maxfd = signal_fd + 1;
728 }
729 }
730
731 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
732 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
733 #define RGW_CURL_TIMEOUT 1000
734 if (!to)
735 to = RGW_CURL_TIMEOUT;
736 struct timeval timeout;
737 timeout.tv_sec = to / 1000;
738 timeout.tv_usec = to % 1000;
739
740 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
741 if (ret < 0) {
742 ret = -errno;
743 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
744 return ret;
745 }
746
747 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
748 ret = clear_signal(signal_fd);
749 if (ret < 0) {
750 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
751 return ret;
752 }
753 }
754
755 return 0;
756 }
757
758 #endif
759
760 void *RGWHTTPManager::ReqsThread::entry()
761 {
762 manager->reqs_thread_entry();
763 return NULL;
764 }
765
766 /*
767 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
768 */
769 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
770 completion_mgr(_cm), is_started(false),
771 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
772 reqs_thread(NULL)
773 {
774 multi_handle = (void *)curl_multi_init();
775 thread_pipe[0] = -1;
776 thread_pipe[1] = -1;
777 }
778
779 RGWHTTPManager::~RGWHTTPManager() {
780 stop();
781 if (multi_handle)
782 curl_multi_cleanup((CURLM *)multi_handle);
783 }
784
785 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
786 {
787 RWLock::WLocker rl(reqs_lock);
788 req_data->id = num_reqs;
789 req_data->registered = true;
790 reqs[num_reqs] = req_data;
791 num_reqs++;
792 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
793 }
794
795 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
796 {
797 RWLock::WLocker rl(reqs_lock);
798 if (!req_data->registered) {
799 return false;
800 }
801 req_data->get();
802 req_data->registered = false;
803 unregistered_reqs.push_back(req_data);
804 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
805 return true;
806 }
807
808 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
809 {
810 RWLock::WLocker rl(reqs_lock);
811 _complete_request(req_data);
812 }
813
814 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
815 {
816 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
817 if (iter != reqs.end()) {
818 reqs.erase(iter);
819 }
820 {
821 Mutex::Locker l(req_data->lock);
822 req_data->mgr = nullptr;
823 }
824 if (completion_mgr) {
825 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
826 }
827
828 req_data->put();
829 }
830
831 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
832 {
833 req_data->finish(ret);
834 complete_request(req_data);
835 }
836
837 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
838 {
839 req_data->finish(ret);
840 _complete_request(req_data);
841 }
842
843 void RGWHTTPManager::_set_req_state(set_state& ss)
844 {
845 ss.req->set_state(ss.bitmask);
846 }
847 /*
848 * hook request to the curl multi handle
849 */
850 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
851 {
852 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
853 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
854 if (mstatus) {
855 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
856 return -EIO;
857 }
858 return 0;
859 }
860
861 /*
862 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
863 * there will be no more processing on this request
864 */
865 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
866 {
867 if (req_data->curl_handle) {
868 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
869 }
870 if (!req_data->_is_done()) {
871 _finish_request(req_data, -ECANCELED);
872 }
873 }
874
875 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
876 {
877 RWLock::WLocker wl(reqs_lock);
878 _unlink_request(req_data);
879 }
880
881 void RGWHTTPManager::manage_pending_requests()
882 {
883 reqs_lock.get_read();
884 if (max_threaded_req == num_reqs &&
885 unregistered_reqs.empty() &&
886 reqs_change_state.empty()) {
887 reqs_lock.unlock();
888 return;
889 }
890 reqs_lock.unlock();
891
892 RWLock::WLocker wl(reqs_lock);
893
894 if (!unregistered_reqs.empty()) {
895 for (auto& r : unregistered_reqs) {
896 _unlink_request(r);
897 r->put();
898 }
899
900 unregistered_reqs.clear();
901 }
902
903 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
904
905 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
906
907 for (; iter != reqs.end(); ++iter) {
908 rgw_http_req_data *req_data = iter->second;
909 int r = link_request(req_data);
910 if (r < 0) {
911 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
912 remove_reqs.push_back(std::make_pair(iter->second, r));
913 } else {
914 max_threaded_req = iter->first + 1;
915 }
916 }
917
918 if (!reqs_change_state.empty()) {
919 for (auto siter : reqs_change_state) {
920 _set_req_state(siter);
921 }
922 reqs_change_state.clear();
923 }
924
925 for (auto piter : remove_reqs) {
926 rgw_http_req_data *req_data = piter.first;
927 int r = piter.second;
928
929 _finish_request(req_data, r);
930 }
931 }
932
933 int RGWHTTPManager::add_request(RGWHTTPClient *client)
934 {
935 rgw_http_req_data *req_data = new rgw_http_req_data;
936
937 int ret = client->init_request(req_data);
938 if (ret < 0) {
939 req_data->put();
940 req_data = NULL;
941 return ret;
942 }
943
944 req_data->mgr = this;
945 req_data->client = client;
946 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
947 req_data->user_info = client->get_io_user_info();
948
949 register_request(req_data);
950
951 if (!is_started) {
952 ret = link_request(req_data);
953 if (ret < 0) {
954 req_data->put();
955 req_data = NULL;
956 }
957 return ret;
958 }
959 ret = signal_thread();
960 if (ret < 0) {
961 finish_request(req_data, ret);
962 }
963
964 return ret;
965 }
966
967 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
968 {
969 rgw_http_req_data *req_data = client->get_req_data();
970
971 if (!is_started) {
972 unlink_request(req_data);
973 return 0;
974 }
975 if (!unregister_request(req_data)) {
976 return 0;
977 }
978 int ret = signal_thread();
979 if (ret < 0) {
980 return ret;
981 }
982
983 return 0;
984 }
985
986 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
987 {
988 rgw_http_req_data *req_data = client->get_req_data();
989
990 ceph_assert(req_data->lock.is_locked());
991
992 /* can only do that if threaded */
993 if (!is_started) {
994 return -EINVAL;
995 }
996
997 bool suggested_wr_paused = req_data->write_paused;
998 bool suggested_rd_paused = req_data->read_paused;
999
1000 switch (state) {
1001 case SET_WRITE_PAUSED:
1002 suggested_wr_paused = true;
1003 break;
1004 case SET_WRITE_RESUME:
1005 suggested_wr_paused = false;
1006 break;
1007 case SET_READ_PAUSED:
1008 suggested_rd_paused = true;
1009 break;
1010 case SET_READ_RESUME:
1011 suggested_rd_paused = false;
1012 break;
1013 default:
1014 /* shouldn't really be here */
1015 return -EIO;
1016 }
1017 if (suggested_wr_paused == req_data->write_paused &&
1018 suggested_rd_paused == req_data->read_paused) {
1019 return 0;
1020 }
1021
1022 req_data->write_paused = suggested_wr_paused;
1023 req_data->read_paused = suggested_rd_paused;
1024
1025 int bitmask = CURLPAUSE_CONT;
1026
1027 if (req_data->write_paused) {
1028 bitmask |= CURLPAUSE_SEND;
1029 }
1030
1031 if (req_data->read_paused) {
1032 bitmask |= CURLPAUSE_RECV;
1033 }
1034
1035 reqs_change_state.push_back(set_state(req_data, bitmask));
1036 int ret = signal_thread();
1037 if (ret < 0) {
1038 return ret;
1039 }
1040
1041 return 0;
1042 }
1043
1044 int RGWHTTPManager::start()
1045 {
1046 if (pipe_cloexec(thread_pipe) < 0) {
1047 int e = errno;
1048 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1049 return -e;
1050 }
1051
1052 // enable non-blocking reads
1053 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1054 int e = errno;
1055 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1056 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1057 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1058 return -e;
1059 }
1060
1061 #ifdef HAVE_CURL_MULTI_WAIT
1062 // on first initialization, use this pipe to detect whether we're using a
1063 // buggy version of libcurl
1064 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1065 static_cast<CURLM*>(multi_handle),
1066 thread_pipe[1], thread_pipe[0]);
1067 #endif
1068
1069 is_started = true;
1070 reqs_thread = new ReqsThread(this);
1071 reqs_thread->create("http_manager");
1072 return 0;
1073 }
1074
1075 void RGWHTTPManager::stop()
1076 {
1077 if (is_stopped) {
1078 return;
1079 }
1080
1081 is_stopped = true;
1082
1083 if (is_started) {
1084 going_down = true;
1085 signal_thread();
1086 reqs_thread->join();
1087 delete reqs_thread;
1088 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1089 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1090 }
1091 }
1092
1093 int RGWHTTPManager::signal_thread()
1094 {
1095 uint32_t buf = 0;
1096 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1097 if (ret < 0) {
1098 ret = -errno;
1099 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1100 return ret;
1101 }
1102 return 0;
1103 }
1104
1105 void *RGWHTTPManager::reqs_thread_entry()
1106 {
1107 int still_running;
1108 int mstatus;
1109
1110 ldout(cct, 20) << __func__ << ": start" << dendl;
1111
1112 while (!going_down) {
1113 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1114 if (ret < 0) {
1115 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1116 return NULL;
1117 }
1118
1119 manage_pending_requests();
1120
1121 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1122 switch (mstatus) {
1123 case CURLM_OK:
1124 case CURLM_CALL_MULTI_PERFORM:
1125 break;
1126 default:
1127 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1128 break;
1129 }
1130 int msgs_left;
1131 CURLMsg *msg;
1132 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1133 if (msg->msg == CURLMSG_DONE) {
1134 int result = msg->data.result;
1135 CURL *e = msg->easy_handle;
1136 rgw_http_req_data *req_data;
1137 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1138 curl_multi_remove_handle((CURLM *)multi_handle, e);
1139
1140 long http_status;
1141 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1142
1143 int status = rgw_http_error_to_errno(http_status);
1144 if (result != CURLE_OK && status == 0) {
1145 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1146 status = -EAGAIN;
1147 }
1148 int id = req_data->id;
1149 finish_request(req_data, status);
1150 switch (result) {
1151 case CURLE_OK:
1152 break;
1153 case CURLE_OPERATION_TIMEDOUT:
1154 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1155 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1156 default:
1157 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1158 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
1159 break;
1160 }
1161 }
1162 }
1163 }
1164
1165
1166 RWLock::WLocker rl(reqs_lock);
1167 for (auto r : unregistered_reqs) {
1168 _unlink_request(r);
1169 }
1170
1171 unregistered_reqs.clear();
1172
1173 auto all_reqs = std::move(reqs);
1174 for (auto iter : all_reqs) {
1175 _unlink_request(iter.second);
1176 }
1177
1178 reqs.clear();
1179
1180 if (completion_mgr) {
1181 completion_mgr->go_down();
1182 }
1183
1184 return 0;
1185 }
1186
1187 void rgw_http_client_init(CephContext *cct)
1188 {
1189 curl_global_init(CURL_GLOBAL_ALL);
1190 rgw_http_manager = new RGWHTTPManager(cct);
1191 rgw_http_manager->start();
1192 }
1193
1194 void rgw_http_client_cleanup()
1195 {
1196 rgw_http_manager->stop();
1197 delete rgw_http_manager;
1198 curl_global_cleanup();
1199 }
1200
1201
1202 int RGWHTTP::send(RGWHTTPClient *req) {
1203 if (!req) {
1204 return 0;
1205 }
1206 int r = rgw_http_manager->add_request(req);
1207 if (r < 0) {
1208 return r;
1209 }
1210
1211 return 0;
1212 }
1213
1214 int RGWHTTP::process(RGWHTTPClient *req) {
1215 if (!req) {
1216 return 0;
1217 }
1218 int r = send(req);
1219 if (r < 0) {
1220 return r;
1221 }
1222
1223 return req->wait();
1224 }
1225