]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/compat.h" | |
5 | ||
6 | #include <boost/utility/string_ref.hpp> | |
7 | ||
8 | #include <curl/curl.h> | |
9 | #include <curl/easy.h> | |
10 | #include <curl/multi.h> | |
11 | ||
12 | #include "rgw_common.h" | |
13 | #include "rgw_http_client.h" | |
14 | #include "rgw_http_errors.h" | |
15 | #include "common/RefCountedObj.h" | |
16 | ||
17 | #include "rgw_coroutine.h" | |
18 | ||
19 | #include <atomic> | |
20 | ||
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rgw | |
23 | ||
24 | struct rgw_http_req_data : public RefCountedObject { | |
25 | CURL *easy_handle; | |
26 | curl_slist *h; | |
27 | uint64_t id; | |
28 | int ret; | |
29 | std::atomic<bool> done = { false }; | |
30 | RGWHTTPClient *client; | |
31 | void *user_info; | |
32 | bool registered; | |
33 | RGWHTTPManager *mgr; | |
34 | char error_buf[CURL_ERROR_SIZE]; | |
35 | ||
36 | Mutex lock; | |
37 | Cond cond; | |
38 | ||
39 | rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), | |
40 | client(nullptr), user_info(nullptr), registered(false), | |
41 | mgr(NULL), lock("rgw_http_req_data::lock") { | |
42 | memset(error_buf, 0, sizeof(error_buf)); | |
43 | } | |
44 | ||
45 | int wait() { | |
46 | Mutex::Locker l(lock); | |
47 | cond.Wait(lock); | |
48 | return ret; | |
49 | } | |
50 | ||
51 | ||
52 | void finish(int r) { | |
53 | Mutex::Locker l(lock); | |
54 | ret = r; | |
55 | if (easy_handle) | |
56 | curl_easy_cleanup(easy_handle); | |
57 | ||
58 | if (h) | |
59 | curl_slist_free_all(h); | |
60 | ||
61 | easy_handle = NULL; | |
62 | h = NULL; | |
63 | done = true; | |
64 | cond.Signal(); | |
65 | } | |
66 | ||
67 | bool is_done() { | |
68 | return done; | |
69 | } | |
70 | ||
71 | int get_retcode() { | |
72 | Mutex::Locker l(lock); | |
73 | return ret; | |
74 | } | |
75 | ||
76 | RGWHTTPManager *get_manager() { | |
77 | Mutex::Locker l(lock); | |
78 | return mgr; | |
79 | } | |
80 | }; | |
81 | ||
94b18763 FG |
82 | struct RGWCurlHandle { |
83 | int uses; | |
84 | mono_time lastuse; | |
85 | CURL* h; | |
86 | ||
87 | RGWCurlHandle(CURL* h) : uses(0), h(h) {}; | |
88 | CURL* operator*() { | |
89 | return this->h; | |
90 | } | |
91 | }; | |
92 | ||
93 | #define MAXIDLE 5 | |
94 | class RGWCurlHandles : public Thread { | |
95 | public: | |
96 | Mutex cleaner_lock; | |
97 | std::vector<RGWCurlHandle*>saved_curl; | |
98 | int cleaner_shutdown; | |
99 | Cond cleaner_cond; | |
100 | ||
101 | RGWCurlHandles() : | |
102 | cleaner_lock{"RGWCurlHandles::cleaner_lock"}, | |
103 | cleaner_shutdown{0} { | |
104 | } | |
105 | ||
106 | RGWCurlHandle* get_curl_handle(); | |
107 | void release_curl_handle_now(RGWCurlHandle* curl); | |
108 | void release_curl_handle(RGWCurlHandle* curl); | |
109 | void flush_curl_handles(); | |
110 | void* entry(); | |
111 | void stop(); | |
112 | }; | |
113 | ||
114 | RGWCurlHandle* RGWCurlHandles::get_curl_handle() { | |
115 | RGWCurlHandle* curl = 0; | |
116 | CURL* h; | |
117 | { | |
118 | Mutex::Locker lock(cleaner_lock); | |
119 | if (!saved_curl.empty()) { | |
120 | curl = *saved_curl.begin(); | |
121 | saved_curl.erase(saved_curl.begin()); | |
122 | } | |
123 | } | |
124 | if (curl) { | |
125 | } else if ((h = curl_easy_init())) { | |
126 | curl = new RGWCurlHandle{h}; | |
127 | } else { | |
128 | // curl = 0; | |
129 | } | |
130 | return curl; | |
131 | } | |
132 | ||
133 | void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl) | |
134 | { | |
135 | curl_easy_cleanup(**curl); | |
136 | delete curl; | |
137 | } | |
138 | ||
139 | void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) | |
140 | { | |
141 | if (cleaner_shutdown) { | |
142 | release_curl_handle_now(curl); | |
143 | } else { | |
144 | curl_easy_reset(**curl); | |
145 | Mutex::Locker lock(cleaner_lock); | |
146 | curl->lastuse = mono_clock::now(); | |
147 | saved_curl.insert(saved_curl.begin(), 1, curl); | |
148 | } | |
149 | } | |
150 | ||
151 | void* RGWCurlHandles::entry() | |
152 | { | |
153 | RGWCurlHandle* curl; | |
154 | Mutex::Locker lock(cleaner_lock); | |
155 | ||
156 | for (;;) { | |
157 | if (cleaner_shutdown) { | |
158 | if (saved_curl.empty()) | |
159 | break; | |
160 | } else { | |
161 | utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0); | |
162 | cleaner_cond.WaitUntil(cleaner_lock, release); | |
163 | } | |
164 | mono_time now = mono_clock::now(); | |
165 | while (!saved_curl.empty()) { | |
166 | auto cend = saved_curl.end(); | |
167 | --cend; | |
168 | curl = *cend; | |
169 | if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE)) | |
170 | break; | |
171 | saved_curl.erase(cend); | |
172 | release_curl_handle_now(curl); | |
173 | } | |
174 | } | |
175 | return nullptr; | |
176 | } | |
177 | ||
178 | void RGWCurlHandles::stop() | |
179 | { | |
180 | Mutex::Locker lock(cleaner_lock); | |
181 | cleaner_shutdown = 1; | |
182 | cleaner_cond.Signal(); | |
183 | } | |
184 | ||
185 | void RGWCurlHandles::flush_curl_handles() | |
186 | { | |
187 | stop(); | |
188 | join(); | |
189 | if (!saved_curl.empty()) { | |
190 | dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl; | |
191 | } | |
192 | saved_curl.shrink_to_fit(); | |
193 | } | |
194 | ||
195 | static RGWCurlHandles *handles; | |
196 | // XXX make this part of the token cache? (but that's swift-only; | |
197 | // and this especially needs to integrates with s3...) | |
198 | ||
199 | void rgw_setup_saved_curl_handles() | |
200 | { | |
201 | handles = new RGWCurlHandles(); | |
202 | handles->create("rgw_curl"); | |
203 | } | |
204 | ||
205 | void rgw_release_all_curl_handles() | |
206 | { | |
207 | handles->flush_curl_handles(); | |
208 | delete handles; | |
209 | } | |
210 | ||
7c673cae FG |
211 | /* |
212 | * the simple set of callbacks will be called on RGWHTTPClient::process() | |
213 | */ | |
214 | /* Static methods - callbacks for libcurl. */ | |
215 | size_t RGWHTTPClient::simple_receive_http_header(void * const ptr, | |
216 | const size_t size, | |
217 | const size_t nmemb, | |
218 | void * const _info) | |
219 | { | |
220 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
221 | const size_t len = size * nmemb; | |
222 | int ret = client->receive_header(ptr, size * nmemb); | |
223 | if (ret < 0) { | |
224 | dout(0) << "WARNING: client->receive_header() returned ret=" | |
225 | << ret << dendl; | |
226 | } | |
227 | ||
228 | return len; | |
229 | } | |
230 | ||
231 | size_t RGWHTTPClient::simple_receive_http_data(void * const ptr, | |
232 | const size_t size, | |
233 | const size_t nmemb, | |
234 | void * const _info) | |
235 | { | |
236 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
237 | const size_t len = size * nmemb; | |
238 | int ret = client->receive_data(ptr, size * nmemb); | |
239 | if (ret < 0) { | |
240 | dout(0) << "WARNING: client->receive_data() returned ret=" | |
241 | << ret << dendl; | |
242 | } | |
243 | ||
244 | return len; | |
245 | } | |
246 | ||
247 | size_t RGWHTTPClient::simple_send_http_data(void * const ptr, | |
248 | const size_t size, | |
249 | const size_t nmemb, | |
250 | void * const _info) | |
251 | { | |
252 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
253 | int ret = client->send_data(ptr, size * nmemb); | |
254 | if (ret < 0) { | |
255 | dout(0) << "WARNING: client->send_data() returned ret=" | |
256 | << ret << dendl; | |
257 | } | |
258 | ||
259 | return ret; | |
260 | } | |
261 | ||
262 | /* | |
263 | * the following set of callbacks will be called either on RGWHTTPManager::process(), | |
264 | * or via the RGWHTTPManager async processing. | |
265 | */ | |
266 | size_t RGWHTTPClient::receive_http_header(void * const ptr, | |
267 | const size_t size, | |
268 | const size_t nmemb, | |
269 | void * const _info) | |
270 | { | |
271 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
272 | size_t len = size * nmemb; | |
273 | ||
274 | Mutex::Locker l(req_data->lock); | |
275 | ||
276 | if (!req_data->registered) { | |
277 | return len; | |
278 | } | |
279 | ||
280 | int ret = req_data->client->receive_header(ptr, size * nmemb); | |
281 | if (ret < 0) { | |
282 | dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl; | |
283 | } | |
284 | ||
285 | return len; | |
286 | } | |
287 | ||
288 | size_t RGWHTTPClient::receive_http_data(void * const ptr, | |
289 | const size_t size, | |
290 | const size_t nmemb, | |
291 | void * const _info) | |
292 | { | |
293 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
294 | size_t len = size * nmemb; | |
295 | ||
296 | Mutex::Locker l(req_data->lock); | |
297 | ||
298 | if (!req_data->registered) { | |
299 | return len; | |
300 | } | |
301 | ||
302 | int ret = req_data->client->receive_data(ptr, size * nmemb); | |
303 | if (ret < 0) { | |
304 | dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; | |
305 | } | |
306 | ||
307 | return len; | |
308 | } | |
309 | ||
310 | size_t RGWHTTPClient::send_http_data(void * const ptr, | |
311 | const size_t size, | |
312 | const size_t nmemb, | |
313 | void * const _info) | |
314 | { | |
315 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
316 | ||
317 | Mutex::Locker l(req_data->lock); | |
318 | ||
319 | if (!req_data->registered) { | |
320 | return 0; | |
321 | } | |
322 | ||
323 | int ret = req_data->client->send_data(ptr, size * nmemb); | |
324 | if (ret < 0) { | |
325 | dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; | |
326 | } | |
327 | ||
328 | return ret; | |
329 | } | |
330 | ||
331 | static curl_slist *headers_to_slist(param_vec_t& headers) | |
332 | { | |
333 | curl_slist *h = NULL; | |
334 | ||
335 | param_vec_t::iterator iter; | |
336 | for (iter = headers.begin(); iter != headers.end(); ++iter) { | |
337 | pair<string, string>& p = *iter; | |
338 | string val = p.first; | |
339 | ||
340 | if (strncmp(val.c_str(), "HTTP_", 5) == 0) { | |
341 | val = val.substr(5); | |
342 | } | |
343 | ||
344 | /* we need to convert all underscores into dashes as some web servers forbid them | |
345 | * in the http header field names | |
346 | */ | |
347 | for (size_t i = 0; i < val.size(); i++) { | |
348 | if (val[i] == '_') { | |
349 | val[i] = '-'; | |
350 | } | |
351 | } | |
352 | ||
28e407b8 AA |
353 | // curl won't send headers with empty values unless it ends with a ; instead |
354 | if (p.second.empty()) { | |
355 | val.append(1, ';'); | |
356 | } else { | |
357 | val.append(": "); | |
358 | val.append(p.second); | |
359 | } | |
7c673cae FG |
360 | h = curl_slist_append(h, val.c_str()); |
361 | } | |
362 | ||
363 | return h; | |
364 | } | |
365 | ||
366 | static bool is_upload_request(const char *method) | |
367 | { | |
368 | if (method == nullptr) { | |
369 | return false; | |
370 | } | |
371 | return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0; | |
372 | } | |
373 | ||
374 | /* | |
375 | * process a single simple one off request, not going through RGWHTTPManager. Not using | |
376 | * req_data. | |
377 | */ | |
378 | int RGWHTTPClient::process(const char *method, const char *url) | |
379 | { | |
380 | int ret = 0; | |
381 | CURL *curl_handle; | |
382 | ||
383 | char error_buf[CURL_ERROR_SIZE]; | |
384 | ||
385 | last_method = (method ? method : ""); | |
386 | last_url = (url ? url : ""); | |
387 | ||
94b18763 FG |
388 | auto ca = handles->get_curl_handle(); |
389 | curl_handle = **ca; | |
7c673cae FG |
390 | |
391 | dout(20) << "sending request to " << url << dendl; | |
392 | ||
393 | curl_slist *h = headers_to_slist(headers); | |
394 | ||
395 | curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method); | |
396 | curl_easy_setopt(curl_handle, CURLOPT_URL, url); | |
397 | curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); | |
398 | curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); | |
399 | curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header); | |
400 | curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); | |
401 | curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data); | |
402 | curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); | |
403 | curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); | |
404 | if (h) { | |
405 | curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); | |
406 | } | |
407 | curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data); | |
408 | curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); | |
409 | if (is_upload_request(method)) { | |
410 | curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); | |
411 | } | |
412 | if (has_send_len) { | |
413 | curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); | |
414 | } | |
415 | if (!verify_ssl) { | |
416 | curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
417 | curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
418 | dout(20) << "ssl verification is set to off" << dendl; | |
419 | } | |
420 | ||
421 | CURLcode status = curl_easy_perform(curl_handle); | |
422 | if (status) { | |
423 | dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl; | |
424 | ret = -EINVAL; | |
425 | } | |
426 | curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status); | |
94b18763 | 427 | handles->release_curl_handle(ca); |
7c673cae FG |
428 | curl_slist_free_all(h); |
429 | ||
430 | return ret; | |
431 | } | |
432 | ||
433 | string RGWHTTPClient::to_str() | |
434 | { | |
435 | string method_str = (last_method.empty() ? "<no-method>" : last_method); | |
436 | string url_str = (last_url.empty() ? "<no-url>" : last_url); | |
437 | return method_str + " " + url_str; | |
438 | } | |
439 | ||
440 | int RGWHTTPClient::get_req_retcode() | |
441 | { | |
442 | if (!req_data) { | |
443 | return -EINVAL; | |
444 | } | |
445 | ||
446 | return req_data->get_retcode(); | |
447 | } | |
448 | ||
449 | /* | |
450 | * init request, will be used later with RGWHTTPManager | |
451 | */ | |
31f18b77 | 452 | int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint) |
7c673cae FG |
453 | { |
454 | assert(!req_data); | |
455 | _req_data->get(); | |
456 | req_data = _req_data; | |
457 | ||
458 | CURL *easy_handle; | |
459 | ||
460 | easy_handle = curl_easy_init(); | |
461 | ||
462 | req_data->easy_handle = easy_handle; | |
463 | ||
464 | dout(20) << "sending request to " << url << dendl; | |
465 | ||
466 | curl_slist *h = headers_to_slist(headers); | |
467 | ||
468 | req_data->h = h; | |
469 | ||
470 | last_method = (method ? method : ""); | |
471 | last_url = (url ? url : ""); | |
472 | ||
473 | curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method); | |
474 | curl_easy_setopt(easy_handle, CURLOPT_URL, url); | |
475 | curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); | |
476 | curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); | |
477 | curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); | |
478 | curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); | |
479 | curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); | |
480 | curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); | |
481 | curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); | |
482 | if (h) { | |
483 | curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); | |
484 | } | |
485 | curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); | |
486 | curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); | |
31f18b77 | 487 | if (send_data_hint || is_upload_request(method)) { |
7c673cae FG |
488 | curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); |
489 | } | |
490 | if (has_send_len) { | |
491 | curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); | |
492 | } | |
31f18b77 FG |
493 | if (!verify_ssl) { |
494 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
495 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
496 | dout(20) << "ssl verification is set to off" << dendl; | |
497 | } | |
7c673cae FG |
498 | curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); |
499 | ||
500 | return 0; | |
501 | } | |
502 | ||
503 | /* | |
504 | * wait for async request to complete | |
505 | */ | |
506 | int RGWHTTPClient::wait() | |
507 | { | |
508 | if (!req_data->is_done()) { | |
509 | return req_data->wait(); | |
510 | } | |
511 | ||
512 | return req_data->ret; | |
513 | } | |
514 | ||
515 | RGWHTTPClient::~RGWHTTPClient() | |
516 | { | |
517 | if (req_data) { | |
518 | RGWHTTPManager *http_manager = req_data->get_manager(); | |
519 | if (http_manager) { | |
520 | http_manager->remove_request(this); | |
521 | } | |
522 | ||
523 | req_data->put(); | |
524 | } | |
525 | } | |
526 | ||
527 | ||
528 | int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len) | |
529 | { | |
530 | const boost::string_ref header_line(static_cast<const char * const>(ptr), len); | |
531 | ||
532 | /* We're tokening the line that way due to backward compatibility. */ | |
533 | const size_t sep_loc = header_line.find_first_of(" \t:"); | |
534 | ||
535 | if (boost::string_ref::npos == sep_loc) { | |
536 | /* Wrongly formatted header? Just skip it. */ | |
537 | return 0; | |
538 | } | |
539 | ||
540 | header_name_t name(header_line.substr(0, sep_loc)); | |
541 | if (0 == relevant_headers.count(name)) { | |
542 | /* Not interested in this particular header. */ | |
543 | return 0; | |
544 | } | |
545 | ||
546 | const auto value_part = header_line.substr(sep_loc + 1); | |
547 | ||
548 | /* Skip spaces and tabs after the separator. */ | |
549 | const size_t val_loc_s = value_part.find_first_not_of(' '); | |
550 | const size_t val_loc_e = value_part.find_first_of("\r\n"); | |
551 | ||
552 | if (boost::string_ref::npos == val_loc_s || | |
553 | boost::string_ref::npos == val_loc_e) { | |
554 | /* Empty value case. */ | |
555 | found_headers.emplace(name, header_value_t()); | |
556 | } else { | |
557 | found_headers.emplace(name, header_value_t( | |
558 | value_part.substr(val_loc_s, val_loc_e - val_loc_s))); | |
559 | } | |
560 | ||
561 | return 0; | |
562 | } | |
563 | ||
564 | int RGWHTTPTransceiver::send_data(void* ptr, size_t len) | |
565 | { | |
566 | int length_to_copy = 0; | |
567 | if (post_data_index < post_data.length()) { | |
568 | length_to_copy = min(post_data.length() - post_data_index, len); | |
569 | memcpy(ptr, post_data.data() + post_data_index, length_to_copy); | |
570 | post_data_index += length_to_copy; | |
571 | } | |
572 | return length_to_copy; | |
573 | } | |
574 | ||
575 | ||
576 | static int clear_signal(int fd) | |
577 | { | |
578 | // since we're in non-blocking mode, we can try to read a lot more than | |
579 | // one signal from signal_thread() to avoid later wakeups. non-blocking reads | |
580 | // are also required to support the curl_multi_wait bug workaround | |
581 | std::array<char, 256> buf; | |
582 | int ret = ::read(fd, (void *)buf.data(), buf.size()); | |
583 | if (ret < 0) { | |
584 | ret = -errno; | |
585 | return ret == -EAGAIN ? 0 : ret; // clear EAGAIN | |
586 | } | |
587 | return 0; | |
588 | } | |
589 | ||
590 | #if HAVE_CURL_MULTI_WAIT | |
591 | ||
592 | static std::once_flag detect_flag; | |
593 | static bool curl_multi_wait_bug_present = false; | |
594 | ||
595 | static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle, | |
596 | int write_fd, int read_fd) | |
597 | { | |
598 | int ret = 0; | |
599 | ||
600 | // write to write_fd so that read_fd becomes readable | |
601 | uint32_t buf = 0; | |
602 | ret = ::write(write_fd, &buf, sizeof(buf)); | |
603 | if (ret < 0) { | |
604 | ret = -errno; | |
605 | ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl; | |
606 | return ret; | |
607 | } | |
608 | ||
609 | // pass read_fd in extra_fds for curl_multi_wait() | |
610 | int num_fds; | |
611 | struct curl_waitfd wait_fd; | |
612 | ||
613 | wait_fd.fd = read_fd; | |
614 | wait_fd.events = CURL_WAIT_POLLIN; | |
615 | wait_fd.revents = 0; | |
616 | ||
617 | ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds); | |
618 | if (ret != CURLM_OK) { | |
619 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
620 | return -EIO; | |
621 | } | |
622 | ||
623 | // curl_multi_wait should flag revents when extra_fd is readable. if it | |
624 | // doesn't, the bug is present and we can't rely on revents | |
625 | if (wait_fd.revents == 0) { | |
626 | curl_multi_wait_bug_present = true; | |
627 | ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a " | |
628 | "bug in curl_multi_wait(). enabling a workaround that may degrade " | |
629 | "performance slightly." << dendl; | |
630 | } | |
631 | ||
632 | return clear_signal(read_fd); | |
633 | } | |
634 | ||
635 | static bool is_signaled(const curl_waitfd& wait_fd) | |
636 | { | |
637 | if (wait_fd.fd < 0) { | |
638 | // no fd to signal | |
639 | return false; | |
640 | } | |
641 | ||
642 | if (curl_multi_wait_bug_present) { | |
643 | // we can't rely on revents, so we always return true if a wait_fd is given. | |
644 | // this means we'll be trying a non-blocking read on this fd every time that | |
645 | // curl_multi_wait() wakes up | |
646 | return true; | |
647 | } | |
648 | ||
649 | return wait_fd.revents > 0; | |
650 | } | |
651 | ||
652 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
653 | { | |
654 | int num_fds; | |
655 | struct curl_waitfd wait_fd; | |
656 | ||
657 | wait_fd.fd = signal_fd; | |
658 | wait_fd.events = CURL_WAIT_POLLIN; | |
659 | wait_fd.revents = 0; | |
660 | ||
661 | int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); | |
662 | if (ret) { | |
663 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
664 | return -EIO; | |
665 | } | |
666 | ||
667 | if (is_signaled(wait_fd)) { | |
668 | ret = clear_signal(signal_fd); | |
669 | if (ret < 0) { | |
670 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
671 | return ret; | |
672 | } | |
673 | } | |
674 | return 0; | |
675 | } | |
676 | ||
677 | #else | |
678 | ||
679 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
680 | { | |
681 | fd_set fdread; | |
682 | fd_set fdwrite; | |
683 | fd_set fdexcep; | |
684 | int maxfd = -1; | |
685 | ||
686 | FD_ZERO(&fdread); | |
687 | FD_ZERO(&fdwrite); | |
688 | FD_ZERO(&fdexcep); | |
689 | ||
690 | /* get file descriptors from the transfers */ | |
691 | int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd); | |
692 | if (ret) { | |
693 | ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl; | |
694 | return -EIO; | |
695 | } | |
696 | ||
697 | if (signal_fd > 0) { | |
698 | FD_SET(signal_fd, &fdread); | |
699 | if (signal_fd >= maxfd) { | |
700 | maxfd = signal_fd + 1; | |
701 | } | |
702 | } | |
703 | ||
704 | /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */ | |
705 | uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms; | |
706 | #define RGW_CURL_TIMEOUT 1000 | |
707 | if (!to) | |
708 | to = RGW_CURL_TIMEOUT; | |
709 | struct timeval timeout; | |
710 | timeout.tv_sec = to / 1000; | |
711 | timeout.tv_usec = to % 1000; | |
712 | ||
713 | ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); | |
714 | if (ret < 0) { | |
715 | ret = -errno; | |
716 | ldout(cct, 0) << "ERROR: select returned " << ret << dendl; | |
717 | return ret; | |
718 | } | |
719 | ||
720 | if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) { | |
721 | ret = clear_signal(signal_fd); | |
722 | if (ret < 0) { | |
723 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
724 | return ret; | |
725 | } | |
726 | } | |
727 | ||
728 | return 0; | |
729 | } | |
730 | ||
731 | #endif | |
732 | ||
733 | void *RGWHTTPManager::ReqsThread::entry() | |
734 | { | |
735 | manager->reqs_thread_entry(); | |
736 | return NULL; | |
737 | } | |
738 | ||
739 | /* | |
740 | * RGWHTTPManager has two modes of operation: threaded and non-threaded. | |
741 | */ | |
742 | RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), | |
743 | completion_mgr(_cm), is_threaded(false), | |
744 | reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), | |
745 | reqs_thread(NULL) | |
746 | { | |
747 | multi_handle = (void *)curl_multi_init(); | |
748 | thread_pipe[0] = -1; | |
749 | thread_pipe[1] = -1; | |
750 | } | |
751 | ||
752 | RGWHTTPManager::~RGWHTTPManager() { | |
753 | stop(); | |
754 | if (multi_handle) | |
755 | curl_multi_cleanup((CURLM *)multi_handle); | |
756 | } | |
757 | ||
758 | void RGWHTTPManager::register_request(rgw_http_req_data *req_data) | |
759 | { | |
760 | RWLock::WLocker rl(reqs_lock); | |
761 | req_data->id = num_reqs; | |
762 | req_data->registered = true; | |
763 | reqs[num_reqs] = req_data; | |
764 | num_reqs++; | |
765 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
766 | } | |
767 | ||
768 | void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) | |
769 | { | |
770 | RWLock::WLocker rl(reqs_lock); | |
771 | req_data->get(); | |
772 | req_data->registered = false; | |
773 | unregistered_reqs.push_back(req_data); | |
774 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
775 | } | |
776 | ||
777 | void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) | |
778 | { | |
779 | RWLock::WLocker rl(reqs_lock); | |
780 | _complete_request(req_data); | |
781 | } | |
782 | ||
783 | void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) | |
784 | { | |
785 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id); | |
786 | if (iter != reqs.end()) { | |
787 | reqs.erase(iter); | |
788 | } | |
789 | { | |
790 | Mutex::Locker l(req_data->lock); | |
791 | req_data->mgr = nullptr; | |
792 | } | |
793 | if (completion_mgr) { | |
794 | completion_mgr->complete(NULL, req_data->user_info); | |
795 | } | |
796 | ||
797 | req_data->put(); | |
798 | } | |
799 | ||
800 | void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret) | |
801 | { | |
802 | req_data->finish(ret); | |
803 | complete_request(req_data); | |
804 | } | |
805 | ||
806 | void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) | |
807 | { | |
808 | req_data->finish(ret); | |
809 | _complete_request(req_data); | |
810 | } | |
811 | ||
812 | /* | |
813 | * hook request to the curl multi handle | |
814 | */ | |
815 | int RGWHTTPManager::link_request(rgw_http_req_data *req_data) | |
816 | { | |
817 | ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
818 | CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); | |
819 | if (mstatus) { | |
820 | dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; | |
821 | return -EIO; | |
822 | } | |
823 | return 0; | |
824 | } | |
825 | ||
826 | /* | |
827 | * unhook request from the curl multi handle, and finish request if it wasn't finished yet as | |
828 | * there will be no more processing on this request | |
829 | */ | |
830 | void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) | |
831 | { | |
832 | if (req_data->easy_handle) { | |
833 | curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle); | |
834 | } | |
835 | if (!req_data->is_done()) { | |
836 | _finish_request(req_data, -ECANCELED); | |
837 | } | |
838 | } | |
839 | ||
840 | void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) | |
841 | { | |
842 | RWLock::WLocker wl(reqs_lock); | |
843 | _unlink_request(req_data); | |
844 | } | |
845 | ||
846 | void RGWHTTPManager::manage_pending_requests() | |
847 | { | |
848 | reqs_lock.get_read(); | |
849 | if (max_threaded_req == num_reqs && unregistered_reqs.empty()) { | |
850 | reqs_lock.unlock(); | |
851 | return; | |
852 | } | |
853 | reqs_lock.unlock(); | |
854 | ||
855 | RWLock::WLocker wl(reqs_lock); | |
856 | ||
857 | if (!unregistered_reqs.empty()) { | |
858 | for (auto& r : unregistered_reqs) { | |
859 | _unlink_request(r); | |
860 | r->put(); | |
861 | } | |
862 | ||
863 | unregistered_reqs.clear(); | |
864 | } | |
865 | ||
866 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req); | |
867 | ||
868 | list<std::pair<rgw_http_req_data *, int> > remove_reqs; | |
869 | ||
870 | for (; iter != reqs.end(); ++iter) { | |
871 | rgw_http_req_data *req_data = iter->second; | |
872 | int r = link_request(req_data); | |
873 | if (r < 0) { | |
874 | ldout(cct, 0) << "ERROR: failed to link http request" << dendl; | |
875 | remove_reqs.push_back(std::make_pair(iter->second, r)); | |
876 | } else { | |
877 | max_threaded_req = iter->first + 1; | |
878 | } | |
879 | } | |
880 | ||
881 | for (auto piter : remove_reqs) { | |
882 | rgw_http_req_data *req_data = piter.first; | |
883 | int r = piter.second; | |
884 | ||
885 | _finish_request(req_data, r); | |
886 | } | |
887 | } | |
888 | ||
31f18b77 | 889 | int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint) |
7c673cae FG |
890 | { |
891 | rgw_http_req_data *req_data = new rgw_http_req_data; | |
892 | ||
31f18b77 | 893 | int ret = client->init_request(method, url, req_data, send_data_hint); |
7c673cae FG |
894 | if (ret < 0) { |
895 | req_data->put(); | |
896 | req_data = NULL; | |
897 | return ret; | |
898 | } | |
899 | ||
900 | req_data->mgr = this; | |
901 | req_data->client = client; | |
902 | req_data->user_info = client->get_user_info(); | |
903 | ||
904 | register_request(req_data); | |
905 | ||
906 | if (!is_threaded) { | |
907 | ret = link_request(req_data); | |
908 | if (ret < 0) { | |
909 | req_data->put(); | |
910 | req_data = NULL; | |
911 | } | |
912 | return ret; | |
913 | } | |
914 | ret = signal_thread(); | |
915 | if (ret < 0) { | |
916 | finish_request(req_data, ret); | |
917 | } | |
918 | ||
919 | return ret; | |
920 | } | |
921 | ||
922 | int RGWHTTPManager::remove_request(RGWHTTPClient *client) | |
923 | { | |
924 | rgw_http_req_data *req_data = client->get_req_data(); | |
925 | ||
926 | if (!is_threaded) { | |
927 | unlink_request(req_data); | |
928 | return 0; | |
929 | } | |
930 | unregister_request(req_data); | |
931 | int ret = signal_thread(); | |
932 | if (ret < 0) { | |
933 | return ret; | |
934 | } | |
935 | ||
936 | return 0; | |
937 | } | |
938 | ||
939 | /* | |
940 | * the synchronous, non-threaded request processing method. | |
941 | */ | |
942 | int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) | |
943 | { | |
944 | assert(!is_threaded); | |
945 | ||
946 | int still_running; | |
947 | int mstatus; | |
948 | ||
949 | do { | |
950 | if (wait_for_data) { | |
951 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1); | |
952 | if (ret < 0) { | |
953 | return ret; | |
954 | } | |
955 | } | |
956 | ||
957 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
958 | switch (mstatus) { | |
959 | case CURLM_OK: | |
960 | case CURLM_CALL_MULTI_PERFORM: | |
961 | break; | |
962 | default: | |
963 | dout(20) << "curl_multi_perform returned: " << mstatus << dendl; | |
964 | return -EINVAL; | |
965 | } | |
966 | int msgs_left; | |
967 | CURLMsg *msg; | |
968 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
969 | if (msg->msg == CURLMSG_DONE) { | |
970 | CURL *e = msg->easy_handle; | |
971 | rgw_http_req_data *req_data; | |
972 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
973 | ||
974 | long http_status; | |
975 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
976 | ||
977 | int status = rgw_http_error_to_errno(http_status); | |
978 | int result = msg->data.result; | |
979 | finish_request(req_data, status); | |
980 | switch (result) { | |
981 | case CURLE_OK: | |
982 | break; | |
983 | default: | |
984 | dout(20) << "ERROR: msg->data.result=" << result << dendl; | |
985 | return -EIO; | |
986 | } | |
987 | } | |
988 | } | |
989 | } while (mstatus == CURLM_CALL_MULTI_PERFORM); | |
990 | ||
991 | *done = (still_running == 0); | |
992 | ||
993 | return 0; | |
994 | } | |
995 | ||
996 | /* | |
997 | * the synchronous, non-threaded request processing completion method. | |
998 | */ | |
999 | int RGWHTTPManager::complete_requests() | |
1000 | { | |
1001 | bool done = false; | |
1002 | int ret; | |
1003 | do { | |
1004 | ret = process_requests(true, &done); | |
1005 | } while (!done && !ret); | |
1006 | ||
1007 | return ret; | |
1008 | } | |
1009 | ||
1010 | int RGWHTTPManager::set_threaded() | |
1011 | { | |
1012 | int r = pipe(thread_pipe); | |
1013 | if (r < 0) { | |
1014 | r = -errno; | |
1015 | ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl; | |
1016 | return r; | |
1017 | } | |
1018 | ||
1019 | // enable non-blocking reads | |
1020 | r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); | |
1021 | if (r < 0) { | |
1022 | r = -errno; | |
1023 | ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl; | |
1024 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1025 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1026 | return r; | |
1027 | } | |
1028 | ||
1029 | #ifdef HAVE_CURL_MULTI_WAIT | |
1030 | // on first initialization, use this pipe to detect whether we're using a | |
1031 | // buggy version of libcurl | |
1032 | std::call_once(detect_flag, detect_curl_multi_wait_bug, cct, | |
1033 | static_cast<CURLM*>(multi_handle), | |
1034 | thread_pipe[1], thread_pipe[0]); | |
1035 | #endif | |
1036 | ||
1037 | is_threaded = true; | |
1038 | reqs_thread = new ReqsThread(this); | |
1039 | reqs_thread->create("http_manager"); | |
1040 | return 0; | |
1041 | } | |
1042 | ||
1043 | void RGWHTTPManager::stop() | |
1044 | { | |
1045 | if (is_stopped) { | |
1046 | return; | |
1047 | } | |
1048 | ||
1049 | is_stopped = true; | |
1050 | ||
1051 | if (is_threaded) { | |
1052 | going_down = true; | |
1053 | signal_thread(); | |
1054 | reqs_thread->join(); | |
1055 | delete reqs_thread; | |
1056 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1057 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1058 | } | |
1059 | } | |
1060 | ||
1061 | int RGWHTTPManager::signal_thread() | |
1062 | { | |
1063 | uint32_t buf = 0; | |
1064 | int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); | |
1065 | if (ret < 0) { | |
1066 | ret = -errno; | |
1067 | ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; | |
1068 | return ret; | |
1069 | } | |
1070 | return 0; | |
1071 | } | |
1072 | ||
1073 | void *RGWHTTPManager::reqs_thread_entry() | |
1074 | { | |
1075 | int still_running; | |
1076 | int mstatus; | |
1077 | ||
1078 | ldout(cct, 20) << __func__ << ": start" << dendl; | |
1079 | ||
1080 | while (!going_down) { | |
1081 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); | |
1082 | if (ret < 0) { | |
1083 | dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; | |
1084 | return NULL; | |
1085 | } | |
1086 | ||
1087 | manage_pending_requests(); | |
1088 | ||
1089 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
1090 | switch (mstatus) { | |
1091 | case CURLM_OK: | |
1092 | case CURLM_CALL_MULTI_PERFORM: | |
1093 | break; | |
1094 | default: | |
1095 | dout(10) << "curl_multi_perform returned: " << mstatus << dendl; | |
1096 | break; | |
1097 | } | |
1098 | int msgs_left; | |
1099 | CURLMsg *msg; | |
1100 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
1101 | if (msg->msg == CURLMSG_DONE) { | |
1102 | int result = msg->data.result; | |
1103 | CURL *e = msg->easy_handle; | |
1104 | rgw_http_req_data *req_data; | |
1105 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
1106 | curl_multi_remove_handle((CURLM *)multi_handle, e); | |
1107 | ||
1108 | long http_status; | |
1109 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
1110 | ||
1111 | int status = rgw_http_error_to_errno(http_status); | |
1112 | if (result != CURLE_OK && http_status == 0) { | |
1113 | status = -EAGAIN; | |
1114 | } | |
1115 | int id = req_data->id; | |
1116 | finish_request(req_data, status); | |
1117 | switch (result) { | |
1118 | case CURLE_OK: | |
1119 | break; | |
1120 | default: | |
1121 | dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; | |
1122 | break; | |
1123 | } | |
1124 | } | |
1125 | } | |
1126 | } | |
1127 | ||
1128 | ||
1129 | RWLock::WLocker rl(reqs_lock); | |
1130 | for (auto r : unregistered_reqs) { | |
1131 | _finish_request(r, -ECANCELED); | |
1132 | } | |
1133 | ||
1134 | unregistered_reqs.clear(); | |
1135 | ||
1136 | auto all_reqs = std::move(reqs); | |
1137 | for (auto iter : all_reqs) { | |
1138 | _finish_request(iter.second, -ECANCELED); | |
1139 | } | |
1140 | ||
1141 | reqs.clear(); | |
1142 | ||
1143 | if (completion_mgr) { | |
1144 | completion_mgr->go_down(); | |
1145 | } | |
1146 | ||
1147 | return 0; | |
1148 | } | |
1149 | ||
1150 |