1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV2.h"
6 #include <seastar/core/lowres_clock.hh>
7 #include <fmt/format.h>
8 #include "include/msgr.h"
9 #include "include/random.h"
11 #include "crimson/auth/AuthClient.h"
12 #include "crimson/auth/AuthServer.h"
13 #include "crimson/common/formatter.h"
15 #include "chained_dispatchers.h"
18 #include "SocketConnection.h"
19 #include "SocketMessenger.h"
21 #ifdef UNIT_TESTS_BUILT
22 #include "Interceptor.h"
25 using namespace ceph::msgr::v2
;
26 using crimson::common::local_conf
;
30 // TODO: apply the same logging policy to Protocol V1
31 // Log levels in V2 Protocol:
32 // * error level, something wrong that causes the connection to terminate:
35 // * warn level: something unusual that identifies connection fault or replacement:
36 // - unstable network;
37 // - incompatible peer;
40 // - connection reset;
41 // * info level, something very important to show connection lifecycle,
42 // which doesn't happen very frequently;
43 // * debug level, important logs for debugging, including:
44 // - all the messages sent/received (-->/<==);
45 // - all the frames exchanged (WRITE/GOT);
46 // - important fields updated (UPDATE);
47 // - connection state transitions (TRIGGER);
48 // * trace level, trivial logs showing:
49 // - the exact bytes being sent/received (SEND/RECV(bytes));
50 // - detailed information of sub-frames;
51 // - integrity checks;
53 seastar::logger
& logger() {
54 return crimson::get_logger(ceph_subsys_ms
);
57 [[noreturn
]] void abort_in_fault() {
58 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
61 [[noreturn
]] void abort_protocol() {
62 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
65 [[noreturn
]] void abort_in_close(crimson::net::ProtocolV2
& proto
, bool dispatch_reset
) {
66 proto
.close(dispatch_reset
);
70 inline void expect_tag(const Tag
& expected
,
72 crimson::net::SocketConnection
& conn
,
74 if (actual
!= expected
) {
75 logger().warn("{} {} received wrong tag: {}, expected {}",
77 static_cast<uint32_t>(actual
),
78 static_cast<uint32_t>(expected
));
83 inline void unexpected_tag(const Tag
& unexpected
,
84 crimson::net::SocketConnection
& conn
,
86 logger().warn("{} {} received unexpected tag: {}",
87 conn
, where
, static_cast<uint32_t>(unexpected
));
91 inline uint64_t generate_client_cookie() {
92 return ceph::util::generate_random_number
<uint64_t>(
93 1, std::numeric_limits
<uint64_t>::max());
96 } // namespace anonymous
98 namespace crimson::net
{
100 #ifdef UNIT_TESTS_BUILT
101 void intercept(Breakpoint bp
, bp_type_t type
,
102 SocketConnection
& conn
, SocketRef
& socket
) {
103 if (conn
.interceptor
) {
104 auto action
= conn
.interceptor
->intercept(conn
, Breakpoint(bp
));
105 socket
->set_trap(type
, action
, &conn
.interceptor
->blocker
);
109 #define INTERCEPT_CUSTOM(bp, type) \
110 intercept({bp}, type, conn, socket)
112 #define INTERCEPT_FRAME(tag, type) \
113 intercept({static_cast<Tag>(tag), type}, \
116 #define INTERCEPT_N_RW(bp) \
117 if (conn.interceptor) { \
118 auto action = conn.interceptor->intercept(conn, {bp}); \
119 ceph_assert(action != bp_action_t::BLOCK); \
120 if (action == bp_action_t::FAULT) { \
126 #define INTERCEPT_CUSTOM(bp, type)
127 #define INTERCEPT_FRAME(tag, type)
128 #define INTERCEPT_N_RW(bp)
131 seastar::future
<> ProtocolV2::Timer::backoff(double seconds
)
133 logger().warn("{} waiting {} seconds ...", conn
, seconds
);
136 as
= seastar::abort_source();
137 auto dur
= std::chrono::duration_cast
<seastar::lowres_clock::duration
>(
138 std::chrono::duration
<double>(seconds
));
139 return seastar::sleep_abortable(dur
, *as
140 ).handle_exception_type([this] (const seastar::sleep_aborted
& e
) {
141 logger().debug("{} wait aborted", conn
);
146 ProtocolV2::ProtocolV2(ChainedDispatchers
& dispatchers
,
147 SocketConnection
& conn
,
148 SocketMessenger
& messenger
)
149 : Protocol(proto_t::v2
, dispatchers
, conn
),
150 messenger
{messenger
},
154 ProtocolV2::~ProtocolV2() {}
156 bool ProtocolV2::is_connected() const {
157 return state
== state_t::READY
||
158 state
== state_t::ESTABLISHING
||
159 state
== state_t::REPLACING
;
162 void ProtocolV2::start_connect(const entity_addr_t
& _peer_addr
,
163 const entity_name_t
& _peer_name
)
165 ceph_assert(state
== state_t::NONE
);
166 ceph_assert(!socket
);
167 ceph_assert(!gate
.is_closed());
168 conn
.peer_addr
= _peer_addr
;
169 conn
.target_addr
= _peer_addr
;
170 conn
.set_peer_name(_peer_name
);
171 conn
.policy
= messenger
.get_policy(_peer_name
.type());
172 client_cookie
= generate_client_cookie();
173 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
174 " policy(lossy={}, server={}, standby={}, resetcheck={})",
175 conn
, _peer_addr
, _peer_name
, client_cookie
,
176 conn
.policy
.lossy
, conn
.policy
.server
,
177 conn
.policy
.standby
, conn
.policy
.resetcheck
);
178 messenger
.register_conn(
179 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
180 execute_connecting();
183 void ProtocolV2::start_accept(SocketRef
&& sock
,
184 const entity_addr_t
& _peer_addr
)
186 ceph_assert(state
== state_t::NONE
);
187 ceph_assert(!socket
);
188 // until we know better
189 conn
.target_addr
= _peer_addr
;
190 socket
= std::move(sock
);
191 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn
, _peer_addr
);
192 messenger
.accept_conn(
193 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
197 // TODO: Frame related implementations, probably to a separate class.
199 void ProtocolV2::enable_recording()
206 seastar::future
<Socket::tmp_buf
> ProtocolV2::read_exactly(size_t bytes
)
208 if (unlikely(record_io
)) {
209 return socket
->read_exactly(bytes
)
210 .then([this] (auto bl
) {
211 rxbuf
.append(buffer::create(bl
.share()));
215 return socket
->read_exactly(bytes
);
219 seastar::future
<bufferlist
> ProtocolV2::read(size_t bytes
)
221 if (unlikely(record_io
)) {
222 return socket
->read(bytes
)
223 .then([this] (auto buf
) {
228 return socket
->read(bytes
);
232 seastar::future
<> ProtocolV2::write(bufferlist
&& buf
)
234 if (unlikely(record_io
)) {
237 return socket
->write(std::move(buf
));
240 seastar::future
<> ProtocolV2::write_flush(bufferlist
&& buf
)
242 if (unlikely(record_io
)) {
245 return socket
->write_flush(std::move(buf
));
248 size_t ProtocolV2::get_current_msg_size() const
250 ceph_assert(rx_frame_asm
.get_num_segments() > 0);
252 // we don't include SegmentIndex::Msg::HEADER.
253 for (size_t idx
= 1; idx
< rx_frame_asm
.get_num_segments(); idx
++) {
254 sum
+= rx_frame_asm
.get_segment_logical_len(idx
);
259 seastar::future
<Tag
> ProtocolV2::read_main_preamble()
262 return read_exactly(rx_frame_asm
.get_preamble_onwire_len())
263 .then([this] (auto bl
) {
264 rx_segments_data
.clear();
266 rx_preamble
.append(buffer::create(std::move(bl
)));
267 const Tag tag
= rx_frame_asm
.disassemble_preamble(rx_preamble
);
268 INTERCEPT_FRAME(tag
, bp_type_t::READ
);
270 } catch (FrameError
& e
) {
271 logger().warn("{} read_main_preamble: {}", conn
, e
.what());
277 seastar::future
<> ProtocolV2::read_frame_payload()
279 ceph_assert(rx_segments_data
.empty());
281 return seastar::do_until(
282 [this] { return rx_frame_asm
.get_num_segments() == rx_segments_data
.size(); },
284 // TODO: create aligned and contiguous buffer from socket
285 const size_t seg_idx
= rx_segments_data
.size();
286 if (uint16_t alignment
= rx_frame_asm
.get_segment_align(seg_idx
);
287 alignment
!= segment_t::DEFAULT_ALIGNMENT
) {
288 logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
289 conn
, alignment
, rx_segments_data
.size());
291 uint32_t onwire_len
= rx_frame_asm
.get_segment_onwire_len(seg_idx
);
292 // TODO: create aligned and contiguous buffer from socket
293 return read_exactly(onwire_len
).then([this] (auto tmp_bl
) {
294 logger().trace("{} RECV({}) frame segment[{}]",
295 conn
, tmp_bl
.size(), rx_segments_data
.size());
297 segment
.append(buffer::create(std::move(tmp_bl
)));
298 rx_segments_data
.emplace_back(std::move(segment
));
302 return read_exactly(rx_frame_asm
.get_epilogue_onwire_len());
303 }).then([this] (auto bl
) {
304 logger().trace("{} RECV({}) frame epilogue", conn
, bl
.size());
307 rx_frame_asm
.disassemble_first_segment(rx_preamble
, rx_segments_data
[0]);
308 bufferlist rx_epilogue
;
309 rx_epilogue
.append(buffer::create(std::move(bl
)));
310 ok
= rx_frame_asm
.disassemble_remaining_segments(rx_segments_data
.data(), rx_epilogue
);
311 } catch (FrameError
& e
) {
312 logger().error("read_frame_payload: {} {}", conn
, e
.what());
314 } catch (ceph::crypto::onwire::MsgAuthError
&) {
315 logger().error("read_frame_payload: {} bad auth tag", conn
);
318 // we do have a mechanism that allows transmitter to start sending message
319 // and abort after putting entire data field on wire. This will be used by
320 // the kernel client to avoid unnecessary buffering.
329 seastar::future
<> ProtocolV2::write_frame(F
&frame
, bool flush
)
331 auto bl
= frame
.get_buffer(tx_frame_asm
);
332 const auto main_preamble
= reinterpret_cast<const preamble_block_t
*>(bl
.front().c_str());
333 logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
334 conn
, bl
.length(), (int)main_preamble
->tag
,
335 (int)main_preamble
->num_segments
, main_preamble
->crc
);
336 INTERCEPT_FRAME(main_preamble
->tag
, bp_type_t::WRITE
);
338 return write_flush(std::move(bl
));
340 return write(std::move(bl
));
344 void ProtocolV2::trigger_state(state_t _state
, write_state_t _write_state
, bool reentrant
)
346 if (!reentrant
&& _state
== state
) {
347 logger().error("{} is not allowed to re-trigger state {}",
348 conn
, get_state_name(state
));
351 logger().debug("{} TRIGGER {}, was {}",
352 conn
, get_state_name(_state
), get_state_name(state
));
354 set_write_state(_write_state
);
357 void ProtocolV2::fault(bool backoff
, const char* func_name
, std::exception_ptr eptr
)
359 if (conn
.policy
.lossy
) {
360 logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
361 conn
, func_name
, get_state_name(state
), eptr
);
363 } else if (conn
.policy
.server
||
364 (conn
.policy
.standby
&&
365 (!is_queued() && conn
.sent
.empty()))) {
366 logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
367 conn
, func_name
, get_state_name(state
), eptr
);
369 } else if (backoff
) {
370 logger().info("{} {}: fault at {}, going to WAIT -- {}",
371 conn
, func_name
, get_state_name(state
), eptr
);
374 logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
375 conn
, func_name
, get_state_name(state
), eptr
);
376 execute_connecting();
380 void ProtocolV2::reset_session(bool full
)
386 client_cookie
= generate_client_cookie();
389 dispatchers
.ms_handle_remote_reset(
390 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
394 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
395 ProtocolV2::banner_exchange(bool is_connect
)
397 // 1. prepare and send banner
398 bufferlist banner_payload
;
399 encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
400 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
403 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
404 auto len_payload
= static_cast<uint16_t>(banner_payload
.length());
405 encode(len_payload
, bl
, 0);
406 bl
.claim_append(banner_payload
);
407 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
408 "required={}, banner=\"{}\"",
409 conn
, bl
.length(), len_payload
,
410 CEPH_MSGR2_SUPPORTED_FEATURES
, CEPH_MSGR2_REQUIRED_FEATURES
,
411 CEPH_BANNER_V2_PREFIX
);
412 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE
, bp_type_t::WRITE
);
413 return write_flush(std::move(bl
)).then([this] {
414 // 2. read peer banner
415 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
416 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ
, bp_type_t::READ
);
417 return read_exactly(banner_len
); // or read exactly?
418 }).then([this] (auto bl
) {
419 // 3. process peer banner and read banner_payload
420 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
421 logger().debug("{} RECV({}) banner: \"{}\"",
423 std::string((const char*)bl
.get(), banner_prefix_len
));
425 if (memcmp(bl
.get(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
) != 0) {
426 if (memcmp(bl
.get(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
427 logger().warn("{} peer is using V1 protocol", conn
);
429 logger().warn("{} peer sent bad banner", conn
);
433 bl
.trim_front(banner_prefix_len
);
435 uint16_t payload_len
;
437 buf
.append(buffer::create(std::move(bl
)));
438 auto ti
= buf
.cbegin();
440 decode(payload_len
, ti
);
441 } catch (const buffer::error
&e
) {
442 logger().warn("{} decode banner payload len failed", conn
);
445 logger().debug("{} GOT banner: payload_len={}", conn
, payload_len
);
446 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ
, bp_type_t::READ
);
447 return read(payload_len
);
448 }).then([this, is_connect
] (bufferlist bl
) {
449 // 4. process peer banner_payload and send HelloFrame
450 auto p
= bl
.cbegin();
451 uint64_t peer_supported_features
;
452 uint64_t peer_required_features
;
454 decode(peer_supported_features
, p
);
455 decode(peer_required_features
, p
);
456 } catch (const buffer::error
&e
) {
457 logger().warn("{} decode banner payload failed", conn
);
460 logger().debug("{} RECV({}) banner features: supported={} required={}",
462 peer_supported_features
, peer_required_features
);
464 // Check feature bit compatibility
465 uint64_t supported_features
= CEPH_MSGR2_SUPPORTED_FEATURES
;
466 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
467 if ((required_features
& peer_supported_features
) != required_features
) {
468 logger().error("{} peer does not support all required features"
469 " required={} peer_supported={}",
470 conn
, required_features
, peer_supported_features
);
471 abort_in_close(*this, is_connect
);
473 if ((supported_features
& peer_required_features
) != peer_required_features
) {
474 logger().error("{} we do not support all peer required features"
475 " peer_required={} supported={}",
476 conn
, peer_required_features
, supported_features
);
477 abort_in_close(*this, is_connect
);
479 this->peer_required_features
= peer_required_features
;
480 if (this->peer_required_features
== 0) {
481 this->connection_features
= msgr2_required
;
483 const bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
484 tx_frame_asm
.set_is_rev1(is_rev1
);
485 rx_frame_asm
.set_is_rev1(is_rev1
);
487 auto hello
= HelloFrame::Encode(messenger
.get_mytype(),
489 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
490 conn
, ceph_entity_type_name(messenger
.get_mytype()),
492 return write_frame(hello
);
494 //5. read peer HelloFrame
495 return read_main_preamble();
496 }).then([this] (Tag tag
) {
497 expect_tag(Tag::HELLO
, tag
, conn
, __func__
);
498 return read_frame_payload();
500 // 6. process peer HelloFrame
501 auto hello
= HelloFrame::Decode(rx_segments_data
.back());
502 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
503 conn
, ceph_entity_type_name(hello
.entity_type()),
505 return seastar::make_ready_future
<std::tuple
<entity_type_t
, entity_addr_t
>>(
506 std::make_tuple(hello
.entity_type(), hello
.peer_addr()));
512 seastar::future
<> ProtocolV2::handle_auth_reply()
514 return read_main_preamble()
515 .then([this] (Tag tag
) {
517 case Tag::AUTH_BAD_METHOD
:
518 return read_frame_payload().then([this] {
519 // handle_auth_bad_method() logic
520 auto bad_method
= AuthBadMethodFrame::Decode(rx_segments_data
.back());
521 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
522 "allowed_methods={}, allowed_modes={}",
523 conn
, bad_method
.method(), cpp_strerror(bad_method
.result()),
524 bad_method
.allowed_methods(), bad_method
.allowed_modes());
525 ceph_assert(messenger
.get_auth_client());
526 int r
= messenger
.get_auth_client()->handle_auth_bad_method(
527 conn
.shared_from_this(), auth_meta
,
528 bad_method
.method(), bad_method
.result(),
529 bad_method
.allowed_methods(), bad_method
.allowed_modes());
531 logger().warn("{} auth_client handle_auth_bad_method returned {}",
535 return client_auth(bad_method
.allowed_methods());
537 case Tag::AUTH_REPLY_MORE
:
538 return read_frame_payload().then([this] {
539 // handle_auth_reply_more() logic
540 auto auth_more
= AuthReplyMoreFrame::Decode(rx_segments_data
.back());
541 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
542 conn
, auth_more
.auth_payload().length());
543 ceph_assert(messenger
.get_auth_client());
544 // let execute_connecting() take care of the thrown exception
545 auto reply
= messenger
.get_auth_client()->handle_auth_reply_more(
546 conn
.shared_from_this(), auth_meta
, auth_more
.auth_payload());
547 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
548 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
549 conn
, reply
.length());
550 return write_frame(more_reply
);
552 return handle_auth_reply();
555 return read_frame_payload().then([this] {
556 // handle_auth_done() logic
557 auto auth_done
= AuthDoneFrame::Decode(rx_segments_data
.back());
558 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
559 conn
, auth_done
.global_id(),
560 ceph_con_mode_name(auth_done
.con_mode()),
561 auth_done
.auth_payload().length());
562 ceph_assert(messenger
.get_auth_client());
563 int r
= messenger
.get_auth_client()->handle_auth_done(
564 conn
.shared_from_this(), auth_meta
,
565 auth_done
.global_id(),
566 auth_done
.con_mode(),
567 auth_done
.auth_payload());
569 logger().warn("{} auth_client handle_auth_done returned {}", conn
, r
);
572 auth_meta
->con_mode
= auth_done
.con_mode();
573 session_stream_handlers
= ceph::crypto::onwire::rxtx_t::create_handler_pair(
574 nullptr, *auth_meta
, tx_frame_asm
.get_is_rev1(), false);
575 return finish_auth();
578 unexpected_tag(tag
, conn
, __func__
);
579 return seastar::now();
585 seastar::future
<> ProtocolV2::client_auth(std::vector
<uint32_t> &allowed_methods
)
587 // send_auth_request() logic
588 ceph_assert(messenger
.get_auth_client());
591 auto [auth_method
, preferred_modes
, bl
] =
592 messenger
.get_auth_client()->get_auth_request(conn
.shared_from_this(), auth_meta
);
593 auth_meta
->auth_method
= auth_method
;
594 auto frame
= AuthRequestFrame::Encode(auth_method
, preferred_modes
, bl
);
595 logger().debug("{} WRITE AuthRequestFrame: method={},"
596 " preferred_modes={}, payload_len={}",
597 conn
, auth_method
, preferred_modes
, bl
.length());
598 return write_frame(frame
).then([this] {
599 return handle_auth_reply();
601 } catch (const crimson::auth::error
& e
) {
602 logger().error("{} get_initial_auth_request returned {}", conn
, e
);
603 abort_in_close(*this, true);
604 return seastar::now();
608 seastar::future
<ProtocolV2::next_step_t
>
609 ProtocolV2::process_wait()
611 return read_frame_payload().then([this] {
612 // handle_wait() logic
613 logger().debug("{} GOT WaitFrame", conn
);
614 WaitFrame::Decode(rx_segments_data
.back());
615 return next_step_t::wait
;
619 seastar::future
<ProtocolV2::next_step_t
>
620 ProtocolV2::client_connect()
622 // send_client_ident() logic
624 if (conn
.policy
.lossy
) {
625 flags
|= CEPH_MSG_CONNECT_LOSSY
;
628 auto client_ident
= ClientIdentFrame::Encode(
629 messenger
.get_myaddrs(),
631 messenger
.get_myname().num(),
633 conn
.policy
.features_supported
,
634 conn
.policy
.features_required
| msgr2_required
, flags
,
637 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
638 " gs={}, features_supported={}, features_required={},"
639 " flags={}, cookie={}",
640 conn
, messenger
.get_myaddrs(), conn
.target_addr
,
641 messenger
.get_myname().num(), global_seq
,
642 conn
.policy
.features_supported
,
643 conn
.policy
.features_required
| msgr2_required
,
644 flags
, client_cookie
);
645 return write_frame(client_ident
).then([this] {
646 return read_main_preamble();
647 }).then([this] (Tag tag
) {
649 case Tag::IDENT_MISSING_FEATURES
:
650 return read_frame_payload().then([this] {
651 // handle_ident_missing_features() logic
652 auto ident_missing
= IdentMissingFeaturesFrame::Decode(rx_segments_data
.back());
653 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
654 " (client does not support all server features)",
655 conn
, ident_missing
.features());
657 return next_step_t::none
;
660 return process_wait();
661 case Tag::SERVER_IDENT
:
662 return read_frame_payload().then([this] {
663 // handle_server_ident() logic
665 auto server_ident
= ServerIdentFrame::Decode(rx_segments_data
.back());
666 logger().debug("{} GOT ServerIdentFrame:"
667 " addrs={}, gid={}, gs={},"
668 " features_supported={}, features_required={},"
669 " flags={}, cookie={}",
671 server_ident
.addrs(), server_ident
.gid(),
672 server_ident
.global_seq(),
673 server_ident
.supported_features(),
674 server_ident
.required_features(),
675 server_ident
.flags(), server_ident
.cookie());
677 // is this who we intended to talk to?
678 // be a bit forgiving here, since we may be connecting based on addresses parsed out
679 // of mon_host or something.
680 if (!server_ident
.addrs().contains(conn
.target_addr
)) {
681 logger().warn("{} peer identifies as {}, does not include {}",
682 conn
, server_ident
.addrs(), conn
.target_addr
);
683 throw std::system_error(
684 make_error_code(crimson::net::error::bad_peer_address
));
687 server_cookie
= server_ident
.cookie();
689 // TODO: change peer_addr to entity_addrvec_t
690 if (server_ident
.addrs().front() != conn
.peer_addr
) {
691 logger().warn("{} peer advertises as {}, does not match {}",
692 conn
, server_ident
.addrs(), conn
.peer_addr
);
693 throw std::system_error(
694 make_error_code(crimson::net::error::bad_peer_address
));
696 if (conn
.get_peer_id() != entity_name_t::NEW
&&
697 conn
.get_peer_id() != server_ident
.gid()) {
698 logger().error("{} connection peer id ({}) does not match "
699 "what it should be ({}) during connecting, close",
700 conn
, server_ident
.gid(), conn
.get_peer_id());
701 abort_in_close(*this, true);
703 conn
.set_peer_id(server_ident
.gid());
704 conn
.set_features(server_ident
.supported_features() &
705 conn
.policy
.features_supported
);
706 peer_global_seq
= server_ident
.global_seq();
708 bool lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
709 if (lossy
!= conn
.policy
.lossy
) {
710 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn
, lossy
);
711 conn
.policy
.lossy
= lossy
;
713 if (lossy
&& (connect_seq
!= 0 || server_cookie
!= 0)) {
714 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
715 conn
, connect_seq
, server_cookie
);
720 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
723 unexpected_tag(tag
, conn
, "post_client_connect");
724 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
730 seastar::future
<ProtocolV2::next_step_t
>
731 ProtocolV2::client_reconnect()
733 // send_reconnect() logic
734 auto reconnect
= ReconnectFrame::Encode(messenger
.get_myaddrs(),
740 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
741 " server_cookie={}, gs={}, cs={}, msg_seq={}",
742 conn
, messenger
.get_myaddrs(),
743 client_cookie
, server_cookie
,
744 global_seq
, connect_seq
, conn
.in_seq
);
745 return write_frame(reconnect
).then([this] {
746 return read_main_preamble();
747 }).then([this] (Tag tag
) {
749 case Tag::SESSION_RETRY_GLOBAL
:
750 return read_frame_payload().then([this] {
751 // handle_session_retry_global() logic
752 auto retry
= RetryGlobalFrame::Decode(rx_segments_data
.back());
753 logger().warn("{} GOT RetryGlobalFrame: gs={}",
754 conn
, retry
.global_seq());
755 return messenger
.get_global_seq(retry
.global_seq()).then([this] (auto gs
) {
757 logger().warn("{} UPDATE: gs={} for retry global", conn
, global_seq
);
758 return client_reconnect();
761 case Tag::SESSION_RETRY
:
762 return read_frame_payload().then([this] {
763 // handle_session_retry() logic
764 auto retry
= RetryFrame::Decode(rx_segments_data
.back());
765 logger().warn("{} GOT RetryFrame: cs={}",
766 conn
, retry
.connect_seq());
767 connect_seq
= retry
.connect_seq() + 1;
768 logger().warn("{} UPDATE: cs={}", conn
, connect_seq
);
769 return client_reconnect();
771 case Tag::SESSION_RESET
:
772 return read_frame_payload().then([this] {
773 // handle_session_reset() logic
774 auto reset
= ResetFrame::Decode(rx_segments_data
.back());
775 logger().warn("{} GOT ResetFrame: full={}", conn
, reset
.full());
776 reset_session(reset
.full());
777 return client_connect();
780 return process_wait();
781 case Tag::SESSION_RECONNECT_OK
:
782 return read_frame_payload().then([this] {
783 // handle_reconnect_ok() logic
784 auto reconnect_ok
= ReconnectOkFrame::Decode(rx_segments_data
.back());
785 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
786 conn
, reconnect_ok
.msg_seq());
787 requeue_up_to(reconnect_ok
.msg_seq());
788 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
791 unexpected_tag(tag
, conn
, "post_client_reconnect");
792 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
798 void ProtocolV2::execute_connecting()
800 trigger_state(state_t::CONNECTING
, write_state_t::delay
, true);
804 gated_execute("execute_connecting", [this] {
805 return messenger
.get_global_seq().then([this] (auto gs
) {
807 assert(client_cookie
!= 0);
808 if (!conn
.policy
.lossy
&& server_cookie
!= 0) {
810 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
811 conn
, global_seq
, connect_seq
);
812 } else { // conn.policy.lossy || server_cookie == 0
813 assert(connect_seq
== 0);
814 assert(server_cookie
== 0);
815 logger().debug("{} UPDATE: gs={} for connect", conn
, global_seq
);
818 return wait_write_exit();
820 if (unlikely(state
!= state_t::CONNECTING
)) {
821 logger().debug("{} triggered {} before Socket::connect()",
822 conn
, get_state_name(state
));
826 gate
.dispatch_in_background("close_sockect_connecting", *this,
827 [sock
= std::move(socket
)] () mutable {
828 return sock
->close().then([sock
= std::move(sock
)] {});
831 INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING
);
832 return Socket::connect(conn
.peer_addr
);
833 }).then([this](SocketRef sock
) {
834 logger().debug("{} socket connected", conn
);
835 if (unlikely(state
!= state_t::CONNECTING
)) {
836 logger().debug("{} triggered {} during Socket::connect()",
837 conn
, get_state_name(state
));
838 return sock
->close().then([sock
= std::move(sock
)] {
842 socket
= std::move(sock
);
843 return seastar::now();
845 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
846 session_stream_handlers
= { nullptr, nullptr };
848 return banner_exchange(true);
849 }).then([this] (auto&& ret
) {
850 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
851 if (conn
.get_peer_type() != _peer_type
) {
852 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
853 conn
, ceph_entity_type_name(conn
.get_peer_type()),
854 ceph_entity_type_name(_peer_type
));
855 abort_in_close(*this, true);
857 if (unlikely(state
!= state_t::CONNECTING
)) {
858 logger().debug("{} triggered {} during banner_exchange(), abort",
859 conn
, get_state_name(state
));
862 socket
->learn_ephemeral_port_as_connector(_my_addr_from_peer
.get_port());
863 if (unlikely(_my_addr_from_peer
.is_legacy())) {
864 logger().warn("{} peer sent a legacy address for me: {}",
865 conn
, _my_addr_from_peer
);
866 throw std::system_error(
867 make_error_code(crimson::net::error::bad_peer_address
));
869 _my_addr_from_peer
.set_type(entity_addr_t::TYPE_MSGR2
);
870 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
872 return client_auth();
874 if (server_cookie
== 0) {
875 ceph_assert(connect_seq
== 0);
876 return client_connect();
878 ceph_assert(connect_seq
> 0);
879 return client_reconnect();
881 }).then([this] (next_step_t next
) {
882 if (unlikely(state
!= state_t::CONNECTING
)) {
883 logger().debug("{} triggered {} at the end of execute_connecting()",
884 conn
, get_state_name(state
));
888 case next_step_t::ready
: {
889 logger().info("{} connected:"
890 " gs={}, pgs={}, cs={}, client_cookie={},"
891 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
892 conn
, global_seq
, peer_global_seq
, connect_seq
,
893 client_cookie
, server_cookie
, conn
.in_seq
,
894 conn
.out_seq
, conn
.out_q
.size());
898 case next_step_t::wait
: {
899 logger().info("{} execute_connecting(): going to WAIT", conn
);
904 ceph_abort("impossible next step");
907 }).handle_exception([this] (std::exception_ptr eptr
) {
908 if (state
!= state_t::CONNECTING
) {
909 logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
910 conn
, get_state_name(state
), eptr
);
911 assert(state
== state_t::CLOSING
||
912 state
== state_t::REPLACING
);
916 if (conn
.policy
.server
||
917 (conn
.policy
.standby
&&
918 (!is_queued() && conn
.sent
.empty()))) {
919 logger().info("{} execute_connecting(): fault at {} with nothing to send,"
920 " going to STANDBY -- {}",
921 conn
, get_state_name(state
), eptr
);
924 logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
925 conn
, get_state_name(state
), eptr
);
934 seastar::future
<> ProtocolV2::_auth_bad_method(int r
)
936 // _auth_bad_method() logic
938 auto [allowed_methods
, allowed_modes
] =
939 messenger
.get_auth_server()->get_supported_auth_methods(conn
.get_peer_type());
940 auto bad_method
= AuthBadMethodFrame::Encode(
941 auth_meta
->auth_method
, r
, allowed_methods
, allowed_modes
);
942 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
943 "allowed_methods={}, allowed_modes={})",
944 conn
, auth_meta
->auth_method
, cpp_strerror(r
),
945 allowed_methods
, allowed_modes
);
946 return write_frame(bad_method
).then([this] {
947 return server_auth();
951 seastar::future
<> ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
953 // _handle_auth_request() logic
954 ceph_assert(messenger
.get_auth_server());
956 int r
= messenger
.get_auth_server()->handle_auth_request(
957 conn
.shared_from_this(), auth_meta
,
958 more
, auth_meta
->auth_method
, auth_payload
,
963 auto auth_done
= AuthDoneFrame::Encode(
964 conn
.peer_global_id
, auth_meta
->con_mode
, reply
);
965 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
966 conn
, conn
.peer_global_id
,
967 ceph_con_mode_name(auth_meta
->con_mode
), reply
.length());
968 return write_frame(auth_done
).then([this] {
969 ceph_assert(auth_meta
);
970 session_stream_handlers
= ceph::crypto::onwire::rxtx_t::create_handler_pair(
971 nullptr, *auth_meta
, tx_frame_asm
.get_is_rev1(), true);
972 return finish_auth();
977 auto more
= AuthReplyMoreFrame::Encode(reply
);
978 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
979 conn
, reply
.length());
980 return write_frame(more
).then([this] {
981 return read_main_preamble();
982 }).then([this] (Tag tag
) {
983 expect_tag(Tag::AUTH_REQUEST_MORE
, tag
, conn
, __func__
);
984 return read_frame_payload();
986 auto auth_more
= AuthRequestMoreFrame::Decode(rx_segments_data
.back());
987 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
988 conn
, auth_more
.auth_payload().length());
989 return _handle_auth_request(auth_more
.auth_payload(), true);
993 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn
);
995 return seastar::now();
998 logger().warn("{} auth_server handle_auth_request returned {}", conn
, r
);
999 return _auth_bad_method(r
);
1004 seastar::future
<> ProtocolV2::server_auth()
1006 return read_main_preamble()
1007 .then([this] (Tag tag
) {
1008 expect_tag(Tag::AUTH_REQUEST
, tag
, conn
, __func__
);
1009 return read_frame_payload();
1011 // handle_auth_request() logic
1012 auto request
= AuthRequestFrame::Decode(rx_segments_data
.back());
1013 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1015 conn
, request
.method(), request
.preferred_modes(),
1016 request
.auth_payload().length());
1017 auth_meta
->auth_method
= request
.method();
1018 auth_meta
->con_mode
= messenger
.get_auth_server()->pick_con_mode(
1019 conn
.get_peer_type(), auth_meta
->auth_method
,
1020 request
.preferred_modes());
1021 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
1022 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn
);
1023 return _auth_bad_method(-EOPNOTSUPP
);
1025 return _handle_auth_request(request
.auth_payload(), false);
1029 bool ProtocolV2::validate_peer_name(const entity_name_t
& peer_name
) const
1031 auto my_peer_name
= conn
.get_peer_name();
1032 if (my_peer_name
.type() != peer_name
.type()) {
1035 if (my_peer_name
.num() != entity_name_t::NEW
&&
1036 peer_name
.num() != entity_name_t::NEW
&&
1037 my_peer_name
.num() != peer_name
.num()) {
1043 seastar::future
<ProtocolV2::next_step_t
>
1044 ProtocolV2::send_wait()
1046 auto wait
= WaitFrame::Encode();
1047 logger().debug("{} WRITE WaitFrame", conn
);
1048 return write_frame(wait
).then([] {
1049 return next_step_t::wait
;
1053 seastar::future
<ProtocolV2::next_step_t
>
1054 ProtocolV2::reuse_connection(
1055 ProtocolV2
* existing_proto
, bool do_reset
,
1056 bool reconnect
, uint64_t conn_seq
, uint64_t msg_seq
)
1058 existing_proto
->trigger_replacing(reconnect
,
1061 std::move(auth_meta
),
1062 std::move(session_stream_handlers
),
1065 conn
.get_peer_name(),
1066 connection_features
,
1067 tx_frame_asm
.get_is_rev1(),
1068 rx_frame_asm
.get_is_rev1(),
1071 #ifdef UNIT_TESTS_BUILT
1072 if (conn
.interceptor
) {
1073 conn
.interceptor
->register_conn_replaced(conn
);
1076 // close this connection because all the necessary information is delivered
1077 // to the exisiting connection, and jump to error handling code to abort the
1079 abort_in_close(*this, false);
1080 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1083 seastar::future
<ProtocolV2::next_step_t
>
1084 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn
)
1086 // handle_existing_connection() logic
1087 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1088 existing_conn
->protocol
.get());
1089 ceph_assert(existing_proto
);
1090 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1091 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1092 conn
, global_seq
, peer_global_seq
, connect_seq
,
1093 client_cookie
, server_cookie
,
1094 existing_conn
, get_state_name(existing_proto
->state
),
1095 existing_proto
->global_seq
,
1096 existing_proto
->peer_global_seq
,
1097 existing_proto
->connect_seq
,
1098 existing_proto
->client_cookie
,
1099 existing_proto
->server_cookie
);
1101 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1102 logger().error("{} server_connect: my peer_name doesn't match"
1103 " the existing connection {}, abort", conn
, existing_conn
);
1107 if (existing_proto
->state
== state_t::REPLACING
) {
1108 logger().warn("{} server_connect: racing replace happened while"
1109 " replacing existing connection {}, send wait.",
1110 conn
, *existing_conn
);
1114 if (existing_proto
->peer_global_seq
> peer_global_seq
) {
1115 logger().warn("{} server_connect:"
1116 " this is a stale connection, because peer_global_seq({})"
1117 " < existing->peer_global_seq({}), close this connection"
1118 " in favor of existing connection {}",
1119 conn
, peer_global_seq
,
1120 existing_proto
->peer_global_seq
, *existing_conn
);
1124 if (existing_conn
->policy
.lossy
) {
1125 // existing connection can be thrown out in favor of this one
1126 logger().warn("{} server_connect:"
1127 " existing connection {} is a lossy channel. Close existing in favor of"
1128 " this connection", conn
, *existing_conn
);
1129 execute_establishing(existing_conn
, true);
1130 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1133 if (existing_proto
->server_cookie
!= 0) {
1134 if (existing_proto
->client_cookie
!= client_cookie
) {
1135 // Found previous session
1136 // peer has reset and we're going to reuse the existing connection
1137 // by replacing the socket
1138 logger().warn("{} server_connect:"
1139 " found new session (cs={})"
1140 " when existing {} is with stale session (cs={}, ss={}),"
1141 " peer must have reset",
1142 conn
, client_cookie
,
1143 *existing_conn
, existing_proto
->client_cookie
,
1144 existing_proto
->server_cookie
);
1145 return reuse_connection(existing_proto
, conn
.policy
.resetcheck
);
1147 // session establishment interrupted between client_ident and server_ident,
1149 logger().warn("{} server_connect: found client session with existing {}"
1150 " matched (cs={}, ss={}), continuing session establishment",
1151 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1152 return reuse_connection(existing_proto
);
1155 // Looks like a connection race: server and client are both connecting to
1156 // each other at the same time.
1157 if (existing_proto
->client_cookie
!= client_cookie
) {
1158 if (existing_conn
->peer_wins()) {
1159 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1160 " and win, reusing existing {}",
1161 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1162 return reuse_connection(existing_proto
);
1164 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1165 " and lose to existing {}, ask client to wait",
1166 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1167 return existing_conn
->keepalive().then([this] {
1172 logger().warn("{} server_connect: found client session with existing {}"
1173 " matched (cs={}, ss={}), continuing session establishment",
1174 conn
, *existing_conn
, client_cookie
, existing_proto
->server_cookie
);
1175 return reuse_connection(existing_proto
);
1180 seastar::future
<ProtocolV2::next_step_t
>
1181 ProtocolV2::server_connect()
1183 return read_frame_payload().then([this] {
1184 // handle_client_ident() logic
1185 auto client_ident
= ClientIdentFrame::Decode(rx_segments_data
.back());
1186 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1187 " gid={}, gs={}, features_supported={},"
1188 " features_required={}, flags={}, cookie={}",
1189 conn
, client_ident
.addrs(), client_ident
.target_addr(),
1190 client_ident
.gid(), client_ident
.global_seq(),
1191 client_ident
.supported_features(),
1192 client_ident
.required_features(),
1193 client_ident
.flags(), client_ident
.cookie());
1195 if (client_ident
.addrs().empty() ||
1196 client_ident
.addrs().front() == entity_addr_t()) {
1197 logger().warn("{} oops, client_ident.addrs() is empty", conn
);
1198 throw std::system_error(
1199 make_error_code(crimson::net::error::bad_peer_address
));
1201 if (!messenger
.get_myaddrs().contains(client_ident
.target_addr())) {
1202 logger().warn("{} peer is trying to reach {} which is not us ({})",
1203 conn
, client_ident
.target_addr(), messenger
.get_myaddrs());
1204 throw std::system_error(
1205 make_error_code(crimson::net::error::bad_peer_address
));
1207 // TODO: change peer_addr to entity_addrvec_t
1208 entity_addr_t paddr
= client_ident
.addrs().front();
1209 if ((paddr
.is_msgr2() || paddr
.is_any()) &&
1210 paddr
.is_same_host(conn
.target_addr
)) {
1213 logger().warn("{} peer's address {} is not v2 or not the same host with {}",
1214 conn
, paddr
, conn
.target_addr
);
1215 throw std::system_error(
1216 make_error_code(crimson::net::error::bad_peer_address
));
1218 conn
.peer_addr
= paddr
;
1219 logger().debug("{} UPDATE: peer_addr={}", conn
, conn
.peer_addr
);
1220 conn
.target_addr
= conn
.peer_addr
;
1221 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
1222 logger().warn("{} we don't know how to reconnect to peer {}",
1223 conn
, conn
.target_addr
);
1224 throw std::system_error(
1225 make_error_code(crimson::net::error::bad_peer_address
));
1228 if (conn
.get_peer_id() != entity_name_t::NEW
&&
1229 conn
.get_peer_id() != client_ident
.gid()) {
1230 logger().error("{} client_ident peer_id ({}) does not match"
1231 " what it should be ({}) during accepting, abort",
1232 conn
, client_ident
.gid(), conn
.get_peer_id());
1235 conn
.set_peer_id(client_ident
.gid());
1236 client_cookie
= client_ident
.cookie();
1238 uint64_t feat_missing
=
1239 (conn
.policy
.features_required
| msgr2_required
) &
1240 ~(uint64_t)client_ident
.supported_features();
1242 auto ident_missing_features
= IdentMissingFeaturesFrame::Encode(feat_missing
);
1243 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1244 conn
, feat_missing
);
1245 return write_frame(ident_missing_features
).then([] {
1246 return next_step_t::wait
;
1249 connection_features
=
1250 client_ident
.supported_features() & conn
.policy
.features_supported
;
1251 logger().debug("{} UPDATE: connection_features={}", conn
, connection_features
);
1253 peer_global_seq
= client_ident
.global_seq();
1255 // Looks good so far, let's check if there is already an existing connection
1258 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1260 if (existing_conn
) {
1261 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1262 logger().warn("{} existing connection {} proto version is {}, close existing",
1263 conn
, *existing_conn
,
1264 static_cast<int>(existing_conn
->protocol
->proto_type
));
1265 // should unregister the existing from msgr atomically
1266 // NOTE: this is following async messenger logic, but we may miss the reset event.
1267 execute_establishing(existing_conn
, false);
1268 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1270 return handle_existing_connection(existing_conn
);
1273 execute_establishing(nullptr, true);
1274 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1279 seastar::future
<ProtocolV2::next_step_t
>
1280 ProtocolV2::read_reconnect()
1282 return read_main_preamble()
1283 .then([this] (Tag tag
) {
1284 expect_tag(Tag::SESSION_RECONNECT
, tag
, conn
, "read_reconnect");
1285 return server_reconnect();
1289 seastar::future
<ProtocolV2::next_step_t
>
1290 ProtocolV2::send_retry(uint64_t connect_seq
)
1292 auto retry
= RetryFrame::Encode(connect_seq
);
1293 logger().warn("{} WRITE RetryFrame: cs={}", conn
, connect_seq
);
1294 return write_frame(retry
).then([this] {
1295 return read_reconnect();
1299 seastar::future
<ProtocolV2::next_step_t
>
1300 ProtocolV2::send_retry_global(uint64_t global_seq
)
1302 auto retry
= RetryGlobalFrame::Encode(global_seq
);
1303 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn
, global_seq
);
1304 return write_frame(retry
).then([this] {
1305 return read_reconnect();
1309 seastar::future
<ProtocolV2::next_step_t
>
1310 ProtocolV2::send_reset(bool full
)
1312 auto reset
= ResetFrame::Encode(full
);
1313 logger().warn("{} WRITE ResetFrame: full={}", conn
, full
);
1314 return write_frame(reset
).then([this] {
1315 return read_main_preamble();
1316 }).then([this] (Tag tag
) {
1317 expect_tag(Tag::CLIENT_IDENT
, tag
, conn
, "post_send_reset");
1318 return server_connect();
1322 seastar::future
<ProtocolV2::next_step_t
>
1323 ProtocolV2::server_reconnect()
1325 return read_frame_payload().then([this] {
1326 // handle_reconnect() logic
1327 auto reconnect
= ReconnectFrame::Decode(rx_segments_data
.back());
1329 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1330 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1331 conn
, reconnect
.addrs(),
1332 reconnect
.client_cookie(), reconnect
.server_cookie(),
1333 reconnect
.global_seq(), reconnect
.connect_seq(),
1334 reconnect
.msg_seq());
1336 // can peer_addrs be changed on-the-fly?
1337 // TODO: change peer_addr to entity_addrvec_t
1338 entity_addr_t paddr
= reconnect
.addrs().front();
1339 if (paddr
.is_msgr2() || paddr
.is_any()) {
1342 logger().warn("{} peer's address {} is not v2", conn
, paddr
);
1343 throw std::system_error(
1344 make_error_code(crimson::net::error::bad_peer_address
));
1346 if (conn
.peer_addr
== entity_addr_t()) {
1347 conn
.peer_addr
= paddr
;
1348 } else if (conn
.peer_addr
!= paddr
) {
1349 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1350 " reconnect failed",
1351 conn
, paddr
, conn
.peer_addr
);
1352 throw std::system_error(
1353 make_error_code(crimson::net::error::bad_peer_address
));
1355 peer_global_seq
= reconnect
.global_seq();
1357 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1359 if (!existing_conn
) {
1360 // there is no existing connection therefore cannot reconnect to previous
1362 logger().warn("{} server_reconnect: no existing connection from address {},"
1363 " reseting client", conn
, conn
.peer_addr
);
1364 return send_reset(true);
1367 if (existing_conn
->protocol
->proto_type
!= proto_t::v2
) {
1368 logger().warn("{} server_reconnect: existing connection {} proto version is {},"
1369 "close existing and reset client.",
1370 conn
, *existing_conn
,
1371 static_cast<int>(existing_conn
->protocol
->proto_type
));
1372 // NOTE: this is following async messenger logic, but we may miss the reset event.
1373 existing_conn
->mark_down();
1374 return send_reset(true);
1377 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1378 existing_conn
->protocol
.get());
1379 ceph_assert(existing_proto
);
1380 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1381 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1382 conn
, global_seq
, peer_global_seq
, reconnect
.connect_seq(),
1383 reconnect
.client_cookie(), reconnect
.server_cookie(),
1385 get_state_name(existing_proto
->state
),
1386 existing_proto
->global_seq
,
1387 existing_proto
->peer_global_seq
,
1388 existing_proto
->connect_seq
,
1389 existing_proto
->client_cookie
,
1390 existing_proto
->server_cookie
);
1392 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1393 logger().error("{} server_reconnect: my peer_name doesn't match"
1394 " the existing connection {}, abort", conn
, existing_conn
);
1398 if (existing_proto
->state
== state_t::REPLACING
) {
1399 logger().warn("{} server_reconnect: racing replace happened while "
1400 " replacing existing connection {}, retry global.",
1401 conn
, *existing_conn
);
1402 return send_retry_global(existing_proto
->peer_global_seq
);
1405 if (existing_proto
->client_cookie
!= reconnect
.client_cookie()) {
1406 logger().warn("{} server_reconnect:"
1407 " client_cookie mismatch with existing connection {},"
1408 " cc={} rcc={}. I must have reset, reseting client.",
1409 conn
, *existing_conn
,
1410 existing_proto
->client_cookie
, reconnect
.client_cookie());
1411 return send_reset(conn
.policy
.resetcheck
);
1412 } else if (existing_proto
->server_cookie
== 0) {
1413 // this happens when:
1414 // - a connects to b
1415 // - a sends client_ident
1416 // - b gets client_ident, sends server_ident and sets cookie X
1417 // - connection fault
1418 // - b reconnects to a with cookie X, connect_seq=1
1419 // - a has cookie==0
1420 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1421 " server_ident with existing connection {}."
1422 " Asking peer to resume session establishment",
1423 conn
, existing_proto
->client_cookie
, *existing_conn
);
1424 return send_reset(false);
1427 if (existing_proto
->peer_global_seq
> reconnect
.global_seq()) {
1428 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1429 " with existing connection {},"
1430 " ask client to retry global",
1431 conn
, existing_proto
->peer_global_seq
,
1432 reconnect
.global_seq(), *existing_conn
);
1433 return send_retry_global(existing_proto
->peer_global_seq
);
1436 if (existing_proto
->connect_seq
> reconnect
.connect_seq()) {
1437 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1438 " with existing connection {}, ask client to retry",
1439 conn
, reconnect
.connect_seq(),
1440 existing_proto
->connect_seq
, *existing_conn
);
1441 return send_retry(existing_proto
->connect_seq
);
1442 } else if (existing_proto
->connect_seq
== reconnect
.connect_seq()) {
1443 // reconnect race: both peers are sending reconnect messages
1444 if (existing_conn
->peer_wins()) {
1445 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1446 " and win, reusing existing {}",
1447 conn
, reconnect
.connect_seq(), *existing_conn
);
1448 return reuse_connection(
1449 existing_proto
, false,
1450 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1452 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1453 " and lose to existing {}, ask client to wait",
1454 conn
, reconnect
.connect_seq(), *existing_conn
);
1457 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1458 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1459 " reusing existing {}",
1460 conn
, existing_proto
->connect_seq
,
1461 reconnect
.connect_seq(), *existing_conn
);
1462 return reuse_connection(
1463 existing_proto
, false,
1464 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1469 void ProtocolV2::execute_accepting()
1471 trigger_state(state_t::ACCEPTING
, write_state_t::none
, false);
1472 gate
.dispatch_in_background("execute_accepting", *this, [this] {
1473 return seastar::futurize_invoke([this] {
1474 INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED
);
1475 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
1476 session_stream_handlers
= { nullptr, nullptr };
1478 return banner_exchange(false);
1479 }).then([this] (auto&& ret
) {
1480 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
1481 ceph_assert(conn
.get_peer_type() == 0);
1482 conn
.set_peer_type(_peer_type
);
1484 conn
.policy
= messenger
.get_policy(_peer_type
);
1485 logger().info("{} UPDATE: peer_type={},"
1486 " policy(lossy={} server={} standby={} resetcheck={})",
1487 conn
, ceph_entity_type_name(_peer_type
),
1488 conn
.policy
.lossy
, conn
.policy
.server
,
1489 conn
.policy
.standby
, conn
.policy
.resetcheck
);
1490 if (messenger
.get_myaddr().get_port() != _my_addr_from_peer
.get_port() ||
1491 messenger
.get_myaddr().get_nonce() != _my_addr_from_peer
.get_nonce()) {
1492 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1493 conn
, _my_addr_from_peer
, messenger
.get_myaddr());
1494 throw std::system_error(
1495 make_error_code(crimson::net::error::bad_peer_address
));
1497 return messenger
.learned_addr(_my_addr_from_peer
, conn
);
1499 return server_auth();
1501 return read_main_preamble();
1502 }).then([this] (Tag tag
) {
1504 case Tag::CLIENT_IDENT
:
1505 return server_connect();
1506 case Tag::SESSION_RECONNECT
:
1507 return server_reconnect();
1509 unexpected_tag(tag
, conn
, "post_server_auth");
1510 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1513 }).then([this] (next_step_t next
) {
1515 case next_step_t::ready
:
1516 assert(state
!= state_t::ACCEPTING
);
1518 case next_step_t::wait
:
1519 if (unlikely(state
!= state_t::ACCEPTING
)) {
1520 logger().debug("{} triggered {} at the end of execute_accepting()",
1521 conn
, get_state_name(state
));
1524 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn
);
1525 execute_server_wait();
1528 ceph_abort("impossible next step");
1530 }).handle_exception([this] (std::exception_ptr eptr
) {
1531 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1532 conn
, get_state_name(state
), eptr
);
1538 // CONNECTING or ACCEPTING state
1540 seastar::future
<> ProtocolV2::finish_auth()
1542 ceph_assert(auth_meta
);
1544 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1545 auth_meta
->session_key
.hmac_sha256(nullptr, rxbuf
);
1546 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1547 ceph_assert(record_io
);
1550 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn
, sig
);
1551 return write_frame(sig_frame
).then([this] {
1552 return read_main_preamble();
1553 }).then([this] (Tag tag
) {
1554 expect_tag(Tag::AUTH_SIGNATURE
, tag
, conn
, "post_finish_auth");
1555 return read_frame_payload();
1557 // handle_auth_signature() logic
1558 auto sig_frame
= AuthSignatureFrame::Decode(rx_segments_data
.back());
1559 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn
, sig_frame
.signature());
1561 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
1562 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(nullptr, txbuf
);
1563 if (sig_frame
.signature() != actual_tx_sig
) {
1564 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1565 " sig_frame.signature()={}",
1566 conn
, actual_tx_sig
, sig_frame
.signature());
1575 void ProtocolV2::execute_establishing(
1576 SocketConnectionRef existing_conn
, bool dispatch_reset
) {
1577 if (unlikely(state
!= state_t::ACCEPTING
)) {
1578 logger().debug("{} triggered {} before execute_establishing()",
1579 conn
, get_state_name(state
));
1583 auto accept_me
= [this] {
1584 messenger
.register_conn(
1585 seastar::static_pointer_cast
<SocketConnection
>(
1586 conn
.shared_from_this()));
1587 messenger
.unaccept_conn(
1588 seastar::static_pointer_cast
<SocketConnection
>(
1589 conn
.shared_from_this()));
1592 trigger_state(state_t::ESTABLISHING
, write_state_t::delay
, false);
1593 if (existing_conn
) {
1594 existing_conn
->protocol
->close(dispatch_reset
, std::move(accept_me
));
1595 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1596 logger().warn("{} triggered {} during execute_establishing(), "
1597 "the accept event will not be delivered!",
1598 conn
, get_state_name(state
));
1605 dispatchers
.ms_handle_accept(
1606 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1608 gated_execute("execute_establishing", [this] {
1609 return seastar::futurize_invoke([this] {
1610 return send_server_ident();
1612 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1613 logger().debug("{} triggered {} at the end of execute_establishing()",
1614 conn
, get_state_name(state
));
1617 logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
1618 " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
1619 conn
, global_seq
, peer_global_seq
, connect_seq
,
1620 client_cookie
, server_cookie
, conn
.in_seq
,
1621 conn
.out_seq
, conn
.out_q
.size());
1622 execute_ready(false);
1623 }).handle_exception([this] (std::exception_ptr eptr
) {
1624 if (state
!= state_t::ESTABLISHING
) {
1625 logger().info("{} execute_establishing() protocol aborted at {} -- {}",
1626 conn
, get_state_name(state
), eptr
);
1627 assert(state
== state_t::CLOSING
||
1628 state
== state_t::REPLACING
);
1631 fault(false, "execute_establishing()", eptr
);
1636 // ESTABLISHING or REPLACING state
1639 ProtocolV2::send_server_ident()
1641 // send_server_ident() logic
1643 // refered to async-conn v2: not assign gs to global_seq
1644 return messenger
.get_global_seq().then([this] (auto gs
) {
1645 logger().debug("{} UPDATE: gs={} for server ident", conn
, global_seq
);
1647 // this is required for the case when this connection is being replaced
1651 if (!conn
.policy
.lossy
) {
1652 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1656 if (conn
.policy
.lossy
) {
1657 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
1660 auto server_ident
= ServerIdentFrame::Encode(
1661 messenger
.get_myaddrs(),
1662 messenger
.get_myname().num(),
1664 conn
.policy
.features_supported
,
1665 conn
.policy
.features_required
| msgr2_required
,
1669 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1670 " gs={}, features_supported={}, features_required={},"
1671 " flags={}, cookie={}",
1672 conn
, messenger
.get_myaddrs(), messenger
.get_myname().num(),
1673 gs
, conn
.policy
.features_supported
,
1674 conn
.policy
.features_required
| msgr2_required
,
1675 flags
, server_cookie
);
1677 conn
.set_features(connection_features
);
1679 return write_frame(server_ident
);
1685 void ProtocolV2::trigger_replacing(bool reconnect
,
1687 SocketRef
&& new_socket
,
1688 AuthConnectionMetaRef
&& new_auth_meta
,
1689 ceph::crypto::onwire::rxtx_t new_rxtx
,
1690 uint64_t new_peer_global_seq
,
1691 uint64_t new_client_cookie
,
1692 entity_name_t new_peer_name
,
1693 uint64_t new_conn_features
,
1696 uint64_t new_connect_seq
,
1697 uint64_t new_msg_seq
)
1699 trigger_state(state_t::REPLACING
, write_state_t::delay
, false);
1703 dispatchers
.ms_handle_accept(
1704 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1705 gate
.dispatch_in_background("trigger_replacing", *this,
1709 new_socket
= std::move(new_socket
),
1710 new_auth_meta
= std::move(new_auth_meta
),
1711 new_rxtx
= std::move(new_rxtx
),
1712 tx_is_rev1
, rx_is_rev1
,
1713 new_client_cookie
, new_peer_name
,
1714 new_conn_features
, new_peer_global_seq
,
1715 new_connect_seq
, new_msg_seq
] () mutable {
1716 return wait_write_exit().then([this, do_reset
] {
1718 reset_session(true);
1720 protocol_timer
.cancel();
1721 return execution_done
.get_future();
1724 new_socket
= std::move(new_socket
),
1725 new_auth_meta
= std::move(new_auth_meta
),
1726 new_rxtx
= std::move(new_rxtx
),
1727 tx_is_rev1
, rx_is_rev1
,
1728 new_client_cookie
, new_peer_name
,
1729 new_conn_features
, new_peer_global_seq
,
1730 new_connect_seq
, new_msg_seq
] () mutable {
1731 if (unlikely(state
!= state_t::REPLACING
)) {
1732 return new_socket
->close().then([sock
= std::move(new_socket
)] {
1738 gate
.dispatch_in_background("close_socket_replacing", *this,
1739 [sock
= std::move(socket
)] () mutable {
1740 return sock
->close().then([sock
= std::move(sock
)] {});
1743 socket
= std::move(new_socket
);
1744 auth_meta
= std::move(new_auth_meta
);
1745 session_stream_handlers
= std::move(new_rxtx
);
1747 peer_global_seq
= new_peer_global_seq
;
1750 connect_seq
= new_connect_seq
;
1751 // send_reconnect_ok() logic
1752 requeue_up_to(new_msg_seq
);
1753 auto reconnect_ok
= ReconnectOkFrame::Encode(conn
.in_seq
);
1754 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn
, conn
.in_seq
);
1755 return write_frame(reconnect_ok
);
1757 client_cookie
= new_client_cookie
;
1758 assert(conn
.get_peer_type() == new_peer_name
.type());
1759 if (conn
.get_peer_id() == entity_name_t::NEW
) {
1760 conn
.set_peer_id(new_peer_name
.num());
1762 connection_features
= new_conn_features
;
1763 tx_frame_asm
.set_is_rev1(tx_is_rev1
);
1764 rx_frame_asm
.set_is_rev1(rx_is_rev1
);
1765 return send_server_ident();
1767 }).then([this, reconnect
] {
1768 if (unlikely(state
!= state_t::REPLACING
)) {
1769 logger().debug("{} triggered {} at the end of trigger_replacing()",
1770 conn
, get_state_name(state
));
1773 logger().info("{} replaced ({}):"
1774 " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
1775 " in_seq={}, out_seq={}, out_q={}",
1776 conn
, reconnect
? "reconnected" : "connected",
1777 global_seq
, peer_global_seq
, connect_seq
, client_cookie
,
1778 server_cookie
, conn
.in_seq
, conn
.out_seq
, conn
.out_q
.size());
1779 execute_ready(false);
1780 }).handle_exception([this] (std::exception_ptr eptr
) {
1781 if (state
!= state_t::REPLACING
) {
1782 logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
1783 conn
, get_state_name(state
), eptr
);
1784 assert(state
== state_t::CLOSING
);
1787 fault(true, "trigger_replacing()", eptr
);
1794 ceph::bufferlist
ProtocolV2::do_sweep_messages(
1795 const std::deque
<MessageRef
>& msgs
,
1797 bool require_keepalive
,
1798 std::optional
<utime_t
> _keepalive_ack
,
1801 ceph::bufferlist bl
;
1803 if (unlikely(require_keepalive
)) {
1804 auto keepalive_frame
= KeepAliveFrame::Encode();
1805 bl
.append(keepalive_frame
.get_buffer(tx_frame_asm
));
1806 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2
, bp_type_t::WRITE
);
1809 if (unlikely(_keepalive_ack
.has_value())) {
1810 auto keepalive_ack_frame
= KeepAliveFrameAck::Encode(*_keepalive_ack
);
1811 bl
.append(keepalive_ack_frame
.get_buffer(tx_frame_asm
));
1812 INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK
, bp_type_t::WRITE
);
1815 if (require_ack
&& !num_msgs
) {
1816 auto ack_frame
= AckFrame::Encode(conn
.in_seq
);
1817 bl
.append(ack_frame
.get_buffer(tx_frame_asm
));
1818 INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK
, bp_type_t::WRITE
);
1821 std::for_each(msgs
.begin(), msgs
.begin()+num_msgs
, [this, &bl
](const MessageRef
& msg
) {
1822 // TODO: move to common code
1824 msg
->get_header().src
= messenger
.get_myname();
1826 msg
->encode(conn
.features
, 0);
1828 ceph_assert(!msg
->get_seq() && "message already has seq");
1829 msg
->set_seq(++conn
.out_seq
);
1831 ceph_msg_header
&header
= msg
->get_header();
1832 ceph_msg_footer
&footer
= msg
->get_footer();
1834 ceph_msg_header2 header2
{header
.seq
, header
.tid
,
1835 header
.type
, header
.priority
,
1837 init_le32(0), header
.data_off
,
1838 init_le64(conn
.in_seq
),
1839 footer
.flags
, header
.compat_version
,
1842 auto message
= MessageFrame::Encode(header2
,
1843 msg
->get_payload(), msg
->get_middle(), msg
->get_data());
1844 logger().debug("{} --> #{} === {} ({})",
1845 conn
, msg
->get_seq(), *msg
, msg
->get_type());
1846 bl
.append(message
.get_buffer(tx_frame_asm
));
1847 INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE
, bp_type_t::WRITE
);
1853 seastar::future
<> ProtocolV2::read_message(utime_t throttle_stamp
)
1855 return read_frame_payload()
1856 .then([this, throttle_stamp
] {
1857 utime_t recv_stamp
{seastar::lowres_system_clock::now()};
1859 // we need to get the size before std::moving segments data
1860 const size_t cur_msg_size
= get_current_msg_size();
1861 auto msg_frame
= MessageFrame::Decode(rx_segments_data
);
1862 // XXX: paranoid copy just to avoid oops
1863 ceph_msg_header2 current_header
= msg_frame
.header();
1865 logger().trace("{} got {} + {} + {} byte message,"
1866 " envelope type={} src={} off={} seq={}",
1867 conn
, msg_frame
.front_len(), msg_frame
.middle_len(),
1868 msg_frame
.data_len(), current_header
.type
, conn
.get_peer_name(),
1869 current_header
.data_off
, current_header
.seq
);
1871 ceph_msg_header header
{current_header
.seq
,
1873 current_header
.type
,
1874 current_header
.priority
,
1875 current_header
.version
,
1876 init_le32(msg_frame
.front_len()),
1877 init_le32(msg_frame
.middle_len()),
1878 init_le32(msg_frame
.data_len()),
1879 current_header
.data_off
,
1880 conn
.get_peer_name(),
1881 current_header
.compat_version
,
1882 current_header
.reserved
,
1884 ceph_msg_footer footer
{init_le32(0), init_le32(0),
1885 init_le32(0), init_le64(0), current_header
.flags
};
1887 auto conn_ref
= seastar::static_pointer_cast
<SocketConnection
>(
1888 conn
.shared_from_this());
1889 Message
*message
= decode_message(nullptr, 0, header
, footer
,
1890 msg_frame
.front(), msg_frame
.middle(), msg_frame
.data(), conn_ref
);
1892 logger().warn("{} decode message failed", conn
);
1896 // store reservation size in message, so we don't get confused
1897 // by messages entering the dispatch queue through other paths.
1898 message
->set_dispatch_throttle_size(cur_msg_size
);
1900 message
->set_throttle_stamp(throttle_stamp
);
1901 message
->set_recv_stamp(recv_stamp
);
1902 message
->set_recv_complete_stamp(utime_t
{seastar::lowres_system_clock::now()});
1904 // check received seq#. if it is old, drop the message.
1905 // note that incoming messages may skip ahead. this is convenient for the
1906 // client side queueing because messages can't be renumbered, but the (kernel)
1907 // client will occasionally pull a message out of the sent queue to send
1908 // elsewhere. in that case it doesn't matter if we "got" it or not.
1909 uint64_t cur_seq
= conn
.in_seq
;
1910 if (message
->get_seq() <= cur_seq
) {
1911 logger().error("{} got old message {} <= {} {}, discarding",
1912 conn
, message
->get_seq(), cur_seq
, *message
);
1913 if (HAVE_FEATURE(conn
.features
, RECONNECT_SEQ
) &&
1914 local_conf()->ms_die_on_old_message
) {
1915 ceph_assert(0 == "old msgs despite reconnect_seq feature");
1917 return seastar::now();
1918 } else if (message
->get_seq() > cur_seq
+ 1) {
1919 logger().error("{} missed message? skipped from seq {} to {}",
1920 conn
, cur_seq
, message
->get_seq());
1921 if (local_conf()->ms_die_on_skipped_message
) {
1922 ceph_assert(0 == "skipped incoming seq");
1926 // note last received message.
1927 conn
.in_seq
= message
->get_seq();
1928 logger().debug("{} <== #{} === {} ({})",
1929 conn
, message
->get_seq(), *message
, message
->get_type());
1931 ack_writes(current_header
.ack_seq
);
1933 // TODO: change MessageRef with seastar::shared_ptr
1934 auto msg_ref
= MessageRef
{message
, false};
1935 // throttle the reading process by the returned future
1936 return dispatchers
.ms_dispatch(conn_ref
, std::move(msg_ref
));
// Enter the READY state and start the frame-reading loop.
//
// Visible steps:
//  - assert that the connection is lossy or that both cookies are non-zero;
//  - trigger_state(READY, write_state_t::open, false);
//  - when `dispatch_connect` is true, call dispatchers.ms_handle_connect()
//    with a SocketConnection reference to `conn`;
//  - in UNIT_TESTS_BUILT builds, notify conn.interceptor (if set) through
//    register_conn_ready();
//  - under gated_execute("execute_ready", ...), cancel protocol_timer and
//    repeatedly read a main preamble, handling the frame according to its Tag.
//
// @param dispatch_connect  whether ms_handle_connect() is dispatched.
1940 void ProtocolV2::execute_ready(bool dispatch_connect
)
// A non-lossy connection is expected to have both cookies set at this point.
1942 assert(conn
.policy
.lossy
|| (client_cookie
!= 0 && server_cookie
!= 0));
1943 trigger_state(state_t::READY
, write_state_t::open
, false);
1944 if (dispatch_connect
) {
1945 dispatchers
.ms_handle_connect(
1946 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
1948 #ifdef UNIT_TESTS_BUILT
1949 if (conn
.interceptor
) {
1950 conn
.interceptor
->register_conn_ready(conn
);
// The read loop: protocol_timer is cancelled once, then keep_doing() re-runs
// the preamble read and tag handling.
1953 gated_execute("execute_ready", [this] {
1954 protocol_timer
.cancel();
1955 return seastar::keep_doing([this] {
1956 return read_main_preamble()
1957 .then([this] (Tag tag
) {
// MESSAGE: run the throttling steps below, then read_message().
1959 case Tag::MESSAGE
: {
1960 return seastar::futurize_invoke([this] {
1961 // throttle_message() logic
1962 if (!conn
.policy
.throttler_messages
) {
1963 return seastar::now();
1965 // TODO: message throttler
1967 return seastar::now();
1969 // throttle_bytes() logic
1970 if (!conn
.policy
.throttler_bytes
) {
1971 return seastar::now();
1973 size_t cur_msg_size
= get_current_msg_size();
1974 if (!cur_msg_size
) {
1975 return seastar::now();
1977 logger().trace("{} wants {} bytes from policy throttler {}/{}",
1979 conn
.policy
.throttler_bytes
->get_current(),
1980 conn
.policy
.throttler_bytes
->get_max());
// Request cur_msg_size from the byte throttler; its result is what this step returns.
1981 return conn
.policy
.throttler_bytes
->get(cur_msg_size
);
1983 // TODO: throttle_dispatch_queue() logic
// throttle_stamp is taken from the lowres system clock and handed to read_message().
1984 utime_t throttle_stamp
{seastar::lowres_system_clock::now()};
1985 return read_message(throttle_stamp
);
// NOTE(review): the case label of this branch is not visible here; the payload
// is decoded as an AckFrame and its seq is passed to ack_writes().
1989 return read_frame_payload().then([this] {
1990 // handle_message_ack() logic
1991 auto ack
= AckFrame::Decode(rx_segments_data
.back());
1992 logger().debug("{} GOT AckFrame: seq={}", conn
, ack
.seq());
1993 ack_writes(ack
.seq());
// KEEPALIVE2: pass the frame timestamp to notify_keepalive_ack() and record
// the current lowres system clock time via conn.set_last_keepalive().
1995 case Tag::KEEPALIVE2
:
1996 return read_frame_payload().then([this] {
1997 // handle_keepalive2() logic
1998 auto keepalive_frame
= KeepAliveFrame::Decode(rx_segments_data
.back());
1999 logger().debug("{} GOT KeepAliveFrame: timestamp={}",
2000 conn
, keepalive_frame
.timestamp());
2001 notify_keepalive_ack(keepalive_frame
.timestamp());
2002 conn
.set_last_keepalive(seastar::lowres_system_clock::now());
// KEEPALIVE2_ACK: store the acknowledged timestamp via conn.set_last_keepalive_ack().
2004 case Tag::KEEPALIVE2_ACK
:
2005 return read_frame_payload().then([this] {
2006 // handle_keepalive2_ack() logic
2007 auto keepalive_ack_frame
= KeepAliveFrameAck::Decode(rx_segments_data
.back());
2008 conn
.set_last_keepalive_ack(
2009 seastar::lowres_system_clock::time_point
{keepalive_ack_frame
.timestamp()});
2010 logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
2011 conn
, conn
.last_keepalive_ack
);
// NOTE(review): presumably the fallback branch (its label is not visible here);
// the tag is reported through unexpected_tag().
2014 unexpected_tag(tag
, conn
, "execute_ready");
2015 return seastar::now();
// Exception handler for the read loop: if the state is no longer READY, the
// abort is logged and the state is asserted to be REPLACING or CLOSING.
2019 }).handle_exception([this] (std::exception_ptr eptr
) {
2020 if (state
!= state_t::READY
) {
2021 logger().info("{} execute_ready(): protocol aborted at {} -- {}",
2022 conn
, get_state_name(state
), eptr
);
2023 assert(state
== state_t::REPLACING
||
2024 state
== state_t::CLOSING
);
// NOTE(review): presumably reached only while still READY -- confirm the
// control flow between the assert above and this call.
2027 fault(false, "execute_ready()", eptr
);
// Enter the STANDBY state: calls trigger_state() with state_t::STANDBY and
// write_state_t::delay.
// NOTE(review): the meaning of the trailing `true` argument is defined by
// trigger_state() -- confirm there.
2034 void ProtocolV2::execute_standby()
2036 trigger_state(state_t::STANDBY
, write_state_t::delay
, true);
// If the protocol is in STANDBY and the connection policy is not `server`,
// log the transition and call execute_connecting(). The condition is wrapped
// in unlikely(), i.e. this is not the expected common path.
// NOTE(review): judging by the name this is invoked when a write is pending --
// confirm against the callers.
2042 void ProtocolV2::notify_write()
2044 if (unlikely(state
== state_t::STANDBY
&& !conn
.policy
.server
)) {
2045 logger().info("{} notify_write(): at {}, going to CONNECTING",
2046 conn
, get_state_name(state
));
2047 execute_connecting();
// Enter the WAIT state, back off for a computed duration, then call
// execute_connecting().
//
// The backoff starts from protocol_timer.last_dur() and is replaced by one of:
//  - the "ms_max_backoff" config value;
//  - min("ms_max_backoff", 2 * previous backoff) when the previous backoff is > 0;
//  - the "ms_initial_backoff" config value.
//
// @param max_backoff  captured by the gated lambda; NOTE(review): presumably
//                     selects the "ms_max_backoff" branch -- the condition
//                     itself is not visible here, confirm.
2053 void ProtocolV2::execute_wait(bool max_backoff
)
2055 trigger_state(state_t::WAIT
, write_state_t::delay
, true);
2059 gated_execute("execute_wait", [this, max_backoff
] {
2060 double backoff
= protocol_timer
.last_dur();
2062 backoff
= local_conf().get_val
<double>("ms_max_backoff");
2063 } else if (backoff
> 0) {
// Double the previous backoff, capped at ms_max_backoff.
2064 backoff
= std::min(local_conf().get_val
<double>("ms_max_backoff"), 2 * backoff
);
2066 backoff
= local_conf().get_val
<double>("ms_initial_backoff");
// After the backoff, re-check the state: anything other than WAIT is logged
// at debug level.
2068 return protocol_timer
.backoff(backoff
).then([this] {
2069 if (unlikely(state
!= state_t::WAIT
)) {
2070 logger().debug("{} triggered {} at the end of execute_wait()",
2071 conn
, get_state_name(state
));
2074 logger().info("{} execute_wait(): going to CONNECTING", conn
);
2075 execute_connecting();
// On exception: log the abort and assert the state is REPLACING or CLOSING.
2076 }).handle_exception([this] (std::exception_ptr eptr
) {
2077 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2078 conn
, get_state_name(state
), eptr
);
2079 assert(state
== state_t::REPLACING
||
2080 state
== state_t::CLOSING
);
2085 // SERVER_WAIT state
// Enter the SERVER_WAIT state via trigger_state(SERVER_WAIT, delay, false) and,
// under gated_execute("execute_server_wait", ...), wait on read_exactly(1).
// A successful read is logged as a warning ("got read, abort"); an exception
// is logged as a fault "going to CLOSING".
// NOTE(review): the statements that follow each log call are not visible
// here -- confirm how the abort and the close are actually performed.
2087 void ProtocolV2::execute_server_wait()
2089 trigger_state(state_t::SERVER_WAIT
, write_state_t::delay
, false);
2090 gated_execute("execute_server_wait", [this] {
2091 return read_exactly(1).then([this] (auto bl
) {
2092 logger().warn("{} SERVER_WAIT got read, abort", conn
);
2094 }).handle_exception([this] (std::exception_ptr eptr
) {
2095 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2096 conn
, get_state_name(state
), eptr
);
// Start closing the connection.
//
// Visible steps:
//  - tell the messenger via closing_conn();
//  - depending on the current state, call unaccept_conn() (ACCEPTING or
//    SERVER_WAIT) or unregister_conn() (ESTABLISHING <= state < CLOSING);
//  - cancel protocol_timer and trigger_state(CLOSING, write_state_t::drop, false).
// Each messenger call receives a SocketConnection reference obtained from
// conn.shared_from_this().
// NOTE(review): how states outside the two ranges are handled is not visible
// here -- confirm.
2104 void ProtocolV2::trigger_close()
2106 messenger
.closing_conn(
2107 seastar::static_pointer_cast
<SocketConnection
>(
2108 conn
.shared_from_this()));
2110 if (state
== state_t::ACCEPTING
|| state
== state_t::SERVER_WAIT
) {
2111 messenger
.unaccept_conn(
2112 seastar::static_pointer_cast
<SocketConnection
>(
2113 conn
.shared_from_this()));
// The range check relies on the declaration order of the state_t enumerators.
2114 } else if (state
>= state_t::ESTABLISHING
&& state
< state_t::CLOSING
) {
2115 messenger
.unregister_conn(
2116 seastar::static_pointer_cast
<SocketConnection
>(
2117 conn
.shared_from_this()));
2123 protocol_timer
.cancel();
2124 trigger_state(state_t::CLOSING
, write_state_t::drop
, false);
// Report the connection to the messenger through closed_conn(), passing a
// SocketConnection reference obtained from conn.shared_from_this().
// NOTE(review): presumably invoked once closing has completed (per the
// name) -- confirm against the callers.
2127 void ProtocolV2::on_closed()
2129 messenger
.closed_conn(
2130 seastar::static_pointer_cast
<SocketConnection
>(
2131 conn
.shared_from_this()));
// Const printing hook taking the destination stream `out`.
// NOTE(review): the body is not visible here; presumably it writes a
// description of this protocol/connection to `out` -- confirm.
2134 void ProtocolV2::print(std::ostream
& out
) const
2139 } // namespace crimson::net