]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
update sources to 12.2.8
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5
6 #include <boost/utility/string_ref.hpp>
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/RefCountedObj.h"
16
17 #include "rgw_coroutine.h"
18
19 #include <atomic>
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 struct rgw_http_req_data : public RefCountedObject {
25 CURL *easy_handle;
26 curl_slist *h;
27 uint64_t id;
28 int ret;
29 std::atomic<bool> done = { false };
30 RGWHTTPClient *client;
31 void *user_info;
32 bool registered;
33 RGWHTTPManager *mgr;
34 char error_buf[CURL_ERROR_SIZE];
35
36 Mutex lock;
37 Cond cond;
38
39 rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
40 client(nullptr), user_info(nullptr), registered(false),
41 mgr(NULL), lock("rgw_http_req_data::lock") {
42 memset(error_buf, 0, sizeof(error_buf));
43 }
44
45 int wait() {
46 Mutex::Locker l(lock);
47 cond.Wait(lock);
48 return ret;
49 }
50
51
52 void finish(int r) {
53 Mutex::Locker l(lock);
54 ret = r;
55 if (easy_handle)
56 curl_easy_cleanup(easy_handle);
57
58 if (h)
59 curl_slist_free_all(h);
60
61 easy_handle = NULL;
62 h = NULL;
63 done = true;
64 cond.Signal();
65 }
66
67 bool is_done() {
68 return done;
69 }
70
71 int get_retcode() {
72 Mutex::Locker l(lock);
73 return ret;
74 }
75
76 RGWHTTPManager *get_manager() {
77 Mutex::Locker l(lock);
78 return mgr;
79 }
80 };
81
82 struct RGWCurlHandle {
83 int uses;
84 mono_time lastuse;
85 CURL* h;
86
87 RGWCurlHandle(CURL* h) : uses(0), h(h) {};
88 CURL* operator*() {
89 return this->h;
90 }
91 };
92
93 #define MAXIDLE 5
94 class RGWCurlHandles : public Thread {
95 public:
96 Mutex cleaner_lock;
97 std::vector<RGWCurlHandle*>saved_curl;
98 int cleaner_shutdown;
99 Cond cleaner_cond;
100
101 RGWCurlHandles() :
102 cleaner_lock{"RGWCurlHandles::cleaner_lock"},
103 cleaner_shutdown{0} {
104 }
105
106 RGWCurlHandle* get_curl_handle();
107 void release_curl_handle_now(RGWCurlHandle* curl);
108 void release_curl_handle(RGWCurlHandle* curl);
109 void flush_curl_handles();
110 void* entry();
111 void stop();
112 };
113
114 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
115 RGWCurlHandle* curl = 0;
116 CURL* h;
117 {
118 Mutex::Locker lock(cleaner_lock);
119 if (!saved_curl.empty()) {
120 curl = *saved_curl.begin();
121 saved_curl.erase(saved_curl.begin());
122 }
123 }
124 if (curl) {
125 } else if ((h = curl_easy_init())) {
126 curl = new RGWCurlHandle{h};
127 } else {
128 // curl = 0;
129 }
130 return curl;
131 }
132
133 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
134 {
135 curl_easy_cleanup(**curl);
136 delete curl;
137 }
138
139 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
140 {
141 if (cleaner_shutdown) {
142 release_curl_handle_now(curl);
143 } else {
144 curl_easy_reset(**curl);
145 Mutex::Locker lock(cleaner_lock);
146 curl->lastuse = mono_clock::now();
147 saved_curl.insert(saved_curl.begin(), 1, curl);
148 }
149 }
150
151 void* RGWCurlHandles::entry()
152 {
153 RGWCurlHandle* curl;
154 Mutex::Locker lock(cleaner_lock);
155
156 for (;;) {
157 if (cleaner_shutdown) {
158 if (saved_curl.empty())
159 break;
160 } else {
161 utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
162 cleaner_cond.WaitUntil(cleaner_lock, release);
163 }
164 mono_time now = mono_clock::now();
165 while (!saved_curl.empty()) {
166 auto cend = saved_curl.end();
167 --cend;
168 curl = *cend;
169 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
170 break;
171 saved_curl.erase(cend);
172 release_curl_handle_now(curl);
173 }
174 }
175 return nullptr;
176 }
177
178 void RGWCurlHandles::stop()
179 {
180 Mutex::Locker lock(cleaner_lock);
181 cleaner_shutdown = 1;
182 cleaner_cond.Signal();
183 }
184
185 void RGWCurlHandles::flush_curl_handles()
186 {
187 stop();
188 join();
189 if (!saved_curl.empty()) {
190 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
191 }
192 saved_curl.shrink_to_fit();
193 }
194
195 static RGWCurlHandles *handles;
196 // XXX make this part of the token cache? (but that's swift-only;
197 // and this especially needs to integrates with s3...)
198
199 void rgw_setup_saved_curl_handles()
200 {
201 handles = new RGWCurlHandles();
202 handles->create("rgw_curl");
203 }
204
205 void rgw_release_all_curl_handles()
206 {
207 handles->flush_curl_handles();
208 delete handles;
209 }
210
211 /*
212 * the simple set of callbacks will be called on RGWHTTPClient::process()
213 */
214 /* Static methods - callbacks for libcurl. */
215 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
216 const size_t size,
217 const size_t nmemb,
218 void * const _info)
219 {
220 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
221 const size_t len = size * nmemb;
222 int ret = client->receive_header(ptr, size * nmemb);
223 if (ret < 0) {
224 dout(0) << "WARNING: client->receive_header() returned ret="
225 << ret << dendl;
226 }
227
228 return len;
229 }
230
231 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
232 const size_t size,
233 const size_t nmemb,
234 void * const _info)
235 {
236 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
237 const size_t len = size * nmemb;
238 int ret = client->receive_data(ptr, size * nmemb);
239 if (ret < 0) {
240 dout(0) << "WARNING: client->receive_data() returned ret="
241 << ret << dendl;
242 }
243
244 return len;
245 }
246
247 size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
248 const size_t size,
249 const size_t nmemb,
250 void * const _info)
251 {
252 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
253 int ret = client->send_data(ptr, size * nmemb);
254 if (ret < 0) {
255 dout(0) << "WARNING: client->send_data() returned ret="
256 << ret << dendl;
257 }
258
259 return ret;
260 }
261
262 /*
263 * the following set of callbacks will be called either on RGWHTTPManager::process(),
264 * or via the RGWHTTPManager async processing.
265 */
266 size_t RGWHTTPClient::receive_http_header(void * const ptr,
267 const size_t size,
268 const size_t nmemb,
269 void * const _info)
270 {
271 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
272 size_t len = size * nmemb;
273
274 Mutex::Locker l(req_data->lock);
275
276 if (!req_data->registered) {
277 return len;
278 }
279
280 int ret = req_data->client->receive_header(ptr, size * nmemb);
281 if (ret < 0) {
282 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
283 }
284
285 return len;
286 }
287
288 size_t RGWHTTPClient::receive_http_data(void * const ptr,
289 const size_t size,
290 const size_t nmemb,
291 void * const _info)
292 {
293 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
294 size_t len = size * nmemb;
295
296 Mutex::Locker l(req_data->lock);
297
298 if (!req_data->registered) {
299 return len;
300 }
301
302 int ret = req_data->client->receive_data(ptr, size * nmemb);
303 if (ret < 0) {
304 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
305 }
306
307 return len;
308 }
309
310 size_t RGWHTTPClient::send_http_data(void * const ptr,
311 const size_t size,
312 const size_t nmemb,
313 void * const _info)
314 {
315 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
316
317 Mutex::Locker l(req_data->lock);
318
319 if (!req_data->registered) {
320 return 0;
321 }
322
323 int ret = req_data->client->send_data(ptr, size * nmemb);
324 if (ret < 0) {
325 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
326 }
327
328 return ret;
329 }
330
331 static curl_slist *headers_to_slist(param_vec_t& headers)
332 {
333 curl_slist *h = NULL;
334
335 param_vec_t::iterator iter;
336 for (iter = headers.begin(); iter != headers.end(); ++iter) {
337 pair<string, string>& p = *iter;
338 string val = p.first;
339
340 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
341 val = val.substr(5);
342 }
343
344 /* we need to convert all underscores into dashes as some web servers forbid them
345 * in the http header field names
346 */
347 for (size_t i = 0; i < val.size(); i++) {
348 if (val[i] == '_') {
349 val[i] = '-';
350 }
351 }
352
353 // curl won't send headers with empty values unless it ends with a ; instead
354 if (p.second.empty()) {
355 val.append(1, ';');
356 } else {
357 val.append(": ");
358 val.append(p.second);
359 }
360 h = curl_slist_append(h, val.c_str());
361 }
362
363 return h;
364 }
365
/* True when the method is exactly "POST" or "PUT"; false for a null method. */
static bool is_upload_request(const char *method)
{
  if (!method) {
    return false;
  }
  const bool is_post = (strcmp(method, "POST") == 0);
  const bool is_put = (strcmp(method, "PUT") == 0);
  return is_post || is_put;
}
373
374 /*
375 * process a single simple one off request, not going through RGWHTTPManager. Not using
376 * req_data.
377 */
378 int RGWHTTPClient::process(const char *method, const char *url)
379 {
380 int ret = 0;
381 CURL *curl_handle;
382
383 char error_buf[CURL_ERROR_SIZE];
384
385 last_method = (method ? method : "");
386 last_url = (url ? url : "");
387
388 auto ca = handles->get_curl_handle();
389 curl_handle = **ca;
390
391 dout(20) << "sending request to " << url << dendl;
392
393 curl_slist *h = headers_to_slist(headers);
394
395 curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
396 curl_easy_setopt(curl_handle, CURLOPT_URL, url);
397 curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
398 curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
399 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
400 curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
401 curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
402 curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
403 curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
404 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
405 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
406 if (h) {
407 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
408 }
409 curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
410 curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
411 if (is_upload_request(method)) {
412 curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
413 }
414 if (has_send_len) {
415 curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
416 }
417 if (!verify_ssl) {
418 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
419 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
420 dout(20) << "ssl verification is set to off" << dendl;
421 }
422
423 CURLcode status = curl_easy_perform(curl_handle);
424 if (status) {
425 dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
426 ret = -EINVAL;
427 }
428 curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
429 handles->release_curl_handle(ca);
430 curl_slist_free_all(h);
431
432 return ret;
433 }
434
435 string RGWHTTPClient::to_str()
436 {
437 string method_str = (last_method.empty() ? "<no-method>" : last_method);
438 string url_str = (last_url.empty() ? "<no-url>" : last_url);
439 return method_str + " " + url_str;
440 }
441
442 int RGWHTTPClient::get_req_retcode()
443 {
444 if (!req_data) {
445 return -EINVAL;
446 }
447
448 return req_data->get_retcode();
449 }
450
451 /*
452 * init request, will be used later with RGWHTTPManager
453 */
454 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
455 {
456 assert(!req_data);
457 _req_data->get();
458 req_data = _req_data;
459
460 CURL *easy_handle;
461
462 easy_handle = curl_easy_init();
463
464 req_data->easy_handle = easy_handle;
465
466 dout(20) << "sending request to " << url << dendl;
467
468 curl_slist *h = headers_to_slist(headers);
469
470 req_data->h = h;
471
472 last_method = (method ? method : "");
473 last_url = (url ? url : "");
474
475 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
476 curl_easy_setopt(easy_handle, CURLOPT_URL, url);
477 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
478 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
479 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
480 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
481 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
482 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
483 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
484 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
485 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
486 if (h) {
487 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
488 }
489 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
490 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
491 if (send_data_hint || is_upload_request(method)) {
492 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
493 }
494 if (has_send_len) {
495 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
496 }
497 if (!verify_ssl) {
498 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
499 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
500 dout(20) << "ssl verification is set to off" << dendl;
501 }
502 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
503
504 return 0;
505 }
506
507 /*
508 * wait for async request to complete
509 */
510 int RGWHTTPClient::wait()
511 {
512 if (!req_data->is_done()) {
513 return req_data->wait();
514 }
515
516 return req_data->ret;
517 }
518
519 RGWHTTPClient::~RGWHTTPClient()
520 {
521 if (req_data) {
522 RGWHTTPManager *http_manager = req_data->get_manager();
523 if (http_manager) {
524 http_manager->remove_request(this);
525 }
526
527 req_data->put();
528 }
529 }
530
531
532 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
533 {
534 const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
535
536 /* We're tokening the line that way due to backward compatibility. */
537 const size_t sep_loc = header_line.find_first_of(" \t:");
538
539 if (boost::string_ref::npos == sep_loc) {
540 /* Wrongly formatted header? Just skip it. */
541 return 0;
542 }
543
544 header_name_t name(header_line.substr(0, sep_loc));
545 if (0 == relevant_headers.count(name)) {
546 /* Not interested in this particular header. */
547 return 0;
548 }
549
550 const auto value_part = header_line.substr(sep_loc + 1);
551
552 /* Skip spaces and tabs after the separator. */
553 const size_t val_loc_s = value_part.find_first_not_of(' ');
554 const size_t val_loc_e = value_part.find_first_of("\r\n");
555
556 if (boost::string_ref::npos == val_loc_s ||
557 boost::string_ref::npos == val_loc_e) {
558 /* Empty value case. */
559 found_headers.emplace(name, header_value_t());
560 } else {
561 found_headers.emplace(name, header_value_t(
562 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
563 }
564
565 return 0;
566 }
567
568 int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
569 {
570 int length_to_copy = 0;
571 if (post_data_index < post_data.length()) {
572 length_to_copy = min(post_data.length() - post_data_index, len);
573 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
574 post_data_index += length_to_copy;
575 }
576 return length_to_copy;
577 }
578
579
/*
 * Drain pending wakeup bytes from fd with a single read of up to 256 bytes.
 * Returns 0 on success or when the read fails with EAGAIN, otherwise the
 * negated errno.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> drain;
  const int nread = ::read(fd, (void *)drain.data(), drain.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  if (err == -EAGAIN) {
    return 0;  // clear EAGAIN
  }
  return err;
}
593
594 #if HAVE_CURL_MULTI_WAIT
595
596 static std::once_flag detect_flag;
597 static bool curl_multi_wait_bug_present = false;
598
599 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
600 int write_fd, int read_fd)
601 {
602 int ret = 0;
603
604 // write to write_fd so that read_fd becomes readable
605 uint32_t buf = 0;
606 ret = ::write(write_fd, &buf, sizeof(buf));
607 if (ret < 0) {
608 ret = -errno;
609 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
610 return ret;
611 }
612
613 // pass read_fd in extra_fds for curl_multi_wait()
614 int num_fds;
615 struct curl_waitfd wait_fd;
616
617 wait_fd.fd = read_fd;
618 wait_fd.events = CURL_WAIT_POLLIN;
619 wait_fd.revents = 0;
620
621 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
622 if (ret != CURLM_OK) {
623 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
624 return -EIO;
625 }
626
627 // curl_multi_wait should flag revents when extra_fd is readable. if it
628 // doesn't, the bug is present and we can't rely on revents
629 if (wait_fd.revents == 0) {
630 curl_multi_wait_bug_present = true;
631 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
632 "bug in curl_multi_wait(). enabling a workaround that may degrade "
633 "performance slightly." << dendl;
634 }
635
636 return clear_signal(read_fd);
637 }
638
639 static bool is_signaled(const curl_waitfd& wait_fd)
640 {
641 if (wait_fd.fd < 0) {
642 // no fd to signal
643 return false;
644 }
645
646 if (curl_multi_wait_bug_present) {
647 // we can't rely on revents, so we always return true if a wait_fd is given.
648 // this means we'll be trying a non-blocking read on this fd every time that
649 // curl_multi_wait() wakes up
650 return true;
651 }
652
653 return wait_fd.revents > 0;
654 }
655
656 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
657 {
658 int num_fds;
659 struct curl_waitfd wait_fd;
660
661 wait_fd.fd = signal_fd;
662 wait_fd.events = CURL_WAIT_POLLIN;
663 wait_fd.revents = 0;
664
665 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
666 if (ret) {
667 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
668 return -EIO;
669 }
670
671 if (is_signaled(wait_fd)) {
672 ret = clear_signal(signal_fd);
673 if (ret < 0) {
674 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
675 return ret;
676 }
677 }
678 return 0;
679 }
680
681 #else
682
683 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
684 {
685 fd_set fdread;
686 fd_set fdwrite;
687 fd_set fdexcep;
688 int maxfd = -1;
689
690 FD_ZERO(&fdread);
691 FD_ZERO(&fdwrite);
692 FD_ZERO(&fdexcep);
693
694 /* get file descriptors from the transfers */
695 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
696 if (ret) {
697 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
698 return -EIO;
699 }
700
701 if (signal_fd > 0) {
702 FD_SET(signal_fd, &fdread);
703 if (signal_fd >= maxfd) {
704 maxfd = signal_fd + 1;
705 }
706 }
707
708 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
709 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
710 #define RGW_CURL_TIMEOUT 1000
711 if (!to)
712 to = RGW_CURL_TIMEOUT;
713 struct timeval timeout;
714 timeout.tv_sec = to / 1000;
715 timeout.tv_usec = to % 1000;
716
717 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
718 if (ret < 0) {
719 ret = -errno;
720 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
721 return ret;
722 }
723
724 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
725 ret = clear_signal(signal_fd);
726 if (ret < 0) {
727 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
728 return ret;
729 }
730 }
731
732 return 0;
733 }
734
735 #endif
736
737 void *RGWHTTPManager::ReqsThread::entry()
738 {
739 manager->reqs_thread_entry();
740 return NULL;
741 }
742
743 /*
744 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
745 */
746 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
747 completion_mgr(_cm), is_threaded(false),
748 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
749 reqs_thread(NULL)
750 {
751 multi_handle = (void *)curl_multi_init();
752 thread_pipe[0] = -1;
753 thread_pipe[1] = -1;
754 }
755
756 RGWHTTPManager::~RGWHTTPManager() {
757 stop();
758 if (multi_handle)
759 curl_multi_cleanup((CURLM *)multi_handle);
760 }
761
762 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
763 {
764 RWLock::WLocker rl(reqs_lock);
765 req_data->id = num_reqs;
766 req_data->registered = true;
767 reqs[num_reqs] = req_data;
768 num_reqs++;
769 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
770 }
771
772 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
773 {
774 RWLock::WLocker rl(reqs_lock);
775 req_data->get();
776 req_data->registered = false;
777 unregistered_reqs.push_back(req_data);
778 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
779 }
780
781 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
782 {
783 RWLock::WLocker rl(reqs_lock);
784 _complete_request(req_data);
785 }
786
787 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
788 {
789 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
790 if (iter != reqs.end()) {
791 reqs.erase(iter);
792 }
793 {
794 Mutex::Locker l(req_data->lock);
795 req_data->mgr = nullptr;
796 }
797 if (completion_mgr) {
798 completion_mgr->complete(NULL, req_data->user_info);
799 }
800
801 req_data->put();
802 }
803
804 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
805 {
806 req_data->finish(ret);
807 complete_request(req_data);
808 }
809
810 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
811 {
812 req_data->finish(ret);
813 _complete_request(req_data);
814 }
815
816 /*
817 * hook request to the curl multi handle
818 */
819 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
820 {
821 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
822 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
823 if (mstatus) {
824 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
825 return -EIO;
826 }
827 return 0;
828 }
829
830 /*
831 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
832 * there will be no more processing on this request
833 */
834 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
835 {
836 if (req_data->easy_handle) {
837 curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
838 }
839 if (!req_data->is_done()) {
840 _finish_request(req_data, -ECANCELED);
841 }
842 }
843
844 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
845 {
846 RWLock::WLocker wl(reqs_lock);
847 _unlink_request(req_data);
848 }
849
850 void RGWHTTPManager::manage_pending_requests()
851 {
852 reqs_lock.get_read();
853 if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
854 reqs_lock.unlock();
855 return;
856 }
857 reqs_lock.unlock();
858
859 RWLock::WLocker wl(reqs_lock);
860
861 if (!unregistered_reqs.empty()) {
862 for (auto& r : unregistered_reqs) {
863 _unlink_request(r);
864 r->put();
865 }
866
867 unregistered_reqs.clear();
868 }
869
870 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
871
872 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
873
874 for (; iter != reqs.end(); ++iter) {
875 rgw_http_req_data *req_data = iter->second;
876 int r = link_request(req_data);
877 if (r < 0) {
878 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
879 remove_reqs.push_back(std::make_pair(iter->second, r));
880 } else {
881 max_threaded_req = iter->first + 1;
882 }
883 }
884
885 for (auto piter : remove_reqs) {
886 rgw_http_req_data *req_data = piter.first;
887 int r = piter.second;
888
889 _finish_request(req_data, r);
890 }
891 }
892
893 int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
894 {
895 rgw_http_req_data *req_data = new rgw_http_req_data;
896
897 int ret = client->init_request(method, url, req_data, send_data_hint);
898 if (ret < 0) {
899 req_data->put();
900 req_data = NULL;
901 return ret;
902 }
903
904 req_data->mgr = this;
905 req_data->client = client;
906 req_data->user_info = client->get_user_info();
907
908 register_request(req_data);
909
910 if (!is_threaded) {
911 ret = link_request(req_data);
912 if (ret < 0) {
913 req_data->put();
914 req_data = NULL;
915 }
916 return ret;
917 }
918 ret = signal_thread();
919 if (ret < 0) {
920 finish_request(req_data, ret);
921 }
922
923 return ret;
924 }
925
926 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
927 {
928 rgw_http_req_data *req_data = client->get_req_data();
929
930 if (!is_threaded) {
931 unlink_request(req_data);
932 return 0;
933 }
934 unregister_request(req_data);
935 int ret = signal_thread();
936 if (ret < 0) {
937 return ret;
938 }
939
940 return 0;
941 }
942
943 /*
944 * the synchronous, non-threaded request processing method.
945 */
946 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
947 {
948 assert(!is_threaded);
949
950 int still_running;
951 int mstatus;
952
953 do {
954 if (wait_for_data) {
955 int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
956 if (ret < 0) {
957 return ret;
958 }
959 }
960
961 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
962 switch (mstatus) {
963 case CURLM_OK:
964 case CURLM_CALL_MULTI_PERFORM:
965 break;
966 default:
967 dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
968 return -EINVAL;
969 }
970 int msgs_left;
971 CURLMsg *msg;
972 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
973 if (msg->msg == CURLMSG_DONE) {
974 CURL *e = msg->easy_handle;
975 rgw_http_req_data *req_data;
976 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
977
978 long http_status;
979 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
980
981 int status = rgw_http_error_to_errno(http_status);
982 int result = msg->data.result;
983 finish_request(req_data, status);
984 switch (result) {
985 case CURLE_OK:
986 break;
987 default:
988 dout(20) << "ERROR: msg->data.result=" << result << dendl;
989 return -EIO;
990 }
991 }
992 }
993 } while (mstatus == CURLM_CALL_MULTI_PERFORM);
994
995 *done = (still_running == 0);
996
997 return 0;
998 }
999
1000 /*
1001 * the synchronous, non-threaded request processing completion method.
1002 */
1003 int RGWHTTPManager::complete_requests()
1004 {
1005 bool done = false;
1006 int ret;
1007 do {
1008 ret = process_requests(true, &done);
1009 } while (!done && !ret);
1010
1011 return ret;
1012 }
1013
1014 int RGWHTTPManager::set_threaded()
1015 {
1016 int r = pipe(thread_pipe);
1017 if (r < 0) {
1018 r = -errno;
1019 ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
1020 return r;
1021 }
1022
1023 // enable non-blocking reads
1024 r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
1025 if (r < 0) {
1026 r = -errno;
1027 ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
1028 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1029 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1030 return r;
1031 }
1032
1033 #ifdef HAVE_CURL_MULTI_WAIT
1034 // on first initialization, use this pipe to detect whether we're using a
1035 // buggy version of libcurl
1036 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1037 static_cast<CURLM*>(multi_handle),
1038 thread_pipe[1], thread_pipe[0]);
1039 #endif
1040
1041 is_threaded = true;
1042 reqs_thread = new ReqsThread(this);
1043 reqs_thread->create("http_manager");
1044 return 0;
1045 }
1046
1047 void RGWHTTPManager::stop()
1048 {
1049 if (is_stopped) {
1050 return;
1051 }
1052
1053 is_stopped = true;
1054
1055 if (is_threaded) {
1056 going_down = true;
1057 signal_thread();
1058 reqs_thread->join();
1059 delete reqs_thread;
1060 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1061 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1062 }
1063 }
1064
1065 int RGWHTTPManager::signal_thread()
1066 {
1067 uint32_t buf = 0;
1068 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1069 if (ret < 0) {
1070 ret = -errno;
1071 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1072 return ret;
1073 }
1074 return 0;
1075 }
1076
1077 void *RGWHTTPManager::reqs_thread_entry()
1078 {
1079 int still_running;
1080 int mstatus;
1081
1082 ldout(cct, 20) << __func__ << ": start" << dendl;
1083
1084 while (!going_down) {
1085 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1086 if (ret < 0) {
1087 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1088 return NULL;
1089 }
1090
1091 manage_pending_requests();
1092
1093 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1094 switch (mstatus) {
1095 case CURLM_OK:
1096 case CURLM_CALL_MULTI_PERFORM:
1097 break;
1098 default:
1099 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1100 break;
1101 }
1102 int msgs_left;
1103 CURLMsg *msg;
1104 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1105 if (msg->msg == CURLMSG_DONE) {
1106 int result = msg->data.result;
1107 CURL *e = msg->easy_handle;
1108 rgw_http_req_data *req_data;
1109 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1110 curl_multi_remove_handle((CURLM *)multi_handle, e);
1111
1112 long http_status;
1113 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1114
1115 int status = rgw_http_error_to_errno(http_status);
1116 if (result != CURLE_OK && http_status == 0) {
1117 status = -EAGAIN;
1118 }
1119 int id = req_data->id;
1120 finish_request(req_data, status);
1121 switch (result) {
1122 case CURLE_OK:
1123 break;
1124 case CURLE_OPERATION_TIMEDOUT:
1125 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1126 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1127 default:
1128 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1129 break;
1130 }
1131 }
1132 }
1133 }
1134
1135
1136 RWLock::WLocker rl(reqs_lock);
1137 for (auto r : unregistered_reqs) {
1138 _finish_request(r, -ECANCELED);
1139 }
1140
1141 unregistered_reqs.clear();
1142
1143 auto all_reqs = std::move(reqs);
1144 for (auto iter : all_reqs) {
1145 _finish_request(iter.second, -ECANCELED);
1146 }
1147
1148 reqs.clear();
1149
1150 if (completion_mgr) {
1151 completion_mgr->go_down();
1152 }
1153
1154 return 0;
1155 }
1156
1157