]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include "include/compat.h" | |
91327a77 | 5 | #include "common/errno.h" |
7c673cae FG |
6 | |
7 | #include <boost/utility/string_ref.hpp> | |
8 | ||
9 | #include <curl/curl.h> | |
10 | #include <curl/easy.h> | |
11 | #include <curl/multi.h> | |
12 | ||
13 | #include "rgw_common.h" | |
14 | #include "rgw_http_client.h" | |
15 | #include "rgw_http_errors.h" | |
eafe8130 | 16 | #include "common/async/completion.h" |
7c673cae FG |
17 | #include "common/RefCountedObj.h" |
18 | ||
19 | #include "rgw_coroutine.h" | |
9f95a23c | 20 | #include "rgw_tools.h" |
7c673cae FG |
21 | |
22 | #include <atomic> | |
23 | ||
24 | #define dout_context g_ceph_context | |
25 | #define dout_subsys ceph_subsys_rgw | |
26 | ||
11fdf7f2 TL |
27 | RGWHTTPManager *rgw_http_manager; |
28 | ||
29 | struct RGWCurlHandle; | |
30 | ||
31 | static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle); | |
32 | ||
7c673cae | 33 | struct rgw_http_req_data : public RefCountedObject { |
11fdf7f2 TL |
34 | RGWCurlHandle *curl_handle{nullptr}; |
35 | curl_slist *h{nullptr}; | |
7c673cae | 36 | uint64_t id; |
11fdf7f2 | 37 | int ret{0}; |
7c673cae | 38 | std::atomic<bool> done = { false }; |
11fdf7f2 TL |
39 | RGWHTTPClient *client{nullptr}; |
40 | rgw_io_id control_io_id; | |
41 | void *user_info{nullptr}; | |
42 | bool registered{false}; | |
43 | RGWHTTPManager *mgr{nullptr}; | |
7c673cae | 44 | char error_buf[CURL_ERROR_SIZE]; |
11fdf7f2 TL |
45 | bool write_paused{false}; |
46 | bool read_paused{false}; | |
7c673cae | 47 | |
9f95a23c TL |
48 | optional<int> user_ret; |
49 | ||
50 | ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock"); | |
51 | ceph::condition_variable cond; | |
7c673cae | 52 | |
eafe8130 TL |
53 | using Signature = void(boost::system::error_code); |
54 | using Completion = ceph::async::Completion<Signature>; | |
55 | std::unique_ptr<Completion> completion; | |
56 | ||
9f95a23c | 57 | rgw_http_req_data() : id(-1) { |
92f5a8d4 | 58 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae FG |
59 | memset(error_buf, 0, sizeof(error_buf)); |
60 | } | |
61 | ||
eafe8130 TL |
62 | template <typename ExecutionContext, typename CompletionToken> |
63 | auto async_wait(ExecutionContext& ctx, CompletionToken&& token) { | |
64 | boost::asio::async_completion<CompletionToken, Signature> init(token); | |
65 | auto& handler = init.completion_handler; | |
66 | { | |
67 | std::unique_lock l{lock}; | |
68 | completion = Completion::create(ctx.get_executor(), std::move(handler)); | |
69 | } | |
70 | return init.result.get(); | |
71 | } | |
9f95a23c | 72 | |
eafe8130 | 73 | int wait(optional_yield y) { |
11fdf7f2 TL |
74 | if (done) { |
75 | return ret; | |
76 | } | |
eafe8130 TL |
77 | #ifdef HAVE_BOOST_CONTEXT |
78 | if (y) { | |
79 | auto& context = y.get_io_context(); | |
80 | auto& yield = y.get_yield_context(); | |
81 | boost::system::error_code ec; | |
82 | async_wait(context, yield[ec]); | |
83 | return -ec.value(); | |
84 | } | |
9f95a23c TL |
85 | // work on asio threads should be asynchronous, so warn when they block |
86 | if (is_asio_thread) { | |
87 | dout(20) << "WARNING: blocking http request" << dendl; | |
88 | } | |
eafe8130 | 89 | #endif |
9f95a23c TL |
90 | std::unique_lock l{lock}; |
91 | cond.wait(l); | |
7c673cae FG |
92 | return ret; |
93 | } | |
94 | ||
11fdf7f2 | 95 | void set_state(int bitmask); |
7c673cae | 96 | |
9f95a23c TL |
97 | void finish(int r, long http_status = -1) { |
98 | std::lock_guard l{lock}; | |
99 | if (http_status != -1) { | |
100 | if (client) { | |
101 | client->set_http_status(http_status); | |
102 | } | |
103 | } | |
7c673cae | 104 | ret = r; |
11fdf7f2 TL |
105 | if (curl_handle) |
106 | do_curl_easy_cleanup(curl_handle); | |
7c673cae FG |
107 | |
108 | if (h) | |
109 | curl_slist_free_all(h); | |
110 | ||
11fdf7f2 | 111 | curl_handle = NULL; |
7c673cae FG |
112 | h = NULL; |
113 | done = true; | |
eafe8130 TL |
114 | if (completion) { |
115 | boost::system::error_code ec(-ret, boost::system::system_category()); | |
116 | Completion::post(std::move(completion), ec); | |
117 | } else { | |
9f95a23c | 118 | cond.notify_all(); |
eafe8130 | 119 | } |
11fdf7f2 TL |
120 | } |
121 | ||
7c673cae FG |
122 | bool is_done() { |
123 | return done; | |
124 | } | |
125 | ||
126 | int get_retcode() { | |
9f95a23c | 127 | std::lock_guard l{lock}; |
7c673cae FG |
128 | return ret; |
129 | } | |
9f95a23c | 130 | |
7c673cae | 131 | RGWHTTPManager *get_manager() { |
9f95a23c | 132 | std::lock_guard l{lock}; |
7c673cae FG |
133 | return mgr; |
134 | } | |
11fdf7f2 TL |
135 | |
136 | CURL *get_easy_handle() const; | |
7c673cae FG |
137 | }; |
138 | ||
94b18763 FG |
139 | struct RGWCurlHandle { |
140 | int uses; | |
141 | mono_time lastuse; | |
142 | CURL* h; | |
143 | ||
11fdf7f2 | 144 | explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {}; |
94b18763 FG |
145 | CURL* operator*() { |
146 | return this->h; | |
147 | } | |
148 | }; | |
149 | ||
11fdf7f2 TL |
150 | void rgw_http_req_data::set_state(int bitmask) { |
151 | /* no need to lock here, moreover curl_easy_pause() might trigger | |
152 | * the data receive callback :/ | |
153 | */ | |
154 | CURLcode rc = curl_easy_pause(**curl_handle, bitmask); | |
155 | if (rc != CURLE_OK) { | |
156 | dout(0) << "ERROR: curl_easy_pause() returned rc=" << rc << dendl; | |
157 | } | |
158 | } | |
159 | ||
94b18763 FG |
160 | #define MAXIDLE 5 |
161 | class RGWCurlHandles : public Thread { | |
162 | public: | |
9f95a23c TL |
163 | ceph::mutex cleaner_lock = ceph::make_mutex("RGWCurlHandles::cleaner_lock"); |
164 | std::vector<RGWCurlHandle*> saved_curl; | |
94b18763 | 165 | int cleaner_shutdown; |
9f95a23c | 166 | ceph::condition_variable cleaner_cond; |
94b18763 FG |
167 | |
168 | RGWCurlHandles() : | |
94b18763 FG |
169 | cleaner_shutdown{0} { |
170 | } | |
171 | ||
172 | RGWCurlHandle* get_curl_handle(); | |
173 | void release_curl_handle_now(RGWCurlHandle* curl); | |
174 | void release_curl_handle(RGWCurlHandle* curl); | |
175 | void flush_curl_handles(); | |
176 | void* entry(); | |
177 | void stop(); | |
178 | }; | |
179 | ||
180 | RGWCurlHandle* RGWCurlHandles::get_curl_handle() { | |
181 | RGWCurlHandle* curl = 0; | |
182 | CURL* h; | |
183 | { | |
9f95a23c | 184 | std::lock_guard lock{cleaner_lock}; |
94b18763 FG |
185 | if (!saved_curl.empty()) { |
186 | curl = *saved_curl.begin(); | |
187 | saved_curl.erase(saved_curl.begin()); | |
188 | } | |
189 | } | |
190 | if (curl) { | |
191 | } else if ((h = curl_easy_init())) { | |
192 | curl = new RGWCurlHandle{h}; | |
193 | } else { | |
194 | // curl = 0; | |
195 | } | |
196 | return curl; | |
197 | } | |
198 | ||
199 | void RGWCurlHandles::release_curl_handle_now(RGWCurlHandle* curl) | |
200 | { | |
201 | curl_easy_cleanup(**curl); | |
202 | delete curl; | |
203 | } | |
204 | ||
205 | void RGWCurlHandles::release_curl_handle(RGWCurlHandle* curl) | |
206 | { | |
207 | if (cleaner_shutdown) { | |
208 | release_curl_handle_now(curl); | |
209 | } else { | |
210 | curl_easy_reset(**curl); | |
9f95a23c | 211 | std::lock_guard lock{cleaner_lock}; |
94b18763 FG |
212 | curl->lastuse = mono_clock::now(); |
213 | saved_curl.insert(saved_curl.begin(), 1, curl); | |
214 | } | |
215 | } | |
216 | ||
217 | void* RGWCurlHandles::entry() | |
218 | { | |
219 | RGWCurlHandle* curl; | |
9f95a23c | 220 | std::unique_lock lock{cleaner_lock}; |
94b18763 FG |
221 | |
222 | for (;;) { | |
223 | if (cleaner_shutdown) { | |
224 | if (saved_curl.empty()) | |
225 | break; | |
226 | } else { | |
9f95a23c | 227 | cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE)); |
94b18763 FG |
228 | } |
229 | mono_time now = mono_clock::now(); | |
230 | while (!saved_curl.empty()) { | |
231 | auto cend = saved_curl.end(); | |
232 | --cend; | |
233 | curl = *cend; | |
234 | if (!cleaner_shutdown && now - curl->lastuse < std::chrono::seconds(MAXIDLE)) | |
235 | break; | |
236 | saved_curl.erase(cend); | |
237 | release_curl_handle_now(curl); | |
238 | } | |
239 | } | |
240 | return nullptr; | |
241 | } | |
242 | ||
243 | void RGWCurlHandles::stop() | |
244 | { | |
9f95a23c | 245 | std::lock_guard lock{cleaner_lock}; |
94b18763 | 246 | cleaner_shutdown = 1; |
9f95a23c | 247 | cleaner_cond.notify_all(); |
94b18763 FG |
248 | } |
249 | ||
250 | void RGWCurlHandles::flush_curl_handles() | |
251 | { | |
252 | stop(); | |
253 | join(); | |
254 | if (!saved_curl.empty()) { | |
255 | dout(0) << "ERROR: " << __func__ << " failed final cleanup" << dendl; | |
256 | } | |
257 | saved_curl.shrink_to_fit(); | |
258 | } | |
259 | ||
11fdf7f2 TL |
260 | CURL *rgw_http_req_data::get_easy_handle() const |
261 | { | |
262 | return **curl_handle; | |
263 | } | |
264 | ||
94b18763 | 265 | static RGWCurlHandles *handles; |
11fdf7f2 TL |
266 | |
267 | static RGWCurlHandle *do_curl_easy_init() | |
268 | { | |
269 | return handles->get_curl_handle(); | |
270 | } | |
271 | ||
272 | static void do_curl_easy_cleanup(RGWCurlHandle *curl_handle) | |
273 | { | |
274 | handles->release_curl_handle(curl_handle); | |
275 | } | |
276 | ||
94b18763 FG |
// XXX make this part of the token cache? (but that's swift-only;
// and this especially needs to integrate with s3...)
279 | ||
280 | void rgw_setup_saved_curl_handles() | |
281 | { | |
282 | handles = new RGWCurlHandles(); | |
283 | handles->create("rgw_curl"); | |
284 | } | |
285 | ||
286 | void rgw_release_all_curl_handles() | |
287 | { | |
288 | handles->flush_curl_handles(); | |
289 | delete handles; | |
290 | } | |
291 | ||
11fdf7f2 | 292 | void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type) |
7c673cae | 293 | { |
11fdf7f2 TL |
294 | if (id == 0) { |
295 | id = io_id_provider.get_next(); | |
7c673cae | 296 | } |
7c673cae FG |
297 | } |
298 | ||
299 | /* | |
300 | * the following set of callbacks will be called either on RGWHTTPManager::process(), | |
301 | * or via the RGWHTTPManager async processing. | |
302 | */ | |
303 | size_t RGWHTTPClient::receive_http_header(void * const ptr, | |
304 | const size_t size, | |
305 | const size_t nmemb, | |
306 | void * const _info) | |
307 | { | |
308 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
309 | size_t len = size * nmemb; | |
310 | ||
9f95a23c | 311 | std::lock_guard l{req_data->lock}; |
7c673cae FG |
312 | |
313 | if (!req_data->registered) { | |
314 | return len; | |
315 | } | |
316 | ||
317 | int ret = req_data->client->receive_header(ptr, size * nmemb); | |
318 | if (ret < 0) { | |
9f95a23c TL |
319 | dout(5) << "WARNING: client->receive_header() returned ret=" << ret << dendl; |
320 | req_data->user_ret = ret; | |
321 | return CURLE_WRITE_ERROR; | |
7c673cae FG |
322 | } |
323 | ||
324 | return len; | |
325 | } | |
326 | ||
327 | size_t RGWHTTPClient::receive_http_data(void * const ptr, | |
328 | const size_t size, | |
329 | const size_t nmemb, | |
330 | void * const _info) | |
331 | { | |
332 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
333 | size_t len = size * nmemb; | |
334 | ||
11fdf7f2 TL |
335 | bool pause = false; |
336 | ||
337 | RGWHTTPClient *client; | |
338 | ||
339 | { | |
9f95a23c | 340 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
341 | if (!req_data->registered) { |
342 | return len; | |
343 | } | |
344 | ||
345 | client = req_data->client; | |
346 | } | |
347 | ||
348 | size_t& skip_bytes = client->receive_pause_skip; | |
349 | ||
350 | if (skip_bytes >= len) { | |
351 | skip_bytes -= len; | |
7c673cae FG |
352 | return len; |
353 | } | |
11fdf7f2 TL |
354 | |
355 | int ret = client->receive_data((char *)ptr + skip_bytes, len - skip_bytes, &pause); | |
7c673cae | 356 | if (ret < 0) { |
9f95a23c TL |
357 | dout(5) << "WARNING: client->receive_data() returned ret=" << ret << dendl; |
358 | req_data->user_ret = ret; | |
359 | return CURLE_WRITE_ERROR; | |
7c673cae FG |
360 | } |
361 | ||
11fdf7f2 TL |
362 | if (pause) { |
363 | dout(20) << "RGWHTTPClient::receive_http_data(): pause" << dendl; | |
364 | skip_bytes = len; | |
9f95a23c | 365 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
366 | req_data->read_paused = true; |
367 | return CURL_WRITEFUNC_PAUSE; | |
368 | } | |
369 | ||
370 | skip_bytes = 0; | |
371 | ||
7c673cae FG |
372 | return len; |
373 | } | |
374 | ||
375 | size_t RGWHTTPClient::send_http_data(void * const ptr, | |
376 | const size_t size, | |
377 | const size_t nmemb, | |
378 | void * const _info) | |
379 | { | |
380 | rgw_http_req_data *req_data = static_cast<rgw_http_req_data *>(_info); | |
381 | ||
11fdf7f2 TL |
382 | RGWHTTPClient *client; |
383 | ||
384 | { | |
9f95a23c | 385 | std::lock_guard l{req_data->lock}; |
7c673cae | 386 | |
11fdf7f2 TL |
387 | if (!req_data->registered) { |
388 | return 0; | |
389 | } | |
390 | ||
391 | client = req_data->client; | |
7c673cae FG |
392 | } |
393 | ||
11fdf7f2 TL |
394 | bool pause = false; |
395 | ||
396 | int ret = client->send_data(ptr, size * nmemb, &pause); | |
7c673cae | 397 | if (ret < 0) { |
9f95a23c TL |
398 | dout(5) << "WARNING: client->send_data() returned ret=" << ret << dendl; |
399 | req_data->user_ret = ret; | |
400 | return CURLE_READ_ERROR; | |
7c673cae FG |
401 | } |
402 | ||
11fdf7f2 TL |
403 | if (ret == 0 && |
404 | pause) { | |
9f95a23c | 405 | std::lock_guard l{req_data->lock}; |
11fdf7f2 TL |
406 | req_data->write_paused = true; |
407 | return CURL_READFUNC_PAUSE; | |
408 | } | |
409 | ||
7c673cae FG |
410 | return ret; |
411 | } | |
412 | ||
9f95a23c | 413 | ceph::mutex& RGWHTTPClient::get_req_lock() |
11fdf7f2 TL |
414 | { |
415 | return req_data->lock; | |
416 | } | |
417 | ||
418 | void RGWHTTPClient::_set_write_paused(bool pause) | |
419 | { | |
9f95a23c | 420 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
11fdf7f2 TL |
421 | |
422 | RGWHTTPManager *mgr = req_data->mgr; | |
423 | if (pause == req_data->write_paused) { | |
424 | return; | |
425 | } | |
426 | if (pause) { | |
427 | mgr->set_request_state(this, SET_WRITE_PAUSED); | |
428 | } else { | |
429 | mgr->set_request_state(this, SET_WRITE_RESUME); | |
430 | } | |
431 | } | |
432 | ||
433 | void RGWHTTPClient::_set_read_paused(bool pause) | |
434 | { | |
9f95a23c | 435 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
11fdf7f2 TL |
436 | |
437 | RGWHTTPManager *mgr = req_data->mgr; | |
438 | if (pause == req_data->read_paused) { | |
439 | return; | |
440 | } | |
441 | if (pause) { | |
442 | mgr->set_request_state(this, SET_READ_PAUSED); | |
443 | } else { | |
444 | mgr->set_request_state(this, SET_READ_RESUME); | |
445 | } | |
446 | } | |
447 | ||
7c673cae FG |
448 | static curl_slist *headers_to_slist(param_vec_t& headers) |
449 | { | |
450 | curl_slist *h = NULL; | |
451 | ||
452 | param_vec_t::iterator iter; | |
453 | for (iter = headers.begin(); iter != headers.end(); ++iter) { | |
454 | pair<string, string>& p = *iter; | |
455 | string val = p.first; | |
456 | ||
457 | if (strncmp(val.c_str(), "HTTP_", 5) == 0) { | |
458 | val = val.substr(5); | |
459 | } | |
460 | ||
461 | /* we need to convert all underscores into dashes as some web servers forbid them | |
462 | * in the http header field names | |
463 | */ | |
464 | for (size_t i = 0; i < val.size(); i++) { | |
465 | if (val[i] == '_') { | |
466 | val[i] = '-'; | |
467 | } | |
468 | } | |
11fdf7f2 TL |
469 | |
470 | val = camelcase_dash_http_attr(val); | |
7c673cae | 471 | |
28e407b8 AA |
472 | // curl won't send headers with empty values unless it ends with a ; instead |
473 | if (p.second.empty()) { | |
474 | val.append(1, ';'); | |
475 | } else { | |
476 | val.append(": "); | |
477 | val.append(p.second); | |
478 | } | |
7c673cae FG |
479 | h = curl_slist_append(h, val.c_str()); |
480 | } | |
481 | ||
482 | return h; | |
483 | } | |
484 | ||
/* True for the methods that carry a request body here: exactly "PUT" and "POST". */
static bool is_upload_request(const std::string& method)
{
  if (method == "PUT") {
    return true;
  }
  return method == "POST";
}
489 | ||
490 | /* | |
11fdf7f2 | 491 | * process a single simple one off request |
7c673cae | 492 | */ |
eafe8130 | 493 | int RGWHTTPClient::process(optional_yield y) |
7c673cae | 494 | { |
eafe8130 | 495 | return RGWHTTP::process(this, y); |
7c673cae FG |
496 | } |
497 | ||
498 | string RGWHTTPClient::to_str() | |
499 | { | |
11fdf7f2 TL |
500 | string method_str = (method.empty() ? "<no-method>" : method); |
501 | string url_str = (url.empty() ? "<no-url>" : url); | |
7c673cae FG |
502 | return method_str + " " + url_str; |
503 | } | |
504 | ||
505 | int RGWHTTPClient::get_req_retcode() | |
506 | { | |
507 | if (!req_data) { | |
508 | return -EINVAL; | |
509 | } | |
510 | ||
511 | return req_data->get_retcode(); | |
512 | } | |
513 | ||
514 | /* | |
515 | * init request, will be used later with RGWHTTPManager | |
516 | */ | |
11fdf7f2 | 517 | int RGWHTTPClient::init_request(rgw_http_req_data *_req_data) |
7c673cae | 518 | { |
11fdf7f2 | 519 | ceph_assert(!req_data); |
7c673cae FG |
520 | _req_data->get(); |
521 | req_data = _req_data; | |
522 | ||
11fdf7f2 | 523 | req_data->curl_handle = do_curl_easy_init(); |
7c673cae | 524 | |
11fdf7f2 | 525 | CURL *easy_handle = req_data->get_easy_handle(); |
7c673cae FG |
526 | |
527 | dout(20) << "sending request to " << url << dendl; | |
528 | ||
529 | curl_slist *h = headers_to_slist(headers); | |
530 | ||
531 | req_data->h = h; | |
532 | ||
11fdf7f2 TL |
533 | curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str()); |
534 | curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str()); | |
7c673cae FG |
535 | curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L); |
536 | curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L); | |
537 | curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header); | |
538 | curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data); | |
539 | curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data); | |
540 | curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data); | |
541 | curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf); | |
1adf2230 AA |
542 | curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time); |
543 | curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit); | |
7c673cae FG |
544 | if (h) { |
545 | curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h); | |
546 | } | |
547 | curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data); | |
548 | curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data); | |
31f18b77 | 549 | if (send_data_hint || is_upload_request(method)) { |
7c673cae FG |
550 | curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L); |
551 | } | |
552 | if (has_send_len) { | |
553 | curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, (void *)send_len); | |
554 | } | |
31f18b77 FG |
555 | if (!verify_ssl) { |
556 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L); | |
557 | curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L); | |
558 | dout(20) << "ssl verification is set to off" << dendl; | |
559 | } | |
7c673cae FG |
560 | curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); |
561 | ||
562 | return 0; | |
563 | } | |
564 | ||
11fdf7f2 TL |
565 | bool RGWHTTPClient::is_done() |
566 | { | |
567 | return req_data->is_done(); | |
568 | } | |
569 | ||
7c673cae FG |
570 | /* |
571 | * wait for async request to complete | |
572 | */ | |
eafe8130 | 573 | int RGWHTTPClient::wait(optional_yield y) |
7c673cae | 574 | { |
eafe8130 | 575 | return req_data->wait(y); |
7c673cae FG |
576 | } |
577 | ||
11fdf7f2 | 578 | void RGWHTTPClient::cancel() |
7c673cae FG |
579 | { |
580 | if (req_data) { | |
11fdf7f2 | 581 | RGWHTTPManager *http_manager = req_data->mgr; |
7c673cae FG |
582 | if (http_manager) { |
583 | http_manager->remove_request(this); | |
584 | } | |
11fdf7f2 TL |
585 | } |
586 | } | |
7c673cae | 587 | |
11fdf7f2 TL |
588 | RGWHTTPClient::~RGWHTTPClient() |
589 | { | |
590 | cancel(); | |
591 | if (req_data) { | |
7c673cae FG |
592 | req_data->put(); |
593 | } | |
594 | } | |
595 | ||
596 | ||
597 | int RGWHTTPHeadersCollector::receive_header(void * const ptr, const size_t len) | |
598 | { | |
11fdf7f2 | 599 | const boost::string_ref header_line(static_cast<const char *>(ptr), len); |
7c673cae FG |
600 | |
601 | /* We're tokening the line that way due to backward compatibility. */ | |
602 | const size_t sep_loc = header_line.find_first_of(" \t:"); | |
603 | ||
604 | if (boost::string_ref::npos == sep_loc) { | |
605 | /* Wrongly formatted header? Just skip it. */ | |
606 | return 0; | |
607 | } | |
608 | ||
609 | header_name_t name(header_line.substr(0, sep_loc)); | |
610 | if (0 == relevant_headers.count(name)) { | |
611 | /* Not interested in this particular header. */ | |
612 | return 0; | |
613 | } | |
614 | ||
615 | const auto value_part = header_line.substr(sep_loc + 1); | |
616 | ||
617 | /* Skip spaces and tabs after the separator. */ | |
618 | const size_t val_loc_s = value_part.find_first_not_of(' '); | |
619 | const size_t val_loc_e = value_part.find_first_of("\r\n"); | |
620 | ||
621 | if (boost::string_ref::npos == val_loc_s || | |
622 | boost::string_ref::npos == val_loc_e) { | |
623 | /* Empty value case. */ | |
624 | found_headers.emplace(name, header_value_t()); | |
625 | } else { | |
626 | found_headers.emplace(name, header_value_t( | |
627 | value_part.substr(val_loc_s, val_loc_e - val_loc_s))); | |
628 | } | |
629 | ||
630 | return 0; | |
631 | } | |
632 | ||
11fdf7f2 | 633 | int RGWHTTPTransceiver::send_data(void* ptr, size_t len, bool* pause) |
7c673cae FG |
634 | { |
635 | int length_to_copy = 0; | |
636 | if (post_data_index < post_data.length()) { | |
637 | length_to_copy = min(post_data.length() - post_data_index, len); | |
638 | memcpy(ptr, post_data.data() + post_data_index, length_to_copy); | |
639 | post_data_index += length_to_copy; | |
640 | } | |
641 | return length_to_copy; | |
642 | } | |
643 | ||
644 | ||
/*
 * Drain pending wakeup bytes from fd with a single read.
 * Returns 0 on success (including when nothing was available, i.e. EAGAIN),
 * or -errno for any other read failure.
 */
static int clear_signal(int fd)
{
  // since we're in non-blocking mode, we can try to read a lot more than
  // one signal from signal_thread() to avoid later wakeups. non-blocking reads
  // are also required to support the curl_multi_wait bug workaround
  std::array<char, 256> scratch;
  const int nread = ::read(fd, static_cast<void *>(scratch.data()), scratch.size());
  if (nread >= 0) {
    return 0;
  }
  const int err = -errno;
  if (err == -EAGAIN) {
    return 0; // clear EAGAIN
  }
  return err;
}
658 | ||
659 | #if HAVE_CURL_MULTI_WAIT | |
660 | ||
661 | static std::once_flag detect_flag; | |
662 | static bool curl_multi_wait_bug_present = false; | |
663 | ||
664 | static int detect_curl_multi_wait_bug(CephContext *cct, CURLM *handle, | |
665 | int write_fd, int read_fd) | |
666 | { | |
667 | int ret = 0; | |
668 | ||
669 | // write to write_fd so that read_fd becomes readable | |
670 | uint32_t buf = 0; | |
671 | ret = ::write(write_fd, &buf, sizeof(buf)); | |
672 | if (ret < 0) { | |
673 | ret = -errno; | |
674 | ldout(cct, 0) << "ERROR: " << __func__ << "(): write() returned " << ret << dendl; | |
675 | return ret; | |
676 | } | |
677 | ||
678 | // pass read_fd in extra_fds for curl_multi_wait() | |
679 | int num_fds; | |
680 | struct curl_waitfd wait_fd; | |
681 | ||
682 | wait_fd.fd = read_fd; | |
683 | wait_fd.events = CURL_WAIT_POLLIN; | |
684 | wait_fd.revents = 0; | |
685 | ||
686 | ret = curl_multi_wait(handle, &wait_fd, 1, 0, &num_fds); | |
687 | if (ret != CURLM_OK) { | |
688 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
689 | return -EIO; | |
690 | } | |
691 | ||
692 | // curl_multi_wait should flag revents when extra_fd is readable. if it | |
693 | // doesn't, the bug is present and we can't rely on revents | |
694 | if (wait_fd.revents == 0) { | |
695 | curl_multi_wait_bug_present = true; | |
696 | ldout(cct, 0) << "WARNING: detected a version of libcurl which contains a " | |
697 | "bug in curl_multi_wait(). enabling a workaround that may degrade " | |
698 | "performance slightly." << dendl; | |
699 | } | |
700 | ||
701 | return clear_signal(read_fd); | |
702 | } | |
703 | ||
704 | static bool is_signaled(const curl_waitfd& wait_fd) | |
705 | { | |
706 | if (wait_fd.fd < 0) { | |
707 | // no fd to signal | |
708 | return false; | |
709 | } | |
710 | ||
711 | if (curl_multi_wait_bug_present) { | |
712 | // we can't rely on revents, so we always return true if a wait_fd is given. | |
713 | // this means we'll be trying a non-blocking read on this fd every time that | |
714 | // curl_multi_wait() wakes up | |
715 | return true; | |
716 | } | |
717 | ||
718 | return wait_fd.revents > 0; | |
719 | } | |
720 | ||
721 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
722 | { | |
723 | int num_fds; | |
724 | struct curl_waitfd wait_fd; | |
725 | ||
726 | wait_fd.fd = signal_fd; | |
727 | wait_fd.events = CURL_WAIT_POLLIN; | |
728 | wait_fd.revents = 0; | |
729 | ||
730 | int ret = curl_multi_wait(handle, &wait_fd, 1, cct->_conf->rgw_curl_wait_timeout_ms, &num_fds); | |
731 | if (ret) { | |
732 | ldout(cct, 0) << "ERROR: curl_multi_wait() returned " << ret << dendl; | |
733 | return -EIO; | |
734 | } | |
735 | ||
736 | if (is_signaled(wait_fd)) { | |
737 | ret = clear_signal(signal_fd); | |
738 | if (ret < 0) { | |
739 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
740 | return ret; | |
741 | } | |
742 | } | |
743 | return 0; | |
744 | } | |
745 | ||
746 | #else | |
747 | ||
748 | static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd) | |
749 | { | |
750 | fd_set fdread; | |
751 | fd_set fdwrite; | |
752 | fd_set fdexcep; | |
753 | int maxfd = -1; | |
754 | ||
755 | FD_ZERO(&fdread); | |
756 | FD_ZERO(&fdwrite); | |
757 | FD_ZERO(&fdexcep); | |
758 | ||
759 | /* get file descriptors from the transfers */ | |
760 | int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd); | |
761 | if (ret) { | |
762 | ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl; | |
763 | return -EIO; | |
764 | } | |
765 | ||
766 | if (signal_fd > 0) { | |
767 | FD_SET(signal_fd, &fdread); | |
768 | if (signal_fd >= maxfd) { | |
769 | maxfd = signal_fd + 1; | |
770 | } | |
771 | } | |
772 | ||
773 | /* forcing a strict timeout, as the returned fdsets might not reference all fds we wait on */ | |
774 | uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms; | |
775 | #define RGW_CURL_TIMEOUT 1000 | |
776 | if (!to) | |
777 | to = RGW_CURL_TIMEOUT; | |
778 | struct timeval timeout; | |
779 | timeout.tv_sec = to / 1000; | |
780 | timeout.tv_usec = to % 1000; | |
781 | ||
782 | ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout); | |
783 | if (ret < 0) { | |
784 | ret = -errno; | |
785 | ldout(cct, 0) << "ERROR: select returned " << ret << dendl; | |
786 | return ret; | |
787 | } | |
788 | ||
789 | if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) { | |
790 | ret = clear_signal(signal_fd); | |
791 | if (ret < 0) { | |
792 | ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl; | |
793 | return ret; | |
794 | } | |
795 | } | |
796 | ||
797 | return 0; | |
798 | } | |
799 | ||
800 | #endif | |
801 | ||
802 | void *RGWHTTPManager::ReqsThread::entry() | |
803 | { | |
804 | manager->reqs_thread_entry(); | |
805 | return NULL; | |
806 | } | |
807 | ||
808 | /* | |
809 | * RGWHTTPManager has two modes of operation: threaded and non-threaded. | |
810 | */ | |
811 | RGWHTTPManager::RGWHTTPManager(CephContext *_cct, RGWCompletionManager *_cm) : cct(_cct), | |
9f95a23c | 812 | completion_mgr(_cm) |
7c673cae FG |
813 | { |
814 | multi_handle = (void *)curl_multi_init(); | |
815 | thread_pipe[0] = -1; | |
816 | thread_pipe[1] = -1; | |
817 | } | |
818 | ||
819 | RGWHTTPManager::~RGWHTTPManager() { | |
820 | stop(); | |
821 | if (multi_handle) | |
822 | curl_multi_cleanup((CURLM *)multi_handle); | |
823 | } | |
824 | ||
825 | void RGWHTTPManager::register_request(rgw_http_req_data *req_data) | |
826 | { | |
9f95a23c | 827 | std::unique_lock rl{reqs_lock}; |
7c673cae FG |
828 | req_data->id = num_reqs; |
829 | req_data->registered = true; | |
830 | reqs[num_reqs] = req_data; | |
831 | num_reqs++; | |
11fdf7f2 | 832 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
7c673cae FG |
833 | } |
834 | ||
11fdf7f2 | 835 | bool RGWHTTPManager::unregister_request(rgw_http_req_data *req_data) |
7c673cae | 836 | { |
9f95a23c | 837 | std::unique_lock rl{reqs_lock}; |
11fdf7f2 TL |
838 | if (!req_data->registered) { |
839 | return false; | |
840 | } | |
7c673cae FG |
841 | req_data->get(); |
842 | req_data->registered = false; | |
843 | unregistered_reqs.push_back(req_data); | |
11fdf7f2 TL |
844 | ldout(cct, 20) << __func__ << " mgr=" << this << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
845 | return true; | |
7c673cae FG |
846 | } |
847 | ||
848 | void RGWHTTPManager::complete_request(rgw_http_req_data *req_data) | |
849 | { | |
9f95a23c | 850 | std::unique_lock rl{reqs_lock}; |
7c673cae FG |
851 | _complete_request(req_data); |
852 | } | |
853 | ||
854 | void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data) | |
855 | { | |
856 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id); | |
857 | if (iter != reqs.end()) { | |
858 | reqs.erase(iter); | |
859 | } | |
860 | { | |
9f95a23c | 861 | std::lock_guard l{req_data->lock}; |
7c673cae FG |
862 | req_data->mgr = nullptr; |
863 | } | |
864 | if (completion_mgr) { | |
11fdf7f2 | 865 | completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info); |
7c673cae FG |
866 | } |
867 | ||
868 | req_data->put(); | |
869 | } | |
870 | ||
9f95a23c | 871 | void RGWHTTPManager::finish_request(rgw_http_req_data *req_data, int ret, long http_status) |
7c673cae | 872 | { |
9f95a23c | 873 | req_data->finish(ret, http_status); |
7c673cae FG |
874 | complete_request(req_data); |
875 | } | |
876 | ||
877 | void RGWHTTPManager::_finish_request(rgw_http_req_data *req_data, int ret) | |
878 | { | |
879 | req_data->finish(ret); | |
880 | _complete_request(req_data); | |
881 | } | |
882 | ||
11fdf7f2 TL |
883 | void RGWHTTPManager::_set_req_state(set_state& ss) |
884 | { | |
885 | ss.req->set_state(ss.bitmask); | |
886 | } | |
7c673cae FG |
887 | /* |
888 | * hook request to the curl multi handle | |
889 | */ | |
890 | int RGWHTTPManager::link_request(rgw_http_req_data *req_data) | |
891 | { | |
11fdf7f2 TL |
892 | ldout(cct, 20) << __func__ << " req_data=" << req_data << " req_data->id=" << req_data->id << ", curl_handle=" << req_data->curl_handle << dendl; |
893 | CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle()); | |
7c673cae FG |
894 | if (mstatus) { |
895 | dout(0) << "ERROR: failed on curl_multi_add_handle, status=" << mstatus << dendl; | |
896 | return -EIO; | |
897 | } | |
898 | return 0; | |
899 | } | |
900 | ||
901 | /* | |
902 | * unhook request from the curl multi handle, and finish request if it wasn't finished yet as | |
903 | * there will be no more processing on this request | |
904 | */ | |
905 | void RGWHTTPManager::_unlink_request(rgw_http_req_data *req_data) | |
906 | { | |
11fdf7f2 TL |
907 | if (req_data->curl_handle) { |
908 | curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle()); | |
7c673cae | 909 | } |
eafe8130 | 910 | if (!req_data->is_done()) { |
7c673cae FG |
911 | _finish_request(req_data, -ECANCELED); |
912 | } | |
913 | } | |
914 | ||
915 | void RGWHTTPManager::unlink_request(rgw_http_req_data *req_data) | |
916 | { | |
9f95a23c | 917 | std::unique_lock wl{reqs_lock}; |
7c673cae FG |
918 | _unlink_request(req_data); |
919 | } | |
920 | ||
921 | void RGWHTTPManager::manage_pending_requests() | |
922 | { | |
9f95a23c | 923 | reqs_lock.lock_shared(); |
11fdf7f2 TL |
924 | if (max_threaded_req == num_reqs && |
925 | unregistered_reqs.empty() && | |
926 | reqs_change_state.empty()) { | |
9f95a23c | 927 | reqs_lock.unlock_shared(); |
7c673cae FG |
928 | return; |
929 | } | |
9f95a23c | 930 | reqs_lock.unlock_shared(); |
7c673cae | 931 | |
9f95a23c | 932 | std::unique_lock wl{reqs_lock}; |
7c673cae | 933 | |
f6b5b4d7 TL |
934 | if (!reqs_change_state.empty()) { |
935 | for (auto siter : reqs_change_state) { | |
936 | _set_req_state(siter); | |
937 | } | |
938 | reqs_change_state.clear(); | |
939 | } | |
940 | ||
7c673cae FG |
941 | if (!unregistered_reqs.empty()) { |
942 | for (auto& r : unregistered_reqs) { | |
943 | _unlink_request(r); | |
944 | r->put(); | |
945 | } | |
946 | ||
947 | unregistered_reqs.clear(); | |
948 | } | |
949 | ||
950 | map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req); | |
951 | ||
952 | list<std::pair<rgw_http_req_data *, int> > remove_reqs; | |
953 | ||
954 | for (; iter != reqs.end(); ++iter) { | |
955 | rgw_http_req_data *req_data = iter->second; | |
956 | int r = link_request(req_data); | |
957 | if (r < 0) { | |
958 | ldout(cct, 0) << "ERROR: failed to link http request" << dendl; | |
959 | remove_reqs.push_back(std::make_pair(iter->second, r)); | |
960 | } else { | |
961 | max_threaded_req = iter->first + 1; | |
962 | } | |
963 | } | |
964 | ||
965 | for (auto piter : remove_reqs) { | |
966 | rgw_http_req_data *req_data = piter.first; | |
967 | int r = piter.second; | |
968 | ||
969 | _finish_request(req_data, r); | |
970 | } | |
971 | } | |
972 | ||
11fdf7f2 | 973 | int RGWHTTPManager::add_request(RGWHTTPClient *client) |
7c673cae FG |
974 | { |
975 | rgw_http_req_data *req_data = new rgw_http_req_data; | |
976 | ||
11fdf7f2 | 977 | int ret = client->init_request(req_data); |
7c673cae FG |
978 | if (ret < 0) { |
979 | req_data->put(); | |
980 | req_data = NULL; | |
981 | return ret; | |
982 | } | |
983 | ||
984 | req_data->mgr = this; | |
985 | req_data->client = client; | |
11fdf7f2 TL |
986 | req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL); |
987 | req_data->user_info = client->get_io_user_info(); | |
7c673cae FG |
988 | |
989 | register_request(req_data); | |
990 | ||
11fdf7f2 | 991 | if (!is_started) { |
7c673cae FG |
992 | ret = link_request(req_data); |
993 | if (ret < 0) { | |
994 | req_data->put(); | |
995 | req_data = NULL; | |
996 | } | |
997 | return ret; | |
998 | } | |
999 | ret = signal_thread(); | |
1000 | if (ret < 0) { | |
1001 | finish_request(req_data, ret); | |
1002 | } | |
1003 | ||
1004 | return ret; | |
1005 | } | |
1006 | ||
1007 | int RGWHTTPManager::remove_request(RGWHTTPClient *client) | |
1008 | { | |
1009 | rgw_http_req_data *req_data = client->get_req_data(); | |
1010 | ||
11fdf7f2 | 1011 | if (!is_started) { |
7c673cae FG |
1012 | unlink_request(req_data); |
1013 | return 0; | |
1014 | } | |
11fdf7f2 TL |
1015 | if (!unregister_request(req_data)) { |
1016 | return 0; | |
1017 | } | |
7c673cae FG |
1018 | int ret = signal_thread(); |
1019 | if (ret < 0) { | |
1020 | return ret; | |
1021 | } | |
1022 | ||
1023 | return 0; | |
1024 | } | |
1025 | ||
11fdf7f2 | 1026 | int RGWHTTPManager::set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state) |
7c673cae | 1027 | { |
11fdf7f2 | 1028 | rgw_http_req_data *req_data = client->get_req_data(); |
7c673cae | 1029 | |
9f95a23c | 1030 | ceph_assert(ceph_mutex_is_locked(req_data->lock)); |
7c673cae | 1031 | |
11fdf7f2 TL |
1032 | /* can only do that if threaded */ |
1033 | if (!is_started) { | |
1034 | return -EINVAL; | |
1035 | } | |
7c673cae | 1036 | |
11fdf7f2 TL |
1037 | bool suggested_wr_paused = req_data->write_paused; |
1038 | bool suggested_rd_paused = req_data->read_paused; | |
1039 | ||
1040 | switch (state) { | |
1041 | case SET_WRITE_PAUSED: | |
1042 | suggested_wr_paused = true; | |
1043 | break; | |
1044 | case SET_WRITE_RESUME: | |
1045 | suggested_wr_paused = false; | |
1046 | break; | |
1047 | case SET_READ_PAUSED: | |
1048 | suggested_rd_paused = true; | |
1049 | break; | |
1050 | case SET_READ_RESUME: | |
1051 | suggested_rd_paused = false; | |
1052 | break; | |
1053 | default: | |
1054 | /* shouldn't really be here */ | |
1055 | return -EIO; | |
1056 | } | |
1057 | if (suggested_wr_paused == req_data->write_paused && | |
1058 | suggested_rd_paused == req_data->read_paused) { | |
1059 | return 0; | |
1060 | } | |
7c673cae | 1061 | |
11fdf7f2 TL |
1062 | req_data->write_paused = suggested_wr_paused; |
1063 | req_data->read_paused = suggested_rd_paused; | |
7c673cae | 1064 | |
11fdf7f2 | 1065 | int bitmask = CURLPAUSE_CONT; |
7c673cae | 1066 | |
11fdf7f2 TL |
1067 | if (req_data->write_paused) { |
1068 | bitmask |= CURLPAUSE_SEND; | |
1069 | } | |
7c673cae | 1070 | |
11fdf7f2 TL |
1071 | if (req_data->read_paused) { |
1072 | bitmask |= CURLPAUSE_RECV; | |
1073 | } | |
7c673cae | 1074 | |
11fdf7f2 TL |
1075 | reqs_change_state.push_back(set_state(req_data, bitmask)); |
1076 | int ret = signal_thread(); | |
1077 | if (ret < 0) { | |
1078 | return ret; | |
1079 | } | |
7c673cae | 1080 | |
11fdf7f2 | 1081 | return 0; |
7c673cae FG |
1082 | } |
1083 | ||
11fdf7f2 | 1084 | int RGWHTTPManager::start() |
7c673cae | 1085 | { |
9f95a23c | 1086 | if (pipe_cloexec(thread_pipe, 0) < 0) { |
91327a77 AA |
1087 | int e = errno; |
1088 | ldout(cct, 0) << "ERROR: pipe(): " << cpp_strerror(e) << dendl; | |
1089 | return -e; | |
7c673cae FG |
1090 | } |
1091 | ||
1092 | // enable non-blocking reads | |
91327a77 AA |
1093 | if (::fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK) < 0) { |
1094 | int e = errno; | |
1095 | ldout(cct, 0) << "ERROR: fcntl(): " << cpp_strerror(e) << dendl; | |
7c673cae FG |
1096 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); |
1097 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
91327a77 | 1098 | return -e; |
7c673cae FG |
1099 | } |
1100 | ||
1101 | #ifdef HAVE_CURL_MULTI_WAIT | |
1102 | // on first initialization, use this pipe to detect whether we're using a | |
1103 | // buggy version of libcurl | |
1104 | std::call_once(detect_flag, detect_curl_multi_wait_bug, cct, | |
1105 | static_cast<CURLM*>(multi_handle), | |
1106 | thread_pipe[1], thread_pipe[0]); | |
1107 | #endif | |
1108 | ||
11fdf7f2 | 1109 | is_started = true; |
7c673cae FG |
1110 | reqs_thread = new ReqsThread(this); |
1111 | reqs_thread->create("http_manager"); | |
1112 | return 0; | |
1113 | } | |
1114 | ||
1115 | void RGWHTTPManager::stop() | |
1116 | { | |
1117 | if (is_stopped) { | |
1118 | return; | |
1119 | } | |
1120 | ||
1121 | is_stopped = true; | |
1122 | ||
11fdf7f2 | 1123 | if (is_started) { |
7c673cae FG |
1124 | going_down = true; |
1125 | signal_thread(); | |
1126 | reqs_thread->join(); | |
1127 | delete reqs_thread; | |
1128 | TEMP_FAILURE_RETRY(::close(thread_pipe[1])); | |
1129 | TEMP_FAILURE_RETRY(::close(thread_pipe[0])); | |
1130 | } | |
1131 | } | |
1132 | ||
1133 | int RGWHTTPManager::signal_thread() | |
1134 | { | |
1135 | uint32_t buf = 0; | |
1136 | int ret = write(thread_pipe[1], (void *)&buf, sizeof(buf)); | |
1137 | if (ret < 0) { | |
1138 | ret = -errno; | |
1139 | ldout(cct, 0) << "ERROR: " << __func__ << ": write() returned ret=" << ret << dendl; | |
1140 | return ret; | |
1141 | } | |
1142 | return 0; | |
1143 | } | |
1144 | ||
1145 | void *RGWHTTPManager::reqs_thread_entry() | |
1146 | { | |
1147 | int still_running; | |
1148 | int mstatus; | |
1149 | ||
1150 | ldout(cct, 20) << __func__ << ": start" << dendl; | |
1151 | ||
1152 | while (!going_down) { | |
1153 | int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]); | |
1154 | if (ret < 0) { | |
1155 | dout(0) << "ERROR: do_curl_wait() returned: " << ret << dendl; | |
1156 | return NULL; | |
1157 | } | |
1158 | ||
1159 | manage_pending_requests(); | |
1160 | ||
1161 | mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running); | |
1162 | switch (mstatus) { | |
1163 | case CURLM_OK: | |
1164 | case CURLM_CALL_MULTI_PERFORM: | |
1165 | break; | |
1166 | default: | |
1167 | dout(10) << "curl_multi_perform returned: " << mstatus << dendl; | |
1168 | break; | |
1169 | } | |
1170 | int msgs_left; | |
1171 | CURLMsg *msg; | |
1172 | while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) { | |
1173 | if (msg->msg == CURLMSG_DONE) { | |
1174 | int result = msg->data.result; | |
1175 | CURL *e = msg->easy_handle; | |
1176 | rgw_http_req_data *req_data; | |
1177 | curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); | |
1178 | curl_multi_remove_handle((CURLM *)multi_handle, e); | |
1179 | ||
1180 | long http_status; | |
9f95a23c TL |
1181 | int status; |
1182 | if (!req_data->user_ret) { | |
1183 | curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); | |
1184 | ||
1185 | status = rgw_http_error_to_errno(http_status); | |
1186 | if (result != CURLE_OK && status == 0) { | |
1187 | dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl; | |
1188 | status = -EAGAIN; | |
1189 | } | |
1190 | } else { | |
1191 | status = *req_data->user_ret; | |
1192 | rgw_err err; | |
1193 | set_req_state_err(err, status, 0); | |
1194 | http_status = err.http_ret; | |
7c673cae FG |
1195 | } |
1196 | int id = req_data->id; | |
9f95a23c | 1197 | finish_request(req_data, status, http_status); |
7c673cae FG |
1198 | switch (result) { |
1199 | case CURLE_OK: | |
1200 | break; | |
1adf2230 AA |
1201 | case CURLE_OPERATION_TIMEDOUT: |
1202 | dout(0) << "WARNING: curl operation timed out, network average transfer speed less than " | |
1203 | << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl; | |
7c673cae FG |
1204 | default: |
1205 | dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; | |
11fdf7f2 | 1206 | dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl; |
7c673cae FG |
1207 | break; |
1208 | } | |
1209 | } | |
1210 | } | |
1211 | } | |
1212 | ||
1213 | ||
9f95a23c | 1214 | std::unique_lock rl{reqs_lock}; |
7c673cae | 1215 | for (auto r : unregistered_reqs) { |
91327a77 | 1216 | _unlink_request(r); |
7c673cae FG |
1217 | } |
1218 | ||
1219 | unregistered_reqs.clear(); | |
1220 | ||
1221 | auto all_reqs = std::move(reqs); | |
1222 | for (auto iter : all_reqs) { | |
91327a77 | 1223 | _unlink_request(iter.second); |
7c673cae FG |
1224 | } |
1225 | ||
1226 | reqs.clear(); | |
1227 | ||
1228 | if (completion_mgr) { | |
1229 | completion_mgr->go_down(); | |
1230 | } | |
1231 | ||
1232 | return 0; | |
1233 | } | |
1234 | ||
11fdf7f2 TL |
1235 | void rgw_http_client_init(CephContext *cct) |
1236 | { | |
1237 | curl_global_init(CURL_GLOBAL_ALL); | |
1238 | rgw_http_manager = new RGWHTTPManager(cct); | |
1239 | rgw_http_manager->start(); | |
1240 | } | |
1241 | ||
1242 | void rgw_http_client_cleanup() | |
1243 | { | |
1244 | rgw_http_manager->stop(); | |
1245 | delete rgw_http_manager; | |
1246 | curl_global_cleanup(); | |
1247 | } | |
1248 | ||
1249 | ||
1250 | int RGWHTTP::send(RGWHTTPClient *req) { | |
1251 | if (!req) { | |
1252 | return 0; | |
1253 | } | |
1254 | int r = rgw_http_manager->add_request(req); | |
1255 | if (r < 0) { | |
1256 | return r; | |
1257 | } | |
1258 | ||
1259 | return 0; | |
1260 | } | |
1261 | ||
eafe8130 | 1262 | int RGWHTTP::process(RGWHTTPClient *req, optional_yield y) { |
11fdf7f2 TL |
1263 | if (!req) { |
1264 | return 0; | |
1265 | } | |
1266 | int r = send(req); | |
1267 | if (r < 0) { | |
1268 | return r; | |
1269 | } | |
1270 | ||
eafe8130 | 1271 | return req->wait(y); |
11fdf7f2 | 1272 | } |
7c673cae | 1273 |