]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/migration/HttpClient.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / librbd / migration / HttpClient.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/migration/HttpClient.h"
5#include "common/dout.h"
6#include "common/errno.h"
7#include "librbd/AsioEngine.h"
8#include "librbd/ImageCtx.h"
9#include "librbd/Utils.h"
10#include "librbd/asio/Utils.h"
11#include "librbd/io/AioCompletion.h"
12#include "librbd/io/ReadResult.h"
13#include "librbd/migration/Utils.h"
14#include <boost/asio/buffer.hpp>
15#include <boost/asio/post.hpp>
16#include <boost/asio/ip/tcp.hpp>
17#include <boost/asio/read.hpp>
18#include <boost/asio/ssl.hpp>
19#include <boost/beast/core.hpp>
20#include <boost/beast/http/read.hpp>
21#include <boost/lexical_cast.hpp>
22#include <deque>
23
24namespace librbd {
25namespace migration {
26
27#define dout_subsys ceph_subsys_rbd
28#undef dout_prefix
29#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
30 << "HttpSession " << this << " " << __func__ \
31 << ": "
32
33/**
34 * boost::beast utilizes non-inheriting template classes for handling plain vs
35 * encrypted TCP streams. Utilize a base-class for handling the majority of the
36 * logic for handling connecting, disconnecting, reseting, and sending requests.
37 */
38
39template <typename I>
40template <typename D>
41class HttpClient<I>::HttpSession : public HttpSessionInterface {
42public:
43 void init(Context* on_finish) override {
44 ceph_assert(m_http_client->m_strand.running_in_this_thread());
45
46 auto cct = m_http_client->m_cct;
47 ldout(cct, 15) << dendl;
48
49 ceph_assert(m_state == STATE_UNINITIALIZED);
50 m_state = STATE_CONNECTING;
51
52 resolve_host(on_finish);
53 }
54
55 void shut_down(Context* on_finish) override {
56 ceph_assert(m_http_client->m_strand.running_in_this_thread());
57
58 auto cct = m_http_client->m_cct;
59 ldout(cct, 15) << dendl;
60
61 ceph_assert(on_finish != nullptr);
62 ceph_assert(m_on_shutdown == nullptr);
63 m_on_shutdown = on_finish;
64
65 auto current_state = m_state;
66 if (current_state == STATE_UNINITIALIZED) {
67 // never initialized or resolve/connect failed
68 on_finish->complete(0);
69 return;
70 }
71
72 m_state = STATE_SHUTTING_DOWN;
73 if (current_state != STATE_READY) {
74 // delay shutdown until current state transition completes
75 return;
76 }
77
78 disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
79 }
80
81 void issue(std::shared_ptr<Work>&& work) override {
82 ceph_assert(m_http_client->m_strand.running_in_this_thread());
83
84 auto cct = m_http_client->m_cct;
85 ldout(cct, 20) << "work=" << work.get() << dendl;
86
87 if (is_shutdown()) {
88 lderr(cct) << "cannot issue HTTP request, client is shutdown"
89 << dendl;
90 work->complete(-ESHUTDOWN, {});
91 return;
92 }
93
94 bool first_issue = m_issue_queue.empty();
95 m_issue_queue.emplace_back(work);
96 if (m_state == STATE_READY && first_issue) {
97 ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
98 finalize_issue(std::move(work));
99 } else if (m_state == STATE_UNINITIALIZED) {
100 ldout(cct, 20) << "resetting HTTP session: work=" << work.get() << dendl;
101 m_state = STATE_RESET_CONNECTING;
102 resolve_host(nullptr);
103 } else {
104 ldout(cct, 20) << "queueing HTTP request: work=" << work.get() << dendl;
105 }
106 }
107
108 void finalize_issue(std::shared_ptr<Work>&& work) {
109 auto cct = m_http_client->m_cct;
110 ldout(cct, 20) << "work=" << work.get() << dendl;
111
112 ++m_in_flight_requests;
113 (*work)(derived().stream());
114 }
115
116 void handle_issue(boost::system::error_code ec,
117 std::shared_ptr<Work>&& work) override {
118 ceph_assert(m_http_client->m_strand.running_in_this_thread());
119
120 auto cct = m_http_client->m_cct;
121 ldout(cct, 20) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
122
123 ceph_assert(m_in_flight_requests > 0);
124 --m_in_flight_requests;
125 if (maybe_finalize_reset()) {
126 // previous request is attempting reset to this request will be resent
127 return;
128 }
129
130 ceph_assert(!m_issue_queue.empty());
131 m_issue_queue.pop_front();
132
133 if (is_shutdown()) {
134 lderr(cct) << "client shutdown during in-flight request" << dendl;
135 work->complete(-ESHUTDOWN, {});
136
137 maybe_finalize_shutdown();
138 return;
139 }
140
141 if (ec) {
142 if (ec == boost::asio::error::bad_descriptor ||
143 ec == boost::asio::error::broken_pipe ||
144 ec == boost::asio::error::connection_reset ||
145 ec == boost::asio::error::operation_aborted ||
146 ec == boost::asio::ssl::error::stream_truncated ||
147 ec == boost::beast::http::error::end_of_stream ||
148 ec == boost::beast::http::error::partial_message) {
149 ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
150 m_issue_queue.push_front(work);
151 } else if (ec == boost::beast::error::timeout) {
152 lderr(cct) << "timed-out while issuing request" << dendl;
153 work->complete(-ETIMEDOUT, {});
154 } else {
155 lderr(cct) << "failed to issue request: " << ec.message() << dendl;
156 work->complete(-ec.value(), {});
157 }
158
159 // attempt to recover the connection
160 reset();
161 return;
162 }
163
164 bool first_receive = m_receive_queue.empty();
165 m_receive_queue.push_back(work);
166 if (first_receive) {
167 receive(std::move(work));
168 }
169
170 // TODO disable pipelining for non-idempotent requests
171
172 // pipeline the next request into the stream
173 if (!m_issue_queue.empty()) {
174 work = m_issue_queue.front();
175 ldout(cct, 20) << "sending http request: work=" << work.get() << dendl;
176 finalize_issue(std::move(work));
177 }
178 }
179
180protected:
181 HttpClient* m_http_client;
182
183 HttpSession(HttpClient* http_client)
184 : m_http_client(http_client), m_resolver(http_client->m_strand) {
185 }
186
187 virtual void connect(boost::asio::ip::tcp::resolver::results_type results,
188 Context* on_finish) = 0;
189 virtual void disconnect(Context* on_finish) = 0;
190
191 void close_socket() {
192 auto cct = m_http_client->m_cct;
193 ldout(cct, 15) << dendl;
194
195 boost::system::error_code ec;
196 boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
197 }
198
199private:
200 enum State {
201 STATE_UNINITIALIZED,
202 STATE_CONNECTING,
203 STATE_READY,
204 STATE_RESET_PENDING,
205 STATE_RESET_DISCONNECTING,
206 STATE_RESET_CONNECTING,
207 STATE_SHUTTING_DOWN,
208 STATE_SHUTDOWN,
209 };
210
211 State m_state = STATE_UNINITIALIZED;
212 boost::asio::ip::tcp::resolver m_resolver;
213
214 Context* m_on_shutdown = nullptr;
215
216 uint64_t m_in_flight_requests = 0;
217 std::deque<std::shared_ptr<Work>> m_issue_queue;
218 std::deque<std::shared_ptr<Work>> m_receive_queue;
219
220 boost::beast::flat_buffer m_buffer;
221 std::optional<boost::beast::http::parser<false, EmptyBody>> m_header_parser;
222 std::optional<boost::beast::http::parser<false, StringBody>> m_parser;
223
224 D& derived() {
225 return static_cast<D&>(*this);
226 }
227
228 void resolve_host(Context* on_finish) {
229 auto cct = m_http_client->m_cct;
230 ldout(cct, 15) << dendl;
231
232 shutdown_socket();
233 m_resolver.async_resolve(
234 m_http_client->m_url_spec.host, m_http_client->m_url_spec.port,
235 [this, on_finish](boost::system::error_code ec, auto results) {
236 handle_resolve_host(ec, results, on_finish); });
237 }
238
239 void handle_resolve_host(
240 boost::system::error_code ec,
241 boost::asio::ip::tcp::resolver::results_type results,
242 Context* on_finish) {
243 auto cct = m_http_client->m_cct;
244 int r = -ec.value();
245 ldout(cct, 15) << "r=" << r << dendl;
246
247 if (ec) {
248 if (ec == boost::asio::error::host_not_found) {
249 r = -ENOENT;
250 } else if (ec == boost::asio::error::host_not_found_try_again) {
251 // TODO: add retry throttle
252 r = -EAGAIN;
253 }
254
255 lderr(cct) << "failed to resolve host '"
256 << m_http_client->m_url_spec.host << "': "
257 << cpp_strerror(r) << dendl;
258 advance_state(STATE_UNINITIALIZED, r, on_finish);
259 return;
260 }
261
262 connect(results, new LambdaContext([this, on_finish](int r) {
263 handle_connect(r, on_finish); }));
264 }
265
266 void handle_connect(int r, Context* on_finish) {
267 auto cct = m_http_client->m_cct;
268 ldout(cct, 15) << "r=" << r << dendl;
269
270 if (r < 0) {
271 lderr(cct) << "failed to connect to host '"
272 << m_http_client->m_url_spec.host << "': "
273 << cpp_strerror(r) << dendl;
274 advance_state(STATE_UNINITIALIZED, r, on_finish);
275 return;
276 }
277
278 advance_state(STATE_READY, 0, on_finish);
279 }
280
281 void handle_shut_down(int r) {
282 auto cct = m_http_client->m_cct;
283 ldout(cct, 15) << "r=" << r << dendl;
284
285 if (r < 0) {
286 lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
287 << dendl;
288 }
289
290 // cancel all in-flight send/receives (if any)
291 shutdown_socket();
292
293 maybe_finalize_shutdown();
294 }
295
296 void maybe_finalize_shutdown() {
297 if (m_in_flight_requests > 0) {
298 return;
299 }
300
301 // cancel any queued IOs
302 fail_queued_work(-ESHUTDOWN);
303
304 advance_state(STATE_SHUTDOWN, 0, nullptr);
305 }
306
307 bool is_shutdown() const {
308 ceph_assert(m_http_client->m_strand.running_in_this_thread());
309 return (m_state == STATE_SHUTTING_DOWN || m_state == STATE_SHUTDOWN);
310 }
311
312 void reset() {
313 ceph_assert(m_http_client->m_strand.running_in_this_thread());
314 ceph_assert(m_state == STATE_READY);
315
316 auto cct = m_http_client->m_cct;
317 ldout(cct, 15) << dendl;
318
319 m_state = STATE_RESET_PENDING;
320 maybe_finalize_reset();
321 }
322
323 bool maybe_finalize_reset() {
324 if (m_state != STATE_RESET_PENDING) {
325 return false;
326 }
327
328 if (m_in_flight_requests > 0) {
329 return true;
330 }
331
332 ceph_assert(m_http_client->m_strand.running_in_this_thread());
333 auto cct = m_http_client->m_cct;
334 ldout(cct, 15) << dendl;
335
336 m_buffer.clear();
337
338 // move in-flight request back to the front of the issue queue
339 m_issue_queue.insert(m_issue_queue.begin(),
340 m_receive_queue.begin(), m_receive_queue.end());
341 m_receive_queue.clear();
342
343 m_state = STATE_RESET_DISCONNECTING;
344 disconnect(new LambdaContext([this](int r) { handle_reset(r); }));
345 return true;
346 }
347
348 void handle_reset(int r) {
349 auto cct = m_http_client->m_cct;
350 ldout(cct, 15) << "r=" << r << dendl;
351
352 if (r < 0) {
353 lderr(cct) << "failed to disconnect stream: '" << cpp_strerror(r)
354 << dendl;
355 }
356
357 advance_state(STATE_RESET_CONNECTING, r, nullptr);
358 }
359
360 int shutdown_socket() {
361 if (!boost::beast::get_lowest_layer(
362 derived().stream()).socket().is_open()) {
363 return 0;
364 }
365
366 auto cct = m_http_client->m_cct;
367 ldout(cct, 15) << dendl;
368
369 boost::system::error_code ec;
370 boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(
371 boost::asio::ip::tcp::socket::shutdown_both, ec);
372
373 if (ec && ec != boost::beast::errc::not_connected) {
374 lderr(cct) << "failed to shutdown socket: " << ec.message() << dendl;
375 return -ec.value();
376 }
377
378 close_socket();
379 return 0;
380 }
381
382 void receive(std::shared_ptr<Work>&& work) {
383 auto cct = m_http_client->m_cct;
384 ldout(cct, 15) << "work=" << work.get() << dendl;
385
386 ceph_assert(!m_receive_queue.empty());
387 ++m_in_flight_requests;
388
389 // receive the response for this request
390 m_parser.emplace();
391 if (work->header_only()) {
392 // HEAD requests don't trasfer data but the parser still cares about max
393 // content-length
394 m_header_parser.emplace();
395 m_header_parser->body_limit(std::numeric_limits<uint64_t>::max());
396
397 boost::beast::http::async_read_header(
398 derived().stream(), m_buffer, *m_header_parser,
399 [this, work=std::move(work)]
400 (boost::beast::error_code ec, std::size_t) mutable {
401 handle_receive(ec, std::move(work));
402 });
403 } else {
404 m_parser->body_limit(1 << 25); // max RBD object size
405 boost::beast::http::async_read(
406 derived().stream(), m_buffer, *m_parser,
407 [this, work=std::move(work)]
408 (boost::beast::error_code ec, std::size_t) mutable {
409 handle_receive(ec, std::move(work));
410 });
411 }
412 }
413
414 void handle_receive(boost::system::error_code ec,
415 std::shared_ptr<Work>&& work) {
416 auto cct = m_http_client->m_cct;
417 ldout(cct, 15) << "work=" << work.get() << ", r=" << -ec.value() << dendl;
418
419 ceph_assert(m_in_flight_requests > 0);
420 --m_in_flight_requests;
421 if (maybe_finalize_reset()) {
422 // previous request is attempting reset to this request will be resent
423 return;
424 }
425
426 ceph_assert(!m_receive_queue.empty());
427 m_receive_queue.pop_front();
428
429 if (is_shutdown()) {
430 lderr(cct) << "client shutdown with in-flight request" << dendl;
431 work->complete(-ESHUTDOWN, {});
432
433 maybe_finalize_shutdown();
434 return;
435 }
436
437 if (ec) {
438 if (ec == boost::asio::error::bad_descriptor ||
439 ec == boost::asio::error::broken_pipe ||
440 ec == boost::asio::error::connection_reset ||
441 ec == boost::asio::error::operation_aborted ||
442 ec == boost::asio::ssl::error::stream_truncated ||
443 ec == boost::beast::http::error::end_of_stream ||
444 ec == boost::beast::http::error::partial_message) {
445 ldout(cct, 5) << "remote peer stream closed, retrying request" << dendl;
446 m_receive_queue.push_front(work);
447 } else if (ec == boost::beast::error::timeout) {
448 lderr(cct) << "timed-out while issuing request" << dendl;
449 work->complete(-ETIMEDOUT, {});
450 } else {
451 lderr(cct) << "failed to issue request: " << ec.message() << dendl;
452 work->complete(-ec.value(), {});
453 }
454
455 reset();
456 return;
457 }
458
459 Response response;
460 if (work->header_only()) {
461 m_parser.emplace(std::move(*m_header_parser));
462 }
463 response = m_parser->release();
464
465 // basic response code handling in a common location
466 int r = 0;
467 auto result = response.result();
468 if (result == boost::beast::http::status::not_found) {
469 lderr(cct) << "requested resource does not exist" << dendl;
470 r = -ENOENT;
471 } else if (result == boost::beast::http::status::forbidden) {
472 lderr(cct) << "permission denied attempting to access resource" << dendl;
473 r = -EACCES;
474 } else if (boost::beast::http::to_status_class(result) !=
475 boost::beast::http::status_class::successful) {
476 lderr(cct) << "failed to retrieve size: HTTP " << result << dendl;
477 r = -EIO;
478 }
479
480 bool need_eof = response.need_eof();
481 if (r < 0) {
482 work->complete(r, {});
483 } else {
484 work->complete(0, std::move(response));
485 }
486
487 if (need_eof) {
488 ldout(cct, 20) << "reset required for non-pipelined response: "
489 << "work=" << work.get() << dendl;
490 reset();
491 } else if (!m_receive_queue.empty()) {
492 auto work = m_receive_queue.front();
493 receive(std::move(work));
494 }
495 }
496
497 void advance_state(State next_state, int r, Context* on_finish) {
498 auto cct = m_http_client->m_cct;
499 auto current_state = m_state;
500 ldout(cct, 15) << "current_state=" << current_state << ", "
501 << "next_state=" << next_state << ", "
502 << "r=" << r << dendl;
503
504 m_state = next_state;
505 if (current_state == STATE_CONNECTING) {
506 if (next_state == STATE_UNINITIALIZED) {
507 shutdown_socket();
508 on_finish->complete(r);
509 return;
510 } else if (next_state == STATE_READY) {
511 on_finish->complete(r);
512 return;
513 }
514 } else if (current_state == STATE_SHUTTING_DOWN) {
515 if (next_state == STATE_READY) {
516 // shut down requested while connecting/resetting
517 disconnect(new LambdaContext([this](int r) { handle_shut_down(r); }));
518 return;
519 } else if (next_state == STATE_UNINITIALIZED ||
520 next_state == STATE_SHUTDOWN ||
521 next_state == STATE_RESET_CONNECTING) {
522 ceph_assert(m_on_shutdown != nullptr);
523 m_on_shutdown->complete(r);
524 return;
525 }
526 } else if (current_state == STATE_RESET_DISCONNECTING) {
527 // disconnected from peer -- ignore errors and reconnect
528 ceph_assert(next_state == STATE_RESET_CONNECTING);
529 ceph_assert(on_finish == nullptr);
530 shutdown_socket();
531 resolve_host(nullptr);
532 return;
533 } else if (current_state == STATE_RESET_CONNECTING) {
534 ceph_assert(on_finish == nullptr);
535 if (next_state == STATE_READY) {
536 // restart queued IO
537 if (!m_issue_queue.empty()) {
538 auto& work = m_issue_queue.front();
539 finalize_issue(std::move(work));
540 }
541 return;
542 } else if (next_state == STATE_UNINITIALIZED) {
543 shutdown_socket();
544
545 // fail all queued IO
546 fail_queued_work(r);
547 return;
548 }
549 }
550
551 lderr(cct) << "unexpected state transition: "
552 << "current_state=" << current_state << ", "
553 << "next_state=" << next_state << dendl;
554 ceph_assert(false);
555 }
556
557 void complete_work(std::shared_ptr<Work> work, int r, Response&& response) {
558 auto cct = m_http_client->m_cct;
559 ldout(cct, 20) << "work=" << work.get() << ", r=" << r << dendl;
560
561 work->complete(r, std::move(response));
562 }
563
564 void fail_queued_work(int r) {
565 auto cct = m_http_client->m_cct;
566 ldout(cct, 10) << "r=" << r << dendl;
567
568 for (auto& work : m_issue_queue) {
569 complete_work(work, r, {});
570 }
571 m_issue_queue.clear();
572 ceph_assert(m_receive_queue.empty());
573 }
574};
575
576#undef dout_prefix
577#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
578 << "PlainHttpSession " << this << " " << __func__ \
579 << ": "
580
581template <typename I>
582class HttpClient<I>::PlainHttpSession : public HttpSession<PlainHttpSession> {
583public:
584 PlainHttpSession(HttpClient* http_client)
585 : HttpSession<PlainHttpSession>(http_client),
586 m_stream(http_client->m_strand) {
587 }
588 ~PlainHttpSession() override {
589 this->close_socket();
590 }
591
592 inline boost::beast::tcp_stream&
593 stream() {
594 return m_stream;
595 }
596
597protected:
598 void connect(boost::asio::ip::tcp::resolver::results_type results,
599 Context* on_finish) override {
600 auto http_client = this->m_http_client;
601 auto cct = http_client->m_cct;
602 ldout(cct, 15) << dendl;
603
604 m_stream.async_connect(
605 results,
1e59de90
TL
606 [on_finish](boost::system::error_code ec, const auto& endpoint) {
607 on_finish->complete(-ec.value());
608 });
f67539c2
TL
609 }
610
611 void disconnect(Context* on_finish) override {
612 on_finish->complete(0);
613 }
614
615private:
616 boost::beast::tcp_stream m_stream;
617
618};
619
620#undef dout_prefix
621#define dout_prefix *_dout << "librbd::migration::HttpClient::" \
622 << "SslHttpSession " << this << " " << __func__ \
623 << ": "
624
625template <typename I>
626class HttpClient<I>::SslHttpSession : public HttpSession<SslHttpSession> {
627public:
628 SslHttpSession(HttpClient* http_client)
629 : HttpSession<SslHttpSession>(http_client),
630 m_stream(http_client->m_strand, http_client->m_ssl_context) {
631 }
632 ~SslHttpSession() override {
633 this->close_socket();
634 }
635
636 inline boost::beast::ssl_stream<boost::beast::tcp_stream>&
637 stream() {
638 return m_stream;
639 }
640
641protected:
642 void connect(boost::asio::ip::tcp::resolver::results_type results,
643 Context* on_finish) override {
644 auto http_client = this->m_http_client;
645 auto cct = http_client->m_cct;
646 ldout(cct, 15) << dendl;
647
648 boost::beast::get_lowest_layer(m_stream).async_connect(
649 results,
1e59de90
TL
650 [this, on_finish](boost::system::error_code ec, const auto& endpoint) {
651 handle_connect(-ec.value(), on_finish);
652 });
f67539c2
TL
653 }
654
655 void disconnect(Context* on_finish) override {
656 auto http_client = this->m_http_client;
657 auto cct = http_client->m_cct;
658 ldout(cct, 15) << dendl;
659
660 if (!m_ssl_enabled) {
661 on_finish->complete(0);
662 return;
663 }
664
665 m_stream.async_shutdown(
666 asio::util::get_callback_adapter([this, on_finish](int r) {
667 shutdown(r, on_finish); }));
668 }
669
670private:
671 boost::beast::ssl_stream<boost::beast::tcp_stream> m_stream;
672 bool m_ssl_enabled = false;
673
674 void handle_connect(int r, Context* on_finish) {
675 auto http_client = this->m_http_client;
676 auto cct = http_client->m_cct;
677 ldout(cct, 15) << dendl;
678
679 if (r < 0) {
680 lderr(cct) << "failed to connect to host '"
681 << http_client->m_url_spec.host << "': "
682 << cpp_strerror(r) << dendl;
683 on_finish->complete(r);
684 return;
685 }
686
687 handshake(on_finish);
688 }
689
690 void handshake(Context* on_finish) {
691 auto http_client = this->m_http_client;
692 auto cct = http_client->m_cct;
693 ldout(cct, 15) << dendl;
694
695 auto& host = http_client->m_url_spec.host;
696 m_stream.set_verify_mode(
697 boost::asio::ssl::verify_peer |
698 boost::asio::ssl::verify_fail_if_no_peer_cert);
699 m_stream.set_verify_callback(
700 [host, next=boost::asio::ssl::host_name_verification(host),
701 ignore_self_signed=http_client->m_ignore_self_signed_cert]
702 (bool preverified, boost::asio::ssl::verify_context& ctx) {
703 if (!preverified && ignore_self_signed) {
704 auto ec = X509_STORE_CTX_get_error(ctx.native_handle());
705 switch (ec) {
706 case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT:
707 case X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN:
708 // ignore self-signed cert issues
709 preverified = true;
710 break;
711 default:
712 break;
713 }
714 }
715 return next(preverified, ctx);
716 });
717
718 // Set SNI Hostname (many hosts need this to handshake successfully)
719 if(!SSL_set_tlsext_host_name(m_stream.native_handle(),
720 http_client->m_url_spec.host.c_str())) {
721 int r = -::ERR_get_error();
722 lderr(cct) << "failed to initialize SNI hostname: " << cpp_strerror(r)
723 << dendl;
724 on_finish->complete(r);
725 return;
726 }
727
728 // Perform the SSL/TLS handshake
729 m_stream.async_handshake(
730 boost::asio::ssl::stream_base::client,
731 asio::util::get_callback_adapter(
732 [this, on_finish](int r) { handle_handshake(r, on_finish); }));
733 }
734
735 void handle_handshake(int r, Context* on_finish) {
736 auto http_client = this->m_http_client;
737 auto cct = http_client->m_cct;
738 ldout(cct, 15) << "r=" << r << dendl;
739
740 if (r < 0) {
741 lderr(cct) << "failed to complete handshake: " << cpp_strerror(r)
742 << dendl;
743 disconnect(new LambdaContext([r, on_finish](int) {
744 on_finish->complete(r); }));
745 return;
746 }
747
748 m_ssl_enabled = true;
749 on_finish->complete(0);
750 }
751
752 void shutdown(int r, Context* on_finish) {
753 auto http_client = this->m_http_client;
754 auto cct = http_client->m_cct;
755 ldout(cct, 15) << "r=" << r << dendl;
756
757 on_finish->complete(r);
758 }
759};
760
761#undef dout_prefix
762#define dout_prefix *_dout << "librbd::migration::HttpClient: " << this \
763 << " " << __func__ << ": "
764
765template <typename I>
766HttpClient<I>::HttpClient(I* image_ctx, const std::string& url)
767 : m_cct(image_ctx->cct), m_image_ctx(image_ctx),
768 m_asio_engine(image_ctx->asio_engine), m_url(url),
769 m_strand(boost::asio::make_strand(*m_asio_engine)),
770 m_ssl_context(boost::asio::ssl::context::sslv23_client) {
771 m_ssl_context.set_default_verify_paths();
772}
773
774template <typename I>
775void HttpClient<I>::open(Context* on_finish) {
776 ldout(m_cct, 10) << "url=" << m_url << dendl;
777
778 int r = util::parse_url(m_cct, m_url, &m_url_spec);
779 if (r < 0) {
780 lderr(m_cct) << "failed to parse url '" << m_url << "': " << cpp_strerror(r)
781 << dendl;
782 on_finish->complete(-EINVAL);
783 return;
784 }
785
786 boost::asio::post(m_strand, [this, on_finish]() mutable {
787 create_http_session(on_finish); });
788}
789
790template <typename I>
791void HttpClient<I>::close(Context* on_finish) {
792 boost::asio::post(m_strand, [this, on_finish]() mutable {
793 shut_down_http_session(on_finish); });
794}
795
796template <typename I>
797void HttpClient<I>::get_size(uint64_t* size, Context* on_finish) {
798 ldout(m_cct, 10) << dendl;
799
800 Request req;
801 req.method(boost::beast::http::verb::head);
802
803 issue(
804 std::move(req), [this, size, on_finish](int r, Response&& response) {
805 handle_get_size(r, std::move(response), size, on_finish);
806 });
807}
808
809template <typename I>
810void HttpClient<I>::handle_get_size(int r, Response&& response, uint64_t* size,
811 Context* on_finish) {
812 ldout(m_cct, 10) << "r=" << r << dendl;
813
814 if (r < 0) {
815 lderr(m_cct) << "failed to retrieve size: " << cpp_strerror(r) << dendl;
816 on_finish->complete(r);
817 return;
818 } else if (!response.has_content_length()) {
819 lderr(m_cct) << "failed to retrieve size: missing content-length" << dendl;
820 on_finish->complete(-EINVAL);
821 return;
822 }
823
824 auto content_length = response[boost::beast::http::field::content_length];
825 try {
826 *size = boost::lexical_cast<uint64_t>(content_length);
827 } catch (boost::bad_lexical_cast&) {
828 lderr(m_cct) << "invalid content-length in response" << dendl;
829 on_finish->complete(-EBADMSG);
830 return;
831 }
832
833 on_finish->complete(0);
834}
835
836template <typename I>
837void HttpClient<I>::read(io::Extents&& byte_extents, bufferlist* data,
838 Context* on_finish) {
839 ldout(m_cct, 20) << dendl;
840
841 auto aio_comp = io::AioCompletion::create_and_start(
842 on_finish, librbd::util::get_image_ctx(m_image_ctx), io::AIO_TYPE_READ);
843 aio_comp->set_request_count(byte_extents.size());
844
845 // utilize ReadResult to assemble multiple byte extents into a single bl
846 // since boost::beast doesn't support multipart responses out-of-the-box
847 io::ReadResult read_result{data};
848 aio_comp->read_result = std::move(read_result);
849 aio_comp->read_result.set_image_extents(byte_extents);
850
851 // issue a range get request for each extent
852 uint64_t buffer_offset = 0;
853 for (auto [byte_offset, byte_length] : byte_extents) {
854 auto ctx = new io::ReadResult::C_ImageReadRequest(
855 aio_comp, buffer_offset, {{byte_offset, byte_length}});
856 buffer_offset += byte_length;
857
858 Request req;
859 req.method(boost::beast::http::verb::get);
860
861 std::stringstream range;
862 ceph_assert(byte_length > 0);
863 range << "bytes=" << byte_offset << "-" << (byte_offset + byte_length - 1);
864 req.set(boost::beast::http::field::range, range.str());
865
866 issue(
867 std::move(req),
868 [this, byte_offset=byte_offset, byte_length=byte_length, ctx]
869 (int r, Response&& response) {
870 handle_read(r, std::move(response), byte_offset, byte_length, &ctx->bl,
871 ctx);
872 });
873 }
874}
875
876template <typename I>
877void HttpClient<I>::handle_read(int r, Response&& response,
878 uint64_t byte_offset, uint64_t byte_length,
879 bufferlist* data, Context* on_finish) {
880 ldout(m_cct, 20) << "bytes=" << byte_offset << "~" << byte_length << ", "
881 << "r=" << r << dendl;
882
883 if (r < 0) {
884 lderr(m_cct) << "failed to read requested byte range: "
885 << cpp_strerror(r) << dendl;
886 on_finish->complete(r);
887 return;
888 } else if (response.result() != boost::beast::http::status::partial_content) {
889 lderr(m_cct) << "failed to retrieve requested byte range: HTTP "
890 << response.result() << dendl;
891 on_finish->complete(-EIO);
892 return;
893 } else if (byte_length != response.body().size()) {
894 lderr(m_cct) << "unexpected short range read: "
895 << "wanted=" << byte_length << ", "
896 << "received=" << response.body().size() << dendl;
897 on_finish->complete(-EINVAL);
898 return;
899 }
900
901 data->clear();
902 data->append(response.body());
903 on_finish->complete(data->length());
904}
905
906template <typename I>
907void HttpClient<I>::issue(std::shared_ptr<Work>&& work) {
908 boost::asio::post(m_strand, [this, work=std::move(work)]() mutable {
909 m_http_session->issue(std::move(work)); });
910}
911
912template <typename I>
913void HttpClient<I>::create_http_session(Context* on_finish) {
914 ldout(m_cct, 15) << dendl;
915
916 ceph_assert(m_http_session == nullptr);
917 switch (m_url_spec.scheme) {
918 case URL_SCHEME_HTTP:
919 m_http_session = std::make_unique<PlainHttpSession>(this);
920 break;
921 case URL_SCHEME_HTTPS:
922 m_http_session = std::make_unique<SslHttpSession>(this);
923 break;
924 default:
925 ceph_assert(false);
926 break;
927 }
928
929 m_http_session->init(on_finish);
930}
931
932template <typename I>
933void HttpClient<I>::shut_down_http_session(Context* on_finish) {
934 ldout(m_cct, 15) << dendl;
935
936 if (m_http_session == nullptr) {
937 on_finish->complete(0);
938 return;
939 }
940
941 m_http_session->shut_down(on_finish);
942}
943
944} // namespace migration
945} // namespace librbd
946
947template class librbd::migration::HttpClient<librbd::ImageCtx>;