]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/compat.h" | |
5 | ||
6 | #include <boost/utility/string_ref.hpp> | |
7 | ||
8 | #include <curl/curl.h> | |
9 | #include <curl/easy.h> | |
10 | #include <curl/multi.h> | |
11 | ||
12 | #include "rgw_common.h" | |
13 | #include "rgw_http_client.h" | |
14 | #include "rgw_http_errors.h" | |
15 | #include "common/RefCountedObj.h" | |
16 | ||
17 | #include "rgw_coroutine.h" | |
18 | ||
19 | #include <atomic> | |
20 | ||
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rgw | |
23 | ||
24 | struct rgw_http_req_data : public RefCountedObject { | |
25 | CURL *easy_handle; | |
26 | curl_slist *h; | |
27 | uint64_t id; | |
28 | int ret; | |
29 | std::atomic<bool> done = { false }; | |
30 | RGWHTTPClient *client; | |
31 | void *user_info; | |
32 | bool registered; | |
33 | RGWHTTPManager *mgr; | |
34 | char error_buf[CURL_ERROR_SIZE]; | |
35 | ||
36 | Mutex lock; | |
37 | Cond cond; | |
38 | ||
39 | rgw_http_req_data() : easy_handle(NULL), h(NULL), id(-1), ret(0), | |
40 | client(nullptr), user_info(nullptr), registered(false), | |
41 | mgr(NULL), lock("rgw_http_req_data::lock") { | |
42 | memset(error_buf, 0, sizeof(error_buf)); | |
43 | } | |
44 | ||
45 | int wait() { | |
46 | Mutex::Locker l(lock); | |
47 | cond.Wait(lock); | |
48 | return ret; | |
49 | } | |
50 | ||
51 | ||
52 | void finish(int r) { | |
53 | Mutex::Locker l(lock); | |
54 | ret = r; | |
55 | if (easy_handle) | |
56 | curl_easy_cleanup(easy_handle); | |
57 | ||
58 | if (h) | |
59 | curl_slist_free_all(h); | |
60 | ||
61 | easy_handle = NULL; | |
62 | h = NULL; | |
63 | done = true; | |
64 | cond.Signal(); | |
65 | } | |
66 | ||
67 | bool is_done() { | |
68 | return done; | |
69 | } | |
70 | ||
71 | int get_retcode() { | |
72 | Mutex::Locker l(lock); | |
73 | return ret; | |
74 | } | |
75 | ||
76 | RGWHTTPManager *get_manager() { | |
77 | Mutex::Locker l(lock); | |
78 | return mgr; | |
79 | } | |
80 | }; | |
81 | ||
94b18763 FG |
82 | struct RGWCurlHandle { |
83 | int uses; | |
84 | mono_time lastuse; | |
85 | CURL* h; | |
86 | ||
87 | RGWCurlHandle(CURL* h) : uses(0), h(h) {}; | |
88 | CURL* operator*() { | |
89 | return this->h; | |
90 | } | |
91 | }; | |
92 | ||
93 | #define MAXIDLE 5 | |
94 | class RGWCurlHandles : public Thread { | |
95 | public: | |
96 | Mutex cleaner_lock; | |
97 | std::vector<RGWCurlHandle*>saved_curl; | |
98 | int cleaner_shutdown; | |
99 | Cond cleaner_cond; | |
100 | ||
101 | RGWCurlHandles() : | |
102 | cleaner_lock{"RGWCurlHandles::cleaner_lock"}, | |
103 | cleaner_shutdown{0} { | |
104 | } | |
105 | ||
106 | RGWCurlHandle* get_curl_handle(); | |
107 | void release_curl_handle_now(RGWCurlHandle* curl); | |
108 | void release_curl_handle(RGWCurlHandle* curl); | |
109 | void flush_curl_handles(); | |
110 | void* entry(); | |
111 | void stop(); | |
112 | }; | |
113 | ||
114 | RGWCurlHandle* RGWCurlHandles::get_curl_handle() { | |
115 | RGWCurlHandle* curl = 0; | |
116 | CURL* h; | |
117 | { | |
118 | Mutex::Locker lock(cleaner_lock); | |
119 | if (!saved_curl.empty()) { | |
120 | curl = *saved_curl.begin(); | |
121 | saved_curl.erase(saved_curl.begin()); | |
122 | } | |
123 | } | |
124 | if (curl) { | |
125 | } else if ((h = curl_easy_init())) { | |
126 | curl = new RGWCurlHandle{h}; | |
127 | } else { | |
128 | // curl = 0; | |
129 | } | |
130 | return curl; | |
131 | } | |
132 | ||
133 | void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl) | |
134 | { | |
135 | curl_easy_cleanup(**curl); | |
136 | delete curl; | |
137 | } | |
138 | ||
139 | void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) | |
140 | { | |
141 | if (cleaner_shutdown) { | |
142 | release_curl_handle_now(curl); | |
143 | } else { | |
144 | curl_easy_reset(**curl); | |
145 | Mutex::Locker lock(cleaner_lock); | |
146 | curl->lastuse = mono_clock::now(); | |
147 | saved_curl.insert(saved_curl.begin(), 1, curl); | |
148 | } | |
149 | } | |
150 | ||
151 | void* RGWCurlHandles::entry() | |
152 | { | |
153 | RGWCurlHandle* curl; | |
154 | Mutex::Locker lock(cleaner_lock); | |
155 | ||
156 | for (;;) { | |
157 | if (cleaner_shutdown) { | |
158 | if (saved_curl.empty()) | |
159 | break; | |
160 | } else { | |
161 | utime_t release = ceph_clock_now() + utime_t(MAXIDLE,0); | |
162 | cleaner_cond.WaitUntil(cleaner_lock, release); | |
163 | } | |
164 | mono_time now = mono_clock::now(); | |
165 | while (!saved_curl.empty()) { | |
166 | auto cend = saved_curl.end(); | |
167 | --cend; | |
168 | curl = *cend; | |
169 | if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE)) | |
170 | break; | |
171 | saved_curl.erase(cend); | |
172 | release_curl_handle_now(curl); | |
173 | } | |
174 | } | |
175 | return nullptr; | |
176 | } | |
177 | ||
178 | void RGWCurlHandles::stop() | |
179 | { | |
180 | Mutex::Locker lock(cleaner_lock); | |
181 | cleaner_shutdown = 1; | |
182 | cleaner_cond.Signal(); | |
183 | } | |
184 | ||
185 | void RGWCurlHandles::flush_curl_handles() | |
186 | { | |
187 | stop(); | |
188 | join(); | |
189 | if (!saved_curl.empty()) { | |
190 | dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl; | |
191 | } | |
192 | saved_curl.shrink_to_fit(); | |
193 | } | |
194 | ||
195 | static RGWCurlHandles *handles; | |
196 | // XXX make this part of the token cache? (but that's swift-only; | |
197 | // and this especially needs to integrates with s3...) | |
198 | ||
199 | void rgw_setup_saved_curl_handles() | |
200 | { | |
201 | handles = new RGWCurlHandles(); | |
202 | handles->create("rgw_curl"); | |
203 | } | |
204 | ||
205 | void rgw_release_all_curl_handles() | |
206 | { | |
207 | handles->flush_curl_handles(); | |
208 | delete handles; | |
209 | } | |
210 | ||
7c673cae FG |
211 | /* |
212 | * the simple set of callbacks will be called on RGWHTTPClient::process() | |
213 | */ | |
214 | /* Static methods - callbacks for libcurl. */ | |
215 | size_t RGWHTTPClient::simple_receive_http_header(void * const ptr, | |
216 | const size_t size, | |
217 | const size_t nmemb, | |
218 | void * const _info) | |
219 | { | |
220 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
221 | const size_t len = size * nmemb; | |
222 | int ret = client->receive_header(ptr, size * nmemb); | |
223 | if (ret < 0) { | |
224 | dout(0) << "WARNING: client->receive_header() returned ret=" | |
225 | << ret << dendl; | |
226 | } | |
227 | ||
228 | return len; | |
229 | } | |
230 | ||
231 | size_t RGWHTTPClient::simple_receive_http_data(void * const ptr, | |
232 | const size_t size, | |
233 | const size_t nmemb, | |
234 | void * const _info) | |
235 | { | |
236 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
237 | const size_t len = size * nmemb; | |
238 | int ret = client->receive_data(ptr, size * nmemb); | |
239 | if (ret < 0) { | |
240 | dout(0) << "WARNING: client->receive_data() returned ret=" | |
241 | << ret << dendl; | |
242 | } | |
243 | ||
244 | return len; | |
245 | } | |
246 | ||
247 | size_t RGWHTTPClient::simple_send_http_data(void * const ptr, | |
248 | const size_t size, | |
249 | const size_t nmemb, | |
250 | void * const _info) | |
251 | { | |
252 | RGWHTTPClient *client = static_cast<RGWHTTPClient *>(_info); | |
253 | int ret = client->send_data(ptr, size * nmemb); | |
254 | if (ret < 0) { | |
255 | dout(0) << "WARNING: client->send_data() returned ret=" | |
256 | << ret << dendl; | |
257 | } | |
258 | ||
259 | return ret; | |
260 | } | |
261 | ||
262 | /* | |
263 | * the following set of callbacks will be called either on RGWHTTPManager::process(), | |
264 | * or via the RGWHTTPManager async processing. | |
265 | */ | |
266 | size_t RGWHTTPClient::receive_http_header(void * const ptr, | |
267 | const size_t size, | |
268 | const size_t nmemb, | |
269 | void * const _info) | |
270 | { | |
271 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
272 | size_t len = size * nmemb; | |
273 | ||
274 | Mutex::Locker l(req_data->lock); | |
275 | ||
276 | if (!req_data->registered) { | |
277 | return len; | |
278 | } | |
279 | ||
280 | int ret = req_data->client->receive_header(ptr, size * nmemb); | |
281 | if (ret < 0) { | |
282 | dout(0) << "WARNING: client->receive_header() returned ret=" << ret << dendl; | |
283 | } | |
284 | ||
285 | return len; | |
286 | } | |
287 | ||
288 | size_t RGWHTTPClient::receive_http_data(void * const ptr, | |
289 | const size_t size, | |
290 | const size_t nmemb, | |
291 | void * const _info) | |
292 | { | |
293 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
294 | size_t len = size * nmemb; | |
295 | ||
296 | Mutex::Locker l(req_data->lock); | |
297 | ||
298 | if (!req_data->registered) { | |
299 | return len; | |
300 | } | |
301 | ||
302 | int ret = req_data->client->receive_data(ptr, size * nmemb); | |
303 | if (ret < 0) { | |
304 | dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; | |
305 | } | |
306 | ||
307 | return len; | |
308 | } | |
309 | ||
310 | size_t RGWHTTPClient::send_http_data(void * const ptr, | |
311 | const size_t size, | |
312 | const size_t nmemb, | |
313 | void * const _info) | |
314 | { | |
315 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
316 | ||
317 | Mutex::Locker l(req_data->lock); | |
318 | ||
319 | if (!req_data->registered) { | |
320 | return 0; | |
321 | } | |
322 | ||
323 | int ret = req_data->client->send_data(ptr, size * nmemb); | |
324 | if (ret < 0) { | |
325 | dout(0) << "WARNING: client->receive_data() returned ret=" << ret << dendl; | |
326 | } | |
327 | ||
328 | return ret; | |
329 | } | |
330 | ||
331 | static curl_slist *headers_to_slist(param_vec_t& headers) | |
332 | { | |
333 | curl_slist *h = NULL; | |
334 | ||
335 | param_vec_t::iterator iter; | |
336 | for (iter = headers.begin(); iter != headers.end(); ++iter) { | |
337 | pair<string, string>& p = *iter; | |
338 | string val = p.first; | |
339 | ||
340 | if (strncmp(val.c_str(), "HTTP_", 5) == 0) { | |
341 | val = val.substr(5); | |
342 | } | |
343 | ||
344 | /* we need to convert all underscores into dashes as some web servers forbid them | |
345 | * in the http header field names | |
346 | */ | |
347 | for (size_t i = 0; i < val.size(); i++) { | |
348 | if (val[i] == '_') { | |
349 | val[i] = '-'; | |
350 | } | |
351 | } | |
352 | ||
353 | val.append(": "); | |
354 | val.append(p.second); | |
355 | h = curl_slist_append(h, val.c_str()); | |
356 | } | |
357 | ||
358 | return h; | |
359 | } | |
360 | ||
361 | static bool is_upload_request(const char *method) | |
362 | { | |
363 | if (method == nullptr) { | |
364 | return false; | |
365 | } | |
366 | return strcmp(method, "POST") == 0 || strcmp(method, "PUT") == 0; | |
367 | } | |
368 | ||
369 | /* | |
370 | * process a single simple one off request, not going through RGWHTTPManager. Not using | |
371 | * req_data. | |
372 | */ | |
373 | int RGWHTTPClient::process(const char *method, const char *url) | |
374 | { | |
375 | int ret = 0; | |
376 | CURL *curl_handle; | |
377 | ||
378 | char error_buf[CURL_ERROR_SIZE]; | |
379 | ||
380 | last_method = (method ? method : ""); | |
381 | last_url = (url ? url : ""); | |
382 | ||
94b18763 FG |
383 | auto ca = handles->get_curl_handle(); |
384 | curl_handle = **ca; | |
7c673cae FG |
385 | |
386 | dout(20) << "sending request to " << url << dendl; | |
387 | ||
388 | curl_slist *h = headers_to_slist(headers); | |
389 | ||
390 | curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, method); | |
391 | curl_easy_setopt(curl_handle, CURLOPT_URL, url); | |
392 | curl_easy_setopt(curl_handle, CURLOPT_NOPROGRESS, 1L); | |
393 | curl_easy_setopt(curl_handle, CURLOPT_NOSIGNAL, 1L); | |
394 | curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, simple_receive_http_header); | |
395 | curl_easy_setopt(curl_handle, CURLOPT_WRITEHEADER, (void *)this); | |
396 | curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, simple_receive_http_data); | |
397 | curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)this); | |
398 | curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, (void *)error_buf); | |
399 | if (h) { | |
400 | curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, (void *)h); | |
401 | } | |
402 | curl_easy_setopt(curl_handle, CURLOPT_READFUNCTION, simple_send_http_data); | |
403 | curl_easy_setopt(curl_handle, CURLOPT_READDATA, (void *)this); | |
404 | if (is_upload_request(method)) { | |
405 | curl_easy_setopt(curl_handle, CURLOPT_UPLOAD, 1L); | |
406 | } | |
407 | if (has_send_len) { | |
408 | curl_easy_setopt(curl_handle, CURLOPT_INFILESIZE, (void *)send_len); | |
409 | } | |
410 | if (!verify_ssl) { | |
411 | curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
412 | curl_easy_setopt(curl_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
413 | dout(20) << "ssl verification is set to off" << dendl; | |
414 | } | |
415 | ||
416 | CURLcode status = curl_easy_perform(curl_handle); | |
417 | if (status) { | |
418 | dout(0) << "curl_easy_perform returned status " << status << " error: " << error_buf << dendl; | |
419 | ret = -EINVAL; | |
420 | } | |
421 | curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_status); | |
94b18763 | 422 | handles->release_curl_handle(ca); |
7c673cae FG |
423 | curl_slist_free_all(h); |
424 | ||
425 | return ret; | |
426 | } | |
427 | ||
428 | string RGWHTTPClient::to_str() | |
429 | { | |
430 | string method_str = (last_method.empty() ? "<no-method>" : last_method); | |
431 | string url_str = (last_url.empty() ? "<no-url>" : last_url); | |
432 | return method_str + " " + url_str; | |
433 | } | |
434 | ||
435 | int RGWHTTPClient::get_req_retcode() | |
436 | { | |
437 | if (!req_data) { | |
438 | return -EINVAL; | |
439 | } | |
440 | ||
441 | return req_data->get_retcode(); | |
442 | } | |
443 | ||
444 | /* | |
445 | * init request, will be used later with RGWHTTPManager | |
446 | */ | |
31f18b77 | 447 | int RGWHTTPClient::init_request(const char *method, const char *url, rgw_http_req_data *_req_data, bool send_data_hint) |
7c673cae FG |
448 | { |
449 | assert(!req_data); | |
450 | _req_data->get(); | |
451 | req_data = _req_data; | |
452 | ||
453 | CURL *easy_handle; | |
454 | ||
455 | easy_handle = curl_easy_init(); | |
456 | ||
457 | req_data->easy_handle = easy_handle; | |
458 | ||
459 | dout(20) << "sending request to " << url << dendl; | |
460 | ||
461 | curl_slist *h = headers_to_slist(headers); | |
462 | ||
463 | req_data->h = h; | |
464 | ||
465 | last_method = (method ? method : ""); | |
466 | last_url = (url ? url : ""); | |
467 | ||
468 | curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method); | |
469 | curl_easy_setopt(easy_handle, CURLOPT_URL, url); | |
470 | curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); | |
471 | curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); | |
472 | curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); | |
473 | curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); | |
474 | curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); | |
475 | curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); | |
476 | curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); | |
477 | if (h) { | |
478 | curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); | |
479 | } | |
480 | curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); | |
481 | curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); | |
31f18b77 | 482 | if (send_data_hint || is_upload_request(method)) { |
7c673cae FG |
483 | curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); |
484 | } | |
485 | if (has_send_len) { | |
486 | curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); | |
487 | } | |
31f18b77 FG |
488 | if (!verify_ssl) { |
489 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
490 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
491 | dout(20) << "ssl verification is set to off" << dendl; | |
492 | } | |
7c673cae FG |
493 | curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); |
494 | ||
495 | return 0; | |
496 | } | |
497 | ||
498 | /* | |
499 | * wait for async request to complete | |
500 | */ | |
501 | int RGWHTTPClient::wait() | |
502 | { | |
503 | if (!req_data->is_done()) { | |
504 | return req_data->wait(); | |
505 | } | |
506 | ||
507 | return req_data->ret; | |
508 | } | |
509 | ||
510 | RGWHTTPClient::~RGWHTTPClient() | |
511 | { | |
512 | if (req_data) { | |
513 | RGWHTTPManager *http_manager = req_data->get_manager(); | |
514 | if (http_manager) { | |
515 | http_manager->remove_request(this); | |
516 | } | |
517 | ||
518 | req_data->put(); | |
519 | } | |
520 | } | |
521 | ||
522 | ||
523 | int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len) | |
524 | { | |
525 | const boost::string_ref header_line(static_cast<const char * const>(ptr), len); | |
526 | ||
527 | /* We're tokening the line that way due to backward compatibility. */ | |
528 | const size_t sep_loc = header_line.find_first_of(" \t:"); | |
529 | ||
530 | if (boost::string_ref::npos == sep_loc) { | |
531 | /* Wrongly formatted header? Just skip it. */ | |
532 | return 0; | |
533 | } | |
534 | ||
535 | header_name_t name(header_line.substr(0, sep_loc)); | |
536 | if (0 == relevant_headers.count(name)) { | |
537 | /* Not interested in this particular header. */ | |
538 | return 0; | |
539 | } | |
540 | ||
541 | const auto value_part = header_line.substr(sep_loc + 1); | |
542 | ||
543 | /* Skip spaces and tabs after the separator. */ | |
544 | const size_t val_loc_s = value_part.find_first_not_of(' '); | |
545 | const size_t val_loc_e = value_part.find_first_of("\r\n"); | |
546 | ||
547 | if (boost::string_ref::npos == val_loc_s || | |
548 | boost::string_ref::npos == val_loc_e) { | |
549 | /* Empty value case. */ | |
550 | found_headers.emplace(name, header_value_t()); | |
551 | } else { | |
552 | found_headers.emplace(name, header_value_t( | |
553 | value_part.substr(val_loc_s, val_loc_e - val_loc_s))); | |
554 | } | |
555 | ||
556 | return 0; | |
557 | } | |
558 | ||
559 | int RGWHTTPTransceiver::send_data(void* ptr, size_t len) | |
560 | { | |
561 | int length_to_copy = 0; | |
562 | if (post_data_index < post_data.length()) { | |
563 | length_to_copy = min(post_data.length() - post_data_index, len); | |
564 | memcpy(ptr, post_data.data() + post_data_index, length_to_copy); | |
565 | post_data_index += length_to_copy; | |
566 | } | |
567 | return length_to_copy; | |
568 | } | |
569 | ||
570 | ||
571 | static int clear_signal(int fd) | |
572 | { | |
573 | // since we're in non-blocking mode, we can try to read a lot more than | |
574 | // one signal from signal_thread() to avoid later wakeups. non-blocking reads | |
575 | // are also required to support the curl_multi_wait bug workaround | |
576 | std::array<char, 256> buf; | |
577 | int ret = ::read(fd, (void *)buf.data(), buf.size()); | |
578 | if (ret < 0) { | |
579 | ret = -errno; | |
580 | return ret == -EAGAIN ? 0 : ret; // clear EAGAIN | |
581 | } | |
582 | return 0; | |
583 | } | |
584 | ||
585 | #if HAVE_CURL_MULTI_WAIT | |
586 | ||
587 | static std::once_flag detect_flag; | |
588 | static bool curl_multi_wait_bug_present = false; | |
589 | ||
590 | static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle, | |
591 | int write_fd, int read_fd) | |
592 | { | |
593 | int ret = 0; | |
594 | ||
595 | // write to write_fd so that read_fd becomes readable | |
596 | uint32_t buf = 0; | |
597 | ret = ::write(write_fd, &buf, sizeof(buf)); | |
598 | if (ret < 0) { | |
599 | ret = -errno; | |
600 | ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl; | |
601 | return ret; | |
602 | } | |
603 | ||
604 | // pass read_fd in extra_fds for curl_multi_wait() | |
605 | int num_fds; | |
606 | struct curl_waitfd wait_fd; | |
607 | ||
608 | wait_fd.fd = read_fd; | |
609 | wait_fd.events = CURL_WAIT_POLLIN; | |
610 | wait_fd.revents = 0; | |
611 | ||
612 | ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds); | |
613 | if (ret != CURLM_OK) { | |
614 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
615 | return -EIO; | |
616 | } | |
617 | ||
618 | // curl_multi_wait should flag revents when extra_fd is readable. if it | |
619 | // doesn't, the bug is present and we can't rely on revents | |
620 | if (wait_fd.revents == 0) { | |
621 | curl_multi_wait_bug_present = true; | |
622 | ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a " | |
623 | "bug in curl_multi_wait(). enabling a workaround that may degrade " | |
624 | "performance slightly." << dendl; | |
625 | } | |
626 | ||
627 | return clear_signal(read_fd); | |
628 | } | |
629 | ||
630 | static bool is_signaled(const curl_waitfd& wait_fd) | |
631 | { | |
632 | if (wait_fd.fd < 0) { | |
633 | // no fd to signal | |
634 | return false; | |
635 | } | |
636 | ||
637 | if (curl_multi_wait_bug_present) { | |
638 | // we can't rely on revents, so we always return true if a wait_fd is given. | |
639 | // this means we'll be trying a non-blocking read on this fd every time that | |
640 | // curl_multi_wait() wakes up | |
641 | return true; | |
642 | } | |
643 | ||
644 | return wait_fd.revents > 0; | |
645 | } | |
646 | ||
647 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
648 | { | |
649 | int num_fds; | |
650 | struct curl_waitfd wait_fd; | |
651 | ||
652 | wait_fd.fd = signal_fd; | |
653 | wait_fd.events = CURL_WAIT_POLLIN; | |
654 | wait_fd.revents = 0; | |
655 | ||
656 | int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); | |
657 | if (ret) { | |
658 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
659 | return -EIO; | |
660 | } | |
661 | ||
662 | if (is_signaled(wait_fd)) { | |
663 | ret = clear_signal(signal_fd); | |
664 | if (ret < 0) { | |
665 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
666 | return ret; | |
667 | } | |
668 | } | |
669 | return 0; | |
670 | } | |
671 | ||
672 | #else | |
673 | ||
674 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
675 | { | |
676 | fd_set fdread; | |
677 | fd_set fdwrite; | |
678 | fd_set fdexcep; | |
679 | int maxfd = -1; | |
680 | ||
681 | FD_ZERO(&fdread); | |
682 | FD_ZERO(&fdwrite); | |
683 | FD_ZERO(&fdexcep); | |
684 | ||
685 | /* get file descriptors from the transfers */ | |
686 | int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd); | |
687 | if (ret) { | |
688 | ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl; | |
689 | return -EIO; | |
690 | } | |
691 | ||
692 | if (signal_fd > 0) { | |
693 | FD_SET(signal_fd, &fdread); | |
694 | if (signal_fd >= maxfd) { | |
695 | maxfd = signal_fd + 1; | |
696 | } | |
697 | } | |
698 | ||
699 | /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */ | |
700 | uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms; | |
701 | #define RGW_CURL_TIMEOUT 1000 | |
702 | if (!to) | |
703 | to = RGW_CURL_TIMEOUT; | |
704 | struct timeval timeout; | |
705 | timeout.tv_sec = to / 1000; | |
706 | timeout.tv_usec = to % 1000; | |
707 | ||
708 | ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); | |
709 | if (ret < 0) { | |
710 | ret = -errno; | |
711 | ldout(cct, 0) << "ERROR: select returned " << ret << dendl; | |
712 | return ret; | |
713 | } | |
714 | ||
715 | if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) { | |
716 | ret = clear_signal(signal_fd); | |
717 | if (ret < 0) { | |
718 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
719 | return ret; | |
720 | } | |
721 | } | |
722 | ||
723 | return 0; | |
724 | } | |
725 | ||
726 | #endif | |
727 | ||
728 | void *RGWHTTPManager::ReqsThread::entry() | |
729 | { | |
730 | manager->reqs_thread_entry(); | |
731 | return NULL; | |
732 | } | |
733 | ||
734 | /* | |
735 | * RGWHTTPManager has two modes of operation: threaded and non-threaded. | |
736 | */ | |
737 | RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), | |
738 | completion_mgr(_cm), is_threaded(false), | |
739 | reqs_lock("RGWHTTPManager::reqs_lock"), num_reqs(0), max_threaded_req(0), | |
740 | reqs_thread(NULL) | |
741 | { | |
742 | multi_handle = (void *)curl_multi_init(); | |
743 | thread_pipe[0] = -1; | |
744 | thread_pipe[1] = -1; | |
745 | } | |
746 | ||
747 | RGWHTTPManager::~RGWHTTPManager() { | |
748 | stop(); | |
749 | if (multi_handle) | |
750 | curl_multi_cleanup((CURLM *)multi_handle); | |
751 | } | |
752 | ||
753 | void RGWHTTPManager::register_request(rgw_http_req_data *req_data) | |
754 | { | |
755 | RWLock::WLocker rl(reqs_lock); | |
756 | req_data->id = num_reqs; | |
757 | req_data->registered = true; | |
758 | reqs[num_reqs] = req_data; | |
759 | num_reqs++; | |
760 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
761 | } | |
762 | ||
763 | void RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) | |
764 | { | |
765 | RWLock::WLocker rl(reqs_lock); | |
766 | req_data->get(); | |
767 | req_data->registered = false; | |
768 | unregistered_reqs.push_back(req_data); | |
769 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
770 | } | |
771 | ||
772 | void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) | |
773 | { | |
774 | RWLock::WLocker rl(reqs_lock); | |
775 | _complete_request(req_data); | |
776 | } | |
777 | ||
778 | void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) | |
779 | { | |
780 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id); | |
781 | if (iter != reqs.end()) { | |
782 | reqs.erase(iter); | |
783 | } | |
784 | { | |
785 | Mutex::Locker l(req_data->lock); | |
786 | req_data->mgr = nullptr; | |
787 | } | |
788 | if (completion_mgr) { | |
789 | completion_mgr->complete(NULL, req_data->user_info); | |
790 | } | |
791 | ||
792 | req_data->put(); | |
793 | } | |
794 | ||
795 | void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret) | |
796 | { | |
797 | req_data->finish(ret); | |
798 | complete_request(req_data); | |
799 | } | |
800 | ||
801 | void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) | |
802 | { | |
803 | req_data->finish(ret); | |
804 | _complete_request(req_data); | |
805 | } | |
806 | ||
807 | /* | |
808 | * hook request to the curl multi handle | |
809 | */ | |
810 | int RGWHTTPManager::link_request(rgw_http_req_data *req_data) | |
811 | { | |
812 | ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", easy_handle=" << req_data->easy_handle << dendl; | |
813 | CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->easy_handle); | |
814 | if (mstatus) { | |
815 | dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; | |
816 | return -EIO; | |
817 | } | |
818 | return 0; | |
819 | } | |
820 | ||
821 | /* | |
822 | * unhook request from the curl multi handle, and finish request if it wasn't finished yet as | |
823 | * there will be no more processing on this request | |
824 | */ | |
825 | void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) | |
826 | { | |
827 | if (req_data->easy_handle) { | |
828 | curl_multi_remove_handle((CURLM *)multi_handle, req_data->easy_handle); | |
829 | } | |
830 | if (!req_data->is_done()) { | |
831 | _finish_request(req_data, -ECANCELED); | |
832 | } | |
833 | } | |
834 | ||
835 | void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) | |
836 | { | |
837 | RWLock::WLocker wl(reqs_lock); | |
838 | _unlink_request(req_data); | |
839 | } | |
840 | ||
841 | void RGWHTTPManager::manage_pending_requests() | |
842 | { | |
843 | reqs_lock.get_read(); | |
844 | if (max_threaded_req == num_reqs && unregistered_reqs.empty()) { | |
845 | reqs_lock.unlock(); | |
846 | return; | |
847 | } | |
848 | reqs_lock.unlock(); | |
849 | ||
850 | RWLock::WLocker wl(reqs_lock); | |
851 | ||
852 | if (!unregistered_reqs.empty()) { | |
853 | for (auto& r : unregistered_reqs) { | |
854 | _unlink_request(r); | |
855 | r->put(); | |
856 | } | |
857 | ||
858 | unregistered_reqs.clear(); | |
859 | } | |
860 | ||
861 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req); | |
862 | ||
863 | list<std::pair<rgw_http_req_data *, int> > remove_reqs; | |
864 | ||
865 | for (; iter != reqs.end(); ++iter) { | |
866 | rgw_http_req_data *req_data = iter->second; | |
867 | int r = link_request(req_data); | |
868 | if (r < 0) { | |
869 | ldout(cct, 0) << "ERROR: failed to link http request" << dendl; | |
870 | remove_reqs.push_back(std::make_pair(iter->second, r)); | |
871 | } else { | |
872 | max_threaded_req = iter->first + 1; | |
873 | } | |
874 | } | |
875 | ||
876 | for (auto piter : remove_reqs) { | |
877 | rgw_http_req_data *req_data = piter.first; | |
878 | int r = piter.second; | |
879 | ||
880 | _finish_request(req_data, r); | |
881 | } | |
882 | } | |
883 | ||
31f18b77 | 884 | int RGWHTTPManager::add_request(RGWHTTPClient *client, const char *method, const char *url, bool send_data_hint) |
7c673cae FG |
885 | { |
886 | rgw_http_req_data *req_data = new rgw_http_req_data; | |
887 | ||
31f18b77 | 888 | int ret = client->init_request(method, url, req_data, send_data_hint); |
7c673cae FG |
889 | if (ret < 0) { |
890 | req_data->put(); | |
891 | req_data = NULL; | |
892 | return ret; | |
893 | } | |
894 | ||
895 | req_data->mgr = this; | |
896 | req_data->client = client; | |
897 | req_data->user_info = client->get_user_info(); | |
898 | ||
899 | register_request(req_data); | |
900 | ||
901 | if (!is_threaded) { | |
902 | ret = link_request(req_data); | |
903 | if (ret < 0) { | |
904 | req_data->put(); | |
905 | req_data = NULL; | |
906 | } | |
907 | return ret; | |
908 | } | |
909 | ret = signal_thread(); | |
910 | if (ret < 0) { | |
911 | finish_request(req_data, ret); | |
912 | } | |
913 | ||
914 | return ret; | |
915 | } | |
916 | ||
917 | int RGWHTTPManager::remove_request(RGWHTTPClient *client) | |
918 | { | |
919 | rgw_http_req_data *req_data = client->get_req_data(); | |
920 | ||
921 | if (!is_threaded) { | |
922 | unlink_request(req_data); | |
923 | return 0; | |
924 | } | |
925 | unregister_request(req_data); | |
926 | int ret = signal_thread(); | |
927 | if (ret < 0) { | |
928 | return ret; | |
929 | } | |
930 | ||
931 | return 0; | |
932 | } | |
933 | ||
934 | /* | |
935 | * the synchronous, non-threaded request processing method. | |
936 | */ | |
937 | int RGWHTTPManager::process_requests(bool wait_for_data, bool *done) | |
938 | { | |
939 | assert(!is_threaded); | |
940 | ||
941 | int still_running; | |
942 | int mstatus; | |
943 | ||
944 | do { | |
945 | if (wait_for_data) { | |
946 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, -1); | |
947 | if (ret < 0) { | |
948 | return ret; | |
949 | } | |
950 | } | |
951 | ||
952 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
953 | switch (mstatus) { | |
954 | case CURLM_OK: | |
955 | case CURLM_CALL_MULTI_PERFORM: | |
956 | break; | |
957 | default: | |
958 | dout(20) << "curl_multi_perform returned: " << mstatus << dendl; | |
959 | return -EINVAL; | |
960 | } | |
961 | int msgs_left; | |
962 | CURLMsg *msg; | |
963 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
964 | if (msg->msg == CURLMSG_DONE) { | |
965 | CURL *e = msg->easy_handle; | |
966 | rgw_http_req_data *req_data; | |
967 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
968 | ||
969 | long http_status; | |
970 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
971 | ||
972 | int status = rgw_http_error_to_errno(http_status); | |
973 | int result = msg->data.result; | |
974 | finish_request(req_data, status); | |
975 | switch (result) { | |
976 | case CURLE_OK: | |
977 | break; | |
978 | default: | |
979 | dout(20) << "ERROR: msg->data.result=" << result << dendl; | |
980 | return -EIO; | |
981 | } | |
982 | } | |
983 | } | |
984 | } while (mstatus == CURLM_CALL_MULTI_PERFORM); | |
985 | ||
986 | *done = (still_running == 0); | |
987 | ||
988 | return 0; | |
989 | } | |
990 | ||
991 | /* | |
992 | * the synchronous, non-threaded request processing completion method. | |
993 | */ | |
994 | int RGWHTTPManager::complete_requests() | |
995 | { | |
996 | bool done = false; | |
997 | int ret; | |
998 | do { | |
999 | ret = process_requests(true, &done); | |
1000 | } while (!done && !ret); | |
1001 | ||
1002 | return ret; | |
1003 | } | |
1004 | ||
1005 | int RGWHTTPManager::set_threaded() | |
1006 | { | |
1007 | int r = pipe(thread_pipe); | |
1008 | if (r < 0) { | |
1009 | r = -errno; | |
1010 | ldout(cct, 0) << "ERROR: pipe() returned errno=" << r << dendl; | |
1011 | return r; | |
1012 | } | |
1013 | ||
1014 | // enable non-blocking reads | |
1015 | r = ::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); | |
1016 | if (r < 0) { | |
1017 | r = -errno; | |
1018 | ldout(cct, 0) << "ERROR: fcntl() returned errno=" << r << dendl; | |
1019 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1020 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1021 | return r; | |
1022 | } | |
1023 | ||
1024 | #ifdef HAVE_CURL_MULTI_WAIT | |
1025 | // on first initialization, use this pipe to detect whether we're using a | |
1026 | // buggy version of libcurl | |
1027 | std::call_once(detect_flag, detect_curl_multi_wait_bug, cct, | |
1028 | static_cast<CURLM*>(multi_handle), | |
1029 | thread_pipe[1], thread_pipe[0]); | |
1030 | #endif | |
1031 | ||
1032 | is_threaded = true; | |
1033 | reqs_thread = new ReqsThread(this); | |
1034 | reqs_thread->create("http_manager"); | |
1035 | return 0; | |
1036 | } | |
1037 | ||
1038 | void RGWHTTPManager::stop() | |
1039 | { | |
1040 | if (is_stopped) { | |
1041 | return; | |
1042 | } | |
1043 | ||
1044 | is_stopped = true; | |
1045 | ||
1046 | if (is_threaded) { | |
1047 | going_down = true; | |
1048 | signal_thread(); | |
1049 | reqs_thread->join(); | |
1050 | delete reqs_thread; | |
1051 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1052 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1053 | } | |
1054 | } | |
1055 | ||
1056 | int RGWHTTPManager::signal_thread() | |
1057 | { | |
1058 | uint32_t buf = 0; | |
1059 | int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); | |
1060 | if (ret < 0) { | |
1061 | ret = -errno; | |
1062 | ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; | |
1063 | return ret; | |
1064 | } | |
1065 | return 0; | |
1066 | } | |
1067 | ||
1068 | void *RGWHTTPManager::reqs_thread_entry() | |
1069 | { | |
1070 | int still_running; | |
1071 | int mstatus; | |
1072 | ||
1073 | ldout(cct, 20) << __func__ << ": start" << dendl; | |
1074 | ||
1075 | while (!going_down) { | |
1076 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); | |
1077 | if (ret < 0) { | |
1078 | dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; | |
1079 | return NULL; | |
1080 | } | |
1081 | ||
1082 | manage_pending_requests(); | |
1083 | ||
1084 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
1085 | switch (mstatus) { | |
1086 | case CURLM_OK: | |
1087 | case CURLM_CALL_MULTI_PERFORM: | |
1088 | break; | |
1089 | default: | |
1090 | dout(10) << "curl_multi_perform returned: " << mstatus << dendl; | |
1091 | break; | |
1092 | } | |
1093 | int msgs_left; | |
1094 | CURLMsg *msg; | |
1095 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
1096 | if (msg->msg == CURLMSG_DONE) { | |
1097 | int result = msg->data.result; | |
1098 | CURL *e = msg->easy_handle; | |
1099 | rgw_http_req_data *req_data; | |
1100 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
1101 | curl_multi_remove_handle((CURLM *)multi_handle, e); | |
1102 | ||
1103 | long http_status; | |
1104 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
1105 | ||
1106 | int status = rgw_http_error_to_errno(http_status); | |
1107 | if (result != CURLE_OK && http_status == 0) { | |
1108 | status = -EAGAIN; | |
1109 | } | |
1110 | int id = req_data->id; | |
1111 | finish_request(req_data, status); | |
1112 | switch (result) { | |
1113 | case CURLE_OK: | |
1114 | break; | |
1115 | default: | |
1116 | dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; | |
1117 | break; | |
1118 | } | |
1119 | } | |
1120 | } | |
1121 | } | |
1122 | ||
1123 | ||
1124 | RWLock::WLocker rl(reqs_lock); | |
1125 | for (auto r : unregistered_reqs) { | |
1126 | _finish_request(r, -ECANCELED); | |
1127 | } | |
1128 | ||
1129 | unregistered_reqs.clear(); | |
1130 | ||
1131 | auto all_reqs = std::move(reqs); | |
1132 | for (auto iter : all_reqs) { | |
1133 | _finish_request(iter.second, -ECANCELED); | |
1134 | } | |
1135 | ||
1136 | reqs.clear(); | |
1137 | ||
1138 | if (completion_mgr) { | |
1139 | completion_mgr->go_down(); | |
1140 | } | |
1141 | ||
1142 | return 0; | |
1143 | } | |
1144 | ||
1145 |