1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV2.h"
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #if FMT_VERSION >= 60000
9 #include <fmt/chrono.h>
13 #include "include/msgr.h"
14 #include "include/random.h"
16 #include "crimson/auth/AuthClient.h"
17 #include "crimson/auth/AuthServer.h"
20 #include "Dispatcher.h"
23 #include "SocketConnection.h"
24 #include "SocketMessenger.h"
26 #ifdef UNIT_TESTS_BUILT
27 #include "Interceptor.h"
30 using namespace ceph::msgr::v2
;
// TODO: apply the same logging policy to Protocol V1
// Log levels in V2 Protocol:
// * error level: an error that causes the connection to terminate;
// * warn level: something unusual that indicates a connection fault or
//   replacement:
//   - unstable network;
//   - incompatible peer;
//   - connection reset;
// * info level: something very important for showing the connection
//   lifecycle, which doesn't happen very frequently;
// * debug level: important logs for debugging, including:
//   - all the messages sent/received (-->/<==);
//   - all the frames exchanged (WRITE/GOT);
//   - important fields updated (UPDATE);
//   - connection state transitions (TRIGGER);
// * trace level: trivial logs showing:
//   - the exact bytes being sent/received (SEND/RECV(bytes));
//   - detailed information of sub-frames;
//   - integrity checks;
57 seastar::logger
& logger() {
58 return crimson::get_logger(ceph_subsys_ms
);
61 void abort_in_fault() {
62 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
65 void abort_protocol() {
66 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
69 void abort_in_close(crimson::net::ProtocolV2
& proto
) {
74 inline void expect_tag(const Tag
& expected
,
76 crimson::net::SocketConnection
& conn
,
78 if (actual
!= expected
) {
79 logger().warn("{} {} received wrong tag: {}, expected {}",
81 static_cast<uint32_t>(actual
),
82 static_cast<uint32_t>(expected
));
87 inline void unexpected_tag(const Tag
& unexpected
,
88 crimson::net::SocketConnection
& conn
,
90 logger().warn("{} {} received unexpected tag: {}",
91 conn
, where
, static_cast<uint32_t>(unexpected
));
95 inline uint64_t generate_client_cookie() {
96 return ceph::util::generate_random_number
<uint64_t>(
97 1, std::numeric_limits
<uint64_t>::max());
100 } // namespace anonymous
103 struct fmt::formatter
<seastar::lowres_system_clock::time_point
> {
104 // ignore the format string
105 template <typename ParseContext
>
106 constexpr auto parse(ParseContext
&ctx
) { return ctx
.begin(); }
108 template <typename FormatContext
>
109 auto format(const seastar::lowres_system_clock::time_point
& t
,
110 FormatContext
& ctx
) {
111 std::time_t tt
= std::chrono::duration_cast
<std::chrono::seconds
>(
112 t
.time_since_epoch()).count();
113 auto milliseconds
= (t
.time_since_epoch() %
114 std::chrono::seconds(1)).count();
115 return fmt::format_to(ctx
.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
116 fmt::localtime(tt
), milliseconds
);
121 inline ostream
& operator<<(
122 ostream
& out
, const seastar::lowres_system_clock::time_point
& t
)
124 return out
<< fmt::format("{}", t
);
128 namespace crimson::net
{
130 #ifdef UNIT_TESTS_BUILT
131 void intercept(Breakpoint bp
, bp_type_t type
,
132 SocketConnection
& conn
, SocketRef
& socket
) {
133 if (conn
.interceptor
) {
134 auto action
= conn
.interceptor
->intercept(conn
, Breakpoint(bp
));
135 socket
->set_trap(type
, action
, &conn
.interceptor
->blocker
);
139 #define INTERCEPT_CUSTOM(bp, type) \
140 intercept({bp}, type, conn, socket)
142 #define INTERCEPT_FRAME(tag, type) \
143 intercept({static_cast<Tag>(tag), type}, \
146 #define INTERCEPT_N_RW(bp) \
147 if (conn.interceptor) { \
148 auto action = conn.interceptor->intercept(conn, {bp}); \
149 ceph_assert(action != bp_action_t::BLOCK); \
150 if (action == bp_action_t::FAULT) { \
156 #define INTERCEPT_CUSTOM(bp, type)
157 #define INTERCEPT_FRAME(tag, type)
158 #define INTERCEPT_N_RW(bp)
161 seastar::future
<> ProtocolV2::Timer::backoff(double seconds
)
163 logger().warn("{} waiting {} seconds ...", conn
, seconds
);
166 as
= seastar::abort_source();
167 auto dur
= std::chrono::duration_cast
<seastar::lowres_clock::duration
>(
168 std::chrono::duration
<double>(seconds
));
169 return seastar::sleep_abortable(dur
, *as
170 ).handle_exception_type([this] (const seastar::sleep_aborted
& e
) {
171 logger().debug("{} wait aborted", conn
);
176 ProtocolV2::ProtocolV2(Dispatcher
& dispatcher
,
177 SocketConnection
& conn
,
178 SocketMessenger
& messenger
)
179 : Protocol(proto_t::v2
, dispatcher
, conn
),
180 messenger
{messenger
},
184 ProtocolV2::~ProtocolV2() {}
186 void ProtocolV2::start_connect(const entity_addr_t
& _peer_addr
,
187 const entity_type_t
& _peer_type
)
189 ceph_assert(state
== state_t::NONE
);
190 ceph_assert(!socket
);
191 conn
.peer_addr
= _peer_addr
;
192 conn
.target_addr
= _peer_addr
;
193 conn
.set_peer_type(_peer_type
);
194 conn
.policy
= messenger
.get_policy(_peer_type
);
195 client_cookie
= generate_client_cookie();
196 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
197 " policy(lossy={}, server={}, standby={}, resetcheck={})",
198 conn
, _peer_addr
, ceph_entity_type_name(_peer_type
), client_cookie
,
199 conn
.policy
.lossy
, conn
.policy
.server
,
200 conn
.policy
.standby
, conn
.policy
.resetcheck
);
201 messenger
.register_conn(
202 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
203 execute_connecting();
206 void ProtocolV2::start_accept(SocketRef
&& sock
,
207 const entity_addr_t
& _peer_addr
)
209 ceph_assert(state
== state_t::NONE
);
210 ceph_assert(!socket
);
211 // until we know better
212 conn
.target_addr
= _peer_addr
;
213 conn
.set_ephemeral_port(_peer_addr
.get_port(),
214 SocketConnection::side_t::acceptor
);
215 socket
= std::move(sock
);
216 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn
, _peer_addr
);
217 messenger
.accept_conn(
218 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
222 // TODO: Frame related implementations, probably to a separate class.
224 void ProtocolV2::enable_recording()
231 seastar::future
<Socket::tmp_buf
> ProtocolV2::read_exactly(size_t bytes
)
233 if (unlikely(record_io
)) {
234 return socket
->read_exactly(bytes
)
235 .then([this] (auto bl
) {
236 rxbuf
.append(buffer::create(bl
.share()));
240 return socket
->read_exactly(bytes
);
244 seastar::future
<bufferlist
> ProtocolV2::read(size_t bytes
)
246 if (unlikely(record_io
)) {
247 return socket
->read(bytes
)
248 .then([this] (auto buf
) {
253 return socket
->read(bytes
);
257 seastar::future
<> ProtocolV2::write(bufferlist
&& buf
)
259 if (unlikely(record_io
)) {
262 return socket
->write(std::move(buf
));
265 seastar::future
<> ProtocolV2::write_flush(bufferlist
&& buf
)
267 if (unlikely(record_io
)) {
270 return socket
->write_flush(std::move(buf
));
273 size_t ProtocolV2::get_current_msg_size() const
275 ceph_assert(!rx_segments_desc
.empty());
277 // we don't include SegmentIndex::Msg::HEADER.
278 for (__u8 idx
= 1; idx
< rx_segments_desc
.size(); idx
++) {
279 sum
+= rx_segments_desc
[idx
].length
;
284 seastar::future
<Tag
> ProtocolV2::read_main_preamble()
286 return read_exactly(FRAME_PREAMBLE_SIZE
)
287 .then([this] (auto bl
) {
288 if (session_stream_handlers
.rx
) {
289 session_stream_handlers
.rx
->reset_rx_handler();
291 bl = session_stream_handlers.rx->authenticated_decrypt_update(
292 std::move(bl), segment_t::DEFAULT_ALIGNMENT);
296 // I expect ceph_le32 will make the endian conversion for me. Passing
297 // everything through ::Decode is unnecessary.
298 const auto& main_preamble
= \
299 *reinterpret_cast<const preamble_block_t
*>(bl
.get());
300 logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
301 conn
, bl
.size(), (int)main_preamble
.tag
,
302 (int)main_preamble
.num_segments
, main_preamble
.crc
);
304 // verify preamble's CRC before any further processing
305 const auto rx_crc
= ceph_crc32c(0,
306 reinterpret_cast<const unsigned char*>(&main_preamble
),
307 sizeof(main_preamble
) - sizeof(main_preamble
.crc
));
308 if (rx_crc
!= main_preamble
.crc
) {
309 logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
310 conn
, rx_crc
, main_preamble
.crc
);
314 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
315 if (main_preamble
.num_segments
< 1 ||
316 main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
317 logger().warn("{} unsupported num_segments={}",
318 conn
, main_preamble
.num_segments
);
321 if (main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
322 logger().warn("{} num_segments too much: {}",
323 conn
, main_preamble
.num_segments
);
327 rx_segments_desc
.clear();
328 rx_segments_data
.clear();
330 for (std::uint8_t idx
= 0; idx
< main_preamble
.num_segments
; idx
++) {
331 logger().trace("{} GOT frame segment: len={} align={}",
332 conn
, main_preamble
.segments
[idx
].length
,
333 main_preamble
.segments
[idx
].alignment
);
334 rx_segments_desc
.emplace_back(main_preamble
.segments
[idx
]);
337 INTERCEPT_FRAME(main_preamble
.tag
, bp_type_t::READ
);
338 return static_cast<Tag
>(main_preamble
.tag
);
342 seastar::future
<> ProtocolV2::read_frame_payload()
344 ceph_assert(!rx_segments_desc
.empty());
345 ceph_assert(rx_segments_data
.empty());
347 return seastar::do_until(
348 [this] { return rx_segments_desc
.size() == rx_segments_data
.size(); },
350 // description of current segment to read
351 const auto& cur_rx_desc
= rx_segments_desc
.at(rx_segments_data
.size());
352 // TODO: create aligned and contiguous buffer from socket
353 if (cur_rx_desc
.alignment
!= segment_t::DEFAULT_ALIGNMENT
) {
354 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
355 conn
, cur_rx_desc
.alignment
, rx_segments_data
.size());
357 // TODO: create aligned and contiguous buffer from socket
358 return read_exactly(cur_rx_desc
.length
)
359 .then([this] (auto tmp_bl
) {
360 logger().trace("{} RECV({}) frame segment[{}]",
361 conn
, tmp_bl
.size(), rx_segments_data
.size());
363 data
.append(buffer::create(std::move(tmp_bl
)));
364 if (session_stream_handlers
.rx
) {
368 rx_segments_data
.emplace_back(std::move(data
));
372 // TODO: get_epilogue_size()
373 ceph_assert(!session_stream_handlers
.rx
);
374 return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE
);
375 }).then([this] (auto bl
) {
376 logger().trace("{} RECV({}) frame epilogue", conn
, bl
.size());
379 if (session_stream_handlers
.rx
) {
383 auto& epilogue
= *reinterpret_cast<const epilogue_plain_block_t
*>(bl
.get());
384 for (std::uint8_t idx
= 0; idx
< rx_segments_data
.size(); idx
++) {
385 const __u32 expected_crc
= epilogue
.crc_values
[idx
];
386 const __u32 calculated_crc
= rx_segments_data
[idx
].crc32c(-1);
387 if (expected_crc
!= calculated_crc
) {
388 logger().warn("{} message integrity check failed at index {}:"
389 " expected_crc={} calculated_crc={}",
390 conn
, (unsigned int)idx
, expected_crc
, calculated_crc
);
393 logger().trace("{} message integrity check success at index {}: crc={}",
394 conn
, (unsigned int)idx
, expected_crc
);
397 late_flags
= epilogue
.late_flags
;
399 logger().trace("{} GOT frame epilogue: late_flags={}",
400 conn
, (unsigned)late_flags
);
402 // we do have a mechanism that allows transmitter to start sending message
403 // and abort after putting entire data field on wire. This will be used by
404 // the kernel client to avoid unnecessary buffering.
405 if (late_flags
& FRAME_FLAGS_LATEABRT
) {
413 seastar::future
<> ProtocolV2::write_frame(F
&frame
, bool flush
)
415 auto bl
= frame
.get_buffer(session_stream_handlers
);
416 const auto main_preamble
= reinterpret_cast<const preamble_block_t
*>(bl
.front().c_str());
417 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
418 conn
, bl
.length(), (int)main_preamble
->tag
,
419 (int)main_preamble
->num_segments
, main_preamble
->crc
);
420 INTERCEPT_FRAME(main_preamble
->tag
, bp_type_t::WRITE
);
422 return write_flush(std::move(bl
));
424 return write(std::move(bl
));
428 void ProtocolV2::trigger_state(state_t _state
, write_state_t _write_state
, bool reentrant
)
430 if (!reentrant
&& _state
== state
) {
431 logger().error("{} is not allowed to re-trigger state {}",
432 conn
, get_state_name(state
));
435 logger().debug("{} TRIGGER {}, was {}",
436 conn
, get_state_name(_state
), get_state_name(state
));
438 set_write_state(_write_state
);
441 void ProtocolV2::fault(bool backoff
, const char* func_name
, std::exception_ptr eptr
)
443 if (conn
.policy
.lossy
) {
444 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
445 conn
, func_name
, get_state_name(state
), eptr
);
448 } else if (conn
.policy
.server
||
449 (conn
.policy
.standby
&&
450 (!is_queued() && conn
.sent
.empty()))) {
451 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
452 conn
, func_name
, get_state_name(state
), eptr
);
454 } else if (backoff
) {
455 logger().info("{} {}: fault at {}, going to WAIT -- {}",
456 conn
, func_name
, get_state_name(state
), eptr
);
459 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
460 conn
, func_name
, get_state_name(state
), eptr
);
461 execute_connecting();
465 void ProtocolV2::dispatch_reset()
467 (void) seastar::with_gate(pending_dispatch
, [this] {
468 return dispatcher
.ms_handle_reset(
469 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
470 }).handle_exception([this] (std::exception_ptr eptr
) {
471 logger().error("{} ms_handle_reset caught exception: {}", conn
, eptr
);
472 ceph_abort("unexpected exception from ms_handle_reset()");
476 void ProtocolV2::reset_session(bool full
)
482 client_cookie
= generate_client_cookie();
485 (void) seastar::with_gate(pending_dispatch
, [this] {
486 return dispatcher
.ms_handle_remote_reset(
487 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
488 }).handle_exception([this] (std::exception_ptr eptr
) {
489 logger().error("{} ms_handle_remote_reset caught exception: {}", conn
, eptr
);
490 ceph_abort("unexpected exception from ms_handle_remote_reset()");
495 seastar::future
<entity_type_t
, entity_addr_t
> ProtocolV2::banner_exchange()
497 // 1. prepare and send banner
498 bufferlist banner_payload
;
499 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
500 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
503 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
504 auto len_payload
= static_cast<uint16_t>(banner_payload
.length());
505 encode(len_payload
, bl
, 0);
506 bl
.claim_append(banner_payload
);
507 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
508 "required={}, banner=\"{}\"",
509 conn
, bl
.length(), len_payload
,
510 CEPH_MSGR2_SUPPORTED_FEATURES
, CEPH_MSGR2_REQUIRED_FEATURES
,
511 CEPH_BANNER_V2_PREFIX
);
512 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE
, bp_type_t::WRITE
);
513 return write_flush(std::move(bl
)).then([this] {
514 // 2. read peer banner
515 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
516 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ
, bp_type_t::READ
);
517 return read_exactly(banner_len
); // or read exactly?
518 }).then([this] (auto bl
) {
519 // 3. process peer banner and read banner_payload
520 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
521 logger().debug("{} RECV({}) banner: \"{}\"",
523 std::string((const char*)bl
.get(), banner_prefix_len
));
525 if (memcmp(bl
.get(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
) != 0) {
526 if (memcmp(bl
.get(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
527 logger().warn("{} peer is using V1 protocol", conn
);
529 logger().warn("{} peer sent bad banner", conn
);
533 bl
.trim_front(banner_prefix_len
);
535 uint16_t payload_len
;
537 buf
.append(buffer::create(std::move(bl
)));
538 auto ti
= buf
.cbegin();
540 decode(payload_len
, ti
);
541 } catch (const buffer::error
&e
) {
542 logger().warn("{} decode banner payload len failed", conn
);
545 logger().debug("{} GOT banner: payload_len={}", conn
, payload_len
);
546 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ
, bp_type_t::READ
);
547 return read(payload_len
);
548 }).then([this] (bufferlist bl
) {
549 // 4. process peer banner_payload and send HelloFrame
550 auto p
= bl
.cbegin();
551 uint64_t peer_supported_features
;
552 uint64_t peer_required_features
;
554 decode(peer_supported_features
, p
);
555 decode(peer_required_features
, p
);
556 } catch (const buffer::error
&e
) {
557 logger().warn("{} decode banner payload failed", conn
);
560 logger().debug("{} RECV({}) banner features: supported={} required={}",
562 peer_supported_features
, peer_required_features
);
564 // Check feature bit compatibility
565 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
566 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
567 if ((required_features
& peer_supported_features
) != required_features
) {
568 logger().error("{} peer does not support all required features"
569 " required={} peer_supported={}",
570 conn
, required_features
, peer_supported_features
);
571 abort_in_close(*this);
573 if ((supported_features
& peer_required_features
) != peer_required_features
) {
574 logger().error("{} we do not support all peer required features"
575 " peer_required={} supported={}",
576 conn
, peer_required_features
, supported_features
);
577 abort_in_close(*this);
579 this->peer_required_features
= peer_required_features
;
580 if (this->peer_required_features
== 0) {
581 this->connection_features
= msgr2_required
;
584 auto hello
= HelloFrame::Encode(messenger
.get_mytype(),
586 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
587 conn
, ceph_entity_type_name(messenger
.get_mytype()),
589 return write_frame(hello
);
591 //5. read peer HelloFrame
592 return read_main_preamble();
593 }).then([this] (Tag tag
) {
594 expect_tag(Tag::HELLO
, tag
, conn
, __func__
);
595 return read_frame_payload();
597 // 6. process peer HelloFrame
598 auto hello
= HelloFrame::Decode(rx_segments_data
.back());
599 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
600 conn
, ceph_entity_type_name(hello
.entity_type()),
602 return seastar::make_ready_future
<entity_type_t
, entity_addr_t
>(
603 hello
.entity_type(), hello
.peer_addr());
609 seastar::future
<> ProtocolV2::handle_auth_reply()
611 return read_main_preamble()
612 .then([this] (Tag tag
) {
614 case Tag::AUTH_BAD_METHOD
:
615 return read_frame_payload().then([this] {
616 // handle_auth_bad_method() logic
617 auto bad_method
= AuthBadMethodFrame::Decode(rx_segments_data
.back());
618 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
619 "allowed_methods={}, allowed_modes={}",
620 conn
, bad_method
.method(), cpp_strerror(bad_method
.result()),
621 bad_method
.allowed_methods(), bad_method
.allowed_modes());
622 ceph_assert(messenger
.get_auth_client());
623 int r
= messenger
.get_auth_client()->handle_auth_bad_method(
624 conn
.shared_from_this(), auth_meta
,
625 bad_method
.method(), bad_method
.result(),
626 bad_method
.allowed_methods(), bad_method
.allowed_modes());
628 logger().warn("{} auth_client handle_auth_bad_method returned {}",
632 return client_auth(bad_method
.allowed_methods());
634 case Tag::AUTH_REPLY_MORE
:
635 return read_frame_payload().then([this] {
636 // handle_auth_reply_more() logic
637 auto auth_more
= AuthReplyMoreFrame::Decode(rx_segments_data
.back());
638 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
639 conn
, auth_more
.auth_payload().length());
640 ceph_assert(messenger
.get_auth_client());
641 // let execute_connecting() take care of the thrown exception
642 auto reply
= messenger
.get_auth_client()->handle_auth_reply_more(
643 conn
.shared_from_this(), auth_meta
, auth_more
.auth_payload());
644 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
645 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
646 conn
, reply
.length());
647 return write_frame(more_reply
);
649 return handle_auth_reply();
652 return read_frame_payload().then([this] {
653 // handle_auth_done() logic
654 auto auth_done
= AuthDoneFrame::Decode(rx_segments_data
.back());
655 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
656 conn
, auth_done
.global_id(),
657 ceph_con_mode_name(auth_done
.con_mode()),
658 auth_done
.auth_payload().length());
659 ceph_assert(messenger
.get_auth_client());
660 int r
= messenger
.get_auth_client()->handle_auth_done(
661 conn
.shared_from_this(), auth_meta
,
662 auth_done
.global_id(),
663 auth_done
.con_mode(),
664 auth_done
.auth_payload());
666 logger().warn("{} auth_client handle_auth_done returned {}", conn
, r
);
669 auth_meta
->con_mode
= auth_done
.con_mode();
671 ceph_assert(!auth_meta
->is_mode_secure());
672 session_stream_handlers
= { nullptr, nullptr };
673 return finish_auth();
676 unexpected_tag(tag
, conn
, __func__
);
677 return seastar::now();
683 seastar::future
<> ProtocolV2::client_auth(std::vector
<uint32_t> &allowed_methods
)
685 // send_auth_request() logic
686 ceph_assert(messenger
.get_auth_client());
689 auto [auth_method
, preferred_modes
, bl
] =
690 messenger
.get_auth_client()->get_auth_request(conn
.shared_from_this(), auth_meta
);
691 auth_meta
->auth_method
= auth_method
;
692 auto frame
= AuthRequestFrame::Encode(auth_method
, preferred_modes
, bl
);
693 logger().debug("{} WRITE AuthRequestFrame: method={},"
694 " preferred_modes={}, payload_len={}",
695 conn
, auth_method
, preferred_modes
, bl
.length());
696 return write_frame(frame
).then([this] {
697 return handle_auth_reply();
699 } catch (const crimson::auth::error
& e
) {
700 logger().error("{} get_initial_auth_request returned {}", conn
, e
);
702 abort_in_close(*this);
703 return seastar::now();
707 seastar::future
<ProtocolV2::next_step_t
>
708 ProtocolV2::process_wait()
710 return read_frame_payload().then([this] {
711 // handle_wait() logic
712 logger().debug("{} GOT WaitFrame", conn
);
713 WaitFrame::Decode(rx_segments_data
.back());
714 return next_step_t::wait
;
718 seastar::future
<ProtocolV2::next_step_t
>
719 ProtocolV2::client_connect()
721 // send_client_ident() logic
723 if (conn
.policy
.lossy
) {
724 flags
|= CEPH_MSG_CONNECT_LOSSY
;
727 auto client_ident
= ClientIdentFrame::Encode(
728 messenger
.get_myaddrs(),
730 messenger
.get_myname().num(),
732 conn
.policy
.features_supported
,
733 conn
.policy
.features_required
| msgr2_required
, flags
,
736 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
737 " gs={}, features_supported={}, features_required={},"
738 " flags={}, cookie={}",
739 conn
, messenger
.get_myaddrs(), conn
.target_addr
,
740 messenger
.get_myname().num(), global_seq
,
741 conn
.policy
.features_supported
,
742 conn
.policy
.features_required
| msgr2_required
,
743 flags
, client_cookie
);
744 return write_frame(client_ident
).then([this] {
745 return read_main_preamble();
746 }).then([this] (Tag tag
) {
748 case Tag::IDENT_MISSING_FEATURES
:
749 return read_frame_payload().then([this] {
750 // handle_ident_missing_features() logic
751 auto ident_missing
= IdentMissingFeaturesFrame::Decode(rx_segments_data
.back());
752 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
753 " (client does not support all server features)",
754 conn
, ident_missing
.features());
756 return next_step_t::none
;
759 return process_wait();
760 case Tag::SERVER_IDENT
:
761 return read_frame_payload().then([this] {
762 // handle_server_ident() logic
764 auto server_ident
= ServerIdentFrame::Decode(rx_segments_data
.back());
765 logger().debug("{} GOT ServerIdentFrame:"
766 " addrs={}, gid={}, gs={},"
767 " features_supported={}, features_required={},"
768 " flags={}, cookie={}",
770 server_ident
.addrs(), server_ident
.gid(),
771 server_ident
.global_seq(),
772 server_ident
.supported_features(),
773 server_ident
.required_features(),
774 server_ident
.flags(), server_ident
.cookie());
776 // is this who we intended to talk to?
777 // be a bit forgiving here, since we may be connecting based on addresses parsed out
778 // of mon_host or something.
779 if (!server_ident
.addrs().contains(conn
.target_addr
)) {
780 logger().warn("{} peer identifies as {}, does not include {}",
781 conn
, server_ident
.addrs(), conn
.target_addr
);
782 throw std::system_error(
783 make_error_code(crimson::net::error::bad_peer_address
));
786 server_cookie
= server_ident
.cookie();
788 // TODO: change peer_addr to entity_addrvec_t
789 if (server_ident
.addrs().front() != conn
.peer_addr
) {
790 logger().warn("{} peer advertises as {}, does not match {}",
791 conn
, server_ident
.addrs(), conn
.peer_addr
);
792 throw std::system_error(
793 make_error_code(crimson::net::error::bad_peer_address
));
795 conn
.set_peer_id(server_ident
.gid());
796 conn
.set_features(server_ident
.supported_features() &
797 conn
.policy
.features_supported
);
798 peer_global_seq
= server_ident
.global_seq();
800 bool lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
801 if (lossy
!= conn
.policy
.lossy
) {
802 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn
, lossy
);
803 conn
.policy
.lossy
= lossy
;
805 if (lossy
&& (connect_seq
!= 0 || server_cookie
!= 0)) {
806 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
807 conn
, connect_seq
, server_cookie
);
812 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
815 unexpected_tag(tag
, conn
, "post_client_connect");
816 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
822 seastar::future
<ProtocolV2::next_step_t
>
823 ProtocolV2::client_reconnect()
825 // send_reconnect() logic
826 auto reconnect
= ReconnectFrame::Encode(messenger
.get_myaddrs(),
832 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
833 " server_cookie={}, gs={}, cs={}, msg_seq={}",
834 conn
, messenger
.get_myaddrs(),
835 client_cookie
, server_cookie
,
836 global_seq
, connect_seq
, conn
.in_seq
);
837 return write_frame(reconnect
).then([this] {
838 return read_main_preamble();
839 }).then([this] (Tag tag
) {
841 case Tag::SESSION_RETRY_GLOBAL
:
842 return read_frame_payload().then([this] {
843 // handle_session_retry_global() logic
844 auto retry
= RetryGlobalFrame::Decode(rx_segments_data
.back());
845 logger().warn("{} GOT RetryGlobalFrame: gs={}",
846 conn
, retry
.global_seq());
847 return messenger
.get_global_seq(retry
.global_seq()).then([this] (auto gs
) {
849 logger().warn("{} UPDATE: gs={} for retry global", conn
, global_seq
);
850 return client_reconnect();
853 case Tag::SESSION_RETRY
:
854 return read_frame_payload().then([this] {
855 // handle_session_retry() logic
856 auto retry
= RetryFrame::Decode(rx_segments_data
.back());
857 logger().warn("{} GOT RetryFrame: cs={}",
858 conn
, retry
.connect_seq());
859 connect_seq
= retry
.connect_seq() + 1;
860 logger().warn("{} UPDATE: cs={}", conn
, connect_seq
);
861 return client_reconnect();
863 case Tag::SESSION_RESET
:
864 return read_frame_payload().then([this] {
865 // handle_session_reset() logic
866 auto reset
= ResetFrame::Decode(rx_segments_data
.back());
867 logger().warn("{} GOT ResetFrame: full={}", conn
, reset
.full());
868 reset_session(reset
.full());
869 return client_connect();
872 return process_wait();
873 case Tag::SESSION_RECONNECT_OK
:
874 return read_frame_payload().then([this] {
875 // handle_reconnect_ok() logic
876 auto reconnect_ok
= ReconnectOkFrame::Decode(rx_segments_data
.back());
877 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
878 conn
, reconnect_ok
.msg_seq());
879 requeue_up_to(reconnect_ok
.msg_seq());
880 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
883 unexpected_tag(tag
, conn
, "post_client_reconnect");
884 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
890 void ProtocolV2::execute_connecting()
892 trigger_state(state_t::CONNECTING
, write_state_t::delay
, true);
896 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
897 // we don't know my socket_port yet
898 conn
.set_ephemeral_port(0, SocketConnection::side_t::none
);
899 return messenger
.get_global_seq().then([this] (auto gs
) {
901 assert(client_cookie
!= 0);
902 if (!conn
.policy
.lossy
&& server_cookie
!= 0) {
904 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
905 conn
, global_seq
, connect_seq
);
906 } else { // conn.policy.lossy || server_cookie == 0
907 assert(connect_seq
== 0);
908 assert(server_cookie
== 0);
909 logger().debug("{} UPDATE: gs={} for connect", conn
, global_seq
);
912 return wait_write_exit();
914 if (unlikely(state
!= state_t::CONNECTING
)) {
915 logger().debug("{} triggered {} before Socket::connect()",
916 conn
, get_state_name(state
));
920 (void) with_gate(pending_dispatch
, [sock
= std::move(socket
)] () mutable {
921 return sock
->close().then([sock
= std::move(sock
)] {});
924 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING
);
925 return Socket::connect(conn
.peer_addr
);
926 }).then([this](SocketRef sock
) {
927 logger().debug("{} socket connected", conn
);
928 if (unlikely(state
!= state_t::CONNECTING
)) {
929 logger().debug("{} triggered {} during Socket::connect()",
930 conn
, get_state_name(state
));
931 return sock
->close().then([sock
= std::move(sock
)] {
935 socket
= std::move(sock
);
936 return seastar::now();
938 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
939 session_stream_handlers
= { nullptr, nullptr };
941 return banner_exchange();
942 }).then([this] (entity_type_t _peer_type
,
943 entity_addr_t _my_addr_from_peer
) {
944 if (conn
.get_peer_type() != _peer_type
) {
945 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
946 conn
, ceph_entity_type_name(conn
.get_peer_type()),
947 ceph_entity_type_name(_peer_type
));
949 abort_in_close(*this);
951 conn
.set_ephemeral_port(_my_addr_from_peer
.get_port(),
952 SocketConnection::side_t::connector
);
953 if (unlikely(_my_addr_from_peer
.is_legacy())) {
954 logger().warn("{} peer sent a legacy address for me: {}",
955 conn
, _my_addr_from_peer
);
956 throw std::system_error(
957 make_error_code(crimson::net::error::bad_peer_address
));
959 _my_addr_from_peer
.set_type(entity_addr_t::TYPE_MSGR2
);
960 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
962 return client_auth();
964 if (server_cookie
== 0) {
965 ceph_assert(connect_seq
== 0);
966 return client_connect();
968 ceph_assert(connect_seq
> 0);
969 return client_reconnect();
971 }).then([this] (next_step_t next
) {
972 if (unlikely(state
!= state_t::CONNECTING
)) {
973 logger().debug("{} triggered {} at the end of execute_connecting()",
974 conn
, get_state_name(state
));
978 case next_step_t::ready
: {
979 (void) seastar::with_gate(pending_dispatch
, [this] {
980 return dispatcher
.ms_handle_connect(
981 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
982 }).handle_exception([this] (std::exception_ptr eptr
) {
983 logger().error("{} ms_handle_connect caught exception: {}", conn
, eptr
);
984 ceph_abort("unexpected exception from ms_handle_connect()");
986 logger().info("{} connected:"
987 " gs={}, pgs={}, cs={}, client_cookie={},"
988 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
989 conn
, global_seq
, peer_global_seq
, connect_seq
,
990 client_cookie
, server_cookie
, conn
.in_seq
,
991 conn
.out_seq
, conn
.out_q
.size());
995 case next_step_t::wait
: {
996 logger().info("{} execute_connecting(): going to WAIT", conn
);
1001 ceph_abort("impossible next step");
1004 }).handle_exception([this] (std::exception_ptr eptr
) {
1005 if (state
!= state_t::CONNECTING
) {
1006 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
1007 conn
, get_state_name(state
), eptr
);
1008 assert(state
== state_t::CLOSING
||
1009 state
== state_t::REPLACING
);
1013 if (conn
.policy
.server
||
1014 (conn
.policy
.standby
&&
1015 (!is_queued() && conn
.sent
.empty()))) {
1016 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
1017 " going to STANDBY -- {}",
1018 conn
, get_state_name(state
), eptr
);
1021 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
1022 conn
, get_state_name(state
), eptr
);
1023 execute_wait(false);
// Reject the peer's authentication attempt.
// Encodes an AuthBadMethodFrame from the current auth method, the result
// code `r` and the methods/modes the auth server reports for this peer type,
// writes it, and then re-enters server_auth().
// @param r  result code forwarded to the peer (rendered with cpp_strerror()
//           in the log).
1031 seastar::future
<> ProtocolV2::_auth_bad_method(int r
)
1033 // _auth_bad_method() logic
// Ask the auth server which methods/modes it supports for this peer type.
1035 auto [allowed_methods
, allowed_modes
] =
1036 messenger
.get_auth_server()->get_supported_auth_methods(conn
.get_peer_type());
1037 auto bad_method
= AuthBadMethodFrame::Encode(
1038 auth_meta
->auth_method
, r
, allowed_methods
, allowed_modes
);
1039 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1040 "allowed_methods={}, allowed_modes={})",
1041 conn
, auth_meta
->auth_method
, cpp_strerror(r
),
1042 allowed_methods
, allowed_modes
);
// After the rejection is written, go back to reading an auth request.
1043 return write_frame(bad_method
).then([this] {
1044 return server_auth();
// Feed one authentication payload to the auth server and act on its result.
// Visible outcomes: write AuthDoneFrame and continue with finish_auth();
// write AuthReplyMoreFrame, read an AuthRequestMoreFrame and recurse with
// more=true; log -EBUSY; or reject through _auth_bad_method(r).
// NOTE(review): the statement that dispatches on `r` is not visible in this
// excerpt -- confirm which value selects which arm.
// @param auth_payload  payload taken from the peer's auth frame.
// @param more          false for the initial request, true for follow-ups.
1048 seastar::future
<> ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
1050 // _handle_auth_request() logic
1051 ceph_assert(messenger
.get_auth_server());
1053 int r
= messenger
.get_auth_server()->handle_auth_request(
1054 conn
.shared_from_this(), auth_meta
,
1055 more
, auth_meta
->auth_method
, auth_payload
,
// Arm: authentication finished -- tell the peer and move on to signatures.
1060 auto auth_done
= AuthDoneFrame::Encode(
1061 conn
.peer_global_id
, auth_meta
->con_mode
, reply
);
1062 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1063 conn
, conn
.peer_global_id
,
1064 ceph_con_mode_name(auth_meta
->con_mode
), reply
.length());
1065 return write_frame(auth_done
).then([this] {
1066 ceph_assert(auth_meta
);
// Secure mode is asserted to be off; the stream handlers are cleared.
1068 ceph_assert(!auth_meta
->is_mode_secure());
1069 session_stream_handlers
= { nullptr, nullptr };
1070 return finish_auth();
// Arm: the auth server needs another round trip with the peer.
1075 auto more
= AuthReplyMoreFrame::Encode(reply
);
1076 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1077 conn
, reply
.length());
1078 return write_frame(more
).then([this] {
1079 return read_main_preamble();
1080 }).then([this] (Tag tag
) {
// NOTE(review): __func__ is evaluated inside this lambda, so it names the
// lambda's call operator rather than _handle_auth_request; other callers in
// this file pass a string literal instead.
1081 expect_tag(Tag::AUTH_REQUEST_MORE
, tag
, conn
, __func__
);
1082 return read_frame_payload();
1084 auto auth_more
= AuthRequestMoreFrame::Decode(rx_segments_data
.back());
1085 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1086 conn
, auth_more
.auth_payload().length());
// Recurse with more=true to process the follow-up payload.
1087 return _handle_auth_request(auth_more
.auth_payload(), true);
// Arm: auth server reported -EBUSY (per the log text below).
1091 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn
);
1093 return seastar::now();
// Arm: any other result is reported to the peer as a bad method.
1096 logger().warn("{} auth_server handle_auth_request returned {}", conn
, r
);
1097 return _auth_bad_method(r
);
// Server side of the auth exchange: read a frame that must be AUTH_REQUEST,
// record the requested method, let the auth server pick a connection mode,
// and hand the payload to _handle_auth_request(). An unknown connection mode
// is rejected with _auth_bad_method(-EOPNOTSUPP).
1102 seastar::future
<> ProtocolV2::server_auth()
1104 return read_main_preamble()
1105 .then([this] (Tag tag
) {
// NOTE(review): __func__ is evaluated inside this lambda, so it names the
// lambda's call operator rather than server_auth; other callers in this file
// pass a string literal instead.
1106 expect_tag(Tag::AUTH_REQUEST
, tag
, conn
, __func__
);
1107 return read_frame_payload();
1109 // handle_auth_request() logic
1110 auto request
= AuthRequestFrame::Decode(rx_segments_data
.back());
1111 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1113 conn
, request
.method(), request
.preferred_modes(),
1114 request
.auth_payload().length());
// Remember the peer's method, then choose a connection mode from its
// preferred modes.
1115 auth_meta
->auth_method
= request
.method();
1116 auth_meta
->con_mode
= messenger
.get_auth_server()->pick_con_mode(
1117 conn
.get_peer_type(), auth_meta
->auth_method
,
1118 request
.preferred_modes());
1119 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
1120 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn
);
1121 return _auth_bad_method(-EOPNOTSUPP
);
// First request of the exchange, hence more=false.
1123 return _handle_auth_request(request
.auth_payload(), false);
1127 seastar::future
<ProtocolV2::next_step_t
>
1128 ProtocolV2::send_wait()
1130 auto wait
= WaitFrame::Encode();
1131 logger().debug("{} WRITE WaitFrame", conn
);
1132 return write_frame(wait
).then([] {
1133 return next_step_t::wait
;
// Hand this connection's state over to an already existing protocol
// instance through existing_proto->trigger_replacing(), then abort this
// connection through abort_in_close(*this).
// NOTE(review): several arguments of the trigger_replacing() call are not
// visible in this excerpt; only the ones shown below are documented.
// @param existing_proto  protocol of the connection that takes over.
// @param do_reset, reconnect, conn_seq, msg_seq
//        assumed to be forwarded to trigger_replacing() -- only `reconnect`
//        is visibly passed here; TODO confirm.
1137 seastar::future
<ProtocolV2::next_step_t
>
1138 ProtocolV2::reuse_connection(
1139 ProtocolV2
* existing_proto
, bool do_reset
,
1140 bool reconnect
, uint64_t conn_seq
, uint64_t msg_seq
)
1142 existing_proto
->trigger_replacing(reconnect
,
// auth_meta and session_stream_handlers are moved out of this object.
1145 std::move(auth_meta
),
1146 std::move(session_stream_handlers
),
1149 conn
.get_peer_name(),
1150 connection_features
,
// Test-only hook: notify the interceptor, when one is installed.
1153 #ifdef UNIT_TESTS_BUILT
1154 if (conn
.interceptor
) {
1155 conn
.interceptor
->register_conn_replaced(conn
);
1158 // close this connection because all the necessary information is delivered
1159 // to the existing connection, and jump to error handling code to abort the
1161 abort_in_close(*this);
1162 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Decide how to proceed when the connecting peer already has a connection
// (`existing_conn`). The existing protocol must be a ProtocolV2 (asserted).
// Visible decisions, in order:
//  - existing is in REPLACING state, or its peer_global_seq is newer:
//    only a warning is visible here (the follow-up statements are missing
//    from this excerpt);
//  - existing is lossy: reset and close it, then establish this connection;
//  - existing has a server_cookie: reuse it, with or without reset
//    depending on whether the client cookies differ;
//  - otherwise a connection race: reuse the existing connection when
//    existing_conn->peer_wins(), else the existing connection sends a
//    keepalive first.
1165 seastar::future
<ProtocolV2::next_step_t
>
1166 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn
)
1168 // handle_existing_connection() logic
1169 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1170 existing_conn
->protocol
.get());
1171 ceph_assert(existing_proto
);
// NOTE(review): this log passes `existing_conn` itself, whereas the later
// logs in this function pass `*existing_conn` -- confirm which is intended.
1172 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1173 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1174 conn
, global_seq
, peer_global_seq
, connect_seq
,
1175 client_cookie
, server_cookie
,
1176 existing_conn
, get_state_name(existing_proto
->state
),
1177 existing_proto
->global_seq
,
1178 existing_proto
->peer_global_seq
,
1179 existing_proto
->connect_seq
,
1180 existing_proto
->client_cookie
,
1181 existing_proto
->server_cookie
);
// Case: the existing connection is itself already being replaced.
1183 if (existing_proto
->state
== state_t::REPLACING
) {
1184 logger().warn("{} server_connect: racing replace happened while"
1185 " replacing existing connection {}, send wait.",
1186 conn
, *existing_conn
);
// Case: this attempt carries an older peer_global_seq than the existing one.
1190 if (existing_proto
->peer_global_seq
> peer_global_seq
) {
1191 logger().warn("{} server_connect:"
1192 " this is a stale connection, because peer_global_seq({})"
1193 " < existing->peer_global_seq({}), close this connection"
1194 " in favor of existing connection {}",
1195 conn
, peer_global_seq
,
1196 existing_proto
->peer_global_seq
, *existing_conn
);
1200 if (existing_conn
->policy
.lossy
) {
1201 // existing connection can be thrown out in favor of this one
1202 logger().warn("{} server_connect:"
1203 " existing connection {} is a lossy channel. Close existing in favor of"
1204 " this connection", conn
, *existing_conn
);
1205 existing_proto
->dispatch_reset();
// The future returned by close() is deliberately discarded.
1206 (void) existing_proto
->close();
// The state may have changed while closing the existing connection.
1208 if (unlikely(state
!= state_t::ACCEPTING
)) {
1209 logger().debug("{} triggered {} in execute_accepting()",
1210 conn
, get_state_name(state
));
1213 execute_establishing();
1214 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
// Case: the existing connection already holds a server cookie.
1217 if (existing_proto
->server_cookie
!= 0) {
1218 if (existing_proto
->client_cookie
!= client_cookie
) {
1219 // Found previous session
1220 // peer has reset and we're going to reuse the existing connection
1221 // by replacing the socket
1222 logger().warn("{} server_connect:"
1223 " found new session (cs={})"
1224 " when existing {} is with stale session (cs={}, ss={}),"
1225 " peer must have reset",
1226 conn
, client_cookie
,
1227 *existing_conn
, existing_proto
->client_cookie
,
1228 existing_proto
->server_cookie
);
// The policy's resetcheck flag is passed as do_reset.
1229 return reuse_connection(existing_proto
, conn
.policy
.resetcheck
);
1231 // session establishment interrupted between client_ident and server_ident,
1233 logger().warn("{} server_connect: found client session with existing {}"
1234 " matched (cs={}, ss={}), continuing session establishment",
1235 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1236 return reuse_connection(existing_proto
);
1239 // Looks like a connection race: server and client are both connecting to
1240 // each other at the same time.
1241 if (existing_proto
->client_cookie
!= client_cookie
) {
1242 if (existing_conn
->peer_wins()) {
1243 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1244 " and win, reusing existing {}",
1245 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1246 return reuse_connection(existing_proto
);
1248 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1249 " and lose to existing {}, ask client to wait",
1250 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
// NOTE(review): the continuation body after keepalive() is not visible here.
1251 return existing_conn
->keepalive().then([this] {
// Remaining case: the client cookies match.
1256 logger().warn("{} server_connect: found client session with existing {}"
1257 " matched (cs={}, ss={}), continuing session establishment",
1258 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1259 return reuse_connection(existing_proto
);
// Handle a ClientIdentFrame on the accepting side.
// Reads and decodes the frame, validates the peer's addresses and the
// address it is targeting, records peer_addr/target_addr, peer id and
// client cookie, checks required features, then either defers to
// handle_existing_connection() or moves on to execute_establishing().
// Throws std::system_error(bad_peer_address) on every address validation
// failure visible below.
1264 seastar::future
<ProtocolV2::next_step_t
>
1265 ProtocolV2::server_connect()
1267 return read_frame_payload().then([this] {
1268 // handle_client_ident() logic
1269 auto client_ident
= ClientIdentFrame::Decode(rx_segments_data
.back());
1270 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1271 " gid={}, gs={}, features_supported={},"
1272 " features_required={}, flags={}, cookie={}",
1273 conn
, client_ident
.addrs(), client_ident
.target_addr(),
1274 client_ident
.gid(), client_ident
.global_seq(),
1275 client_ident
.supported_features(),
1276 client_ident
.required_features(),
1277 client_ident
.flags(), client_ident
.cookie());
// Reject an empty address list or a default-constructed first address.
1279 if (client_ident
.addrs().empty() ||
1280 client_ident
.addrs().front() == entity_addr_t()) {
1281 logger().warn("{} oops, client_ident.addrs() is empty", conn
);
1282 throw std::system_error(
1283 make_error_code(crimson::net::error::bad_peer_address
));
// The peer must be targeting one of this messenger's own addresses.
1285 if (!messenger
.get_myaddrs().contains(client_ident
.target_addr())) {
1286 logger().warn("{} peer is trying to reach {} which is not us ({})",
1287 conn
, client_ident
.target_addr(), messenger
.get_myaddrs());
1288 throw std::system_error(
1289 make_error_code(crimson::net::error::bad_peer_address
));
1291 // TODO: change peer_addr to entity_addrvec_t
1292 entity_addr_t paddr
= client_ident
.addrs().front();
// Accepted when the address is msgr2/any AND on the same host as the
// current target_addr; the rejection path follows (its `else` is not
// visible in this excerpt).
1293 if ((paddr
.is_msgr2() || paddr
.is_any()) &&
1294 paddr
.is_same_host(conn
.target_addr
)) {
1297 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1298 conn
, paddr
, conn
.target_addr
);
1299 throw std::system_error(
1300 make_error_code(crimson::net::error::bad_peer_address
));
1302 conn
.peer_addr
= paddr
;
1303 logger().debug("{} UPDATE: peer_addr={}", conn
, conn
.peer_addr
);
1304 conn
.target_addr
= conn
.peer_addr
;
// A lossless, non-server connection needs a usable port on target_addr.
1305 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
1306 logger().warn("{} we don't know how to reconnect to peer {}",
1307 conn
, conn
.target_addr
);
1308 throw std::system_error(
1309 make_error_code(crimson::net::error::bad_peer_address
));
1312 conn
.set_peer_id(client_ident
.gid());
1313 client_cookie
= client_ident
.cookie();
// Features we require (policy + msgr2) that the client does not support.
1315 uint64_t feat_missing
=
1316 (conn
.policy
.features_required
| msgr2_required
) &
1317 ~(uint64_t)client_ident
.supported_features();
// NOTE(review): the guard around this reply is not visible in this excerpt;
// presumably it is only sent when feat_missing is non-zero -- confirm.
1319 auto ident_missing_features
= IdentMissingFeaturesFrame::Encode(feat_missing
);
1320 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1321 conn
, feat_missing
);
1322 return write_frame(ident_missing_features
).then([] {
1323 return next_step_t::wait
;
// Features common to the client and this connection's policy.
1326 connection_features
=
1327 client_ident
.supported_features() & conn
.policy
.features_supported
;
1328 logger().debug("{} UPDATE: connection_features={}", conn
, connection_features
);
1330 peer_global_seq
= client_ident
.global_seq();
1332 // Looks good so far, let's check if there is already an existing connection
1335 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1337 if (existing_conn
) {
// An existing connection that is not protocol v2 is closed (the future
// returned by close() is discarded).
1338 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1339 logger().warn("{} existing connection {} proto version is {}, close existing",
1340 conn
, *existing_conn
,
1341 static_cast<int>(existing_conn
->protocol
->proto_type
));
1342 // should unregister the existing from msgr atomically
1343 (void) existing_conn
->close();
1345 return handle_existing_connection(existing_conn
);
// Before establishing, check whether the state left ACCEPTING meanwhile.
1349 if (unlikely(state
!= state_t::ACCEPTING
)) {
1350 logger().debug("{} triggered {} in execute_accepting()",
1351 conn
, get_state_name(state
));
1354 execute_establishing();
1355 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1359 seastar::future
<ProtocolV2::next_step_t
>
1360 ProtocolV2::read_reconnect()
1362 return read_main_preamble()
1363 .then([this] (Tag tag
) {
1364 expect_tag(Tag::SESSION_RECONNECT
, tag
, conn
, "read_reconnect");
1365 return server_reconnect();
1369 seastar::future
<ProtocolV2::next_step_t
>
1370 ProtocolV2::send_retry(uint64_t connect_seq
)
1372 auto retry
= RetryFrame::Encode(connect_seq
);
1373 logger().warn("{} WRITE RetryFrame: cs={}", conn
, connect_seq
);
1374 return write_frame(retry
).then([this] {
1375 return read_reconnect();
1379 seastar::future
<ProtocolV2::next_step_t
>
1380 ProtocolV2::send_retry_global(uint64_t global_seq
)
1382 auto retry
= RetryGlobalFrame::Encode(global_seq
);
1383 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn
, global_seq
);
1384 return write_frame(retry
).then([this] {
1385 return read_reconnect();
1389 seastar::future
<ProtocolV2::next_step_t
>
1390 ProtocolV2::send_reset(bool full
)
1392 auto reset
= ResetFrame::Encode(full
);
1393 logger().warn("{} WRITE ResetFrame: full={}", conn
, full
);
1394 return write_frame(reset
).then([this] {
1395 return read_main_preamble();
1396 }).then([this] (Tag tag
) {
1397 expect_tag(Tag::CLIENT_IDENT
, tag
, conn
, "post_send_reset");
1398 return server_connect();
// Handle a ReconnectFrame on the accepting side.
// Validates the peer address, looks up the existing connection for it and
// then chooses between: resetting the client (no usable existing
// connection, cookie mismatch, or session never established), asking it to
// retry with a newer global_seq/connect_seq, or reusing the existing
// connection for this socket.
// Throws std::system_error(bad_peer_address) when the address checks fail.
1402 seastar::future
<ProtocolV2::next_step_t
>
1403 ProtocolV2::server_reconnect()
1405 return read_frame_payload().then([this] {
1406 // handle_reconnect() logic
1407 auto reconnect
= ReconnectFrame::Decode(rx_segments_data
.back());
1409 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1410 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1411 conn
, reconnect
.addrs(),
1412 reconnect
.client_cookie(), reconnect
.server_cookie(),
1413 reconnect
.global_seq(), reconnect
.connect_seq(),
1414 reconnect
.msg_seq());
1416 // can peer_addrs be changed on-the-fly?
1417 // TODO: change peer_addr to entity_addrvec_t
// NOTE(review): no emptiness check on reconnect.addrs() is visible before
// front(), whereas server_connect() checks client_ident.addrs().empty()
// first -- confirm an empty list cannot reach this point.
1418 entity_addr_t paddr
= reconnect
.addrs().front();
1419 if (paddr
.is_msgr2() || paddr
.is_any()) {
1422 logger().warn("{} peer's address {} is not v2", conn
, paddr
);
1423 throw std::system_error(
1424 make_error_code(crimson::net::error::bad_peer_address
));
// Adopt the address when none is recorded yet; otherwise it must match.
1426 if (conn
.peer_addr
== entity_addr_t()) {
1427 conn
.peer_addr
= paddr
;
1428 } else if (conn
.peer_addr
!= paddr
) {
1429 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1430 " reconnect failed",
1431 conn
, paddr
, conn
.peer_addr
);
1432 throw std::system_error(
1433 make_error_code(crimson::net::error::bad_peer_address
));
1435 peer_global_seq
= reconnect
.global_seq();
1437 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1439 if (!existing_conn
) {
1440 // there is no existing connection therefore cannot reconnect to previous
1442 logger().warn("{} server_reconnect: no existing connection from address {},"
1443 " reseting client", conn
, conn
.peer_addr
);
1444 return send_reset(true);
// A non-v2 existing connection is closed and the client is reset.
1447 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1448 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1449 "close existing and reset client.",
1450 conn
, *existing_conn
,
1451 static_cast<int>(existing_conn
->protocol
->proto_type
));
1452 (void) existing_conn
->close();
1453 return send_reset(true);
1456 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1457 existing_conn
->protocol
.get());
1458 ceph_assert(existing_proto
);
1459 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1460 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1461 conn
, global_seq
, peer_global_seq
, reconnect
.connect_seq(),
1462 reconnect
.client_cookie(), reconnect
.server_cookie(),
1464 get_state_name(existing_proto
->state
),
1465 existing_proto
->global_seq
,
1466 existing_proto
->peer_global_seq
,
1467 existing_proto
->connect_seq
,
1468 existing_proto
->client_cookie
,
1469 existing_proto
->server_cookie
);
// The existing connection is already being replaced by someone else.
1471 if (existing_proto
->state
== state_t::REPLACING
) {
1472 logger().warn("{} server_reconnect: racing replace happened while "
1473 " replacing existing connection {}, retry global.",
1474 conn
, *existing_conn
);
1475 return send_retry_global(existing_proto
->peer_global_seq
);
// Cookie checks: a differing client cookie resets the client, with the
// policy's resetcheck flag as the `full` argument.
1478 if (existing_proto
->client_cookie
!= reconnect
.client_cookie()) {
1479 logger().warn("{} server_reconnect:"
1480 " client_cookie mismatch with existing connection {},"
1481 " cc={} rcc={}. I must have reset, reseting client.",
1482 conn
, *existing_conn
,
1483 existing_proto
->client_cookie
, reconnect
.client_cookie());
1484 return send_reset(conn
.policy
.resetcheck
);
1485 } else if (existing_proto
->server_cookie
== 0) {
1486 // this happens when:
1487 // - a connects to b
1488 // - a sends client_ident
1489 // - b gets client_ident, sends server_ident and sets cookie X
1490 // - connection fault
1491 // - b reconnects to a with cookie X, connect_seq=1
1492 // - a has cookie==0
1493 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1494 " server_ident with existing connection {}."
1495 " Asking peer to resume session establishment",
1496 conn
, existing_proto
->client_cookie
, *existing_conn
);
1497 return send_reset(false);
// Sequence checks: the peer's global_seq must not be older than ours.
1500 if (existing_proto
->peer_global_seq
> reconnect
.global_seq()) {
1501 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1502 " with existing connection {},"
1503 " ask client to retry global",
1504 conn
, existing_proto
->peer_global_seq
,
1505 reconnect
.global_seq(), *existing_conn
);
1506 return send_retry_global(existing_proto
->peer_global_seq
);
// connect_seq comparison: older -> retry, equal -> race, newer -> reuse.
1509 if (existing_proto
->connect_seq
> reconnect
.connect_seq()) {
1510 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1511 " with existing connection {}, ask client to retry",
1512 conn
, reconnect
.connect_seq(),
1513 existing_proto
->connect_seq
, *existing_conn
);
1514 return send_retry(existing_proto
->connect_seq
);
1515 } else if (existing_proto
->connect_seq
== reconnect
.connect_seq()) {
1516 // reconnect race: both peers are sending reconnect messages
1517 if (existing_conn
->peer_wins()) {
1518 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1519 " and win, reusing existing {}",
1520 conn
, reconnect
.connect_seq(), *existing_conn
);
// do_reset=false, reconnect=true, plus the peer's connect_seq and msg_seq.
1521 return reuse_connection(
1522 existing_proto
, false,
1523 true, reconnect
.connect_seq(), reconnect
.msg_seq());
// NOTE(review): the statement following this warning is not visible here.
1525 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1526 " and lose to existing {}, ask client to wait",
1527 conn
, reconnect
.connect_seq(), *existing_conn
);
1530 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1531 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1532 " reusing existing {}",
1533 conn
, existing_proto
->connect_seq
,
1534 reconnect
.connect_seq(), *existing_conn
);
1535 return reuse_connection(
1536 existing_proto
, false,
1537 true, reconnect
.connect_seq(), reconnect
.msg_seq());
// Enter the ACCEPTING state and drive the server side of the handshake as
// a background task guarded by the pending_dispatch gate:
// banner exchange -> learn peer type/policy and own address ->
// server_auth() -> dispatch on the next tag (CLIENT_IDENT or
// SESSION_RECONNECT) -> act on the resulting next_step_t.
// Any exception escaping the chain is logged by the final handler.
1542 void ProtocolV2::execute_accepting()
1544 trigger_state(state_t::ACCEPTING
, write_state_t::none
, false);
// The future of the gated task is intentionally discarded.
1545 (void) seastar::with_gate(pending_dispatch
, [this] {
1546 return seastar::futurize_apply([this] {
1547 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED
);
// Start from fresh auth metadata and no stream handlers.
1548 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
1549 session_stream_handlers
= { nullptr, nullptr };
1551 return banner_exchange();
1552 }).then([this] (entity_type_t _peer_type
,
1553 entity_addr_t _my_addr_from_peer
) {
// The peer type must not have been set before.
1554 ceph_assert(conn
.get_peer_type() == 0);
1555 conn
.set_peer_type(_peer_type
);
1557 conn
.policy
= messenger
.get_policy(_peer_type
);
1558 logger().info("{} UPDATE: peer_type={},"
1559 " policy(lossy={} server={} standby={} resetcheck={})",
1560 conn
, ceph_entity_type_name(_peer_type
),
1561 conn
.policy
.lossy
, conn
.policy
.server
,
1562 conn
.policy
.standby
, conn
.policy
.resetcheck
);
// The address the peer sees us at must agree with our port and nonce.
1563 if (messenger
.get_myaddr().get_port() != _my_addr_from_peer
.get_port() ||
1564 messenger
.get_myaddr().get_nonce() != _my_addr_from_peer
.get_nonce()) {
1565 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1566 conn
, _my_addr_from_peer
, messenger
.get_myaddr());
1567 throw std::system_error(
1568 make_error_code(crimson::net::error::bad_peer_address
));
1570 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
1572 return server_auth();
1574 return read_main_preamble();
1575 }).then([this] (Tag tag
) {
// After auth only a client ident or a session reconnect is acceptable.
1577 case Tag::CLIENT_IDENT
:
1578 return server_connect();
1579 case Tag::SESSION_RECONNECT
:
1580 return server_reconnect();
1582 unexpected_tag(tag
, conn
, "post_server_auth");
1583 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1586 }).then([this] (next_step_t next
) {
1588 case next_step_t::ready
:
// `ready` implies the state has already moved on from ACCEPTING.
1589 assert(state
!= state_t::ACCEPTING
);
1591 case next_step_t::wait
:
1592 if (unlikely(state
!= state_t::ACCEPTING
)) {
1593 logger().debug("{} triggered {} at the end of execute_accepting()",
1594 conn
, get_state_name(state
));
1597 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn
);
1598 execute_server_wait();
1601 ceph_abort("impossible next step");
1603 }).handle_exception([this] (std::exception_ptr eptr
) {
1604 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1605 conn
, get_state_name(state
), eptr
);
1611 // CONNECTING or ACCEPTING state
// Exchange and verify the pre-auth signatures.
// Sends an AuthSignatureFrame whose signature is HMAC-SHA256 over `rxbuf`
// with the session key (or a default sha256_digest_t when the key is
// empty), then reads the peer's AuthSignatureFrame and compares it with the
// same computation over `txbuf`.
// NOTE(review): what happens after a mismatch is logged is not visible in
// this excerpt.
1613 seastar::future
<> ProtocolV2::finish_auth()
1615 ceph_assert(auth_meta
);
// Signature over the bytes recorded in rxbuf.
1617 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1618 auth_meta
->session_key
.hmac_sha256(nullptr, rxbuf
);
1619 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1620 ceph_assert(record_io
);
1623 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn
, sig
);
1624 return write_frame(sig_frame
).then([this] {
1625 return read_main_preamble();
1626 }).then([this] (Tag tag
) {
1627 expect_tag(Tag::AUTH_SIGNATURE
, tag
, conn
, "post_finish_auth");
1628 return read_frame_payload();
1630 // handle_auth_signature() logic
1631 auto sig_frame
= AuthSignatureFrame::Decode(rx_segments_data
.back());
1632 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn
, sig_frame
.signature());
// The peer's signature must equal our own HMAC over txbuf.
1634 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
1635 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(nullptr, txbuf
);
1636 if (sig_frame
.signature() != actual_tx_sig
) {
1637 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1638 " sig_frame.signature()={}",
1639 conn
, actual_tx_sig
, sig_frame
.signature());
// Enter the ESTABLISHING state for an accepted connection.
// Notifies the dispatcher through ms_handle_accept() in the background,
// registers the connection with the messenger and removes it from the
// accepting set, then sends the server ident. The resulting future is
// stored in `execution_done`.
1648 void ProtocolV2::execute_establishing() {
1649 trigger_state(state_t::ESTABLISHING
, write_state_t::delay
, false);
// Dispatcher callback runs under the pending_dispatch gate; its future is
// discarded and any exception from it aborts the process.
1650 (void) seastar::with_gate(pending_dispatch
, [this] {
1651 return dispatcher
.ms_handle_accept(
1652 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1653 }).handle_exception([this] (std::exception_ptr eptr
) {
1654 logger().error("{} ms_handle_accept caught exception: {}", conn
, eptr
);
1655 ceph_abort("unexpected exception from ms_handle_accept()");
// Register first, then drop the connection from the accepting set.
1657 messenger
.register_conn(
1658 seastar::static_pointer_cast
<SocketConnection
>(
1659 conn
.shared_from_this()));
1660 messenger
.unaccept_conn(
1661 seastar::static_pointer_cast
<SocketConnection
>(
1662 conn
.shared_from_this()));
1663 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
1664 return seastar::futurize_apply([this] {
1665 return send_server_ident();
// The state may have changed while the server ident was being written.
1667 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1668 logger().debug("{} triggered {} at the end of execute_establishing()",
1669 conn
, get_state_name(state
));
1672 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1673 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1674 conn
, global_seq
, peer_global_seq
, connect_seq
,
1675 client_cookie
, server_cookie
, conn
.in_seq
,
1676 conn
.out_seq
, conn
.out_q
.size());
1678 }).handle_exception([this] (std::exception_ptr eptr
) {
// If another state took over, only CLOSING or REPLACING is expected.
1679 if (state
!= state_t::ESTABLISHING
) {
1680 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1681 conn
, get_state_name(state
), eptr
);
1682 assert(state
== state_t::CLOSING
||
1683 state
== state_t::REPLACING
);
1686 fault(false, "execute_establishing()", eptr
);
1691 // ESTABLISHING or REPLACING state
// Build and write the ServerIdentFrame.
// Obtains a global sequence from the messenger, generates a server cookie
// for non-lossy policies, sets the lossy flag for lossy ones, applies
// connection_features to the connection and writes the frame.
// NOTE(review): the return-type line and several statements of this
// function are not visible in this excerpt.
1694 ProtocolV2::send_server_ident()
1696 // send_server_ident() logic
1698 // referred to async-conn v2: not assign gs to global_seq
1699 return messenger
.get_global_seq().then([this] (auto gs
) {
// NOTE(review): this logs the member `global_seq`, not the freshly obtained
// `gs` that the ServerIdentFrame log below reports -- confirm intent.
1700 logger().debug("{} UPDATE: gs={} for server ident", conn
, global_seq
);
1702 // this is required for the case when this connection is being replaced
// Only non-lossy connections get a random server cookie in [1, max].
1706 if (!conn
.policy
.lossy
) {
1707 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1711 if (conn
.policy
.lossy
) {
1712 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
1715 auto server_ident
= ServerIdentFrame::Encode(
1716 messenger
.get_myaddrs(),
1717 messenger
.get_myname().num(),
1719 conn
.policy
.features_supported
,
1720 conn
.policy
.features_required
| msgr2_required
,
1724 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1725 " gs={}, features_supported={}, features_required={},"
1726 " flags={}, cookie={}",
1727 conn
, messenger
.get_myaddrs(), messenger
.get_myname().num(),
1728 gs
, conn
.policy
.features_supported
,
1729 conn
.policy
.features_required
| msgr2_required
,
1730 flags
, server_cookie
);
// Apply the connection features to the connection before writing.
1732 conn
.set_features(connection_features
);
1734 return write_frame(server_ident
);
// Replace this connection's socket and session state with those of a newly
// accepted connection.
// Enters the REPLACING state, notifies the dispatcher via
// ms_handle_accept(), waits for the writer and the previous execution to
// finish, then swaps in the new socket, auth metadata and stream handlers.
// Afterwards it either writes a ReconnectOkFrame or sends a new server
// ident.
// NOTE(review): the conditions selecting between those two paths, and some
// parameters and captures, are not visible in this excerpt.
1740 void ProtocolV2::trigger_replacing(bool reconnect
,
1742 SocketRef
&& new_socket
,
1743 AuthConnectionMetaRef
&& new_auth_meta
,
1744 ceph::crypto::onwire::rxtx_t new_rxtx
,
1745 uint64_t new_peer_global_seq
,
1746 uint64_t new_client_cookie
,
1747 entity_name_t new_peer_name
,
1748 uint64_t new_conn_features
,
1749 uint64_t new_connect_seq
,
1750 uint64_t new_msg_seq
)
1752 trigger_state(state_t::REPLACING
, write_state_t::delay
, false);
// Dispatcher callback runs under the pending_dispatch gate; its future is
// discarded and any exception from it aborts the process.
1756 (void) seastar::with_gate(pending_dispatch
, [this] {
1757 return dispatcher
.ms_handle_accept(
1758 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1759 }).handle_exception([this] (std::exception_ptr eptr
) {
1760 logger().error("{} ms_handle_accept caught exception: {}", conn
, eptr
);
1761 ceph_abort("unexpected exception from ms_handle_accept()");
// The replacement itself is a second gated background task that owns the
// new socket/auth state by move-capture.
1763 (void) seastar::with_gate(pending_dispatch
,
1767 new_socket
= std::move(new_socket
),
1768 new_auth_meta
= std::move(new_auth_meta
),
1769 new_rxtx
= std::move(new_rxtx
),
1770 new_client_cookie
, new_peer_name
,
1771 new_conn_features
, new_peer_global_seq
,
1772 new_connect_seq
, new_msg_seq
] () mutable {
// Wait for the writer to exit, then for the previous execution to finish.
1773 return wait_write_exit().then([this, do_reset
] {
1775 reset_session(true);
1777 protocol_timer
.cancel();
1778 return execution_done
.get_future();
1781 new_socket
= std::move(new_socket
),
1782 new_auth_meta
= std::move(new_auth_meta
),
1783 new_rxtx
= std::move(new_rxtx
),
1784 new_client_cookie
, new_peer_name
,
1785 new_conn_features
, new_peer_global_seq
,
1786 new_connect_seq
, new_msg_seq
] () mutable {
// If the state changed meanwhile, just close the new socket; the capture
// keeps it alive until close() resolves.
1787 if (unlikely(state
!= state_t::REPLACING
)) {
1788 return new_socket
->close().then([sock
= std::move(new_socket
)] {
// Close the old socket in the background, keeping it alive until done.
1794 (void) with_gate(pending_dispatch
, [sock
= std::move(socket
)] () mutable {
1795 return sock
->close().then([sock
= std::move(sock
)] {});
// Adopt the new socket and session state.
1798 socket
= std::move(new_socket
);
1799 auth_meta
= std::move(new_auth_meta
);
1800 session_stream_handlers
= std::move(new_rxtx
);
1802 peer_global_seq
= new_peer_global_seq
;
// Reconnect path (per the ReconnectOkFrame written below).
1805 connect_seq
= new_connect_seq
;
1806 // send_reconnect_ok() logic
1807 requeue_up_to(new_msg_seq
);
1808 auto reconnect_ok
= ReconnectOkFrame::Encode(conn
.in_seq
);
1809 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn
, conn
.in_seq
);
1810 return write_frame(reconnect_ok
);
// New-session path: take over the client's identity and send a server ident.
1812 client_cookie
= new_client_cookie
;
1813 conn
.set_peer_name(new_peer_name
);
1814 connection_features
= new_conn_features
;
1815 return send_server_ident();
1817 }).then([this, reconnect
] {
1818 if (unlikely(state
!= state_t::REPLACING
)) {
1819 logger().debug("{} triggered {} at the end of trigger_replacing()",
1820 conn
, get_state_name(state
));
1823 logger().info("{} replaced ({}):"
1824 " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
1825 " in_seq={}, out_seq={}, out_q={}",
1826 conn
, reconnect
? "reconnected" : "connected",
1827 global_seq
, peer_global_seq
, connect_seq
, client_cookie
,
1828 server_cookie
, conn
.in_seq
, conn
.out_seq
, conn
.out_q
.size());
1830 }).handle_exception([this] (std::exception_ptr eptr
) {
// If another state took over, only CLOSING is expected here.
1831 if (state
!= state_t::REPLACING
) {
1832 logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
1833 conn
, get_state_name(state
), eptr
);
1834 assert(state
== state_t::CLOSING
);
1837 fault(true, "trigger_replacing()", eptr
);
// Serialize pending control frames and messages into one bufferlist.
// Appends, in this order: a KeepAliveFrame when `require_keepalive` is set,
// a KeepAliveFrameAck when `_keepalive_ack` holds a value, an AckFrame
// when `require_ack` is set and there are no messages, and one MessageFrame
// for each of the first `num_msgs` entries of `msgs`.
// NOTE(review): some parameters of this function are not visible in this
// excerpt (`num_msgs` and `require_ack` are used below).
1844 ceph::bufferlist
ProtocolV2::do_sweep_messages(
1845 const std::deque
<MessageRef
>& msgs
,
1847 bool require_keepalive
,
1848 std::optional
<utime_t
> _keepalive_ack
,
1851 ceph::bufferlist bl
;
1853 if (unlikely(require_keepalive
)) {
1854 auto keepalive_frame
= KeepAliveFrame::Encode();
1855 bl
.append(keepalive_frame
.get_buffer(session_stream_handlers
));
1856 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2
, bp_type_t::WRITE
);
1859 if (unlikely(_keepalive_ack
.has_value())) {
1860 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(*_keepalive_ack
);
1861 bl
.append(keepalive_ack_frame
.get_buffer(session_stream_handlers
));
1862 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK
, bp_type_t::WRITE
);
// A standalone ack is only emitted when no message is being sent.
1865 if (require_ack
&& !num_msgs
) {
1866 auto ack_frame
= AckFrame::Encode(conn
.in_seq
);
1867 bl
.append(ack_frame
.get_buffer(session_stream_handlers
));
1868 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK
, bp_type_t::WRITE
);
// Encode the first num_msgs messages, appending each frame to bl.
1871 std::for_each(msgs
.begin(), msgs
.begin()+num_msgs
, [this, &bl
](const MessageRef
& msg
) {
1872 // TODO: move to common code
1874 msg
->get_header().src
= messenger
.get_myname();
1876 msg
->encode(conn
.features
, 0);
// A message must not carry a seq yet; it gets the next outgoing seq.
1878 ceph_assert(!msg
->get_seq() && "message already has seq");
1879 msg
->set_seq(++conn
.out_seq
);
1881 ceph_msg_header
&header
= msg
->get_header();
1882 ceph_msg_footer
&footer
= msg
->get_footer();
// Build the v2 header from the legacy header/footer; conn.in_seq is
// embedded as one of its fields.
1884 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
1885 header
.type
, header
.priority
,
1887 init_le32(0), header
.data_off
,
1888 init_le64(conn
.in_seq
),
1889 footer
.flags
, header
.compat_version
,
1892 auto message
= MessageFrame::Encode(header2
,
1893 msg
->get_payload(), msg
->get_middle(), msg
->get_data());
1894 logger().debug("{} --> #{} === {} ({})",
1895 conn
, msg
->get_seq(), *msg
, msg
->get_type());
1896 bl
.append(message
.get_buffer(session_stream_handlers
));
1897 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE
, bp_type_t::WRITE
);
// Reads the payload of the current frame, decodes it as a MessageFrame and
// hands the resulting Message to the dispatcher.
//
// throttle_stamp is stored on the decoded message via set_throttle_stamp().
// Returns the future chain started by read_frame_payload().
//
// Inside the continuation the code rebuilds a ceph_msg_header/ceph_msg_footer
// from the ceph_msg_header2 carried by the frame, calls decode_message(),
// stamps the message, compares its seq against conn.in_seq, records the new
// in_seq, passes the peer's ack_seq to ack_writes(), and finally calls
// dispatcher.ms_dispatch() under the pending_dispatch gate.
//
// NOTE(review): several lines of this function (braces, the decode-failure
// check and any early exits) are not visible in this view; confirm the exact
// control flow against the full file.
1903 seastar::future
<> ProtocolV2::read_message(utime_t throttle_stamp
)
1905 return read_frame_payload()
1906 .then([this, throttle_stamp
] {
// Receive timestamp is taken right after the payload has been read.
1907 utime_t recv_stamp
{seastar::lowres_system_clock::now()};
1909 // we need to get the size before std::moving segments data
1910 const size_t cur_msg_size
= get_current_msg_size();
1911 auto msg_frame
= MessageFrame::Decode(std::move(rx_segments_data
));
1912 // XXX: paranoid copy just to avoid oops
1913 ceph_msg_header2 current_header
= msg_frame
.header();
1915 logger().trace("{} got {} + {} + {} byte message,"
1916 " envelope type={} src={} off={} seq={}",
1917 conn
, msg_frame
.front_len(), msg_frame
.middle_len(),
1918 msg_frame
.data_len(), current_header
.type
, conn
.get_peer_name(),
1919 current_header
.data_off
, current_header
.seq
);
// Build a ceph_msg_header from the ceph_msg_header2 fields; the
// front/middle/data lengths come from the decoded frame and the source is
// the connection's peer name.
1921 ceph_msg_header header
{current_header
.seq
,
1923 current_header
.type
,
1924 current_header
.priority
,
1925 current_header
.version
,
1926 init_le32(msg_frame
.front_len()),
1927 init_le32(msg_frame
.middle_len()),
1928 init_le32(msg_frame
.data_len()),
1929 current_header
.data_off
,
1930 conn
.get_peer_name(),
1931 current_header
.compat_version
,
1932 current_header
.reserved
,
// Footer: only the flags come from the received header; every other field
// is initialised to zero.
1934 ceph_msg_footer footer
{init_le32(0), init_le32(0),
1935 init_le32(0), init_le64(0), current_header
.flags
};
1937 auto pconn
= seastar::static_pointer_cast
<SocketConnection
>(
1938 conn
.shared_from_this());
1939 Message
*message
= decode_message(nullptr, 0, header
, footer
,
1940 msg_frame
.front(), msg_frame
.middle(), msg_frame
.data(),
// NOTE(review): the condition guarding this warning is not visible here;
// presumably it is a failed (null) result from decode_message() -- confirm.
1943 logger().warn("{} decode message failed", conn
);
1947 // store reservation size in message, so we don't get confused
1948 // by messages entering the dispatch queue through other paths.
1949 message
->set_dispatch_throttle_size(cur_msg_size
);
1951 message
->set_throttle_stamp(throttle_stamp
);
1952 message
->set_recv_stamp(recv_stamp
);
1953 message
->set_recv_complete_stamp(utime_t
{seastar::lowres_system_clock::now()});
1955 // check received seq#. if it is old, drop the message.
1956 // note that incoming messages may skip ahead. this is convenient for the
1957 // client side queueing because messages can't be renumbered, but the (kernel)
1958 // client will occasionally pull a message out of the sent queue to send
1959 // elsewhere. in that case it doesn't matter if we "got" it or not.
1960 uint64_t cur_seq
= conn
.in_seq
;
// Seq at or below the last one recorded: logged as an old message; the
// assert fires only when the peer has the RECONNECT_SEQ feature and
// conf.ms_die_on_old_message is set.
1961 if (message
->get_seq() <= cur_seq
) {
1962 logger().error("{} got old message {} <= {} {} {}, discarding",
1963 conn
, message
->get_seq(), cur_seq
, message
, *message
);
1964 if (HAVE_FEATURE(conn
.features
, RECONNECT_SEQ
) &&
1965 conf
.ms_die_on_old_message
) {
1966 ceph_assert(0 == "old msgs despite reconnect_seq feature");
// Seq jumped by more than one: logged as a possible missed message; the
// assert fires only when conf.ms_die_on_skipped_message is set.
1969 } else if (message
->get_seq() > cur_seq
+ 1) {
1970 logger().error("{} missed message? skipped from seq {} to {}",
1971 conn
, cur_seq
, message
->get_seq());
1972 if (conf
.ms_die_on_skipped_message
) {
1973 ceph_assert(0 == "skipped incoming seq");
1977 // note last received message.
1978 conn
.in_seq
= message
->get_seq();
1979 logger().debug("{} <== #{} === {} ({})",
1980 conn
, message
->get_seq(), *message
, message
->get_type());
// Forward the ack_seq carried in the received header to ack_writes().
1982 ack_writes(current_header
.ack_seq
);
1984 // TODO: change MessageRef with seastar::shared_ptr
// NOTE(review): the `false` argument presumably tells MessageRef not to take
// an additional reference on the raw pointer -- confirm.
1985 auto msg_ref
= MessageRef
{message
, false};
// The future returned by with_gate() is deliberately discarded with (void);
// an exception escaping ms_dispatch() is logged and then aborts the process.
1986 (void) seastar::with_gate(pending_dispatch
, [this, msg
= std::move(msg_ref
)] {
1987 return dispatcher
.ms_dispatch(&conn
, std::move(msg
));
1988 }).handle_exception([this] (std::exception_ptr eptr
) {
1989 logger().error("{} ms_dispatch caught exception: {}", conn
, eptr
);
1990 ceph_abort("unexpected exception from ms_dispatch()");
// Enters the READY state and starts the frame-reading loop.
//
// Asserts that the policy is lossy or that both client_cookie and
// server_cookie are non-zero, then calls
// trigger_state(READY, write_state_t::open, false). When built with
// UNIT_TESTS_BUILT and an interceptor is attached, the connection is
// registered as ready with it.
//
// The loop (stored in execution_done and run under the pending_dispatch gate)
// cancels protocol_timer, then keeps reading a frame preamble and handles the
// frame according to the returned Tag: MESSAGE, an ack frame, KEEPALIVE2 and
// KEEPALIVE2_ACK are handled below; the remaining path calls unexpected_tag().
// Exceptions from the loop end up in the handle_exception() continuation.
//
// NOTE(review): the switch statement, some case labels and closing braces are
// not visible in this view; confirm structure against the full file.
1995 void ProtocolV2::execute_ready()
1997 assert(conn
.policy
.lossy
|| (client_cookie
!= 0 && server_cookie
!= 0));
1998 trigger_state(state_t::READY
, write_state_t::open
, false);
1999 #ifdef UNIT_TESTS_BUILT
2000 if (conn
.interceptor
) {
2001 conn
.interceptor
->register_conn_ready(conn
);
// Main read loop: read a preamble, then handle the frame by its tag.
2004 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
2005 protocol_timer
.cancel();
2006 return seastar::keep_doing([this] {
2007 return read_main_preamble()
2008 .then([this] (Tag tag
) {
// MESSAGE: run the (mostly stubbed) throttling steps, then read_message().
2010 case Tag::MESSAGE
: {
2011 return seastar::futurize_apply([this] {
2012 // throttle_message() logic
2013 if (!conn
.policy
.throttler_messages
) {
2014 return seastar::now();
2016 // TODO: message throttler
2018 return seastar::now();
2020 // throttle_bytes() logic
2021 if (!conn
.policy
.throttler_bytes
) {
2022 return seastar::now();
2024 size_t cur_msg_size
= get_current_msg_size();
2025 if (!cur_msg_size
) {
2026 return seastar::now();
2028 logger().trace("{} wants {} bytes from policy throttler {}/{}",
2030 conn
.policy
.throttler_bytes
->get_current(),
2031 conn
.policy
.throttler_bytes
->get_max());
2032 return conn
.policy
.throttler_bytes
->get(cur_msg_size
);
2034 // TODO: throttle_dispatch_queue() logic
// The throttle stamp handed to read_message() is taken here.
2035 utime_t throttle_stamp
{seastar::lowres_system_clock::now()};
2036 return read_message(throttle_stamp
);
// NOTE(review): the case label for this branch is not visible here; the body
// decodes an AckFrame and forwards its seq to ack_writes().
2040 return read_frame_payload().then([this] {
2041 // handle_message_ack() logic
2042 auto ack
= AckFrame::Decode(rx_segments_data
.back());
2043 logger().debug("{} GOT AckFrame: seq={}", conn
, ack
.seq());
2044 ack_writes(ack
.seq());
// KEEPALIVE2: pass the peer's timestamp to notify_keepalive_ack() and record
// the local receive time with set_last_keepalive().
2046 case Tag::KEEPALIVE2
:
2047 return read_frame_payload().then([this] {
2048 // handle_keepalive2() logic
2049 auto keepalive_frame
= KeepAliveFrame::Decode(rx_segments_data
.back());
2050 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
2051 conn
, keepalive_frame
.timestamp());
2052 notify_keepalive_ack(keepalive_frame
.timestamp());
2053 conn
.set_last_keepalive(seastar::lowres_system_clock::now());
// KEEPALIVE2_ACK: store the timestamp carried by the ack frame on the
// connection.
2055 case Tag::KEEPALIVE2_ACK
:
2056 return read_frame_payload().then([this] {
2057 // handle_keepalive2_ack() logic
2058 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(rx_segments_data
.back());
2059 conn
.set_last_keepalive_ack(
2060 seastar::lowres_system_clock::time_point
{keepalive_ack_frame
.timestamp()});
2061 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
2062 conn
, conn
.last_keepalive_ack
);
// Remaining path: report the tag through unexpected_tag().
2065 unexpected_tag(tag
, conn
, "execute_ready");
2066 return seastar::now();
// Error path: if the state has already left READY, the exception is logged
// and the state is asserted to be REPLACING or CLOSING. fault() is called
// with the exception; NOTE(review): whether fault() is skipped in the
// non-READY case is not visible in this view.
2070 }).handle_exception([this] (std::exception_ptr eptr
) {
2071 if (state
!= state_t::READY
) {
2072 logger().info("{} execute_ready(): protocol aborted at {} -- {}",
2073 conn
, get_state_name(state
), eptr
);
2074 assert(state
== state_t::REPLACING
||
2075 state
== state_t::CLOSING
);
2078 fault(false, "execute_ready()", eptr
);
// Enters the STANDBY state by calling
// trigger_state(STANDBY, write_state_t::delay, true).
// NOTE(review): the remainder of this function's body is not visible in this
// view; confirm against the full file.
2085 void ProtocolV2::execute_standby()
2087 trigger_state(state_t::STANDBY
, write_state_t::delay
, true);
// Write notification hook: when the protocol is in STANDBY and the connection
// policy is not a server policy, logs the transition and calls
// execute_connecting(). The unlikely() hint marks this as the rare path.
2093 void ProtocolV2::notify_write()
2095 if (unlikely(state
== state_t::STANDBY
&& !conn
.policy
.server
)) {
2096 logger().info("{} notify_write(): at {}, going to CONNECTING",
2097 conn
, get_state_name(state
));
2098 execute_connecting();
// Enters the WAIT state, waits for a backoff period and then goes to
// CONNECTING.
//
// The backoff duration starts from protocol_timer.last_dur() and is then
// replaced by one of: conf.ms_max_backoff, min(conf.ms_max_backoff, 2 * last)
// when the last duration was positive, or conf.ms_initial_backoff.
// NOTE(review): the condition selecting the first branch is not visible in
// this view; presumably it is the max_backoff parameter -- confirm.
//
// The wait runs under the pending_dispatch gate and its future is stored in
// execution_done. If the state is no longer WAIT when the timer fires, that
// is logged at debug level; an exception from the chain is logged and the
// state is asserted to be REPLACING or CLOSING.
2104 void ProtocolV2::execute_wait(bool max_backoff
)
2106 trigger_state(state_t::WAIT
, write_state_t::delay
, true);
2110 execution_done
= seastar::with_gate(pending_dispatch
,
2111 [this, max_backoff
] {
2112 double backoff
= protocol_timer
.last_dur();
2114 backoff
= conf
.ms_max_backoff
;
// Double the previous duration, capped at conf.ms_max_backoff.
2115 } else if (backoff
> 0) {
2116 backoff
= std::min(conf
.ms_max_backoff
, 2 * backoff
);
2118 backoff
= conf
.ms_initial_backoff
;
2120 return protocol_timer
.backoff(backoff
).then([this] {
// The state may have changed while waiting.
// NOTE(review): what follows the debug log in that case is not visible here.
2121 if (unlikely(state
!= state_t::WAIT
)) {
2122 logger().debug("{} triggered {} at the end of execute_wait()",
2123 conn
, get_state_name(state
));
2126 logger().info("{} execute_wait(): going to CONNECTING", conn
);
2127 execute_connecting();
2128 }).handle_exception([this] (std::exception_ptr eptr
) {
2129 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2130 conn
, get_state_name(state
), eptr
);
2131 assert(state
== state_t::REPLACING
||
2132 state
== state_t::CLOSING
);
2137 // SERVER_WAIT state
// Enters the SERVER_WAIT state via
// trigger_state(SERVER_WAIT, write_state_t::delay, false) and then issues a
// one-byte read under the pending_dispatch gate (future stored in
// execution_done).
//
// Receiving data in this state is logged as a warning ("got read, abort");
// an exception from the chain is logged as a fault that leads to CLOSING.
// NOTE(review): the statements that follow each log call are not visible in
// this view; confirm the actual abort/close calls against the full file.
2139 void ProtocolV2::execute_server_wait()
2141 trigger_state(state_t::SERVER_WAIT
, write_state_t::delay
, false);
2142 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
2143 return read_exactly(1).then([this] (auto bl
) {
2144 logger().warn("{} SERVER_WAIT got read, abort", conn
);
2146 }).handle_exception([this] (std::exception_ptr eptr
) {
2147 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2148 conn
, get_state_name(state
), eptr
);
// Moves the protocol to the CLOSING state.
//
// Depending on the current state the connection is first removed from the
// messenger: unaccept_conn() when in ACCEPTING or SERVER_WAIT,
// unregister_conn() when the state is at or after ESTABLISHING and before
// CLOSING (this relies on the declaration order of state_t). Then the
// protocol timer is cancelled and
// trigger_state(CLOSING, write_state_t::drop, false) is called. When built
// with UNIT_TESTS_BUILT and an interceptor is attached, the connection is
// registered as closed with it.
// NOTE(review): some lines between the visible statements are missing from
// this view; confirm against the full file.
2156 void ProtocolV2::trigger_close()
2158 if (state
== state_t::ACCEPTING
|| state
== state_t::SERVER_WAIT
) {
2159 messenger
.unaccept_conn(
2160 seastar::static_pointer_cast
<SocketConnection
>(
2161 conn
.shared_from_this()));
2162 } else if (state
>= state_t::ESTABLISHING
&& state
< state_t::CLOSING
) {
2163 messenger
.unregister_conn(
2164 seastar::static_pointer_cast
<SocketConnection
>(
2165 conn
.shared_from_this()));
2171 protocol_timer
.cancel();
2173 trigger_state(state_t::CLOSING
, write_state_t::drop
, false);
2174 #ifdef UNIT_TESTS_BUILT
2175 if (conn
.interceptor
) {
2176 conn
.interceptor
->register_conn_closed(conn
);
2181 } // namespace crimson::net