]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_http_client.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_http_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5
6 #include <boost/utility/string_ref.hpp>
7
8 #include <curl/curl.h>
9 #include <curl/easy.h>
10 #include <curl/multi.h>
11
12 #include "rgw_common.h"
13 #include "rgw_http_client.h"
14 #include "rgw_http_errors.h"
15 #include "common/RefCountedObj.h"
16
17 #include "rgw_coroutine.h"
18
19 #include <atomic>
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 struct rgw_http_req_data : public RefCountedObject {
25 CURL *easy_handle;
26 curl_slist *h;
27 uint64_t id;
28 int ret;
29 std::atomic<bool> done = { false };
30 RGWHTTPClient *client;
31 void *user_info;
32 bool registered;
33 RGWHTTPManager *mgr;
34 char error_buf[CURL_ERROR_SIZE];
35
36 Mutex lock;
37 Cond cond;
38
39 rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0),
40 client(nullptr), user_info(nullptr), registered(false),
41 mgr(NULL), lock("rgw_http_req_data::lock") {
42 memset(error_buf, 0, sizeof(error_buf));
43 }
44
45 int wait() {
46 Mutex::Locker l(lock);
47 cond.Wait(lock);
48 return ret;
49 }
50
51
52 void finish(int r) {
53 Mutex::Locker l(lock);
54 ret = r;
55 if (easy_handle)
56 curl_easy_cleanup(easy_handle);
57
58 if (h)
59 curl_slist_free_all(h);
60
61 easy_handle = NULL;
62 h = NULL;
63 done = true;
64 cond.Signal();
65 }
66
67 bool is_done() {
68 return done;
69 }
70
71 int get_retcode() {
72 Mutex::Locker l(lock);
73 return ret;
74 }
75
76 RGWHTTPManager *get_manager() {
77 Mutex::Locker l(lock);
78 return mgr;
79 }
80 };
81
82 /*
83 * the simple set of callbacks will be called on RGWHTTPClient::process()
84 */
85 /* Static methods - callbacks for libcurl. */
86 size_t RGWHTTPClient::simple_receive_http_header(void * const ptr,
87 const size_t size,
88 const size_t nmemb,
89 void * const _info)
90 {
91 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
92 const size_t len = size * nmemb;
93 int ret = client->receive_header(ptr, size * nmemb);
94 if (ret < 0) {
95 dout(0) << "WARNING: client->receive_header() returned ret="
96 << ret << dendl;
97 }
98
99 return len;
100 }
101
102 size_t RGWHTTPClient::simple_receive_http_data(void * const ptr,
103 const size_t size,
104 const size_t nmemb,
105 void * const _info)
106 {
107 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
108 const size_t len = size * nmemb;
109 int ret = client->receive_data(ptr, size * nmemb);
110 if (ret < 0) {
111 dout(0) << "WARNING: client->receive_data() returned ret="
112 << ret << dendl;
113 }
114
115 return len;
116 }
117
118 size_t RGWHTTPClient::simple_send_http_data(void * const ptr,
119 const size_t size,
120 const size_t nmemb,
121 void * const _info)
122 {
123 RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info);
124 int ret = client->send_data(ptr, size * nmemb);
125 if (ret < 0) {
126 dout(0) << "WARNING: client->send_data() returned ret="
127 << ret << dendl;
128 }
129
130 return ret;
131 }
132
133 /*
134 * the following set of callbacks will be called either on RGWHTTPManager::process(),
135 * or via the RGWHTTPManager async processing.
136 */
137 size_t RGWHTTPClient::receive_http_header(void * const ptr,
138 const size_t size,
139 const size_t nmemb,
140 void * const _info)
141 {
142 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
143 size_t len = size * nmemb;
144
145 Mutex::Locker l(req_data->lock);
146
147 if (!req_data->registered) {
148 return len;
149 }
150
151 int ret = req_data->client->receive_header(ptr, size * nmemb);
152 if (ret < 0) {
153 dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl;
154 }
155
156 return len;
157 }
158
159 size_t RGWHTTPClient::receive_http_data(void * const ptr,
160 const size_t size,
161 const size_t nmemb,
162 void * const _info)
163 {
164 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
165 size_t len = size * nmemb;
166
167 Mutex::Locker l(req_data->lock);
168
169 if (!req_data->registered) {
170 return len;
171 }
172
173 int ret = req_data->client->receive_data(ptr, size * nmemb);
174 if (ret < 0) {
175 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
176 }
177
178 return len;
179 }
180
181 size_t RGWHTTPClient::send_http_data(void * const ptr,
182 const size_t size,
183 const size_t nmemb,
184 void * const _info)
185 {
186 rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info);
187
188 Mutex::Locker l(req_data->lock);
189
190 if (!req_data->registered) {
191 return 0;
192 }
193
194 int ret = req_data->client->send_data(ptr, size * nmemb);
195 if (ret < 0) {
196 dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl;
197 }
198
199 return ret;
200 }
201
202 static curl_slist *headers_to_slist(param_vec_t& headers)
203 {
204 curl_slist *h = NULL;
205
206 param_vec_t::iterator iter;
207 for (iter = headers.begin(); iter != headers.end(); ++iter) {
208 pair<string, string>& p = *iter;
209 string val = p.first;
210
211 if (strncmp(val.c_str(), "HTTP_", 5) == 0) {
212 val = val.substr(5);
213 }
214
215 /* we need to convert all underscores into dashes as some web servers forbid them
216 * in the http header field names
217 */
218 for (size_t i = 0; i < val.size(); i++) {
219 if (val[i] == '_') {
220 val[i] = '-';
221 }
222 }
223
224 val.append(": ");
225 val.append(p.second);
226 h = curl_slist_append(h, val.c_str());
227 }
228
229 return h;
230 }
231
/*
 * True when `method` is exactly "POST" or "PUT" (case-sensitive); false for
 * anything else, including a null pointer.
 */
static bool is_upload_request(const char *method)
{
  static const char * const upload_methods[] = { "POST", "PUT" };

  if (!method) {
    return false;
  }
  for (const char *candidate : upload_methods) {
    if (strcmp(method, candidate) == 0) {
      return true;
    }
  }
  return false;
}
239
240 /*
241 * process a single simple one off request, not going through RGWHTTPManager. Not using
242 * req_data.
243 */
244 int RGWHTTPClient::process(const char *method, const char *url)
245 {
246 int ret = 0;
247 CURL *curl_handle;
248
249 char error_buf[CURL_ERROR_SIZE];
250
251 last_method = (method ? method : "");
252 last_url = (url ? url : "");
253
254 curl_handle = curl_easy_init();
255
256 dout(20) << "sending request to " << url << dendl;
257
258 curl_slist *h = headers_to_slist(headers);
259
260 curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method);
261 curl_easy_setopt(curl_handle, CURLOPT_URL, url);
262 curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L);
263 curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L);
264 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header);
265 curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this);
266 curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data);
267 curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this);
268 curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf);
269 if (h) {
270 curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h);
271 }
272 curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data);
273 curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this);
274 if (is_upload_request(method)) {
275 curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L);
276 }
277 if (has_send_len) {
278 curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len);
279 }
280 if (!verify_ssl) {
281 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L);
282 curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L);
283 dout(20) << "ssl verification is set to off" << dendl;
284 }
285
286 CURLcode status = curl_easy_perform(curl_handle);
287 if (status) {
288 dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl;
289 ret = -EINVAL;
290 }
291 curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status);
292 curl_easy_cleanup(curl_handle);
293 curl_slist_free_all(h);
294
295 return ret;
296 }
297
298 string RGWHTTPClient::to_str()
299 {
300 string method_str = (last_method.empty() ? "<no-method>" : last_method);
301 string url_str = (last_url.empty() ? "<no-url>" : last_url);
302 return method_str + " " + url_str;
303 }
304
305 int RGWHTTPClient::get_req_retcode()
306 {
307 if (!req_data) {
308 return -EINVAL;
309 }
310
311 return req_data->get_retcode();
312 }
313
314 /*
315 * init request, will be used later with RGWHTTPManager
316 */
317 int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data)
318 {
319 assert(!req_data);
320 _req_data->get();
321 req_data = _req_data;
322
323 CURL *easy_handle;
324
325 easy_handle = curl_easy_init();
326
327 req_data->easy_handle = easy_handle;
328
329 dout(20) << "sending request to " << url << dendl;
330
331 curl_slist *h = headers_to_slist(headers);
332
333 req_data->h = h;
334
335 last_method = (method ? method : "");
336 last_url = (url ? url : "");
337
338 curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method);
339 curl_easy_setopt(easy_handle, CURLOPT_URL, url);
340 curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
341 curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
342 curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
343 curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
344 curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
345 curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
346 curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
347 if (h) {
348 curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
349 }
350 curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
351 curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
352 if (is_upload_request(method)) {
353 curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
354 }
355 if (has_send_len) {
356 curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len);
357 }
358 curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
359
360 return 0;
361 }
362
363 /*
364 * wait for async request to complete
365 */
366 int RGWHTTPClient::wait()
367 {
368 if (!req_data->is_done()) {
369 return req_data->wait();
370 }
371
372 return req_data->ret;
373 }
374
375 RGWHTTPClient::~RGWHTTPClient()
376 {
377 if (req_data) {
378 RGWHTTPManager *http_manager = req_data->get_manager();
379 if (http_manager) {
380 http_manager->remove_request(this);
381 }
382
383 req_data->put();
384 }
385 }
386
387
388 int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len)
389 {
390 const boost::string_ref header_line(static_cast<const char * const>(ptr), len);
391
392 /* We're tokening the line that way due to backward compatibility. */
393 const size_t sep_loc = header_line.find_first_of(" \t:");
394
395 if (boost::string_ref::npos == sep_loc) {
396 /* Wrongly formatted header? Just skip it. */
397 return 0;
398 }
399
400 header_name_t name(header_line.substr(0, sep_loc));
401 if (0 == relevant_headers.count(name)) {
402 /* Not interested in this particular header. */
403 return 0;
404 }
405
406 const auto value_part = header_line.substr(sep_loc + 1);
407
408 /* Skip spaces and tabs after the separator. */
409 const size_t val_loc_s = value_part.find_first_not_of(' ');
410 const size_t val_loc_e = value_part.find_first_of("\r\n");
411
412 if (boost::string_ref::npos == val_loc_s ||
413 boost::string_ref::npos == val_loc_e) {
414 /* Empty value case. */
415 found_headers.emplace(name, header_value_t());
416 } else {
417 found_headers.emplace(name, header_value_t(
418 value_part.substr(val_loc_s, val_loc_e - val_loc_s)));
419 }
420
421 return 0;
422 }
423
424 int RGWHTTPTransceiver::send_data(void* ptr, size_t len)
425 {
426 int length_to_copy = 0;
427 if (post_data_index < post_data.length()) {
428 length_to_copy = min(post_data.length() - post_data_index, len);
429 memcpy(ptr, post_data.data() + post_data_index, length_to_copy);
430 post_data_index += length_to_copy;
431 }
432 return length_to_copy;
433 }
434
435
/*
 * Drain pending wake-up bytes from `fd` with a single read of up to 256
 * bytes. Returns 0 on success or when the read fails with EAGAIN (nothing
 * to read on a non-blocking fd), and -errno for any other read failure.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, (void *)scratch.data(), scratch.size());
  if (nread >= 0) {
    return 0;
  }

  const int err = -errno;
  return (err == -EAGAIN) ? 0 : err; // clear EAGAIN
}
449
450 #if HAVE_CURL_MULTI_WAIT
451
452 static std::once_flag detect_flag;
453 static bool curl_multi_wait_bug_present = false;
454
455 static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle,
456 int write_fd, int read_fd)
457 {
458 int ret = 0;
459
460 // write to write_fd so that read_fd becomes readable
461 uint32_t buf = 0;
462 ret = ::write(write_fd, &buf, sizeof(buf));
463 if (ret < 0) {
464 ret = -errno;
465 ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl;
466 return ret;
467 }
468
469 // pass read_fd in extra_fds for curl_multi_wait()
470 int num_fds;
471 struct curl_waitfd wait_fd;
472
473 wait_fd.fd = read_fd;
474 wait_fd.events = CURL_WAIT_POLLIN;
475 wait_fd.revents = 0;
476
477 ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds);
478 if (ret != CURLM_OK) {
479 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
480 return -EIO;
481 }
482
483 // curl_multi_wait should flag revents when extra_fd is readable. if it
484 // doesn't, the bug is present and we can't rely on revents
485 if (wait_fd.revents == 0) {
486 curl_multi_wait_bug_present = true;
487 ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a "
488 "bug in curl_multi_wait(). enabling a workaround that may degrade "
489 "performance slightly." << dendl;
490 }
491
492 return clear_signal(read_fd);
493 }
494
495 static bool is_signaled(const curl_waitfd& wait_fd)
496 {
497 if (wait_fd.fd < 0) {
498 // no fd to signal
499 return false;
500 }
501
502 if (curl_multi_wait_bug_present) {
503 // we can't rely on revents, so we always return true if a wait_fd is given.
504 // this means we'll be trying a non-blocking read on this fd every time that
505 // curl_multi_wait() wakes up
506 return true;
507 }
508
509 return wait_fd.revents > 0;
510 }
511
512 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
513 {
514 int num_fds;
515 struct curl_waitfd wait_fd;
516
517 wait_fd.fd = signal_fd;
518 wait_fd.events = CURL_WAIT_POLLIN;
519 wait_fd.revents = 0;
520
521 int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds);
522 if (ret) {
523 ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl;
524 return -EIO;
525 }
526
527 if (is_signaled(wait_fd)) {
528 ret = clear_signal(signal_fd);
529 if (ret < 0) {
530 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
531 return ret;
532 }
533 }
534 return 0;
535 }
536
537 #else
538
539 static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
540 {
541 fd_set fdread;
542 fd_set fdwrite;
543 fd_set fdexcep;
544 int maxfd = -1;
545
546 FD_ZERO(&fdread);
547 FD_ZERO(&fdwrite);
548 FD_ZERO(&fdexcep);
549
550 /* get file descriptors from the transfers */
551 int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
552 if (ret) {
553 ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
554 return -EIO;
555 }
556
557 if (signal_fd > 0) {
558 FD_SET(signal_fd, &fdread);
559 if (signal_fd >= maxfd) {
560 maxfd = signal_fd + 1;
561 }
562 }
563
564 /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */
565 uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
566 #define RGW_CURL_TIMEOUT 1000
567 if (!to)
568 to = RGW_CURL_TIMEOUT;
569 struct timeval timeout;
570 timeout.tv_sec = to / 1000;
571 timeout.tv_usec = to % 1000;
572
573 ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
574 if (ret < 0) {
575 ret = -errno;
576 ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
577 return ret;
578 }
579
580 if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
581 ret = clear_signal(signal_fd);
582 if (ret < 0) {
583 ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
584 return ret;
585 }
586 }
587
588 return 0;
589 }
590
591 #endif
592
593 void *RGWHTTPManager::ReqsThread::entry()
594 {
595 manager->reqs_thread_entry();
596 return NULL;
597 }
598
599 /*
600 * RGWHTTPManager has two modes of operation: threaded and non-threaded.
601 */
602 RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct),
603 completion_mgr(_cm), is_threaded(false),
604 reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0),
605 reqs_thread(NULL)
606 {
607 multi_handle = (void *)curl_multi_init();
608 thread_pipe[0] = -1;
609 thread_pipe[1] = -1;
610 }
611
612 RGWHTTPManager::~RGWHTTPManager() {
613 stop();
614 if (multi_handle)
615 curl_multi_cleanup((CURLM *)multi_handle);
616 }
617
618 void RGWHTTPManager::register_request(rgw_http_req_data *req_data)
619 {
620 RWLock::WLocker rl(reqs_lock);
621 req_data->id = num_reqs;
622 req_data->registered = true;
623 reqs[num_reqs] = req_data;
624 num_reqs++;
625 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
626 }
627
628 void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data)
629 {
630 RWLock::WLocker rl(reqs_lock);
631 req_data->get();
632 req_data->registered = false;
633 unregistered_reqs.push_back(req_data);
634 ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
635 }
636
637 void RGWHTTPManager::complete_request(rgw_http_req_data *req_data)
638 {
639 RWLock::WLocker rl(reqs_lock);
640 _complete_request(req_data);
641 }
642
643 void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
644 {
645 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
646 if (iter != reqs.end()) {
647 reqs.erase(iter);
648 }
649 {
650 Mutex::Locker l(req_data->lock);
651 req_data->mgr = nullptr;
652 }
653 if (completion_mgr) {
654 completion_mgr->complete(NULL, req_data->user_info);
655 }
656
657 req_data->put();
658 }
659
660 void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret)
661 {
662 req_data->finish(ret);
663 complete_request(req_data);
664 }
665
666 void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret)
667 {
668 req_data->finish(ret);
669 _complete_request(req_data);
670 }
671
672 /*
673 * hook request to the curl multi handle
674 */
675 int RGWHTTPManager::link_request(rgw_http_req_data *req_data)
676 {
677 ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl;
678 CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle);
679 if (mstatus) {
680 dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl;
681 return -EIO;
682 }
683 return 0;
684 }
685
686 /*
687 * unhook request from the curl multi handle, and finish request if it wasn't finished yet as
688 * there will be no more processing on this request
689 */
690 void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data)
691 {
692 if (req_data->easy_handle) {
693 curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle);
694 }
695 if (!req_data->is_done()) {
696 _finish_request(req_data, -ECANCELED);
697 }
698 }
699
700 void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data)
701 {
702 RWLock::WLocker wl(reqs_lock);
703 _unlink_request(req_data);
704 }
705
706 void RGWHTTPManager::manage_pending_requests()
707 {
708 reqs_lock.get_read();
709 if (max_threaded_req == num_reqs && unregistered_reqs.empty()) {
710 reqs_lock.unlock();
711 return;
712 }
713 reqs_lock.unlock();
714
715 RWLock::WLocker wl(reqs_lock);
716
717 if (!unregistered_reqs.empty()) {
718 for (auto& r : unregistered_reqs) {
719 _unlink_request(r);
720 r->put();
721 }
722
723 unregistered_reqs.clear();
724 }
725
726 map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
727
728 list<std::pair<rgw_http_req_data *, int> > remove_reqs;
729
730 for (; iter != reqs.end(); ++iter) {
731 rgw_http_req_data *req_data = iter->second;
732 int r = link_request(req_data);
733 if (r < 0) {
734 ldout(cct, 0) << "ERROR: failed to link http request" << dendl;
735 remove_reqs.push_back(std::make_pair(iter->second, r));
736 } else {
737 max_threaded_req = iter->first + 1;
738 }
739 }
740
741 for (auto piter : remove_reqs) {
742 rgw_http_req_data *req_data = piter.first;
743 int r = piter.second;
744
745 _finish_request(req_data, r);
746 }
747 }
748
749 int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url)
750 {
751 rgw_http_req_data *req_data = new rgw_http_req_data;
752
753 int ret = client->init_request(method, url, req_data);
754 if (ret < 0) {
755 req_data->put();
756 req_data = NULL;
757 return ret;
758 }
759
760 req_data->mgr = this;
761 req_data->client = client;
762 req_data->user_info = client->get_user_info();
763
764 register_request(req_data);
765
766 if (!is_threaded) {
767 ret = link_request(req_data);
768 if (ret < 0) {
769 req_data->put();
770 req_data = NULL;
771 }
772 return ret;
773 }
774 ret = signal_thread();
775 if (ret < 0) {
776 finish_request(req_data, ret);
777 }
778
779 return ret;
780 }
781
782 int RGWHTTPManager::remove_request(RGWHTTPClient *client)
783 {
784 rgw_http_req_data *req_data = client->get_req_data();
785
786 if (!is_threaded) {
787 unlink_request(req_data);
788 return 0;
789 }
790 unregister_request(req_data);
791 int ret = signal_thread();
792 if (ret < 0) {
793 return ret;
794 }
795
796 return 0;
797 }
798
799 /*
800 * the synchronous, non-threaded request processing method.
801 */
802 int RGWHTTPManager::process_requests(bool wait_for_data, bool *done)
803 {
804 assert(!is_threaded);
805
806 int still_running;
807 int mstatus;
808
809 do {
810 if (wait_for_data) {
811 int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1);
812 if (ret < 0) {
813 return ret;
814 }
815 }
816
817 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
818 switch (mstatus) {
819 case CURLM_OK:
820 case CURLM_CALL_MULTI_PERFORM:
821 break;
822 default:
823 dout(20) << "curl_multi_perform returned: " << mstatus << dendl;
824 return -EINVAL;
825 }
826 int msgs_left;
827 CURLMsg *msg;
828 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
829 if (msg->msg == CURLMSG_DONE) {
830 CURL *e = msg->easy_handle;
831 rgw_http_req_data *req_data;
832 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
833
834 long http_status;
835 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
836
837 int status = rgw_http_error_to_errno(http_status);
838 int result = msg->data.result;
839 finish_request(req_data, status);
840 switch (result) {
841 case CURLE_OK:
842 break;
843 default:
844 dout(20) << "ERROR: msg->data.result=" << result << dendl;
845 return -EIO;
846 }
847 }
848 }
849 } while (mstatus == CURLM_CALL_MULTI_PERFORM);
850
851 *done = (still_running == 0);
852
853 return 0;
854 }
855
856 /*
857 * the synchronous, non-threaded request processing completion method.
858 */
859 int RGWHTTPManager::complete_requests()
860 {
861 bool done = false;
862 int ret;
863 do {
864 ret = process_requests(true, &done);
865 } while (!done && !ret);
866
867 return ret;
868 }
869
870 int RGWHTTPManager::set_threaded()
871 {
872 int r = pipe(thread_pipe);
873 if (r < 0) {
874 r = -errno;
875 ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl;
876 return r;
877 }
878
879 // enable non-blocking reads
880 r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK);
881 if (r < 0) {
882 r = -errno;
883 ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl;
884 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
885 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
886 return r;
887 }
888
889 #ifdef HAVE_CURL_MULTI_WAIT
890 // on first initialization, use this pipe to detect whether we're using a
891 // buggy version of libcurl
892 std::call_once(detect_flag, detect_curl_multi_wait_bug, cct,
893 static_cast<CURLM*>(multi_handle),
894 thread_pipe[1], thread_pipe[0]);
895 #endif
896
897 is_threaded = true;
898 reqs_thread = new ReqsThread(this);
899 reqs_thread->create("http_manager");
900 return 0;
901 }
902
903 void RGWHTTPManager::stop()
904 {
905 if (is_stopped) {
906 return;
907 }
908
909 is_stopped = true;
910
911 if (is_threaded) {
912 going_down = true;
913 signal_thread();
914 reqs_thread->join();
915 delete reqs_thread;
916 TEMP_FAILURE_RETRY(::close(thread_pipe[1]));
917 TEMP_FAILURE_RETRY(::close(thread_pipe[0]));
918 }
919 }
920
921 int RGWHTTPManager::signal_thread()
922 {
923 uint32_t buf = 0;
924 int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf));
925 if (ret < 0) {
926 ret = -errno;
927 ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl;
928 return ret;
929 }
930 return 0;
931 }
932
933 void *RGWHTTPManager::reqs_thread_entry()
934 {
935 int still_running;
936 int mstatus;
937
938 ldout(cct, 20) << __func__ << ": start" << dendl;
939
940 while (!going_down) {
941 int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
942 if (ret < 0) {
943 dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl;
944 return NULL;
945 }
946
947 manage_pending_requests();
948
949 mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
950 switch (mstatus) {
951 case CURLM_OK:
952 case CURLM_CALL_MULTI_PERFORM:
953 break;
954 default:
955 dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
956 break;
957 }
958 int msgs_left;
959 CURLMsg *msg;
960 while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
961 if (msg->msg == CURLMSG_DONE) {
962 int result = msg->data.result;
963 CURL *e = msg->easy_handle;
964 rgw_http_req_data *req_data;
965 curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
966 curl_multi_remove_handle((CURLM *)multi_handle, e);
967
968 long http_status;
969 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
970
971 int status = rgw_http_error_to_errno(http_status);
972 if (result != CURLE_OK && http_status == 0) {
973 status = -EAGAIN;
974 }
975 int id = req_data->id;
976 finish_request(req_data, status);
977 switch (result) {
978 case CURLE_OK:
979 break;
980 default:
981 dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
982 break;
983 }
984 }
985 }
986 }
987
988
989 RWLock::WLocker rl(reqs_lock);
990 for (auto r : unregistered_reqs) {
991 _finish_request(r, -ECANCELED);
992 }
993
994 unregistered_reqs.clear();
995
996 auto all_reqs = std::move(reqs);
997 for (auto iter : all_reqs) {
998 _finish_request(iter.second, -ECANCELED);
999 }
1000
1001 reqs.clear();
1002
1003 if (completion_mgr) {
1004 completion_mgr->go_down();
1005 }
1006
1007 return 0;
1008 }
1009
1010