]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_http_client.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include "include/compat.h"
91327a77 5#include "common/errno.h"
7c673cae 6
7c673cae
FG
7
8#include <curl/curl.h>
9#include <curl/easy.h>
10#include <curl/multi.h>
11
12#include "rgw_common.h"
13#include "rgw_http_client.h"
14#include "rgw_http_errors.h"
eafe8130 15#include "common/async/completion.h"
7c673cae
FG
16#include "common/RefCountedObj.h"
17
18#include "rgw_coroutine.h"
9f95a23c 19#include "rgw_tools.h"
7c673cae
FG
20
21#include <atomic>
f67539c2 22#include <string_view>
7c673cae
FG
23
24#define dout_context g_ceph_context
25#define dout_subsys ceph_subsys_rgw
26
20effc67
TL
27using namespace std;
28
11fdf7f2
TL
29RGWHTTPManager *rgw_http_manager;
30
31struct RGWCurlHandle;
32
33static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle);
34
7c673cae 35struct rgw_http_req_data : public RefCountedObject {
11fdf7f2
TL
36 RGWCurlHandle *curl_handle{nullptr};
37 curl_slist *h{nullptr};
7c673cae 38 uint64_t id;
11fdf7f2 39 int ret{0};
7c673cae 40 std::atomic<bool> done = { false };
11fdf7f2
TL
41 RGWHTTPClient *client{nullptr};
42 rgw_io_id control_io_id;
43 void *user_info{nullptr};
44 bool registered{false};
45 RGWHTTPManager *mgr{nullptr};
7c673cae 46 char error_buf[CURL_ERROR_SIZE];
11fdf7f2
TL
47 bool write_paused{false};
48 bool read_paused{false};
7c673cae 49
9f95a23c
TL
50 optional<int> user_ret;
51
52 ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
53 ceph::condition_variable cond;
7c673cae 54
eafe8130
TL
55 using Signature = void(boost::system::error_code);
56 using Completion = ceph::async::Completion<Signature>;
57 std::unique_ptr<Completion> completion;
58
9f95a23c 59 rgw_http_req_data() : id(-1) {
92f5a8d4 60 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
61 memset(error_buf, 0, sizeof(error_buf));
62 }
63
eafe8130
TL
64 template <typename ExecutionContext, typename CompletionToken>
65 auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
66 boost::asio::async_completion<CompletionToken, Signature> init(token);
67 auto& handler = init.completion_handler;
68 {
69 std::unique_lock l{lock};
70 completion = Completion::create(ctx.get_executor(), std::move(handler));
71 }
72 return init.result.get();
73 }
9f95a23c 74
eafe8130 75 int wait(optional_yield y) {
11fdf7f2
TL
76 if (done) {
77 return ret;
78 }
eafe8130
TL
79 if (y) {
80 auto& context = y.get_io_context();
81 auto& yield = y.get_yield_context();
82 boost::system::error_code ec;
83 async_wait(context, yield[ec]);
84 return -ec.value();
85 }
9f95a23c
TL
86 // work on asio threads should be asynchronous, so warn when they block
87 if (is_asio_thread) {
88 dout(20) << "WARNING: blocking http request" << dendl;
89 }
9f95a23c 90 std::unique_lock l{lock};
f67539c2 91 cond.wait(l, [this]{return done==true;});
7c673cae
FG
92 return ret;
93 }
94
11fdf7f2 95 void set_state(int bitmask);
7c673cae 96
9f95a23c
TL
97 void finish(int r, long http_status = -1) {
98 std::lock_guard l{lock};
99 if (http_status != -1) {
100 if (client) {
101 client->set_http_status(http_status);
102 }
103 }
7c673cae 104 ret = r;
11fdf7f2
TL
105 if (curl_handle)
106 do_curl_easy_cleanup(curl_handle);
7c673cae
FG
107
108 if (h)
109 curl_slist_free_all(h);
110
11fdf7f2 111 curl_handle = NULL;
7c673cae
FG
112 h = NULL;
113 done = true;
eafe8130
TL
114 if (completion) {
115 boost::system::error_code ec(-ret, boost::system::system_category());
116 Completion::post(std::move(completion), ec);
117 } else {
9f95a23c 118 cond.notify_all();
eafe8130 119 }
11fdf7f2
TL
120 }
121
7c673cae
FG
122 bool is_done() {
123 return done;
124 }
125
126 int get_retcode() {
9f95a23c 127 std::lock_guard l{lock};
7c673cae
FG
128 return ret;
129 }
9f95a23c 130
7c673cae 131 RGWHTTPManager *get_manager() {
9f95a23c 132 std::lock_guard l{lock};
7c673cae
FG
133 return mgr;
134 }
11fdf7f2
TL
135
136 CURL *get_easy_handle() const;
7c673cae
FG
137};
138
94b18763
FG
139struct RGWCurlHandle {
140 int uses;
141 mono_time lastuse;
142 CURL* h;
143
11fdf7f2 144 explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
94b18763
FG
145 CURL* operator*() {
146 return this->h;
147 }
148};
149
11fdf7f2
TL
150void rgw_http_req_data::set_state(int bitmask) {
151 /* no need to lock here, moreover curl_easy_pause() might trigger
152 * the data receive callback :/
153 */
154 CURLcode rc = curl_easy_pause(**curl_handle, bitmask);
155 if (rc != CURLE_OK) {
156 dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl;
157 }
158}
159
94b18763
FG
160#define MAXIDLE 5
161class RGWCurlHandles : public Thread {
162public:
9f95a23c
TL
163 ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock");
164 std::vector<RGWCurlHandle*> saved_curl;
94b18763 165 int cleaner_shutdown;
9f95a23c 166 ceph::condition_variable cleaner_cond;
94b18763
FG
167
168 RGWCurlHandles() :
94b18763
FG
169 cleaner_shutdown{0} {
170 }
171
172 RGWCurlHandle* get_curl_handle();
173 void release_curl_handle_now(RGWCurlHandle* curl);
174 void release_curl_handle(RGWCurlHandle* curl);
175 void flush_curl_handles();
176 void* entry();
177 void stop();
178};
179
180RGWCurlHandle* RGWCurlHandles::get_curl_handle() {
181 RGWCurlHandle* curl = 0;
182 CURL* h;
183 {
9f95a23c 184 std::lock_guard lock{cleaner_lock};
94b18763
FG
185 if (!saved_curl.empty()) {
186 curl = *saved_curl.begin();
187 saved_curl.erase(saved_curl.begin());
188 }
189 }
190 if (curl) {
191 } else if ((h = curl_easy_init())) {
192 curl = new RGWCurlHandle{h};
193 } else {
194 // curl = 0;
195 }
196 return curl;
197}
198
199void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl)
200{
201 curl_easy_cleanup(**curl);
202 delete curl;
203}
204
205void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl)
206{
207 if (cleaner_shutdown) {
208 release_curl_handle_now(curl);
209 } else {
210 curl_easy_reset(**curl);
9f95a23c 211 std::lock_guard lock{cleaner_lock};
94b18763
FG
212 curl->lastuse = mono_clock::now();
213 saved_curl.insert(saved_curl.begin(), 1, curl);
214 }
215}
216
217void* RGWCurlHandles::entry()
218{
219 RGWCurlHandle* curl;
9f95a23c 220 std::unique_lock lock{cleaner_lock};
94b18763
FG
221
222 for (;;) {
223 if (cleaner_shutdown) {
224 if (saved_curl.empty())
225 break;
226 } else {
9f95a23c 227 cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
94b18763
FG
228 }
229 mono_time now = mono_clock::now();
230 while (!saved_curl.empty()) {
231 auto cend = saved_curl.end();
232 --cend;
233 curl = *cend;
234 if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE))
235 break;
236 saved_curl.erase(cend);
237 release_curl_handle_now(curl);
238 }
239 }
240 return nullptr;
241}
242
243void RGWCurlHandles::stop()
244{
9f95a23c 245 std::lock_guard lock{cleaner_lock};
94b18763 246 cleaner_shutdown = 1;
9f95a23c 247 cleaner_cond.notify_all();
94b18763
FG
248}
249
250void RGWCurlHandles::flush_curl_handles()
251{
252 stop();
253 join();
254 if (!saved_curl.empty()) {
255 dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl;
256 }
257 saved_curl.shrink_to_fit();
258}
259
11fdf7f2
TL
260CURL *rgw_http_req_data::get_easy_handle() const
261{
262 return **curl_handle;
263}
264
94b18763 265static RGWCurlHandles *handles;
11fdf7f2
TL
266
267static RGWCurlHandle *do_curl_easy_init()
268{
269 return handles->get_curl_handle();
270}
271
272static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle)
273{
274 handles->release_curl_handle(curl_handle);
275}
276
94b18763
FG
277// XXX make this part of the token cache? (but that's swift-only;
278// and this especially needs to integrates with s3...)
279
280void rgw_setup_saved_curl_handles()
281{
282 handles = new RGWCurlHandles();
283 handles->create("rgw_curl");
284}
285
286void rgw_release_all_curl_handles()
287{
288 handles->flush_curl_handles();
289 delete handles;
290}
291
11fdf7f2 292void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
7c673cae 293{
11fdf7f2
TL
294 if (id == 0) {
295 id = io_id_provider.get_next();
7c673cae 296 }
7c673cae
FG
297}
298
20effc67
TL
299RGWHTTPClient::RGWHTTPClient(CephContext *cct,
300 const string& _method,
301 const string& _url)
302 : NoDoutPrefix(cct, dout_subsys),
303 has_send_len(false),
304 http_status(HTTP_STATUS_NOSTATUS),
305 req_data(nullptr),
306 verify_ssl(cct->_conf->rgw_verify_ssl),
307 cct(cct),
308 method(_method),
309 url(_url) {
310 init();
311}
312
313std::ostream& RGWHTTPClient::gen_prefix(std::ostream& out) const
314{
315 out << "http_client[" << method << "/" << url << "]";
316 return out;
317}
318
319void RGWHTTPClient::init()
320{
321 auto pos = url.find("://");
322 if (pos == string::npos) {
323 host = url;
324 return;
325 }
326
327 protocol = url.substr(0, pos);
328
329 pos += 3;
330
331 auto host_end_pos = url.find("/", pos);
332 if (host_end_pos == string::npos) {
333 host = url.substr(pos);
334 return;
335 }
336
337 host = url.substr(pos, host_end_pos - pos);
338 resource_prefix = url.substr(host_end_pos + 1);
339 if (resource_prefix.size() > 0 && resource_prefix[resource_prefix.size() - 1] != '/') {
340 resource_prefix.append("/");
341 }
342}
343
7c673cae
FG
344/*
345 * the following set of callbacks will be called either on RGWHTTPManager::process(),
346 * or via the RGWHTTPManager async processing.
347 */
348size_t RGWHTTPClient::receive_http_header(void * const ptr,
349 const size_t size,
350 const size_t nmemb,
351 void * const _info)
352{
353 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
354 size_t len = size * nmemb;
355
9f95a23c 356 std::lock_guard l{req_data->lock};
7c673cae
FG
357
358 if (!req_data->registered) {
359 return len;
360 }
361
362 int ret = req_data->client->receive_header(ptr, size * nmemb);
363 if (ret < 0) {
9f95a23c
TL
364 dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
365 req_data->user_ret = ret;
366 return CURLE_WRITE_ERROR;
7c673cae
FG
367 }
368
369 return len;
370}
371
372size_t RGWHTTPClient::receive_http_data(void * const ptr,
373 const size_t size,
374 const size_t nmemb,
375 void * const _info)
376{
377 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
378 size_t len = size * nmemb;
379
11fdf7f2
TL
380 bool pause = false;
381
382 RGWHTTPClient *client;
383
384 {
9f95a23c 385 std::lock_guard l{req_data->lock};
11fdf7f2
TL
386 if (!req_data->registered) {
387 return len;
388 }
389
390 client = req_data->client;
391 }
392
393 size_t& skip_bytes = client->receive_pause_skip;
394
395 if (skip_bytes >= len) {
396 skip_bytes -= len;
7c673cae
FG
397 return len;
398 }
11fdf7f2
TL
399
400 int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause);
7c673cae 401 if (ret < 0) {
9f95a23c
TL
402 dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
403 req_data->user_ret = ret;
404 return CURLE_WRITE_ERROR;
7c673cae
FG
405 }
406
11fdf7f2
TL
407 if (pause) {
408 dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl;
409 skip_bytes = len;
9f95a23c 410 std::lock_guard l{req_data->lock};
11fdf7f2
TL
411 req_data->read_paused = true;
412 return CURL_WRITEFUNC_PAUSE;
413 }
414
415 skip_bytes = 0;
416
7c673cae
FG
417 return len;
418}
419
420size_t RGWHTTPClient::send_http_data(void * const ptr,
421 const size_t size,
422 const size_t nmemb,
423 void * const _info)
424{
425 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
426
11fdf7f2
TL
427 RGWHTTPClient *client;
428
429 {
9f95a23c 430 std::lock_guard l{req_data->lock};
7c673cae 431
11fdf7f2
TL
432 if (!req_data->registered) {
433 return 0;
434 }
435
436 client = req_data->client;
7c673cae
FG
437 }
438
11fdf7f2
TL
439 bool pause = false;
440
441 int ret = client->send_data(ptr, size * nmemb, &pause);
7c673cae 442 if (ret < 0) {
9f95a23c
TL
443 dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl;
444 req_data->user_ret = ret;
445 return CURLE_READ_ERROR;
7c673cae
FG
446 }
447
11fdf7f2
TL
448 if (ret == 0 &&
449 pause) {
9f95a23c 450 std::lock_guard l{req_data->lock};
11fdf7f2
TL
451 req_data->write_paused = true;
452 return CURL_READFUNC_PAUSE;
453 }
454
7c673cae
FG
455 return ret;
456}
457
9f95a23c 458ceph::mutex& RGWHTTPClient::get_req_lock()
11fdf7f2
TL
459{
460 return req_data->lock;
461}
462
463void RGWHTTPClient::_set_write_paused(bool pause)
464{
9f95a23c 465 ceph_assert(ceph_mutex_is_locked(req_data->lock));
11fdf7f2
TL
466
467 RGWHTTPManager *mgr = req_data->mgr;
468 if (pause == req_data->write_paused) {
469 return;
470 }
471 if (pause) {
472 mgr->set_request_state(this, SET_WRITE_PAUSED);
473 } else {
474 mgr->set_request_state(this, SET_WRITE_RESUME);
475 }
476}
477
478void RGWHTTPClient::_set_read_paused(bool pause)
479{
9f95a23c 480 ceph_assert(ceph_mutex_is_locked(req_data->lock));
11fdf7f2
TL
481
482 RGWHTTPManager *mgr = req_data->mgr;
483 if (pause == req_data->read_paused) {
484 return;
485 }
486 if (pause) {
487 mgr->set_request_state(this, SET_READ_PAUSED);
488 } else {
489 mgr->set_request_state(this, SET_READ_RESUME);
490 }
491}
492
7c673cae
FG
493static curl_slist *headers_to_slist(param_vec_t& headers)
494{
495 curl_slist *h = NULL;
496
497 param_vec_t::iterator iter;
498 for (iter = headers.begin(); iter != headers.end(); ++iter) {
499 pair<string, string>& p = *iter;
500 string val = p.first;
501
502 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
503 val = val.substr(5);
504 }
505
506 /* we need to convert all underscores into dashes as some web servers forbid them
507 * in the http header field names
508 */
509 for (size_t i = 0; i < val.size(); i++) {
510 if (val[i] == '_') {
511 val[i] = '-';
512 }
513 }
11fdf7f2
TL
514
515 val = camelcase_dash_http_attr(val);
7c673cae 516
28e407b8
AA
517 // curl won't send headers with empty values unless it ends with a ; instead
518 if (p.second.empty()) {
519 val.append(1, ';');
520 } else {
521 val.append(": ");
522 val.append(p.second);
523 }
7c673cae
FG
524 h = curl_slist_append(h, val.c_str());
525 }
526
527 return h;
528}
529
11fdf7f2 530static bool is_upload_request(const string& method)
7c673cae 531{
11fdf7f2 532 return method == "POST" || method == "PUT";
7c673cae
FG
533}
534
535/*
11fdf7f2 536 * process a single simple one off request
7c673cae 537 */
eafe8130 538int RGWHTTPClient::process(optional_yield y)
7c673cae 539{
eafe8130 540 return RGWHTTP::process(this, y);
7c673cae
FG
541}
542
543string RGWHTTPClient::to_str()
544{
11fdf7f2
TL
545 string method_str = (method.empty() ? "<no-method>" : method);
546 string url_str = (url.empty() ? "<no-url>" : url);
7c673cae
FG
547 return method_str + " " + url_str;
548}
549
550int RGWHTTPClient::get_req_retcode()
551{
552 if (!req_data) {
553 return -EINVAL;
554 }
555
556 return req_data->get_retcode();
557}
558
559/*
560 * init request, will be used later with RGWHTTPManager
561 */
11fdf7f2 562int RGWHTTPClient::init_request(rgw_http_req_data *_req_data)
7c673cae 563{
11fdf7f2 564 ceph_assert(!req_data);
7c673cae
FG
565 _req_data->get();
566 req_data = _req_data;
567
11fdf7f2 568 req_data->curl_handle = do_curl_easy_init();
7c673cae 569
11fdf7f2 570 CURL *easy_handle = req_data->get_easy_handle();
7c673cae
FG
571
572 dout(20) << "sending request to " << url << dendl;
573
574 curl_slist *h = headers_to_slist(headers);
575
576 req_data->h = h;
577
11fdf7f2
TL
578 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
579 curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
7c673cae
FG
580 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
581 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
582 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
583 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
584 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
585 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
586 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
1adf2230
AA
587 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
588 curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
1e59de90 589 curl_easy_setopt(easy_handle, CURLOPT_TCP_KEEPALIVE, cct->_conf->rgw_curl_tcp_keepalive);
7c673cae
FG
590 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
591 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
f67539c2 592 curl_easy_setopt(easy_handle, CURLOPT_BUFFERSIZE, cct->_conf->rgw_curl_buffersize);
31f18b77 593 if (send_data_hint || is_upload_request(method)) {
7c673cae
FG
594 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
595 }
596 if (has_send_len) {
f67539c2
TL
597 // TODO: prevent overflow by using curl_off_t
598 // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
599 const long size = send_len;
600 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
601 if (method == "POST") {
602 curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size);
603 // TODO: set to size smaller than 1MB should prevent the "Expect" field
604 // from being sent. So explicit removal is not needed
605 h = curl_slist_append(h, "Expect:");
606 }
607 }
20effc67
TL
608
609 if (method == "HEAD") {
610 curl_easy_setopt(easy_handle, CURLOPT_NOBODY, 1L);
611 }
612
f67539c2
TL
613 if (h) {
614 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
7c673cae 615 }
31f18b77
FG
616 if (!verify_ssl) {
617 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
618 curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
619 dout(20) << "ssl verification is set to off" << dendl;
522d829b
TL
620 } else {
621 if (!ca_path.empty()) {
622 curl_easy_setopt(easy_handle, CURLOPT_CAINFO, ca_path.c_str());
623 dout(20) << "using custom ca cert "<< ca_path.c_str() << " for ssl" << dendl;
624 }
625 if (!client_cert.empty()) {
626 if (!client_key.empty()) {
627 curl_easy_setopt(easy_handle, CURLOPT_SSLCERT, client_cert.c_str());
628 curl_easy_setopt(easy_handle, CURLOPT_SSLKEY, client_key.c_str());
629 dout(20) << "using custom client cert " << client_cert.c_str()
630 << " and private key " << client_key.c_str() << dendl;
631 } else {
632 dout(5) << "private key is missing for client certificate" << dendl;
633 }
634 }
31f18b77 635 }
7c673cae 636 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
f67539c2 637 curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
7c673cae
FG
638
639 return 0;
640}
641
11fdf7f2
TL
642bool RGWHTTPClient::is_done()
643{
644 return req_data->is_done();
645}
646
7c673cae
FG
647/*
648 * wait for async request to complete
649 */
eafe8130 650int RGWHTTPClient::wait(optional_yield y)
7c673cae 651{
eafe8130 652 return req_data->wait(y);
7c673cae
FG
653}
654
11fdf7f2 655void RGWHTTPClient::cancel()
7c673cae
FG
656{
657 if (req_data) {
11fdf7f2 658 RGWHTTPManager *http_manager = req_data->mgr;
7c673cae
FG
659 if (http_manager) {
660 http_manager->remove_request(this);
661 }
11fdf7f2
TL
662 }
663}
7c673cae 664
11fdf7f2
TL
665RGWHTTPClient::~RGWHTTPClient()
666{
667 cancel();
668 if (req_data) {
7c673cae
FG
669 req_data->put();
670 }
671}
672
673
674int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
675{
f67539c2 676 const std::string_view header_line(static_cast<const char *>(ptr), len);
7c673cae
FG
677
678 /* We're tokening the line that way due to backward compatibility. */
679 const size_t sep_loc = header_line.find_first_of(" \t:");
680
f67539c2 681 if (std::string_view::npos == sep_loc) {
7c673cae
FG
682 /* Wrongly formatted header? Just skip it. */
683 return 0;
684 }
685
686 header_name_t name(header_line.substr(0, sep_loc));
687 if (0 == relevant_headers.count(name)) {
688 /* Not interested in this particular header. */
689 return 0;
690 }
691
692 const auto value_part = header_line.substr(sep_loc + 1);
693
694 /* Skip spaces and tabs after the separator. */
695 const size_t val_loc_s = value_part.find_first_not_of(' ');
696 const size_t val_loc_e = value_part.find_first_of("\r\n");
697
f67539c2
TL
698 if (std::string_view::npos == val_loc_s ||
699 std::string_view::npos == val_loc_e) {
7c673cae
FG
700 /* Empty value case. */
701 found_headers.emplace(name, header_value_t());
702 } else {
703 found_headers.emplace(name, header_value_t(
704 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
705 }
706
707 return 0;
708}
709
11fdf7f2 710int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause)
7c673cae
FG
711{
712 int length_to_copy = 0;
713 if (post_data_index < post_data.length()) {
714 length_to_copy = min(post_data.length() - post_data_index, len);
715 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
716 post_data_index += length_to_copy;
717 }
718 return length_to_copy;
719}
720
721
722static int clear_signal(int fd)
723{
724 // since we're in non-blocking mode, we can try to read a lot more than
1e59de90 725 // one signal from signal_thread() to avoid later wakeups
20effc67 726 std::array<char, 256> buf{};
7c673cae
FG
727 int ret = ::read(fd, (void *)buf.data(), buf.size());
728 if (ret < 0) {
729 ret = -errno;
730 return ret == -EAGAIN ? 0 : ret; // clear EAGAIN
731 }
732 return 0;
733}
734
7c673cae
FG
735static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
736{
737 int num_fds;
738 struct curl_waitfd wait_fd;
739
740 wait_fd.fd = signal_fd;
741 wait_fd.events = CURL_WAIT_POLLIN;
742 wait_fd.revents = 0;
743
744 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
745 if (ret) {
746 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
747 return -EIO;
748 }
749
1e59de90 750 if (wait_fd.revents > 0) {
7c673cae
FG
751 ret = clear_signal(signal_fd);
752 if (ret < 0) {
753 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
754 return ret;
755 }
756 }
7c673cae
FG
757 return 0;
758}
759
7c673cae
FG
760void *RGWHTTPManager::ReqsThread::entry()
761{
762 manager->reqs_thread_entry();
763 return NULL;
764}
765
766/*
767 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
768 */
769RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
9f95a23c 770 completion_mgr(_cm)
7c673cae
FG
771{
772 multi_handle = (void *)curl_multi_init();
773 thread_pipe[0] = -1;
774 thread_pipe[1] = -1;
775}
776
777RGWHTTPManager::~RGWHTTPManager() {
778 stop();
779 if (multi_handle)
780 curl_multi_cleanup((CURLM *)multi_handle);
781}
782
783void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
784{
9f95a23c 785 std::unique_lock rl{reqs_lock};
7c673cae
FG
786 req_data->id = num_reqs;
787 req_data->registered = true;
788 reqs[num_reqs] = req_data;
789 num_reqs++;
11fdf7f2 790 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
7c673cae
FG
791}
792
11fdf7f2 793bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
7c673cae 794{
9f95a23c 795 std::unique_lock rl{reqs_lock};
11fdf7f2
TL
796 if (!req_data->registered) {
797 return false;
798 }
7c673cae
FG
799 req_data->get();
800 req_data->registered = false;
801 unregistered_reqs.push_back(req_data);
11fdf7f2
TL
802 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
803 return true;
7c673cae
FG
804}
805
806void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
807{
9f95a23c 808 std::unique_lock rl{reqs_lock};
7c673cae
FG
809 _complete_request(req_data);
810}
811
812void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
813{
814 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
815 if (iter != reqs.end()) {
816 reqs.erase(iter);
817 }
818 {
9f95a23c 819 std::lock_guard l{req_data->lock};
7c673cae
FG
820 req_data->mgr = nullptr;
821 }
822 if (completion_mgr) {
11fdf7f2 823 completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
7c673cae
FG
824 }
825
826 req_data->put();
827}
828
9f95a23c 829void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status)
7c673cae 830{
9f95a23c 831 req_data->finish(ret, http_status);
7c673cae
FG
832 complete_request(req_data);
833}
834
835void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
836{
837 req_data->finish(ret);
838 _complete_request(req_data);
839}
840
11fdf7f2
TL
841void RGWHTTPManager::_set_req_state(set_state& ss)
842{
843 ss.req->set_state(ss.bitmask);
844}
7c673cae
FG
845/*
846 * hook request to the curl multi handle
847 */
848int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
849{
11fdf7f2
TL
850 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl;
851 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
7c673cae
FG
852 if (mstatus) {
853 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
854 return -EIO;
855 }
856 return 0;
857}
858
859/*
860 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
861 * there will be no more processing on this request
862 */
863void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
864{
11fdf7f2
TL
865 if (req_data->curl_handle) {
866 curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
7c673cae 867 }
eafe8130 868 if (!req_data->is_done()) {
7c673cae
FG
869 _finish_request(req_data, -ECANCELED);
870 }
871}
872
873void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
874{
9f95a23c 875 std::unique_lock wl{reqs_lock};
7c673cae
FG
876 _unlink_request(req_data);
877}
878
879void RGWHTTPManager::manage_pending_requests()
880{
9f95a23c 881 reqs_lock.lock_shared();
11fdf7f2
TL
882 if (max_threaded_req == num_reqs &&
883 unregistered_reqs.empty() &&
884 reqs_change_state.empty()) {
9f95a23c 885 reqs_lock.unlock_shared();
7c673cae
FG
886 return;
887 }
9f95a23c 888 reqs_lock.unlock_shared();
7c673cae 889
9f95a23c 890 std::unique_lock wl{reqs_lock};
7c673cae 891
f6b5b4d7
TL
892 if (!reqs_change_state.empty()) {
893 for (auto siter : reqs_change_state) {
894 _set_req_state(siter);
895 }
896 reqs_change_state.clear();
897 }
898
7c673cae
FG
899 if (!unregistered_reqs.empty()) {
900 for (auto& r : unregistered_reqs) {
901 _unlink_request(r);
902 r->put();
903 }
904
905 unregistered_reqs.clear();
906 }
907
908 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
909
910 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
911
912 for (; iter != reqs.end(); ++iter) {
913 rgw_http_req_data *req_data = iter->second;
914 int r = link_request(req_data);
915 if (r < 0) {
916 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
917 remove_reqs.push_back(std::make_pair(iter->second, r));
918 } else {
919 max_threaded_req = iter->first + 1;
920 }
921 }
922
923 for (auto piter : remove_reqs) {
924 rgw_http_req_data *req_data = piter.first;
925 int r = piter.second;
926
927 _finish_request(req_data, r);
928 }
929}
930
11fdf7f2 931int RGWHTTPManager::add_request(RGWHTTPClient *client)
7c673cae
FG
932{
933 rgw_http_req_data *req_data = new rgw_http_req_data;
934
11fdf7f2 935 int ret = client->init_request(req_data);
7c673cae
FG
936 if (ret < 0) {
937 req_data->put();
938 req_data = NULL;
939 return ret;
940 }
941
942 req_data->mgr = this;
943 req_data->client = client;
11fdf7f2
TL
944 req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
945 req_data->user_info = client->get_io_user_info();
7c673cae
FG
946
947 register_request(req_data);
948
11fdf7f2 949 if (!is_started) {
7c673cae
FG
950 ret = link_request(req_data);
951 if (ret < 0) {
952 req_data->put();
953 req_data = NULL;
954 }
955 return ret;
956 }
957 ret = signal_thread();
958 if (ret < 0) {
959 finish_request(req_data, ret);
960 }
961
962 return ret;
963}
964
965int RGWHTTPManager::remove_request(RGWHTTPClient *client)
966{
967 rgw_http_req_data *req_data = client->get_req_data();
968
11fdf7f2 969 if (!is_started) {
7c673cae
FG
970 unlink_request(req_data);
971 return 0;
972 }
11fdf7f2
TL
973 if (!unregister_request(req_data)) {
974 return 0;
975 }
7c673cae
FG
976 int ret = signal_thread();
977 if (ret < 0) {
978 return ret;
979 }
980
981 return 0;
982}
983
11fdf7f2 984int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state)
7c673cae 985{
11fdf7f2 986 rgw_http_req_data *req_data = client->get_req_data();
7c673cae 987
9f95a23c 988 ceph_assert(ceph_mutex_is_locked(req_data->lock));
7c673cae 989
11fdf7f2
TL
990 /* can only do that if threaded */
991 if (!is_started) {
992 return -EINVAL;
993 }
7c673cae 994
11fdf7f2
TL
995 bool suggested_wr_paused = req_data->write_paused;
996 bool suggested_rd_paused = req_data->read_paused;
997
998 switch (state) {
999 case SET_WRITE_PAUSED:
1000 suggested_wr_paused = true;
1001 break;
1002 case SET_WRITE_RESUME:
1003 suggested_wr_paused = false;
1004 break;
1005 case SET_READ_PAUSED:
1006 suggested_rd_paused = true;
1007 break;
1008 case SET_READ_RESUME:
1009 suggested_rd_paused = false;
1010 break;
1011 default:
1012 /* shouldn't really be here */
1013 return -EIO;
1014 }
1015 if (suggested_wr_paused == req_data->write_paused &&
1016 suggested_rd_paused == req_data->read_paused) {
1017 return 0;
1018 }
7c673cae 1019
11fdf7f2
TL
1020 req_data->write_paused = suggested_wr_paused;
1021 req_data->read_paused = suggested_rd_paused;
7c673cae 1022
11fdf7f2 1023 int bitmask = CURLPAUSE_CONT;
7c673cae 1024
11fdf7f2
TL
1025 if (req_data->write_paused) {
1026 bitmask |= CURLPAUSE_SEND;
1027 }
7c673cae 1028
11fdf7f2
TL
1029 if (req_data->read_paused) {
1030 bitmask |= CURLPAUSE_RECV;
1031 }
7c673cae 1032
11fdf7f2
TL
1033 reqs_change_state.push_back(set_state(req_data, bitmask));
1034 int ret = signal_thread();
1035 if (ret < 0) {
1036 return ret;
1037 }
7c673cae 1038
11fdf7f2 1039 return 0;
7c673cae
FG
1040}
1041
11fdf7f2 1042int RGWHTTPManager::start()
7c673cae 1043{
9f95a23c 1044 if (pipe_cloexec(thread_pipe, 0) < 0) {
91327a77
AA
1045 int e = errno;
1046 ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl;
1047 return -e;
7c673cae
FG
1048 }
1049
1050 // enable non-blocking reads
91327a77
AA
1051 if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) {
1052 int e = errno;
1053 ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl;
7c673cae
FG
1054 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1055 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
91327a77 1056 return -e;
7c673cae
FG
1057 }
1058
11fdf7f2 1059 is_started = true;
7c673cae
FG
1060 reqs_thread = new ReqsThread(this);
1061 reqs_thread->create("http_manager");
1062 return 0;
1063}
1064
1065void RGWHTTPManager::stop()
1066{
1067 if (is_stopped) {
1068 return;
1069 }
1070
1071 is_stopped = true;
1072
11fdf7f2 1073 if (is_started) {
7c673cae
FG
1074 going_down = true;
1075 signal_thread();
1076 reqs_thread->join();
1077 delete reqs_thread;
1078 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
1079 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
1080 }
1081}
1082
1083int RGWHTTPManager::signal_thread()
1084{
1085 uint32_t buf = 0;
1086 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
1087 if (ret < 0) {
1088 ret = -errno;
1089 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
1090 return ret;
1091 }
1092 return 0;
1093}
1094
1095void *RGWHTTPManager::reqs_thread_entry()
1096{
1097 int still_running;
1098 int mstatus;
1099
1100 ldout(cct, 20) << __func__ << ": start" << dendl;
1101
1102 while (!going_down) {
1103 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
1104 if (ret < 0) {
1105 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
1106 return NULL;
1107 }
1108
1109 manage_pending_requests();
1110
1111 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
1112 switch (mstatus) {
1113 case CURLM_OK:
1114 case CURLM_CALL_MULTI_PERFORM:
1115 break;
1116 default:
1117 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
1118 break;
1119 }
1120 int msgs_left;
1121 CURLMsg *msg;
1122 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
1123 if (msg->msg == CURLMSG_DONE) {
1124 int result = msg->data.result;
1125 CURL *e = msg->easy_handle;
1126 rgw_http_req_data *req_data;
1127 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
1128 curl_multi_remove_handle((CURLM *)multi_handle, e);
1129
1130 long http_status;
9f95a23c
TL
1131 int status;
1132 if (!req_data->user_ret) {
1133 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
1134
1135 status = rgw_http_error_to_errno(http_status);
1136 if (result != CURLE_OK && status == 0) {
1137 dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
1138 status = -EAGAIN;
1139 }
1140 } else {
1141 status = *req_data->user_ret;
1142 rgw_err err;
1143 set_req_state_err(err, status, 0);
1144 http_status = err.http_ret;
7c673cae
FG
1145 }
1146 int id = req_data->id;
9f95a23c 1147 finish_request(req_data, status, http_status);
7c673cae
FG
1148 switch (result) {
1149 case CURLE_OK:
1150 break;
1adf2230
AA
1151 case CURLE_OPERATION_TIMEDOUT:
1152 dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
1153 << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
7c673cae
FG
1154 default:
1155 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
522d829b 1156 dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << " req_data->error_buf=" << req_data->error_buf << dendl;
7c673cae
FG
1157 break;
1158 }
1159 }
1160 }
1161 }
1162
1163
9f95a23c 1164 std::unique_lock rl{reqs_lock};
7c673cae 1165 for (auto r : unregistered_reqs) {
91327a77 1166 _unlink_request(r);
7c673cae
FG
1167 }
1168
1169 unregistered_reqs.clear();
1170
1171 auto all_reqs = std::move(reqs);
1172 for (auto iter : all_reqs) {
91327a77 1173 _unlink_request(iter.second);
7c673cae
FG
1174 }
1175
1176 reqs.clear();
1177
1178 if (completion_mgr) {
1179 completion_mgr->go_down();
1180 }
1181
1182 return 0;
1183}
1184
11fdf7f2
TL
1185void rgw_http_client_init(CephContext *cct)
1186{
1187 curl_global_init(CURL_GLOBAL_ALL);
1188 rgw_http_manager = new RGWHTTPManager(cct);
1189 rgw_http_manager->start();
1190}
1191
1192void rgw_http_client_cleanup()
1193{
1194 rgw_http_manager->stop();
1195 delete rgw_http_manager;
1196 curl_global_cleanup();
1197}
1198
1199
1200int RGWHTTP::send(RGWHTTPClient *req) {
1201 if (!req) {
1202 return 0;
1203 }
1204 int r = rgw_http_manager->add_request(req);
1205 if (r < 0) {
1206 return r;
1207 }
1208
1209 return 0;
1210}
1211
eafe8130 1212int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) {
11fdf7f2
TL
1213 if (!req) {
1214 return 0;
1215 }
1216 int r = send(req);
1217 if (r < 0) {
1218 return r;
1219 }
1220
eafe8130 1221 return req->wait(y);
11fdf7f2 1222}
7c673cae 1223