]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
bump version to 16.2.6-pve2
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/async/completion.h"
16 #include "common/RefCountedObj.h"
17
18 #include "rgw_coroutine.h"
19 #include "rgw_tools.h"
20
21 #include <atomic>
22 #include <string_view>
23
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rgw
26
27 RGWHTTPManager *rgw_http_manager;
28
29 struct RGWCurlHandle;
30
31 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
32
33 struct rgw_http_req_data : public RefCountedObject {
34 RGWCurlHandle *curl_handle{nullptr};
35 curl_slist *h{nullptr};
36 uint64_t id;
37 int ret{0};
38 std::atomic<bool> done = { false };
39 RGWHTTPClient *client{nullptr};
40 rgw_io_id control_io_id;
41 void *user_info{nullptr};
42 bool registered{false};
43 RGWHTTPManager *mgr{nullptr};
44 char error_buf[CURL_ERROR_SIZE];
45 bool write_paused{false};
46 bool read_paused{false};
47
48 optional<int> user_ret;
49
50 ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
51 ceph::condition_variable cond;
52
53 using Signature = void(boost::system::error_code);
54 using Completion = ceph::async::Completion<Signature>;
55 std::unique_ptr<Completion> completion;
56
57 rgw_http_req_data() : id(-1) {
58 // FIPS zeroization audit 20191115: this memset is not security related.
59 memset(error_buf, 0, sizeof(error_buf));
60 }
61
62 template <typename ExecutionContext, typename CompletionToken>
63 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
64 boost::asio::async_completion<CompletionToken, Signature> init(token);
65 auto& handler = init.completion_handler;
66 {
67 std::unique_lock l{lock};
68 completion = Completion::create(ctx.get_executor(), std::move(handler));
69 }
70 return init.result.get();
71 }
72
73 int wait(optional_yield y) {
74 if (done) {
75 return ret;
76 }
77 if (y) {
78 auto& context = y.get_io_context();
79 auto& yield = y.get_yield_context();
80 boost::system::error_code ec;
81 async_wait(context, yield[ec]);
82 return -ec.value();
83 }
84 // work on asio threads should be asynchronous, so warn when they block
85 if (is_asio_thread) {
86 dout(20) << "WARNING: blocking http request" << dendl;
87 }
88 std::unique_lock l{lock};
89 cond.wait(l, [this]{return done==true;});
90 return ret;
91 }
92
93 void set_state(int bitmask);
94
95 void finish(int r, long http_status = -1) {
96 std::lock_guard l{lock};
97 if (http_status != -1) {
98 if (client) {
99 client->set_http_status(http_status);
100 }
101 }
102 ret = r;
103 if (curl_handle)
104 do_curl_easy_cleanup(curl_handle);
105
106 if (h)
107 curl_slist_free_all(h);
108
109 curl_handle = NULL;
110 h = NULL;
111 done = true;
112 if (completion) {
113 boost::system::error_code ec(-ret, boost::system::system_category());
114 Completion::post(std::move(completion), ec);
115 } else {
116 cond.notify_all();
117 }
118 }
119
120 bool is_done() {
121 return done;
122 }
123
124 int get_retcode() {
125 std::lock_guard l{lock};
126 return ret;
127 }
128
129 RGWHTTPManager *get_manager() {
130 std::lock_guard l{lock};
131 return mgr;
132 }
133
134 CURL *get_easy_handle() const;
135 };
136
137 struct RGWCurlHandle {
138 int uses;
139 mono_time lastuse;
140 CURL* h;
141
142 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
143 CURL* operator*() {
144 return this->h;
145 }
146 };
147
148 void rgw_http_req_data::set_state(int bitmask) {
149 /* no need to lock here, moreover curl_easy_pause() might trigger
150 * the data receive callback :/
151 */
152 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
153 if (rc != CURLE_OK) {
154 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
155 }
156 }
157
158 #define MAXIDLE 5
159 class RGWCurlHandles : public Thread {
160 public:
161 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
162 std::vector<RGWCurlHandle*> saved_curl;
163 int cleaner_shutdown;
164 ceph::condition_variable cleaner_cond;
165
166 RGWCurlHandles() :
167 cleaner_shutdown{0} {
168 }
169
170 RGWCurlHandle* get_curl_handle();
171 void release_curl_handle_now(RGWCurlHandle* curl);
172 void release_curl_handle(RGWCurlHandle* curl);
173 void flush_curl_handles();
174 void* entry();
175 void stop();
176 };
177
178 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
179 RGWCurlHandle* curl = 0;
180 CURL* h;
181 {
182 std::lock_guard lock{cleaner_lock};
183 if (!saved_curl.empty()) {
184 curl = *saved_curl.begin();
185 saved_curl.erase(saved_curl.begin());
186 }
187 }
188 if (curl) {
189 } else if ((h = curl_easy_init())) {
190 curl = new RGWCurlHandle{h};
191 } else {
192 // curl = 0;
193 }
194 return curl;
195 }
196
197 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
198 {
199 curl_easy_cleanup(**curl);
200 delete curl;
201 }
202
203 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
204 {
205 if (cleaner_shutdown) {
206 release_curl_handle_now(curl);
207 } else {
208 curl_easy_reset(**curl);
209 std::lock_guard lock{cleaner_lock};
210 curl->lastuse = mono_clock::now();
211 saved_curl.insert(saved_curl.begin(), 1, curl);
212 }
213 }
214
215 void* RGWCurlHandles::entry()
216 {
217 RGWCurlHandle* curl;
218 std::unique_lock lock{cleaner_lock};
219
220 for (;;) {
221 if (cleaner_shutdown) {
222 if (saved_curl.empty())
223 break;
224 } else {
225 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
226 }
227 mono_time now = mono_clock::now();
228 while (!saved_curl.empty()) {
229 auto cend = saved_curl.end();
230 --cend;
231 curl = *cend;
232 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
233 break;
234 saved_curl.erase(cend);
235 release_curl_handle_now(curl);
236 }
237 }
238 return nullptr;
239 }
240
241 void RGWCurlHandles::stop()
242 {
243 std::lock_guard lock{cleaner_lock};
244 cleaner_shutdown = 1;
245 cleaner_cond.notify_all();
246 }
247
248 void RGWCurlHandles::flush_curl_handles()
249 {
250 stop();
251 join();
252 if (!saved_curl.empty()) {
253 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
254 }
255 saved_curl.shrink_to_fit();
256 }
257
258 CURL *rgw_http_req_data::get_easy_handle() const
259 {
260 return **curl_handle;
261 }
262
263 static RGWCurlHandles *handles;
264
265 static RGWCurlHandle *do_curl_easy_init()
266 {
267 return handles->get_curl_handle();
268 }
269
270 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
271 {
272 handles->release_curl_handle(curl_handle);
273 }
274
// XXX make this part of the token cache? (but that's swift-only;
//	and this especially needs to integrate with s3...)
277
278 void rgw_setup_saved_curl_handles()
279 {
280 handles = new RGWCurlHandles();
281 handles->create("rgw_curl");
282 }
283
284 void rgw_release_all_curl_handles()
285 {
286 handles->flush_curl_handles();
287 delete handles;
288 }
289
290 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
291 {
292 if (id == 0) {
293 id = io_id_provider.get_next();
294 }
295 }
296
297 /*
298 * the following set of callbacks will be called either on RGWHTTPManager::process(),
299 * or via the RGWHTTPManager async processing.
300 */
301 size_t RGWHTTPClient::receive_http_header(void * const ptr,
302 const size_t size,
303 const size_t nmemb,
304 void * const _info)
305 {
306 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
307 size_t len = size * nmemb;
308
309 std::lock_guard l{req_data->lock};
310
311 if (!req_data->registered) {
312 return len;
313 }
314
315 int ret = req_data->client->receive_header(ptr, size * nmemb);
316 if (ret < 0) {
317 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
318 req_data->user_ret = ret;
319 return CURLE_WRITE_ERROR;
320 }
321
322 return len;
323 }
324
325 size_t RGWHTTPClient::receive_http_data(void * const ptr,
326 const size_t size,
327 const size_t nmemb,
328 void * const _info)
329 {
330 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
331 size_t len = size * nmemb;
332
333 bool pause = false;
334
335 RGWHTTPClient *client;
336
337 {
338 std::lock_guard l{req_data->lock};
339 if (!req_data->registered) {
340 return len;
341 }
342
343 client = req_data->client;
344 }
345
346 size_t& skip_bytes = client->receive_pause_skip;
347
348 if (skip_bytes >= len) {
349 skip_bytes -= len;
350 return len;
351 }
352
353 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
354 if (ret < 0) {
355 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
356 req_data->user_ret = ret;
357 return CURLE_WRITE_ERROR;
358 }
359
360 if (pause) {
361 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
362 skip_bytes = len;
363 std::lock_guard l{req_data->lock};
364 req_data->read_paused = true;
365 return CURL_WRITEFUNC_PAUSE;
366 }
367
368 skip_bytes = 0;
369
370 return len;
371 }
372
373 size_t RGWHTTPClient::send_http_data(void * const ptr,
374 const size_t size,
375 const size_t nmemb,
376 void * const _info)
377 {
378 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
379
380 RGWHTTPClient *client;
381
382 {
383 std::lock_guard l{req_data->lock};
384
385 if (!req_data->registered) {
386 return 0;
387 }
388
389 client = req_data->client;
390 }
391
392 bool pause = false;
393
394 int ret = client->send_data(ptr, size * nmemb, &pause);
395 if (ret < 0) {
396 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
397 req_data->user_ret = ret;
398 return CURLE_READ_ERROR;
399 }
400
401 if (ret == 0 &&
402 pause) {
403 std::lock_guard l{req_data->lock};
404 req_data->write_paused = true;
405 return CURL_READFUNC_PAUSE;
406 }
407
408 return ret;
409 }
410
411 ceph::mutex& RGWHTTPClient::get_req_lock()
412 {
413 return req_data->lock;
414 }
415
416 void RGWHTTPClient::_set_write_paused(bool pause)
417 {
418 ceph_assert(ceph_mutex_is_locked(req_data->lock));
419
420 RGWHTTPManager *mgr = req_data->mgr;
421 if (pause == req_data->write_paused) {
422 return;
423 }
424 if (pause) {
425 mgr->set_request_state(this, SET_WRITE_PAUSED);
426 } else {
427 mgr->set_request_state(this, SET_WRITE_RESUME);
428 }
429 }
430
431 void RGWHTTPClient::_set_read_paused(bool pause)
432 {
433 ceph_assert(ceph_mutex_is_locked(req_data->lock));
434
435 RGWHTTPManager *mgr = req_data->mgr;
436 if (pause == req_data->read_paused) {
437 return;
438 }
439 if (pause) {
440 mgr->set_request_state(this, SET_READ_PAUSED);
441 } else {
442 mgr->set_request_state(this, SET_READ_RESUME);
443 }
444 }
445
446 static curl_slist *headers_to_slist(param_vec_t& headers)
447 {
448 curl_slist *h = NULL;
449
450 param_vec_t::iterator iter;
451 for (iter = headers.begin(); iter != headers.end(); ++iter) {
452 pair<string, string>& p = *iter;
453 string val = p.first;
454
455 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
456 val = val.substr(5);
457 }
458
459 /* we need to convert all underscores into dashes as some web servers forbid them
460 * in the http header field names
461 */
462 for (size_t i = 0; i < val.size(); i++) {
463 if (val[i] == '_') {
464 val[i] = '-';
465 }
466 }
467
468 val = camelcase_dash_http_attr(val);
469
470 // curl won't send headers with empty values unless it ends with a ; instead
471 if (p.second.empty()) {
472 val.append(1, ';');
473 } else {
474 val.append(": ");
475 val.append(p.second);
476 }
477 h = curl_slist_append(h, val.c_str());
478 }
479
480 return h;
481 }
482
// True for the methods that carry a request body: "POST" and "PUT"
// (exact, case-sensitive match).
static bool is_upload_request(const string& method)
{
  if (method == "PUT") {
    return true;
  }
  return method == "POST";
}
487
488 /*
489 * process a single simple one off request
490 */
491 int RGWHTTPClient::process(optional_yield y)
492 {
493 return RGWHTTP::process(this, y);
494 }
495
496 string RGWHTTPClient::to_str()
497 {
498 string method_str = (method.empty() ? "<no-method>" : method);
499 string url_str = (url.empty() ? "<no-url>" : url);
500 return method_str + " " + url_str;
501 }
502
503 int RGWHTTPClient::get_req_retcode()
504 {
505 if (!req_data) {
506 return -EINVAL;
507 }
508
509 return req_data->get_retcode();
510 }
511
512 /*
513 * init request, will be used later with RGWHTTPManager
514 */
515 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
516 {
517 ceph_assert(!req_data);
518 _req_data->get();
519 req_data = _req_data;
520
521 req_data->curl_handle = do_curl_easy_init();
522
523 CURL *easy_handle = req_data->get_easy_handle();
524
525 dout(20) << "sending request to " << url << dendl;
526
527 curl_slist *h = headers_to_slist(headers);
528
529 req_data->h = h;
530
531 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
532 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
533 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
534 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
535 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
536 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
537 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
538 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
539 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
540 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
541 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
542 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
543 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
544 curl_easy_setopt(easy_handle, CURLOPT_BUFFERSIZE, cct->_conf->rgw_curl_buffersize);
545 if (send_data_hint || is_upload_request(method)) {
546 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
547 }
548 if (has_send_len) {
549 // TODO: prevent overflow by using curl_off_t
550 // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
551 const long size = send_len;
552 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
553 if (method == "POST") {
554 curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size);
555 // TODO: set to size smaller than 1MB should prevent the "Expect" field
556 // from being sent. So explicit removal is not needed
557 h = curl_slist_append(h, "Expect:");
558 }
559 }
560 if (h) {
561 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
562 }
563 if (!verify_ssl) {
564 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
565 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
566 dout(20) << "ssl verification is set to off" << dendl;
567 } else {
568 if (!ca_path.empty()) {
569 curl_easy_setopt(easy_handle, CURLOPT_CAINFO, ca_path.c_str());
570 dout(20) << "using custom ca cert "<< ca_path.c_str() << " for ssl" << dendl;
571 }
572 if (!client_cert.empty()) {
573 if (!client_key.empty()) {
574 curl_easy_setopt(easy_handle, CURLOPT_SSLCERT, client_cert.c_str());
575 curl_easy_setopt(easy_handle, CURLOPT_SSLKEY, client_key.c_str());
576 dout(20) << "using custom client cert " << client_cert.c_str()
577 << " and private key " << client_key.c_str() << dendl;
578 } else {
579 dout(5) << "private key is missing for client certificate" << dendl;
580 }
581 }
582 }
583 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
584 curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
585
586 return 0;
587 }
588
589 bool RGWHTTPClient::is_done()
590 {
591 return req_data->is_done();
592 }
593
594 /*
595 * wait for async request to complete
596 */
597 int RGWHTTPClient::wait(optional_yield y)
598 {
599 return req_data->wait(y);
600 }
601
602 void RGWHTTPClient::cancel()
603 {
604 if (req_data) {
605 RGWHTTPManager *http_manager = req_data->mgr;
606 if (http_manager) {
607 http_manager->remove_request(this);
608 }
609 }
610 }
611
612 RGWHTTPClient::~RGWHTTPClient()
613 {
614 cancel();
615 if (req_data) {
616 req_data->put();
617 }
618 }
619
620
621 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
622 {
623 const std::string_view header_line(static_cast<const char *>(ptr), len);
624
625 /* We're tokening the line that way due to backward compatibility. */
626 const size_t sep_loc = header_line.find_first_of(" \t:");
627
628 if (std::string_view::npos == sep_loc) {
629 /* Wrongly formatted header? Just skip it. */
630 return 0;
631 }
632
633 header_name_t name(header_line.substr(0, sep_loc));
634 if (0 == relevant_headers.count(name)) {
635 /* Not interested in this particular header. */
636 return 0;
637 }
638
639 const auto value_part = header_line.substr(sep_loc + 1);
640
641 /* Skip spaces and tabs after the separator. */
642 const size_t val_loc_s = value_part.find_first_not_of(' ');
643 const size_t val_loc_e = value_part.find_first_of("\r\n");
644
645 if (std::string_view::npos == val_loc_s ||
646 std::string_view::npos == val_loc_e) {
647 /* Empty value case. */
648 found_headers.emplace(name, header_value_t());
649 } else {
650 found_headers.emplace(name, header_value_t(
651 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
652 }
653
654 return 0;
655 }
656
657 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
658 {
659 int length_to_copy = 0;
660 if (post_data_index < post_data.length()) {
661 length_to_copy = min(post_data.length() - post_data_index, len);
662 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
663 post_data_index += length_to_copy;
664 }
665 return length_to_copy;
666 }
667
668
/*
 * Drain pending wake-up bytes from `fd` with a single read.
 * Returns 0 on success or when there was nothing to read (EAGAIN), and the
 * negated errno for any other read failure.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, (void *)scratch.data(), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  return (err == -EAGAIN) ? 0 : err; // clear EAGAIN
}
682
683 #if HAVE_CURL_MULTI_WAIT
684
685 static std::once_flag detect_flag;
686 static bool curl_multi_wait_bug_present = false;
687
688 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
689 int write_fd, int read_fd)
690 {
691 int ret = 0;
692
693 // write to write_fd so that read_fd becomes readable
694 uint32_t buf = 0;
695 ret = ::write(write_fd, &buf, sizeof(buf));
696 if (ret < 0) {
697 ret = -errno;
698 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
699 return ret;
700 }
701
702 // pass read_fd in extra_fds for curl_multi_wait()
703 int num_fds;
704 struct curl_waitfd wait_fd;
705
706 wait_fd.fd = read_fd;
707 wait_fd.events = CURL_WAIT_POLLIN;
708 wait_fd.revents = 0;
709
710 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
711 if (ret != CURLM_OK) {
712 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
713 return -EIO;
714 }
715
716 // curl_multi_wait should flag revents when extra_fd is readable. if it
717 // doesn't, the bug is present and we can't rely on revents
718 if (wait_fd.revents == 0) {
719 curl_multi_wait_bug_present = true;
720 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
721 "bug in curl_multi_wait(). enabling a workaround that may degrade "
722 "performance slightly." << dendl;
723 }
724
725 return clear_signal(read_fd);
726 }
727
728 static bool is_signaled(const curl_waitfd& wait_fd)
729 {
730 if (wait_fd.fd < 0) {
731 // no fd to signal
732 return false;
733 }
734
735 if (curl_multi_wait_bug_present) {
736 // we can't rely on revents, so we always return true if a wait_fd is given.
737 // this means we'll be trying a non-blocking read on this fd every time that
738 // curl_multi_wait() wakes up
739 return true;
740 }
741
742 return wait_fd.revents > 0;
743 }
744
745 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
746 {
747 int num_fds;
748 struct curl_waitfd wait_fd;
749
750 wait_fd.fd = signal_fd;
751 wait_fd.events = CURL_WAIT_POLLIN;
752 wait_fd.revents = 0;
753
754 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
755 if (ret) {
756 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
757 return -EIO;
758 }
759
760 if (is_signaled(wait_fd)) {
761 ret = clear_signal(signal_fd);
762 if (ret < 0) {
763 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
764 return ret;
765 }
766 }
767 return 0;
768 }
769
770 #else
771
772 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
773 {
774 fd_set fdread;
775 fd_set fdwrite;
776 fd_set fdexcep;
777 int maxfd = -1;
778
779 FD_ZERO(&fdread);
780 FD_ZERO(&fdwrite);
781 FD_ZERO(&fdexcep);
782
783 /* get file descriptors from the transfers */
784 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
785 if (ret) {
786 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
787 return -EIO;
788 }
789
790 if (signal_fd > 0) {
791 FD_SET(signal_fd, &fdread);
792 if (signal_fd >= maxfd) {
793 maxfd = signal_fd + 1;
794 }
795 }
796
797 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
798 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
799 #define RGW_CURL_TIMEOUT 1000
800 if (!to)
801 to = RGW_CURL_TIMEOUT;
802 struct timeval timeout;
803 timeout.tv_sec = to / 1000;
804 timeout.tv_usec = to % 1000;
805
806 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
807 if (ret < 0) {
808 ret = -errno;
809 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
810 return ret;
811 }
812
813 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
814 ret = clear_signal(signal_fd);
815 if (ret < 0) {
816 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
817 return ret;
818 }
819 }
820
821 return 0;
822 }
823
824 #endif
825
826 void *RGWHTTPManager::ReqsThread::entry()
827 {
828 manager->reqs_thread_entry();
829 return NULL;
830 }
831
832 /*
833 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
834 */
835 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
836 completion_mgr(_cm)
837 {
838 multi_handle = (void *)curl_multi_init();
839 thread_pipe[0] = -1;
840 thread_pipe[1] = -1;
841 }
842
843 RGWHTTPManager::~RGWHTTPManager() {
844 stop();
845 if (multi_handle)
846 curl_multi_cleanup((CURLM *)multi_handle);
847 }
848
849 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
850 {
851 std::unique_lock rl{reqs_lock};
852 req_data->id = num_reqs;
853 req_data->registered = true;
854 reqs[num_reqs] = req_data;
855 num_reqs++;
856 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
857 }
858
859 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
860 {
861 std::unique_lock rl{reqs_lock};
862 if (!req_data->registered) {
863 return false;
864 }
865 req_data->get();
866 req_data->registered = false;
867 unregistered_reqs.push_back(req_data);
868 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
869 return true;
870 }
871
872 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
873 {
874 std::unique_lock rl{reqs_lock};
875 _complete_request(req_data);
876 }
877
878 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
879 {
880 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
881 if (iter != reqs.end()) {
882 reqs.erase(iter);
883 }
884 {
885 std::lock_guard l{req_data->lock};
886 req_data->mgr = nullptr;
887 }
888 if (completion_mgr) {
889 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
890 }
891
892 req_data->put();
893 }
894
895 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
896 {
897 req_data->finish(ret, http_status);
898 complete_request(req_data);
899 }
900
901 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
902 {
903 req_data->finish(ret);
904 _complete_request(req_data);
905 }
906
907 void RGWHTTPManager::_set_req_state(set_state& ss)
908 {
909 ss.req->set_state(ss.bitmask);
910 }
911 /*
912 * hook request to the curl multi handle
913 */
914 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
915 {
916 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
917 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
918 if (mstatus) {
919 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
920 return -EIO;
921 }
922 return 0;
923 }
924
925 /*
926 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
927 * there will be no more processing on this request
928 */
929 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
930 {
931 if (req_data->curl_handle) {
932 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
933 }
934 if (!req_data->is_done()) {
935 _finish_request(req_data, -ECANCELED);
936 }
937 }
938
939 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
940 {
941 std::unique_lock wl{reqs_lock};
942 _unlink_request(req_data);
943 }
944
945 void RGWHTTPManager::manage_pending_requests()
946 {
947 reqs_lock.lock_shared();
948 if (max_threaded_req == num_reqs &&
949 unregistered_reqs.empty() &&
950 reqs_change_state.empty()) {
951 reqs_lock.unlock_shared();
952 return;
953 }
954 reqs_lock.unlock_shared();
955
956 std::unique_lock wl{reqs_lock};
957
958 if (!reqs_change_state.empty()) {
959 for (auto siter : reqs_change_state) {
960 _set_req_state(siter);
961 }
962 reqs_change_state.clear();
963 }
964
965 if (!unregistered_reqs.empty()) {
966 for (auto& r : unregistered_reqs) {
967 _unlink_request(r);
968 r->put();
969 }
970
971 unregistered_reqs.clear();
972 }
973
974 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
975
976 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
977
978 for (; iter != reqs.end(); ++iter) {
979 rgw_http_req_data *req_data = iter->second;
980 int r = link_request(req_data);
981 if (r < 0) {
982 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
983 remove_reqs.push_back(std::make_pair(iter->second, r));
984 } else {
985 max_threaded_req = iter->first + 1;
986 }
987 }
988
989 for (auto piter : remove_reqs) {
990 rgw_http_req_data *req_data = piter.first;
991 int r = piter.second;
992
993 _finish_request(req_data, r);
994 }
995 }
996
997 int RGWHTTPManager::add_request(RGWHTTPClient *client)
998 {
999 rgw_http_req_data *req_data = new rgw_http_req_data;
1000
1001 int ret = client->init_request(req_data);
1002 if (ret < 0) {
1003 req_data->put();
1004 req_data = NULL;
1005 return ret;
1006 }
1007
1008 req_data->mgr = this;
1009 req_data->client = client;
1010 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
1011 req_data->user_info = client->get_io_user_info();
1012
1013 register_request(req_data);
1014
1015 if (!is_started) {
1016 ret = link_request(req_data);
1017 if (ret < 0) {
1018 req_data->put();
1019 req_data = NULL;
1020 }
1021 return ret;
1022 }
1023 ret = signal_thread();
1024 if (ret < 0) {
1025 finish_request(req_data, ret);
1026 }
1027
1028 return ret;
1029 }
1030
1031 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
1032 {
1033 rgw_http_req_data *req_data = client->get_req_data();
1034
1035 if (!is_started) {
1036 unlink_request(req_data);
1037 return 0;
1038 }
1039 if (!unregister_request(req_data)) {
1040 return 0;
1041 }
1042 int ret = signal_thread();
1043 if (ret < 0) {
1044 return ret;
1045 }
1046
1047 return 0;
1048 }
1049
1050 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
1051 {
1052 rgw_http_req_data *req_data = client->get_req_data();
1053
1054 ceph_assert(ceph_mutex_is_locked(req_data->lock));
1055
1056 /* can only do that if threaded */
1057 if (!is_started) {
1058 return -EINVAL;
1059 }
1060
1061 bool suggested_wr_paused = req_data->write_paused;
1062 bool suggested_rd_paused = req_data->read_paused;
1063
1064 switch (state) {
1065 case SET_WRITE_PAUSED:
1066 suggested_wr_paused = true;
1067 break;
1068 case SET_WRITE_RESUME:
1069 suggested_wr_paused = false;
1070 break;
1071 case SET_READ_PAUSED:
1072 suggested_rd_paused = true;
1073 break;
1074 case SET_READ_RESUME:
1075 suggested_rd_paused = false;
1076 break;
1077 default:
1078 /* shouldn't really be here */
1079 return -EIO;
1080 }
1081 if (suggested_wr_paused == req_data->write_paused &&
1082 suggested_rd_paused == req_data->read_paused) {
1083 return 0;
1084 }
1085
1086 req_data->write_paused = suggested_wr_paused;
1087 req_data->read_paused = suggested_rd_paused;
1088
1089 int bitmask = CURLPAUSE_CONT;
1090
1091 if (req_data->write_paused) {
1092 bitmask |= CURLPAUSE_SEND;
1093 }
1094
1095 if (req_data->read_paused) {
1096 bitmask |= CURLPAUSE_RECV;
1097 }
1098
1099 reqs_change_state.push_back(set_state(req_data, bitmask));
1100 int ret = signal_thread();
1101 if (ret < 0) {
1102 return ret;
1103 }
1104
1105 return 0;
1106 }
1107
1108 int RGWHTTPManager::start()
1109 {
1110 if (pipe_cloexec(thread_pipe, 0) < 0) {
1111 int e = errno;
1112 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1113 return -e;
1114 }
1115
1116 // enable non-blocking reads
1117 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1118 int e = errno;
1119 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1120 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1121 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1122 return -e;
1123 }
1124
1125 #ifdef HAVE_CURL_MULTI_WAIT
1126 // on first initialization, use this pipe to detect whether we're using a
1127 // buggy version of libcurl
1128 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
1129 static_cast<CURLM*>(multi_handle),
1130 thread_pipe[1], thread_pipe[0]);
1131 #endif
1132
1133 is_started = true;
1134 reqs_thread = new ReqsThread(this);
1135 reqs_thread->create("http_manager");
1136 return 0;
1137 }
1138
1139 void RGWHTTPManager::stop()
1140 {
1141 if (is_stopped) {
1142 return;
1143 }
1144
1145 is_stopped = true;
1146
1147 if (is_started) {
1148 going_down = true;
1149 signal_thread();
1150 reqs_thread->join();
1151 delete reqs_thread;
1152 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1153 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1154 }
1155 }
1156
1157 int RGWHTTPManager::signal_thread()
1158 {
1159 uint32_t buf = 0;
1160 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1161 if (ret < 0) {
1162 ret = -errno;
1163 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1164 return ret;
1165 }
1166 return 0;
1167 }
1168
1169 void *RGWHTTPManager::reqs_thread_entry()
1170 {
1171 int still_running;
1172 int mstatus;
1173
1174 ldout(cct, 20) << __func__ << ": start" << dendl;
1175
1176 while (!going_down) {
1177 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1178 if (ret < 0) {
1179 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1180 return NULL;
1181 }
1182
1183 manage_pending_requests();
1184
1185 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1186 switch (mstatus) {
1187 case CURLM_OK:
1188 case CURLM_CALL_MULTI_PERFORM:
1189 break;
1190 default:
1191 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1192 break;
1193 }
1194 int msgs_left;
1195 CURLMsg *msg;
1196 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1197 if (msg->msg == CURLMSG_DONE) {
1198 int result = msg->data.result;
1199 CURL *e = msg->easy_handle;
1200 rgw_http_req_data *req_data;
1201 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1202 curl_multi_remove_handle((CURLM *)multi_handle, e);
1203
1204 long http_status;
1205 int status;
1206 if (!req_data->user_ret) {
1207 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1208
1209 status = rgw_http_error_to_errno(http_status);
1210 if (result != CURLE_OK && status == 0) {
1211 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1212 status = -EAGAIN;
1213 }
1214 } else {
1215 status = *req_data->user_ret;
1216 rgw_err err;
1217 set_req_state_err(err, status, 0);
1218 http_status = err.http_ret;
1219 }
1220 int id = req_data->id;
1221 finish_request(req_data, status, http_status);
1222 switch (result) {
1223 case CURLE_OK:
1224 break;
1225 case CURLE_OPERATION_TIMEDOUT:
1226 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1227 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1228 default:
1229 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1230 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << " req_data->error_buf=" << req_data->error_buf << dendl;
1231 break;
1232 }
1233 }
1234 }
1235 }
1236
1237
1238 std::unique_lock rl{reqs_lock};
1239 for (auto r : unregistered_reqs) {
1240 _unlink_request(r);
1241 }
1242
1243 unregistered_reqs.clear();
1244
1245 auto all_reqs = std::move(reqs);
1246 for (auto iter : all_reqs) {
1247 _unlink_request(iter.second);
1248 }
1249
1250 reqs.clear();
1251
1252 if (completion_mgr) {
1253 completion_mgr->go_down();
1254 }
1255
1256 return 0;
1257 }
1258
1259 void rgw_http_client_init(CephContext *cct)
1260 {
1261 curl_global_init(CURL_GLOBAL_ALL);
1262 rgw_http_manager = new RGWHTTPManager(cct);
1263 rgw_http_manager->start();
1264 }
1265
1266 void rgw_http_client_cleanup()
1267 {
1268 rgw_http_manager->stop();
1269 delete rgw_http_manager;
1270 curl_global_cleanup();
1271 }
1272
1273
1274 int RGWHTTP::send(RGWHTTPClient *req) {
1275 if (!req) {
1276 return 0;
1277 }
1278 int r = rgw_http_manager->add_request(req);
1279 if (r < 0) {
1280 return r;
1281 }
1282
1283 return 0;
1284 }
1285
1286 int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
1287 if (!req) {
1288 return 0;
1289 }
1290 int r = send(req);
1291 if (r < 0) {
1292 return r;
1293 }
1294
1295 return req->wait(y);
1296 }
1297