]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/compat.h"
5 #include "common/errno.h"
6
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/async/completion.h"
16 #include "common/RefCountedObj.h"
17
18 #include "rgw_coroutine.h"
19 #include "rgw_tools.h"
20
21 #include <atomic>
22 #include <string_view>
23
24 #define dout_context g_ceph_context
25 #define dout_subsys ceph_subsys_rgw
26
27 using namespace std;
28
29 RGWHTTPManager *rgw_http_manager;
30
31 struct RGWCurlHandle;
32
33 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
34
35 struct rgw_http_req_data : public RefCountedObject {
36 RGWCurlHandle *curl_handle{nullptr};
37 curl_slist *h{nullptr};
38 uint64_t id;
39 int ret{0};
40 std::atomic<bool> done = { false };
41 RGWHTTPClient *client{nullptr};
42 rgw_io_id control_io_id;
43 void *user_info{nullptr};
44 bool registered{false};
45 RGWHTTPManager *mgr{nullptr};
46 char error_buf[CURL_ERROR_SIZE];
47 bool write_paused{false};
48 bool read_paused{false};
49
50 optional<int> user_ret;
51
52 ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
53 ceph::condition_variable cond;
54
55 using Signature = void(boost::system::error_code);
56 using Completion = ceph::async::Completion<Signature>;
57 std::unique_ptr<Completion> completion;
58
59 rgw_http_req_data() : id(-1) {
60 // FIPS zeroization audit 20191115: this memset is not security related.
61 memset(error_buf, 0, sizeof(error_buf));
62 }
63
64 template <typename ExecutionContext, typename CompletionToken>
65 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
66 boost::asio::async_completion<CompletionToken, Signature> init(token);
67 auto& handler = init.completion_handler;
68 {
69 std::unique_lock l{lock};
70 completion = Completion::create(ctx.get_executor(), std::move(handler));
71 }
72 return init.result.get();
73 }
74
75 int wait(optional_yield y) {
76 if (done) {
77 return ret;
78 }
79 if (y) {
80 auto& context = y.get_io_context();
81 auto& yield = y.get_yield_context();
82 boost::system::error_code ec;
83 async_wait(context, yield[ec]);
84 return -ec.value();
85 }
86 // work on asio threads should be asynchronous, so warn when they block
87 if (is_asio_thread) {
88 dout(20) << "WARNING: blocking http request" << dendl;
89 }
90 std::unique_lock l{lock};
91 cond.wait(l, [this]{return done==true;});
92 return ret;
93 }
94
95 void set_state(int bitmask);
96
97 void finish(int r, long http_status = -1) {
98 std::lock_guard l{lock};
99 if (http_status != -1) {
100 if (client) {
101 client->set_http_status(http_status);
102 }
103 }
104 ret = r;
105 if (curl_handle)
106 do_curl_easy_cleanup(curl_handle);
107
108 if (h)
109 curl_slist_free_all(h);
110
111 curl_handle = NULL;
112 h = NULL;
113 done = true;
114 if (completion) {
115 boost::system::error_code ec(-ret, boost::system::system_category());
116 Completion::post(std::move(completion), ec);
117 } else {
118 cond.notify_all();
119 }
120 }
121
122 bool is_done() {
123 return done;
124 }
125
126 int get_retcode() {
127 std::lock_guard l{lock};
128 return ret;
129 }
130
131 RGWHTTPManager *get_manager() {
132 std::lock_guard l{lock};
133 return mgr;
134 }
135
136 CURL *get_easy_handle() const;
137 };
138
139 struct RGWCurlHandle {
140 int uses;
141 mono_time lastuse;
142 CURL* h;
143
144 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
145 CURL* operator*() {
146 return this->h;
147 }
148 };
149
150 void rgw_http_req_data::set_state(int bitmask) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
153 */
154 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
155 if (rc != CURLE_OK) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
157 }
158 }
159
160 #define MAXIDLE 5
161 class RGWCurlHandles : public Thread {
162 public:
163 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector<RGWCurlHandle*> saved_curl;
165 int cleaner_shutdown;
166 ceph::condition_variable cleaner_cond;
167
168 RGWCurlHandles() :
169 cleaner_shutdown{0} {
170 }
171
172 RGWCurlHandle* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle* curl);
174 void release_curl_handle(RGWCurlHandle* curl);
175 void flush_curl_handles();
176 void* entry();
177 void stop();
178 };
179
180 RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle* curl = 0;
182 CURL* h;
183 {
184 std::lock_guard lock{cleaner_lock};
185 if (!saved_curl.empty()) {
186 curl = *saved_curl.begin();
187 saved_curl.erase(saved_curl.begin());
188 }
189 }
190 if (curl) {
191 } else if ((h = curl_easy_init())) {
192 curl = new RGWCurlHandle{h};
193 } else {
194 // curl = 0;
195 }
196 return curl;
197 }
198
199 void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
200 {
201 curl_easy_cleanup(**curl);
202 delete curl;
203 }
204
205 void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
206 {
207 if (cleaner_shutdown) {
208 release_curl_handle_now(curl);
209 } else {
210 curl_easy_reset(**curl);
211 std::lock_guard lock{cleaner_lock};
212 curl->lastuse = mono_clock::now();
213 saved_curl.insert(saved_curl.begin(), 1, curl);
214 }
215 }
216
217 void* RGWCurlHandles::entry()
218 {
219 RGWCurlHandle* curl;
220 std::unique_lock lock{cleaner_lock};
221
222 for (;;) {
223 if (cleaner_shutdown) {
224 if (saved_curl.empty())
225 break;
226 } else {
227 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
228 }
229 mono_time now = mono_clock::now();
230 while (!saved_curl.empty()) {
231 auto cend = saved_curl.end();
232 --cend;
233 curl = *cend;
234 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
235 break;
236 saved_curl.erase(cend);
237 release_curl_handle_now(curl);
238 }
239 }
240 return nullptr;
241 }
242
243 void RGWCurlHandles::stop()
244 {
245 std::lock_guard lock{cleaner_lock};
246 cleaner_shutdown = 1;
247 cleaner_cond.notify_all();
248 }
249
250 void RGWCurlHandles::flush_curl_handles()
251 {
252 stop();
253 join();
254 if (!saved_curl.empty()) {
255 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
256 }
257 saved_curl.shrink_to_fit();
258 }
259
260 CURL *rgw_http_req_data::get_easy_handle() const
261 {
262 return **curl_handle;
263 }
264
265 static RGWCurlHandles *handles;
266
267 static RGWCurlHandle *do_curl_easy_init()
268 {
269 return handles->get_curl_handle();
270 }
271
272 static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
273 {
274 handles->release_curl_handle(curl_handle);
275 }
276
277 // XXX make this part of the token cache? (but that's swift-only;
278 // and this especially needs to integrates with s3...)
279
280 void rgw_setup_saved_curl_handles()
281 {
282 handles = new RGWCurlHandles();
283 handles->create("rgw_curl");
284 }
285
286 void rgw_release_all_curl_handles()
287 {
288 handles->flush_curl_handles();
289 delete handles;
290 }
291
292 void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
293 {
294 if (id == 0) {
295 id = io_id_provider.get_next();
296 }
297 }
298
299 RGWHTTPClient::RGWHTTPClient(CephContext *cct,
300 const string& _method,
301 const string& _url)
302 : NoDoutPrefix(cct, dout_subsys),
303 has_send_len(false),
304 http_status(HTTP_STATUS_NOSTATUS),
305 req_data(nullptr),
306 verify_ssl(cct->_conf->rgw_verify_ssl),
307 cct(cct),
308 method(_method),
309 url(_url) {
310 init();
311 }
312
313 std::ostream& RGWHTTPClient::gen_prefix(std::ostream& out) const
314 {
315 out << "http_client[" << method << "/" << url << "]";
316 return out;
317 }
318
319 void RGWHTTPClient::init()
320 {
321 auto pos = url.find("://");
322 if (pos == string::npos) {
323 host = url;
324 return;
325 }
326
327 protocol = url.substr(0, pos);
328
329 pos += 3;
330
331 auto host_end_pos = url.find("/", pos);
332 if (host_end_pos == string::npos) {
333 host = url.substr(pos);
334 return;
335 }
336
337 host = url.substr(pos, host_end_pos - pos);
338 resource_prefix = url.substr(host_end_pos + 1);
339 if (resource_prefix.size() > 0 && resource_prefix[resource_prefix.size() - 1] != '/') {
340 resource_prefix.append("/");
341 }
342 }
343
344 /*
345 * the following set of callbacks will be called either on RGWHTTPManager::process(),
346 * or via the RGWHTTPManager async processing.
347 */
348 size_t RGWHTTPClient::receive_http_header(void * const ptr,
349 const size_t size,
350 const size_t nmemb,
351 void * const _info)
352 {
353 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
354 size_t len = size * nmemb;
355
356 std::lock_guard l{req_data->lock};
357
358 if (!req_data->registered) {
359 return len;
360 }
361
362 int ret = req_data->client->receive_header(ptr, size * nmemb);
363 if (ret < 0) {
364 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
365 req_data->user_ret = ret;
366 return CURLE_WRITE_ERROR;
367 }
368
369 return len;
370 }
371
372 size_t RGWHTTPClient::receive_http_data(void * const ptr,
373 const size_t size,
374 const size_t nmemb,
375 void * const _info)
376 {
377 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
378 size_t len = size * nmemb;
379
380 bool pause = false;
381
382 RGWHTTPClient *client;
383
384 {
385 std::lock_guard l{req_data->lock};
386 if (!req_data->registered) {
387 return len;
388 }
389
390 client = req_data->client;
391 }
392
393 size_t& skip_bytes = client->receive_pause_skip;
394
395 if (skip_bytes >= len) {
396 skip_bytes -= len;
397 return len;
398 }
399
400 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
401 if (ret < 0) {
402 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
403 req_data->user_ret = ret;
404 return CURLE_WRITE_ERROR;
405 }
406
407 if (pause) {
408 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
409 skip_bytes = len;
410 std::lock_guard l{req_data->lock};
411 req_data->read_paused = true;
412 return CURL_WRITEFUNC_PAUSE;
413 }
414
415 skip_bytes = 0;
416
417 return len;
418 }
419
420 size_t RGWHTTPClient::send_http_data(void * const ptr,
421 const size_t size,
422 const size_t nmemb,
423 void * const _info)
424 {
425 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
426
427 RGWHTTPClient *client;
428
429 {
430 std::lock_guard l{req_data->lock};
431
432 if (!req_data->registered) {
433 return 0;
434 }
435
436 client = req_data->client;
437 }
438
439 bool pause = false;
440
441 int ret = client->send_data(ptr, size * nmemb, &pause);
442 if (ret < 0) {
443 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
444 req_data->user_ret = ret;
445 return CURLE_READ_ERROR;
446 }
447
448 if (ret == 0 &&
449 pause) {
450 std::lock_guard l{req_data->lock};
451 req_data->write_paused = true;
452 return CURL_READFUNC_PAUSE;
453 }
454
455 return ret;
456 }
457
458 ceph::mutex& RGWHTTPClient::get_req_lock()
459 {
460 return req_data->lock;
461 }
462
463 void RGWHTTPClient::_set_write_paused(bool pause)
464 {
465 ceph_assert(ceph_mutex_is_locked(req_data->lock));
466
467 RGWHTTPManager *mgr = req_data->mgr;
468 if (pause == req_data->write_paused) {
469 return;
470 }
471 if (pause) {
472 mgr->set_request_state(this, SET_WRITE_PAUSED);
473 } else {
474 mgr->set_request_state(this, SET_WRITE_RESUME);
475 }
476 }
477
478 void RGWHTTPClient::_set_read_paused(bool pause)
479 {
480 ceph_assert(ceph_mutex_is_locked(req_data->lock));
481
482 RGWHTTPManager *mgr = req_data->mgr;
483 if (pause == req_data->read_paused) {
484 return;
485 }
486 if (pause) {
487 mgr->set_request_state(this, SET_READ_PAUSED);
488 } else {
489 mgr->set_request_state(this, SET_READ_RESUME);
490 }
491 }
492
493 static curl_slist *headers_to_slist(param_vec_t& headers)
494 {
495 curl_slist *h = NULL;
496
497 param_vec_t::iterator iter;
498 for (iter = headers.begin(); iter != headers.end(); ++iter) {
499 pair<string, string>& p = *iter;
500 string val = p.first;
501
502 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
503 val = val.substr(5);
504 }
505
506 /* we need to convert all underscores into dashes as some web servers forbid them
507 * in the http header field names
508 */
509 for (size_t i = 0; i < val.size(); i++) {
510 if (val[i] == '_') {
511 val[i] = '-';
512 }
513 }
514
515 val = camelcase_dash_http_attr(val);
516
517 // curl won't send headers with empty values unless it ends with a ; instead
518 if (p.second.empty()) {
519 val.append(1, ';');
520 } else {
521 val.append(": ");
522 val.append(p.second);
523 }
524 h = curl_slist_append(h, val.c_str());
525 }
526
527 return h;
528 }
529
530 static bool is_upload_request(const string& method)
531 {
532 return method == "POST" || method == "PUT";
533 }
534
535 /*
536 * process a single simple one off request
537 */
538 int RGWHTTPClient::process(optional_yield y)
539 {
540 return RGWHTTP::process(this, y);
541 }
542
543 string RGWHTTPClient::to_str()
544 {
545 string method_str = (method.empty() ? "<no-method>" : method);
546 string url_str = (url.empty() ? "<no-url>" : url);
547 return method_str + " " + url_str;
548 }
549
550 int RGWHTTPClient::get_req_retcode()
551 {
552 if (!req_data) {
553 return -EINVAL;
554 }
555
556 return req_data->get_retcode();
557 }
558
559 /*
560 * init request, will be used later with RGWHTTPManager
561 */
562 int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
563 {
564 ceph_assert(!req_data);
565 _req_data->get();
566 req_data = _req_data;
567
568 req_data->curl_handle = do_curl_easy_init();
569
570 CURL *easy_handle = req_data->get_easy_handle();
571
572 dout(20) << "sending request to " << url << dendl;
573
574 curl_slist *h = headers_to_slist(headers);
575
576 req_data->h = h;
577
578 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
579 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
580 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
581 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
582 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
583 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
584 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
585 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
586 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
587 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
588 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
589 curl_easy_setopt(easy_handle, CURLOPT_TCP_KEEPALIVE, cct->_conf->rgw_curl_tcp_keepalive);
590 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
591 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
592 curl_easy_setopt(easy_handle, CURLOPT_BUFFERSIZE, cct->_conf->rgw_curl_buffersize);
593 if (send_data_hint || is_upload_request(method)) {
594 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
595 }
596 if (has_send_len) {
597 // TODO: prevent overflow by using curl_off_t
598 // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
599 const long size = send_len;
600 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
601 if (method == "POST") {
602 curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size);
603 // TODO: set to size smaller than 1MB should prevent the "Expect" field
604 // from being sent. So explicit removal is not needed
605 h = curl_slist_append(h, "Expect:");
606 }
607 }
608
609 if (method == "HEAD") {
610 curl_easy_setopt(easy_handle, CURLOPT_NOBODY, 1L);
611 }
612
613 if (h) {
614 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
615 }
616 if (!verify_ssl) {
617 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
618 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
619 dout(20) << "ssl verification is set to off" << dendl;
620 } else {
621 if (!ca_path.empty()) {
622 curl_easy_setopt(easy_handle, CURLOPT_CAINFO, ca_path.c_str());
623 dout(20) << "using custom ca cert "<< ca_path.c_str() << " for ssl" << dendl;
624 }
625 if (!client_cert.empty()) {
626 if (!client_key.empty()) {
627 curl_easy_setopt(easy_handle, CURLOPT_SSLCERT, client_cert.c_str());
628 curl_easy_setopt(easy_handle, CURLOPT_SSLKEY, client_key.c_str());
629 dout(20) << "using custom client cert " << client_cert.c_str()
630 << " and private key " << client_key.c_str() << dendl;
631 } else {
632 dout(5) << "private key is missing for client certificate" << dendl;
633 }
634 }
635 }
636 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
637 curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
638
639 return 0;
640 }
641
642 bool RGWHTTPClient::is_done()
643 {
644 return req_data->is_done();
645 }
646
647 /*
648 * wait for async request to complete
649 */
650 int RGWHTTPClient::wait(optional_yield y)
651 {
652 return req_data->wait(y);
653 }
654
655 void RGWHTTPClient::cancel()
656 {
657 if (req_data) {
658 RGWHTTPManager *http_manager = req_data->mgr;
659 if (http_manager) {
660 http_manager->remove_request(this);
661 }
662 }
663 }
664
665 RGWHTTPClient::~RGWHTTPClient()
666 {
667 cancel();
668 if (req_data) {
669 req_data->put();
670 }
671 }
672
673
674 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
675 {
676 const std::string_view header_line(static_cast<const char *>(ptr), len);
677
678 /* We're tokening the line that way due to backward compatibility. */
679 const size_t sep_loc = header_line.find_first_of(" \t:");
680
681 if (std::string_view::npos == sep_loc) {
682 /* Wrongly formatted header? Just skip it. */
683 return 0;
684 }
685
686 header_name_t name(header_line.substr(0, sep_loc));
687 if (0 == relevant_headers.count(name)) {
688 /* Not interested in this particular header. */
689 return 0;
690 }
691
692 const auto value_part = header_line.substr(sep_loc + 1);
693
694 /* Skip spaces and tabs after the separator. */
695 const size_t val_loc_s = value_part.find_first_not_of(' ');
696 const size_t val_loc_e = value_part.find_first_of("\r\n");
697
698 if (std::string_view::npos == val_loc_s ||
699 std::string_view::npos == val_loc_e) {
700 /* Empty value case. */
701 found_headers.emplace(name, header_value_t());
702 } else {
703 found_headers.emplace(name, header_value_t(
704 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
705 }
706
707 return 0;
708 }
709
710 int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
711 {
712 int length_to_copy = 0;
713 if (post_data_index < post_data.length()) {
714 length_to_copy = min(post_data.length() - post_data_index, len);
715 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
716 post_data_index += length_to_copy;
717 }
718 return length_to_copy;
719 }
720
721
722 static int clear_signal(int fd)
723 {
724 // since we're in non-blocking mode, we can try to read a lot more than
725 // one signal from signal_thread() to avoid later wakeups
726 std::array<char, 256> buf{};
727 int ret = ::read(fd, (void *)buf.data(), buf.size());
728 if (ret < 0) {
729 ret = -errno;
730 return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
731 }
732 return 0;
733 }
734
735 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
736 {
737 int num_fds;
738 struct curl_waitfd wait_fd;
739
740 wait_fd.fd = signal_fd;
741 wait_fd.events = CURL_WAIT_POLLIN;
742 wait_fd.revents = 0;
743
744 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
745 if (ret) {
746 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
747 return -EIO;
748 }
749
750 if (wait_fd.revents > 0) {
751 ret = clear_signal(signal_fd);
752 if (ret < 0) {
753 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
754 return ret;
755 }
756 }
757 return 0;
758 }
759
760 void *RGWHTTPManager::ReqsThread::entry()
761 {
762 manager->reqs_thread_entry();
763 return NULL;
764 }
765
766 /*
767 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
768 */
769 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
770 completion_mgr(_cm)
771 {
772 multi_handle = (void *)curl_multi_init();
773 thread_pipe[0] = -1;
774 thread_pipe[1] = -1;
775 }
776
777 RGWHTTPManager::~RGWHTTPManager() {
778 stop();
779 if (multi_handle)
780 curl_multi_cleanup((CURLM *)multi_handle);
781 }
782
783 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
784 {
785 std::unique_lock rl{reqs_lock};
786 req_data->id = num_reqs;
787 req_data->registered = true;
788 reqs[num_reqs] = req_data;
789 num_reqs++;
790 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
791 }
792
793 bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
794 {
795 std::unique_lock rl{reqs_lock};
796 if (!req_data->registered) {
797 return false;
798 }
799 req_data->get();
800 req_data->registered = false;
801 unregistered_reqs.push_back(req_data);
802 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
803 return true;
804 }
805
806 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
807 {
808 std::unique_lock rl{reqs_lock};
809 _complete_request(req_data);
810 }
811
812 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
813 {
814 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
815 if (iter != reqs.end()) {
816 reqs.erase(iter);
817 }
818 {
819 std::lock_guard l{req_data->lock};
820 req_data->mgr = nullptr;
821 }
822 if (completion_mgr) {
823 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
824 }
825
826 req_data->put();
827 }
828
829 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
830 {
831 req_data->finish(ret, http_status);
832 complete_request(req_data);
833 }
834
835 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
836 {
837 req_data->finish(ret);
838 _complete_request(req_data);
839 }
840
841 void RGWHTTPManager::_set_req_state(set_state& ss)
842 {
843 ss.req->set_state(ss.bitmask);
844 }
845 /*
846 * hook request to the curl multi handle
847 */
848 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
849 {
850 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
851 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
852 if (mstatus) {
853 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
854 return -EIO;
855 }
856 return 0;
857 }
858
859 /*
860 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
861 * there will be no more processing on this request
862 */
863 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
864 {
865 if (req_data->curl_handle) {
866 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
867 }
868 if (!req_data->is_done()) {
869 _finish_request(req_data, -ECANCELED);
870 }
871 }
872
873 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
874 {
875 std::unique_lock wl{reqs_lock};
876 _unlink_request(req_data);
877 }
878
879 void RGWHTTPManager::manage_pending_requests()
880 {
881 reqs_lock.lock_shared();
882 if (max_threaded_req == num_reqs &&
883 unregistered_reqs.empty() &&
884 reqs_change_state.empty()) {
885 reqs_lock.unlock_shared();
886 return;
887 }
888 reqs_lock.unlock_shared();
889
890 std::unique_lock wl{reqs_lock};
891
892 if (!reqs_change_state.empty()) {
893 for (auto siter : reqs_change_state) {
894 _set_req_state(siter);
895 }
896 reqs_change_state.clear();
897 }
898
899 if (!unregistered_reqs.empty()) {
900 for (auto& r : unregistered_reqs) {
901 _unlink_request(r);
902 r->put();
903 }
904
905 unregistered_reqs.clear();
906 }
907
908 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
909
910 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
911
912 for (; iter != reqs.end(); ++iter) {
913 rgw_http_req_data *req_data = iter->second;
914 int r = link_request(req_data);
915 if (r < 0) {
916 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
917 remove_reqs.push_back(std::make_pair(iter->second, r));
918 } else {
919 max_threaded_req = iter->first + 1;
920 }
921 }
922
923 for (auto piter : remove_reqs) {
924 rgw_http_req_data *req_data = piter.first;
925 int r = piter.second;
926
927 _finish_request(req_data, r);
928 }
929 }
930
931 int RGWHTTPManager::add_request(RGWHTTPClient *client)
932 {
933 rgw_http_req_data *req_data = new rgw_http_req_data;
934
935 int ret = client->init_request(req_data);
936 if (ret < 0) {
937 req_data->put();
938 req_data = NULL;
939 return ret;
940 }
941
942 req_data->mgr = this;
943 req_data->client = client;
944 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
945 req_data->user_info = client->get_io_user_info();
946
947 register_request(req_data);
948
949 if (!is_started) {
950 ret = link_request(req_data);
951 if (ret < 0) {
952 req_data->put();
953 req_data = NULL;
954 }
955 return ret;
956 }
957 ret = signal_thread();
958 if (ret < 0) {
959 finish_request(req_data, ret);
960 }
961
962 return ret;
963 }
964
965 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
966 {
967 rgw_http_req_data *req_data = client->get_req_data();
968
969 if (!is_started) {
970 unlink_request(req_data);
971 return 0;
972 }
973 if (!unregister_request(req_data)) {
974 return 0;
975 }
976 int ret = signal_thread();
977 if (ret < 0) {
978 return ret;
979 }
980
981 return 0;
982 }
983
984 int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
985 {
986 rgw_http_req_data *req_data = client->get_req_data();
987
988 ceph_assert(ceph_mutex_is_locked(req_data->lock));
989
990 /* can only do that if threaded */
991 if (!is_started) {
992 return -EINVAL;
993 }
994
995 bool suggested_wr_paused = req_data->write_paused;
996 bool suggested_rd_paused = req_data->read_paused;
997
998 switch (state) {
999 case SET_WRITE_PAUSED:
1000 suggested_wr_paused = true;
1001 break;
1002 case SET_WRITE_RESUME:
1003 suggested_wr_paused = false;
1004 break;
1005 case SET_READ_PAUSED:
1006 suggested_rd_paused = true;
1007 break;
1008 case SET_READ_RESUME:
1009 suggested_rd_paused = false;
1010 break;
1011 default:
1012 /* shouldn't really be here */
1013 return -EIO;
1014 }
1015 if (suggested_wr_paused == req_data->write_paused &&
1016 suggested_rd_paused == req_data->read_paused) {
1017 return 0;
1018 }
1019
1020 req_data->write_paused = suggested_wr_paused;
1021 req_data->read_paused = suggested_rd_paused;
1022
1023 int bitmask = CURLPAUSE_CONT;
1024
1025 if (req_data->write_paused) {
1026 bitmask |= CURLPAUSE_SEND;
1027 }
1028
1029 if (req_data->read_paused) {
1030 bitmask |= CURLPAUSE_RECV;
1031 }
1032
1033 reqs_change_state.push_back(set_state(req_data, bitmask));
1034 int ret = signal_thread();
1035 if (ret < 0) {
1036 return ret;
1037 }
1038
1039 return 0;
1040 }
1041
1042 int RGWHTTPManager::start()
1043 {
1044 if (pipe_cloexec(thread_pipe, 0) < 0) {
1045 int e = errno;
1046 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1047 return -e;
1048 }
1049
1050 // enable non-blocking reads
1051 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1052 int e = errno;
1053 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
1054 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1055 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1056 return -e;
1057 }
1058
1059 is_started = true;
1060 reqs_thread = new ReqsThread(this);
1061 reqs_thread->create("http_manager");
1062 return 0;
1063 }
1064
1065 void RGWHTTPManager::stop()
1066 {
1067 if (is_stopped) {
1068 return;
1069 }
1070
1071 is_stopped = true;
1072
1073 if (is_started) {
1074 going_down = true;
1075 signal_thread();
1076 reqs_thread->join();
1077 delete reqs_thread;
1078 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1079 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1080 }
1081 }
1082
1083 int RGWHTTPManager::signal_thread()
1084 {
1085 uint32_t buf = 0;
1086 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1087 if (ret < 0) {
1088 ret = -errno;
1089 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1090 return ret;
1091 }
1092 return 0;
1093 }
1094
1095 void *RGWHTTPManager::reqs_thread_entry()
1096 {
1097 int still_running;
1098 int mstatus;
1099
1100 ldout(cct, 20) << __func__ << ": start" << dendl;
1101
1102 while (!going_down) {
1103 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1104 if (ret < 0) {
1105 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1106 return NULL;
1107 }
1108
1109 manage_pending_requests();
1110
1111 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1112 switch (mstatus) {
1113 case CURLM_OK:
1114 case CURLM_CALL_MULTI_PERFORM:
1115 break;
1116 default:
1117 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1118 break;
1119 }
1120 int msgs_left;
1121 CURLMsg *msg;
1122 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1123 if (msg->msg == CURLMSG_DONE) {
1124 int result = msg->data.result;
1125 CURL *e = msg->easy_handle;
1126 rgw_http_req_data *req_data;
1127 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1128 curl_multi_remove_handle((CURLM *)multi_handle, e);
1129
1130 long http_status;
1131 int status;
1132 if (!req_data->user_ret) {
1133 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1134
1135 status = rgw_http_error_to_errno(http_status);
1136 if (result != CURLE_OK && status == 0) {
1137 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1138 status = -EAGAIN;
1139 }
1140 } else {
1141 status = *req_data->user_ret;
1142 rgw_err err;
1143 set_req_state_err(err, status, 0);
1144 http_status = err.http_ret;
1145 }
1146 int id = req_data->id;
1147 finish_request(req_data, status, http_status);
1148 switch (result) {
1149 case CURLE_OK:
1150 break;
1151 case CURLE_OPERATION_TIMEDOUT:
1152 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1153 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
1154 default:
1155 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
1156 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << " req_data->error_buf=" << req_data->error_buf << dendl;
1157 break;
1158 }
1159 }
1160 }
1161 }
1162
1163
1164 std::unique_lock rl{reqs_lock};
1165 for (auto r : unregistered_reqs) {
1166 _unlink_request(r);
1167 }
1168
1169 unregistered_reqs.clear();
1170
1171 auto all_reqs = std::move(reqs);
1172 for (auto iter : all_reqs) {
1173 _unlink_request(iter.second);
1174 }
1175
1176 reqs.clear();
1177
1178 if (completion_mgr) {
1179 completion_mgr->go_down();
1180 }
1181
1182 return 0;
1183 }
1184
1185 void rgw_http_client_init(CephContext *cct)
1186 {
1187 curl_global_init(CURL_GLOBAL_ALL);
1188 rgw_http_manager = new RGWHTTPManager(cct);
1189 rgw_http_manager->start();
1190 }
1191
1192 void rgw_http_client_cleanup()
1193 {
1194 rgw_http_manager->stop();
1195 delete rgw_http_manager;
1196 curl_global_cleanup();
1197 }
1198
1199
1200 int RGWHTTP::send(RGWHTTPClient *req) {
1201 if (!req) {
1202 return 0;
1203 }
1204 int r = rgw_http_manager->add_request(req);
1205 if (r < 0) {
1206 return r;
1207 }
1208
1209 return 0;
1210 }
1211
1212 int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
1213 if (!req) {
1214 return 0;
1215 }
1216 int r = send(req);
1217 if (r < 0) {
1218 return r;
1219 }
1220
1221 return req->wait(y);
1222 }
1223