1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ProtocolV2.h"
6 #include <fmt/format.h>
7 #include <fmt/ranges.h>
8 #include "include/msgr.h"
9 #include "include/random.h"
10 #include "msg/msg_fmt.h"
12 #include "crimson/auth/AuthClient.h"
13 #include "crimson/auth/AuthServer.h"
14 #include "crimson/common/formatter.h"
15 #include "crimson/common/log.h"
18 #include "SocketMessenger.h"
20 #ifdef UNIT_TESTS_BUILT
21 #include "Interceptor.h"
// File-local shorthands: the msgr v2 frame namespace, the crimson config
// accessor, and two nested IOHandler types that are used throughout this file.
24 using namespace ceph::msgr::v2
;
25 using crimson::common::local_conf
;
26 using io_state_t
= crimson::net::IOHandler::io_state_t
;
27 using io_stat_printer
= crimson::net::IOHandler::io_stat_printer
;
// msgr2 feature bits this implementation advertises; banner_exchange() encodes
// this value into the banner payload and checks it against the features the
// peer requires. Compression is not enabled yet (see the TODO).
// NOTE(review): the tail of the initializer is not visible in this listing.
31 // TODO: CEPH_MSGR2_FEATURE_COMPRESSION
32 const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES
=
33 (CEPH_MSGR2_FEATURE_REVISION_1
|
34 // CEPH_MSGR2_FEATURE_COMPRESSION |
37 // Log levels in V2 Protocol:
38 // * error level, an error that causes the connection to terminate:
41 // * warn level: something unusual that identifies connection fault or replacement:
42 // - unstable network;
43 // - incompatible peer;
46 // - connection reset;
47 // * info level, something very important to show connection lifecycle,
48 // which doesn't happen very frequently;
49 // * debug level, important logs for debugging, including:
50 // - all the messages sent/received (-->/<==);
51 // - all the frames exchanged (WRITE/GOT);
52 // - important fields updated (UPDATE);
53 // - connection state transitions (TRIGGER);
54 // * trace level, trivial logs showing:
55 // - the exact bytes being sent/received (SEND/RECV(bytes));
56 // - detailed information of sub-frames;
57 // - integrity checks;
59 seastar::logger
& logger() {
60 return crimson::get_logger(ceph_subsys_ms
);
63 [[noreturn
]] void abort_in_fault() {
64 throw std::system_error(make_error_code(crimson::net::error::negotiation_failure
));
67 [[noreturn
]] void abort_protocol() {
68 throw std::system_error(make_error_code(crimson::net::error::protocol_aborted
));
// Convenience macro used by the handshake code in this file after it has
// logged an unrecoverable condition: it calls do_close() with the given
// dispatch-reset flag.
// NOTE(review): the rest of the macro body is not visible in this listing --
// confirm what follows do_close() against the full file.
71 #define ABORT_IN_CLOSE(is_dispatch_reset) { \
72 do_close(is_dispatch_reset); \
// Checks that the frame tag actually received matches the expected one and
// logs a warning (with both tags printed as integers) when it does not.
// NOTE(review): the declarations of `actual` and `where`, and whatever follows
// the warning, are not visible in this listing -- confirm against the full file.
76 inline void expect_tag(const Tag
& expected
,
78 crimson::net::SocketConnection
& conn
,
80 if (actual
!= expected
) {
81 logger().warn("{} {} received wrong tag: {}, expected {}",
83 static_cast<uint32_t>(actual
),
84 static_cast<uint32_t>(expected
));
// Logs a warning that a tag was received which the caller (named by `where`)
// cannot handle at this point; the tag is printed as an integer.
// NOTE(review): the declaration of `where` and anything after the warning are
// not visible in this listing -- confirm against the full file.
89 inline void unexpected_tag(const Tag
& unexpected
,
90 crimson::net::SocketConnection
& conn
,
92 logger().warn("{} {} received unexpected tag: {}",
93 conn
, where
, static_cast<uint32_t>(unexpected
));
97 inline uint64_t generate_client_cookie() {
98 return ceph::util::generate_random_number
<uint64_t>(
99 1, std::numeric_limits
<uint64_t>::max());
102 } // namespace anonymous
104 namespace crimson::net
{
106 #ifdef UNIT_TESTS_BUILT
// Unit-test hook: asks the interceptor which action to take for breakpoint
// `bp` on this connection and installs that action as a trap on the socket,
// together with the interceptor's blocker.
// NOTE(review): the `type` and `socket` parameter declarations are not visible
// in this listing.
107 // should be consistent with intercept_frame() in FrameAssemblerV2.cc
108 void intercept(Breakpoint bp
,
110 SocketConnection
& conn
,
111 Interceptor
*interceptor
,
114 auto action
= interceptor
->intercept(conn
, Breakpoint(bp
));
115 socket
->set_trap(type
, action
, &interceptor
->blocker
);
// Test-injection macro used around raw banner reads/writes. The first
// definition forwards a custom breakpoint to intercept() using the enclosing
// scope's `conn`; the second definition expands to nothing.
// NOTE(review): the preprocessor conditional separating the two definitions is
// not visible in this listing.
119 #define INTERCEPT_CUSTOM(bp, type) \
120 intercept({bp}, type, conn, \
121 conn.interceptor, conn.socket)
123 #define INTERCEPT_CUSTOM(bp, type)
// Sleeps for `seconds` (converted to the lowres clock's duration) using an
// abortable sleep bound to a freshly created abort_source stored in `as`.
// A seastar::sleep_aborted exception is caught and logged at debug level.
// NOTE(review): some statements of this function are not visible in this
// listing (before the abort_source is reset and inside the handler).
126 seastar::future
<> ProtocolV2::Timer::backoff(double seconds
)
128 logger().warn("{} waiting {} seconds ...", conn
, seconds
);
131 as
= seastar::abort_source();
132 auto dur
= std::chrono::duration_cast
<seastar::lowres_clock::duration
>(
133 std::chrono::duration
<double>(seconds
));
134 return seastar::sleep_abortable(dur
, *as
135 ).handle_exception_type([this] (const seastar::sleep_aborted
& e
) {
136 logger().debug("{} wait aborted", conn
);
// Constructs the protocol object for a connection. The visible initializers
// bind the connection's messenger and the IOHandler, create a frame assembler
// for the connection, and allocate an empty AuthConnectionMeta.
// NOTE(review): part of the member-initializer list is not visible here.
141 ProtocolV2::ProtocolV2(SocketConnection
& conn
,
142 IOHandler
&io_handler
)
144 messenger
{conn
.messenger
},
145 io_handler
{io_handler
},
146 frame_assembler
{FrameAssemblerV2::create(conn
)},
147 auth_meta
{seastar::make_lw_shared
<AuthConnectionMeta
>()},
151 ProtocolV2::~ProtocolV2() {}
153 void ProtocolV2::start_connect(const entity_addr_t
& _peer_addr
,
154 const entity_name_t
& _peer_name
)
156 ceph_assert(state
== state_t::NONE
);
157 ceph_assert(!gate
.is_closed());
158 conn
.peer_addr
= _peer_addr
;
159 conn
.target_addr
= _peer_addr
;
160 conn
.set_peer_name(_peer_name
);
161 conn
.policy
= messenger
.get_policy(_peer_name
.type());
162 client_cookie
= generate_client_cookie();
163 logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
164 " policy(lossy={}, server={}, standby={}, resetcheck={})",
165 conn
, _peer_addr
, _peer_name
, client_cookie
,
166 conn
.policy
.lossy
, conn
.policy
.server
,
167 conn
.policy
.standby
, conn
.policy
.resetcheck
);
168 messenger
.register_conn(
169 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
170 execute_connecting();
// Starts the server side of a connection on an already-accepted socket.
// Must be called in state NONE. The peer address is stored as target_addr
// for now, the socket is handed to the frame assembler and marked valid, and
// the connection is registered with the messenger as an accepting connection.
// NOTE(review): some statements of this function are not visible in this
// listing (e.g. what runs after accept_conn()).
173 void ProtocolV2::start_accept(SocketRef
&& new_socket
,
174 const entity_addr_t
& _peer_addr
)
176 ceph_assert(state
== state_t::NONE
);
177 // until we know better
178 conn
.target_addr
= _peer_addr
;
179 frame_assembler
->set_socket(std::move(new_socket
));
181 is_socket_valid
= true;
182 logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn
, _peer_addr
);
183 messenger
.accept_conn(
184 seastar::static_pointer_cast
<SocketConnection
>(conn
.shared_from_this()));
// Switches the protocol to `new_state` and pushes `new_io_state` to the
// IOHandler.
//
// - Re-triggering the current state without `reentrant`, and triggering any
//   state while in CLOSING, are logged as errors.
// - When leaving READY, an `exit_io` promise is armed; a background task then
//   waits for the IOHandler to stop dispatching, takes the frame assembler
//   back, fulfils the promise and clears it.
// - When entering READY, ownership of the frame assembler moves to the
//   IOHandler and this object stops treating the socket as its own to shut
//   down; for every other state the IOHandler receives a null assembler.
// NOTE(review): a few statements (e.g. what follows the error logs and the
// actual assignment of `state`) are not visible in this listing.
188 void ProtocolV2::trigger_state(state_t new_state
, io_state_t new_io_state
, bool reentrant
)
190 if (!reentrant
&& new_state
== state
) {
191 logger().error("{} is not allowed to re-trigger state {}",
192 conn
, get_state_name(state
));
195 if (state
== state_t::CLOSING
) {
196 logger().error("{} CLOSING is not allowed to trigger state {}",
197 conn
, get_state_name(new_state
));
200 logger().debug("{} TRIGGER {}, was {}",
201 conn
, get_state_name(new_state
), get_state_name(state
));
202 auto pre_state
= state
;
203 if (pre_state
== state_t::READY
) {
204 assert(!gate
.is_closed());
205 ceph_assert_always(!exit_io
.has_value());
206 exit_io
= seastar::shared_promise
<>();
209 if (new_state
== state_t::READY
) {
210 // I'm not responsible to shutdown the socket at READY
211 is_socket_valid
= false;
212 io_handler
.set_io_state(new_io_state
, std::move(frame_assembler
));
214 io_handler
.set_io_state(new_io_state
, nullptr);
221 if (pre_state
== state_t::READY
) {
222 gate
.dispatch_in_background("exit_io", conn
, [this] {
223 return io_handler
.wait_io_exit_dispatching(
224 ).then([this](FrameAssemblerV2Ref fa
) {
225 frame_assembler
= std::move(fa
);
226 exit_io
->set_value();
227 exit_io
= std::nullopt
;
// Common failure handler for the CONNECTING / ESTABLISHING / REPLACING / READY
// paths (the only values accepted for `expected_state`).
//
// The exception is rethrown locally so it can be inspected. If the protocol
// has already moved away from `expected_state`, the fault is only logged
// (after sanity-checking which state it moved to). Otherwise the next step is
// chosen from the connection policy, as reflected by the log messages below:
//   - lossy policy outside CONNECTING      -> "going to CLOSING"
//   - server policy, or standby policy with
//     nothing queued or sent               -> "going to STANDBY"
//   - CONNECTING or REPLACING              -> "going to WAIT"
//   - READY or ESTABLISHING                -> execute_connecting()
// A socket that is still marked valid is shut down before the policy decision.
// NOTE(review): several statements (the `where` parameter, the try-block
// opening, `e_what`, and the calls performing the CLOSING/STANDBY/WAIT
// transitions) are not visible in this listing.
233 void ProtocolV2::fault(
234 state_t expected_state
,
236 std::exception_ptr eptr
)
238 assert(expected_state
== state_t::CONNECTING
||
239 expected_state
== state_t::ESTABLISHING
||
240 expected_state
== state_t::REPLACING
||
241 expected_state
== state_t::READY
);
244 std::rethrow_exception(eptr
);
245 } catch (std::exception
&e
) {
// the state changed underneath us: report and sanity-check, nothing more
249 if (state
!= expected_state
) {
250 logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
252 get_state_name(expected_state
),
254 get_state_name(state
),
257 if (expected_state
== state_t::REPLACING
) {
258 assert(state
== state_t::CLOSING
);
259 } else if (expected_state
== state_t::READY
) {
260 assert(state
== state_t::CLOSING
||
261 state
== state_t::REPLACING
||
262 state
== state_t::CONNECTING
||
263 state
== state_t::STANDBY
);
265 assert(state
== state_t::CLOSING
||
266 state
== state_t::REPLACING
);
271 assert(state
== expected_state
);
273 if (state
!= state_t::CONNECTING
&& conn
.policy
.lossy
) {
274 // socket will be shutdown in do_close()
275 logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
276 conn
, get_state_name(state
), where
, e_what
);
281 if (likely(has_socket
)) {
282 if (likely(is_socket_valid
)) {
283 ceph_assert_always(state
!= state_t::READY
);
284 frame_assembler
->shutdown_socket();
285 is_socket_valid
= false;
287 ceph_assert_always(state
!= state_t::ESTABLISHING
);
289 } else { // !has_socket
290 ceph_assert_always(state
== state_t::CONNECTING
);
291 assert(!is_socket_valid
);
294 if (conn
.policy
.server
||
295 (conn
.policy
.standby
&& !io_handler
.is_out_queued_or_sent())) {
296 if (conn
.policy
.server
) {
297 logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
299 get_state_name(state
),
301 io_stat_printer
{io_handler
},
304 logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
306 get_state_name(state
),
308 io_stat_printer
{io_handler
},
312 } else if (state
== state_t::CONNECTING
||
313 state
== state_t::REPLACING
) {
314 logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
316 get_state_name(state
),
318 io_stat_printer
{io_handler
},
322 assert(state
== state_t::READY
||
323 state
== state_t::ESTABLISHING
);
324 logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
326 get_state_name(state
),
328 io_stat_printer
{io_handler
},
330 execute_connecting();
// Resets session state: a new client cookie is generated and the reset
// (`full` or not) is forwarded to the IOHandler.
// NOTE(review): other statements of this function are not visible in this
// listing -- confirm which additional fields are reset.
334 void ProtocolV2::reset_session(bool full
)
339 client_cookie
= generate_client_cookie();
342 io_handler
.reset_session(full
);
// Runs the msgr2 banner and Hello handshake on the current socket.
//
// Steps (numbered in the body): send our banner (prefix, 16-bit payload
// length, supported and required feature words), read and validate the peer's
// banner prefix, read its feature words and check both directions of feature
// compatibility, send our HelloFrame, then read and decode the peer's
// HelloFrame.
//
// `is_connect` is forwarded to ABORT_IN_CLOSE when the feature sets are
// incompatible. The returned future resolves to the peer's entity type and
// the peer_addr() carried in the peer's HelloFrame.
// NOTE(review): some statements (e.g. the handling after a bad banner or a
// decode failure) are not visible in this listing.
345 seastar::future
<std::tuple
<entity_type_t
, entity_addr_t
>>
346 ProtocolV2::banner_exchange(bool is_connect
)
348 // 1. prepare and send banner
349 bufferlist banner_payload
;
350 encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES
, banner_payload
, 0);
351 encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES
, banner_payload
, 0);
354 bl
.append(CEPH_BANNER_V2_PREFIX
, strlen(CEPH_BANNER_V2_PREFIX
));
355 auto len_payload
= static_cast<uint16_t>(banner_payload
.length());
356 encode(len_payload
, bl
, 0);
357 bl
.claim_append(banner_payload
);
358 logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
359 "required={}, banner=\"{}\"",
360 conn
, bl
.length(), len_payload
,
361 CRIMSON_MSGR2_SUPPORTED_FEATURES
,
362 CEPH_MSGR2_REQUIRED_FEATURES
,
363 CEPH_BANNER_V2_PREFIX
);
364 INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE
, bp_type_t::WRITE
);
365 return frame_assembler
->write_flush(std::move(bl
)).then([this] {
366 // 2. read peer banner
367 unsigned banner_len
= strlen(CEPH_BANNER_V2_PREFIX
) + sizeof(ceph_le16
);
368 INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ
, bp_type_t::READ
);
369 return frame_assembler
->read_exactly(banner_len
); // or read exactly?
370 }).then([this] (auto bl
) {
371 // 3. process peer banner and read banner_payload
372 unsigned banner_prefix_len
= strlen(CEPH_BANNER_V2_PREFIX
);
373 logger().debug("{} RECV({}) banner: \"{}\"",
375 std::string((const char*)bl
.get(), banner_prefix_len
));
// distinguish a V1 peer from plain garbage purely for the log message
377 if (memcmp(bl
.get(), CEPH_BANNER_V2_PREFIX
, banner_prefix_len
) != 0) {
378 if (memcmp(bl
.get(), CEPH_BANNER
, strlen(CEPH_BANNER
)) == 0) {
379 logger().warn("{} peer is using V1 protocol", conn
);
381 logger().warn("{} peer sent bad banner", conn
);
385 bl
.trim_front(banner_prefix_len
);
387 uint16_t payload_len
;
389 buf
.append(buffer::create(std::move(bl
)));
390 auto ti
= buf
.cbegin();
392 decode(payload_len
, ti
);
393 } catch (const buffer::error
&e
) {
394 logger().warn("{} decode banner payload len failed", conn
);
397 logger().debug("{} GOT banner: payload_len={}", conn
, payload_len
);
398 INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ
, bp_type_t::READ
);
399 return frame_assembler
->read(payload_len
);
400 }).then([this, is_connect
] (bufferlist bl
) {
401 // 4. process peer banner_payload and send HelloFrame
402 auto p
= bl
.cbegin();
403 uint64_t _peer_supported_features
;
404 uint64_t _peer_required_features
;
406 decode(_peer_supported_features
, p
);
407 decode(_peer_required_features
, p
);
408 } catch (const buffer::error
&e
) {
409 logger().warn("{} decode banner payload failed", conn
);
412 logger().debug("{} RECV({}) banner features: supported={} required={}",
414 _peer_supported_features
, _peer_required_features
);
416 // Check feature bit compatibility
417 uint64_t supported_features
= CRIMSON_MSGR2_SUPPORTED_FEATURES
;
418 uint64_t required_features
= CEPH_MSGR2_REQUIRED_FEATURES
;
419 if ((required_features
& _peer_supported_features
) != required_features
) {
420 logger().error("{} peer does not support all required features"
421 " required={} peer_supported={}",
422 conn
, required_features
, _peer_supported_features
);
423 ABORT_IN_CLOSE(is_connect
);
425 if ((supported_features
& _peer_required_features
) != _peer_required_features
) {
426 logger().error("{} we do not support all peer required features"
427 " peer_required={} supported={}",
428 conn
, _peer_required_features
, supported_features
);
429 ABORT_IN_CLOSE(is_connect
);
431 peer_supported_features
= _peer_supported_features
;
432 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
433 frame_assembler
->set_is_rev1(is_rev1
);
435 auto hello
= HelloFrame::Encode(messenger
.get_mytype(),
437 logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
438 conn
, ceph_entity_type_name(messenger
.get_mytype()),
440 return frame_assembler
->write_flush_frame(hello
);
442 //5. read peer HelloFrame
443 return frame_assembler
->read_main_preamble();
444 }).then([this](auto ret
) {
445 expect_tag(Tag::HELLO
, ret
.tag
, conn
, "read_hello_frame");
446 return frame_assembler
->read_frame_payload();
447 }).then([this](auto payload
) {
448 // 6. process peer HelloFrame
449 auto hello
= HelloFrame::Decode(payload
->back());
450 logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
451 conn
, ceph_entity_type_name(hello
.entity_type()),
453 return seastar::make_ready_future
<std::tuple
<entity_type_t
, entity_addr_t
>>(
454 std::make_tuple(hello
.entity_type(), hello
.peer_addr()));
// Client side of authentication: reads the server's next frame and acts on
// its tag.
//   - AUTH_BAD_METHOD: the frame is passed to the auth client's
//     handle_auth_bad_method(), then client_auth() is retried with the
//     methods the server allows.
//   - AUTH_REPLY_MORE: the payload is passed to handle_auth_reply_more(), the
//     answer is written back as an AuthRequestMoreFrame, and this function
//     recurses for the next reply.
//   - AuthDoneFrame: handle_auth_done() is called, the negotiated con_mode is
//     recorded, session stream handlers are created and finish_auth() runs.
//   - anything else: reported through unexpected_tag().
// NOTE(review): the switch statement, one case label and the error checks on
// the returned `r` values are not visible in this listing.
460 seastar::future
<> ProtocolV2::handle_auth_reply()
462 return frame_assembler
->read_main_preamble(
463 ).then([this](auto ret
) {
465 case Tag::AUTH_BAD_METHOD
:
466 return frame_assembler
->read_frame_payload(
467 ).then([this](auto payload
) {
468 // handle_auth_bad_method() logic
469 auto bad_method
= AuthBadMethodFrame::Decode(payload
->back());
470 logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
471 "allowed_methods={}, allowed_modes={}",
472 conn
, bad_method
.method(), cpp_strerror(bad_method
.result()),
473 bad_method
.allowed_methods(), bad_method
.allowed_modes());
474 ceph_assert(messenger
.get_auth_client());
475 int r
= messenger
.get_auth_client()->handle_auth_bad_method(
477 bad_method
.method(), bad_method
.result(),
478 bad_method
.allowed_methods(), bad_method
.allowed_modes());
480 logger().warn("{} auth_client handle_auth_bad_method returned {}",
484 return client_auth(bad_method
.allowed_methods());
486 case Tag::AUTH_REPLY_MORE
:
487 return frame_assembler
->read_frame_payload(
488 ).then([this](auto payload
) {
489 // handle_auth_reply_more() logic
490 auto auth_more
= AuthReplyMoreFrame::Decode(payload
->back());
491 logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
492 conn
, auth_more
.auth_payload().length());
493 ceph_assert(messenger
.get_auth_client());
494 // let execute_connecting() take care of the thrown exception
495 auto reply
= messenger
.get_auth_client()->handle_auth_reply_more(
496 conn
, *auth_meta
, auth_more
.auth_payload());
497 auto more_reply
= AuthRequestMoreFrame::Encode(reply
);
498 logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
499 conn
, reply
.length());
500 return frame_assembler
->write_flush_frame(more_reply
);
502 return handle_auth_reply();
505 return frame_assembler
->read_frame_payload(
506 ).then([this](auto payload
) {
507 // handle_auth_done() logic
508 auto auth_done
= AuthDoneFrame::Decode(payload
->back());
509 logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
510 conn
, auth_done
.global_id(),
511 ceph_con_mode_name(auth_done
.con_mode()),
512 auth_done
.auth_payload().length());
513 ceph_assert(messenger
.get_auth_client());
514 int r
= messenger
.get_auth_client()->handle_auth_done(
517 auth_done
.global_id(),
518 auth_done
.con_mode(),
519 auth_done
.auth_payload());
521 logger().warn("{} auth_client handle_auth_done returned {}", conn
, r
);
524 auth_meta
->con_mode
= auth_done
.con_mode();
525 frame_assembler
->create_session_stream_handlers(*auth_meta
, false);
526 return finish_auth();
529 unexpected_tag(ret
.tag
, conn
, "handle_auth_reply");
530 return seastar::now();
// Client side: starts (or restarts) authentication. The auth client supplies
// the method, preferred modes and initial payload; the method is recorded in
// auth_meta, an AuthRequestFrame is written, and processing continues in
// handle_auth_reply(). A crimson::auth::error is logged and leads to
// ABORT_IN_CLOSE(true).
// NOTE(review): how `allowed_methods` is consumed is not visible in this
// listing.
536 seastar::future
<> ProtocolV2::client_auth(std::vector
<uint32_t> &allowed_methods
)
538 // send_auth_request() logic
539 ceph_assert(messenger
.get_auth_client());
542 auto [auth_method
, preferred_modes
, bl
] =
543 messenger
.get_auth_client()->get_auth_request(conn
, *auth_meta
);
544 auth_meta
->auth_method
= auth_method
;
545 auto frame
= AuthRequestFrame::Encode(auth_method
, preferred_modes
, bl
);
546 logger().debug("{} WRITE AuthRequestFrame: method={},"
547 " preferred_modes={}, payload_len={}",
548 conn
, auth_method
, preferred_modes
, bl
.length());
549 return frame_assembler
->write_flush_frame(frame
551 return handle_auth_reply();
553 } catch (const crimson::auth::error
& e
) {
554 logger().error("{} get_initial_auth_request returned {}", conn
, e
.what());
555 ABORT_IN_CLOSE(true);
556 return seastar::now();
560 seastar::future
<ProtocolV2::next_step_t
>
561 ProtocolV2::process_wait()
563 return frame_assembler
->read_frame_payload(
564 ).then([this](auto payload
) {
565 // handle_wait() logic
566 logger().debug("{} GOT WaitFrame", conn
);
567 WaitFrame::Decode(payload
->back());
568 return next_step_t::wait
;
// Client side: opens a brand-new session. A ClientIdentFrame is written (the
// LOSSY flag is set when the policy is lossy) and the server's answer is
// dispatched on its tag:
//   - IDENT_MISSING_FEATURES: logged, resolves to next_step_t::none;
//   - a wait reply: handled by process_wait();
//   - SERVER_IDENT: the server's addresses must contain our target address
//     and its first address must equal peer_addr; the peer id must match an
//     already-known id; then peer id, negotiated features, peer global seq
//     and server cookie are recorded, the lossy policy is aligned with the
//     server's flags, and the result is next_step_t::ready;
//   - anything else: reported through unexpected_tag().
// NOTE(review): the switch statement, some case labels and a few statements
// (e.g. the declaration of `flags`, resets for the lossy case) are not
// visible in this listing.
572 seastar::future
<ProtocolV2::next_step_t
>
573 ProtocolV2::client_connect()
575 // send_client_ident() logic
577 if (conn
.policy
.lossy
) {
578 flags
|= CEPH_MSG_CONNECT_LOSSY
;
581 auto client_ident
= ClientIdentFrame::Encode(
582 messenger
.get_myaddrs(),
584 messenger
.get_myname().num(),
586 conn
.policy
.features_supported
,
587 conn
.policy
.features_required
| msgr2_required
, flags
,
590 logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
591 " gs={}, features_supported={}, features_required={},"
592 " flags={}, cookie={}",
593 conn
, messenger
.get_myaddrs(), conn
.target_addr
,
594 messenger
.get_myname().num(), global_seq
,
595 conn
.policy
.features_supported
,
596 conn
.policy
.features_required
| msgr2_required
,
597 flags
, client_cookie
);
598 return frame_assembler
->write_flush_frame(client_ident
600 return frame_assembler
->read_main_preamble();
601 }).then([this](auto ret
) {
603 case Tag::IDENT_MISSING_FEATURES
:
604 return frame_assembler
->read_frame_payload(
605 ).then([this](auto payload
) {
606 // handle_ident_missing_features() logic
607 auto ident_missing
= IdentMissingFeaturesFrame::Decode(payload
->back());
608 logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
609 " (client does not support all server features)",
610 conn
, ident_missing
.features());
612 return next_step_t::none
;
615 return process_wait();
616 case Tag::SERVER_IDENT
:
617 return frame_assembler
->read_frame_payload(
618 ).then([this](auto payload
) {
619 // handle_server_ident() logic
620 io_handler
.requeue_out_sent();
621 auto server_ident
= ServerIdentFrame::Decode(payload
->back());
622 logger().debug("{} GOT ServerIdentFrame:"
623 " addrs={}, gid={}, gs={},"
624 " features_supported={}, features_required={},"
625 " flags={}, cookie={}",
627 server_ident
.addrs(), server_ident
.gid(),
628 server_ident
.global_seq(),
629 server_ident
.supported_features(),
630 server_ident
.required_features(),
631 server_ident
.flags(), server_ident
.cookie());
633 // is this who we intended to talk to?
634 // be a bit forgiving here, since we may be connecting based on addresses parsed out
635 // of mon_host or something.
636 if (!server_ident
.addrs().contains(conn
.target_addr
)) {
637 logger().warn("{} peer identifies as {}, does not include {}",
638 conn
, server_ident
.addrs(), conn
.target_addr
);
639 throw std::system_error(
640 make_error_code(crimson::net::error::bad_peer_address
));
643 server_cookie
= server_ident
.cookie();
645 // TODO: change peer_addr to entity_addrvec_t
646 if (server_ident
.addrs().front() != conn
.peer_addr
) {
647 logger().warn("{} peer advertises as {}, does not match {}",
648 conn
, server_ident
.addrs(), conn
.peer_addr
);
649 throw std::system_error(
650 make_error_code(crimson::net::error::bad_peer_address
));
652 if (conn
.get_peer_id() != entity_name_t::NEW
&&
653 conn
.get_peer_id() != server_ident
.gid()) {
654 logger().error("{} connection peer id ({}) does not match "
655 "what it should be ({}) during connecting, close",
656 conn
, server_ident
.gid(), conn
.get_peer_id());
657 ABORT_IN_CLOSE(true);
659 conn
.set_peer_id(server_ident
.gid());
660 conn
.set_features(server_ident
.supported_features() &
661 conn
.policy
.features_supported
);
662 logger().debug("{} UPDATE: features={}", conn
, conn
.get_features());
663 peer_global_seq
= server_ident
.global_seq();
// the server's flags decide whether the session is lossy
665 bool lossy
= server_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
666 if (lossy
!= conn
.policy
.lossy
) {
667 logger().warn("{} UPDATE Policy(lossy={}) from server flags", conn
, lossy
);
668 conn
.policy
.lossy
= lossy
;
670 if (lossy
&& (connect_seq
!= 0 || server_cookie
!= 0)) {
671 logger().warn("{} UPDATE cs=0({}) sc=0({}) for lossy policy",
672 conn
, connect_seq
, server_cookie
);
677 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
680 unexpected_tag(ret
.tag
, conn
, "post_client_connect");
681 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Client side: tries to resume an existing session. A ReconnectFrame is
// written and the server's answer is dispatched on its tag:
//   - SESSION_RETRY_GLOBAL: global_seq is refreshed from the messenger using
//     the server's value, then the reconnect is retried;
//   - SESSION_RETRY: connect_seq becomes the server's value + 1, then retry;
//   - SESSION_RESET: reset_session() is applied and a fresh client_connect()
//     is attempted;
//   - a wait reply: handled by process_wait();
//   - SESSION_RECONNECT_OK: sent messages are requeued up to the acknowledged
//     msg_seq and the result is next_step_t::ready;
//   - anything else: reported through unexpected_tag().
// NOTE(review): the switch statement, some case labels and part of the
// ReconnectFrame::Encode() argument list are not visible in this listing.
687 seastar::future
<ProtocolV2::next_step_t
>
688 ProtocolV2::client_reconnect()
690 // send_reconnect() logic
691 auto reconnect
= ReconnectFrame::Encode(messenger
.get_myaddrs(),
696 io_handler
.get_in_seq());
697 logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
698 " server_cookie={}, gs={}, cs={}, in_seq={}",
699 conn
, messenger
.get_myaddrs(),
700 client_cookie
, server_cookie
,
701 global_seq
, connect_seq
, io_handler
.get_in_seq());
702 return frame_assembler
->write_flush_frame(reconnect
).then([this] {
703 return frame_assembler
->read_main_preamble();
704 }).then([this](auto ret
) {
706 case Tag::SESSION_RETRY_GLOBAL
:
707 return frame_assembler
->read_frame_payload(
708 ).then([this](auto payload
) {
709 // handle_session_retry_global() logic
710 auto retry
= RetryGlobalFrame::Decode(payload
->back());
711 logger().warn("{} GOT RetryGlobalFrame: gs={}",
712 conn
, retry
.global_seq());
713 global_seq
= messenger
.get_global_seq(retry
.global_seq());
714 logger().warn("{} UPDATE: gs={} for retry global", conn
, global_seq
);
715 return client_reconnect();
717 case Tag::SESSION_RETRY
:
718 return frame_assembler
->read_frame_payload(
719 ).then([this](auto payload
) {
720 // handle_session_retry() logic
721 auto retry
= RetryFrame::Decode(payload
->back());
722 logger().warn("{} GOT RetryFrame: cs={}",
723 conn
, retry
.connect_seq());
724 connect_seq
= retry
.connect_seq() + 1;
725 logger().warn("{} UPDATE: cs={}", conn
, connect_seq
);
726 return client_reconnect();
728 case Tag::SESSION_RESET
:
729 return frame_assembler
->read_frame_payload(
730 ).then([this](auto payload
) {
731 if (unlikely(state
!= state_t::CONNECTING
)) {
732 logger().debug("{} triggered {} before reset_session()",
733 conn
, get_state_name(state
));
736 // handle_session_reset() logic
737 auto reset
= ResetFrame::Decode(payload
->back());
738 logger().warn("{} GOT ResetFrame: full={}", conn
, reset
.full());
739 reset_session(reset
.full());
740 return client_connect();
743 return process_wait();
744 case Tag::SESSION_RECONNECT_OK
:
745 return frame_assembler
->read_frame_payload(
746 ).then([this](auto payload
) {
747 // handle_reconnect_ok() logic
748 auto reconnect_ok
= ReconnectOkFrame::Decode(payload
->back());
749 logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
750 conn
, reconnect_ok
.msg_seq());
751 io_handler
.requeue_out_sent_up_to(reconnect_ok
.msg_seq());
752 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
755 unexpected_tag(ret
.tag
, conn
, "post_client_reconnect");
756 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
// Drives the whole client-side connect sequence as a gated background task.
//
// Sequence: enter CONNECTING (with IO delayed), refresh global_seq, wait for
// IO to leave dispatching, open a socket to peer_addr, hand it to the frame
// assembler, reset auth state and run banner_exchange(true), verify the
// peer's entity type and learn our own address from the peer, authenticate
// with client_auth(), then either client_connect() (no server cookie yet) or
// client_reconnect().
//
// After each asynchronous step the state is re-checked, because it may have
// been changed while this task was suspended. The final step either reports
// the connection as established or shuts the socket down for the WAIT case.
// Any exception is routed to fault(CONNECTING, ...).
// NOTE(review): several statements (aborts after the state checks, the
// transitions at the end, the FAULT test action) are not visible in this
// listing.
762 void ProtocolV2::execute_connecting()
764 ceph_assert_always(!is_socket_valid
);
765 trigger_state(state_t::CONNECTING
, io_state_t::delay
, false);
766 gated_execute("execute_connecting", conn
, [this] {
767 global_seq
= messenger
.get_global_seq();
768 assert(client_cookie
!= 0);
769 if (!conn
.policy
.lossy
&& server_cookie
!= 0) {
771 logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
772 conn
, global_seq
, connect_seq
);
773 } else { // conn.policy.lossy || server_cookie == 0
774 assert(connect_seq
== 0);
775 assert(server_cookie
== 0);
776 logger().debug("{} UPDATE: gs={} for connect", conn
, global_seq
);
778 return wait_exit_io().then([this] {
779 #ifdef UNIT_TESTS_BUILT
780 // process custom_bp_t::SOCKET_CONNECTING
781 // supports CONTINUE/FAULT/BLOCK
782 if (conn
.interceptor
) {
783 auto action
= conn
.interceptor
->intercept(
784 conn
, {custom_bp_t::SOCKET_CONNECTING
});
786 case bp_action_t::CONTINUE
:
787 return seastar::now();
788 case bp_action_t::FAULT
:
789 logger().info("[Test] got FAULT");
791 case bp_action_t::BLOCK
:
792 logger().info("[Test] got BLOCK");
793 return conn
.interceptor
->blocker
.block();
795 ceph_abort("unexpected action from trap");
798 return seastar::now();
802 ceph_assert_always(frame_assembler
);
803 if (unlikely(state
!= state_t::CONNECTING
)) {
804 logger().debug("{} triggered {} before Socket::connect()",
805 conn
, get_state_name(state
));
808 return Socket::connect(conn
.peer_addr
);
809 }).then([this](SocketRef new_socket
) {
810 logger().debug("{} socket connected", conn
);
// the state changed while connecting: close the fresh socket instead of using it
811 if (unlikely(state
!= state_t::CONNECTING
)) {
812 logger().debug("{} triggered {} during Socket::connect()",
813 conn
, get_state_name(state
));
814 return new_socket
->close().then([sock
=std::move(new_socket
)] {
819 frame_assembler
->set_socket(std::move(new_socket
));
822 gate
.dispatch_in_background(
823 "replace_socket_connecting",
825 [this, new_socket
=std::move(new_socket
)]() mutable {
826 return frame_assembler
->replace_shutdown_socket(std::move(new_socket
));
830 is_socket_valid
= true;
831 return seastar::now();
// every connection attempt starts with fresh auth metadata and handlers
833 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
834 frame_assembler
->reset_handlers();
835 frame_assembler
->start_recording();
836 return banner_exchange(true);
837 }).then([this] (auto&& ret
) {
838 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
839 if (conn
.get_peer_type() != _peer_type
) {
840 logger().warn("{} connection peer type does not match what peer advertises {} != {}",
841 conn
, ceph_entity_type_name(conn
.get_peer_type()),
842 ceph_entity_type_name(_peer_type
));
843 ABORT_IN_CLOSE(true);
845 if (unlikely(state
!= state_t::CONNECTING
)) {
846 logger().debug("{} triggered {} during banner_exchange(), abort",
847 conn
, get_state_name(state
));
850 frame_assembler
->learn_socket_ephemeral_port_as_connector(
851 _my_addr_from_peer
.get_port());
852 if (unlikely(_my_addr_from_peer
.is_legacy())) {
853 logger().warn("{} peer sent a legacy address for me: {}",
854 conn
, _my_addr_from_peer
);
855 throw std::system_error(
856 make_error_code(crimson::net::error::bad_peer_address
));
858 _my_addr_from_peer
.set_type(entity_addr_t::TYPE_MSGR2
);
859 messenger
.learned_addr(_my_addr_from_peer
, conn
);
860 return client_auth();
// no server cookie means there is no session to resume
862 if (server_cookie
== 0) {
863 ceph_assert(connect_seq
== 0);
864 return client_connect();
866 ceph_assert(connect_seq
> 0);
867 return client_reconnect();
869 }).then([this] (next_step_t next
) {
870 if (unlikely(state
!= state_t::CONNECTING
)) {
871 logger().debug("{} triggered {} at the end of execute_connecting()",
872 conn
, get_state_name(state
));
876 case next_step_t::ready
: {
877 logger().info("{} connected: gs={}, pgs={}, cs={}, "
878 "client_cookie={}, server_cookie={}, {}",
879 conn
, global_seq
, peer_global_seq
, connect_seq
,
880 client_cookie
, server_cookie
,
881 io_stat_printer
{io_handler
});
882 io_handler
.dispatch_connect();
883 if (unlikely(state
!= state_t::CONNECTING
)) {
884 logger().debug("{} triggered {} after ms_handle_connect(), abort",
885 conn
, get_state_name(state
));
891 case next_step_t::wait
: {
892 logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn
);
893 ceph_assert_always(is_socket_valid
);
894 frame_assembler
->shutdown_socket();
895 is_socket_valid
= false;
900 ceph_abort("impossible next step");
903 }).handle_exception([this](std::exception_ptr eptr
) {
904 fault(state_t::CONNECTING
, "execute_connecting", eptr
);
// Server side: rejects the client's auth method. The auth server's supported
// methods and modes for the peer type are sent back in an AuthBadMethodFrame
// together with the error code `r`, and the server then waits for a new
// request via server_auth().
911 seastar::future
<> ProtocolV2::_auth_bad_method(int r
)
913 // _auth_bad_method() logic
915 auto [allowed_methods
, allowed_modes
] =
916 messenger
.get_auth_server()->get_supported_auth_methods(conn
.get_peer_type());
917 auto bad_method
= AuthBadMethodFrame::Encode(
918 auth_meta
->auth_method
, r
, allowed_methods
, allowed_modes
);
919 logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
920 "allowed_methods={}, allowed_modes={})",
921 conn
, auth_meta
->auth_method
, cpp_strerror(r
),
922 allowed_methods
, allowed_modes
);
923 return frame_assembler
->write_flush_frame(bad_method
925 return server_auth();
// Server side: feeds the client's auth payload to the auth server's
// handle_auth_request() and acts on the result. The visible outcomes are:
//   - done: write an AuthDoneFrame, create the session stream handlers and
//     run finish_auth();
//   - more: write an AuthReplyMoreFrame, read the client's
//     AuthRequestMoreFrame and recurse with more=true;
//   - -EBUSY: logged as a warning;
//   - any other result: logged and answered through _auth_bad_method(r).
// NOTE(review): the switch statement and its case labels, the `reply` buffer
// declaration and part of the handle_auth_request() argument list are not
// visible in this listing.
929 seastar::future
<> ProtocolV2::_handle_auth_request(bufferlist
& auth_payload
, bool more
)
931 // _handle_auth_request() logic
932 ceph_assert(messenger
.get_auth_server());
934 int r
= messenger
.get_auth_server()->handle_auth_request(
938 auth_meta
->auth_method
,
940 &conn
.peer_global_id
,
945 auto auth_done
= AuthDoneFrame::Encode(
946 conn
.peer_global_id
, auth_meta
->con_mode
, reply
);
947 logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
948 conn
, conn
.peer_global_id
,
949 ceph_con_mode_name(auth_meta
->con_mode
), reply
.length());
950 return frame_assembler
->write_flush_frame(auth_done
952 ceph_assert(auth_meta
);
953 frame_assembler
->create_session_stream_handlers(*auth_meta
, true);
954 return finish_auth();
959 auto more
= AuthReplyMoreFrame::Encode(reply
);
960 logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
961 conn
, reply
.length());
962 return frame_assembler
->write_flush_frame(more
964 return frame_assembler
->read_main_preamble();
965 }).then([this](auto ret
) {
966 expect_tag(Tag::AUTH_REQUEST_MORE
, ret
.tag
, conn
, "read_auth_request_more");
967 return frame_assembler
->read_frame_payload();
968 }).then([this](auto payload
) {
969 auto auth_more
= AuthRequestMoreFrame::Decode(payload
->back());
970 logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
971 conn
, auth_more
.auth_payload().length());
972 return _handle_auth_request(auth_more
.auth_payload(), true);
976 logger().warn("{} auth_server handle_auth_request returned -EBUSY", conn
);
978 return seastar::now();
981 logger().warn("{} auth_server handle_auth_request returned {}", conn
, r
);
982 return _auth_bad_method(r
);
// Server side: reads the client's AuthRequestFrame, records the requested
// method, and lets the auth server pick a connection mode from the client's
// preferred modes. If no mode can be picked (CEPH_CON_MODE_UNKNOWN) the
// request is rejected with -EOPNOTSUPP via _auth_bad_method(); otherwise the
// payload goes to _handle_auth_request() as the first (more=false) round.
987 seastar::future
<> ProtocolV2::server_auth()
989 return frame_assembler
->read_main_preamble(
990 ).then([this](auto ret
) {
991 expect_tag(Tag::AUTH_REQUEST
, ret
.tag
, conn
, "read_auth_request");
992 return frame_assembler
->read_frame_payload();
993 }).then([this](auto payload
) {
994 // handle_auth_request() logic
995 auto request
= AuthRequestFrame::Decode(payload
->back());
996 logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
998 conn
, request
.method(), request
.preferred_modes(),
999 request
.auth_payload().length());
1000 auth_meta
->auth_method
= request
.method();
1001 auth_meta
->con_mode
= messenger
.get_auth_server()->pick_con_mode(
1002 conn
.get_peer_type(), auth_meta
->auth_method
,
1003 request
.preferred_modes());
1004 if (auth_meta
->con_mode
== CEPH_CON_MODE_UNKNOWN
) {
1005 logger().warn("{} auth_server pick_con_mode returned mode CEPH_CON_MODE_UNKNOWN", conn
);
1006 return _auth_bad_method(-EOPNOTSUPP
);
1008 return _handle_auth_request(request
.auth_payload(), false);
// Compares a peer-reported entity name with the name recorded on this
// connection: the entity types are compared first, and the ids are compared
// only when neither side is still entity_name_t::NEW.
// NOTE(review): the return statements are not visible in this listing --
// presumably a mismatch yields false and everything else true; confirm.
1012 bool ProtocolV2::validate_peer_name(const entity_name_t
& peer_name
) const
1014 auto my_peer_name
= conn
.get_peer_name();
1015 if (my_peer_name
.type() != peer_name
.type()) {
1018 if (my_peer_name
.num() != entity_name_t::NEW
&&
1019 peer_name
.num() != entity_name_t::NEW
&&
1020 my_peer_name
.num() != peer_name
.num()) {
1026 seastar::future
<ProtocolV2::next_step_t
>
1027 ProtocolV2::send_wait()
1029 auto wait
= WaitFrame::Encode();
1030 logger().debug("{} WRITE WaitFrame", conn
);
1031 return frame_assembler
->write_flush_frame(wait
1033 return next_step_t::wait
;
1037 seastar::future
<ProtocolV2::next_step_t
>
1038 ProtocolV2::reuse_connection(
1039 ProtocolV2
* existing_proto
, bool do_reset
,
1040 bool reconnect
, uint64_t conn_seq
, uint64_t msg_seq
)
1042 if (unlikely(state
!= state_t::ACCEPTING
)) {
1043 logger().debug("{} triggered {} before trigger_replacing()",
1044 conn
, get_state_name(state
));
1048 existing_proto
->trigger_replacing(reconnect
,
1050 frame_assembler
->to_replace(),
1051 std::move(auth_meta
),
1054 conn
.get_peer_name(),
1055 conn
.get_features(),
1056 peer_supported_features
,
1059 ceph_assert_always(has_socket
&& is_socket_valid
);
1060 is_socket_valid
= false;
1062 #ifdef UNIT_TESTS_BUILT
1063 if (conn
.interceptor
) {
1064 conn
.interceptor
->register_conn_replaced(conn
);
1067 // close this connection because all the necessary information is delivered
1068 // to the exisiting connection, and jump to error handling code to abort the
1070 ABORT_IN_CLOSE(false);
1071 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1074 seastar::future
<ProtocolV2::next_step_t
>
1075 ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn
)
1077 // handle_existing_connection() logic
1078 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1079 existing_conn
->protocol
.get());
1080 ceph_assert(existing_proto
);
1081 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) connecting,"
1082 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1083 conn
, global_seq
, peer_global_seq
, connect_seq
,
1084 client_cookie
, server_cookie
,
1085 fmt::ptr(existing_conn
.get()), get_state_name(existing_proto
->state
),
1086 existing_proto
->global_seq
,
1087 existing_proto
->peer_global_seq
,
1088 existing_proto
->connect_seq
,
1089 existing_proto
->client_cookie
,
1090 existing_proto
->server_cookie
);
1092 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1093 logger().error("{} server_connect: my peer_name doesn't match"
1094 " the existing connection {}, abort", conn
, fmt::ptr(existing_conn
.get()));
1098 if (existing_proto
->state
== state_t::REPLACING
) {
1099 logger().warn("{} server_connect: racing replace happened while"
1100 " replacing existing connection {}, send wait.",
1101 conn
, *existing_conn
);
1105 if (existing_proto
->peer_global_seq
> peer_global_seq
) {
1106 logger().warn("{} server_connect:"
1107 " this is a stale connection, because peer_global_seq({})"
1108 " < existing->peer_global_seq({}), close this connection"
1109 " in favor of existing connection {}",
1110 conn
, peer_global_seq
,
1111 existing_proto
->peer_global_seq
, *existing_conn
);
1115 if (existing_conn
->policy
.lossy
) {
1116 // existing connection can be thrown out in favor of this one
1117 logger().warn("{} server_connect:"
1118 " existing connection {} is a lossy channel. Close existing in favor of"
1119 " this connection", conn
, *existing_conn
);
1120 if (unlikely(state
!= state_t::ACCEPTING
)) {
1121 logger().debug("{} triggered {} before execute_establishing()",
1122 conn
, get_state_name(state
));
1125 execute_establishing(existing_conn
);
1126 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1129 if (existing_proto
->server_cookie
!= 0) {
1130 if (existing_proto
->client_cookie
!= client_cookie
) {
1131 // Found previous session
1132 // peer has reset and we're going to reuse the existing connection
1133 // by replacing the socket
1134 logger().warn("{} server_connect:"
1135 " found new session (cs={})"
1136 " when existing {} {} is with stale session (cs={}, ss={}),"
1137 " peer must have reset",
1140 get_state_name(existing_proto
->state
),
1142 existing_proto
->client_cookie
,
1143 existing_proto
->server_cookie
);
1144 return reuse_connection(existing_proto
, conn
.policy
.resetcheck
);
1146 // session establishment interrupted between client_ident and server_ident,
1148 logger().warn("{} server_connect: found client session with existing {} {}"
1149 " matched (cs={}, ss={}), continuing session establishment",
1151 get_state_name(existing_proto
->state
),
1154 existing_proto
->server_cookie
);
1155 return reuse_connection(existing_proto
);
1158 // Looks like a connection race: server and client are both connecting to
1159 // each other at the same time.
1160 if (existing_proto
->client_cookie
!= client_cookie
) {
1161 if (existing_conn
->peer_wins()) {
1162 // acceptor (this connection, the peer) wins
1163 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1164 " and win, reusing existing {} {}",
1167 existing_proto
->client_cookie
,
1168 get_state_name(existing_proto
->state
),
1170 return reuse_connection(existing_proto
);
1172 // acceptor (this connection, the peer) loses
1173 logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
1174 " and lose to existing {}, ask client to wait",
1175 conn
, client_cookie
, existing_proto
->client_cookie
, *existing_conn
);
1176 return existing_conn
->send_keepalive().then([this] {
1181 logger().warn("{} server_connect: found client session with existing {} {}"
1182 " matched (cs={}, ss={}), continuing session establishment",
1184 get_state_name(existing_proto
->state
),
1187 existing_proto
->server_cookie
);
1188 return reuse_connection(existing_proto
);
1193 seastar::future
<ProtocolV2::next_step_t
>
1194 ProtocolV2::server_connect()
1196 return frame_assembler
->read_frame_payload(
1197 ).then([this](auto payload
) {
1198 // handle_client_ident() logic
1199 auto client_ident
= ClientIdentFrame::Decode(payload
->back());
1200 logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
1201 " gid={}, gs={}, features_supported={},"
1202 " features_required={}, flags={}, cookie={}",
1203 conn
, client_ident
.addrs(), client_ident
.target_addr(),
1204 client_ident
.gid(), client_ident
.global_seq(),
1205 client_ident
.supported_features(),
1206 client_ident
.required_features(),
1207 client_ident
.flags(), client_ident
.cookie());
1209 if (client_ident
.addrs().empty() ||
1210 client_ident
.addrs().front() == entity_addr_t()) {
1211 logger().warn("{} oops, client_ident.addrs() is empty", conn
);
1212 throw std::system_error(
1213 make_error_code(crimson::net::error::bad_peer_address
));
1215 if (!messenger
.get_myaddrs().contains(client_ident
.target_addr())) {
1216 logger().warn("{} peer is trying to reach {} which is not us ({})",
1217 conn
, client_ident
.target_addr(), messenger
.get_myaddrs());
1218 throw std::system_error(
1219 make_error_code(crimson::net::error::bad_peer_address
));
1221 conn
.peer_addr
= client_ident
.addrs().front();
1222 logger().debug("{} UPDATE: peer_addr={}", conn
, conn
.peer_addr
);
1223 conn
.target_addr
= conn
.peer_addr
;
1224 if (!conn
.policy
.lossy
&& !conn
.policy
.server
&& conn
.target_addr
.get_port() <= 0) {
1225 logger().warn("{} we don't know how to reconnect to peer {}",
1226 conn
, conn
.target_addr
);
1227 throw std::system_error(
1228 make_error_code(crimson::net::error::bad_peer_address
));
1231 if (conn
.get_peer_id() != entity_name_t::NEW
&&
1232 conn
.get_peer_id() != client_ident
.gid()) {
1233 logger().error("{} client_ident peer_id ({}) does not match"
1234 " what it should be ({}) during accepting, abort",
1235 conn
, client_ident
.gid(), conn
.get_peer_id());
1238 conn
.set_peer_id(client_ident
.gid());
1239 client_cookie
= client_ident
.cookie();
1241 uint64_t feat_missing
=
1242 (conn
.policy
.features_required
| msgr2_required
) &
1243 ~(uint64_t)client_ident
.supported_features();
1245 auto ident_missing_features
= IdentMissingFeaturesFrame::Encode(feat_missing
);
1246 logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
1247 conn
, feat_missing
);
1248 return frame_assembler
->write_flush_frame(ident_missing_features
1250 return next_step_t::wait
;
1253 conn
.set_features(client_ident
.supported_features() &
1254 conn
.policy
.features_supported
);
1255 logger().debug("{} UPDATE: features={}", conn
, conn
.get_features());
1257 peer_global_seq
= client_ident
.global_seq();
1259 bool lossy
= client_ident
.flags() & CEPH_MSG_CONNECT_LOSSY
;
1260 if (lossy
!= conn
.policy
.lossy
) {
1261 logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
1262 conn
, conn
.policy
.lossy
, lossy
);
1265 // Looks good so far, let's check if there is already an existing connection
1268 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1270 if (existing_conn
) {
1271 return handle_existing_connection(existing_conn
);
1273 if (unlikely(state
!= state_t::ACCEPTING
)) {
1274 logger().debug("{} triggered {} before execute_establishing()",
1275 conn
, get_state_name(state
));
1278 execute_establishing(nullptr);
1279 return seastar::make_ready_future
<next_step_t
>(next_step_t::ready
);
1284 seastar::future
<ProtocolV2::next_step_t
>
1285 ProtocolV2::read_reconnect()
1287 return frame_assembler
->read_main_preamble(
1288 ).then([this](auto ret
) {
1289 expect_tag(Tag::SESSION_RECONNECT
, ret
.tag
, conn
, "read_session_reconnect");
1290 return server_reconnect();
1294 seastar::future
<ProtocolV2::next_step_t
>
1295 ProtocolV2::send_retry(uint64_t connect_seq
)
1297 auto retry
= RetryFrame::Encode(connect_seq
);
1298 logger().warn("{} WRITE RetryFrame: cs={}", conn
, connect_seq
);
1299 return frame_assembler
->write_flush_frame(retry
1301 return read_reconnect();
1305 seastar::future
<ProtocolV2::next_step_t
>
1306 ProtocolV2::send_retry_global(uint64_t global_seq
)
1308 auto retry
= RetryGlobalFrame::Encode(global_seq
);
1309 logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn
, global_seq
);
1310 return frame_assembler
->write_flush_frame(retry
1312 return read_reconnect();
1316 seastar::future
<ProtocolV2::next_step_t
>
1317 ProtocolV2::send_reset(bool full
)
1319 auto reset
= ResetFrame::Encode(full
);
1320 logger().warn("{} WRITE ResetFrame: full={}", conn
, full
);
1321 return frame_assembler
->write_flush_frame(reset
1323 return frame_assembler
->read_main_preamble();
1324 }).then([this](auto ret
) {
1325 expect_tag(Tag::CLIENT_IDENT
, ret
.tag
, conn
, "post_send_reset");
1326 return server_connect();
1330 seastar::future
<ProtocolV2::next_step_t
>
1331 ProtocolV2::server_reconnect()
1333 return frame_assembler
->read_frame_payload(
1334 ).then([this](auto payload
) {
1335 // handle_reconnect() logic
1336 auto reconnect
= ReconnectFrame::Decode(payload
->back());
1338 logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
1339 " server_cookie={}, gs={}, cs={}, msg_seq={}",
1340 conn
, reconnect
.addrs(),
1341 reconnect
.client_cookie(), reconnect
.server_cookie(),
1342 reconnect
.global_seq(), reconnect
.connect_seq(),
1343 reconnect
.msg_seq());
1345 // can peer_addrs be changed on-the-fly?
1346 // TODO: change peer_addr to entity_addrvec_t
1347 entity_addr_t paddr
= reconnect
.addrs().front();
1348 if (paddr
.is_msgr2() || paddr
.is_any()) {
1351 logger().warn("{} peer's address {} is not v2", conn
, paddr
);
1352 throw std::system_error(
1353 make_error_code(crimson::net::error::bad_peer_address
));
1355 if (conn
.peer_addr
== entity_addr_t()) {
1356 conn
.peer_addr
= paddr
;
1357 } else if (conn
.peer_addr
!= paddr
) {
1358 logger().error("{} peer identifies as {}, while conn.peer_addr={},"
1359 " reconnect failed",
1360 conn
, paddr
, conn
.peer_addr
);
1361 throw std::system_error(
1362 make_error_code(crimson::net::error::bad_peer_address
));
1364 peer_global_seq
= reconnect
.global_seq();
1366 SocketConnectionRef existing_conn
= messenger
.lookup_conn(conn
.peer_addr
);
1368 if (!existing_conn
) {
1369 // there is no existing connection therefore cannot reconnect to previous
1371 logger().warn("{} server_reconnect: no existing connection from address {},"
1372 " reseting client", conn
, conn
.peer_addr
);
1373 return send_reset(true);
1376 ProtocolV2
*existing_proto
= dynamic_cast<ProtocolV2
*>(
1377 existing_conn
->protocol
.get());
1378 ceph_assert(existing_proto
);
1379 logger().debug("{}(gs={}, pgs={}, cs={}, cc={}, sc={}) re-connecting,"
1380 " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
1381 conn
, global_seq
, peer_global_seq
, reconnect
.connect_seq(),
1382 reconnect
.client_cookie(), reconnect
.server_cookie(),
1383 fmt::ptr(existing_conn
.get()),
1384 get_state_name(existing_proto
->state
),
1385 existing_proto
->global_seq
,
1386 existing_proto
->peer_global_seq
,
1387 existing_proto
->connect_seq
,
1388 existing_proto
->client_cookie
,
1389 existing_proto
->server_cookie
);
1391 if (!validate_peer_name(existing_conn
->get_peer_name())) {
1392 logger().error("{} server_reconnect: my peer_name doesn't match"
1393 " the existing connection {}, abort", conn
, fmt::ptr(existing_conn
.get()));
1397 if (existing_proto
->state
== state_t::REPLACING
) {
1398 logger().warn("{} server_reconnect: racing replace happened while "
1399 " replacing existing connection {}, retry global.",
1400 conn
, *existing_conn
);
1401 return send_retry_global(existing_proto
->peer_global_seq
);
1404 if (existing_proto
->client_cookie
!= reconnect
.client_cookie()) {
1405 logger().warn("{} server_reconnect:"
1406 " client_cookie mismatch with existing connection {},"
1407 " cc={} rcc={}. I must have reset, reseting client.",
1408 conn
, *existing_conn
,
1409 existing_proto
->client_cookie
, reconnect
.client_cookie());
1410 return send_reset(conn
.policy
.resetcheck
);
1411 } else if (existing_proto
->server_cookie
== 0) {
1412 // this happens when:
1413 // - a connects to b
1414 // - a sends client_ident
1415 // - b gets client_ident, sends server_ident and sets cookie X
1416 // - connection fault
1417 // - b reconnects to a with cookie X, connect_seq=1
1418 // - a has cookie==0
1419 logger().warn("{} server_reconnect: I was a client (cc={}) and didn't received the"
1420 " server_ident with existing connection {}."
1421 " Asking peer to resume session establishment",
1422 conn
, existing_proto
->client_cookie
, *existing_conn
);
1423 return send_reset(false);
1426 if (existing_proto
->peer_global_seq
> reconnect
.global_seq()) {
1427 logger().warn("{} server_reconnect: stale global_seq: exist_pgs({}) > peer_gs({}),"
1428 " with existing connection {},"
1429 " ask client to retry global",
1430 conn
, existing_proto
->peer_global_seq
,
1431 reconnect
.global_seq(), *existing_conn
);
1432 return send_retry_global(existing_proto
->peer_global_seq
);
1435 if (existing_proto
->connect_seq
> reconnect
.connect_seq()) {
1436 logger().warn("{} server_reconnect: stale peer connect_seq peer_cs({}) < exist_cs({}),"
1437 " with existing connection {}, ask client to retry",
1438 conn
, reconnect
.connect_seq(),
1439 existing_proto
->connect_seq
, *existing_conn
);
1440 return send_retry(existing_proto
->connect_seq
);
1441 } else if (existing_proto
->connect_seq
== reconnect
.connect_seq()) {
1442 // reconnect race: both peers are sending reconnect messages
1443 if (existing_conn
->peer_wins()) {
1444 // acceptor (this connection, the peer) wins
1445 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1446 " and win, reusing existing {} {}",
1448 reconnect
.connect_seq(),
1449 get_state_name(existing_proto
->state
),
1451 return reuse_connection(
1452 existing_proto
, false,
1453 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1455 // acceptor (this connection, the peer) loses
1456 logger().warn("{} server_reconnect: reconnect race detected (cs={})"
1457 " and lose to existing {}, ask client to wait",
1458 conn
, reconnect
.connect_seq(), *existing_conn
);
1461 } else { // existing_proto->connect_seq < reconnect.connect_seq()
1462 logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
1463 " reusing existing {} {}",
1465 existing_proto
->connect_seq
,
1466 reconnect
.connect_seq(),
1467 get_state_name(existing_proto
->state
),
1469 return reuse_connection(
1470 existing_proto
, false,
1471 true, reconnect
.connect_seq(), reconnect
.msg_seq());
1476 void ProtocolV2::execute_accepting()
1478 assert(is_socket_valid
);
1479 trigger_state(state_t::ACCEPTING
, io_state_t::none
, false);
1480 gate
.dispatch_in_background("execute_accepting", conn
, [this] {
1481 return seastar::futurize_invoke([this] {
1482 #ifdef UNIT_TESTS_BUILT
1483 if (conn
.interceptor
) {
1484 auto action
= conn
.interceptor
->intercept(
1485 conn
, {custom_bp_t::SOCKET_ACCEPTED
});
1487 case bp_action_t::CONTINUE
:
1489 case bp_action_t::FAULT
:
1490 logger().info("[Test] got FAULT");
1493 ceph_abort("unexpected action from trap");
1497 auth_meta
= seastar::make_lw_shared
<AuthConnectionMeta
>();
1498 frame_assembler
->reset_handlers();
1499 frame_assembler
->start_recording();
1500 return banner_exchange(false);
1501 }).then([this] (auto&& ret
) {
1502 auto [_peer_type
, _my_addr_from_peer
] = std::move(ret
);
1503 ceph_assert(conn
.get_peer_type() == 0);
1504 conn
.set_peer_type(_peer_type
);
1506 conn
.policy
= messenger
.get_policy(_peer_type
);
1507 logger().info("{} UPDATE: peer_type={},"
1508 " policy(lossy={} server={} standby={} resetcheck={})",
1509 conn
, ceph_entity_type_name(_peer_type
),
1510 conn
.policy
.lossy
, conn
.policy
.server
,
1511 conn
.policy
.standby
, conn
.policy
.resetcheck
);
1512 if (!messenger
.get_myaddr().is_blank_ip() &&
1513 (messenger
.get_myaddr().get_port() != _my_addr_from_peer
.get_port() ||
1514 messenger
.get_myaddr().get_nonce() != _my_addr_from_peer
.get_nonce())) {
1515 logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
1516 conn
, _my_addr_from_peer
, messenger
.get_myaddr());
1517 throw std::system_error(
1518 make_error_code(crimson::net::error::bad_peer_address
));
1520 messenger
.learned_addr(_my_addr_from_peer
, conn
);
1521 return server_auth();
1523 return frame_assembler
->read_main_preamble();
1524 }).then([this](auto ret
) {
1526 case Tag::CLIENT_IDENT
:
1527 return server_connect();
1528 case Tag::SESSION_RECONNECT
:
1529 return server_reconnect();
1531 unexpected_tag(ret
.tag
, conn
, "post_server_auth");
1532 return seastar::make_ready_future
<next_step_t
>(next_step_t::none
);
1535 }).then([this] (next_step_t next
) {
1537 case next_step_t::ready
:
1538 assert(state
!= state_t::ACCEPTING
);
1540 case next_step_t::wait
:
1541 if (unlikely(state
!= state_t::ACCEPTING
)) {
1542 logger().debug("{} triggered {} at the end of execute_accepting()",
1543 conn
, get_state_name(state
));
1546 logger().info("{} execute_accepting(): going to SERVER_WAIT", conn
);
1547 execute_server_wait();
1550 ceph_abort("impossible next step");
1552 }).handle_exception([this](std::exception_ptr eptr
) {
1555 std::rethrow_exception(eptr
);
1556 } catch (std::exception
&e
) {
1559 logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
1560 conn
, get_state_name(state
), e_what
);
// CONNECTING or ACCEPTING state
1568 seastar::future
<> ProtocolV2::finish_auth()
1570 ceph_assert(auth_meta
);
1572 auto records
= frame_assembler
->stop_recording();
1573 const auto sig
= auth_meta
->session_key
.empty() ? sha256_digest_t() :
1574 auth_meta
->session_key
.hmac_sha256(nullptr, records
.rxbuf
);
1575 auto sig_frame
= AuthSignatureFrame::Encode(sig
);
1576 logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn
, sig
);
1577 return frame_assembler
->write_flush_frame(sig_frame
1579 return frame_assembler
->read_main_preamble();
1580 }).then([this](auto ret
) {
1581 expect_tag(Tag::AUTH_SIGNATURE
, ret
.tag
, conn
, "post_finish_auth");
1582 return frame_assembler
->read_frame_payload();
1583 }).then([this, txbuf
=std::move(records
.txbuf
)](auto payload
) {
1584 // handle_auth_signature() logic
1585 auto sig_frame
= AuthSignatureFrame::Decode(payload
->back());
1586 logger().debug("{} GOT AuthSignatureFrame: signature={}", conn
, sig_frame
.signature());
1588 const auto actual_tx_sig
= auth_meta
->session_key
.empty() ?
1589 sha256_digest_t() : auth_meta
->session_key
.hmac_sha256(nullptr, txbuf
);
1590 if (sig_frame
.signature() != actual_tx_sig
) {
1591 logger().warn("{} pre-auth signature mismatch actual_tx_sig={}"
1592 " sig_frame.signature()={}",
1593 conn
, actual_tx_sig
, sig_frame
.signature());
1601 void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn
) {
1602 auto accept_me
= [this] {
1603 messenger
.register_conn(
1604 seastar::static_pointer_cast
<SocketConnection
>(
1605 conn
.shared_from_this()));
1606 messenger
.unaccept_conn(
1607 seastar::static_pointer_cast
<SocketConnection
>(
1608 conn
.shared_from_this()));
1611 ceph_assert_always(is_socket_valid
);
1612 trigger_state(state_t::ESTABLISHING
, io_state_t::delay
, false);
1613 if (existing_conn
) {
1614 static_cast<ProtocolV2
*>(existing_conn
->protocol
.get())->do_close(
1615 true /* is_dispatch_reset */, std::move(accept_me
));
1616 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1617 logger().warn("{} triggered {} during execute_establishing(), "
1618 "the accept event will not be delivered!",
1619 conn
, get_state_name(state
));
1626 io_handler
.dispatch_accept();
1627 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1628 logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
1629 conn
, get_state_name(state
));
1633 gated_execute("execute_establishing", conn
, [this] {
1634 return seastar::futurize_invoke([this] {
1635 return send_server_ident();
1637 if (unlikely(state
!= state_t::ESTABLISHING
)) {
1638 logger().debug("{} triggered {} at the end of execute_establishing()",
1639 conn
, get_state_name(state
));
1642 logger().info("{} established: gs={}, pgs={}, cs={}, "
1643 "client_cookie={}, server_cookie={}, {}",
1644 conn
, global_seq
, peer_global_seq
, connect_seq
,
1645 client_cookie
, server_cookie
,
1646 io_stat_printer
{io_handler
});
1648 }).handle_exception([this](std::exception_ptr eptr
) {
1649 fault(state_t::ESTABLISHING
, "execute_establishing", eptr
);
// ESTABLISHING or REPLACING state
1657 ProtocolV2::send_server_ident()
1659 // send_server_ident() logic
1661 // refered to async-conn v2: not assign gs to global_seq
1662 global_seq
= messenger
.get_global_seq();
1663 logger().debug("{} UPDATE: gs={} for server ident", conn
, global_seq
);
1665 // this is required for the case when this connection is being replaced
1666 io_handler
.requeue_out_sent_up_to(0);
1667 io_handler
.reset_session(false);
1669 if (!conn
.policy
.lossy
) {
1670 server_cookie
= ceph::util::generate_random_number
<uint64_t>(1, -1ll);
1674 if (conn
.policy
.lossy
) {
1675 flags
= flags
| CEPH_MSG_CONNECT_LOSSY
;
1678 auto server_ident
= ServerIdentFrame::Encode(
1679 messenger
.get_myaddrs(),
1680 messenger
.get_myname().num(),
1682 conn
.policy
.features_supported
,
1683 conn
.policy
.features_required
| msgr2_required
,
1687 logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
1688 " gs={}, features_supported={}, features_required={},"
1689 " flags={}, cookie={}",
1690 conn
, messenger
.get_myaddrs(), messenger
.get_myname().num(),
1691 global_seq
, conn
.policy
.features_supported
,
1692 conn
.policy
.features_required
| msgr2_required
,
1693 flags
, server_cookie
);
1695 return frame_assembler
->write_flush_frame(server_ident
);
1700 void ProtocolV2::trigger_replacing(bool reconnect
,
1702 FrameAssemblerV2::mover_t
&&mover
,
1703 AuthConnectionMetaRef
&& new_auth_meta
,
1704 uint64_t new_peer_global_seq
,
1705 uint64_t new_client_cookie
,
1706 entity_name_t new_peer_name
,
1707 uint64_t new_conn_features
,
1708 uint64_t new_peer_supported_features
,
1709 uint64_t new_connect_seq
,
1710 uint64_t new_msg_seq
)
1712 ceph_assert_always(has_socket
|| state
== state_t::CONNECTING
);
1713 ceph_assert_always(!mover
.socket
->is_shutdown());
1714 trigger_state(state_t::REPLACING
, io_state_t::delay
, false);
1715 if (is_socket_valid
) {
1716 frame_assembler
->shutdown_socket();
1717 is_socket_valid
= false;
1719 gate
.dispatch_in_background(
1720 "trigger_replacing",
1725 mover
= std::move(mover
),
1726 new_auth_meta
= std::move(new_auth_meta
),
1727 new_client_cookie
, new_peer_name
,
1728 new_conn_features
, new_peer_supported_features
,
1729 new_peer_global_seq
,
1730 new_connect_seq
, new_msg_seq
] () mutable {
1731 ceph_assert_always(state
== state_t::REPLACING
);
1732 io_handler
.dispatch_accept();
1733 // state may become CLOSING, close mover.socket and abort later
1734 return wait_exit_io(
1736 ceph_assert_always(frame_assembler
);
1737 protocol_timer
.cancel();
1738 auto done
= std::move(execution_done
);
1739 execution_done
= seastar::now();
1744 mover
= std::move(mover
),
1745 new_auth_meta
= std::move(new_auth_meta
),
1746 new_client_cookie
, new_peer_name
,
1747 new_conn_features
, new_peer_supported_features
,
1748 new_peer_global_seq
,
1749 new_connect_seq
, new_msg_seq
] () mutable {
1750 if (state
== state_t::REPLACING
&& do_reset
) {
1751 reset_session(true);
1754 if (unlikely(state
!= state_t::REPLACING
)) {
1755 return mover
.socket
->close(
1756 ).then([sock
= std::move(mover
.socket
)] {
1761 auth_meta
= std::move(new_auth_meta
);
1762 peer_global_seq
= new_peer_global_seq
;
1763 gate
.dispatch_in_background(
1764 "replace_frame_assembler",
1766 [this, mover
=std::move(mover
)]() mutable {
1767 return frame_assembler
->replace_by(std::move(mover
));
1770 is_socket_valid
= true;
1774 connect_seq
= new_connect_seq
;
1775 // send_reconnect_ok() logic
1776 io_handler
.requeue_out_sent_up_to(new_msg_seq
);
1777 auto reconnect_ok
= ReconnectOkFrame::Encode(io_handler
.get_in_seq());
1778 logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn
, io_handler
.get_in_seq());
1779 return frame_assembler
->write_flush_frame(reconnect_ok
);
1781 client_cookie
= new_client_cookie
;
1782 assert(conn
.get_peer_type() == new_peer_name
.type());
1783 if (conn
.get_peer_id() == entity_name_t::NEW
) {
1784 conn
.set_peer_id(new_peer_name
.num());
1786 conn
.set_features(new_conn_features
);
1787 peer_supported_features
= new_peer_supported_features
;
1788 bool is_rev1
= HAVE_MSGR2_FEATURE(peer_supported_features
, REVISION_1
);
1789 frame_assembler
->set_is_rev1(is_rev1
);
1790 return send_server_ident();
1792 }).then([this, reconnect
] {
1793 if (unlikely(state
!= state_t::REPLACING
)) {
1794 logger().debug("{} triggered {} at the end of trigger_replacing()",
1795 conn
, get_state_name(state
));
1798 logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
1799 "client_cookie={}, server_cookie={}, {}",
1800 conn
, reconnect
? "reconnected" : "connected",
1801 global_seq
, peer_global_seq
, connect_seq
,
1802 client_cookie
, server_cookie
,
1803 io_stat_printer
{io_handler
});
1805 }).handle_exception([this](std::exception_ptr eptr
) {
1806 fault(state_t::REPLACING
, "trigger_replacing", eptr
);
1813 void ProtocolV2::notify_out_fault(const char *where
, std::exception_ptr eptr
)
1815 fault(state_t::READY
, where
, eptr
);
1818 void ProtocolV2::execute_ready()
1820 assert(conn
.policy
.lossy
|| (client_cookie
!= 0 && server_cookie
!= 0));
1821 protocol_timer
.cancel();
1822 ceph_assert_always(is_socket_valid
);
1823 trigger_state(state_t::READY
, io_state_t::open
, false);
1828 void ProtocolV2::execute_standby()
1830 ceph_assert_always(!is_socket_valid
);
1831 trigger_state(state_t::STANDBY
, io_state_t::delay
, false);
1834 void ProtocolV2::notify_out()
1836 if (unlikely(state
== state_t::STANDBY
&& !conn
.policy
.server
)) {
1837 logger().info("{} notify_out(): at {}, going to CONNECTING",
1838 conn
, get_state_name(state
));
1839 execute_connecting();
1845 void ProtocolV2::execute_wait(bool max_backoff
)
1847 ceph_assert_always(!is_socket_valid
);
1848 trigger_state(state_t::WAIT
, io_state_t::delay
, false);
1849 gated_execute("execute_wait", conn
, [this, max_backoff
] {
1850 double backoff
= protocol_timer
.last_dur();
1852 backoff
= local_conf().get_val
<double>("ms_max_backoff");
1853 } else if (backoff
> 0) {
1854 backoff
= std::min(local_conf().get_val
<double>("ms_max_backoff"), 2 * backoff
);
1856 backoff
= local_conf().get_val
<double>("ms_initial_backoff");
1858 return protocol_timer
.backoff(backoff
).then([this] {
1859 if (unlikely(state
!= state_t::WAIT
)) {
1860 logger().debug("{} triggered {} at the end of execute_wait()",
1861 conn
, get_state_name(state
));
1864 logger().info("{} execute_wait(): going to CONNECTING", conn
);
1865 execute_connecting();
1866 }).handle_exception([this](std::exception_ptr eptr
) {
1869 std::rethrow_exception(eptr
);
1870 } catch (std::exception
&e
) {
1873 logger().info("{} execute_wait(): protocol aborted at {} -- {}",
1874 conn
, get_state_name(state
), e_what
);
1875 assert(state
== state_t::REPLACING
||
1876 state
== state_t::CLOSING
);
// SERVER_WAIT state
1883 void ProtocolV2::execute_server_wait()
1885 ceph_assert_always(is_socket_valid
);
1886 trigger_state(state_t::SERVER_WAIT
, io_state_t::none
, false);
1887 gated_execute("execute_server_wait", conn
, [this] {
1888 return frame_assembler
->read_exactly(1
1889 ).then([this](auto bl
) {
1890 logger().warn("{} SERVER_WAIT got read, abort", conn
);
1892 }).handle_exception([this](std::exception_ptr eptr
) {
1895 std::rethrow_exception(eptr
);
1896 } catch (std::exception
&e
) {
1899 logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
1900 conn
, get_state_name(state
), e_what
);
1908 void ProtocolV2::notify_mark_down()
1913 seastar::future
<> ProtocolV2::close_clean_yielded()
1915 // yield() so that do_close() can be called *after* close_clean_yielded() is
1916 // applied to all connections in a container using
1917 // seastar::parallel_for_each(). otherwise, we could erase a connection in
1918 // the container when seastar::parallel_for_each() is still iterating in it.
1919 // that'd lead to a segfault.
1920 return seastar::yield(
1921 ).then([this, conn_ref
= conn
.shared_from_this()] {
1923 // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
1924 // which will otherwise result in deadlock
1925 assert(closed_clean_fut
.valid());
1926 return closed_clean_fut
.get_future();
1930 void ProtocolV2::do_close(
1931 bool is_dispatch_reset
,
1932 std::optional
<std::function
<void()>> f_accept_new
)
1936 assert(state
== state_t::CLOSING
);
1940 bool is_replace
= f_accept_new
? true : false;
1941 logger().info("{} closing: reset {}, replace {}", conn
,
1942 is_dispatch_reset
? "yes" : "no",
1943 is_replace
? "yes" : "no");
1952 messenger
.closing_conn(
1953 seastar::static_pointer_cast
<SocketConnection
>(
1954 conn
.shared_from_this()));
1955 if (state
== state_t::ACCEPTING
|| state
== state_t::SERVER_WAIT
) {
1956 messenger
.unaccept_conn(
1957 seastar::static_pointer_cast
<SocketConnection
>(
1958 conn
.shared_from_this()));
1959 } else if (state
>= state_t::ESTABLISHING
&& state
< state_t::CLOSING
) {
1960 messenger
.unregister_conn(
1961 seastar::static_pointer_cast
<SocketConnection
>(
1962 conn
.shared_from_this()));
1967 protocol_timer
.cancel();
1968 trigger_state(state_t::CLOSING
, io_state_t::drop
, false);
1973 if (is_socket_valid
) {
1974 frame_assembler
->shutdown_socket();
1975 is_socket_valid
= false;
1977 assert(!gate
.is_closed());
1978 auto handshake_closed
= gate
.close();
1979 auto io_closed
= io_handler
.close_io(
1980 is_dispatch_reset
, is_replace
);
1982 // asynchronous operations
1983 assert(!closed_clean_fut
.valid());
1984 closed_clean_fut
= seastar::when_all(
1985 std::move(handshake_closed
), std::move(io_closed
)
1986 ).discard_result().then([this] {
1987 ceph_assert_always(!exit_io
.has_value());
1989 ceph_assert_always(frame_assembler
);
1990 return frame_assembler
->close_shutdown_socket();
1992 return seastar::now();
1995 logger().debug("{} closed!", conn
);
1996 messenger
.closed_conn(
1997 seastar::static_pointer_cast
<SocketConnection
>(
1998 conn
.shared_from_this()));
1999 #ifdef UNIT_TESTS_BUILT
2000 closed_clean
= true;
2001 if (conn
.interceptor
) {
2002 conn
.interceptor
->register_conn_closed(conn
);
2005 }).handle_exception([conn_ref
= conn
.shared_from_this(), this] (auto eptr
) {
2006 logger().error("{} closing: closed_clean_fut got unexpected exception {}",
2012 } // namespace crimson::net