1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV2.h"
6 #include <fmt/format.h>
7 #include <fmt/ranges.h>
8 #include "include/msgr.h"
9 #include "include/random.h"
10 #include "msg/msg_fmt.h"
12 #include "crimson/auth/AuthClient.h"
13 #include "crimson/auth/AuthServer.h"
14 #include "crimson/common/formatter.h"
15 #include "crimson/common/log.h"
18 #include "SocketMessenger.h"
20 using namespace ceph::msgr::v2
;
21 using crimson::common::local_conf
;
// Feature bits this implementation advertises in its msgr2 banner.
// banner_exchange() encodes this value into the banner payload and checks that
// it covers every feature the peer declares as required.
// Only REVISION_1 is listed; compression support is still a TODO.
// NOTE(review): the line that terminates this initializer (original line 29)
// is missing from this copy of the file.
25 // TODO: CEPH_MSGR2_FEATURE_COMPRESSION
26 const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES
=
27 (CEPH_MSGR2_FEATURE_REVISION_1
|
28 // CEPH_MSGR2_FEATURE_COMPRESSION |
31 // Log levels in V2 Protocol:
32 // * error level: an error that causes the connection to terminate:
35 // * warn level: something unusual that indicates a connection fault or replacement:
36 // - unstable network;
37 // - incompatible peer;
40 // - connection reset;
41 // * info level: events that are important for following the connection lifecycle
42 //   and that don't happen very frequently;
43 // * debug level, important logs for debugging, including:
44 // - all the messages sent/received (-->/<==);
45 // - all the frames exchanged (WRITE/GOT);
46 // - important fields updated (UPDATE);
47 // - connection state transitions (TRIGGER);
48 // * trace level, trivial logs showing:
49 // - the exact bytes being sent/received (SEND/RECV(bytes));
50 // - detailed information of sub-frames;
51 // - integrity checks;
// Returns the seastar logger for the messenger subsystem (ceph_subsys_ms).
// All logging in this file goes through this accessor.
53 seastar::logger
& logger() {
54 return crimson::get_logger(ceph_subsys_ms
);
// Aborts the current negotiation step by throwing a std::system_error that
// carries crimson::net::error::negotiation_failure; it never returns.
// In execute_connecting(), exceptions escaping the connect chain are routed to
// fault() by the final handle_exception() continuation.
57 [[noreturn
]] void abort_in_fault() {
58 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
// Aborts the current step by throwing a std::system_error that carries
// crimson::net::error::protocol_aborted; it never returns.
61 [[noreturn
]] void abort_protocol() {
62 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
// ABORT_IN_CLOSE(is_dispatch_reset): closes the connection via
// do_close(is_dispatch_reset). It is used on fatal handshake errors, e.g. the
// feature mismatches in banner_exchange() and the peer identity/type
// mismatches while connecting.
// NOTE(review): the rest of this macro (original lines 67-69) is missing from
// this copy, so what follows do_close() cannot be confirmed here.
65 #define ABORT_IN_CLOSE(is_dispatch_reset) { \
66 do_close(is_dispatch_reset); \
// Verifies that the tag of a received frame (`actual`) equals `expected`.
// On mismatch it logs a "received wrong tag" warning that includes both tags
// printed as integers. banner_exchange() uses it to require Tag::HELLO.
// NOTE(review): several lines of this helper (original lines 71, 73, 76 and
// 79-82), including whatever follows the warning, are missing from this copy.
70 inline void expect_tag(const Tag
& expected
,
72 crimson::net::SocketConnection
& conn
,
74 if (actual
!= expected
) {
75 logger().warn("{} {} received wrong tag: {}, expected {}",
77 static_cast<uint32_t>(actual
),
78 static_cast<uint32_t>(expected
));
// Logs a warning that a frame with an unexpected tag arrived. The warning
// names the connection, the call site (`where`) and the tag value as an
// integer. It is called from handle_auth_reply(), client_connect() and
// client_reconnect() after their tag-specific cases.
// NOTE(review): the remaining lines of this helper (original lines 85 and
// 88-90) are missing from this copy.
83 inline void unexpected_tag(const Tag
& unexpected
,
84 crimson::net::SocketConnection
& conn
,
86 logger().warn("{} {} received unexpected tag: {}",
87 conn
, where
, static_cast<uint32_t>(unexpected
));
// Returns a random 64-bit client cookie drawn with a lower bound of 1, so the
// cookie is never 0 (execute_connecting() asserts client_cookie != 0).
91 inline uint64_t generate_client_cookie() {
92 return ceph::util::generate_random_number
<uint64_t>(
93 1, std::numeric_limits
<uint64_t>::max());
96 } // namespace anonymous
98 namespace crimson::net
{
// Waits `seconds` before resolving. It:
// - logs the wait;
// - replaces `as` with a fresh seastar::abort_source;
// - converts the delay to a seastar::lowres_clock duration;
// - sleeps with seastar::sleep_abortable() on that abort source.
// If the sleep is aborted (seastar::sleep_aborted), the handler logs
// "wait aborted" at debug level.
// NOTE(review): some lines (original 101, 103-104 and 111 onward) are missing
// from this copy, including the end of the abort handler.
100 seastar::future
<> ProtocolV2::Timer::backoff(double seconds
)
102 logger().warn("{} waiting {} seconds ...", conn
, seconds
);
105 as
= seastar::abort_source();
106 auto dur
= std::chrono::duration_cast
<seastar::lowres_clock::duration
>(
107 std::chrono::duration
<double>(seconds
));
108 return seastar::sleep_abortable(dur
, *as
109 ).handle_exception_type([this] (const seastar::sleep_aborted
& e
) {
110 logger().debug("{} wait aborted", conn
);
// Binds the protocol to its SocketConnection and IOHandler. It:
// - takes the messenger from the connection;
// - keeps a reference to the IOHandler;
// - creates a FrameAssemblerV2 for the connection;
// - starts with a fresh AuthConnectionMeta;
// - initializes io_states from io_handler.get_states().
// NOTE(review): some lines (original 117 and 122-123) are missing from this
// copy.
115 ProtocolV2::ProtocolV2(SocketConnection
& conn
,
116 IOHandler
&io_handler
)
118 messenger
{conn
.messenger
},
119 io_handler
{io_handler
},
120 frame_assembler
{FrameAssemblerV2::create(conn
)},
121 auth_meta
{seastar::make_lw_shared
<AuthConnectionMeta
>()},
124 io_states
= io_handler
.get_states();
// Destructor; performs no explicit teardown.
127 ProtocolV2::~ProtocolV2() {}
// Client-side entry point: starts connecting to `_peer_addr` as `_peer_name`.
// Preconditions: it must run on the messenger shard, with the protocol still
// in state NONE and the gate open. It then:
// - records the peer address (also as target_addr) and the peer name;
// - applies the messenger's policy for the peer's entity type;
// - generates a new client cookie;
// - registers the connection with the messenger;
// - starts execute_connecting().
129 void ProtocolV2::start_connect(const entity_addr_t
& _peer_addr
,
130 const entity_name_t
& _peer_name
)
132 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
133 ceph_assert(state
== state_t::NONE
);
134 ceph_assert(!gate
.is_closed());
135 conn
.peer_addr
= _peer_addr
;
136 conn
.target_addr
= _peer_addr
;
137 conn
.set_peer_name(_peer_name
);
138 conn
.policy
= messenger
.get_policy(_peer_name
.type());
139 client_cookie
= generate_client_cookie();
140 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
141 " policy(lossy={}, server={}, standby={}, resetcheck={})",
142 conn
, _peer_addr
, _peer_name
, client_cookie
,
143 conn
.policy
.lossy
, conn
.policy
.server
,
144 conn
.policy
.standby
, conn
.policy
.resetcheck
);
145 messenger
.register_conn(
146 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
147 execute_connecting();
// Server-side entry point for an accepted socket. Preconditions: it must run
// on the messenger shard, with the protocol in state NONE. It then:
// - uses `_peer_addr` as target_addr for now;
// - hands the socket to the frame assembler and marks it valid;
// - registers the accepted connection with the messenger;
// - in the background (through the gate), passes the socket's shard id and a
//   foreign reference to this connection to IOHandler::set_accepted_sid().
// NOTE(review): some lines (original 158, 167 and 170-174) are missing from
// this copy, including the end of the background task.
150 void ProtocolV2::start_accept(SocketFRef
&& new_socket
,
151 const entity_addr_t
& _peer_addr
)
153 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
154 ceph_assert(state
== state_t::NONE
);
155 // until we know better
156 conn
.target_addr
= _peer_addr
;
157 frame_assembler
->set_socket(std::move(new_socket
));
159 is_socket_valid
= true;
160 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn
, _peer_addr
);
161 messenger
.accept_conn(
162 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
164 auto cc_seq
= crosscore
.prepare_submit();
165 gate
.dispatch_in_background("set_accepted_sid", conn
, [this, cc_seq
] {
166 return io_handler
.set_accepted_sid(
168 frame_assembler
->get_socket_shard_id(),
169 seastar::make_foreign(conn
.shared_from_this()));
// First half of a state transition to `new_state`; it requires an open gate.
// - Logs an error when asked to re-enter the current state or to leave
//   CLOSING, then logs the TRIGGER.
// - When the current state is READY, it requires that no I/O exit is pending
//   (need_exit_io false, pr_exit_io empty) and arms a fresh pr_exit_io.
// - Sets need_notify_out to true only when entering STANDBY on the client
//   side (policy.server false); the other visible path sets it to false.
// NOTE(review): several lines (e.g. original 181-182, 186-187, 190, 192, 195,
// 197-198, 201 and 203-207) are missing from this copy, so the handling after
// the error logs cannot be confirmed here.
175 void ProtocolV2::trigger_state_phase1(state_t new_state
)
177 ceph_assert_always(!gate
.is_closed());
178 if (new_state
== state
) {
179 logger().error("{} is not allowed to re-trigger state {}",
180 conn
, get_state_name(state
));
183 if (state
== state_t::CLOSING
) {
184 logger().error("{} CLOSING is not allowed to trigger state {}",
185 conn
, get_state_name(new_state
));
188 logger().debug("{} TRIGGER {}, was {}",
189 conn
, get_state_name(new_state
), get_state_name(state
));
191 if (state
== state_t::READY
) {
193 ceph_assert_always(!need_exit_io
);
194 ceph_assert_always(!pr_exit_io
.has_value());
196 pr_exit_io
= seastar::shared_promise
<>();
199 if (new_state
== state_t::STANDBY
&& !conn
.policy
.server
) {
200 need_notify_out
= true;
202 need_notify_out
= false;
// Second half of a state transition. Preconditions: `new_state` must already
// be the current state, the gate must be open and no IOHandler shard switch
// may be in flight.
// In the background, it submits IOHandler::set_io_state() to the IOHandler's
// shard with the new io state and need_notify_out (as set_notify_out):
// - when entering READY (io state must be open), the frame assembler is
//   re-targeted to the IOHandler's shard and moved along with the call;
// - otherwise the io state must not be open.
208 void ProtocolV2::trigger_state_phase2(
209 state_t new_state
, io_state_t new_io_state
)
211 ceph_assert_always(new_state
== state
);
212 ceph_assert_always(!gate
.is_closed());
213 ceph_assert_always(!pr_switch_io_shard
.has_value());
215 FrameAssemblerV2Ref fa
;
216 if (new_state
== state_t::READY
) {
217 assert(new_io_state
== io_state_t::open
);
218 assert(io_handler
.get_shard_id() ==
219 frame_assembler
->get_socket_shard_id());
220 frame_assembler
->set_shard_id(io_handler
.get_shard_id());
221 fa
= std::move(frame_assembler
);
223 assert(new_io_state
!= io_state_t::open
);
226 auto cc_seq
= crosscore
.prepare_submit();
227 logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
228 "fa={}, set_notify_out={}",
229 conn
, cc_seq
, get_state_name(new_state
), new_io_state
,
230 fa
? fmt::format("(sid={})", fa
->get_shard_id()) : "N/A",
232 gate
.dispatch_in_background(
233 "set_io_state", conn
,
234 [this, cc_seq
, new_io_state
, fa
=std::move(fa
)]() mutable {
235 return seastar::smp::submit_to(
236 io_handler
.get_shard_id(),
237 [this, cc_seq
, new_io_state
,
238 fa
=std::move(fa
), set_notify_out
=need_notify_out
]() mutable {
239 return io_handler
.set_io_state(
240 cc_seq
, new_io_state
, std::move(fa
), set_notify_out
);
// NOTE(review): the condition guarding the section below (original lines
// 241-245) is missing from this copy. The visible code:
// - requires pr_exit_io to be armed and clears need_exit_io;
// - waits in the background for IOHandler::wait_io_exit_dispatching() on the
//   IOHandler's shard;
// - then takes back the frame assembler (whose socket must no longer be
//   valid) and the io states;
// - finally fulfills and resets pr_exit_io.
246 auto cc_seq
= crosscore
.prepare_submit();
247 logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
249 assert(pr_exit_io
.has_value());
250 assert(new_io_state
!= io_state_t::open
);
251 need_exit_io
= false;
252 gate
.dispatch_in_background("exit_io", conn
, [this, cc_seq
] {
253 return seastar::smp::submit_to(
254 io_handler
.get_shard_id(), [this, cc_seq
] {
255 return io_handler
.wait_io_exit_dispatching(cc_seq
);
256 }).then([this, cc_seq
](auto ret
) {
257 logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
258 conn
, cc_seq
, ret
.io_states
);
259 frame_assembler
= std::move(ret
.frame_assembler
);
260 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
262 seastar::this_shard_id() == frame_assembler
->get_shard_id());
263 ceph_assert_always(!frame_assembler
->is_socket_valid());
264 assert(!need_exit_io
);
265 io_states
= ret
.io_states
;
266 pr_exit_io
->set_value();
267 pr_exit_io
= std::nullopt
;
// Handles a failure raised while the protocol was running in `expected_state`
// (asserted to be CONNECTING, ESTABLISHING, REPLACING or READY). The stored
// exception is rethrown to obtain the std::exception, whose message is logged
// as e_what.
// - If the state has already moved away from expected_state, it logs and
//   asserts that the move was one of the allowed transitions.
// - A lossy connection that is not CONNECTING goes to CLOSING.
// - Otherwise a still-valid socket is shut down, and the log messages show
//   the next state:
//   - STANDBY for a server, or for a standby policy with nothing queued or
//     sent;
//   - WAIT from CONNECTING or REPLACING;
//   - CONNECTING again (via execute_connecting()) from READY or ESTABLISHING.
// NOTE(review): many lines of this function (e.g. original 275, 277,
// 282-283, 286-288 and 317-320) are missing from this copy.
273 void ProtocolV2::fault(
274 state_t expected_state
,
276 std::exception_ptr eptr
)
278 assert(expected_state
== state_t::CONNECTING
||
279 expected_state
== state_t::ESTABLISHING
||
280 expected_state
== state_t::REPLACING
||
281 expected_state
== state_t::READY
);
284 std::rethrow_exception(eptr
);
285 } catch (std::exception
&e
) {
289 if (state
!= expected_state
) {
290 logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
292 get_state_name(expected_state
),
294 get_state_name(state
),
297 if (expected_state
== state_t::REPLACING
) {
298 assert(state
== state_t::CLOSING
);
299 } else if (expected_state
== state_t::READY
) {
300 assert(state
== state_t::CLOSING
||
301 state
== state_t::REPLACING
||
302 state
== state_t::CONNECTING
||
303 state
== state_t::STANDBY
);
305 assert(state
== state_t::CLOSING
||
306 state
== state_t::REPLACING
);
311 assert(state
== expected_state
);
313 if (state
!= state_t::CONNECTING
&& conn
.policy
.lossy
) {
314 // socket will be shutdown in do_close()
315 logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
316 conn
, get_state_name(state
), where
, e_what
);
321 if (likely(has_socket
)) {
322 if (likely(is_socket_valid
)) {
323 ceph_assert_always(state
!= state_t::READY
);
324 frame_assembler
->shutdown_socket
<true>(&gate
);
325 is_socket_valid
= false;
327 ceph_assert_always(state
!= state_t::ESTABLISHING
);
329 } else { // !has_socket
330 ceph_assert_always(state
== state_t::CONNECTING
);
331 assert(!is_socket_valid
);
334 if (conn
.policy
.server
||
335 (conn
.policy
.standby
&& !io_states
.is_out_queued_or_sent())) {
336 if (conn
.policy
.server
) {
337 logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
339 get_state_name(state
),
344 logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
346 get_state_name(state
),
352 } else if (state
== state_t::CONNECTING
||
353 state
== state_t::REPLACING
) {
354 logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
356 get_state_name(state
),
362 assert(state
== state_t::READY
||
363 state
== state_t::ESTABLISHING
);
364 logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
366 get_state_name(state
),
370 execute_connecting();
// Resets the session state shared with the IOHandler. It:
// - generates a new client_cookie;
// - applies reset_session(full) to the local io_states copy;
// - in the background, forwards IOHandler::reset_session(cc_seq, full) to the
//   IOHandler's shard.
// NOTE(review): original lines 375-378 and 380-382 are missing from this copy,
// so it is not visible whether regenerating the cookie depends on `full`.
374 void ProtocolV2::reset_session(bool full
)
379 client_cookie
= generate_client_cookie();
383 auto cc_seq
= crosscore
.prepare_submit();
384 logger().debug("{} send {} IOHandler::reset_session({})",
386 io_states
.reset_session(full
);
387 gate
.dispatch_in_background(
388 "reset_session", conn
, [this, cc_seq
, full
] {
389 return seastar::smp::submit_to(
390 io_handler
.get_shard_id(), [this, cc_seq
, full
] {
391 return io_handler
.reset_session(cc_seq
, full
);
394 // user can make changes
// Performs the msgr2 banner and Hello exchange on the current socket.
// `is_connect` is passed to ABORT_IN_CLOSE; execute_connecting() calls this
// with true. The steps follow the numbered comments below:
//  1.   send CEPH_BANNER_V2_PREFIX, a little-endian payload length and a
//       payload holding our supported and required feature bits;
//  2-3. read the peer banner, warn on a v1 or malformed banner, decode the
//       payload length and read the payload;
//  4.   check feature compatibility in both directions (ABORT_IN_CLOSE on
//       mismatch), record the peer's supported features, enable rev1 framing
//       if the peer supports REVISION_1, and send a HelloFrame;
//  5-6. read and decode the peer's HelloFrame.
// Resolves to (peer entity type, address the peer reports for us).
397 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
398 ProtocolV2::banner_exchange(bool is_connect
)
400 // 1. prepare and send banner
401 bufferlist banner_payload
;
402 encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
403 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
406 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
407 auto len_payload
= static_cast<uint16_t>(banner_payload
.length());
408 encode(len_payload
, bl
, 0);
409 bl
.claim_append(banner_payload
);
410 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
411 "required={}, banner=\"{}\"",
412 conn
, bl
.length(), len_payload
,
413 CRIMSON_MSGR2_SUPPORTED_FEATURES
,
414 CEPH_MSGR2_REQUIRED_FEATURES
,
415 CEPH_BANNER_V2_PREFIX
);
416 #ifdef UNIT_TESTS_BUILT
417 return frame_assembler
->intercept_frame(custom_bp_t::BANNER_WRITE
, true
418 ).then([this, bl
=std::move(bl
)]() mutable {
419 return frame_assembler
->write_flush(std::move(bl
));
422 return frame_assembler
->write_flush(std::move(bl
)
425 // 2. read peer banner
426 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
427 #ifdef UNIT_TESTS_BUILT
428 return frame_assembler
->intercept_frame(custom_bp_t::BANNER_READ
, false
429 ).then([this, banner_len
] {
430 return frame_assembler
->read_exactly(banner_len
);
433 return frame_assembler
->read_exactly(banner_len
);
435 }).then([this](auto bptr
) {
436 // 3. process peer banner and read banner_payload
437 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
438 logger().debug("{} RECV({}) banner: \"{}\"",
440 std::string(bptr
.c_str(), banner_prefix_len
));
442 if (memcmp(bptr
.c_str(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
) != 0) {
443 if (memcmp(bptr
.c_str(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
444 logger().warn("{} peer is using V1 protocol", conn
);
446 logger().warn("{} peer sent bad banner", conn
);
// Skip the prefix; what remains is the encoded payload length.
451 bptr
.set_offset(bptr
.offset() + banner_prefix_len
);
452 bptr
.set_length(bptr
.length() - banner_prefix_len
);
453 assert(bptr
.length() == sizeof(ceph_le16
));
455 uint16_t payload_len
;
457 buf
.append(std::move(bptr
));
458 auto ti
= buf
.cbegin();
460 decode(payload_len
, ti
);
461 } catch (const buffer::error
&e
) {
462 logger().warn("{} decode banner payload len failed", conn
);
465 logger().debug("{} GOT banner: payload_len={}", conn
, payload_len
);
466 #ifdef UNIT_TESTS_BUILT
467 return frame_assembler
->intercept_frame(
468 custom_bp_t::BANNER_PAYLOAD_READ
, false
469 ).then([this, payload_len
] {
470 return frame_assembler
->read(payload_len
);
473 return frame_assembler
->read(payload_len
);
475 }).then([this, is_connect
] (bufferlist bl
) {
476 // 4. process peer banner_payload and send HelloFrame
477 auto p
= bl
.cbegin();
478 uint64_t _peer_supported_features
;
479 uint64_t _peer_required_features
;
481 decode(_peer_supported_features
, p
);
482 decode(_peer_required_features
, p
);
483 } catch (const buffer::error
&e
) {
484 logger().warn("{} decode banner payload failed", conn
);
487 logger().debug("{} RECV({}) banner features: supported={} required={}",
489 _peer_supported_features
, _peer_required_features
);
491 // Check feature bit compatibility
// Each side's required bits must be a subset of the other side's supported bits.
492 uint64_t supported_features
= CRIMSON_MSGR2_SUPPORTED_FEATURES
;
493 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
494 if ((required_features
& _peer_supported_features
) != required_features
) {
495 logger().error("{} peer does not support all required features"
496 " required={} peer_supported={}",
497 conn
, required_features
, _peer_supported_features
);
498 ABORT_IN_CLOSE(is_connect
);
500 if ((supported_features
& _peer_required_features
) != _peer_required_features
) {
501 logger().error("{} we do not support all peer required features"
502 " peer_required={} supported={}",
503 conn
, _peer_required_features
, supported_features
);
504 ABORT_IN_CLOSE(is_connect
);
506 peer_supported_features
= _peer_supported_features
;
507 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
508 frame_assembler
->set_is_rev1(is_rev1
);
510 auto hello
= HelloFrame::Encode(messenger
.get_mytype(),
512 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
513 conn
, ceph_entity_type_name(messenger
.get_mytype()),
515 return frame_assembler
->write_flush_frame(hello
);
517 //5. read peer HelloFrame
518 return frame_assembler
->read_main_preamble();
519 }).then([this](auto ret
) {
520 expect_tag(Tag::HELLO
, ret
.tag
, conn
, "read_hello_frame");
521 return frame_assembler
->read_frame_payload();
522 }).then([this](auto payload
) {
523 // 6. process peer HelloFrame
524 auto hello
= HelloFrame::Decode(payload
->back());
525 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
526 conn
, ceph_entity_type_name(hello
.entity_type()),
528 return seastar::make_ready_future
<std::tuple
<entity_type_t
, entity_addr_t
>>(
529 std::make_tuple(hello
.entity_type(), hello
.peer_addr()));
// Client side: reads the server's reply to our auth request and acts on its
// tag:
//  - AUTH_BAD_METHOD: reports it to the auth client, then retries
//    client_auth() with the methods the server allows;
//  - AUTH_REPLY_MORE: lets the auth client build the next payload, sends it
//    as an AuthRequestMoreFrame and recurses to read the next reply;
//  - the "handle_auth_done() logic" case: hands the AuthDoneFrame to the auth
//    client, records the negotiated con_mode, creates the session stream
//    handlers and calls finish_auth();
//  - any other tag: unexpected_tag().
// NOTE(review): the switch header and the third case label (original lines
// 539 and 578-579), among other lines, are missing from this copy.
535 seastar::future
<> ProtocolV2::handle_auth_reply()
537 return frame_assembler
->read_main_preamble(
538 ).then([this](auto ret
) {
540 case Tag::AUTH_BAD_METHOD
:
541 return frame_assembler
->read_frame_payload(
542 ).then([this](auto payload
) {
543 // handle_auth_bad_method() logic
544 auto bad_method
= AuthBadMethodFrame::Decode(payload
->back());
545 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
546 "allowed_methods={}, allowed_modes={}",
547 conn
, bad_method
.method(), cpp_strerror(bad_method
.result()),
548 bad_method
.allowed_methods(), bad_method
.allowed_modes());
549 ceph_assert(messenger
.get_auth_client());
550 int r
= messenger
.get_auth_client()->handle_auth_bad_method(
552 bad_method
.method(), bad_method
.result(),
553 bad_method
.allowed_methods(), bad_method
.allowed_modes());
555 logger().warn("{} auth_client handle_auth_bad_method returned {}",
559 return client_auth(bad_method
.allowed_methods());
561 case Tag::AUTH_REPLY_MORE
:
562 return frame_assembler
->read_frame_payload(
563 ).then([this](auto payload
) {
564 // handle_auth_reply_more() logic
565 auto auth_more
= AuthReplyMoreFrame::Decode(payload
->back());
566 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
567 conn
, auth_more
.auth_payload().length());
568 ceph_assert(messenger
.get_auth_client());
569 // let execute_connecting() take care of the thrown exception
570 auto reply
= messenger
.get_auth_client()->handle_auth_reply_more(
571 conn
, *auth_meta
, auth_more
.auth_payload());
572 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
573 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
574 conn
, reply
.length());
575 return frame_assembler
->write_flush_frame(more_reply
);
577 return handle_auth_reply();
580 return frame_assembler
->read_frame_payload(
581 ).then([this](auto payload
) {
582 // handle_auth_done() logic
583 auto auth_done
= AuthDoneFrame::Decode(payload
->back());
584 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
585 conn
, auth_done
.global_id(),
586 ceph_con_mode_name(auth_done
.con_mode()),
587 auth_done
.auth_payload().length());
588 ceph_assert(messenger
.get_auth_client());
589 int r
= messenger
.get_auth_client()->handle_auth_done(
592 auth_done
.global_id(),
593 auth_done
.con_mode(),
594 auth_done
.auth_payload());
596 logger().warn("{} auth_client handle_auth_done returned {}", conn
, r
);
599 auth_meta
->con_mode
= auth_done
.con_mode();
600 frame_assembler
->create_session_stream_handlers(*auth_meta
, false);
601 return finish_auth();
604 unexpected_tag(ret
.tag
, conn
, "handle_auth_reply");
605 return seastar::now();
// Client side: asks the auth client for an auth request (method, preferred
// modes, payload) and records the method in auth_meta. It then sends an
// AuthRequestFrame and processes the server's answer with
// handle_auth_reply(). A crimson::auth::error from the auth client is logged
// and the connection is closed with ABORT_IN_CLOSE(true).
// NOTE(review): a few things to check:
// - `allowed_methods` is not referenced in the visible lines;
// - the error log says "get_initial_auth_request" although the call is
//   get_auth_request();
// - original lines 612, 615-616, 625 and 627 are missing from this copy.
611 seastar::future
<> ProtocolV2::client_auth(std::vector
<uint32_t> &allowed_methods
)
613 // send_auth_request() logic
614 ceph_assert(messenger
.get_auth_client());
617 auto [auth_method
, preferred_modes
, bl
] =
618 messenger
.get_auth_client()->get_auth_request(conn
, *auth_meta
);
619 auth_meta
->auth_method
= auth_method
;
620 auto frame
= AuthRequestFrame::Encode(auth_method
, preferred_modes
, bl
);
621 logger().debug("{} WRITE AuthRequestFrame: method={},"
622 " preferred_modes={}, payload_len={}",
623 conn
, auth_method
, preferred_modes
, bl
.length());
624 return frame_assembler
->write_flush_frame(frame
626 return handle_auth_reply();
628 } catch (const crimson::auth::error
& e
) {
629 logger().error("{} get_initial_auth_request returned {}", conn
, e
.what());
630 ABORT_IN_CLOSE(true);
631 return seastar::now();
// Reads and decodes the payload of a WAIT frame and resolves to
// next_step_t::wait; the decoded frame is not otherwise used.
// execute_connecting() reacts to next_step_t::wait by shutting the socket down.
635 seastar::future
<ProtocolV2::next_step_t
>
636 ProtocolV2::process_wait()
638 return frame_assembler
->read_frame_payload(
639 ).then([this](auto payload
) {
640 // handle_wait() logic
641 logger().debug("{} GOT WaitFrame", conn
);
642 WaitFrame::Decode(payload
->back());
643 return next_step_t::wait
;
// Client side, new session. It sends a ClientIdentFrame carrying:
// - our addresses and entity number;
// - our supported features and required features (plus msgr2_required);
// - flags, with CEPH_MSG_CONNECT_LOSSY set for a lossy policy.
// It then handles the server's answer:
//  - IDENT_MISSING_FEATURES: logs and resolves to next_step_t::none;
//  - a case handled by process_wait() (its label is missing from this copy);
//  - SERVER_IDENT: requeues already-sent messages, validates the server's
//    addresses and gid, adopts its cookie, features, global_seq and lossy
//    flag, and resolves to next_step_t::ready;
//  - any other tag: unexpected_tag() and next_step_t::none.
647 seastar::future
<ProtocolV2::next_step_t
>
648 ProtocolV2::client_connect()
650 // send_client_ident() logic
652 if (conn
.policy
.lossy
) {
653 flags
|= CEPH_MSG_CONNECT_LOSSY
;
656 auto client_ident
= ClientIdentFrame::Encode(
657 messenger
.get_myaddrs(),
659 messenger
.get_myname().num(),
661 conn
.policy
.features_supported
,
662 conn
.policy
.features_required
| msgr2_required
, flags
,
665 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
666 " gs={}, features_supported={}, features_required={},"
667 " flags={}, cookie={}",
668 conn
, messenger
.get_myaddrs(), conn
.target_addr
,
669 messenger
.get_myname().num(), global_seq
,
670 conn
.policy
.features_supported
,
671 conn
.policy
.features_required
| msgr2_required
,
672 flags
, client_cookie
);
673 return frame_assembler
->write_flush_frame(client_ident
675 return frame_assembler
->read_main_preamble();
676 }).then([this](auto ret
) {
678 case Tag::IDENT_MISSING_FEATURES
:
679 return frame_assembler
->read_frame_payload(
680 ).then([this](auto payload
) {
681 // handle_ident_missing_features() logic
682 auto ident_missing
= IdentMissingFeaturesFrame::Decode(payload
->back());
683 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
684 " (client does not support all server features)",
685 conn
, ident_missing
.features());
687 return next_step_t::none
;
690 return process_wait();
691 case Tag::SERVER_IDENT
:
692 return frame_assembler
->read_frame_payload(
693 ).then([this](auto payload
) {
694 if (unlikely(state
!= state_t::CONNECTING
)) {
695 logger().debug("{} triggered {} at receiving SERVER_IDENT",
696 conn
, get_state_name(state
));
700 // handle_server_ident() logic
701 auto cc_seq
= crosscore
.prepare_submit();
702 logger().debug("{} send {} IOHandler::requeue_out_sent()",
704 io_states
.requeue_out_sent();
705 gate
.dispatch_in_background(
706 "requeue_out_sent", conn
, [this, cc_seq
] {
707 return seastar::smp::submit_to(
708 io_handler
.get_shard_id(), [this, cc_seq
] {
709 return io_handler
.requeue_out_sent(cc_seq
);
713 auto server_ident
= ServerIdentFrame::Decode(payload
->back());
714 logger().debug("{} GOT ServerIdentFrame:"
715 " addrs={}, gid={}, gs={},"
716 " features_supported={}, features_required={},"
717 " flags={}, cookie={}",
719 server_ident
.addrs(), server_ident
.gid(),
720 server_ident
.global_seq(),
721 server_ident
.supported_features(),
722 server_ident
.required_features(),
723 server_ident
.flags(), server_ident
.cookie());
725 // is this who we intended to talk to?
726 // be a bit forgiving here, since we may be connecting based on addresses parsed out
727 // of mon_host or something.
728 if (!server_ident
.addrs().contains(conn
.target_addr
)) {
729 logger().warn("{} peer identifies as {}, does not include {}",
730 conn
, server_ident
.addrs(), conn
.target_addr
);
731 throw std::system_error(
732 make_error_code(crimson::net::error::bad_peer_address
));
735 server_cookie
= server_ident
.cookie();
737 // TODO: change peer_addr to entity_addrvec_t
738 if (server_ident
.addrs().front() != conn
.peer_addr
) {
739 logger().warn("{} peer advertises as {}, does not match {}",
740 conn
, server_ident
.addrs(), conn
.peer_addr
);
741 throw std::system_error(
742 make_error_code(crimson::net::error::bad_peer_address
));
744 if (conn
.get_peer_id() != entity_name_t::NEW
&&
745 conn
.get_peer_id() != server_ident
.gid()) {
// NOTE(review): the "connection peer id ({})" placeholder receives
// server_ident.gid() and "what it should be ({})" receives conn.get_peer_id();
// the two arguments look swapped relative to the message wording.
746 logger().error("{} connection peer id ({}) does not match "
747 "what it should be ({}) during connecting, close",
748 conn
, server_ident
.gid(), conn
.get_peer_id());
749 ABORT_IN_CLOSE(true);
751 conn
.set_peer_id(server_ident
.gid());
752 conn
.set_features(server_ident
.supported_features() &
753 conn
.policy
.features_supported
);
754 logger().debug("{} UPDATE: features={}", conn
, conn
.get_features());
755 peer_global_seq
= server_ident
.global_seq();
757 bool lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
758 if (lossy
!= conn
.policy
.lossy
) {
759 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn
, lossy
);
760 conn
.policy
.lossy
= lossy
;
762 if (lossy
&& (connect_seq
!= 0 || server_cookie
!= 0)) {
763 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
764 conn
, connect_seq
, server_cookie
);
769 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
772 unexpected_tag(ret
.tag
, conn
, "post_client_connect");
773 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Client side, existing session. It sends a ReconnectFrame, logged with our
// addresses, both cookies, gs, cs and in_seq, then handles the server's
// answer:
//  - SESSION_RETRY_GLOBAL: takes a new global_seq from the messenger based on
//    the server's value and retries;
//  - SESSION_RETRY: sets connect_seq to the server's value + 1 and retries;
//  - SESSION_RESET: resets the session (fully if requested) and falls back
//    to client_connect();
//  - a case handled by process_wait() (its label is missing from this copy);
//  - SESSION_RECONNECT_OK: requeues sent messages up to the server's msg_seq
//    and resolves to next_step_t::ready;
//  - any other tag: unexpected_tag() and next_step_t::none.
779 seastar::future
<ProtocolV2::next_step_t
>
780 ProtocolV2::client_reconnect()
782 // send_reconnect() logic
783 auto reconnect
= ReconnectFrame::Encode(messenger
.get_myaddrs(),
789 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
790 " server_cookie={}, gs={}, cs={}, in_seq={}",
791 conn
, messenger
.get_myaddrs(),
792 client_cookie
, server_cookie
,
793 global_seq
, connect_seq
, io_states
.in_seq
);
794 return frame_assembler
->write_flush_frame(reconnect
).then([this] {
795 return frame_assembler
->read_main_preamble();
796 }).then([this](auto ret
) {
798 case Tag::SESSION_RETRY_GLOBAL
:
799 return frame_assembler
->read_frame_payload(
800 ).then([this](auto payload
) {
801 // handle_session_retry_global() logic
802 auto retry
= RetryGlobalFrame::Decode(payload
->back());
803 logger().warn("{} GOT RetryGlobalFrame: gs={}",
804 conn
, retry
.global_seq());
805 global_seq
= messenger
.get_global_seq(retry
.global_seq());
806 logger().warn("{} UPDATE: gs={} for retry global", conn
, global_seq
);
807 return client_reconnect();
809 case Tag::SESSION_RETRY
:
810 return frame_assembler
->read_frame_payload(
811 ).then([this](auto payload
) {
812 // handle_session_retry() logic
813 auto retry
= RetryFrame::Decode(payload
->back());
814 logger().warn("{} GOT RetryFrame: cs={}",
815 conn
, retry
.connect_seq());
816 connect_seq
= retry
.connect_seq() + 1;
817 logger().warn("{} UPDATE: cs={}", conn
, connect_seq
);
818 return client_reconnect();
820 case Tag::SESSION_RESET
:
821 return frame_assembler
->read_frame_payload(
822 ).then([this](auto payload
) {
823 if (unlikely(state
!= state_t::CONNECTING
)) {
824 logger().debug("{} triggered {} before reset_session()",
825 conn
, get_state_name(state
));
828 // handle_session_reset() logic
829 auto reset
= ResetFrame::Decode(payload
->back());
830 logger().warn("{} GOT ResetFrame: full={}", conn
, reset
.full());
832 reset_session(reset
.full());
833 // user can make changes
835 return client_connect();
838 return process_wait();
839 case Tag::SESSION_RECONNECT_OK
:
840 return frame_assembler
->read_frame_payload(
841 ).then([this](auto payload
) {
842 if (unlikely(state
!= state_t::CONNECTING
)) {
843 logger().debug("{} triggered {} at receiving RECONNECT_OK",
844 conn
, get_state_name(state
));
848 // handle_reconnect_ok() logic
849 auto reconnect_ok
= ReconnectOkFrame::Decode(payload
->back());
850 auto cc_seq
= crosscore
.prepare_submit();
851 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
852 "send {} IOHandler::requeue_out_sent_up_to()",
853 conn
, reconnect_ok
.msg_seq(), cc_seq
);
855 io_states
.requeue_out_sent_up_to();
856 auto msg_seq
= reconnect_ok
.msg_seq();
857 gate
.dispatch_in_background(
858 "requeue_out_reconnecting", conn
, [this, cc_seq
, msg_seq
] {
859 return seastar::smp::submit_to(
860 io_handler
.get_shard_id(), [this, cc_seq
, msg_seq
] {
861 return io_handler
.requeue_out_sent_up_to(cc_seq
, msg_seq
);
865 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
868 unexpected_tag(ret
.tag
, conn
, "post_client_reconnect");
869 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Client-side connect sequence. It first switches to CONNECTING (io state
// delay), then runs the following under the gate as "execute_connecting":
// 1. pick a global_seq;
// 2. wait for any previous I/O to exit;
// 3. in test builds, honor the SOCKET_CONNECTING breakpoint;
// 4. connect a socket to peer_addr and install it in the frame assembler;
// 5. reset the auth/handler state and run banner_exchange(true);
// 6. verify the peer type and learn our own address from the peer;
// 7. authenticate, then connect (server_cookie == 0) or reconnect.
// On next_step_t::ready it moves the IOHandler to the socket's shard via
// dispatch_connect(); on next_step_t::wait it shuts the socket down.
// The state is checked repeatedly because it can change while an
// asynchronous step is pending. Any exception is passed to
// fault(state_t::CONNECTING, "execute_connecting", eptr).
// NOTE(review): numerous lines of this function are missing from this copy
// (the embedded original line numbers skip), e.g. what follows most of the
// "triggered {} ..." debug logs.
875 void ProtocolV2::execute_connecting()
877 ceph_assert_always(!is_socket_valid
);
878 trigger_state(state_t::CONNECTING
, io_state_t::delay
);
879 gated_execute("execute_connecting", conn
, [this] {
880 global_seq
= messenger
.get_global_seq();
881 assert(client_cookie
!= 0);
882 if (!conn
.policy
.lossy
&& server_cookie
!= 0) {
884 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
885 conn
, global_seq
, connect_seq
);
886 } else { // conn.policy.lossy || server_cookie == 0
887 assert(connect_seq
== 0);
888 assert(server_cookie
== 0);
889 logger().debug("{} UPDATE: gs={} for connect", conn
, global_seq
);
891 return wait_exit_io().then([this] {
892 #ifdef UNIT_TESTS_BUILT
893 // process custom_bp_t::SOCKET_CONNECTING
894 // supports CONTINUE/FAULT/BLOCK
895 if (!conn
.interceptor
) {
896 return seastar::now();
898 return conn
.interceptor
->intercept(
899 conn
, {Breakpoint
{custom_bp_t::SOCKET_CONNECTING
}}
900 ).then([this](bp_action_t action
) {
902 case bp_action_t::CONTINUE
:
903 return seastar::now();
904 case bp_action_t::FAULT
:
905 logger().info("[Test] got FAULT");
907 case bp_action_t::BLOCK
:
908 logger().info("[Test] got BLOCK");
909 return conn
.interceptor
->blocker
.block();
911 ceph_abort("unexpected action from trap");
912 return seastar::now();
917 ceph_assert_always(frame_assembler
);
918 if (unlikely(state
!= state_t::CONNECTING
)) {
919 logger().debug("{} triggered {} before Socket::connect()",
920 conn
, get_state_name(state
));
923 return Socket::connect(conn
.peer_addr
);
924 }).then([this](SocketRef _new_socket
) {
925 logger().debug("{} socket connected", conn
);
926 if (unlikely(state
!= state_t::CONNECTING
)) {
927 logger().debug("{} triggered {} during Socket::connect()",
928 conn
, get_state_name(state
));
929 return _new_socket
->close().then([sock
=std::move(_new_socket
)] {
933 SocketFRef new_socket
= seastar::make_foreign(std::move(_new_socket
));
935 frame_assembler
->set_socket(std::move(new_socket
));
938 gate
.dispatch_in_background(
939 "replace_socket_connecting",
941 [this, new_socket
=std::move(new_socket
)]() mutable {
942 return frame_assembler
->replace_shutdown_socket(std::move(new_socket
));
946 is_socket_valid
= true;
947 return seastar::now();
949 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
950 frame_assembler
->reset_handlers();
951 frame_assembler
->start_recording();
952 return banner_exchange(true);
953 }).then([this] (auto&& ret
) {
954 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
955 if (conn
.get_peer_type() != _peer_type
) {
956 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
957 conn
, ceph_entity_type_name(conn
.get_peer_type()),
958 ceph_entity_type_name(_peer_type
));
959 ABORT_IN_CLOSE(true);
961 if (unlikely(state
!= state_t::CONNECTING
)) {
962 logger().debug("{} triggered {} during banner_exchange(), abort",
963 conn
, get_state_name(state
));
966 frame_assembler
->learn_socket_ephemeral_port_as_connector(
967 _my_addr_from_peer
.get_port());
968 if (unlikely(_my_addr_from_peer
.is_legacy())) {
969 logger().warn("{} peer sent a legacy address for me: {}",
970 conn
, _my_addr_from_peer
);
971 throw std::system_error(
972 make_error_code(crimson::net::error::bad_peer_address
));
974 _my_addr_from_peer
.set_type(entity_addr_t::TYPE_MSGR2
);
975 messenger
.learned_addr(_my_addr_from_peer
, conn
);
976 return client_auth();
978 if (server_cookie
== 0) {
979 ceph_assert(connect_seq
== 0);
980 return client_connect();
982 ceph_assert(connect_seq
> 0);
983 return client_reconnect();
985 }).then([this] (next_step_t next
) {
986 if (unlikely(state
!= state_t::CONNECTING
)) {
987 logger().debug("{} triggered {} at the end of execute_connecting()",
988 conn
, get_state_name(state
));
992 case next_step_t::ready
: {
993 if (unlikely(state
!= state_t::CONNECTING
)) {
994 logger().debug("{} triggered {} before dispatch_connect(), abort",
995 conn
, get_state_name(state
));
999 auto cc_seq
= crosscore
.prepare_submit();
1000 // there are 2 hops with dispatch_connect()
1001 crosscore
.prepare_submit();
1002 logger().info("{} connected: gs={}, pgs={}, cs={}, "
1003 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1004 "send {} IOHandler::dispatch_connect()",
1005 conn
, global_seq
, peer_global_seq
, connect_seq
,
1006 client_cookie
, server_cookie
, io_states
,
1007 frame_assembler
->get_socket_shard_id(), cc_seq
);
1009 // set io_handler to a new shard
1010 auto new_io_shard
= frame_assembler
->get_socket_shard_id();
1011 ConnectionFRef conn_fref
= seastar::make_foreign(
1012 conn
.shared_from_this());
1013 ceph_assert_always(!pr_switch_io_shard
.has_value());
1014 pr_switch_io_shard
= seastar::shared_promise
<>();
1015 return seastar::smp::submit_to(
1016 io_handler
.get_shard_id(),
1017 [this, cc_seq
, new_io_shard
,
1018 conn_fref
=std::move(conn_fref
)]() mutable {
1019 return io_handler
.dispatch_connect(
1020 cc_seq
, new_io_shard
, std::move(conn_fref
));
1021 }).then([this, new_io_shard
] {
1022 ceph_assert_always(io_handler
.get_shard_id() == new_io_shard
);
1023 pr_switch_io_shard
->set_value();
1024 pr_switch_io_shard
= std::nullopt
;
1025 // user can make changes
1027 if (unlikely(state
!= state_t::CONNECTING
)) {
1028 logger().debug("{} triggered {} after dispatch_connect(), abort",
1029 conn
, get_state_name(state
));
1035 case next_step_t::wait
: {
1036 logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn
);
1037 ceph_assert_always(is_socket_valid
);
1038 frame_assembler
->shutdown_socket
<true>(&gate
);
1039 is_socket_valid
= false;
1041 return seastar::now();
1044 ceph_abort("impossible next step");
1047 }).handle_exception([this](std::exception_ptr eptr
) {
1048 fault(state_t::CONNECTING
, "execute_connecting", eptr
);
1055 seastar::future
<> ProtocolV2::_auth_bad_method(int r
)
1057 // _auth_bad_method() logic
1059 auto [allowed_methods
, allowed_modes
] =
1060 messenger
.get_auth_server()->get_supported_auth_methods(conn
.get_peer_type());
1061 auto bad_method
= AuthBadMethodFrame::Encode(
1062 auth_meta
->auth_method
, r
, allowed_methods
, allowed_modes
);
1063 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
1064 "allowed_methods={}, allowed_modes={})",
1065 conn
, auth_meta
->auth_method
, cpp_strerror(r
),
1066 allowed_methods
, allowed_modes
);
1067 return frame_assembler
->write_flush_frame(bad_method
1069 return server_auth();
1073 seastar::future
<> ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
1075 // _handle_auth_request() logic
1076 ceph_assert(messenger
.get_auth_server());
1078 int r
= messenger
.get_auth_server()->handle_auth_request(
1082 auth_meta
->auth_method
,
1084 &conn
.peer_global_id
,
1089 auto auth_done
= AuthDoneFrame::Encode(
1090 conn
.peer_global_id
, auth_meta
->con_mode
, reply
);
1091 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
1092 conn
, conn
.peer_global_id
,
1093 ceph_con_mode_name(auth_meta
->con_mode
), reply
.length());
1094 return frame_assembler
->write_flush_frame(auth_done
1096 ceph_assert(auth_meta
);
1097 frame_assembler
->create_session_stream_handlers(*auth_meta
, true);
1098 return finish_auth();
1103 auto more
= AuthReplyMoreFrame::Encode(reply
);
1104 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
1105 conn
, reply
.length());
1106 return frame_assembler
->write_flush_frame(more
1108 return frame_assembler
->read_main_preamble();
1109 }).then([this](auto ret
) {
1110 expect_tag(Tag::AUTH_REQUEST_MORE
, ret
.tag
, conn
, "read_auth_request_more");
1111 return frame_assembler
->read_frame_payload();
1112 }).then([this](auto payload
) {
1113 auto auth_more
= AuthRequestMoreFrame::Decode(payload
->back());
1114 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
1115 conn
, auth_more
.auth_payload().length());
1116 return _handle_auth_request(auth_more
.auth_payload(), true);
1120 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn
);
1122 return seastar::now();
1125 logger().warn("{} auth_server handle_auth_request returned {}", conn
, r
);
1126 return _auth_bad_method(r
);
1131 seastar::future
<> ProtocolV2::server_auth()
1133 return frame_assembler
->read_main_preamble(
1134 ).then([this](auto ret
) {
1135 expect_tag(Tag::AUTH_REQUEST
, ret
.tag
, conn
, "read_auth_request");
1136 return frame_assembler
->read_frame_payload();
1137 }).then([this](auto payload
) {
1138 // handle_auth_request() logic
1139 auto request
= AuthRequestFrame::Decode(payload
->back());
1140 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
1142 conn
, request
.method(), request
.preferred_modes(),
1143 request
.auth_payload().length());
1144 auth_meta
->auth_method
= request
.method();
1145 auth_meta
->con_mode
= messenger
.get_auth_server()->pick_con_mode(
1146 conn
.get_peer_type(), auth_meta
->auth_method
,
1147 request
.preferred_modes());
1148 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
1149 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn
);
1150 return _auth_bad_method(-EOPNOTSUPP
);
1152 return _handle_auth_request(request
.auth_payload(), false);
1156 bool ProtocolV2::validate_peer_name(const entity_name_t
& peer_name
) const
1158 auto my_peer_name
= conn
.get_peer_name();
1159 if (my_peer_name
.type() != peer_name
.type()) {
1162 if (my_peer_name
.num() != entity_name_t::NEW
&&
1163 peer_name
.num() != entity_name_t::NEW
&&
1164 my_peer_name
.num() != peer_name
.num()) {
1170 seastar::future
<ProtocolV2::next_step_t
>
1171 ProtocolV2::send_wait()
1173 auto wait
= WaitFrame::Encode();
1174 logger().debug("{} WRITE WaitFrame", conn
);
1175 return frame_assembler
->write_flush_frame(wait
1177 return next_step_t::wait
;
1181 seastar::future
<ProtocolV2::next_step_t
>
1182 ProtocolV2::reuse_connection(
1183 ProtocolV2
* existing_proto
, bool do_reset
,
1184 bool reconnect
, uint64_t conn_seq
, uint64_t msg_seq
)
1186 if (unlikely(state
!= state_t::ACCEPTING
)) {
1187 logger().debug("{} triggered {} before trigger_replacing()",
1188 conn
, get_state_name(state
));
1192 existing_proto
->trigger_replacing(reconnect
,
1194 frame_assembler
->to_replace(),
1195 std::move(auth_meta
),
1198 conn
.get_peer_name(),
1199 conn
.get_features(),
1200 peer_supported_features
,
1203 ceph_assert_always(has_socket
&& is_socket_valid
);
1204 is_socket_valid
= false;
1206 #ifdef UNIT_TESTS_BUILT
1207 if (conn
.interceptor
) {
1208 conn
.interceptor
->register_conn_replaced(
1209 conn
.get_local_shared_foreign_from_this());
1212 // close this connection because all the necessary information is delivered
1213 // to the exisiting connection, and jump to error handling code to abort the
1215 ABORT_IN_CLOSE(false);
1216 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1219 seastar::future
<ProtocolV2::next_step_t
>
1220 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn
)
1222 // handle_existing_connection() logic
1223 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1224 existing_conn
->protocol
.get());
1225 ceph_assert(existing_proto
);
1226 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1227 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1228 conn
, global_seq
, peer_global_seq
, connect_seq
,
1229 client_cookie
, server_cookie
,
1230 fmt::ptr(existing_conn
.get()), get_state_name(existing_proto
->state
),
1231 existing_proto
->global_seq
,
1232 existing_proto
->peer_global_seq
,
1233 existing_proto
->connect_seq
,
1234 existing_proto
->client_cookie
,
1235 existing_proto
->server_cookie
);
1237 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1238 logger().error("{} server_connect: my peer_name doesn't match"
1239 " the existing connection {}, abort", conn
, fmt::ptr(existing_conn
.get()));
1243 if (existing_proto
->state
== state_t::REPLACING
) {
1244 logger().warn("{} server_connect: racing replace happened while"
1245 " replacing existing connection {}, send wait.",
1246 conn
, *existing_conn
);
1250 if (existing_proto
->peer_global_seq
> peer_global_seq
) {
1251 logger().warn("{} server_connect:"
1252 " this is a stale connection, because peer_global_seq({})"
1253 " < existing->peer_global_seq({}), close this connection"
1254 " in favor of existing connection {}",
1255 conn
, peer_global_seq
,
1256 existing_proto
->peer_global_seq
, *existing_conn
);
1260 if (existing_conn
->policy
.lossy
) {
1261 // existing connection can be thrown out in favor of this one
1262 logger().warn("{} server_connect:"
1263 " existing connection {} is a lossy channel. Close existing in favor of"
1264 " this connection", conn
, *existing_conn
);
1265 if (unlikely(state
!= state_t::ACCEPTING
)) {
1266 logger().debug("{} triggered {} before execute_establishing()",
1267 conn
, get_state_name(state
));
1270 execute_establishing(existing_conn
);
1271 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1274 if (existing_proto
->server_cookie
!= 0) {
1275 if (existing_proto
->client_cookie
!= client_cookie
) {
1276 // Found previous session
1277 // peer has reset and we're going to reuse the existing connection
1278 // by replacing the socket
1279 logger().warn("{} server_connect:"
1280 " found new session (cs={})"
1281 " when existing {} {} is with stale session (cs={}, ss={}),"
1282 " peer must have reset",
1285 get_state_name(existing_proto
->state
),
1287 existing_proto
->client_cookie
,
1288 existing_proto
->server_cookie
);
1289 return reuse_connection(existing_proto
, conn
.policy
.resetcheck
);
1291 // session establishment interrupted between client_ident and server_ident,
1293 logger().warn("{} server_connect: found client session with existing {} {}"
1294 " matched (cs={}, ss={}), continuing session establishment",
1296 get_state_name(existing_proto
->state
),
1299 existing_proto
->server_cookie
);
1300 return reuse_connection(existing_proto
);
1303 // Looks like a connection race: server and client are both connecting to
1304 // each other at the same time.
1305 if (existing_proto
->client_cookie
!= client_cookie
) {
1306 if (existing_conn
->peer_wins()) {
1307 // acceptor (this connection, the peer) wins
1308 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1309 " and win, reusing existing {} {}",
1312 existing_proto
->client_cookie
,
1313 get_state_name(existing_proto
->state
),
1315 return reuse_connection(existing_proto
);
1317 // acceptor (this connection, the peer) loses
1318 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1319 " and lose to existing {}, ask client to wait",
1320 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1321 return existing_conn
->send_keepalive().then([this] {
1326 logger().warn("{} server_connect: found client session with existing {} {}"
1327 " matched (cs={}, ss={}), continuing session establishment",
1329 get_state_name(existing_proto
->state
),
1332 existing_proto
->server_cookie
);
1333 return reuse_connection(existing_proto
);
1338 seastar::future
<ProtocolV2::next_step_t
>
1339 ProtocolV2::server_connect()
1341 return frame_assembler
->read_frame_payload(
1342 ).then([this](auto payload
) {
1343 // handle_client_ident() logic
1344 auto client_ident
= ClientIdentFrame::Decode(payload
->back());
1345 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1346 " gid={}, gs={}, features_supported={},"
1347 " features_required={}, flags={}, cookie={}",
1348 conn
, client_ident
.addrs(), client_ident
.target_addr(),
1349 client_ident
.gid(), client_ident
.global_seq(),
1350 client_ident
.supported_features(),
1351 client_ident
.required_features(),
1352 client_ident
.flags(), client_ident
.cookie());
1354 if (client_ident
.addrs().empty() ||
1355 client_ident
.addrs().front() == entity_addr_t()) {
1356 logger().warn("{} oops, client_ident.addrs() is empty", conn
);
1357 throw std::system_error(
1358 make_error_code(crimson::net::error::bad_peer_address
));
1360 if (!messenger
.get_myaddrs().contains(client_ident
.target_addr())) {
1361 logger().warn("{} peer is trying to reach {} which is not us ({})",
1362 conn
, client_ident
.target_addr(), messenger
.get_myaddrs());
1363 throw std::system_error(
1364 make_error_code(crimson::net::error::bad_peer_address
));
1366 conn
.peer_addr
= client_ident
.addrs().front();
1367 logger().debug("{} UPDATE: peer_addr={}", conn
, conn
.peer_addr
);
1368 conn
.target_addr
= conn
.peer_addr
;
1369 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
1370 logger().warn("{} we don't know how to reconnect to peer {}",
1371 conn
, conn
.target_addr
);
1372 throw std::system_error(
1373 make_error_code(crimson::net::error::bad_peer_address
));
1376 if (conn
.get_peer_id() != entity_name_t::NEW
&&
1377 conn
.get_peer_id() != client_ident
.gid()) {
1378 logger().error("{} client_ident peer_id ({}) does not match"
1379 " what it should be ({}) during accepting, abort",
1380 conn
, client_ident
.gid(), conn
.get_peer_id());
1383 conn
.set_peer_id(client_ident
.gid());
1384 client_cookie
= client_ident
.cookie();
1386 uint64_t feat_missing
=
1387 (conn
.policy
.features_required
| msgr2_required
) &
1388 ~(uint64_t)client_ident
.supported_features();
1390 auto ident_missing_features
= IdentMissingFeaturesFrame::Encode(feat_missing
);
1391 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1392 conn
, feat_missing
);
1393 return frame_assembler
->write_flush_frame(ident_missing_features
1395 return next_step_t::wait
;
1398 conn
.set_features(client_ident
.supported_features() &
1399 conn
.policy
.features_supported
);
1400 logger().debug("{} UPDATE: features={}", conn
, conn
.get_features());
1402 peer_global_seq
= client_ident
.global_seq();
1404 bool lossy
= client_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
1405 if (lossy
!= conn
.policy
.lossy
) {
1406 logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
1407 conn
, conn
.policy
.lossy
, lossy
);
1410 // Looks good so far, let's check if there is already an existing connection
1413 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1415 if (existing_conn
) {
1416 return handle_existing_connection(existing_conn
);
1418 if (unlikely(state
!= state_t::ACCEPTING
)) {
1419 logger().debug("{} triggered {} before execute_establishing()",
1420 conn
, get_state_name(state
));
1423 execute_establishing(nullptr);
1424 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1429 seastar::future
<ProtocolV2::next_step_t
>
1430 ProtocolV2::read_reconnect()
1432 return frame_assembler
->read_main_preamble(
1433 ).then([this](auto ret
) {
1434 expect_tag(Tag::SESSION_RECONNECT
, ret
.tag
, conn
, "read_session_reconnect");
1435 return server_reconnect();
1439 seastar::future
<ProtocolV2::next_step_t
>
1440 ProtocolV2::send_retry(uint64_t connect_seq
)
1442 auto retry
= RetryFrame::Encode(connect_seq
);
1443 logger().warn("{} WRITE RetryFrame: cs={}", conn
, connect_seq
);
1444 return frame_assembler
->write_flush_frame(retry
1446 return read_reconnect();
1450 seastar::future
<ProtocolV2::next_step_t
>
1451 ProtocolV2::send_retry_global(uint64_t global_seq
)
1453 auto retry
= RetryGlobalFrame::Encode(global_seq
);
1454 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn
, global_seq
);
1455 return frame_assembler
->write_flush_frame(retry
1457 return read_reconnect();
1461 seastar::future
<ProtocolV2::next_step_t
>
1462 ProtocolV2::send_reset(bool full
)
1464 auto reset
= ResetFrame::Encode(full
);
1465 logger().warn("{} WRITE ResetFrame: full={}", conn
, full
);
1466 return frame_assembler
->write_flush_frame(reset
1468 return frame_assembler
->read_main_preamble();
1469 }).then([this](auto ret
) {
1470 expect_tag(Tag::CLIENT_IDENT
, ret
.tag
, conn
, "post_send_reset");
1471 return server_connect();
1475 seastar::future
<ProtocolV2::next_step_t
>
1476 ProtocolV2::server_reconnect()
1478 return frame_assembler
->read_frame_payload(
1479 ).then([this](auto payload
) {
1480 // handle_reconnect() logic
1481 auto reconnect
= ReconnectFrame::Decode(payload
->back());
1483 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1484 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1485 conn
, reconnect
.addrs(),
1486 reconnect
.client_cookie(), reconnect
.server_cookie(),
1487 reconnect
.global_seq(), reconnect
.connect_seq(),
1488 reconnect
.msg_seq());
1490 // can peer_addrs be changed on-the-fly?
1491 // TODO: change peer_addr to entity_addrvec_t
1492 entity_addr_t paddr
= reconnect
.addrs().front();
1493 if (paddr
.is_msgr2() || paddr
.is_any()) {
1496 logger().warn("{} peer's address {} is not v2", conn
, paddr
);
1497 throw std::system_error(
1498 make_error_code(crimson::net::error::bad_peer_address
));
1500 if (conn
.peer_addr
== entity_addr_t()) {
1501 conn
.peer_addr
= paddr
;
1502 } else if (conn
.peer_addr
!= paddr
) {
1503 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1504 " reconnect failed",
1505 conn
, paddr
, conn
.peer_addr
);
1506 throw std::system_error(
1507 make_error_code(crimson::net::error::bad_peer_address
));
1509 peer_global_seq
= reconnect
.global_seq();
1511 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1513 if (!existing_conn
) {
1514 // there is no existing connection therefore cannot reconnect to previous
1516 logger().warn("{} server_reconnect: no existing connection from address {},"
1517 " reseting client", conn
, conn
.peer_addr
);
1518 return send_reset(true);
1521 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1522 existing_conn
->protocol
.get());
1523 ceph_assert(existing_proto
);
1524 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1525 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1526 conn
, global_seq
, peer_global_seq
, reconnect
.connect_seq(),
1527 reconnect
.client_cookie(), reconnect
.server_cookie(),
1528 fmt::ptr(existing_conn
.get()),
1529 get_state_name(existing_proto
->state
),
1530 existing_proto
->global_seq
,
1531 existing_proto
->peer_global_seq
,
1532 existing_proto
->connect_seq
,
1533 existing_proto
->client_cookie
,
1534 existing_proto
->server_cookie
);
1536 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1537 logger().error("{} server_reconnect: my peer_name doesn't match"
1538 " the existing connection {}, abort", conn
, fmt::ptr(existing_conn
.get()));
1542 if (existing_proto
->state
== state_t::REPLACING
) {
1543 logger().warn("{} server_reconnect: racing replace happened while "
1544 " replacing existing connection {}, retry global.",
1545 conn
, *existing_conn
);
1546 return send_retry_global(existing_proto
->peer_global_seq
);
1549 if (existing_proto
->client_cookie
!= reconnect
.client_cookie()) {
1550 logger().warn("{} server_reconnect:"
1551 " client_cookie mismatch with existing connection {},"
1552 " cc={} rcc={}. I must have reset, reseting client.",
1553 conn
, *existing_conn
,
1554 existing_proto
->client_cookie
, reconnect
.client_cookie());
1555 return send_reset(conn
.policy
.resetcheck
);
1556 } else if (existing_proto
->server_cookie
== 0) {
1557 // this happens when:
1558 // - a connects to b
1559 // - a sends client_ident
1560 // - b gets client_ident, sends server_ident and sets cookie X
1561 // - connection fault
1562 // - b reconnects to a with cookie X, connect_seq=1
1563 // - a has cookie==0
1564 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1565 " server_ident with existing connection {}."
1566 " Asking peer to resume session establishment",
1567 conn
, existing_proto
->client_cookie
, *existing_conn
);
1568 return send_reset(false);
1571 if (existing_proto
->peer_global_seq
> reconnect
.global_seq()) {
1572 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1573 " with existing connection {},"
1574 " ask client to retry global",
1575 conn
, existing_proto
->peer_global_seq
,
1576 reconnect
.global_seq(), *existing_conn
);
1577 return send_retry_global(existing_proto
->peer_global_seq
);
1580 if (existing_proto
->connect_seq
> reconnect
.connect_seq()) {
1581 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1582 " with existing connection {}, ask client to retry",
1583 conn
, reconnect
.connect_seq(),
1584 existing_proto
->connect_seq
, *existing_conn
);
1585 return send_retry(existing_proto
->connect_seq
);
1586 } else if (existing_proto
->connect_seq
== reconnect
.connect_seq()) {
1587 // reconnect race: both peers are sending reconnect messages
1588 if (existing_conn
->peer_wins()) {
1589 // acceptor (this connection, the peer) wins
1590 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1591 " and win, reusing existing {} {}",
1593 reconnect
.connect_seq(),
1594 get_state_name(existing_proto
->state
),
1596 return reuse_connection(
1597 existing_proto
, false,
1598 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1600 // acceptor (this connection, the peer) loses
1601 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1602 " and lose to existing {}, ask client to wait",
1603 conn
, reconnect
.connect_seq(), *existing_conn
);
1606 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1607 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1608 " reusing existing {} {}",
1610 existing_proto
->connect_seq
,
1611 reconnect
.connect_seq(),
1612 get_state_name(existing_proto
->state
),
1614 return reuse_connection(
1615 existing_proto
, false,
1616 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1621 void ProtocolV2::execute_accepting()
1623 assert(is_socket_valid
);
1624 trigger_state(state_t::ACCEPTING
, io_state_t::none
);
1625 gate
.dispatch_in_background("execute_accepting", conn
, [this] {
1626 return seastar::futurize_invoke([this] {
1627 #ifdef UNIT_TESTS_BUILT
1628 if (conn
.interceptor
) {
1629 // only notify socket accepted
1630 gate
.dispatch_in_background(
1631 "test_intercept_socket_accepted", conn
, [this] {
1632 return conn
.interceptor
->intercept(
1633 conn
, {Breakpoint
{custom_bp_t::SOCKET_ACCEPTED
}}
1634 ).then([](bp_action_t action
) {
1635 ceph_assert(action
== bp_action_t::CONTINUE
);
1640 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
1641 frame_assembler
->reset_handlers();
1642 frame_assembler
->start_recording();
1643 return banner_exchange(false);
1644 }).then([this] (auto&& ret
) {
1645 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
1646 ceph_assert(conn
.get_peer_type() == 0);
1647 conn
.set_peer_type(_peer_type
);
1649 conn
.policy
= messenger
.get_policy(_peer_type
);
1650 logger().info("{} UPDATE: peer_type={},"
1651 " policy(lossy={} server={} standby={} resetcheck={})",
1652 conn
, ceph_entity_type_name(_peer_type
),
1653 conn
.policy
.lossy
, conn
.policy
.server
,
1654 conn
.policy
.standby
, conn
.policy
.resetcheck
);
1655 if (!messenger
.get_myaddr().is_blank_ip() &&
1656 (messenger
.get_myaddr().get_port() != _my_addr_from_peer
.get_port() ||
1657 messenger
.get_myaddr().get_nonce() != _my_addr_from_peer
.get_nonce())) {
1658 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1659 conn
, _my_addr_from_peer
, messenger
.get_myaddr());
1660 throw std::system_error(
1661 make_error_code(crimson::net::error::bad_peer_address
));
1663 messenger
.learned_addr(_my_addr_from_peer
, conn
);
1664 return server_auth();
1666 return frame_assembler
->read_main_preamble();
1667 }).then([this](auto ret
) {
1669 case Tag::CLIENT_IDENT
:
1670 return server_connect();
1671 case Tag::SESSION_RECONNECT
:
1672 return server_reconnect();
1674 unexpected_tag(ret
.tag
, conn
, "post_server_auth");
1675 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1678 }).then([this] (next_step_t next
) {
1680 case next_step_t::ready
:
1681 assert(state
!= state_t::ACCEPTING
);
1683 case next_step_t::wait
:
1684 if (unlikely(state
!= state_t::ACCEPTING
)) {
1685 logger().debug("{} triggered {} at the end of execute_accepting()",
1686 conn
, get_state_name(state
));
1689 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn
);
1690 execute_server_wait();
1693 ceph_abort("impossible next step");
1695 }).handle_exception([this](std::exception_ptr eptr
) {
1698 std::rethrow_exception(eptr
);
1699 } catch (std::exception
&e
) {
1702 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1703 conn
, get_state_name(state
), e_what
);
1709 // CONNECTING or ACCEPTING state
1711 seastar::future
<> ProtocolV2::finish_auth()
1713 ceph_assert(auth_meta
);
1715 auto records
= frame_assembler
->stop_recording();
1716 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1717 auth_meta
->session_key
.hmac_sha256(nullptr, records
.rxbuf
);
1718 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1719 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn
, sig
);
1720 return frame_assembler
->write_flush_frame(sig_frame
1722 return frame_assembler
->read_main_preamble();
1723 }).then([this](auto ret
) {
1724 expect_tag(Tag::AUTH_SIGNATURE
, ret
.tag
, conn
, "post_finish_auth");
1725 return frame_assembler
->read_frame_payload();
1726 }).then([this, txbuf
=std::move(records
.txbuf
)](auto payload
) {
1727 // handle_auth_signature() logic
1728 auto sig_frame
= AuthSignatureFrame::Decode(payload
->back());
1729 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn
, sig_frame
.signature());
1731 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
1732 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(nullptr, txbuf
);
1733 if (sig_frame
.signature() != actual_tx_sig
) {
1734 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1735 " sig_frame.signature()={}",
1736 conn
, actual_tx_sig
, sig_frame
.signature());
1744 void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn
) {
1745 auto accept_me
= [this] {
1746 messenger
.register_conn(
1747 seastar::static_pointer_cast
<SocketConnection
>(
1748 conn
.shared_from_this()));
1749 messenger
.unaccept_conn(
1750 seastar::static_pointer_cast
<SocketConnection
>(
1751 conn
.shared_from_this()));
1754 ceph_assert_always(is_socket_valid
);
1755 trigger_state(state_t::ESTABLISHING
, io_state_t::delay
);
1757 if (existing_conn
) {
1758 logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
1759 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1760 "close existing {}",
1761 conn
, global_seq
, peer_global_seq
, connect_seq
,
1762 client_cookie
, server_cookie
,
1763 io_states
, frame_assembler
->get_socket_shard_id(),
1766 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1767 existing_conn
->protocol
.get());
1768 existing_proto
->do_close(
1769 true, // is_dispatch_reset
1770 std::move(accept_me
));
1771 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1772 logger().warn("{} triggered {} during execute_establishing(), "
1773 "the accept event will not be delivered!",
1774 conn
, get_state_name(state
));
1778 logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
1779 "client_cookie={}, server_cookie={}, {}, new_sid={}, "
1781 conn
, global_seq
, peer_global_seq
, connect_seq
,
1782 client_cookie
, server_cookie
, io_states
,
1783 frame_assembler
->get_socket_shard_id());
1788 gated_execute("execute_establishing", conn
, [this, is_replace
] {
1789 ceph_assert_always(state
== state_t::ESTABLISHING
);
1791 // set io_handler to a new shard
1792 auto cc_seq
= crosscore
.prepare_submit();
1793 // there are 2 hops with dispatch_accept()
1794 crosscore
.prepare_submit();
1795 auto new_io_shard
= frame_assembler
->get_socket_shard_id();
1796 logger().debug("{} send {} IOHandler::dispatch_accept({})",
1797 conn
, cc_seq
, new_io_shard
);
1798 ConnectionFRef conn_fref
= seastar::make_foreign(
1799 conn
.shared_from_this());
1800 ceph_assert_always(!pr_switch_io_shard
.has_value());
1801 pr_switch_io_shard
= seastar::shared_promise
<>();
1802 return seastar::smp::submit_to(
1803 io_handler
.get_shard_id(),
1804 [this, cc_seq
, new_io_shard
, is_replace
,
1805 conn_fref
=std::move(conn_fref
)]() mutable {
1806 return io_handler
.dispatch_accept(
1807 cc_seq
, new_io_shard
, std::move(conn_fref
), is_replace
);
1808 }).then([this, new_io_shard
] {
1809 ceph_assert_always(io_handler
.get_shard_id() == new_io_shard
);
1810 pr_switch_io_shard
->set_value();
1811 pr_switch_io_shard
= std::nullopt
;
1812 // user can make changes
1814 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1815 logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
1816 conn
, get_state_name(state
));
1820 return send_server_ident();
1822 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1823 logger().debug("{} triggered {} at the end of execute_establishing()",
1824 conn
, get_state_name(state
));
1827 logger().info("{} established, going to ready", conn
);
1829 }).handle_exception([this](std::exception_ptr eptr
) {
1830 fault(state_t::ESTABLISHING
, "execute_establishing", eptr
);
1835 // ESTABLISHING or REPLACING state
1838 ProtocolV2::send_server_ident()
1840 ceph_assert_always(state
== state_t::ESTABLISHING
||
1841 state
== state_t::REPLACING
);
1842 // send_server_ident() logic
1844 // refered to async-conn v2: not assign gs to global_seq
1845 global_seq
= messenger
.get_global_seq();
1846 auto cc_seq
= crosscore
.prepare_submit();
1847 logger().debug("{} UPDATE: gs={} for server ident, "
1848 "send {} IOHandler::reset_peer_state()",
1849 conn
, global_seq
, cc_seq
);
1851 // this is required for the case when this connection is being replaced
1852 io_states
.reset_peer_state();
1853 gate
.dispatch_in_background(
1854 "reset_peer_state", conn
, [this, cc_seq
] {
1855 return seastar::smp::submit_to(
1856 io_handler
.get_shard_id(), [this, cc_seq
] {
1857 return io_handler
.reset_peer_state(cc_seq
);
1861 if (!conn
.policy
.lossy
) {
1862 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1866 if (conn
.policy
.lossy
) {
1867 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
1870 auto server_ident
= ServerIdentFrame::Encode(
1871 messenger
.get_myaddrs(),
1872 messenger
.get_myname().num(),
1874 conn
.policy
.features_supported
,
1875 conn
.policy
.features_required
| msgr2_required
,
1879 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1880 " gs={}, features_supported={}, features_required={},"
1881 " flags={}, cookie={}",
1882 conn
, messenger
.get_myaddrs(), messenger
.get_myname().num(),
1883 global_seq
, conn
.policy
.features_supported
,
1884 conn
.policy
.features_required
| msgr2_required
,
1885 flags
, server_cookie
);
1887 return frame_assembler
->write_flush_frame(server_ident
);
// trigger_replacing(): let an incoming connection attempt, whose socket is
// carried in `mover`, replace the session of this existing connection.
//
// Synchronous part:
//  - asserts state is within [ESTABLISHING, WAIT], and that a socket exists
//    unless the state is CONNECTING;
//  - shuts down the current socket if it is still marked valid;
//  - enters REPLACING via trigger_state_phase1().
// Background part, dispatched on `gate` as "trigger_replacing":
//  1. wait_switch_io_shard(), then move io to `delay`
//     (trigger_state_phase2) and wait_exit_io();
//  2. cancel protocol_timer and move execution_done out, leaving a ready
//     future in its place;
//  3. move the IOHandler to the new socket's shard via
//     IOHandler::dispatch_accept() on the IOHandler's current shard,
//     publishing pr_switch_io_shard for the duration of the switch;
//  4. if still REPLACING and `do_reset`, call reset_session(true); then
//     adopt new_auth_meta, peer_global_seq and the new socket (replace_by).
//     Then either requeue output and write a ReconnectOkFrame, or update
//     client_cookie / peer id / features and send a ServerIdentFrame;
//  5. log "replaced"; any exception goes to fault(state_t::REPLACING, ...).
// Each stage re-checks the state. If it is no longer REPLACING, it must be
// CLOSING and the stage bails out. In stage 4 the new socket is then closed
// instead of adopted.
//
// NOTE(review): this copy is missing source lines (the embedded original
// line numbers skip, e.g. 1893, 1921-1924, 1938, 2020-2022, 2041).
// `do_reset` is used below, but its declaration is not visible here. The
// branch choosing ReconnectOkFrame vs. ServerIdentFrame is presumably on
// `reconnect` -- confirm against the complete file.
1892 void ProtocolV2::trigger_replacing(bool reconnect
,
1894 FrameAssemblerV2::mover_t
&&mover
,
1895 AuthConnectionMetaRef
&& new_auth_meta
,
1896 uint64_t new_peer_global_seq
,
1897 uint64_t new_client_cookie
,
1898 entity_name_t new_peer_name
,
1899 uint64_t new_conn_features
,
1900 uint64_t new_peer_supported_features
,
1901 uint64_t new_connect_seq
,
1902 uint64_t new_msg_seq
)
1904 ceph_assert_always(state
>= state_t::ESTABLISHING
);
1905 ceph_assert_always(state
<= state_t::WAIT
);
1906 ceph_assert_always(has_socket
|| state
== state_t::CONNECTING
);
1907 // mover.socket shouldn't be shutdown
1909 logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
1910 "client_cookie was {}, {}, new_sid={}",
1911 conn
, reconnect
? "reconnected" : "connected",
1912 peer_global_seq
, connect_seq
, client_cookie
,
1913 io_states
, mover
.socket
->get_shard_id());
// Shut down the current socket if it is still marked valid; the
// replacement socket is carried separately in `mover`.
1914 if (is_socket_valid
) {
1915 frame_assembler
->shutdown_socket
<true>(&gate
);
1916 is_socket_valid
= false;
1918 trigger_state_phase1(state_t::REPLACING
);
// Everything from here runs in the background under `gate`.
1919 gate
.dispatch_in_background(
1920 "trigger_replacing",
1925 mover
= std::move(mover
),
1926 new_auth_meta
= std::move(new_auth_meta
),
1927 new_client_cookie
, new_peer_name
,
1928 new_conn_features
, new_peer_supported_features
,
1929 new_peer_global_seq
,
1930 new_connect_seq
, new_msg_seq
] () mutable {
1931 ceph_assert_always(state
== state_t::REPLACING
);
1932 auto new_io_shard
= mover
.socket
->get_shard_id();
1933 // state may become CLOSING below, but we cannot abort the chain until
1934 // mover.socket is correctly handled (closed or replaced).
1936 // this is preemptive
1937 return wait_switch_io_shard(
1939 if (unlikely(state
!= state_t::REPLACING
)) {
1940 ceph_assert_always(state
== state_t::CLOSING
);
1941 return seastar::now();
1944 trigger_state_phase2(state_t::REPLACING
, io_state_t::delay
);
1945 return wait_exit_io();
1947 if (unlikely(state
!= state_t::REPLACING
)) {
1948 ceph_assert_always(state
== state_t::CLOSING
);
1949 return seastar::now();
// Stop the protocol timer and detach execution_done, leaving a ready
// future in its place.
1952 ceph_assert_always(frame_assembler
);
1953 protocol_timer
.cancel();
1954 auto done
= std::move(execution_done
);
1955 execution_done
= seastar::now();
1957 }).then([this, new_io_shard
] {
1958 if (unlikely(state
!= state_t::REPLACING
)) {
1959 ceph_assert_always(state
== state_t::CLOSING
);
1960 return seastar::now();
1963 // set io_handler to a new shard
1964 // we should prevent parallel switching core attempts
1965 auto cc_seq
= crosscore
.prepare_submit();
1966 // there are 2 hops with dispatch_accept()
1967 crosscore
.prepare_submit();
1968 logger().debug("{} send {} IOHandler::dispatch_accept({})",
1969 conn
, cc_seq
, new_io_shard
);
1970 ConnectionFRef conn_fref
= seastar::make_foreign(
1971 conn
.shared_from_this());
1972 ceph_assert_always(!pr_switch_io_shard
.has_value());
1973 pr_switch_io_shard
= seastar::shared_promise
<>();
1974 return seastar::smp::submit_to(
1975 io_handler
.get_shard_id(),
1976 [this, cc_seq
, new_io_shard
,
1977 conn_fref
=std::move(conn_fref
)]() mutable {
1978 return io_handler
.dispatch_accept(
1979 cc_seq
, new_io_shard
, std::move(conn_fref
), false);
1980 }).then([this, new_io_shard
] {
1981 ceph_assert_always(io_handler
.get_shard_id() == new_io_shard
);
1982 pr_switch_io_shard
->set_value();
1983 pr_switch_io_shard
= std::nullopt
;
1984 // user can make changes
1989 mover
= std::move(mover
),
1990 new_auth_meta
= std::move(new_auth_meta
),
1991 new_client_cookie
, new_peer_name
,
1992 new_conn_features
, new_peer_supported_features
,
1993 new_peer_global_seq
,
1994 new_connect_seq
, new_msg_seq
] () mutable {
1995 if (state
== state_t::REPLACING
&& do_reset
) {
1996 reset_session(true);
1997 // user can make changes
// If the connection is CLOSING by now, close the replacement socket
// instead of adopting it. The socket is moved into the continuation
// (`sock`), presumably to keep it alive until close() completes.
2000 if (unlikely(state
!= state_t::REPLACING
)) {
2001 logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
2002 conn
, get_state_name(state
));
2003 ceph_assert_always(state
== state_t::CLOSING
);
2004 return mover
.socket
->close(
2005 ).then([sock
= std::move(mover
.socket
)] {
// Adopt the replacement's auth metadata, peer_global_seq and socket.
2010 auth_meta
= std::move(new_auth_meta
);
2011 peer_global_seq
= new_peer_global_seq
;
2012 gate
.dispatch_in_background(
2013 "replace_frame_assembler",
2015 [this, mover
=std::move(mover
)]() mutable {
2016 return frame_assembler
->replace_by(std::move(mover
));
2019 is_socket_valid
= true;
2023 connect_seq
= new_connect_seq
;
2024 // send_reconnect_ok() logic
// Apply requeue_out_sent_up_to() to the local io_states copy and, via
// submit_to(), to the IOHandler on its shard (with new_msg_seq).
2026 auto cc_seq
= crosscore
.prepare_submit();
2027 logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
2028 conn
, cc_seq
, new_msg_seq
);
2029 io_states
.requeue_out_sent_up_to();
2030 gate
.dispatch_in_background(
2031 "requeue_out_replacing", conn
, [this, cc_seq
, new_msg_seq
] {
2032 return seastar::smp::submit_to(
2033 io_handler
.get_shard_id(), [this, cc_seq
, new_msg_seq
] {
2034 return io_handler
.requeue_out_sent_up_to(cc_seq
, new_msg_seq
);
// Reply with a ReconnectOkFrame carrying io_states.in_seq.
2038 auto reconnect_ok
= ReconnectOkFrame::Encode(io_states
.in_seq
);
2039 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn
, io_states
.in_seq
);
2040 return frame_assembler
->write_flush_frame(reconnect_ok
);
// Adopt the new client cookie, the peer id (only while still NEW) and the
// features, then reply with a ServerIdentFrame.
2042 client_cookie
= new_client_cookie
;
2043 assert(conn
.get_peer_type() == new_peer_name
.type());
2044 if (conn
.get_peer_id() == entity_name_t::NEW
) {
2045 conn
.set_peer_id(new_peer_name
.num());
2047 conn
.set_features(new_conn_features
);
2048 peer_supported_features
= new_peer_supported_features
;
2049 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
2050 frame_assembler
->set_is_rev1(is_rev1
);
2051 return send_server_ident();
2053 }).then([this, reconnect
] {
2054 if (unlikely(state
!= state_t::REPLACING
)) {
2055 logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
2056 conn
, get_state_name(state
));
2057 ceph_assert_always(state
== state_t::CLOSING
);
2060 logger().info("{} replaced ({}), going to ready: "
2061 "gs={}, pgs={}, cs={}, "
2062 "client_cookie={}, server_cookie={}, {}",
2063 conn
, reconnect
? "reconnected" : "connected",
2064 global_seq
, peer_global_seq
, connect_seq
,
2065 client_cookie
, server_cookie
, io_states
);
// Any failure in the chain above is handled as a fault from REPLACING.
2067 }).handle_exception([this](std::exception_ptr eptr
) {
2068 fault(state_t::REPLACING
, "trigger_replacing", eptr
);
// notify_out_fault(): apply a cross-core "output fault" notification that
// carries an io_handler_state snapshot.
// Must run on the messenger shard (asserted). Notifications are applied in
// crosscore sequence order. If `cc_seq` cannot proceed yet, the call waits
// for its turn and retries itself. Otherwise it adopts `_io_states` and calls
// fault(state_t::READY, where, eptr).
// NOTE(review): `where` is used below, but its parameter declaration is not
// visible in this copy (original line 2077 is absent).
2075 seastar::future
<> ProtocolV2::notify_out_fault(
2076 crosscore_t::seq_t cc_seq
,
2078 std::exception_ptr eptr
,
2079 io_handler_state _io_states
)
2081 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
// Out-of-order notification: wait until cc_seq is next, then retry.
2082 if (!crosscore
.proceed_or_wait(cc_seq
)) {
2083 logger().debug("{} got {} notify_out_fault(), wait at {}",
2084 conn
, cc_seq
, crosscore
.get_in_seq());
2085 return crosscore
.wait(cc_seq
2086 ).then([this, cc_seq
, where
, eptr
, _io_states
] {
2087 return notify_out_fault(cc_seq
, where
, eptr
, _io_states
);
// Adopt the provided io state snapshot, then fault from READY.
2091 io_states
= _io_states
;
2092 logger().debug("{} got {} notify_out_fault(): io_states={}",
2093 conn
, cc_seq
, io_states
);
2094 fault(state_t::READY
, where
, eptr
);
2095 return seastar::now();
// execute_ready(): enter READY with io `open`.
// Unless the policy is lossy, both client_cookie and server_cookie must be
// non-zero (asserted). It cancels protocol_timer, requires a valid socket,
// and then clears is_socket_valid so that this protocol no longer shuts the
// socket down itself. In UNIT_TESTS_BUILT builds it also reports the
// connection to the test interceptor.
// NOTE(review): the tail of this function (original lines 2111-2116,
// including the end of the #ifdef) is absent from this copy.
2098 void ProtocolV2::execute_ready()
2100 assert(conn
.policy
.lossy
|| (client_cookie
!= 0 && server_cookie
!= 0));
2101 protocol_timer
.cancel();
2102 ceph_assert_always(is_socket_valid
);
2103 // I'm not responsible for shutting down the socket at READY
2104 is_socket_valid
= false;
2105 trigger_state(state_t::READY
, io_state_t::open
);
2106 #ifdef UNIT_TESTS_BUILT
2107 if (conn
.interceptor
) {
2108 // FIXME: doesn't support cross-core
2109 conn
.interceptor
->register_conn_ready(
2110 conn
.get_local_shared_foreign_from_this());
// execute_standby(): enter STANDBY with io `delay`. This protocol must no
// longer hold a valid socket (asserted).
2117 void ProtocolV2::execute_standby()
2119 ceph_assert_always(!is_socket_valid
);
2120 trigger_state(state_t::STANDBY
, io_state_t::delay
);
// notify_out(): apply a cross-core "output queued" notification.
// Must run on the messenger shard (asserted). Notifications are applied in
// crosscore sequence order: if `cc_seq` is early, the call waits and
// retries. It sets io_states.is_out_queued. A connection in STANDBY whose
// policy is not `server` is then restarted via execute_connecting().
2123 seastar::future
<> ProtocolV2::notify_out(
2124 crosscore_t::seq_t cc_seq
)
2126 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
// Out-of-order notification: wait until cc_seq is next, then retry.
2127 if (!crosscore
.proceed_or_wait(cc_seq
)) {
2128 logger().debug("{} got {} notify_out(), wait at {}",
2129 conn
, cc_seq
, crosscore
.get_in_seq());
2130 return crosscore
.wait(cc_seq
2131 ).then([this, cc_seq
] {
2132 return notify_out(cc_seq
);
2136 logger().debug("{} got {} notify_out(): at {}",
2137 conn
, cc_seq
, get_state_name(state
));
2138 io_states
.is_out_queued
= true;
// Only the non-server side reconnects from STANDBY when output is queued.
2139 if (unlikely(state
== state_t::STANDBY
&& !conn
.policy
.server
)) {
2140 logger().info("{} notify_out(): at {}, going to CONNECTING",
2141 conn
, get_state_name(state
));
2142 execute_connecting();
2144 return seastar::now();
// execute_wait(): enter WAIT with io `delay`, back off, then reconnect.
// This protocol must not hold a valid socket (asserted).
// Backoff selection:
//  - ms_max_backoff in the first branch (presumably taken when
//    `max_backoff` is set; that branch header is absent here);
//  - otherwise, if protocol_timer.last_dur() > 0, double it, capped at
//    ms_max_backoff;
//  - otherwise ms_initial_backoff.
// After protocol_timer.backoff(), it goes to CONNECTING unless the state
// changed meanwhile, in which case it only logs. If the chain throws, it
// logs "protocol aborted" and asserts the state is REPLACING or CLOSING.
// NOTE(review): original lines 2155, 2159, 2161, 2166-2167, 2171-2172 and
// 2175-2176 are absent from this copy (branch headers, early return, try
// block and the `e_what` declaration/assignment).
2149 void ProtocolV2::execute_wait(bool max_backoff
)
2151 ceph_assert_always(!is_socket_valid
);
2152 trigger_state(state_t::WAIT
, io_state_t::delay
);
2153 gated_execute("execute_wait", conn
, [this, max_backoff
] {
// Start from the duration of the previous protocol_timer run.
2154 double backoff
= protocol_timer
.last_dur();
2156 backoff
= local_conf().get_val
<double>("ms_max_backoff");
2157 } else if (backoff
> 0) {
2158 backoff
= std::min(local_conf().get_val
<double>("ms_max_backoff"), 2 * backoff
);
2160 backoff
= local_conf().get_val
<double>("ms_initial_backoff");
// Sleep for `backoff`, then reconnect if still in WAIT.
2162 return protocol_timer
.backoff(backoff
).then([this] {
2163 if (unlikely(state
!= state_t::WAIT
)) {
2164 logger().debug("{} triggered {} at the end of execute_wait()",
2165 conn
, get_state_name(state
));
2168 logger().info("{} execute_wait(): going to CONNECTING", conn
);
2169 execute_connecting();
2170 }).handle_exception([this](std::exception_ptr eptr
) {
2173 std::rethrow_exception(eptr
);
2174 } catch (std::exception
&e
) {
2177 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
2178 conn
, get_state_name(state
), e_what
);
2179 assert(state
== state_t::REPLACING
||
2180 state
== state_t::CLOSING
);
2185 // SERVER_WAIT state
// execute_server_wait(): enter SERVER_WAIT with io `none` while still
// holding a valid socket (asserted).
// It reads exactly 1 byte from the socket. Receiving data is unexpected and
// is logged as a warning ("got read, abort"). On an exception, the handler
// logs a fault message saying the connection goes to CLOSING.
// NOTE(review): the code after the warning and after the log in the
// handler (original lines 2195, 2197-2198, 2201-2202, 2205-2211) is absent
// from this copy, so the actual abort/close calls are not visible here.
2187 void ProtocolV2::execute_server_wait()
2189 ceph_assert_always(is_socket_valid
);
2190 trigger_state(state_t::SERVER_WAIT
, io_state_t::none
);
2191 gated_execute("execute_server_wait", conn
, [this] {
2192 return frame_assembler
->read_exactly(1
2193 ).then([this](auto bptr
) {
2194 logger().warn("{} SERVER_WAIT got read, abort", conn
);
2196 }).handle_exception([this](std::exception_ptr eptr
) {
2199 std::rethrow_exception(eptr
);
2200 } catch (std::exception
&e
) {
2203 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
2204 conn
, get_state_name(state
), e_what
);
// notify_mark_down(): apply a cross-core mark-down notification.
// Must run on the messenger shard (asserted). Notifications are applied in
// crosscore sequence order: if `cc_seq` is early, the call waits and
// retries.
// NOTE(review): the rest of the debug log arguments and the mark-down
// handling (original lines 2226-2227) are absent from this copy.
2212 seastar::future
<> ProtocolV2::notify_mark_down(
2213 crosscore_t::seq_t cc_seq
)
2215 assert(seastar::this_shard_id() == conn
.get_messenger_shard_id());
// Out-of-order notification: wait until cc_seq is next, then retry.
2216 if (!crosscore
.proceed_or_wait(cc_seq
)) {
2217 logger().debug("{} got {} notify_mark_down(), wait at {}",
2218 conn
, cc_seq
, crosscore
.get_in_seq());
2219 return crosscore
.wait(cc_seq
2220 ).then([this, cc_seq
] {
2221 return notify_mark_down(cc_seq
);
2225 logger().debug("{} got {} notify_mark_down()",
2228 return seastar::now();
// close_clean_yielded(): yield first, then (per the comment below) close,
// and return pr_closed_clean's shared future. A reference to the connection
// is held in .finally() until the chain completes.
// NOTE(review): original lines 2239-2240 and 2242 (the continuation that
// presumably calls do_close()) are absent from this copy. The trailing
// `;;` is a harmless empty statement.
2231 seastar::future
<> ProtocolV2::close_clean_yielded()
2233 // yield() so that do_close() can be called *after* close_clean_yielded() is
2234 // applied to all connections in a container using
2235 // seastar::parallel_for_each(). otherwise, we could erase a connection in
2236 // the container when seastar::parallel_for_each() is still iterating in it.
2237 // that'd lead to a segfault.
2238 return seastar::yield(
2241 return pr_closed_clean
.get_shared_future();
2243 // connection may be unreferenced from the messenger,
2244 // so need to hold the additional reference.
2245 }).finally([conn_ref
= conn
.shared_from_this()] {});;
// do_close(): move this protocol to CLOSING and tear the connection down.
// It returns early when the state is already CLOSING. `is_replace` is true
// when `f_accept_new` holds a callback.
// Synchronous part:
//  - messenger bookkeeping: closing_conn(), then unaccept_conn() if the
//    state was ACCEPTING/SERVER_WAIT, or unregister_conn() if it was in
//    [ESTABLISHING, CLOSING);
//  - cancel protocol_timer and shut down a still-valid socket;
//  - trigger_state_phase1(CLOSING).
// Background part, dispatched on `gate` as "close_io":
//  - after wait_switch_io_shard(), move io to `drop`;
//  - start gate.close() without awaiting it (std::ignore). Its continuation
//    asserts there is no pending io exit, closes the socket via
//    close_shutdown_socket(), reports closed_conn() to the messenger and
//    resolves pr_closed_clean. It holds `conn_ref` and logs any unexpected
//    exception as an error;
//  - submit IOHandler::close_io(cc_seq, reset, replace) to the io shard.
// NOTE(review): many original lines are absent from this copy (e.g.
// 2253-2256, 2261-2265, 2280-2284, 2287-2289, 2308, 2311, 2314, 2316-2317,
// 2334-2337). Where `f_accept_new` is invoked, and the condition guarding
// close_shutdown_socket(), are not visible here.
2248 void ProtocolV2::do_close(
2249 bool is_dispatch_reset
,
2250 std::optional
<std::function
<void()>> f_accept_new
)
2252 if (state
== state_t::CLOSING
) {
2257 bool is_replace
= f_accept_new
? true : false;
2258 logger().info("{} closing: reset {}, replace {}", conn
,
2259 is_dispatch_reset
? "yes" : "no",
2260 is_replace
? "yes" : "no");
2266 ceph_assert_always(!gate
.is_closed());
2268 // messenger registrations, must happen before user events
2269 messenger
.closing_conn(
2270 seastar::static_pointer_cast
<SocketConnection
>(
2271 conn
.shared_from_this()));
2272 if (state
== state_t::ACCEPTING
|| state
== state_t::SERVER_WAIT
) {
2273 messenger
.unaccept_conn(
2274 seastar::static_pointer_cast
<SocketConnection
>(
2275 conn
.shared_from_this()));
2276 } else if (state
>= state_t::ESTABLISHING
&& state
< state_t::CLOSING
) {
2277 messenger
.unregister_conn(
2278 seastar::static_pointer_cast
<SocketConnection
>(
2279 conn
.shared_from_this()));
2285 // the replacing connection must be registered after the replaced
2286 // connection is unregistered.
2290 protocol_timer
.cancel();
// Shut down the socket if this protocol still owns it.
2291 if (is_socket_valid
) {
2292 frame_assembler
->shutdown_socket
<true>(&gate
);
2293 is_socket_valid
= false;
2296 trigger_state_phase1(state_t::CLOSING
);
2297 gate
.dispatch_in_background(
2298 "close_io", conn
, [this, is_dispatch_reset
, is_replace
] {
2299 // this is preemptive
2300 return wait_switch_io_shard(
2301 ).then([this, is_dispatch_reset
, is_replace
] {
2302 trigger_state_phase2(state_t::CLOSING
, io_state_t::drop
);
2303 auto cc_seq
= crosscore
.prepare_submit();
2304 logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
2305 conn
, cc_seq
, is_dispatch_reset
, is_replace
);
// Start closing the gate without awaiting it here; the final cleanup
// below runs once the gate is closed.
2307 std::ignore
= gate
.close(
2309 ceph_assert_always(!need_exit_io
);
2310 ceph_assert_always(!pr_exit_io
.has_value());
2312 ceph_assert_always(frame_assembler
);
2313 return frame_assembler
->close_shutdown_socket();
2315 return seastar::now();
2318 logger().debug("{} closed!", conn
);
2319 messenger
.closed_conn(
2320 seastar::static_pointer_cast
<SocketConnection
>(
2321 conn
.shared_from_this()));
2322 pr_closed_clean
.set_value();
2323 #ifdef UNIT_TESTS_BUILT
2324 closed_clean
= true;
2325 if (conn
.interceptor
) {
2326 conn
.interceptor
->register_conn_closed(
2327 conn
.get_local_shared_foreign_from_this());
2330 // connection is unreferenced from the messenger,
2331 // so need to hold the additional reference.
2332 }).handle_exception([conn_ref
= conn
.shared_from_this(), this] (auto eptr
) {
2333 logger().error("{} closing got unexpected exception {}",
// Tell the IOHandler, on its own shard, to close io.
2338 return seastar::smp::submit_to(
2339 io_handler
.get_shard_id(),
2340 [this, cc_seq
, is_dispatch_reset
, is_replace
] {
2341 return io_handler
.close_io(cc_seq
, is_dispatch_reset
, is_replace
);
2343 // user can make changes
2348 } // namespace crimson::net