1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV2.h"
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #if FMT_VERSION >= 60000
9 #include <fmt/chrono.h>
13 #include "include/msgr.h"
14 #include "include/random.h"
16 #include "crimson/auth/AuthClient.h"
17 #include "crimson/auth/AuthServer.h"
20 #include "Dispatcher.h"
23 #include "SocketConnection.h"
24 #include "SocketMessenger.h"
26 #ifdef UNIT_TESTS_BUILT
27 #include "Interceptor.h"
30 using namespace ceph::msgr::v2
;
34 // TODO: apply the same logging policy to Protocol V1
35 // Log levels in V2 Protocol:
// * error level: an error that causes the connection to terminate:
39 // * warn level: something unusual that identifies connection fault or replacement:
40 // - unstable network;
41 // - incompatible peer;
44 // - connection reset;
45 // * info level, something very important to show connection lifecycle,
46 // which doesn't happen very frequently;
47 // * debug level, important logs for debugging, including:
48 // - all the messages sent/received (-->/<==);
49 // - all the frames exchanged (WRITE/GOT);
50 // - important fields updated (UPDATE);
51 // - connection state transitions (TRIGGER);
52 // * trace level, trivial logs showing:
53 // - the exact bytes being sent/received (SEND/RECV(bytes));
54 // - detailed information of sub-frames;
55 // - integrity checks;
57 seastar::logger
& logger() {
58 return crimson::get_logger(ceph_subsys_ms
);
61 void abort_in_fault() {
62 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
65 void abort_protocol() {
66 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
69 void abort_in_close(crimson::net::ProtocolV2
& proto
) {
74 inline void expect_tag(const Tag
& expected
,
76 crimson::net::SocketConnection
& conn
,
78 if (actual
!= expected
) {
79 logger().warn("{} {} received wrong tag: {}, expected {}",
81 static_cast<uint32_t>(actual
),
82 static_cast<uint32_t>(expected
));
87 inline void unexpected_tag(const Tag
& unexpected
,
88 crimson::net::SocketConnection
& conn
,
90 logger().warn("{} {} received unexpected tag: {}",
91 conn
, where
, static_cast<uint32_t>(unexpected
));
95 inline uint64_t generate_client_cookie() {
96 return ceph::util::generate_random_number
<uint64_t>(
97 1, std::numeric_limits
<uint64_t>::max());
100 } // namespace anonymous
103 struct fmt::formatter
<seastar::lowres_system_clock::time_point
> {
104 // ignore the format string
105 template <typename ParseContext
>
106 constexpr auto parse(ParseContext
&ctx
) { return ctx
.begin(); }
108 template <typename FormatContext
>
109 auto format(const seastar::lowres_system_clock::time_point
& t
,
110 FormatContext
& ctx
) {
111 std::time_t tt
= std::chrono::duration_cast
<std::chrono::seconds
>(
112 t
.time_since_epoch()).count();
113 auto milliseconds
= (t
.time_since_epoch() %
114 std::chrono::seconds(1)).count();
115 return fmt::format_to(ctx
.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
116 fmt::localtime(tt
), milliseconds
);
121 inline ostream
& operator<<(
122 ostream
& out
, const seastar::lowres_system_clock::time_point
& t
)
124 return out
<< fmt::format("{}", t
);
128 namespace crimson::net
{
130 #ifdef UNIT_TESTS_BUILT
131 void intercept(Breakpoint bp
, bp_type_t type
,
132 SocketConnection
& conn
, SocketRef
& socket
) {
133 if (conn
.interceptor
) {
134 auto action
= conn
.interceptor
->intercept(conn
, Breakpoint(bp
));
135 socket
->set_trap(type
, action
, &conn
.interceptor
->blocker
);
139 #define INTERCEPT_CUSTOM(bp, type) \
140 intercept({bp}, type, conn, socket)
142 #define INTERCEPT_FRAME(tag, type) \
143 intercept({static_cast<Tag>(tag), type}, \
146 #define INTERCEPT_N_RW(bp) \
147 if (conn.interceptor) { \
148 auto action = conn.interceptor->intercept(conn, {bp}); \
149 ceph_assert(action != bp_action_t::BLOCK); \
150 if (action == bp_action_t::FAULT) { \
156 #define INTERCEPT_CUSTOM(bp, type)
157 #define INTERCEPT_FRAME(tag, type)
158 #define INTERCEPT_N_RW(bp)
161 seastar::future
<> ProtocolV2::Timer::backoff(double seconds
)
163 logger().warn("{} waiting {} seconds ...", conn
, seconds
);
166 as
= seastar::abort_source();
167 auto dur
= std::chrono::duration_cast
<seastar::lowres_clock::duration
>(
168 std::chrono::duration
<double>(seconds
));
169 return seastar::sleep_abortable(dur
, *as
170 ).handle_exception_type([this] (const seastar::sleep_aborted
& e
) {
171 logger().debug("{} wait aborted", conn
);
176 ProtocolV2::ProtocolV2(Dispatcher
& dispatcher
,
177 SocketConnection
& conn
,
178 SocketMessenger
& messenger
)
179 : Protocol(proto_t::v2
, dispatcher
, conn
),
180 messenger
{messenger
},
181 protocol_timer
{conn
},
182 tx_frame_asm(&session_stream_handlers
, false)
185 ProtocolV2::~ProtocolV2() {}
187 void ProtocolV2::start_connect(const entity_addr_t
& _peer_addr
,
188 const entity_type_t
& _peer_type
)
190 ceph_assert(state
== state_t::NONE
);
191 ceph_assert(!socket
);
192 conn
.peer_addr
= _peer_addr
;
193 conn
.target_addr
= _peer_addr
;
194 conn
.set_peer_type(_peer_type
);
195 conn
.policy
= messenger
.get_policy(_peer_type
);
196 client_cookie
= generate_client_cookie();
197 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
198 " policy(lossy={}, server={}, standby={}, resetcheck={})",
199 conn
, _peer_addr
, ceph_entity_type_name(_peer_type
), client_cookie
,
200 conn
.policy
.lossy
, conn
.policy
.server
,
201 conn
.policy
.standby
, conn
.policy
.resetcheck
);
202 messenger
.register_conn(
203 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
204 execute_connecting();
207 void ProtocolV2::start_accept(SocketRef
&& sock
,
208 const entity_addr_t
& _peer_addr
)
210 ceph_assert(state
== state_t::NONE
);
211 ceph_assert(!socket
);
212 // until we know better
213 conn
.target_addr
= _peer_addr
;
214 conn
.set_ephemeral_port(_peer_addr
.get_port(),
215 SocketConnection::side_t::acceptor
);
216 socket
= std::move(sock
);
217 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn
, _peer_addr
);
218 messenger
.accept_conn(
219 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
223 // TODO: Frame related implementations, probably to a separate class.
225 void ProtocolV2::enable_recording()
232 seastar::future
<Socket::tmp_buf
> ProtocolV2::read_exactly(size_t bytes
)
234 if (unlikely(record_io
)) {
235 return socket
->read_exactly(bytes
)
236 .then([this] (auto bl
) {
237 rxbuf
.append(buffer::create(bl
.share()));
241 return socket
->read_exactly(bytes
);
245 seastar::future
<bufferlist
> ProtocolV2::read(size_t bytes
)
247 if (unlikely(record_io
)) {
248 return socket
->read(bytes
)
249 .then([this] (auto buf
) {
254 return socket
->read(bytes
);
258 seastar::future
<> ProtocolV2::write(bufferlist
&& buf
)
260 if (unlikely(record_io
)) {
263 return socket
->write(std::move(buf
));
266 seastar::future
<> ProtocolV2::write_flush(bufferlist
&& buf
)
268 if (unlikely(record_io
)) {
271 return socket
->write_flush(std::move(buf
));
274 size_t ProtocolV2::get_current_msg_size() const
276 ceph_assert(!rx_segments_desc
.empty());
278 // we don't include SegmentIndex::Msg::HEADER.
279 for (__u8 idx
= 1; idx
< rx_segments_desc
.size(); idx
++) {
280 sum
+= rx_segments_desc
[idx
].length
;
285 seastar::future
<Tag
> ProtocolV2::read_main_preamble()
287 return read_exactly(sizeof(preamble_block_t
))
288 .then([this] (auto bl
) {
289 if (session_stream_handlers
.rx
) {
290 session_stream_handlers
.rx
->reset_rx_handler();
292 bl = session_stream_handlers.rx->authenticated_decrypt_update(
293 std::move(bl), segment_t::DEFAULT_ALIGNMENT);
297 // I expect ceph_le32 will make the endian conversion for me. Passing
298 // everything through ::Decode is unnecessary.
299 const auto& main_preamble
= \
300 *reinterpret_cast<const preamble_block_t
*>(bl
.get());
301 logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
302 conn
, bl
.size(), (int)main_preamble
.tag
,
303 (int)main_preamble
.num_segments
, main_preamble
.crc
);
305 // verify preamble's CRC before any further processing
306 const auto rx_crc
= ceph_crc32c(0,
307 reinterpret_cast<const unsigned char*>(&main_preamble
),
308 sizeof(main_preamble
) - sizeof(main_preamble
.crc
));
309 if (rx_crc
!= main_preamble
.crc
) {
310 logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
311 conn
, rx_crc
, main_preamble
.crc
);
315 // currently we do support between 1 and MAX_NUM_SEGMENTS segments
316 if (main_preamble
.num_segments
< 1 ||
317 main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
318 logger().warn("{} unsupported num_segments={}",
319 conn
, main_preamble
.num_segments
);
322 if (main_preamble
.num_segments
> MAX_NUM_SEGMENTS
) {
323 logger().warn("{} num_segments too much: {}",
324 conn
, main_preamble
.num_segments
);
328 rx_segments_desc
.clear();
329 rx_segments_data
.clear();
331 for (std::uint8_t idx
= 0; idx
< main_preamble
.num_segments
; idx
++) {
332 logger().trace("{} GOT frame segment: len={} align={}",
333 conn
, main_preamble
.segments
[idx
].length
,
334 main_preamble
.segments
[idx
].alignment
);
335 rx_segments_desc
.emplace_back(main_preamble
.segments
[idx
]);
338 INTERCEPT_FRAME(main_preamble
.tag
, bp_type_t::READ
);
339 return static_cast<Tag
>(main_preamble
.tag
);
343 seastar::future
<> ProtocolV2::read_frame_payload()
345 ceph_assert(!rx_segments_desc
.empty());
346 ceph_assert(rx_segments_data
.empty());
348 return seastar::do_until(
349 [this] { return rx_segments_desc
.size() == rx_segments_data
.size(); },
351 // description of current segment to read
352 const auto& cur_rx_desc
= rx_segments_desc
.at(rx_segments_data
.size());
353 // TODO: create aligned and contiguous buffer from socket
354 if (cur_rx_desc
.alignment
!= segment_t::DEFAULT_ALIGNMENT
) {
355 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
356 conn
, cur_rx_desc
.alignment
, rx_segments_data
.size());
358 // TODO: create aligned and contiguous buffer from socket
359 return read_exactly(cur_rx_desc
.length
)
360 .then([this] (auto tmp_bl
) {
361 logger().trace("{} RECV({}) frame segment[{}]",
362 conn
, tmp_bl
.size(), rx_segments_data
.size());
364 data
.append(buffer::create(std::move(tmp_bl
)));
365 if (session_stream_handlers
.rx
) {
369 rx_segments_data
.emplace_back(std::move(data
));
373 // TODO: get_epilogue_size()
374 ceph_assert(!session_stream_handlers
.rx
);
375 return read_exactly(sizeof(epilogue_crc_rev0_block_t
));
376 }).then([this] (auto bl
) {
377 logger().trace("{} RECV({}) frame epilogue", conn
, bl
.size());
380 if (session_stream_handlers
.rx
) {
384 auto& epilogue
= *reinterpret_cast<const epilogue_crc_rev0_block_t
*>(bl
.get());
385 for (std::uint8_t idx
= 0; idx
< rx_segments_data
.size(); idx
++) {
386 const __u32 expected_crc
= epilogue
.crc_values
[idx
];
387 const __u32 calculated_crc
= rx_segments_data
[idx
].crc32c(-1);
388 if (expected_crc
!= calculated_crc
) {
389 logger().warn("{} message integrity check failed at index {}:"
390 " expected_crc={} calculated_crc={}",
391 conn
, (unsigned int)idx
, expected_crc
, calculated_crc
);
394 logger().trace("{} message integrity check success at index {}: crc={}",
395 conn
, (unsigned int)idx
, expected_crc
);
398 late_flags
= epilogue
.late_flags
;
400 logger().trace("{} GOT frame epilogue: late_flags={}",
401 conn
, (unsigned)late_flags
);
403 // we do have a mechanism that allows transmitter to start sending message
404 // and abort after putting entire data field on wire. This will be used by
405 // the kernel client to avoid unnecessary buffering.
406 if (late_flags
& FRAME_LATE_FLAG_ABORTED
) {
414 seastar::future
<> ProtocolV2::write_frame(F
&frame
, bool flush
)
416 auto bl
= frame
.get_buffer(tx_frame_asm
);
417 const auto main_preamble
= reinterpret_cast<const preamble_block_t
*>(bl
.front().c_str());
418 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
419 conn
, bl
.length(), (int)main_preamble
->tag
,
420 (int)main_preamble
->num_segments
, main_preamble
->crc
);
421 INTERCEPT_FRAME(main_preamble
->tag
, bp_type_t::WRITE
);
423 return write_flush(std::move(bl
));
425 return write(std::move(bl
));
429 void ProtocolV2::trigger_state(state_t _state
, write_state_t _write_state
, bool reentrant
)
431 if (!reentrant
&& _state
== state
) {
432 logger().error("{} is not allowed to re-trigger state {}",
433 conn
, get_state_name(state
));
436 logger().debug("{} TRIGGER {}, was {}",
437 conn
, get_state_name(_state
), get_state_name(state
));
439 set_write_state(_write_state
);
442 void ProtocolV2::fault(bool backoff
, const char* func_name
, std::exception_ptr eptr
)
444 if (conn
.policy
.lossy
) {
445 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
446 conn
, func_name
, get_state_name(state
), eptr
);
449 } else if (conn
.policy
.server
||
450 (conn
.policy
.standby
&&
451 (!is_queued() && conn
.sent
.empty()))) {
452 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
453 conn
, func_name
, get_state_name(state
), eptr
);
455 } else if (backoff
) {
456 logger().info("{} {}: fault at {}, going to WAIT -- {}",
457 conn
, func_name
, get_state_name(state
), eptr
);
460 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
461 conn
, func_name
, get_state_name(state
), eptr
);
462 execute_connecting();
466 void ProtocolV2::dispatch_reset()
468 (void) seastar::with_gate(pending_dispatch
, [this] {
469 return dispatcher
.ms_handle_reset(
470 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
471 }).handle_exception([this] (std::exception_ptr eptr
) {
472 logger().error("{} ms_handle_reset caught exception: {}", conn
, eptr
);
473 ceph_abort("unexpected exception from ms_handle_reset()");
477 void ProtocolV2::reset_session(bool full
)
483 client_cookie
= generate_client_cookie();
486 (void) seastar::with_gate(pending_dispatch
, [this] {
487 return dispatcher
.ms_handle_remote_reset(
488 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
489 }).handle_exception([this] (std::exception_ptr eptr
) {
490 logger().error("{} ms_handle_remote_reset caught exception: {}", conn
, eptr
);
491 ceph_abort("unexpected exception from ms_handle_remote_reset()");
496 seastar::future
<entity_type_t
, entity_addr_t
> ProtocolV2::banner_exchange()
498 // 1. prepare and send banner
499 bufferlist banner_payload
;
500 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
501 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
504 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
505 auto len_payload
= static_cast<uint16_t>(banner_payload
.length());
506 encode(len_payload
, bl
, 0);
507 bl
.claim_append(banner_payload
);
508 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
509 "required={}, banner=\"{}\"",
510 conn
, bl
.length(), len_payload
,
511 CEPH_MSGR2_SUPPORTED_FEATURES
, CEPH_MSGR2_REQUIRED_FEATURES
,
512 CEPH_BANNER_V2_PREFIX
);
513 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE
, bp_type_t::WRITE
);
514 return write_flush(std::move(bl
)).then([this] {
515 // 2. read peer banner
516 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
517 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ
, bp_type_t::READ
);
518 return read_exactly(banner_len
); // or read exactly?
519 }).then([this] (auto bl
) {
520 // 3. process peer banner and read banner_payload
521 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
522 logger().debug("{} RECV({}) banner: \"{}\"",
524 std::string((const char*)bl
.get(), banner_prefix_len
));
526 if (memcmp(bl
.get(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
) != 0) {
527 if (memcmp(bl
.get(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
528 logger().warn("{} peer is using V1 protocol", conn
);
530 logger().warn("{} peer sent bad banner", conn
);
534 bl
.trim_front(banner_prefix_len
);
536 uint16_t payload_len
;
538 buf
.append(buffer::create(std::move(bl
)));
539 auto ti
= buf
.cbegin();
541 decode(payload_len
, ti
);
542 } catch (const buffer::error
&e
) {
543 logger().warn("{} decode banner payload len failed", conn
);
546 logger().debug("{} GOT banner: payload_len={}", conn
, payload_len
);
547 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ
, bp_type_t::READ
);
548 return read(payload_len
);
549 }).then([this] (bufferlist bl
) {
550 // 4. process peer banner_payload and send HelloFrame
551 auto p
= bl
.cbegin();
552 uint64_t peer_supported_features
;
553 uint64_t peer_required_features
;
555 decode(peer_supported_features
, p
);
556 decode(peer_required_features
, p
);
557 } catch (const buffer::error
&e
) {
558 logger().warn("{} decode banner payload failed", conn
);
561 logger().debug("{} RECV({}) banner features: supported={} required={}",
563 peer_supported_features
, peer_required_features
);
565 // Check feature bit compatibility
566 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
567 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
568 if ((required_features
& peer_supported_features
) != required_features
) {
569 logger().error("{} peer does not support all required features"
570 " required={} peer_supported={}",
571 conn
, required_features
, peer_supported_features
);
572 abort_in_close(*this);
574 if ((supported_features
& peer_required_features
) != peer_required_features
) {
575 logger().error("{} we do not support all peer required features"
576 " peer_required={} supported={}",
577 conn
, peer_required_features
, supported_features
);
578 abort_in_close(*this);
580 this->peer_required_features
= peer_required_features
;
581 if (this->peer_required_features
== 0) {
582 this->connection_features
= msgr2_required
;
585 auto hello
= HelloFrame::Encode(messenger
.get_mytype(),
587 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
588 conn
, ceph_entity_type_name(messenger
.get_mytype()),
590 return write_frame(hello
);
592 //5. read peer HelloFrame
593 return read_main_preamble();
594 }).then([this] (Tag tag
) {
595 expect_tag(Tag::HELLO
, tag
, conn
, __func__
);
596 return read_frame_payload();
598 // 6. process peer HelloFrame
599 auto hello
= HelloFrame::Decode(rx_segments_data
.back());
600 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
601 conn
, ceph_entity_type_name(hello
.entity_type()),
603 return seastar::make_ready_future
<entity_type_t
, entity_addr_t
>(
604 hello
.entity_type(), hello
.peer_addr());
610 seastar::future
<> ProtocolV2::handle_auth_reply()
612 return read_main_preamble()
613 .then([this] (Tag tag
) {
615 case Tag::AUTH_BAD_METHOD
:
616 return read_frame_payload().then([this] {
617 // handle_auth_bad_method() logic
618 auto bad_method
= AuthBadMethodFrame::Decode(rx_segments_data
.back());
619 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
620 "allowed_methods={}, allowed_modes={}",
621 conn
, bad_method
.method(), cpp_strerror(bad_method
.result()),
622 bad_method
.allowed_methods(), bad_method
.allowed_modes());
623 ceph_assert(messenger
.get_auth_client());
624 int r
= messenger
.get_auth_client()->handle_auth_bad_method(
625 conn
.shared_from_this(), auth_meta
,
626 bad_method
.method(), bad_method
.result(),
627 bad_method
.allowed_methods(), bad_method
.allowed_modes());
629 logger().warn("{} auth_client handle_auth_bad_method returned {}",
633 return client_auth(bad_method
.allowed_methods());
635 case Tag::AUTH_REPLY_MORE
:
636 return read_frame_payload().then([this] {
637 // handle_auth_reply_more() logic
638 auto auth_more
= AuthReplyMoreFrame::Decode(rx_segments_data
.back());
639 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
640 conn
, auth_more
.auth_payload().length());
641 ceph_assert(messenger
.get_auth_client());
642 // let execute_connecting() take care of the thrown exception
643 auto reply
= messenger
.get_auth_client()->handle_auth_reply_more(
644 conn
.shared_from_this(), auth_meta
, auth_more
.auth_payload());
645 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
646 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
647 conn
, reply
.length());
648 return write_frame(more_reply
);
650 return handle_auth_reply();
653 return read_frame_payload().then([this] {
654 // handle_auth_done() logic
655 auto auth_done
= AuthDoneFrame::Decode(rx_segments_data
.back());
656 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
657 conn
, auth_done
.global_id(),
658 ceph_con_mode_name(auth_done
.con_mode()),
659 auth_done
.auth_payload().length());
660 ceph_assert(messenger
.get_auth_client());
661 int r
= messenger
.get_auth_client()->handle_auth_done(
662 conn
.shared_from_this(), auth_meta
,
663 auth_done
.global_id(),
664 auth_done
.con_mode(),
665 auth_done
.auth_payload());
667 logger().warn("{} auth_client handle_auth_done returned {}", conn
, r
);
670 auth_meta
->con_mode
= auth_done
.con_mode();
672 ceph_assert(!auth_meta
->is_mode_secure());
673 session_stream_handlers
= { nullptr, nullptr };
674 return finish_auth();
677 unexpected_tag(tag
, conn
, __func__
);
678 return seastar::now();
684 seastar::future
<> ProtocolV2::client_auth(std::vector
<uint32_t> &allowed_methods
)
686 // send_auth_request() logic
687 ceph_assert(messenger
.get_auth_client());
690 auto [auth_method
, preferred_modes
, bl
] =
691 messenger
.get_auth_client()->get_auth_request(conn
.shared_from_this(), auth_meta
);
692 auth_meta
->auth_method
= auth_method
;
693 auto frame
= AuthRequestFrame::Encode(auth_method
, preferred_modes
, bl
);
694 logger().debug("{} WRITE AuthRequestFrame: method={},"
695 " preferred_modes={}, payload_len={}",
696 conn
, auth_method
, preferred_modes
, bl
.length());
697 return write_frame(frame
).then([this] {
698 return handle_auth_reply();
700 } catch (const crimson::auth::error
& e
) {
701 logger().error("{} get_initial_auth_request returned {}", conn
, e
);
703 abort_in_close(*this);
704 return seastar::now();
708 seastar::future
<ProtocolV2::next_step_t
>
709 ProtocolV2::process_wait()
711 return read_frame_payload().then([this] {
712 // handle_wait() logic
713 logger().debug("{} GOT WaitFrame", conn
);
714 WaitFrame::Decode(rx_segments_data
.back());
715 return next_step_t::wait
;
719 seastar::future
<ProtocolV2::next_step_t
>
720 ProtocolV2::client_connect()
722 // send_client_ident() logic
724 if (conn
.policy
.lossy
) {
725 flags
|= CEPH_MSG_CONNECT_LOSSY
;
728 auto client_ident
= ClientIdentFrame::Encode(
729 messenger
.get_myaddrs(),
731 messenger
.get_myname().num(),
733 conn
.policy
.features_supported
,
734 conn
.policy
.features_required
| msgr2_required
, flags
,
737 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
738 " gs={}, features_supported={}, features_required={},"
739 " flags={}, cookie={}",
740 conn
, messenger
.get_myaddrs(), conn
.target_addr
,
741 messenger
.get_myname().num(), global_seq
,
742 conn
.policy
.features_supported
,
743 conn
.policy
.features_required
| msgr2_required
,
744 flags
, client_cookie
);
745 return write_frame(client_ident
).then([this] {
746 return read_main_preamble();
747 }).then([this] (Tag tag
) {
749 case Tag::IDENT_MISSING_FEATURES
:
750 return read_frame_payload().then([this] {
751 // handle_ident_missing_features() logic
752 auto ident_missing
= IdentMissingFeaturesFrame::Decode(rx_segments_data
.back());
753 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
754 " (client does not support all server features)",
755 conn
, ident_missing
.features());
757 return next_step_t::none
;
760 return process_wait();
761 case Tag::SERVER_IDENT
:
762 return read_frame_payload().then([this] {
763 // handle_server_ident() logic
765 auto server_ident
= ServerIdentFrame::Decode(rx_segments_data
.back());
766 logger().debug("{} GOT ServerIdentFrame:"
767 " addrs={}, gid={}, gs={},"
768 " features_supported={}, features_required={},"
769 " flags={}, cookie={}",
771 server_ident
.addrs(), server_ident
.gid(),
772 server_ident
.global_seq(),
773 server_ident
.supported_features(),
774 server_ident
.required_features(),
775 server_ident
.flags(), server_ident
.cookie());
777 // is this who we intended to talk to?
778 // be a bit forgiving here, since we may be connecting based on addresses parsed out
779 // of mon_host or something.
780 if (!server_ident
.addrs().contains(conn
.target_addr
)) {
781 logger().warn("{} peer identifies as {}, does not include {}",
782 conn
, server_ident
.addrs(), conn
.target_addr
);
783 throw std::system_error(
784 make_error_code(crimson::net::error::bad_peer_address
));
787 server_cookie
= server_ident
.cookie();
789 // TODO: change peer_addr to entity_addrvec_t
790 if (server_ident
.addrs().front() != conn
.peer_addr
) {
791 logger().warn("{} peer advertises as {}, does not match {}",
792 conn
, server_ident
.addrs(), conn
.peer_addr
);
793 throw std::system_error(
794 make_error_code(crimson::net::error::bad_peer_address
));
796 conn
.set_peer_id(server_ident
.gid());
797 conn
.set_features(server_ident
.supported_features() &
798 conn
.policy
.features_supported
);
799 peer_global_seq
= server_ident
.global_seq();
801 bool lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
802 if (lossy
!= conn
.policy
.lossy
) {
803 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn
, lossy
);
804 conn
.policy
.lossy
= lossy
;
806 if (lossy
&& (connect_seq
!= 0 || server_cookie
!= 0)) {
807 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
808 conn
, connect_seq
, server_cookie
);
813 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
816 unexpected_tag(tag
, conn
, "post_client_connect");
817 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
823 seastar::future
<ProtocolV2::next_step_t
>
824 ProtocolV2::client_reconnect()
826 // send_reconnect() logic
827 auto reconnect
= ReconnectFrame::Encode(messenger
.get_myaddrs(),
833 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
834 " server_cookie={}, gs={}, cs={}, msg_seq={}",
835 conn
, messenger
.get_myaddrs(),
836 client_cookie
, server_cookie
,
837 global_seq
, connect_seq
, conn
.in_seq
);
838 return write_frame(reconnect
).then([this] {
839 return read_main_preamble();
840 }).then([this] (Tag tag
) {
842 case Tag::SESSION_RETRY_GLOBAL
:
843 return read_frame_payload().then([this] {
844 // handle_session_retry_global() logic
845 auto retry
= RetryGlobalFrame::Decode(rx_segments_data
.back());
846 logger().warn("{} GOT RetryGlobalFrame: gs={}",
847 conn
, retry
.global_seq());
848 return messenger
.get_global_seq(retry
.global_seq()).then([this] (auto gs
) {
850 logger().warn("{} UPDATE: gs={} for retry global", conn
, global_seq
);
851 return client_reconnect();
854 case Tag::SESSION_RETRY
:
855 return read_frame_payload().then([this] {
856 // handle_session_retry() logic
857 auto retry
= RetryFrame::Decode(rx_segments_data
.back());
858 logger().warn("{} GOT RetryFrame: cs={}",
859 conn
, retry
.connect_seq());
860 connect_seq
= retry
.connect_seq() + 1;
861 logger().warn("{} UPDATE: cs={}", conn
, connect_seq
);
862 return client_reconnect();
864 case Tag::SESSION_RESET
:
865 return read_frame_payload().then([this] {
866 // handle_session_reset() logic
867 auto reset
= ResetFrame::Decode(rx_segments_data
.back());
868 logger().warn("{} GOT ResetFrame: full={}", conn
, reset
.full());
869 reset_session(reset
.full());
870 return client_connect();
873 return process_wait();
874 case Tag::SESSION_RECONNECT_OK
:
875 return read_frame_payload().then([this] {
876 // handle_reconnect_ok() logic
877 auto reconnect_ok
= ReconnectOkFrame::Decode(rx_segments_data
.back());
878 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
879 conn
, reconnect_ok
.msg_seq());
880 requeue_up_to(reconnect_ok
.msg_seq());
881 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
884 unexpected_tag(tag
, conn
, "post_client_reconnect");
885 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
891 void ProtocolV2::execute_connecting()
893 trigger_state(state_t::CONNECTING
, write_state_t::delay
, true);
897 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
898 // we don't know my socket_port yet
899 conn
.set_ephemeral_port(0, SocketConnection::side_t::none
);
900 return messenger
.get_global_seq().then([this] (auto gs
) {
902 assert(client_cookie
!= 0);
903 if (!conn
.policy
.lossy
&& server_cookie
!= 0) {
905 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
906 conn
, global_seq
, connect_seq
);
907 } else { // conn.policy.lossy || server_cookie == 0
908 assert(connect_seq
== 0);
909 assert(server_cookie
== 0);
910 logger().debug("{} UPDATE: gs={} for connect", conn
, global_seq
);
913 return wait_write_exit();
915 if (unlikely(state
!= state_t::CONNECTING
)) {
916 logger().debug("{} triggered {} before Socket::connect()",
917 conn
, get_state_name(state
));
921 (void) with_gate(pending_dispatch
, [sock
= std::move(socket
)] () mutable {
922 return sock
->close().then([sock
= std::move(sock
)] {});
925 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING
);
926 return Socket::connect(conn
.peer_addr
);
927 }).then([this](SocketRef sock
) {
928 logger().debug("{} socket connected", conn
);
929 if (unlikely(state
!= state_t::CONNECTING
)) {
930 logger().debug("{} triggered {} during Socket::connect()",
931 conn
, get_state_name(state
));
932 return sock
->close().then([sock
= std::move(sock
)] {
936 socket
= std::move(sock
);
937 return seastar::now();
939 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
940 session_stream_handlers
= { nullptr, nullptr };
942 return banner_exchange();
943 }).then([this] (entity_type_t _peer_type
,
944 entity_addr_t _my_addr_from_peer
) {
945 if (conn
.get_peer_type() != _peer_type
) {
946 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
947 conn
, ceph_entity_type_name(conn
.get_peer_type()),
948 ceph_entity_type_name(_peer_type
));
950 abort_in_close(*this);
952 conn
.set_ephemeral_port(_my_addr_from_peer
.get_port(),
953 SocketConnection::side_t::connector
);
954 if (unlikely(_my_addr_from_peer
.is_legacy())) {
955 logger().warn("{} peer sent a legacy address for me: {}",
956 conn
, _my_addr_from_peer
);
957 throw std::system_error(
958 make_error_code(crimson::net::error::bad_peer_address
));
960 _my_addr_from_peer
.set_type(entity_addr_t::TYPE_MSGR2
);
961 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
963 return client_auth();
965 if (server_cookie
== 0) {
966 ceph_assert(connect_seq
== 0);
967 return client_connect();
969 ceph_assert(connect_seq
> 0);
970 return client_reconnect();
972 }).then([this] (next_step_t next
) {
973 if (unlikely(state
!= state_t::CONNECTING
)) {
974 logger().debug("{} triggered {} at the end of execute_connecting()",
975 conn
, get_state_name(state
));
979 case next_step_t::ready
: {
980 (void) seastar::with_gate(pending_dispatch
, [this] {
981 return dispatcher
.ms_handle_connect(
982 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
983 }).handle_exception([this] (std::exception_ptr eptr
) {
984 logger().error("{} ms_handle_connect caught exception: {}", conn
, eptr
);
985 ceph_abort("unexpected exception from ms_handle_connect()");
987 logger().info("{} connected:"
988 " gs={}, pgs={}, cs={}, client_cookie={},"
989 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
990 conn
, global_seq
, peer_global_seq
, connect_seq
,
991 client_cookie
, server_cookie
, conn
.in_seq
,
992 conn
.out_seq
, conn
.out_q
.size());
996 case next_step_t::wait
: {
997 logger().info("{} execute_connecting(): going to WAIT", conn
);
1002 ceph_abort("impossible next step");
1005 }).handle_exception([this] (std::exception_ptr eptr
) {
1006 if (state
!= state_t::CONNECTING
) {
1007 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
1008 conn
, get_state_name(state
), eptr
);
1009 assert(state
== state_t::CLOSING
||
1010 state
== state_t::REPLACING
);
1014 if (conn
.policy
.server
||
1015 (conn
.policy
.standby
&&
1016 (!is_queued() && conn
.sent
.empty()))) {
1017 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
1018 " going to STANDBY -- {}",
1019 conn
, get_state_name(state
), eptr
);
1022 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
1023 conn
, get_state_name(state
), eptr
);
1024 execute_wait(false);
// Reject the peer's auth request: encode an AuthBadMethodFrame carrying the
// method the peer asked for, the error code `r`, and the auth methods/modes
// the auth server supports for this peer type; write it, then go back to
// server_auth() to wait for another request.
//
// @param r  error code reported to the peer (logged through cpp_strerror()).
1032 seastar::future
<> ProtocolV2::_auth_bad_method(int r
)
1034 // _auth_bad_method() logic
1036 auto [allowed_methods
, allowed_modes
] =
1037 messenger
.get_auth_server()->get_supported_auth_methods(conn
.get_peer_type());
1038 auto bad_method
= AuthBadMethodFrame::Encode(
1039 auth_meta
->auth_method
, r
, allowed_methods
, allowed_modes
);
1040 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1041 "allowed_methods={}, allowed_modes={})",
1042 conn
, auth_meta
->auth_method
, cpp_strerror(r
),
1043 allowed_methods
, allowed_modes
);
1044 return write_frame(bad_method
).then([this] {
1045 return server_auth();
// Run one round of server-side authentication.
//
// Passes `auth_payload` to the auth server's handle_auth_request() (`more` is
// true for a follow-up round) and continues along one of the paths below:
//  - write AuthDoneFrame, clear the session stream handlers, finish_auth();
//  - write AuthReplyMoreFrame, wait for an AUTH_REQUEST_MORE frame and call
//    this function again with more=true;
//  - warn that the auth server returned -EBUSY;
//  - warn about any other result and answer through _auth_bad_method(r).
// NOTE(review): the statement that dispatches on `r` is not visible in this
// listing; confirm which result code selects which path.
1049 seastar::future
<> ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
1051 // _handle_auth_request() logic
1052 ceph_assert(messenger
.get_auth_server());
1054 int r
= messenger
.get_auth_server()->handle_auth_request(
1055 conn
.shared_from_this(), auth_meta
,
1056 more
, auth_meta
->auth_method
, auth_payload
,
// Success path: tell the peer that authentication is done.
1061 auto auth_done
= AuthDoneFrame::Encode(
1062 conn
.peer_global_id
, auth_meta
->con_mode
, reply
);
1063 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1064 conn
, conn
.peer_global_id
,
1065 ceph_con_mode_name(auth_meta
->con_mode
), reply
.length());
1066 return write_frame(auth_done
).then([this] {
1067 ceph_assert(auth_meta
);
// Secure connection mode is asserted off here; both stream handlers are cleared.
1069 ceph_assert(!auth_meta
->is_mode_secure());
1070 session_stream_handlers
= { nullptr, nullptr };
1071 return finish_auth();
// Multi-round path: send our reply and wait for the peer's next payload.
1076 auto more
= AuthReplyMoreFrame::Encode(reply
);
1077 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1078 conn
, reply
.length());
1079 return write_frame(more
).then([this] {
1080 return read_main_preamble();
1081 }).then([this] (Tag tag
) {
1082 expect_tag(Tag::AUTH_REQUEST_MORE
, tag
, conn
, __func__
);
1083 return read_frame_payload();
1085 auto auth_more
= AuthRequestMoreFrame::Decode(rx_segments_data
.back());
1086 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1087 conn
, auth_more
.auth_payload().length());
1088 return _handle_auth_request(auth_more
.auth_payload(), true);
1092 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn
);
1094 return seastar::now();
// Any other result is reported back to the peer as a bad method.
1097 logger().warn("{} auth_server handle_auth_request returned {}", conn
, r
);
1098 return _auth_bad_method(r
);
// Entry point of the server-side auth handshake.
//
// Reads the next frame, requires it to be AUTH_REQUEST, decodes it, records
// the requested method in auth_meta and asks the auth server to pick a
// connection mode from the peer's preferred modes. If no mode can be picked
// (CEPH_CON_MODE_UNKNOWN) the request is rejected with
// _auth_bad_method(-EOPNOTSUPP); otherwise the payload is processed by
// _handle_auth_request() as the first round (more=false).
1103 seastar::future
<> ProtocolV2::server_auth()
1105 return read_main_preamble()
1106 .then([this] (Tag tag
) {
1107 expect_tag(Tag::AUTH_REQUEST
, tag
, conn
, __func__
);
1108 return read_frame_payload();
1110 // handle_auth_request() logic
1111 auto request
= AuthRequestFrame::Decode(rx_segments_data
.back());
1112 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1114 conn
, request
.method(), request
.preferred_modes(),
1115 request
.auth_payload().length());
1116 auth_meta
->auth_method
= request
.method();
1117 auth_meta
->con_mode
= messenger
.get_auth_server()->pick_con_mode(
1118 conn
.get_peer_type(), auth_meta
->auth_method
,
1119 request
.preferred_modes());
1120 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
1121 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn
);
1122 return _auth_bad_method(-EOPNOTSUPP
);
1124 return _handle_auth_request(request
.auth_payload(), false);
// Write a WaitFrame to the peer and resolve to next_step_t::wait.
1128 seastar::future
<ProtocolV2::next_step_t
>
1129 ProtocolV2::send_wait()
1131 auto wait
= WaitFrame::Encode();
1132 logger().debug("{} WRITE WaitFrame", conn
);
1133 return write_frame(wait
).then([] {
1134 return next_step_t::wait
;
// Hand the state negotiated on this connection over to `existing_proto` via
// trigger_replacing() (auth_meta and session_stream_handlers are moved out of
// this object), then abort this connection with abort_in_close(*this).
//
// @param existing_proto  protocol object of the connection to be replaced
// @param do_reset        (its use is on a line not visible in this listing)
// @param reconnect       forwarded as the first argument of trigger_replacing()
// @param conn_seq        (its use is on a line not visible in this listing)
// @param msg_seq         (its use is on a line not visible in this listing)
// NOTE(review): the comment below says abort_in_close() jumps to the error
// handling code, so the trailing return presumably only satisfies the return
// type — confirm.
1138 seastar::future
<ProtocolV2::next_step_t
>
1139 ProtocolV2::reuse_connection(
1140 ProtocolV2
* existing_proto
, bool do_reset
,
1141 bool reconnect
, uint64_t conn_seq
, uint64_t msg_seq
)
1143 existing_proto
->trigger_replacing(reconnect
,
1146 std::move(auth_meta
),
1147 std::move(session_stream_handlers
),
1150 conn
.get_peer_name(),
1151 connection_features
,
1154 #ifdef UNIT_TESTS_BUILT
1155 if (conn
.interceptor
) {
1156 conn
.interceptor
->register_conn_replaced(conn
);
1159 // close this connection because all the necessary information is delivered
1160 // to the existing connection, and jump to error handling code to abort the
1162 abort_in_close(*this);
1163 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Resolve a collision between this accepting connection (client-ident path)
// and `existing_conn`, an already known connection that must be a ProtocolV2
// one (asserted after the dynamic_cast).
//
// Cases handled, in order:
//  - existing is in REPLACING state: a racing replace; warns ("send wait");
//  - existing has a larger peer_global_seq: this connection is stale; warns;
//  - existing is lossy: reset and close the existing one, then establish
//    this connection (unless our state already left ACCEPTING);
//  - existing has a server_cookie: a client_cookie mismatch means the peer
//    has reset -> reuse_connection(existing, policy.resetcheck); a match
//    continues session establishment -> reuse_connection(existing);
//  - existing has no server_cookie (connection race): with different client
//    cookies existing_conn->peer_wins() decides between reusing the existing
//    connection and asking the client to wait; with equal cookies the
//    existing connection is reused.
// NOTE(review): the statements that follow the first two warnings, and the
// body of the keepalive continuation, are not visible in this listing.
1166 seastar::future
<ProtocolV2::next_step_t
>
1167 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn
)
1169 // handle_existing_connection() logic
1170 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1171 existing_conn
->protocol
.get());
1172 ceph_assert(existing_proto
);
1173 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1174 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1175 conn
, global_seq
, peer_global_seq
, connect_seq
,
1176 client_cookie
, server_cookie
,
1177 existing_conn
, get_state_name(existing_proto
->state
),
1178 existing_proto
->global_seq
,
1179 existing_proto
->peer_global_seq
,
1180 existing_proto
->connect_seq
,
1181 existing_proto
->client_cookie
,
1182 existing_proto
->server_cookie
);
1184 if (existing_proto
->state
== state_t::REPLACING
) {
1185 logger().warn("{} server_connect: racing replace happened while"
1186 " replacing existing connection {}, send wait.",
1187 conn
, *existing_conn
);
1191 if (existing_proto
->peer_global_seq
> peer_global_seq
) {
1192 logger().warn("{} server_connect:"
1193 " this is a stale connection, because peer_global_seq({})"
1194 " < existing->peer_global_seq({}), close this connection"
1195 " in favor of existing connection {}",
1196 conn
, peer_global_seq
,
1197 existing_proto
->peer_global_seq
, *existing_conn
);
1201 if (existing_conn
->policy
.lossy
) {
1202 // existing connection can be thrown out in favor of this one
1203 logger().warn("{} server_connect:"
1204 " existing connection {} is a lossy channel. Close existing in favor of"
1205 " this connection", conn
, *existing_conn
);
1206 existing_proto
->dispatch_reset();
1207 (void) existing_proto
->close();
1209 if (unlikely(state
!= state_t::ACCEPTING
)) {
1210 logger().debug("{} triggered {} in execute_accepting()",
1211 conn
, get_state_name(state
));
1214 execute_establishing();
1215 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1218 if (existing_proto
->server_cookie
!= 0) {
1219 if (existing_proto
->client_cookie
!= client_cookie
) {
1220 // Found previous session
1221 // peer has reset and we're going to reuse the existing connection
1222 // by replacing the socket
// NOTE(review): the "cs=" labels in this message are fed client cookies, not
// connect_seq values — confirm the labels are intended.
1223 logger().warn("{} server_connect:"
1224 " found new session (cs={})"
1225 " when existing {} is with stale session (cs={}, ss={}),"
1226 " peer must have reset",
1227 conn
, client_cookie
,
1228 *existing_conn
, existing_proto
->client_cookie
,
1229 existing_proto
->server_cookie
);
1230 return reuse_connection(existing_proto
, conn
.policy
.resetcheck
);
1232 // session establishment interrupted between client_ident and server_ident,
1234 logger().warn("{} server_connect: found client session with existing {}"
1235 " matched (cs={}, ss={}), continuing session establishment",
1236 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1237 return reuse_connection(existing_proto
);
1240 // Looks like a connection race: server and client are both connecting to
1241 // each other at the same time.
1242 if (existing_proto
->client_cookie
!= client_cookie
) {
1243 if (existing_conn
->peer_wins()) {
1244 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1245 " and win, reusing existing {}",
1246 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1247 return reuse_connection(existing_proto
);
1249 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1250 " and lose to existing {}, ask client to wait",
1251 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1252 return existing_conn
->keepalive().then([this] {
1257 logger().warn("{} server_connect: found client session with existing {}"
1258 " matched (cs={}, ss={}), continuing session establishment",
1259 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1260 return reuse_connection(existing_proto
);
// Handle a ClientIdentFrame on the accepting side.
//
// Steps visible here:
//  1. decode the frame and reject (bad_peer_address) when the peer sent no
//     usable address, targets an address that is not ours, or announces an
//     address that is not msgr2/any or not on the same host as target_addr;
//  2. adopt the announced address as peer_addr/target_addr and reject when a
//     non-lossy, non-server policy leaves us without a port to reconnect to;
//  3. record the peer id and client_cookie, compute the required features
//     the peer is missing and report them with IdentMissingFeaturesFrame;
//  4. record connection_features and peer_global_seq;
//  5. look up an existing connection for peer_addr: a non-v2 one is closed,
//     a v2 one is delegated to handle_existing_connection();
//  6. otherwise move on to execute_establishing() and resolve to ready.
// NOTE(review): the condition guarding the IdentMissingFeaturesFrame path is
// not visible in this listing — presumably `feat_missing != 0`; confirm.
1265 seastar::future
<ProtocolV2::next_step_t
>
1266 ProtocolV2::server_connect()
1268 return read_frame_payload().then([this] {
1269 // handle_client_ident() logic
1270 auto client_ident
= ClientIdentFrame::Decode(rx_segments_data
.back());
1271 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1272 " gid={}, gs={}, features_supported={},"
1273 " features_required={}, flags={}, cookie={}",
1274 conn
, client_ident
.addrs(), client_ident
.target_addr(),
1275 client_ident
.gid(), client_ident
.global_seq(),
1276 client_ident
.supported_features(),
1277 client_ident
.required_features(),
1278 client_ident
.flags(), client_ident
.cookie());
1280 if (client_ident
.addrs().empty() ||
1281 client_ident
.addrs().front() == entity_addr_t()) {
1282 logger().warn("{} oops, client_ident.addrs() is empty", conn
);
1283 throw std::system_error(
1284 make_error_code(crimson::net::error::bad_peer_address
));
1286 if (!messenger
.get_myaddrs().contains(client_ident
.target_addr())) {
1287 logger().warn("{} peer is trying to reach {} which is not us ({})",
1288 conn
, client_ident
.target_addr(), messenger
.get_myaddrs());
1289 throw std::system_error(
1290 make_error_code(crimson::net::error::bad_peer_address
));
1292 // TODO: change peer_addr to entity_addrvec_t
1293 entity_addr_t paddr
= client_ident
.addrs().front();
1294 if ((paddr
.is_msgr2() || paddr
.is_any()) &&
1295 paddr
.is_same_host(conn
.target_addr
)) {
1298 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1299 conn
, paddr
, conn
.target_addr
);
1300 throw std::system_error(
1301 make_error_code(crimson::net::error::bad_peer_address
));
1303 conn
.peer_addr
= paddr
;
1304 logger().debug("{} UPDATE: peer_addr={}", conn
, conn
.peer_addr
);
1305 conn
.target_addr
= conn
.peer_addr
;
1306 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
1307 logger().warn("{} we don't know how to reconnect to peer {}",
1308 conn
, conn
.target_addr
);
1309 throw std::system_error(
1310 make_error_code(crimson::net::error::bad_peer_address
));
1313 conn
.set_peer_id(client_ident
.gid());
1314 client_cookie
= client_ident
.cookie();
// Features we require (policy plus msgr2 baseline) that the peer lacks.
1316 uint64_t feat_missing
=
1317 (conn
.policy
.features_required
| msgr2_required
) &
1318 ~(uint64_t)client_ident
.supported_features();
1320 auto ident_missing_features
= IdentMissingFeaturesFrame::Encode(feat_missing
);
1321 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1322 conn
, feat_missing
);
1323 return write_frame(ident_missing_features
).then([] {
1324 return next_step_t::wait
;
1327 connection_features
=
1328 client_ident
.supported_features() & conn
.policy
.features_supported
;
1329 logger().debug("{} UPDATE: connection_features={}", conn
, connection_features
);
1331 peer_global_seq
= client_ident
.global_seq();
1333 // Looks good so far, let's check if there is already an existing connection
1336 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1338 if (existing_conn
) {
1339 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1340 logger().warn("{} existing connection {} proto version is {}, close existing",
1341 conn
, *existing_conn
,
1342 static_cast<int>(existing_conn
->protocol
->proto_type
));
1343 // should unregister the existing from msgr atomically
1344 (void) existing_conn
->close();
1346 return handle_existing_connection(existing_conn
);
1350 if (unlikely(state
!= state_t::ACCEPTING
)) {
1351 logger().debug("{} triggered {} in execute_accepting()",
1352 conn
, get_state_name(state
));
1355 execute_establishing();
1356 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
// Read the next frame, require it to be SESSION_RECONNECT and process it
// with server_reconnect().
1360 seastar::future
<ProtocolV2::next_step_t
>
1361 ProtocolV2::read_reconnect()
1363 return read_main_preamble()
1364 .then([this] (Tag tag
) {
1365 expect_tag(Tag::SESSION_RECONNECT
, tag
, conn
, "read_reconnect");
1366 return server_reconnect();
// Write a RetryFrame carrying `connect_seq`, then wait for the peer's next
// reconnect attempt via read_reconnect().
// Note: the parameter shares its name with the `connect_seq` member used
// elsewhere in this file; inside this function the name refers to the argument.
1370 seastar::future
<ProtocolV2::next_step_t
>
1371 ProtocolV2::send_retry(uint64_t connect_seq
)
1373 auto retry
= RetryFrame::Encode(connect_seq
);
1374 logger().warn("{} WRITE RetryFrame: cs={}", conn
, connect_seq
);
1375 return write_frame(retry
).then([this] {
1376 return read_reconnect();
// Write a RetryGlobalFrame carrying `global_seq`, then wait for the peer's
// next reconnect attempt via read_reconnect().
// Note: the parameter shares its name with the `global_seq` member used
// elsewhere in this file; inside this function the name refers to the argument.
1380 seastar::future
<ProtocolV2::next_step_t
>
1381 ProtocolV2::send_retry_global(uint64_t global_seq
)
1383 auto retry
= RetryGlobalFrame::Encode(global_seq
);
1384 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn
, global_seq
);
1385 return write_frame(retry
).then([this] {
1386 return read_reconnect();
// Write a ResetFrame (with the `full` flag) to the peer, then expect it to
// start over with a CLIENT_IDENT frame, which is handled by server_connect().
1390 seastar::future
<ProtocolV2::next_step_t
>
1391 ProtocolV2::send_reset(bool full
)
1393 auto reset
= ResetFrame::Encode(full
);
1394 logger().warn("{} WRITE ResetFrame: full={}", conn
, full
);
1395 return write_frame(reset
).then([this] {
1396 return read_main_preamble();
1397 }).then([this] (Tag tag
) {
1398 expect_tag(Tag::CLIENT_IDENT
, tag
, conn
, "post_send_reset");
1399 return server_connect();
// Handle a ReconnectFrame on the accepting side.
//
// After decoding the frame and validating/recording the peer address, the
// existing connection for that address is looked up and one of these
// answers is chosen:
//  - no existing connection, or an existing non-v2 one (which is closed):
//    send_reset(true);
//  - existing is in REPLACING state: send_retry_global(existing pgs);
//  - client_cookie mismatch: send_reset(policy.resetcheck);
//  - existing has no server_cookie yet: send_reset(false);
//  - existing peer_global_seq is newer than the frame's: send_retry_global();
//  - existing connect_seq is newer than the frame's: send_retry();
//  - equal connect_seq (reconnect race): reuse the existing connection if
//    existing_conn->peer_wins(), otherwise ask the client to wait;
//  - existing connect_seq is older: reuse the existing connection.
// NOTE(review): unlike server_connect(), no check that reconnect.addrs() is
// non-empty is visible before front() is called on it — confirm.
1403 seastar::future
<ProtocolV2::next_step_t
>
1404 ProtocolV2::server_reconnect()
1406 return read_frame_payload().then([this] {
1407 // handle_reconnect() logic
1408 auto reconnect
= ReconnectFrame::Decode(rx_segments_data
.back());
1410 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1411 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1412 conn
, reconnect
.addrs(),
1413 reconnect
.client_cookie(), reconnect
.server_cookie(),
1414 reconnect
.global_seq(), reconnect
.connect_seq(),
1415 reconnect
.msg_seq());
1417 // can peer_addrs be changed on-the-fly?
1418 // TODO: change peer_addr to entity_addrvec_t
1419 entity_addr_t paddr
= reconnect
.addrs().front();
1420 if (paddr
.is_msgr2() || paddr
.is_any()) {
1423 logger().warn("{} peer's address {} is not v2", conn
, paddr
);
1424 throw std::system_error(
1425 make_error_code(crimson::net::error::bad_peer_address
));
// Learn the peer address if still unset; otherwise it must match.
1427 if (conn
.peer_addr
== entity_addr_t()) {
1428 conn
.peer_addr
= paddr
;
1429 } else if (conn
.peer_addr
!= paddr
) {
1430 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1431 " reconnect failed",
1432 conn
, paddr
, conn
.peer_addr
);
1433 throw std::system_error(
1434 make_error_code(crimson::net::error::bad_peer_address
));
1436 peer_global_seq
= reconnect
.global_seq();
1438 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1440 if (!existing_conn
) {
1441 // there is no existing connection therefore cannot reconnect to previous
1443 logger().warn("{} server_reconnect: no existing connection from address {},"
1444 " reseting client", conn
, conn
.peer_addr
);
1445 return send_reset(true);
1448 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1449 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1450 "close existing and reset client.",
1451 conn
, *existing_conn
,
1452 static_cast<int>(existing_conn
->protocol
->proto_type
));
1453 (void) existing_conn
->close();
1454 return send_reset(true);
1457 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1458 existing_conn
->protocol
.get());
1459 ceph_assert(existing_proto
);
1460 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1461 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1462 conn
, global_seq
, peer_global_seq
, reconnect
.connect_seq(),
1463 reconnect
.client_cookie(), reconnect
.server_cookie(),
1465 get_state_name(existing_proto
->state
),
1466 existing_proto
->global_seq
,
1467 existing_proto
->peer_global_seq
,
1468 existing_proto
->connect_seq
,
1469 existing_proto
->client_cookie
,
1470 existing_proto
->server_cookie
);
1472 if (existing_proto
->state
== state_t::REPLACING
) {
1473 logger().warn("{} server_reconnect: racing replace happened while "
1474 " replacing existing connection {}, retry global.",
1475 conn
, *existing_conn
);
1476 return send_retry_global(existing_proto
->peer_global_seq
);
1479 if (existing_proto
->client_cookie
!= reconnect
.client_cookie()) {
1480 logger().warn("{} server_reconnect:"
1481 " client_cookie mismatch with existing connection {},"
1482 " cc={} rcc={}. I must have reset, reseting client.",
1483 conn
, *existing_conn
,
1484 existing_proto
->client_cookie
, reconnect
.client_cookie());
1485 return send_reset(conn
.policy
.resetcheck
);
1486 } else if (existing_proto
->server_cookie
== 0) {
1487 // this happens when:
1488 // - a connects to b
1489 // - a sends client_ident
1490 // - b gets client_ident, sends server_ident and sets cookie X
1491 // - connection fault
1492 // - b reconnects to a with cookie X, connect_seq=1
1493 // - a has cookie==0
1494 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1495 " server_ident with existing connection {}."
1496 " Asking peer to resume session establishment",
1497 conn
, existing_proto
->client_cookie
, *existing_conn
);
1498 return send_reset(false);
1501 if (existing_proto
->peer_global_seq
> reconnect
.global_seq()) {
1502 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1503 " with existing connection {},"
1504 " ask client to retry global",
1505 conn
, existing_proto
->peer_global_seq
,
1506 reconnect
.global_seq(), *existing_conn
);
1507 return send_retry_global(existing_proto
->peer_global_seq
);
1510 if (existing_proto
->connect_seq
> reconnect
.connect_seq()) {
1511 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1512 " with existing connection {}, ask client to retry",
1513 conn
, reconnect
.connect_seq(),
1514 existing_proto
->connect_seq
, *existing_conn
);
1515 return send_retry(existing_proto
->connect_seq
);
1516 } else if (existing_proto
->connect_seq
== reconnect
.connect_seq()) {
1517 // reconnect race: both peers are sending reconnect messages
1518 if (existing_conn
->peer_wins()) {
1519 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1520 " and win, reusing existing {}",
1521 conn
, reconnect
.connect_seq(), *existing_conn
);
1522 return reuse_connection(
1523 existing_proto
, false,
1524 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1526 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1527 " and lose to existing {}, ask client to wait",
1528 conn
, reconnect
.connect_seq(), *existing_conn
);
1531 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1532 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1533 " reusing existing {}",
1534 conn
, existing_proto
->connect_seq
,
1535 reconnect
.connect_seq(), *existing_conn
);
1536 return reuse_connection(
1537 existing_proto
, false,
1538 true, reconnect
.connect_seq(), reconnect
.msg_seq());
// Drive the ACCEPTING state for an incoming socket.
//
// Switches to state ACCEPTING and, under the pending_dispatch gate, runs:
// banner exchange -> record peer type and policy -> verify that the address
// the peer sees for us matches our port and nonce -> learned_addr() ->
// server_auth() -> read the next frame and dispatch it to server_connect()
// (CLIENT_IDENT) or server_reconnect() (SESSION_RECONNECT).
// The resulting next_step_t is then handled: `ready` asserts that the state
// already left ACCEPTING, `wait` moves on to execute_server_wait().
// Any exception in the chain is logged as a fault "going to CLOSING".
1543 void ProtocolV2::execute_accepting()
1545 trigger_state(state_t::ACCEPTING
, write_state_t::none
, false);
1546 (void) seastar::with_gate(pending_dispatch
, [this] {
1547 return seastar::futurize_apply([this] {
1548 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED
);
// Start from fresh auth metadata and no stream handlers.
1549 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
1550 session_stream_handlers
= { nullptr, nullptr };
1552 return banner_exchange();
1553 }).then([this] (entity_type_t _peer_type
,
1554 entity_addr_t _my_addr_from_peer
) {
1555 ceph_assert(conn
.get_peer_type() == 0);
1556 conn
.set_peer_type(_peer_type
);
1558 conn
.policy
= messenger
.get_policy(_peer_type
);
1559 logger().info("{} UPDATE: peer_type={},"
1560 " policy(lossy={} server={} standby={} resetcheck={})",
1561 conn
, ceph_entity_type_name(_peer_type
),
1562 conn
.policy
.lossy
, conn
.policy
.server
,
1563 conn
.policy
.standby
, conn
.policy
.resetcheck
);
1564 if (messenger
.get_myaddr().get_port() != _my_addr_from_peer
.get_port() ||
1565 messenger
.get_myaddr().get_nonce() != _my_addr_from_peer
.get_nonce()) {
1566 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1567 conn
, _my_addr_from_peer
, messenger
.get_myaddr());
1568 throw std::system_error(
1569 make_error_code(crimson::net::error::bad_peer_address
));
1571 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
1573 return server_auth();
1575 return read_main_preamble();
1576 }).then([this] (Tag tag
) {
1578 case Tag::CLIENT_IDENT
:
1579 return server_connect();
1580 case Tag::SESSION_RECONNECT
:
1581 return server_reconnect();
1583 unexpected_tag(tag
, conn
, "post_server_auth");
1584 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1587 }).then([this] (next_step_t next
) {
1589 case next_step_t::ready
:
1590 assert(state
!= state_t::ACCEPTING
);
1592 case next_step_t::wait
:
1593 if (unlikely(state
!= state_t::ACCEPTING
)) {
1594 logger().debug("{} triggered {} at the end of execute_accepting()",
1595 conn
, get_state_name(state
));
1598 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn
);
1599 execute_server_wait();
1602 ceph_abort("impossible next step");
1604 }).handle_exception([this] (std::exception_ptr eptr
) {
1605 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1606 conn
, get_state_name(state
), eptr
);
1612 // CONNECTING or ACCEPTING state
// Exchange and verify the pre-auth signatures that conclude authentication.
//
// Our signature is the session key's HMAC-SHA256 over `rxbuf` (or a
// default-constructed digest when there is no session key). It is sent in an
// AuthSignatureFrame; the peer's AUTH_SIGNATURE frame is then read and its
// signature compared with the value computed the same way over `txbuf`.
// A mismatch is logged as a warning.
// NOTE(review): what follows the mismatch warning is not visible in this
// listing — confirm how the mismatch is handled.
1614 seastar::future
<> ProtocolV2::finish_auth()
1616 ceph_assert(auth_meta
);
1618 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1619 auth_meta
->session_key
.hmac_sha256(nullptr, rxbuf
);
1620 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1621 ceph_assert(record_io
);
1624 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn
, sig
);
1625 return write_frame(sig_frame
).then([this] {
1626 return read_main_preamble();
1627 }).then([this] (Tag tag
) {
1628 expect_tag(Tag::AUTH_SIGNATURE
, tag
, conn
, "post_finish_auth");
1629 return read_frame_payload();
1631 // handle_auth_signature() logic
1632 auto sig_frame
= AuthSignatureFrame::Decode(rx_segments_data
.back());
1633 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn
, sig_frame
.signature());
// The peer signs what it received, i.e. the bytes we transmitted.
1635 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
1636 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(nullptr, txbuf
);
1637 if (sig_frame
.signature() != actual_tx_sig
) {
1638 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1639 " sig_frame.signature()={}",
1640 conn
, actual_tx_sig
, sig_frame
.signature());
// Drive the ESTABLISHING state of an accepted connection.
//
// Switches to state ESTABLISHING (writes delayed), notifies the dispatcher
// through ms_handle_accept() (an exception from it aborts the process),
// registers the connection with the messenger and removes it from the
// accepting set, then sends the server ident under the pending_dispatch
// gate. The resulting future is stored in `execution_done`.
// On failure: if the state is no longer ESTABLISHING the abort is only
// logged (the state is asserted to be CLOSING or REPLACING); otherwise
// fault() is invoked.
1649 void ProtocolV2::execute_establishing() {
1650 trigger_state(state_t::ESTABLISHING
, write_state_t::delay
, false);
1651 (void) seastar::with_gate(pending_dispatch
, [this] {
1652 return dispatcher
.ms_handle_accept(
1653 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1654 }).handle_exception([this] (std::exception_ptr eptr
) {
1655 logger().error("{} ms_handle_accept caught exception: {}", conn
, eptr
);
1656 ceph_abort("unexpected exception from ms_handle_accept()");
1658 messenger
.register_conn(
1659 seastar::static_pointer_cast
<SocketConnection
>(
1660 conn
.shared_from_this()));
1661 messenger
.unaccept_conn(
1662 seastar::static_pointer_cast
<SocketConnection
>(
1663 conn
.shared_from_this()));
1664 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
1665 return seastar::futurize_apply([this] {
1666 return send_server_ident();
1668 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1669 logger().debug("{} triggered {} at the end of execute_establishing()",
1670 conn
, get_state_name(state
));
1673 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1674 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1675 conn
, global_seq
, peer_global_seq
, connect_seq
,
1676 client_cookie
, server_cookie
, conn
.in_seq
,
1677 conn
.out_seq
, conn
.out_q
.size());
1679 }).handle_exception([this] (std::exception_ptr eptr
) {
1680 if (state
!= state_t::ESTABLISHING
) {
1681 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1682 conn
, get_state_name(state
), eptr
);
1683 assert(state
== state_t::CLOSING
||
1684 state
== state_t::REPLACING
);
1687 fault(false, "execute_establishing()", eptr
);
1692 // ESTABLISHING or REPLACING state
// Build and write the ServerIdentFrame that completes session establishment.
//
// Obtains a global sequence number `gs` from the messenger, generates a
// random non-zero server_cookie for non-lossy policies, sets the
// CEPH_MSG_CONNECT_LOSSY flag for lossy ones, encodes the frame from our
// addresses, name and feature sets, applies connection_features to the
// connection and writes the frame.
1695 ProtocolV2::send_server_ident()
1697 // send_server_ident() logic
1699 // referred to async-conn v2: not assign gs to global_seq
1700 return messenger
.get_global_seq().then([this] (auto gs
) {
// NOTE(review): this logs the member global_seq rather than the freshly
// obtained `gs`, which the WRITE log below prints as gs= — confirm intent.
1701 logger().debug("{} UPDATE: gs={} for server ident", conn
, global_seq
);
1703 // this is required for the case when this connection is being replaced
1707 if (!conn
.policy
.lossy
) {
1708 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1712 if (conn
.policy
.lossy
) {
1713 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
1716 auto server_ident
= ServerIdentFrame::Encode(
1717 messenger
.get_myaddrs(),
1718 messenger
.get_myname().num(),
1720 conn
.policy
.features_supported
,
1721 conn
.policy
.features_required
| msgr2_required
,
1725 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1726 " gs={}, features_supported={}, features_required={},"
1727 " flags={}, cookie={}",
1728 conn
, messenger
.get_myaddrs(), messenger
.get_myname().num(),
1729 gs
, conn
.policy
.features_supported
,
1730 conn
.policy
.features_required
| msgr2_required
,
1731 flags
, server_cookie
);
1733 conn
.set_features(connection_features
);
1735 return write_frame(server_ident
);
// Replace this connection's socket and session state with those negotiated
// on a newer incoming connection (see reuse_connection()).
//
// Switches to state REPLACING (writes delayed), notifies the dispatcher via
// ms_handle_accept(), and then, under the pending_dispatch gate:
//  1. waits for the writer to exit and for the previous execution to finish
//     (protocol_timer is cancelled in between);
//  2. if the state is no longer REPLACING, just closes the new socket;
//  3. otherwise closes the old socket in the background and adopts the new
//     socket, auth_meta, stream handlers and peer_global_seq;
//  4. for a reconnect: takes the new connect_seq, requeues messages up to
//     new_msg_seq and writes ReconnectOkFrame with our in_seq; for a fresh
//     connect: takes the new client cookie, peer name and features and
//     sends the server ident.
// On failure: if the state is no longer REPLACING the abort is only logged
// (the state is asserted to be CLOSING); otherwise fault() is invoked.
// NOTE(review): the condition guarding reset_session(true) and the
// reconnect/connect split are on lines not visible in this listing.
1741 void ProtocolV2::trigger_replacing(bool reconnect
,
1743 SocketRef
&& new_socket
,
1744 AuthConnectionMetaRef
&& new_auth_meta
,
1745 ceph::crypto::onwire::rxtx_t new_rxtx
,
1746 uint64_t new_peer_global_seq
,
1747 uint64_t new_client_cookie
,
1748 entity_name_t new_peer_name
,
1749 uint64_t new_conn_features
,
1750 uint64_t new_connect_seq
,
1751 uint64_t new_msg_seq
)
1753 trigger_state(state_t::REPLACING
, write_state_t::delay
, false);
1757 (void) seastar::with_gate(pending_dispatch
, [this] {
1758 return dispatcher
.ms_handle_accept(
1759 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1760 }).handle_exception([this] (std::exception_ptr eptr
) {
1761 logger().error("{} ms_handle_accept caught exception: {}", conn
, eptr
);
1762 ceph_abort("unexpected exception from ms_handle_accept()");
1764 (void) seastar::with_gate(pending_dispatch
,
1768 new_socket
= std::move(new_socket
),
1769 new_auth_meta
= std::move(new_auth_meta
),
1770 new_rxtx
= std::move(new_rxtx
),
1771 new_client_cookie
, new_peer_name
,
1772 new_conn_features
, new_peer_global_seq
,
1773 new_connect_seq
, new_msg_seq
] () mutable {
1774 return wait_write_exit().then([this, do_reset
] {
1776 reset_session(true);
1778 protocol_timer
.cancel();
1779 return execution_done
.get_future();
1782 new_socket
= std::move(new_socket
),
1783 new_auth_meta
= std::move(new_auth_meta
),
1784 new_rxtx
= std::move(new_rxtx
),
1785 new_client_cookie
, new_peer_name
,
1786 new_conn_features
, new_peer_global_seq
,
1787 new_connect_seq
, new_msg_seq
] () mutable {
// State changed while waiting: drop the new socket (kept alive until closed).
1788 if (unlikely(state
!= state_t::REPLACING
)) {
1789 return new_socket
->close().then([sock
= std::move(new_socket
)] {
// Close the old socket in the background; the capture keeps it alive until done.
1795 (void) with_gate(pending_dispatch
, [sock
= std::move(socket
)] () mutable {
1796 return sock
->close().then([sock
= std::move(sock
)] {});
1799 socket
= std::move(new_socket
);
1800 auth_meta
= std::move(new_auth_meta
);
1801 session_stream_handlers
= std::move(new_rxtx
);
1803 peer_global_seq
= new_peer_global_seq
;
1806 connect_seq
= new_connect_seq
;
1807 // send_reconnect_ok() logic
1808 requeue_up_to(new_msg_seq
);
1809 auto reconnect_ok
= ReconnectOkFrame::Encode(conn
.in_seq
);
1810 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn
, conn
.in_seq
);
1811 return write_frame(reconnect_ok
);
1813 client_cookie
= new_client_cookie
;
1814 conn
.set_peer_name(new_peer_name
);
1815 connection_features
= new_conn_features
;
1816 return send_server_ident();
1818 }).then([this, reconnect
] {
1819 if (unlikely(state
!= state_t::REPLACING
)) {
1820 logger().debug("{} triggered {} at the end of trigger_replacing()",
1821 conn
, get_state_name(state
));
1824 logger().info("{} replaced ({}):"
1825 " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
1826 " in_seq={}, out_seq={}, out_q={}",
1827 conn
, reconnect
? "reconnected" : "connected",
1828 global_seq
, peer_global_seq
, connect_seq
, client_cookie
,
1829 server_cookie
, conn
.in_seq
, conn
.out_seq
, conn
.out_q
.size());
1831 }).handle_exception([this] (std::exception_ptr eptr
) {
1832 if (state
!= state_t::REPLACING
) {
1833 logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
1834 conn
, get_state_name(state
), eptr
);
1835 assert(state
== state_t::CLOSING
);
1838 fault(true, "trigger_replacing()", eptr
);
// Serialize everything that is pending for the wire into one bufferlist.
//
// Appends, in this order: a KeepAliveFrame when `require_keepalive` is set,
// a KeepAliveFrameAck when `_keepalive_ack` holds a timestamp, a standalone
// AckFrame (carrying conn.in_seq) when an ack is required but no message is
// being sent, and finally one MessageFrame for each of the first `num_msgs`
// entries of `msgs`. Each message gets our name as source, is encoded with
// the connection features, receives the next out_seq (it must not have a
// seq yet) and carries conn.in_seq in its header2.
// NOTE(review): `num_msgs` and `require_ack` are declared on parameter lines
// not visible in this listing.
1845 ceph::bufferlist
ProtocolV2::do_sweep_messages(
1846 const std::deque
<MessageRef
>& msgs
,
1848 bool require_keepalive
,
1849 std::optional
<utime_t
> _keepalive_ack
,
1852 ceph::bufferlist bl
;
1854 if (unlikely(require_keepalive
)) {
1855 auto keepalive_frame
= KeepAliveFrame::Encode();
1856 bl
.append(keepalive_frame
.get_buffer(tx_frame_asm
));
1857 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2
, bp_type_t::WRITE
);
1860 if (unlikely(_keepalive_ack
.has_value())) {
1861 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(*_keepalive_ack
);
1862 bl
.append(keepalive_ack_frame
.get_buffer(tx_frame_asm
));
1863 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK
, bp_type_t::WRITE
);
// A standalone ack is only written when no message frame is being sent.
1866 if (require_ack
&& !num_msgs
) {
1867 auto ack_frame
= AckFrame::Encode(conn
.in_seq
);
1868 bl
.append(ack_frame
.get_buffer(tx_frame_asm
));
1869 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK
, bp_type_t::WRITE
);
1872 std::for_each(msgs
.begin(), msgs
.begin()+num_msgs
, [this, &bl
](const MessageRef
& msg
) {
1873 // TODO: move to common code
1875 msg
->get_header().src
= messenger
.get_myname();
1877 msg
->encode(conn
.features
, 0);
1879 ceph_assert(!msg
->get_seq() && "message already has seq");
1880 msg
->set_seq(++conn
.out_seq
);
1882 ceph_msg_header
&header
= msg
->get_header();
1883 ceph_msg_footer
&footer
= msg
->get_footer();
1885 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
1886 header
.type
, header
.priority
,
1888 init_le32(0), header
.data_off
,
1889 init_le64(conn
.in_seq
),
1890 footer
.flags
, header
.compat_version
,
1893 auto message
= MessageFrame::Encode(header2
,
1894 msg
->get_payload(), msg
->get_middle(), msg
->get_data());
1895 logger().debug("{} --> #{} === {} ({})",
1896 conn
, msg
->get_seq(), *msg
, msg
->get_type());
1897 bl
.append(message
.get_buffer(tx_frame_asm
));
1898 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE
, bp_type_t::WRITE
);
// Reads one message frame payload, rebuilds a Message from it, performs the
// incoming sequence-number checks and hands the message to the dispatcher.
//
// throttle_stamp: timestamp supplied by the caller; it is recorded on the
//                 decoded message via set_throttle_stamp().
// Returns the future produced by the read_frame_payload() continuation chain.
1904 seastar::future
<> ProtocolV2::read_message(utime_t throttle_stamp
)
1906 return read_frame_payload()
1907 .then([this, throttle_stamp
] {
// Timestamp taken as soon as the payload has been read.
1908 utime_t recv_stamp
{seastar::lowres_system_clock::now()};
1910 // we need to get the size before std::moving segments data
1911 const size_t cur_msg_size
= get_current_msg_size();
1912 auto msg_frame
= MessageFrame::Decode(rx_segments_data
);
1913 // XXX: paranoid copy just to avoid oops
1914 ceph_msg_header2 current_header
= msg_frame
.header();
1916 logger().trace("{} got {} + {} + {} byte message,"
1917 " envelope type={} src={} off={} seq={}",
1918 conn
, msg_frame
.front_len(), msg_frame
.middle_len(),
1919 msg_frame
.data_len(), current_header
.type
, conn
.get_peer_name(),
1920 current_header
.data_off
, current_header
.seq
);
// Rebuild a ceph_msg_header from the received ceph_msg_header2: the segment
// lengths come from the decoded frame and the source is the peer name of
// this connection.
1922 ceph_msg_header header
{current_header
.seq
,
1924 current_header
.type
,
1925 current_header
.priority
,
1926 current_header
.version
,
1927 init_le32(msg_frame
.front_len()),
1928 init_le32(msg_frame
.middle_len()),
1929 init_le32(msg_frame
.data_len()),
1930 current_header
.data_off
,
1931 conn
.get_peer_name(),
1932 current_header
.compat_version
,
1933 current_header
.reserved
,
// The footer is zero-initialised except for the flags, which are copied from
// the received header.
1935 ceph_msg_footer footer
{init_le32(0), init_le32(0),
1936 init_le32(0), init_le64(0), current_header
.flags
};
// Shared pointer to this connection, downcast to SocketConnection.
1938 auto pconn
= seastar::static_pointer_cast
<SocketConnection
>(
1939 conn
.shared_from_this());
// Build the Message object from the rebuilt header/footer and the three
// payload segments of the frame.
1940 Message
*message
= decode_message(nullptr, 0, header
, footer
,
1941 msg_frame
.front(), msg_frame
.middle(), msg_frame
.data(),
// NOTE(review): the condition guarding this warning is not shown here;
// presumably it fires when decode_message() yields no message -- confirm.
1944 logger().warn("{} decode message failed", conn
);
1948 // store reservation size in message, so we don't get confused
1949 // by messages entering the dispatch queue through other paths.
1950 message
->set_dispatch_throttle_size(cur_msg_size
);
// Record the throttle, receive and receive-complete timestamps on the message.
1952 message
->set_throttle_stamp(throttle_stamp
);
1953 message
->set_recv_stamp(recv_stamp
);
1954 message
->set_recv_complete_stamp(utime_t
{seastar::lowres_system_clock::now()});
1956 // check received seq#. if it is old, drop the message.
1957 // note that incoming messages may skip ahead. this is convenient for the
1958 // client side queueing because messages can't be renumbered, but the (kernel)
1959 // client will occasionally pull a message out of the sent queue to send
1960 // elsewhere. in that case it doesn't matter if we "got" it or not.
1961 uint64_t cur_seq
= conn
.in_seq
;
// Old message: seq is not greater than the last one recorded in conn.in_seq.
1962 if (message
->get_seq() <= cur_seq
) {
1963 logger().error("{} got old message {} <= {} {} {}, discarding",
1964 conn
, message
->get_seq(), cur_seq
, message
, *message
);
// Treated as fatal only when the RECONNECT_SEQ feature is present and
// conf.ms_die_on_old_message is set.
1965 if (HAVE_FEATURE(conn
.features
, RECONNECT_SEQ
) &&
1966 conf
.ms_die_on_old_message
) {
1967 ceph_assert(0 == "old msgs despite reconnect_seq feature");
// Gap in the sequence: logged, and fatal only when
// conf.ms_die_on_skipped_message is set.
1970 } else if (message
->get_seq() > cur_seq
+ 1) {
1971 logger().error("{} missed message? skipped from seq {} to {}",
1972 conn
, cur_seq
, message
->get_seq());
1973 if (conf
.ms_die_on_skipped_message
) {
1974 ceph_assert(0 == "skipped incoming seq");
1978 // note last received message.
1979 conn
.in_seq
= message
->get_seq();
1980 logger().debug("{} <== #{} === {} ({})",
1981 conn
, message
->get_seq(), *message
, message
->get_type());
// Pass the ack_seq carried in the received header to ack_writes().
1983 ack_writes(current_header
.ack_seq
);
1985 // TODO: change MessageRef with seastar::shared_ptr
// NOTE(review): the `false` argument presumably means "do not add a
// reference", i.e. the MessageRef adopts the raw pointer -- confirm.
1986 auto msg_ref
= MessageRef
{message
, false};
// Dispatch runs inside the pending_dispatch gate; the resulting future is
// deliberately discarded with (void), so it is not awaited here.
1987 (void) seastar::with_gate(pending_dispatch
, [this, msg
= std::move(msg_ref
)] {
1988 return dispatcher
.ms_dispatch(&conn
, std::move(msg
));
// Any exception escaping ms_dispatch() is logged and then aborts the process.
1989 }).handle_exception([this] (std::exception_ptr eptr
) {
1990 logger().error("{} ms_dispatch caught exception: {}", conn
, eptr
);
1991 ceph_abort("unexpected exception from ms_dispatch()");
// Enters the READY state and starts the receive loop: it repeatedly reads a
// frame preamble and handles the frame according to its tag (message, ack,
// keepalive, keepalive ack). The loop runs inside the pending_dispatch gate
// and its future is stored in execution_done.
1996 void ProtocolV2::execute_ready()
// Unless the policy is lossy, both cookies must be non-zero at this point.
1998 assert(conn
.policy
.lossy
|| (client_cookie
!= 0 && server_cookie
!= 0));
1999 trigger_state(state_t::READY
, write_state_t::open
, false);
// Unit-test builds only: tell the interceptor, if one is set, that the
// connection is ready.
2000 #ifdef UNIT_TESTS_BUILT
2001 if (conn
.interceptor
) {
2002 conn
.interceptor
->register_conn_ready(conn
);
2005 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
// The protocol timer is cancelled before the read loop starts.
2006 protocol_timer
.cancel();
2007 return seastar::keep_doing([this] {
2008 return read_main_preamble()
2009 .then([this] (Tag tag
) {
// Message frame: run the throttling steps, then read and dispatch the
// message via read_message().
2011 case Tag::MESSAGE
: {
2012 return seastar::futurize_apply([this] {
2013 // throttle_message() logic
2014 if (!conn
.policy
.throttler_messages
) {
2015 return seastar::now();
2017 // TODO: message throttler
2019 return seastar::now();
2021 // throttle_bytes() logic
// No byte throttler configured, or a zero-sized message: nothing to reserve.
2022 if (!conn
.policy
.throttler_bytes
) {
2023 return seastar::now();
2025 size_t cur_msg_size
= get_current_msg_size();
2026 if (!cur_msg_size
) {
2027 return seastar::now();
2029 logger().trace("{} wants {} bytes from policy throttler {}/{}",
2031 conn
.policy
.throttler_bytes
->get_current(),
2032 conn
.policy
.throttler_bytes
->get_max());
// Request cur_msg_size bytes from the policy byte throttler.
2033 return conn
.policy
.throttler_bytes
->get(cur_msg_size
);
2035 // TODO: throttle_dispatch_queue() logic
// The throttle timestamp is taken after the throttling steps above.
2036 utime_t throttle_stamp
{seastar::lowres_system_clock::now()};
2037 return read_message(throttle_stamp
);
// Ack handling: the payload is decoded as an AckFrame and its seq is passed
// to ack_writes().
2041 return read_frame_payload().then([this] {
2042 // handle_message_ack() logic
2043 auto ack
= AckFrame::Decode(rx_segments_data
.back());
2044 logger().debug("{} GOT AckFrame: seq={}", conn
, ack
.seq());
2045 ack_writes(ack
.seq());
// Keepalive: pass the received timestamp to notify_keepalive_ack() and
// record the current time as the last keepalive.
2047 case Tag::KEEPALIVE2
:
2048 return read_frame_payload().then([this] {
2049 // handle_keepalive2() logic
2050 auto keepalive_frame
= KeepAliveFrame::Decode(rx_segments_data
.back());
2051 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
2052 conn
, keepalive_frame
.timestamp());
2053 notify_keepalive_ack(keepalive_frame
.timestamp());
2054 conn
.set_last_keepalive(seastar::lowres_system_clock::now());
// Keepalive ack: store the timestamp carried by the frame as the last
// keepalive ack of the connection.
2056 case Tag::KEEPALIVE2_ACK
:
2057 return read_frame_payload().then([this] {
2058 // handle_keepalive2_ack() logic
2059 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(rx_segments_data
.back());
2060 conn
.set_last_keepalive_ack(
2061 seastar::lowres_system_clock::time_point
{keepalive_ack_frame
.timestamp()});
2062 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
2063 conn
, conn
.last_keepalive_ack
);
// Remaining tags are reported through unexpected_tag().
2066 unexpected_tag(tag
, conn
, "execute_ready");
2067 return seastar::now();
// Exception from the read loop: if the state is no longer READY the abort
// is logged and the state is asserted to be REPLACING or CLOSING; fault()
// is the other visible action.
// NOTE(review): whether fault() is skipped after the non-READY branch is
// not visible here -- confirm.
2071 }).handle_exception([this] (std::exception_ptr eptr
) {
2072 if (state
!= state_t::READY
) {
2073 logger().info("{} execute_ready(): protocol aborted at {} -- {}",
2074 conn
, get_state_name(state
), eptr
);
2075 assert(state
== state_t::REPLACING
||
2076 state
== state_t::CLOSING
);
2079 fault(false, "execute_ready()", eptr
);
// Switches the protocol to the STANDBY state with the write state set to
// write_state_t::delay.
2086 void ProtocolV2::execute_standby()
2088 trigger_state(state_t::STANDBY
, write_state_t::delay
, true);
// Write notification hook: when the protocol is in STANDBY and the connection
// policy is not a server policy, it logs the transition and calls
// execute_connecting().
2094 void ProtocolV2::notify_write()
// Marked unlikely(): the STANDBY, non-server combination is not the
// expected path.
2096 if (unlikely(state
== state_t::STANDBY
&& !conn
.policy
.server
)) {
2097 logger().info("{} notify_write(): at {}, going to CONNECTING",
2098 conn
, get_state_name(state
));
2099 execute_connecting();
// Enters the WAIT state, waits for a backoff period on protocol_timer and
// then calls execute_connecting(). The wait runs inside the pending_dispatch
// gate and its future is stored in execution_done.
//
// max_backoff: captured by the wait lambda.
// NOTE(review): presumably selects conf.ms_max_backoff directly; the guard
// for that branch is not visible here -- confirm.
2105 void ProtocolV2::execute_wait(bool max_backoff
)
2107 trigger_state(state_t::WAIT
, write_state_t::delay
, true);
2111 execution_done
= seastar::with_gate(pending_dispatch
,
2112 [this, max_backoff
] {
// Start from the duration last used by the protocol timer.
2113 double backoff
= protocol_timer
.last_dur();
2115 backoff
= conf
.ms_max_backoff
;
// A previous backoff exists: double it, capped at conf.ms_max_backoff.
2116 } else if (backoff
> 0) {
2117 backoff
= std::min(conf
.ms_max_backoff
, 2 * backoff
);
// Otherwise fall back to the configured initial backoff.
2119 backoff
= conf
.ms_initial_backoff
;
2121 return protocol_timer
.backoff(backoff
).then([this] {
// The state may have changed while waiting; that case is logged at debug
// level.
2122 if (unlikely(state
!= state_t::WAIT
)) {
2123 logger().debug("{} triggered {} at the end of execute_wait()",
2124 conn
, get_state_name(state
));
2127 logger().info("{} execute_wait(): going to CONNECTING", conn
);
2128 execute_connecting();
// An exception during the wait is logged; the state is then asserted to be
// REPLACING or CLOSING.
2129 }).handle_exception([this] (std::exception_ptr eptr
) {
2130 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2131 conn
, get_state_name(state
), eptr
);
2132 assert(state
== state_t::REPLACING
||
2133 state
== state_t::CLOSING
);
2138 // SERVER_WAIT state
// Enters the SERVER_WAIT state and issues a one-byte read inside the
// pending_dispatch gate, storing the resulting future in execution_done.
// A successful read is logged as a warning and a failure is logged at info
// level.
2140 void ProtocolV2::execute_server_wait()
2142 trigger_state(state_t::SERVER_WAIT
, write_state_t::delay
, false);
2143 execution_done
= seastar::with_gate(pending_dispatch
, [this] {
// Receiving any data in this state is logged as a warning.
// NOTE(review): the action that follows the log call is not visible here.
2144 return read_exactly(1).then([this] (auto bl
) {
2145 logger().warn("{} SERVER_WAIT got read, abort", conn
);
// Per the log text, a fault here leads to CLOSING.
// NOTE(review): the call that performs the transition is not visible here.
2147 }).handle_exception([this] (std::exception_ptr eptr
) {
2148 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2149 conn
, get_state_name(state
), eptr
);
// Moves the protocol to the CLOSING state: removes the connection from the
// messenger (which call depends on the current state), cancels the protocol
// timer and switches the write state to write_state_t::drop.
2157 void ProtocolV2::trigger_close()
// In ACCEPTING or SERVER_WAIT the connection is removed with unaccept_conn().
2159 if (state
== state_t::ACCEPTING
|| state
== state_t::SERVER_WAIT
) {
2160 messenger
.unaccept_conn(
2161 seastar::static_pointer_cast
<SocketConnection
>(
2162 conn
.shared_from_this()));
// For states from ESTABLISHING up to, but not including, CLOSING the
// connection is removed with unregister_conn().
2163 } else if (state
>= state_t::ESTABLISHING
&& state
< state_t::CLOSING
) {
2164 messenger
.unregister_conn(
2165 seastar::static_pointer_cast
<SocketConnection
>(
2166 conn
.shared_from_this()));
2172 protocol_timer
.cancel();
2174 trigger_state(state_t::CLOSING
, write_state_t::drop
, false);
// Unit-test builds only: tell the interceptor, if one is set, that the
// connection has closed.
2175 #ifdef UNIT_TESTS_BUILT
2176 if (conn
.interceptor
) {
2177 conn
.interceptor
->register_conn_closed(conn
);
2182 } // namespace crimson::net