]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/migration/HttpClient.h" | |
5 | #include "common/dout.h" | |
6 | #include "common/errno.h" | |
7 | #include "librbd/AsioEngine.h" | |
8 | #include "librbd/ImageCtx.h" | |
9 | #include "librbd/Utils.h" | |
10 | #include "librbd/asio/Utils.h" | |
11 | #include "librbd/io/AioCompletion.h" | |
12 | #include "librbd/io/ReadResult.h" | |
13 | #include "librbd/migration/Utils.h" | |
14 | #include <boost/asio/buffer.hpp> | |
15 | #include <boost/asio/post.hpp> | |
16 | #include <boost/asio/ip/tcp.hpp> | |
17 | #include <boost/asio/read.hpp> | |
18 | #include <boost/asio/ssl.hpp> | |
19 | #include <boost/beast/core.hpp> | |
20 | #include <boost/beast/http/read.hpp> | |
21 | #include <boost/lexical_cast.hpp> | |
22 | #include <deque> | |
23 | ||
24 | namespace librbd { | |
25 | namespace migration { | |
26 | ||
27 | #define dout_subsys ceph_subsys_rbd | |
28 | #undef dout_prefix | |
29 | #define dout_prefix *_dout << "librbd::migration::HttpClient::" \ | |
30 | << "HttpSession " << this << " " << __func__ \ | |
31 | << ": " | |
32 | ||
33 | /** | |
34 | * boost::beast utilizes non-inheriting template classes for handling plain vs | |
35 | * encrypted TCP streams. Utilize a base-class for handling the majority of the | |
36 | * logic for handling connecting, disconnecting, resetting, and sending requests. | |
37 | */ | |
38 | ||
39 | template <typename I> | |
40 | template <typename D> | |
41 | class HttpClient<I>::HttpSession : public HttpSessionInterface { | |
42 | public: | |
43 | void init(Context* on_finish) override { | |
44 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
45 | ||
46 | auto cct = m_http_client->m_cct; | |
47 | ldout(cct, 15) << dendl; | |
48 | ||
49 | ceph_assert(m_state == STATE_UNINITIALIZED); | |
50 | m_state = STATE_CONNECTING; | |
51 | ||
52 | resolve_host(on_finish); | |
53 | } | |
54 | ||
55 | void shut_down(Context* on_finish) override { | |
56 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
57 | ||
58 | auto cct = m_http_client->m_cct; | |
59 | ldout(cct, 15) << dendl; | |
60 | ||
61 | ceph_assert(on_finish != nullptr); | |
62 | ceph_assert(m_on_shutdown == nullptr); | |
63 | m_on_shutdown = on_finish; | |
64 | ||
65 | auto current_state = m_state; | |
66 | if (current_state == STATE_UNINITIALIZED) { | |
67 | // never initialized or resolve/connect failed | |
68 | on_finish->complete(0); | |
69 | return; | |
70 | } | |
71 | ||
72 | m_state = STATE_SHUTTING_DOWN; | |
73 | if (current_state != STATE_READY) { | |
74 | // delay shutdown until current state transition completes | |
75 | return; | |
76 | } | |
77 | ||
78 | disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); | |
79 | } | |
80 | ||
81 | void issue(std::shared_ptr<Work>&& work) override { | |
82 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
83 | ||
84 | auto cct = m_http_client->m_cct; | |
85 | ldout(cct, 20) << "work=" << work.get() << dendl; | |
86 | ||
87 | if (is_shutdown()) { | |
88 | lderr(cct) << "cannot issue HTTP request, client is shutdown" | |
89 | << dendl; | |
90 | work->complete(-ESHUTDOWN, {}); | |
91 | return; | |
92 | } | |
93 | ||
94 | bool first_issue = m_issue_queue.empty(); | |
95 | m_issue_queue.emplace_back(work); | |
96 | if (m_state == STATE_READY && first_issue) { | |
97 | ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; | |
98 | finalize_issue(std::move(work)); | |
99 | } else if (m_state == STATE_UNINITIALIZED) { | |
100 | ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl; | |
101 | m_state = STATE_RESET_CONNECTING; | |
102 | resolve_host(nullptr); | |
103 | } else { | |
104 | ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl; | |
105 | } | |
106 | } | |
107 | ||
108 | void finalize_issue(std::shared_ptr<Work>&& work) { | |
109 | auto cct = m_http_client->m_cct; | |
110 | ldout(cct, 20) << "work=" << work.get() << dendl; | |
111 | ||
112 | ++m_in_flight_requests; | |
113 | (*work)(derived().stream()); | |
114 | } | |
115 | ||
116 | void handle_issue(boost::system::error_code ec, | |
117 | std::shared_ptr<Work>&& work) override { | |
118 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
119 | ||
120 | auto cct = m_http_client->m_cct; | |
121 | ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl; | |
122 | ||
123 | ceph_assert(m_in_flight_requests > 0); | |
124 | --m_in_flight_requests; | |
125 | if (maybe_finalize_reset()) { | |
126 | // previous request is attempting reset to this request will be resent | |
127 | return; | |
128 | } | |
129 | ||
130 | ceph_assert(!m_issue_queue.empty()); | |
131 | m_issue_queue.pop_front(); | |
132 | ||
133 | if (is_shutdown()) { | |
134 | lderr(cct) << "client shutdown during in-flight request" << dendl; | |
135 | work->complete(-ESHUTDOWN, {}); | |
136 | ||
137 | maybe_finalize_shutdown(); | |
138 | return; | |
139 | } | |
140 | ||
141 | if (ec) { | |
142 | if (ec == boost::asio::error::bad_descriptor || | |
143 | ec == boost::asio::error::broken_pipe || | |
144 | ec == boost::asio::error::connection_reset || | |
145 | ec == boost::asio::error::operation_aborted || | |
146 | ec == boost::asio::ssl::error::stream_truncated || | |
147 | ec == boost::beast::http::error::end_of_stream || | |
148 | ec == boost::beast::http::error::partial_message) { | |
149 | ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; | |
150 | m_issue_queue.push_front(work); | |
151 | } else if (ec == boost::beast::error::timeout) { | |
152 | lderr(cct) << "timed-out while issuing request" << dendl; | |
153 | work->complete(-ETIMEDOUT, {}); | |
154 | } else { | |
155 | lderr(cct) << "failed to issue request: " << ec.message() << dendl; | |
156 | work->complete(-ec.value(), {}); | |
157 | } | |
158 | ||
159 | // attempt to recover the connection | |
160 | reset(); | |
161 | return; | |
162 | } | |
163 | ||
164 | bool first_receive = m_receive_queue.empty(); | |
165 | m_receive_queue.push_back(work); | |
166 | if (first_receive) { | |
167 | receive(std::move(work)); | |
168 | } | |
169 | ||
170 | // TODO disable pipelining for non-idempotent requests | |
171 | ||
172 | // pipeline the next request into the stream | |
173 | if (!m_issue_queue.empty()) { | |
174 | work = m_issue_queue.front(); | |
175 | ldout(cct, 20) << "sending http request: work=" << work.get() << dendl; | |
176 | finalize_issue(std::move(work)); | |
177 | } | |
178 | } | |
179 | ||
180 | protected: | |
181 | HttpClient* m_http_client; | |
182 | ||
183 | HttpSession(HttpClient* http_client) | |
184 | : m_http_client(http_client), m_resolver(http_client->m_strand) { | |
185 | } | |
186 | ||
187 | virtual void connect(boost::asio::ip::tcp::resolver::results_type results, | |
188 | Context* on_finish) = 0; | |
189 | virtual void disconnect(Context* on_finish) = 0; | |
190 | ||
191 | void close_socket() { | |
192 | auto cct = m_http_client->m_cct; | |
193 | ldout(cct, 15) << dendl; | |
194 | ||
195 | boost::system::error_code ec; | |
196 | boost::beast::get_lowest_layer(derived().stream()).socket().close(ec); | |
197 | } | |
198 | ||
199 | private: | |
200 | enum State { | |
201 | STATE_UNINITIALIZED, | |
202 | STATE_CONNECTING, | |
203 | STATE_READY, | |
204 | STATE_RESET_PENDING, | |
205 | STATE_RESET_DISCONNECTING, | |
206 | STATE_RESET_CONNECTING, | |
207 | STATE_SHUTTING_DOWN, | |
208 | STATE_SHUTDOWN, | |
209 | }; | |
210 | ||
211 | State m_state = STATE_UNINITIALIZED; | |
212 | boost::asio::ip::tcp::resolver m_resolver; | |
213 | ||
214 | Context* m_on_shutdown = nullptr; | |
215 | ||
216 | uint64_t m_in_flight_requests = 0; | |
217 | std::deque<std::shared_ptr<Work>> m_issue_queue; | |
218 | std::deque<std::shared_ptr<Work>> m_receive_queue; | |
219 | ||
220 | boost::beast::flat_buffer m_buffer; | |
221 | std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser; | |
222 | std::optional<boost::beast::http::parser<false, StringBody>> m_parser; | |
223 | ||
224 | D& derived() { | |
225 | return static_cast<D&>(*this); | |
226 | } | |
227 | ||
228 | void resolve_host(Context* on_finish) { | |
229 | auto cct = m_http_client->m_cct; | |
230 | ldout(cct, 15) << dendl; | |
231 | ||
232 | shutdown_socket(); | |
233 | m_resolver.async_resolve( | |
234 | m_http_client->m_url_spec.host, m_http_client->m_url_spec.port, | |
235 | [this, on_finish](boost::system::error_code ec, auto results) { | |
236 | handle_resolve_host(ec, results, on_finish); }); | |
237 | } | |
238 | ||
239 | void handle_resolve_host( | |
240 | boost::system::error_code ec, | |
241 | boost::asio::ip::tcp::resolver::results_type results, | |
242 | Context* on_finish) { | |
243 | auto cct = m_http_client->m_cct; | |
244 | int r = -ec.value(); | |
245 | ldout(cct, 15) << "r=" << r << dendl; | |
246 | ||
247 | if (ec) { | |
248 | if (ec == boost::asio::error::host_not_found) { | |
249 | r = -ENOENT; | |
250 | } else if (ec == boost::asio::error::host_not_found_try_again) { | |
251 | // TODO: add retry throttle | |
252 | r = -EAGAIN; | |
253 | } | |
254 | ||
255 | lderr(cct) << "failed to resolve host '" | |
256 | << m_http_client->m_url_spec.host << "': " | |
257 | << cpp_strerror(r) << dendl; | |
258 | advance_state(STATE_UNINITIALIZED, r, on_finish); | |
259 | return; | |
260 | } | |
261 | ||
262 | connect(results, new LambdaContext([this, on_finish](int r) { | |
263 | handle_connect(r, on_finish); })); | |
264 | } | |
265 | ||
266 | void handle_connect(int r, Context* on_finish) { | |
267 | auto cct = m_http_client->m_cct; | |
268 | ldout(cct, 15) << "r=" << r << dendl; | |
269 | ||
270 | if (r < 0) { | |
271 | lderr(cct) << "failed to connect to host '" | |
272 | << m_http_client->m_url_spec.host << "': " | |
273 | << cpp_strerror(r) << dendl; | |
274 | advance_state(STATE_UNINITIALIZED, r, on_finish); | |
275 | return; | |
276 | } | |
277 | ||
278 | advance_state(STATE_READY, 0, on_finish); | |
279 | } | |
280 | ||
281 | void handle_shut_down(int r) { | |
282 | auto cct = m_http_client->m_cct; | |
283 | ldout(cct, 15) << "r=" << r << dendl; | |
284 | ||
285 | if (r < 0) { | |
286 | lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) | |
287 | << dendl; | |
288 | } | |
289 | ||
290 | // cancel all in-flight send/receives (if any) | |
291 | shutdown_socket(); | |
292 | ||
293 | maybe_finalize_shutdown(); | |
294 | } | |
295 | ||
296 | void maybe_finalize_shutdown() { | |
297 | if (m_in_flight_requests > 0) { | |
298 | return; | |
299 | } | |
300 | ||
301 | // cancel any queued IOs | |
302 | fail_queued_work(-ESHUTDOWN); | |
303 | ||
304 | advance_state(STATE_SHUTDOWN, 0, nullptr); | |
305 | } | |
306 | ||
307 | bool is_shutdown() const { | |
308 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
309 | return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN); | |
310 | } | |
311 | ||
312 | void reset() { | |
313 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
314 | ceph_assert(m_state == STATE_READY); | |
315 | ||
316 | auto cct = m_http_client->m_cct; | |
317 | ldout(cct, 15) << dendl; | |
318 | ||
319 | m_state = STATE_RESET_PENDING; | |
320 | maybe_finalize_reset(); | |
321 | } | |
322 | ||
323 | bool maybe_finalize_reset() { | |
324 | if (m_state != STATE_RESET_PENDING) { | |
325 | return false; | |
326 | } | |
327 | ||
328 | if (m_in_flight_requests > 0) { | |
329 | return true; | |
330 | } | |
331 | ||
332 | ceph_assert(m_http_client->m_strand.running_in_this_thread()); | |
333 | auto cct = m_http_client->m_cct; | |
334 | ldout(cct, 15) << dendl; | |
335 | ||
336 | m_buffer.clear(); | |
337 | ||
338 | // move in-flight request back to the front of the issue queue | |
339 | m_issue_queue.insert(m_issue_queue.begin(), | |
340 | m_receive_queue.begin(), m_receive_queue.end()); | |
341 | m_receive_queue.clear(); | |
342 | ||
343 | m_state = STATE_RESET_DISCONNECTING; | |
344 | disconnect(new LambdaContext([this](int r) { handle_reset(r); })); | |
345 | return true; | |
346 | } | |
347 | ||
348 | void handle_reset(int r) { | |
349 | auto cct = m_http_client->m_cct; | |
350 | ldout(cct, 15) << "r=" << r << dendl; | |
351 | ||
352 | if (r < 0) { | |
353 | lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r) | |
354 | << dendl; | |
355 | } | |
356 | ||
357 | advance_state(STATE_RESET_CONNECTING, r, nullptr); | |
358 | } | |
359 | ||
360 | int shutdown_socket() { | |
361 | if (!boost::beast::get_lowest_layer( | |
362 | derived().stream()).socket().is_open()) { | |
363 | return 0; | |
364 | } | |
365 | ||
366 | auto cct = m_http_client->m_cct; | |
367 | ldout(cct, 15) << dendl; | |
368 | ||
369 | boost::system::error_code ec; | |
370 | boost::beast::get_lowest_layer(derived().stream()).socket().shutdown( | |
371 | boost::asio::ip::tcp::socket::shutdown_both, ec); | |
372 | ||
373 | if (ec && ec != boost::beast::errc::not_connected) { | |
374 | lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl; | |
375 | return -ec.value(); | |
376 | } | |
377 | ||
378 | close_socket(); | |
379 | return 0; | |
380 | } | |
381 | ||
382 | void receive(std::shared_ptr<Work>&& work) { | |
383 | auto cct = m_http_client->m_cct; | |
384 | ldout(cct, 15) << "work=" << work.get() << dendl; | |
385 | ||
386 | ceph_assert(!m_receive_queue.empty()); | |
387 | ++m_in_flight_requests; | |
388 | ||
389 | // receive the response for this request | |
390 | m_parser.emplace(); | |
391 | if (work->header_only()) { | |
392 | // HEAD requests don't trasfer data but the parser still cares about max | |
393 | // content-length | |
394 | m_header_parser.emplace(); | |
395 | m_header_parser->body_limit(std::numeric_limits<uint64_t>::max()); | |
396 | ||
397 | boost::beast::http::async_read_header( | |
398 | derived().stream(), m_buffer, *m_header_parser, | |
399 | [this, work=std::move(work)] | |
400 | (boost::beast::error_code ec, std::size_t) mutable { | |
401 | handle_receive(ec, std::move(work)); | |
402 | }); | |
403 | } else { | |
404 | m_parser->body_limit(1 << 25); // max RBD object size | |
405 | boost::beast::http::async_read( | |
406 | derived().stream(), m_buffer, *m_parser, | |
407 | [this, work=std::move(work)] | |
408 | (boost::beast::error_code ec, std::size_t) mutable { | |
409 | handle_receive(ec, std::move(work)); | |
410 | }); | |
411 | } | |
412 | } | |
413 | ||
414 | void handle_receive(boost::system::error_code ec, | |
415 | std::shared_ptr<Work>&& work) { | |
416 | auto cct = m_http_client->m_cct; | |
417 | ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl; | |
418 | ||
419 | ceph_assert(m_in_flight_requests > 0); | |
420 | --m_in_flight_requests; | |
421 | if (maybe_finalize_reset()) { | |
422 | // previous request is attempting reset to this request will be resent | |
423 | return; | |
424 | } | |
425 | ||
426 | ceph_assert(!m_receive_queue.empty()); | |
427 | m_receive_queue.pop_front(); | |
428 | ||
429 | if (is_shutdown()) { | |
430 | lderr(cct) << "client shutdown with in-flight request" << dendl; | |
431 | work->complete(-ESHUTDOWN, {}); | |
432 | ||
433 | maybe_finalize_shutdown(); | |
434 | return; | |
435 | } | |
436 | ||
437 | if (ec) { | |
438 | if (ec == boost::asio::error::bad_descriptor || | |
439 | ec == boost::asio::error::broken_pipe || | |
440 | ec == boost::asio::error::connection_reset || | |
441 | ec == boost::asio::error::operation_aborted || | |
442 | ec == boost::asio::ssl::error::stream_truncated || | |
443 | ec == boost::beast::http::error::end_of_stream || | |
444 | ec == boost::beast::http::error::partial_message) { | |
445 | ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl; | |
446 | m_receive_queue.push_front(work); | |
447 | } else if (ec == boost::beast::error::timeout) { | |
448 | lderr(cct) << "timed-out while issuing request" << dendl; | |
449 | work->complete(-ETIMEDOUT, {}); | |
450 | } else { | |
451 | lderr(cct) << "failed to issue request: " << ec.message() << dendl; | |
452 | work->complete(-ec.value(), {}); | |
453 | } | |
454 | ||
455 | reset(); | |
456 | return; | |
457 | } | |
458 | ||
459 | Response response; | |
460 | if (work->header_only()) { | |
461 | m_parser.emplace(std::move(*m_header_parser)); | |
462 | } | |
463 | response = m_parser->release(); | |
464 | ||
465 | // basic response code handling in a common location | |
466 | int r = 0; | |
467 | auto result = response.result(); | |
468 | if (result == boost::beast::http::status::not_found) { | |
469 | lderr(cct) << "requested resource does not exist" << dendl; | |
470 | r = -ENOENT; | |
471 | } else if (result == boost::beast::http::status::forbidden) { | |
472 | lderr(cct) << "permission denied attempting to access resource" << dendl; | |
473 | r = -EACCES; | |
474 | } else if (boost::beast::http::to_status_class(result) != | |
475 | boost::beast::http::status_class::successful) { | |
476 | lderr(cct) << "failed to retrieve size: HTTP " << result << dendl; | |
477 | r = -EIO; | |
478 | } | |
479 | ||
480 | bool need_eof = response.need_eof(); | |
481 | if (r < 0) { | |
482 | work->complete(r, {}); | |
483 | } else { | |
484 | work->complete(0, std::move(response)); | |
485 | } | |
486 | ||
487 | if (need_eof) { | |
488 | ldout(cct, 20) << "reset required for non-pipelined response: " | |
489 | << "work=" << work.get() << dendl; | |
490 | reset(); | |
491 | } else if (!m_receive_queue.empty()) { | |
492 | auto work = m_receive_queue.front(); | |
493 | receive(std::move(work)); | |
494 | } | |
495 | } | |
496 | ||
497 | void advance_state(State next_state, int r, Context* on_finish) { | |
498 | auto cct = m_http_client->m_cct; | |
499 | auto current_state = m_state; | |
500 | ldout(cct, 15) << "current_state=" << current_state << ", " | |
501 | << "next_state=" << next_state << ", " | |
502 | << "r=" << r << dendl; | |
503 | ||
504 | m_state = next_state; | |
505 | if (current_state == STATE_CONNECTING) { | |
506 | if (next_state == STATE_UNINITIALIZED) { | |
507 | shutdown_socket(); | |
508 | on_finish->complete(r); | |
509 | return; | |
510 | } else if (next_state == STATE_READY) { | |
511 | on_finish->complete(r); | |
512 | return; | |
513 | } | |
514 | } else if (current_state == STATE_SHUTTING_DOWN) { | |
515 | if (next_state == STATE_READY) { | |
516 | // shut down requested while connecting/resetting | |
517 | disconnect(new LambdaContext([this](int r) { handle_shut_down(r); })); | |
518 | return; | |
519 | } else if (next_state == STATE_UNINITIALIZED || | |
520 | next_state == STATE_SHUTDOWN || | |
521 | next_state == STATE_RESET_CONNECTING) { | |
522 | ceph_assert(m_on_shutdown != nullptr); | |
523 | m_on_shutdown->complete(r); | |
524 | return; | |
525 | } | |
526 | } else if (current_state == STATE_RESET_DISCONNECTING) { | |
527 | // disconnected from peer -- ignore errors and reconnect | |
528 | ceph_assert(next_state == STATE_RESET_CONNECTING); | |
529 | ceph_assert(on_finish == nullptr); | |
530 | shutdown_socket(); | |
531 | resolve_host(nullptr); | |
532 | return; | |
533 | } else if (current_state == STATE_RESET_CONNECTING) { | |
534 | ceph_assert(on_finish == nullptr); | |
535 | if (next_state == STATE_READY) { | |
536 | // restart queued IO | |
537 | if (!m_issue_queue.empty()) { | |
538 | auto& work = m_issue_queue.front(); | |
539 | finalize_issue(std::move(work)); | |
540 | } | |
541 | return; | |
542 | } else if (next_state == STATE_UNINITIALIZED) { | |
543 | shutdown_socket(); | |
544 | ||
545 | // fail all queued IO | |
546 | fail_queued_work(r); | |
547 | return; | |
548 | } | |
549 | } | |
550 | ||
551 | lderr(cct) << "unexpected state transition: " | |
552 | << "current_state=" << current_state << ", " | |
553 | << "next_state=" << next_state << dendl; | |
554 | ceph_assert(false); | |
555 | } | |
556 | ||
557 | void complete_work(std::shared_ptr<Work> work, int r, Response&& response) { | |
558 | auto cct = m_http_client->m_cct; | |
559 | ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl; | |
560 | ||
561 | work->complete(r, std::move(response)); | |
562 | } | |
563 | ||
564 | void fail_queued_work(int r) { | |
565 | auto cct = m_http_client->m_cct; | |
566 | ldout(cct, 10) << "r=" << r << dendl; | |
567 | ||
568 | for (auto& work : m_issue_queue) { | |
569 | complete_work(work, r, {}); | |
570 | } | |
571 | m_issue_queue.clear(); | |
572 | ceph_assert(m_receive_queue.empty()); | |
573 | } | |
574 | }; | |
575 | ||
576 | #undef dout_prefix | |
577 | #define dout_prefix *_dout << "librbd::migration::HttpClient::" \ | |
578 | << "PlainHttpSession " << this << " " << __func__ \ | |
579 | << ": " | |
580 | ||
581 | template <typename I> | |
582 | class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> { | |
583 | public: | |
584 | PlainHttpSession(HttpClient* http_client) | |
585 | : HttpSession<PlainHttpSession>(http_client), | |
586 | m_stream(http_client->m_strand) { | |
587 | } | |
588 | ~PlainHttpSession() override { | |
589 | this->close_socket(); | |
590 | } | |
591 | ||
592 | inline boost::beast::tcp_stream& | |
593 | stream() { | |
594 | return m_stream; | |
595 | } | |
596 | ||
597 | protected: | |
598 | void connect(boost::asio::ip::tcp::resolver::results_type results, | |
599 | Context* on_finish) override { | |
600 | auto http_client = this->m_http_client; | |
601 | auto cct = http_client->m_cct; | |
602 | ldout(cct, 15) << dendl; | |
603 | ||
604 | m_stream.async_connect( | |
605 | results, | |
1e59de90 TL |
606 | [on_finish](boost::system::error_code ec, const auto& endpoint) { |
607 | on_finish->complete(-ec.value()); | |
608 | }); | |
f67539c2 TL |
609 | } |
610 | ||
611 | void disconnect(Context* on_finish) override { | |
612 | on_finish->complete(0); | |
613 | } | |
614 | ||
615 | private: | |
616 | boost::beast::tcp_stream m_stream; | |
617 | ||
618 | }; | |
619 | ||
620 | #undef dout_prefix | |
621 | #define dout_prefix *_dout << "librbd::migration::HttpClient::" \ | |
622 | << "SslHttpSession " << this << " " << __func__ \ | |
623 | << ": " | |
624 | ||
625 | template <typename I> | |
626 | class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> { | |
627 | public: | |
628 | SslHttpSession(HttpClient* http_client) | |
629 | : HttpSession<SslHttpSession>(http_client), | |
630 | m_stream(http_client->m_strand, http_client->m_ssl_context) { | |
631 | } | |
632 | ~SslHttpSession() override { | |
633 | this->close_socket(); | |
634 | } | |
635 | ||
636 | inline boost::beast::ssl_stream<boost::beast::tcp_stream>& | |
637 | stream() { | |
638 | return m_stream; | |
639 | } | |
640 | ||
641 | protected: | |
642 | void connect(boost::asio::ip::tcp::resolver::results_type results, | |
643 | Context* on_finish) override { | |
644 | auto http_client = this->m_http_client; | |
645 | auto cct = http_client->m_cct; | |
646 | ldout(cct, 15) << dendl; | |
647 | ||
648 | boost::beast::get_lowest_layer(m_stream).async_connect( | |
649 | results, | |
1e59de90 TL |
650 | [this, on_finish](boost::system::error_code ec, const auto& endpoint) { |
651 | handle_connect(-ec.value(), on_finish); | |
652 | }); | |
f67539c2 TL |
653 | } |
654 | ||
655 | void disconnect(Context* on_finish) override { | |
656 | auto http_client = this->m_http_client; | |
657 | auto cct = http_client->m_cct; | |
658 | ldout(cct, 15) << dendl; | |
659 | ||
660 | if (!m_ssl_enabled) { | |
661 | on_finish->complete(0); | |
662 | return; | |
663 | } | |
664 | ||
665 | m_stream.async_shutdown( | |
666 | asio::util::get_callback_adapter([this, on_finish](int r) { | |
667 | shutdown(r, on_finish); })); | |
668 | } | |
669 | ||
670 | private: | |
671 | boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream; | |
672 | bool m_ssl_enabled = false; | |
673 | ||
674 | void handle_connect(int r, Context* on_finish) { | |
675 | auto http_client = this->m_http_client; | |
676 | auto cct = http_client->m_cct; | |
677 | ldout(cct, 15) << dendl; | |
678 | ||
679 | if (r < 0) { | |
680 | lderr(cct) << "failed to connect to host '" | |
681 | << http_client->m_url_spec.host << "': " | |
682 | << cpp_strerror(r) << dendl; | |
683 | on_finish->complete(r); | |
684 | return; | |
685 | } | |
686 | ||
687 | handshake(on_finish); | |
688 | } | |
689 | ||
690 | void handshake(Context* on_finish) { | |
691 | auto http_client = this->m_http_client; | |
692 | auto cct = http_client->m_cct; | |
693 | ldout(cct, 15) << dendl; | |
694 | ||
695 | auto& host = http_client->m_url_spec.host; | |
696 | m_stream.set_verify_mode( | |
697 | boost::asio::ssl::verify_peer | | |
698 | boost::asio::ssl::verify_fail_if_no_peer_cert); | |
699 | m_stream.set_verify_callback( | |
700 | [host, next=boost::asio::ssl::host_name_verification(host), | |
701 | ignore_self_signed=http_client->m_ignore_self_signed_cert] | |
702 | (bool preverified, boost::asio::ssl::verify_context& ctx) { | |
703 | if (!preverified && ignore_self_signed) { | |
704 | auto ec = X509_STORE_CTX_get_error(ctx.native_handle()); | |
705 | switch (ec) { | |
706 | case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT: | |
707 | case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN: | |
708 | // ignore self-signed cert issues | |
709 | preverified = true; | |
710 | break; | |
711 | default: | |
712 | break; | |
713 | } | |
714 | } | |
715 | return next(preverified, ctx); | |
716 | }); | |
717 | ||
718 | // Set SNI Hostname (many hosts need this to handshake successfully) | |
719 | if(!SSL_set_tlsext_host_name(m_stream.native_handle(), | |
720 | http_client->m_url_spec.host.c_str())) { | |
721 | int r = -::ERR_get_error(); | |
722 | lderr(cct) << "failed to initialize SNI hostname: " << cpp_strerror(r) | |
723 | << dendl; | |
724 | on_finish->complete(r); | |
725 | return; | |
726 | } | |
727 | ||
728 | // Perform the SSL/TLS handshake | |
729 | m_stream.async_handshake( | |
730 | boost::asio::ssl::stream_base::client, | |
731 | asio::util::get_callback_adapter( | |
732 | [this, on_finish](int r) { handle_handshake(r, on_finish); })); | |
733 | } | |
734 | ||
735 | void handle_handshake(int r, Context* on_finish) { | |
736 | auto http_client = this->m_http_client; | |
737 | auto cct = http_client->m_cct; | |
738 | ldout(cct, 15) << "r=" << r << dendl; | |
739 | ||
740 | if (r < 0) { | |
741 | lderr(cct) << "failed to complete handshake: " << cpp_strerror(r) | |
742 | << dendl; | |
743 | disconnect(new LambdaContext([r, on_finish](int) { | |
744 | on_finish->complete(r); })); | |
745 | return; | |
746 | } | |
747 | ||
748 | m_ssl_enabled = true; | |
749 | on_finish->complete(0); | |
750 | } | |
751 | ||
752 | void shutdown(int r, Context* on_finish) { | |
753 | auto http_client = this->m_http_client; | |
754 | auto cct = http_client->m_cct; | |
755 | ldout(cct, 15) << "r=" << r << dendl; | |
756 | ||
757 | on_finish->complete(r); | |
758 | } | |
759 | }; | |
760 | ||
761 | #undef dout_prefix | |
762 | #define dout_prefix *_dout << "librbd::migration::HttpClient: " << this \ | |
763 | << " " << __func__ << ": " | |
764 | ||
765 | template <typename I> | |
766 | HttpClient<I>::HttpClient(I* image_ctx, const std::string& url) | |
767 | : m_cct(image_ctx->cct), m_image_ctx(image_ctx), | |
768 | m_asio_engine(image_ctx->asio_engine), m_url(url), | |
769 | m_strand(boost::asio::make_strand(*m_asio_engine)), | |
770 | m_ssl_context(boost::asio::ssl::context::sslv23_client) { | |
771 | m_ssl_context.set_default_verify_paths(); | |
772 | } | |
773 | ||
774 | template <typename I> | |
775 | void HttpClient<I>::open(Context* on_finish) { | |
776 | ldout(m_cct, 10) << "url=" << m_url << dendl; | |
777 | ||
778 | int r = util::parse_url(m_cct, m_url, &m_url_spec); | |
779 | if (r < 0) { | |
780 | lderr(m_cct) << "failed to parse url '" << m_url << "': " << cpp_strerror(r) | |
781 | << dendl; | |
782 | on_finish->complete(-EINVAL); | |
783 | return; | |
784 | } | |
785 | ||
786 | boost::asio::post(m_strand, [this, on_finish]() mutable { | |
787 | create_http_session(on_finish); }); | |
788 | } | |
789 | ||
790 | template <typename I> | |
791 | void HttpClient<I>::close(Context* on_finish) { | |
792 | boost::asio::post(m_strand, [this, on_finish]() mutable { | |
793 | shut_down_http_session(on_finish); }); | |
794 | } | |
795 | ||
796 | template <typename I> | |
797 | void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) { | |
798 | ldout(m_cct, 10) << dendl; | |
799 | ||
800 | Request req; | |
801 | req.method(boost::beast::http::verb::head); | |
802 | ||
803 | issue( | |
804 | std::move(req), [this, size, on_finish](int r, Response&& response) { | |
805 | handle_get_size(r, std::move(response), size, on_finish); | |
806 | }); | |
807 | } | |
808 | ||
809 | template <typename I> | |
810 | void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size, | |
811 | Context* on_finish) { | |
812 | ldout(m_cct, 10) << "r=" << r << dendl; | |
813 | ||
814 | if (r < 0) { | |
815 | lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl; | |
816 | on_finish->complete(r); | |
817 | return; | |
818 | } else if (!response.has_content_length()) { | |
819 | lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl; | |
820 | on_finish->complete(-EINVAL); | |
821 | return; | |
822 | } | |
823 | ||
824 | auto content_length = response[boost::beast::http::field::content_length]; | |
825 | try { | |
826 | *size = boost::lexical_cast<uint64_t>(content_length); | |
827 | } catch (boost::bad_lexical_cast&) { | |
828 | lderr(m_cct) << "invalid content-length in response" << dendl; | |
829 | on_finish->complete(-EBADMSG); | |
830 | return; | |
831 | } | |
832 | ||
833 | on_finish->complete(0); | |
834 | } | |
835 | ||
836 | template <typename I> | |
837 | void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data, | |
838 | Context* on_finish) { | |
839 | ldout(m_cct, 20) << dendl; | |
840 | ||
841 | auto aio_comp = io::AioCompletion::create_and_start( | |
842 | on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ); | |
843 | aio_comp->set_request_count(byte_extents.size()); | |
844 | ||
845 | // utilize ReadResult to assemble multiple byte extents into a single bl | |
846 | // since boost::beast doesn't support multipart responses out-of-the-box | |
847 | io::ReadResult read_result{data}; | |
848 | aio_comp->read_result = std::move(read_result); | |
849 | aio_comp->read_result.set_image_extents(byte_extents); | |
850 | ||
851 | // issue a range get request for each extent | |
852 | uint64_t buffer_offset = 0; | |
853 | for (auto [byte_offset, byte_length] : byte_extents) { | |
854 | auto ctx = new io::ReadResult::C_ImageReadRequest( | |
855 | aio_comp, buffer_offset, {{byte_offset, byte_length}}); | |
856 | buffer_offset += byte_length; | |
857 | ||
858 | Request req; | |
859 | req.method(boost::beast::http::verb::get); | |
860 | ||
861 | std::stringstream range; | |
862 | ceph_assert(byte_length > 0); | |
863 | range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1); | |
864 | req.set(boost::beast::http::field::range, range.str()); | |
865 | ||
866 | issue( | |
867 | std::move(req), | |
868 | [this, byte_offset=byte_offset, byte_length=byte_length, ctx] | |
869 | (int r, Response&& response) { | |
870 | handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl, | |
871 | ctx); | |
872 | }); | |
873 | } | |
874 | } | |
875 | ||
876 | template <typename I> | |
877 | void HttpClient<I>::handle_read(int r, Response&& response, | |
878 | uint64_t byte_offset, uint64_t byte_length, | |
879 | bufferlist* data, Context* on_finish) { | |
880 | ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", " | |
881 | << "r=" << r << dendl; | |
882 | ||
883 | if (r < 0) { | |
884 | lderr(m_cct) << "failed to read requested byte range: " | |
885 | << cpp_strerror(r) << dendl; | |
886 | on_finish->complete(r); | |
887 | return; | |
888 | } else if (response.result() != boost::beast::http::status::partial_content) { | |
889 | lderr(m_cct) << "failed to retrieve requested byte range: HTTP " | |
890 | << response.result() << dendl; | |
891 | on_finish->complete(-EIO); | |
892 | return; | |
893 | } else if (byte_length != response.body().size()) { | |
894 | lderr(m_cct) << "unexpected short range read: " | |
895 | << "wanted=" << byte_length << ", " | |
896 | << "received=" << response.body().size() << dendl; | |
897 | on_finish->complete(-EINVAL); | |
898 | return; | |
899 | } | |
900 | ||
901 | data->clear(); | |
902 | data->append(response.body()); | |
903 | on_finish->complete(data->length()); | |
904 | } | |
905 | ||
906 | template <typename I> | |
907 | void HttpClient<I>::issue(std::shared_ptr<Work>&& work) { | |
908 | boost::asio::post(m_strand, [this, work=std::move(work)]() mutable { | |
909 | m_http_session->issue(std::move(work)); }); | |
910 | } | |
911 | ||
912 | template <typename I> | |
913 | void HttpClient<I>::create_http_session(Context* on_finish) { | |
914 | ldout(m_cct, 15) << dendl; | |
915 | ||
916 | ceph_assert(m_http_session == nullptr); | |
917 | switch (m_url_spec.scheme) { | |
918 | case URL_SCHEME_HTTP: | |
919 | m_http_session = std::make_unique<PlainHttpSession>(this); | |
920 | break; | |
921 | case URL_SCHEME_HTTPS: | |
922 | m_http_session = std::make_unique<SslHttpSession>(this); | |
923 | break; | |
924 | default: | |
925 | ceph_assert(false); | |
926 | break; | |
927 | } | |
928 | ||
929 | m_http_session->init(on_finish); | |
930 | } | |
931 | ||
932 | template <typename I> | |
933 | void HttpClient<I>::shut_down_http_session(Context* on_finish) { | |
934 | ldout(m_cct, 15) << dendl; | |
935 | ||
936 | if (m_http_session == nullptr) { | |
937 | on_finish->complete(0); | |
938 | return; | |
939 | } | |
940 | ||
941 | m_http_session->shut_down(on_finish); | |
942 | } | |
943 | ||
944 | } // namespace migration | |
945 | } // namespace librbd | |
946 | ||
947 | template class librbd::migration::HttpClient<librbd::ImageCtx>; |