]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7 #include <boost/utility/string_ref.hpp>
8
9 #include <curl/curl.h>
10 #include <curl/easy.h>
11 #include <curl/multi.h>
12
13 #include "rgw_common.h"
14 #include "rgw_http_client.h"
15 #include "rgw_http_errors.h"
16 #include "common/RefCountedObj.h"
17
18 #include "rgw_coroutine.h"
19
20 #include <atomic>
21
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
24
25 struct rgw_http_req_data : public RefCountedObject {
26 CURL *easy_handle;
27 curl_slist *h;
28 uint64_t id;
29 int ret;
30 std::atomic<bool> done = { false };
31 RGWHTTPClient *client;
32 void *user_info;
33 bool registered;
34 RGWHTTPManager *mgr;
35 char error_buf[CURL_ERROR_SIZE];
36
37 Mutex lock;
38 Cond cond;
39
40 rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
41 client(nullptr), user_info(nullptr), registered(false),
42 mgr(NULL), lock("rgw_http_req_data::lock") {
43 memset(error_buf, 0, sizeof(error_buf));
44 }
45
46 int wait() {
47 Mutex::Locker l(lock);
48 cond.Wait(lock);
49 return ret;
50 }
51
52
53 void finish(int r) {
54 Mutex::Locker l(lock);
55 ret = r;
56 if (easy_handle)
57 curl_easy_cleanup(easy_handle);
58
59 if (h)
60 curl_slist_free_all(h);
61
62 easy_handle = NULL;
63 h = NULL;
64 done = true;
65 cond.Signal();
66 }
67
68 bool is_done() {
69 return done;
70 }
71
72 int get_retcode() {
73 Mutex::Locker l(lock);
74 return ret;
75 }
76
77 RGWHTTPManager *get_manager() {
78 Mutex::Locker l(lock);
79 return mgr;
80 }
81 };
82
83 struct RGWCurlHandle {
84 int uses;
85 mono_time lastuse;
86 CURL* h;
87
88 RGWCurlHandle(CURL* h) : uses(0), h(h) {};
89 CURL* operator*() {
90 return this->h;
91 }
92 };
93
94 #define MAXIDLE 5
95 class RGWCurlHandles : public Thread {
96 public:
97 Mutex cleaner_lock;
98 std::vector<RGWCurlHandle*>saved_curl;
99 int cleaner_shutdown;
100 Cond cleaner_cond;
101
102 RGWCurlHandles() :
103 cleaner_lock{"RGWCurlHandles::cleaner_lock"},
104 cleaner_shutdown{0} {
105 }
106
107 RGWCurlHandle* get_curl_handle();
108 void release_curl_handle_now(RGWCurlHandle* curl);
109 void release_curl_handle(RGWCurlHandle* curl);
110 void flush_curl_handles();
111 void* entry();
112 void stop();
113 };
114
115 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
116 RGWCurlHandle* curl = 0;
117 CURL* h;
118 {
119 Mutex::Locker lock(cleaner_lock);
120 if (!saved_curl.empty()) {
121 curl = *saved_curl.begin();
122 saved_curl.erase(saved_curl.begin());
123 }
124 }
125 if (curl) {
126 } else if ((h = curl_easy_init())) {
127 curl = new RGWCurlHandle{h};
128 } else {
129 // curl = 0;
130 }
131 return curl;
132 }
133
134 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
135 {
136 curl_easy_cleanup(**curl);
137 delete curl;
138 }
139
140 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
141 {
142 if (cleaner_shutdown) {
143 release_curl_handle_now(curl);
144 } else {
145 curl_easy_reset(**curl);
146 Mutex::Locker lock(cleaner_lock);
147 curl->lastuse = mono_clock::now();
148 saved_curl.insert(saved_curl.begin(), 1, curl);
149 }
150 }
151
152 void* RGWCurlHandles::entry()
153 {
154 RGWCurlHandle* curl;
155 Mutex::Locker lock(cleaner_lock);
156
157 for (;;) {
158 if (cleaner_shutdown) {
159 if (saved_curl.empty())
160 break;
161 } else {
162 utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0);
163 cleaner_cond.WaitUntil(cleaner_lock, release);
164 }
165 mono_time now = mono_clock::now();
166 while (!saved_curl.empty()) {
167 auto cend = saved_curl.end();
168 --cend;
169 curl = *cend;
170 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
171 break;
172 saved_curl.erase(cend);
173 release_curl_handle_now(curl);
174 }
175 }
176 return nullptr;
177 }
178
179 void RGWCurlHandles::stop()
180 {
181 Mutex::Locker lock(cleaner_lock);
182 cleaner_shutdown = 1;
183 cleaner_cond.Signal();
184 }
185
186 void RGWCurlHandles::flush_curl_handles()
187 {
188 stop();
189 join();
190 if (!saved_curl.empty()) {
191 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
192 }
193 saved_curl.shrink_to_fit();
194 }
195
196 static RGWCurlHandles *handles;
197 // XXX make this part of the token cache? (but that's swift-only;
198 // and this especially needs to integrates with s3...)
199
200 void rgw_setup_saved_curl_handles()
201 {
202 handles = new RGWCurlHandles();
203 handles->create("rgw_curl");
204 }
205
206 void rgw_release_all_curl_handles()
207 {
208 handles->flush_curl_handles();
209 delete handles;
210 }
211
212 /*
213 * the simple set of callbacks will be called on RGWHTTPClient::process()
214 */
215 /* Static methods - callbacks for libcurl. */
216 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
217 const size_t size,
218 const size_t nmemb,
219 void * const _info)
220 {
221 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
222 const size_t len = size * nmemb;
223 int ret = client->receive_header(ptr, size * nmemb);
224 if (ret < 0) {
225 dout(0) << "WARNING: client->receive_header() returned ret="
226 << ret << dendl;
227 }
228
229 return len;
230 }
231
232 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
233 const size_t size,
234 const size_t nmemb,
235 void * const _info)
236 {
237 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
238 const size_t len = size * nmemb;
239 int ret = client->receive_data(ptr, size * nmemb);
240 if (ret < 0) {
241 dout(0) << "WARNING: client->receive_data() returned ret="
242 << ret << dendl;
243 }
244
245 return len;
246 }
247
248 size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
249 const size_t size,
250 const size_t nmemb,
251 void * const _info)
252 {
253 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
254 int ret = client->send_data(ptr, size * nmemb);
255 if (ret < 0) {
256 dout(0) << "WARNING: client->send_data() returned ret="
257 << ret << dendl;
258 }
259
260 return ret;
261 }
262
263 /*
264 * the following set of callbacks will be called either on RGWHTTPManager::process(),
265 * or via the RGWHTTPManager async processing.
266 */
267 size_t RGWHTTPClient::receive_http_header(void * const ptr,
268 const size_t size,
269 const size_t nmemb,
270 void * const _info)
271 {
272 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
273 size_t len = size * nmemb;
274
275 Mutex::Locker l(req_data->lock);
276
277 if (!req_data->registered) {
278 return len;
279 }
280
281 int ret = req_data->client->receive_header(ptr, size * nmemb);
282 if (ret < 0) {
283 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
284 }
285
286 return len;
287 }
288
289 size_t RGWHTTPClient::receive_http_data(void * const ptr,
290 const size_t size,
291 const size_t nmemb,
292 void * const _info)
293 {
294 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
295 size_t len = size * nmemb;
296
297 Mutex::Locker l(req_data->lock);
298
299 if (!req_data->registered) {
300 return len;
301 }
302
303 int ret = req_data->client->receive_data(ptr, size * nmemb);
304 if (ret < 0) {
305 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
306 }
307
308 return len;
309 }
310
311 size_t RGWHTTPClient::send_http_data(void * const ptr,
312 const size_t size,
313 const size_t nmemb,
314 void * const _info)
315 {
316 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
317
318 Mutex::Locker l(req_data->lock);
319
320 if (!req_data->registered) {
321 return 0;
322 }
323
324 int ret = req_data->client->send_data(ptr, size * nmemb);
325 if (ret < 0) {
326 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
327 }
328
329 return ret;
330 }
331
332 static curl_slist *headers_to_slist(param_vec_t& headers)
333 {
334 curl_slist *h = NULL;
335
336 param_vec_t::iterator iter;
337 for (iter = headers.begin(); iter != headers.end(); ++iter) {
338 pair<string, string>& p = *iter;
339 string val = p.first;
340
341 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
342 val = val.substr(5);
343 }
344
345 /* we need to convert all underscores into dashes as some web servers forbid them
346 * in the http header field names
347 */
348 for (size_t i = 0; i < val.size(); i++) {
349 if (val[i] == '_') {
350 val[i] = '-';
351 }
352 }
353
354 // curl won't send headers with empty values unless it ends with a ; instead
355 if (p.second.empty()) {
356 val.append(1, ';');
357 } else {
358 val.append(": ");
359 val.append(p.second);
360 }
361 h = curl_slist_append(h, val.c_str());
362 }
363
364 return h;
365 }
366
367 static bool is_upload_request(const char *method)
368 {
369 if (method == nullptr) {
370 return false;
371 }
372 return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0;
373 }
374
375 /*
376 * process a single simple one off request, not going through RGWHTTPManager. Not using
377 * req_data.
378 */
379 int RGWHTTPClient::process(const char *method, const char *url)
380 {
381 int ret = 0;
382 CURL *curl_handle;
383
384 char error_buf[CURL_ERROR_SIZE];
385
386 last_method = (method ? method : "");
387 last_url = (url ? url : "");
388
389 auto ca = handles->get_curl_handle();
390 curl_handle = **ca;
391
392 dout(20) << "sending request to " << url << dendl;
393
394 curl_slist *h = headers_to_slist(headers);
395
396 curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
397 curl_easy_setopt(curl_handle, CURLOPT_URL, url);
398 curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
399 curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
400 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
401 curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
402 curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
403 curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
404 curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
405 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
406 curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
407 if (h) {
408 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
409 }
410 curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
411 curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
412 if (is_upload_request(method)) {
413 curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
414 }
415 if (has_send_len) {
416 curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
417 }
418 if (!verify_ssl) {
419 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
420 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
421 dout(20) << "ssl verification is set to off" << dendl;
422 }
423
424 CURLcode status = curl_easy_perform(curl_handle);
425 if (status) {
426 dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
427 ret = -EINVAL;
428 }
429 curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
430 handles->release_curl_handle(ca);
431 curl_slist_free_all(h);
432
433 return ret;
434 }
435
436 string RGWHTTPClient::to_str()
437 {
438 string method_str = (last_method.empty() ? "<no-method>" : last_method);
439 string url_str = (last_url.empty() ? "<no-url>" : last_url);
440 return method_str + " " + url_str;
441 }
442
443 int RGWHTTPClient::get_req_retcode()
444 {
445 if (!req_data) {
446 return -EINVAL;
447 }
448
449 return req_data->get_retcode();
450 }
451
452 /*
453 * init request, will be used later with RGWHTTPManager
454 */
455 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint)
456 {
457 assert(!req_data);
458 _req_data->get();
459 req_data = _req_data;
460
461 CURL *easy_handle;
462
463 easy_handle = curl_easy_init();
464
465 req_data->easy_handle = easy_handle;
466
467 dout(20) << "sending request to " << url << dendl;
468
469 curl_slist *h = headers_to_slist(headers);
470
471 req_data->h = h;
472
473 last_method = (method ? method : "");
474 last_url = (url ? url : "");
475
476 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
477 curl_easy_setopt(easy_handle, CURLOPT_URL, url);
478 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
479 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
480 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
481 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
482 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
483 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
484 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
485 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
486 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
487 if (h) {
488 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
489 }
490 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
491 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
492 if (send_data_hint || is_upload_request(method)) {
493 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
494 }
495 if (has_send_len) {
496 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
497 }
498 if (!verify_ssl) {
499 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
500 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
501 dout(20) << "ssl verification is set to off" << dendl;
502 }
503 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
504
505 return 0;
506 }
507
508 /*
509 * wait for async request to complete
510 */
511 int RGWHTTPClient::wait()
512 {
513 if (!req_data->is_done()) {
514 return req_data->wait();
515 }
516
517 return req_data->ret;
518 }
519
520 RGWHTTPClient::~RGWHTTPClient()
521 {
522 if (req_data) {
523 RGWHTTPManager *http_manager = req_data->get_manager();
524 if (http_manager) {
525 http_manager->remove_request(this);
526 }
527
528 req_data->put();
529 }
530 }
531
532
533 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
534 {
535 const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
536
537 /* We're tokening the line that way due to backward compatibility. */
538 const size_t sep_loc = header_line.find_first_of(" \t:");
539
540 if (boost::string_ref::npos == sep_loc) {
541 /* Wrongly formatted header? Just skip it. */
542 return 0;
543 }
544
545 header_name_t name(header_line.substr(0, sep_loc));
546 if (0 == relevant_headers.count(name)) {
547 /* Not interested in this particular header. */
548 return 0;
549 }
550
551 const auto value_part = header_line.substr(sep_loc + 1);
552
553 /* Skip spaces and tabs after the separator. */
554 const size_t val_loc_s = value_part.find_first_not_of(' ');
555 const size_t val_loc_e = value_part.find_first_of("\r\n");
556
557 if (boost::string_ref::npos == val_loc_s ||
558 boost::string_ref::npos == val_loc_e) {
559 /* Empty value case. */
560 found_headers.emplace(name, header_value_t());
561 } else {
562 found_headers.emplace(name, header_value_t(
563 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
564 }
565
566 return 0;
567 }
568
569 int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
570 {
571 int length_to_copy = 0;
572 if (post_data_index < post_data.length()) {
573 length_to_copy = min(post_data.length() - post_data_index, len);
574 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
575 post_data_index += length_to_copy;
576 }
577 return length_to_copy;
578 }
579
580
581 static int clear_signal(int fd)
582 {
583 // since we're in non-blocking mode, we can try to read a lot more than
584 // one signal from signal_thread() to avoid later wakeups. non-blocking reads
585 // are also required to support the curl_multi_wait bug workaround
586 std::array<char, 256> buf;
587 int ret = ::read(fd, (void *)buf.data(), buf.size());
588 if (ret < 0) {
589 ret = -errno;
590 return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
591 }
592 return 0;
593 }
594
595 #if HAVE_CURL_MULTI_WAIT
596
597 static std::once_flag detect_flag;
598 static bool curl_multi_wait_bug_present = false;
599
600 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
601 int write_fd, int read_fd)
602 {
603 int ret = 0;
604
605 // write to write_fd so that read_fd becomes readable
606 uint32_t buf = 0;
607 ret = ::write(write_fd, &buf, sizeof(buf));
608 if (ret < 0) {
609 ret = -errno;
610 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
611 return ret;
612 }
613
614 // pass read_fd in extra_fds for curl_multi_wait()
615 int num_fds;
616 struct curl_waitfd wait_fd;
617
618 wait_fd.fd = read_fd;
619 wait_fd.events = CURL_WAIT_POLLIN;
620 wait_fd.revents = 0;
621
622 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
623 if (ret != CURLM_OK) {
624 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
625 return -EIO;
626 }
627
628 // curl_multi_wait should flag revents when extra_fd is readable. if it
629 // doesn't, the bug is present and we can't rely on revents
630 if (wait_fd.revents == 0) {
631 curl_multi_wait_bug_present = true;
632 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
633 "bug in curl_multi_wait(). enabling a workaround that may degrade "
634 "performance slightly." << dendl;
635 }
636
637 return clear_signal(read_fd);
638 }
639
640 static bool is_signaled(const curl_waitfd& wait_fd)
641 {
642 if (wait_fd.fd < 0) {
643 // no fd to signal
644 return false;
645 }
646
647 if (curl_multi_wait_bug_present) {
648 // we can't rely on revents, so we always return true if a wait_fd is given.
649 // this means we'll be trying a non-blocking read on this fd every time that
650 // curl_multi_wait() wakes up
651 return true;
652 }
653
654 return wait_fd.revents > 0;
655 }
656
657 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
658 {
659 int num_fds;
660 struct curl_waitfd wait_fd;
661
662 wait_fd.fd = signal_fd;
663 wait_fd.events = CURL_WAIT_POLLIN;
664 wait_fd.revents = 0;
665
666 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
667 if (ret) {
668 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
669 return -EIO;
670 }
671
672 if (is_signaled(wait_fd)) {
673 ret = clear_signal(signal_fd);
674 if (ret < 0) {
675 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
676 return ret;
677 }
678 }
679 return 0;
680 }
681
682 #else
683
684 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
685 {
686 fd_set fdread;
687 fd_set fdwrite;
688 fd_set fdexcep;
689 int maxfd = -1;
690
691 FD_ZERO(&fdread);
692 FD_ZERO(&fdwrite);
693 FD_ZERO(&fdexcep);
694
695 /* get file descriptors from the transfers */
696 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
697 if (ret) {
698 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
699 return -EIO;
700 }
701
702 if (signal_fd > 0) {
703 FD_SET(signal_fd, &fdread);
704 if (signal_fd >= maxfd) {
705 maxfd = signal_fd + 1;
706 }
707 }
708
709 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
710 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
711 #define RGW_CURL_TIMEOUT 1000
712 if (!to)
713 to = RGW_CURL_TIMEOUT;
714 struct timeval timeout;
715 timeout.tv_sec = to / 1000;
716 timeout.tv_usec = to % 1000;
717
718 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
719 if (ret < 0) {
720 ret = -errno;
721 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
722 return ret;
723 }
724
725 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
726 ret = clear_signal(signal_fd);
727 if (ret < 0) {
728 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
729 return ret;
730 }
731 }
732
733 return 0;
734 }
735
736 #endif
737
738 void *RGWHTTPManager::ReqsThread::entry()
739 {
740 manager->reqs_thread_entry();
741 return NULL;
742 }
743
744 /*
745 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
746 */
747 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
748 completion_mgr(_cm), is_threaded(false),
749 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
750 reqs_thread(NULL)
751 {
752 multi_handle = (void *)curl_multi_init();
753 thread_pipe[0] = -1;
754 thread_pipe[1] = -1;
755 }
756
757 RGWHTTPManager::~RGWHTTPManager() {
758 stop();
759 if (multi_handle)
760 curl_multi_cleanup((CURLM *)multi_handle);
761 }
762
763 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
764 {
765 RWLock::WLocker rl(reqs_lock);
766 req_data->id = num_reqs;
767 req_data->registered = true;
768 reqs[num_reqs] = req_data;
769 num_reqs++;
770 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
771 }
772
773 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
774 {
775 RWLock::WLocker rl(reqs_lock);
776 req_data->get();
777 req_data->registered = false;
778 unregistered_reqs.push_back(req_data);
779 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
780 }
781
782 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
783 {
784 RWLock::WLocker rl(reqs_lock);
785 _complete_request(req_data);
786 }
787
788 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
789 {
790 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
791 if (iter != reqs.end()) {
792 reqs.erase(iter);
793 }
794 {
795 Mutex::Locker l(req_data->lock);
796 req_data->mgr = nullptr;
797 }
798 if (completion_mgr) {
799 completion_mgr->complete(NULL, req_data->user_info);
800 }
801
802 req_data->put();
803 }
804
805 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
806 {
807 req_data->finish(ret);
808 complete_request(req_data);
809 }
810
811 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
812 {
813 req_data->finish(ret);
814 _complete_request(req_data);
815 }
816
817 /*
818 * hook request to the curl multi handle
819 */
820 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
821 {
822 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
823 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
824 if (mstatus) {
825 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
826 return -EIO;
827 }
828 return 0;
829 }
830
831 /*
832 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
833 * there will be no more processing on this request
834 */
835 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
836 {
837 if (req_data->easy_handle) {
838 curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
839 }
840 if (!req_data->is_done()) {
841 _finish_request(req_data, -ECANCELED);
842 }
843 }
844
845 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
846 {
847 RWLock::WLocker wl(reqs_lock);
848 _unlink_request(req_data);
849 }
850
851 void RGWHTTPManager::manage_pending_requests()
852 {
853 reqs_lock.get_read();
854 if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
855 reqs_lock.unlock();
856 return;
857 }
858 reqs_lock.unlock();
859
860 RWLock::WLocker wl(reqs_lock);
861
862 if (!unregistered_reqs.empty()) {
863 for (auto& r : unregistered_reqs) {
864 _unlink_request(r);
865 r->put();
866 }
867
868 unregistered_reqs.clear();
869 }
870
871 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
872
873 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
874
875 for (; iter != reqs.end(); ++iter) {
876 rgw_http_req_data *req_data = iter->second;
877 int r = link_request(req_data);
878 if (r < 0) {
879 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
880 remove_reqs.push_back(std::make_pair(iter->second, r));
881 } else {
882 max_threaded_req = iter->first + 1;
883 }
884 }
885
886 for (auto piter : remove_reqs) {
887 rgw_http_req_data *req_data = piter.first;
888 int r = piter.second;
889
890 _finish_request(req_data, r);
891 }
892 }
893
894 int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint)
895 {
896 rgw_http_req_data *req_data = new rgw_http_req_data;
897
898 int ret = client->init_request(method, url, req_data, send_data_hint);
899 if (ret < 0) {
900 req_data->put();
901 req_data = NULL;
902 return ret;
903 }
904
905 req_data->mgr = this;
906 req_data->client = client;
907 req_data->user_info = client->get_user_info();
908
909 register_request(req_data);
910
911 if (!is_threaded) {
912 ret = link_request(req_data);
913 if (ret < 0) {
914 req_data->put();
915 req_data = NULL;
916 }
917 return ret;
918 }
919 ret = signal_thread();
920 if (ret < 0) {
921 finish_request(req_data, ret);
922 }
923
924 return ret;
925 }
926
927 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
928 {
929 rgw_http_req_data *req_data = client->get_req_data();
930
931 if (!is_threaded) {
932 unlink_request(req_data);
933 return 0;
934 }
935 unregister_request(req_data);
936 int ret = signal_thread();
937 if (ret < 0) {
938 return ret;
939 }
940
941 return 0;
942 }
943
944 /*
945 * the synchronous, non-threaded request processing method.
946 */
947 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
948 {
949 assert(!is_threaded);
950
951 int still_running;
952 int mstatus;
953
954 do {
955 if (wait_for_data) {
956 int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
957 if (ret < 0) {
958 return ret;
959 }
960 }
961
962 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
963 switch (mstatus) {
964 case CURLM_OK:
965 case CURLM_CALL_MULTI_PERFORM:
966 break;
967 default:
968 dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
969 return -EINVAL;
970 }
971 int msgs_left;
972 CURLMsg *msg;
973 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
974 if (msg->msg == CURLMSG_DONE) {
975 CURL *e = msg->easy_handle;
976 rgw_http_req_data *req_data;
977 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
978
979 long http_status;
980 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
981
982 int status = rgw_http_error_to_errno(http_status);
983 int result = msg->data.result;
984 finish_request(req_data, status);
985 switch (result) {
986 case CURLE_OK:
987 break;
988 default:
989 dout(20) << "ERROR: msg->data.result=" << result << dendl;
990 return -EIO;
991 }
992 }
993 }
994 } while (mstatus == CURLM_CALL_MULTI_PERFORM);
995
996 *done = (still_running == 0);
997
998 return 0;
999 }
1000
1001 /*
1002 * the synchronous, non-threaded request processing completion method.
1003 */
1004 int RGWHTTPManager::complete_requests()
1005 {
1006 bool done = false;
1007 int ret;
1008 do {
1009 ret = process_requests(true, &done);
1010 } while (!done && !ret);
1011
1012 return ret;
1013 }
1014
1015 int RGWHTTPManager::set_threaded()
1016 {
1017 if (pipe_cloexec(thread_pipe) < 0) {
1018 int e = errno;
1019 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1020 return -e;
1021 }
1022
1023 // enable non-blocking reads
1024 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1025 int e = errno;
1026 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1027 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1028 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1029 return -e;
1030 }
1031
1032 #ifdef HAVE_CURL_MULTI_WAIT
1033 // on first initialization, use this pipe to detect whether we're using a
1034 // buggy version of libcurl
1035 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1036 static_cast<CURLM*>(multi_handle),
1037 thread_pipe[1], thread_pipe[0]);
1038 #endif
1039
1040 is_threaded = true;
1041 reqs_thread = new ReqsThread(this);
1042 reqs_thread->create("http_manager");
1043 return 0;
1044 }
1045
1046 void RGWHTTPManager::stop()
1047 {
1048 if (is_stopped) {
1049 return;
1050 }
1051
1052 is_stopped = true;
1053
1054 if (is_threaded) {
1055 going_down = true;
1056 signal_thread();
1057 reqs_thread->join();
1058 delete reqs_thread;
1059 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1060 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1061 }
1062 }
1063
1064 int RGWHTTPManager::signal_thread()
1065 {
1066 uint32_t buf = 0;
1067 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1068 if (ret < 0) {
1069 ret = -errno;
1070 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1071 return ret;
1072 }
1073 return 0;
1074 }
1075
1076 void *RGWHTTPManager::reqs_thread_entry()
1077 {
1078 int still_running;
1079 int mstatus;
1080
1081 ldout(cct, 20) << __func__ << ": start" << dendl;
1082
1083 while (!going_down) {
1084 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1085 if (ret < 0) {
1086 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1087 return NULL;
1088 }
1089
1090 manage_pending_requests();
1091
1092 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1093 switch (mstatus) {
1094 case CURLM_OK:
1095 case CURLM_CALL_MULTI_PERFORM:
1096 break;
1097 default:
1098 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1099 break;
1100 }
1101 int msgs_left;
1102 CURLMsg *msg;
1103 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1104 if (msg->msg == CURLMSG_DONE) {
1105 int result = msg->data.result;
1106 CURL *e = msg->easy_handle;
1107 rgw_http_req_data *req_data;
1108 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1109 curl_multi_remove_handle((CURLM *)multi_handle, e);
1110
1111 long http_status;
1112 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1113
1114 int status = rgw_http_error_to_errno(http_status);
1115 if (result != CURLE_OK && http_status == 0) {
1116 status = -EAGAIN;
1117 }
1118 int id = req_data->id;
1119 finish_request(req_data, status);
1120 switch (result) {
1121 case CURLE_OK:
1122 break;
1123 case CURLE_OPERATION_TIMEDOUT:
1124 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1125 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1126 default:
1127 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1128 break;
1129 }
1130 }
1131 }
1132 }
1133
1134
1135 RWLock::WLocker rl(reqs_lock);
1136 for (auto r : unregistered_reqs) {
1137 _unlink_request(r);
1138 }
1139
1140 unregistered_reqs.clear();
1141
1142 auto all_reqs = std::move(reqs);
1143 for (auto iter : all_reqs) {
1144 _unlink_request(iter.second);
1145 }
1146
1147 reqs.clear();
1148
1149 if (completion_mgr) {
1150 completion_mgr->go_down();
1151 }
1152
1153 return 0;
1154 }
1155
1156