git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/ProtocolV2.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
index 95b756637f4e8a8de6ab2e2d49d3a2c9e65aa2f6..55b669384ed3bfd588e97192b00030217c48e4df 100644 (file)
 #include "Errors.h"
 #include "SocketMessenger.h"
 
-#ifdef UNIT_TESTS_BUILT
-#include "Interceptor.h"
-#endif
-
 using namespace ceph::msgr::v2;
 using crimson::common::local_conf;
-using io_state_t = crimson::net::IOHandler::io_state_t;
-using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
 
 namespace {
 
@@ -103,26 +97,6 @@ inline uint64_t generate_client_cookie() {
 
 namespace crimson::net {
 
-#ifdef UNIT_TESTS_BUILT
-// should be consistent to intercept_frame() in FrameAssemblerV2.cc
-void intercept(Breakpoint bp,
-               bp_type_t type,
-               SocketConnection& conn,
-               Interceptor *interceptor,
-               SocketRef& socket) {
-  if (interceptor) {
-    auto action = interceptor->intercept(conn, Breakpoint(bp));
-    socket->set_trap(type, action, &interceptor->blocker);
-  }
-}
-
-#define INTERCEPT_CUSTOM(bp, type)       \
-intercept({bp}, type, conn,              \
-          conn.interceptor, conn.socket)
-#else
-#define INTERCEPT_CUSTOM(bp, type)
-#endif
-
 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
 {
   logger().warn("{} waiting {} seconds ...", conn, seconds);
@@ -146,13 +120,16 @@ ProtocolV2::ProtocolV2(SocketConnection& conn,
     frame_assembler{FrameAssemblerV2::create(conn)},
     auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
     protocol_timer{conn}
-{}
+{
+  io_states = io_handler.get_states();
+}
 
 ProtocolV2::~ProtocolV2() {}
 
 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   ceph_assert(state == state_t::NONE);
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
@@ -170,9 +147,10 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   execute_connecting();
 }
 
-void ProtocolV2::start_accept(SocketRef&& new_socket,
+void ProtocolV2::start_accept(SocketFRef&& new_socket,
                               const entity_addr_t& _peer_addr)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   ceph_assert(state == state_t::NONE);
   // until we know better
   conn.target_addr = _peer_addr;
@@ -182,12 +160,22 @@ void ProtocolV2::start_accept(SocketRef&& new_socket,
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+  auto cc_seq = crosscore.prepare_submit();
+  gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
+    return io_handler.set_accepted_sid(
+        cc_seq,
+        frame_assembler->get_socket_shard_id(),
+        seastar::make_foreign(conn.shared_from_this()));
+  });
+
   execute_accepting();
 }
 
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
+void ProtocolV2::trigger_state_phase1(state_t new_state)
 {
-  if (!reentrant && new_state == state) {
+  ceph_assert_always(!gate.is_closed());
+  if (new_state == state) {
     logger().error("{} is not allowed to re-trigger state {}",
                    conn, get_state_name(state));
     ceph_abort();
@@ -199,32 +187,84 @@ void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool
   }
   logger().debug("{} TRIGGER {}, was {}",
                  conn, get_state_name(new_state), get_state_name(state));
-  auto pre_state = state;
-  if (pre_state == state_t::READY) {
-    assert(!gate.is_closed());
-    ceph_assert_always(!exit_io.has_value());
-    exit_io = seastar::shared_promise<>();
+
+  if (state == state_t::READY) {
+    // from READY
+    ceph_assert_always(!need_exit_io);
+    ceph_assert_always(!pr_exit_io.has_value());
+    need_exit_io = true;
+    pr_exit_io = seastar::shared_promise<>();
   }
+
+  if (new_state == state_t::STANDBY && !conn.policy.server) {
+    need_notify_out = true;
+  } else {
+    need_notify_out = false;
+  }
+
   state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+    state_t new_state, io_state_t new_io_state)
+{
+  ceph_assert_always(new_state == state);
+  ceph_assert_always(!gate.is_closed());
+  ceph_assert_always(!pr_switch_io_shard.has_value());
+
+  FrameAssemblerV2Ref fa;
   if (new_state == state_t::READY) {
-    // I'm not responsible to shutdown the socket at READY
-    is_socket_valid = false;
-    io_handler.set_io_state(new_io_state, std::move(frame_assembler));
+    assert(new_io_state == io_state_t::open);
+    assert(io_handler.get_shard_id() ==
+           frame_assembler->get_socket_shard_id());
+    frame_assembler->set_shard_id(io_handler.get_shard_id());
+    fa = std::move(frame_assembler);
   } else {
-    io_handler.set_io_state(new_io_state, nullptr);
+    assert(new_io_state != io_state_t::open);
   }
 
-  /*
-   * not atomic below
-   */
+  auto cc_seq = crosscore.prepare_submit();
+  logger().debug("{} send {} IOHandler::set_io_state(): new_state={}, new_io_state={}, "
+                 "fa={}, set_notify_out={}",
+                 conn, cc_seq, get_state_name(new_state), new_io_state,
+                 fa ? fmt::format("(sid={})", fa->get_shard_id()) : "N/A",
+                 need_notify_out);
+  gate.dispatch_in_background(
+      "set_io_state", conn,
+      [this, cc_seq, new_io_state, fa=std::move(fa)]() mutable {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(),
+        [this, cc_seq, new_io_state,
+         fa=std::move(fa), set_notify_out=need_notify_out]() mutable {
+      return io_handler.set_io_state(
+          cc_seq, new_io_state, std::move(fa), set_notify_out);
+    });
+  });
 
-  if (pre_state == state_t::READY) {
-    gate.dispatch_in_background("exit_io", conn, [this] {
-      return io_handler.wait_io_exit_dispatching(
-      ).then([this](FrameAssemblerV2Ref fa) {
-        frame_assembler = std::move(fa);
-        exit_io->set_value();
-        exit_io = std::nullopt;
+  if (need_exit_io) {
+    // from READY
+    auto cc_seq = crosscore.prepare_submit();
+    logger().debug("{} send {} IOHandler::wait_io_exit_dispatching() ...",
+                   conn, cc_seq);
+    assert(pr_exit_io.has_value());
+    assert(new_io_state != io_state_t::open);
+    need_exit_io = false;
+    gate.dispatch_in_background("exit_io", conn, [this, cc_seq] {
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(), [this, cc_seq] {
+        return io_handler.wait_io_exit_dispatching(cc_seq);
+      }).then([this, cc_seq](auto ret) {
+        logger().debug("{} finish {} IOHandler::wait_io_exit_dispatching(), {}",
+                       conn, cc_seq, ret.io_states);
+        frame_assembler = std::move(ret.frame_assembler);
+        assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+        ceph_assert_always(
+            seastar::this_shard_id() == frame_assembler->get_shard_id());
+        ceph_assert_always(!frame_assembler->is_socket_valid());
+        assert(!need_exit_io);
+        io_states = ret.io_states;
+        pr_exit_io->set_value();
+        pr_exit_io = std::nullopt;
       });
     });
   }
@@ -281,7 +321,7 @@ void ProtocolV2::fault(
   if (likely(has_socket)) {
     if (likely(is_socket_valid)) {
       ceph_assert_always(state != state_t::READY);
-      frame_assembler->shutdown_socket();
+      frame_assembler->shutdown_socket<true>(&gate);
       is_socket_valid = false;
     } else {
       ceph_assert_always(state != state_t::ESTABLISHING);
@@ -292,20 +332,20 @@ void ProtocolV2::fault(
   }
 
   if (conn.policy.server ||
-      (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+      (conn.policy.standby && !io_states.is_out_queued_or_sent())) {
     if (conn.policy.server) {
       logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{io_handler},
+                    io_states,
                     e_what);
     } else {
       logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
                     conn,
                     get_state_name(state),
                     where,
-                    io_stat_printer{io_handler},
+                    io_states,
                     e_what);
     }
     execute_standby();
@@ -315,7 +355,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{io_handler},
+                  io_states,
                   e_what);
     execute_wait(false);
   } else {
@@ -325,7 +365,7 @@ void ProtocolV2::fault(
                   conn,
                   get_state_name(state),
                   where,
-                  io_stat_printer{io_handler},
+                  io_states,
                   e_what);
     execute_connecting();
   }
@@ -339,7 +379,19 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
   }
-  io_handler.reset_session(full);
+
+  auto cc_seq = crosscore.prepare_submit();
+  logger().debug("{} send {} IOHandler::reset_session({})",
+                 conn, cc_seq, full);
+  io_states.reset_session(full);
+  gate.dispatch_in_background(
+      "reset_session", conn, [this, cc_seq, full] {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(), [this, cc_seq, full] {
+      return io_handler.reset_session(cc_seq, full);
+    });
+  });
+  // user can make changes
 }
 
 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
@@ -361,98 +413,121 @@ ProtocolV2::banner_exchange(bool is_connect)
                  CRIMSON_MSGR2_SUPPORTED_FEATURES,
                  CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
-  INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
-  return frame_assembler->write_flush(std::move(bl)).then([this] {
-      // 2. read peer banner
-      unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
-      INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
-      return frame_assembler->read_exactly(banner_len); // or read exactly?
-    }).then([this] (auto bl) {
-      // 3. process peer banner and read banner_payload
-      unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
-      logger().debug("{} RECV({}) banner: \"{}\"",
-                     conn, bl.size(),
-                     std::string((const char*)bl.get(), banner_prefix_len));
-
-      if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
-        if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
-          logger().warn("{} peer is using V1 protocol", conn);
-        } else {
-          logger().warn("{} peer sent bad banner", conn);
-        }
-        abort_in_fault();
+#ifdef UNIT_TESTS_BUILT
+  return frame_assembler->intercept_frame(custom_bp_t::BANNER_WRITE, true
+  ).then([this, bl=std::move(bl)]() mutable {
+    return frame_assembler->write_flush(std::move(bl));
+  }
+#else
+  return frame_assembler->write_flush(std::move(bl)
+#endif
+  ).then([this] {
+    // 2. read peer banner
+    unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
+#ifdef UNIT_TESTS_BUILT
+    return frame_assembler->intercept_frame(custom_bp_t::BANNER_READ, false
+    ).then([this, banner_len] {
+      return frame_assembler->read_exactly(banner_len);
+    });
+#else
+    return frame_assembler->read_exactly(banner_len);
+#endif
+  }).then([this](auto bptr) {
+    // 3. process peer banner and read banner_payload
+    unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+    logger().debug("{} RECV({}) banner: \"{}\"",
+                   conn, bptr.length(),
+                   std::string(bptr.c_str(), banner_prefix_len));
+
+    if (memcmp(bptr.c_str(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
+      if (memcmp(bptr.c_str(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
+        logger().warn("{} peer is using V1 protocol", conn);
+      } else {
+        logger().warn("{} peer sent bad banner", conn);
       }
-      bl.trim_front(banner_prefix_len);
+      abort_in_fault();
+    }
 
-      uint16_t payload_len;
-      bufferlist buf;
-      buf.append(buffer::create(std::move(bl)));
-      auto ti = buf.cbegin();
-      try {
-        decode(payload_len, ti);
-      } catch (const buffer::error &e) {
-        logger().warn("{} decode banner payload len failed", conn);
-        abort_in_fault();
-      }
-      logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
-      INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
+    bptr.set_offset(bptr.offset() + banner_prefix_len);
+    bptr.set_length(bptr.length() - banner_prefix_len);
+    assert(bptr.length() == sizeof(ceph_le16));
+
+    uint16_t payload_len;
+    bufferlist buf;
+    buf.append(std::move(bptr));
+    auto ti = buf.cbegin();
+    try {
+      decode(payload_len, ti);
+    } catch (const buffer::error &e) {
+      logger().warn("{} decode banner payload len failed", conn);
+      abort_in_fault();
+    }
+    logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
+#ifdef UNIT_TESTS_BUILT
+    return frame_assembler->intercept_frame(
+      custom_bp_t::BANNER_PAYLOAD_READ, false
+    ).then([this, payload_len] {
       return frame_assembler->read(payload_len);
-    }).then([this, is_connect] (bufferlist bl) {
-      // 4. process peer banner_payload and send HelloFrame
-      auto p = bl.cbegin();
-      uint64_t _peer_supported_features;
-      uint64_t _peer_required_features;
-      try {
-        decode(_peer_supported_features, p);
-        decode(_peer_required_features, p);
-      } catch (const buffer::error &e) {
-        logger().warn("{} decode banner payload failed", conn);
-        abort_in_fault();
-      }
-      logger().debug("{} RECV({}) banner features: supported={} required={}",
-                     conn, bl.length(),
-                     _peer_supported_features, _peer_required_features);
-
-      // Check feature bit compatibility
-      uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
-      uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
-      if ((required_features & _peer_supported_features) != required_features) {
-        logger().error("{} peer does not support all required features"
-                       " required={} peer_supported={}",
-                       conn, required_features, _peer_supported_features);
-        ABORT_IN_CLOSE(is_connect);
-      }
-      if ((supported_features & _peer_required_features) != _peer_required_features) {
-        logger().error("{} we do not support all peer required features"
-                       " peer_required={} supported={}",
-                       conn, _peer_required_features, supported_features);
-        ABORT_IN_CLOSE(is_connect);
-      }
-      peer_supported_features = _peer_supported_features;
-      bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      frame_assembler->set_is_rev1(is_rev1);
-
-      auto hello = HelloFrame::Encode(messenger.get_mytype(),
-                                      conn.target_addr);
-      logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
-                     conn, ceph_entity_type_name(messenger.get_mytype()),
-                     conn.target_addr);
-      return frame_assembler->write_flush_frame(hello);
-    }).then([this] {
-      //5. read peer HelloFrame
-      return frame_assembler->read_main_preamble();
-    }).then([this](auto ret) {
-      expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
-      return frame_assembler->read_frame_payload();
-    }).then([this](auto payload) {
-      // 6. process peer HelloFrame
-      auto hello = HelloFrame::Decode(payload->back());
-      logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
-                     conn, ceph_entity_type_name(hello.entity_type()),
-                     hello.peer_addr());
-      return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
-        std::make_tuple(hello.entity_type(), hello.peer_addr()));
     });
+#else
+    return frame_assembler->read(payload_len);
+#endif
+  }).then([this, is_connect] (bufferlist bl) {
+    // 4. process peer banner_payload and send HelloFrame
+    auto p = bl.cbegin();
+    uint64_t _peer_supported_features;
+    uint64_t _peer_required_features;
+    try {
+      decode(_peer_supported_features, p);
+      decode(_peer_required_features, p);
+    } catch (const buffer::error &e) {
+      logger().warn("{} decode banner payload failed", conn);
+      abort_in_fault();
+    }
+    logger().debug("{} RECV({}) banner features: supported={} required={}",
+                   conn, bl.length(),
+                   _peer_supported_features, _peer_required_features);
+
+    // Check feature bit compatibility
+    uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
+    uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+    if ((required_features & _peer_supported_features) != required_features) {
+      logger().error("{} peer does not support all required features"
+                     " required={} peer_supported={}",
+                     conn, required_features, _peer_supported_features);
+      ABORT_IN_CLOSE(is_connect);
+    }
+    if ((supported_features & _peer_required_features) != _peer_required_features) {
+      logger().error("{} we do not support all peer required features"
+                     " peer_required={} supported={}",
+                     conn, _peer_required_features, supported_features);
+      ABORT_IN_CLOSE(is_connect);
+    }
+    peer_supported_features = _peer_supported_features;
+    bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+    frame_assembler->set_is_rev1(is_rev1);
+
+    auto hello = HelloFrame::Encode(messenger.get_mytype(),
+                                    conn.target_addr);
+    logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+                   conn, ceph_entity_type_name(messenger.get_mytype()),
+                   conn.target_addr);
+    return frame_assembler->write_flush_frame(hello);
+  }).then([this] {
+    //5. read peer HelloFrame
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+    return frame_assembler->read_frame_payload();
+  }).then([this](auto payload) {
+    // 6. process peer HelloFrame
+    auto hello = HelloFrame::Decode(payload->back());
+    logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
+                   conn, ceph_entity_type_name(hello.entity_type()),
+                   hello.peer_addr());
+    return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+      std::make_tuple(hello.entity_type(), hello.peer_addr()));
+  });
 }
 
 // CONNECTING state
@@ -616,8 +691,25 @@ ProtocolV2::client_connect()
       case Tag::SERVER_IDENT:
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} at receiving SERVER_IDENT",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
+
           // handle_server_ident() logic
-          io_handler.requeue_out_sent();
+          auto cc_seq = crosscore.prepare_submit();
+          logger().debug("{} send {} IOHandler::requeue_out_sent()",
+                         conn, cc_seq);
+          io_states.requeue_out_sent();
+          gate.dispatch_in_background(
+              "requeue_out_sent", conn, [this, cc_seq] {
+            return seastar::smp::submit_to(
+                io_handler.get_shard_id(), [this, cc_seq] {
+              return io_handler.requeue_out_sent(cc_seq);
+            });
+          });
+
           auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
@@ -693,12 +785,12 @@ ProtocolV2::client_reconnect()
                                           server_cookie,
                                           global_seq,
                                           connect_seq,
-                                          io_handler.get_in_seq());
+                                          io_states.in_seq);
   logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
                  " server_cookie={}, gs={}, cs={}, in_seq={}",
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
-                 global_seq, connect_seq, io_handler.get_in_seq());
+                 global_seq, connect_seq, io_states.in_seq);
   return frame_assembler->write_flush_frame(reconnect).then([this] {
     return frame_assembler->read_main_preamble();
   }).then([this](auto ret) {
@@ -736,7 +828,10 @@ ProtocolV2::client_reconnect()
           // handle_session_reset() logic
           auto reset = ResetFrame::Decode(payload->back());
           logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
           reset_session(reset.full());
+          // user can make changes
+
           return client_connect();
         });
       case Tag::WAIT:
@@ -744,11 +839,29 @@ ProtocolV2::client_reconnect()
       case Tag::SESSION_RECONNECT_OK:
         return frame_assembler->read_frame_payload(
         ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} at receiving RECONNECT_OK",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
+
           // handle_reconnect_ok() logic
           auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
-          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
-                         conn, reconnect_ok.msg_seq());
-          io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
+          auto cc_seq = crosscore.prepare_submit();
+          logger().debug("{} GOT ReconnectOkFrame: msg_seq={}, "
+                         "send {} IOHandler::requeue_out_sent_up_to()",
+                         conn, reconnect_ok.msg_seq(), cc_seq);
+
+          io_states.requeue_out_sent_up_to();
+          auto msg_seq = reconnect_ok.msg_seq();
+          gate.dispatch_in_background(
+              "requeue_out_reconnecting", conn, [this, cc_seq, msg_seq] {
+            return seastar::smp::submit_to(
+                io_handler.get_shard_id(), [this, cc_seq, msg_seq] {
+              return io_handler.requeue_out_sent_up_to(cc_seq, msg_seq);
+            });
+          });
+
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
@@ -762,148 +875,179 @@ ProtocolV2::client_reconnect()
 void ProtocolV2::execute_connecting()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+  trigger_state(state_t::CONNECTING, io_state_t::delay);
   gated_execute("execute_connecting", conn, [this] {
-      global_seq = messenger.get_global_seq();
-      assert(client_cookie != 0);
-      if (!conn.policy.lossy && server_cookie != 0) {
-        ++connect_seq;
-        logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
-                       conn, global_seq, connect_seq);
-      } else { // conn.policy.lossy || server_cookie == 0
-        assert(connect_seq == 0);
-        assert(server_cookie == 0);
-        logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
-      }
-      return wait_exit_io().then([this] {
+    global_seq = messenger.get_global_seq();
+    assert(client_cookie != 0);
+    if (!conn.policy.lossy && server_cookie != 0) {
+      ++connect_seq;
+      logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+                     conn, global_seq, connect_seq);
+    } else { // conn.policy.lossy || server_cookie == 0
+      assert(connect_seq == 0);
+      assert(server_cookie == 0);
+      logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+    }
+    return wait_exit_io().then([this] {
 #ifdef UNIT_TESTS_BUILT
-          // process custom_bp_t::SOCKET_CONNECTING
-          // supports CONTINUE/FAULT/BLOCK
-          if (conn.interceptor) {
-            auto action = conn.interceptor->intercept(
-                conn, {custom_bp_t::SOCKET_CONNECTING});
-            switch (action) {
-            case bp_action_t::CONTINUE:
-              return seastar::now();
-            case bp_action_t::FAULT:
-              logger().info("[Test] got FAULT");
-              abort_in_fault();
-            case bp_action_t::BLOCK:
-              logger().info("[Test] got BLOCK");
-              return conn.interceptor->blocker.block();
-            default:
-              ceph_abort("unexpected action from trap");
-            }
-          } else {
-            return seastar::now();
-          }
-        }).then([this] {
-#endif
-          ceph_assert_always(frame_assembler);
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} before Socket::connect()",
-                           conn, get_state_name(state));
-            abort_protocol();
-          }
-          return Socket::connect(conn.peer_addr);
-        }).then([this](SocketRef new_socket) {
-          logger().debug("{} socket connected", conn);
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} during Socket::connect()",
-                           conn, get_state_name(state));
-            return new_socket->close().then([sock=std::move(new_socket)] {
-              abort_protocol();
-            });
-          }
-          if (!has_socket) {
-            frame_assembler->set_socket(std::move(new_socket));
-            has_socket = true;
-          } else {
-            gate.dispatch_in_background(
-              "replace_socket_connecting",
-              conn,
-              [this, new_socket=std::move(new_socket)]() mutable {
-                return frame_assembler->replace_shutdown_socket(std::move(new_socket));
-              }
-            );
-          }
-          is_socket_valid = true;
+      // process custom_bp_t::SOCKET_CONNECTING
+      // supports CONTINUE/FAULT/BLOCK
+      if (!conn.interceptor) {
+        return seastar::now();
+      }
+      return conn.interceptor->intercept(
+        conn, {Breakpoint{custom_bp_t::SOCKET_CONNECTING}}
+      ).then([this](bp_action_t action) {
+        switch (action) {
+        case bp_action_t::CONTINUE:
           return seastar::now();
-        }).then([this] {
-          auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler->reset_handlers();
-          frame_assembler->start_recording();
-          return banner_exchange(true);
-        }).then([this] (auto&& ret) {
-          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
-          if (conn.get_peer_type() != _peer_type) {
-            logger().warn("{} connection peer type does not match what peer advertises {} != {}",
-                          conn, ceph_entity_type_name(conn.get_peer_type()),
-                          ceph_entity_type_name(_peer_type));
-            ABORT_IN_CLOSE(true);
-          }
-          if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} during banner_exchange(), abort",
-                           conn, get_state_name(state));
-            abort_protocol();
-          }
-          frame_assembler->learn_socket_ephemeral_port_as_connector(
-              _my_addr_from_peer.get_port());
-          if (unlikely(_my_addr_from_peer.is_legacy())) {
-            logger().warn("{} peer sent a legacy address for me: {}",
-                          conn, _my_addr_from_peer);
-            throw std::system_error(
-                make_error_code(crimson::net::error::bad_peer_address));
-          }
-          _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
-          messenger.learned_addr(_my_addr_from_peer, conn);
-          return client_auth();
-        }).then([this] {
-          if (server_cookie == 0) {
-            ceph_assert(connect_seq == 0);
-            return client_connect();
-          } else {
-            ceph_assert(connect_seq > 0);
-            return client_reconnect();
+        case bp_action_t::FAULT:
+          logger().info("[Test] got FAULT");
+          abort_in_fault();
+        case bp_action_t::BLOCK:
+          logger().info("[Test] got BLOCK");
+          return conn.interceptor->blocker.block();
+        default:
+          ceph_abort("unexpected action from trap");
+          return seastar::now();
+        }
+      });;
+    }).then([this] {
+#endif
+      ceph_assert_always(frame_assembler);
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} before Socket::connect()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      return Socket::connect(conn.peer_addr);
+    }).then([this](SocketRef _new_socket) {
+      logger().debug("{} socket connected", conn);
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} during Socket::connect()",
+                       conn, get_state_name(state));
+        return _new_socket->close().then([sock=std::move(_new_socket)] {
+          abort_protocol();
+        });
+      }
+      SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket));
+      if (!has_socket) {
+        frame_assembler->set_socket(std::move(new_socket));
+        has_socket = true;
+      } else {
+        gate.dispatch_in_background(
+          "replace_socket_connecting",
+          conn,
+          [this, new_socket=std::move(new_socket)]() mutable {
+            return frame_assembler->replace_shutdown_socket(std::move(new_socket));
           }
-        }).then([this] (next_step_t next) {
+        );
+      }
+      is_socket_valid = true;
+      return seastar::now();
+    }).then([this] {
+      auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+      frame_assembler->reset_handlers();
+      frame_assembler->start_recording();
+      return banner_exchange(true);
+    }).then([this] (auto&& ret) {
+      auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+      if (conn.get_peer_type() != _peer_type) {
+        logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+                      conn, ceph_entity_type_name(conn.get_peer_type()),
+                      ceph_entity_type_name(_peer_type));
+        ABORT_IN_CLOSE(true);
+      }
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} during banner_exchange(), abort",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      frame_assembler->learn_socket_ephemeral_port_as_connector(
+          _my_addr_from_peer.get_port());
+      if (unlikely(_my_addr_from_peer.is_legacy())) {
+        logger().warn("{} peer sent a legacy address for me: {}",
+                      conn, _my_addr_from_peer);
+        throw std::system_error(
+            make_error_code(crimson::net::error::bad_peer_address));
+      }
+      _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
+      messenger.learned_addr(_my_addr_from_peer, conn);
+      return client_auth();
+    }).then([this] {
+      if (server_cookie == 0) {
+        ceph_assert(connect_seq == 0);
+        return client_connect();
+      } else {
+        ceph_assert(connect_seq > 0);
+        return client_reconnect();
+      }
+    }).then([this] (next_step_t next) {
+      if (unlikely(state != state_t::CONNECTING)) {
+        logger().debug("{} triggered {} at the end of execute_connecting()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      switch (next) {
+       case next_step_t::ready: {
+        if (unlikely(state != state_t::CONNECTING)) {
+          logger().debug("{} triggered {} before dispatch_connect(), abort",
+                         conn, get_state_name(state));
+          abort_protocol();
+        }
+
+        auto cc_seq = crosscore.prepare_submit();
+        // there are 2 hops with dispatch_connect()
+        crosscore.prepare_submit();
+        logger().info("{} connected: gs={}, pgs={}, cs={}, "
+                      "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                      "send {} IOHandler::dispatch_connect()",
+                      conn, global_seq, peer_global_seq, connect_seq,
+                      client_cookie, server_cookie, io_states,
+                      frame_assembler->get_socket_shard_id(), cc_seq);
+
+        // set io_handler to a new shard
+        auto new_io_shard = frame_assembler->get_socket_shard_id();
+        ConnectionFRef conn_fref = seastar::make_foreign(
+            conn.shared_from_this());
+        ceph_assert_always(!pr_switch_io_shard.has_value());
+        pr_switch_io_shard = seastar::shared_promise<>();
+        return seastar::smp::submit_to(
+            io_handler.get_shard_id(),
+            [this, cc_seq, new_io_shard,
+             conn_fref=std::move(conn_fref)]() mutable {
+          return io_handler.dispatch_connect(
+              cc_seq, new_io_shard, std::move(conn_fref));
+        }).then([this, new_io_shard] {
+          ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+          pr_switch_io_shard->set_value();
+          pr_switch_io_shard = std::nullopt;
+          // user can make changes
+
           if (unlikely(state != state_t::CONNECTING)) {
-            logger().debug("{} triggered {} at the end of execute_connecting()",
+            logger().debug("{} triggered {} after dispatch_connect(), abort",
                            conn, get_state_name(state));
             abort_protocol();
           }
-          switch (next) {
-           case next_step_t::ready: {
-            logger().info("{} connected: gs={}, pgs={}, cs={}, "
-                          "client_cookie={}, server_cookie={}, {}",
-                          conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie,
-                          io_stat_printer{io_handler});
-            io_handler.dispatch_connect();
-            if (unlikely(state != state_t::CONNECTING)) {
-              logger().debug("{} triggered {} after ms_handle_connect(), abort",
-                             conn, get_state_name(state));
-              abort_protocol();
-            }
-            execute_ready();
-            break;
-           }
-           case next_step_t::wait: {
-            logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
-            ceph_assert_always(is_socket_valid);
-            frame_assembler->shutdown_socket();
-            is_socket_valid = false;
-            execute_wait(true);
-            break;
-           }
-           default: {
-            ceph_abort("impossible next step");
-           }
-          }
-        }).handle_exception([this](std::exception_ptr eptr) {
-          fault(state_t::CONNECTING, "execute_connecting", eptr);
+          execute_ready();
         });
+       }
+       case next_step_t::wait: {
+        logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+        ceph_assert_always(is_socket_valid);
+        frame_assembler->shutdown_socket<true>(&gate);
+        is_socket_valid = false;
+        execute_wait(true);
+        return seastar::now();
+       }
+       default: {
+        ceph_abort("impossible next step");
+       }
+      }
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::CONNECTING, "execute_connecting", eptr);
     });
+  });
 }
 
 // ACCEPTING state
@@ -1061,7 +1205,8 @@ ProtocolV2::reuse_connection(
   has_socket = false;
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
-    conn.interceptor->register_conn_replaced(conn);
+    conn.interceptor->register_conn_replaced(
+        conn.get_local_shared_foreign_from_this());
   }
 #endif
   // close this connection because all the necessary information is delivered
@@ -1476,91 +1621,89 @@ ProtocolV2::server_reconnect()
 void ProtocolV2::execute_accepting()
 {
   assert(is_socket_valid);
-  trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+  trigger_state(state_t::ACCEPTING, io_state_t::none);
   gate.dispatch_in_background("execute_accepting", conn, [this] {
-      return seastar::futurize_invoke([this] {
+    return seastar::futurize_invoke([this] {
 #ifdef UNIT_TESTS_BUILT
-          if (conn.interceptor) {
-            auto action = conn.interceptor->intercept(
-                conn, {custom_bp_t::SOCKET_ACCEPTED});
-            switch (action) {
-            case bp_action_t::CONTINUE:
-              break;
-            case bp_action_t::FAULT:
-              logger().info("[Test] got FAULT");
-              abort_in_fault();
-            default:
-              ceph_abort("unexpected action from trap");
-            }
-          }
-#endif
-          auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          frame_assembler->reset_handlers();
-          frame_assembler->start_recording();
-          return banner_exchange(false);
-        }).then([this] (auto&& ret) {
-          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
-          ceph_assert(conn.get_peer_type() == 0);
-          conn.set_peer_type(_peer_type);
-
-          conn.policy = messenger.get_policy(_peer_type);
-          logger().info("{} UPDATE: peer_type={},"
-                        " policy(lossy={} server={} standby={} resetcheck={})",
-                        conn, ceph_entity_type_name(_peer_type),
-                        conn.policy.lossy, conn.policy.server,
-                        conn.policy.standby, conn.policy.resetcheck);
-          if (!messenger.get_myaddr().is_blank_ip() &&
-              (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
-              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
-            logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
-                          conn, _my_addr_from_peer, messenger.get_myaddr());
-            throw std::system_error(
-                make_error_code(crimson::net::error::bad_peer_address));
-          }
-          messenger.learned_addr(_my_addr_from_peer, conn);
-          return server_auth();
-        }).then([this] {
-          return frame_assembler->read_main_preamble();
-        }).then([this](auto ret) {
-          switch (ret.tag) {
-            case Tag::CLIENT_IDENT:
-              return server_connect();
-            case Tag::SESSION_RECONNECT:
-              return server_reconnect();
-            default: {
-              unexpected_tag(ret.tag, conn, "post_server_auth");
-              return seastar::make_ready_future<next_step_t>(next_step_t::none);
-            }
-          }
-        }).then([this] (next_step_t next) {
-          switch (next) {
-           case next_step_t::ready:
-            assert(state != state_t::ACCEPTING);
-            break;
-           case next_step_t::wait:
-            if (unlikely(state != state_t::ACCEPTING)) {
-              logger().debug("{} triggered {} at the end of execute_accepting()",
-                             conn, get_state_name(state));
-              abort_protocol();
-            }
-            logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
-            execute_server_wait();
-            break;
-           default:
-            ceph_abort("impossible next step");
-          }
-        }).handle_exception([this](std::exception_ptr eptr) {
-          const char *e_what;
-          try {
-            std::rethrow_exception(eptr);
-          } catch (std::exception &e) {
-            e_what = e.what();
-          }
-          logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
-                        conn, get_state_name(state), e_what);
-          do_close(false);
+      if (conn.interceptor) {
+        // only notify socket accepted
+        gate.dispatch_in_background(
+            "test_intercept_socket_accepted", conn, [this] {
+          return conn.interceptor->intercept(
+            conn, {Breakpoint{custom_bp_t::SOCKET_ACCEPTED}}
+          ).then([](bp_action_t action) {
+            ceph_assert(action == bp_action_t::CONTINUE);
+          });
         });
+      }
+#endif
+      auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
+      frame_assembler->reset_handlers();
+      frame_assembler->start_recording();
+      return banner_exchange(false);
+    }).then([this] (auto&& ret) {
+      auto [_peer_type, _my_addr_from_peer] = std::move(ret);
+      ceph_assert(conn.get_peer_type() == 0);
+      conn.set_peer_type(_peer_type);
+
+      conn.policy = messenger.get_policy(_peer_type);
+      logger().info("{} UPDATE: peer_type={},"
+                    " policy(lossy={} server={} standby={} resetcheck={})",
+                    conn, ceph_entity_type_name(_peer_type),
+                    conn.policy.lossy, conn.policy.server,
+                    conn.policy.standby, conn.policy.resetcheck);
+      if (!messenger.get_myaddr().is_blank_ip() &&
+          (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+          messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
+        logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
+                      conn, _my_addr_from_peer, messenger.get_myaddr());
+        throw std::system_error(
+            make_error_code(crimson::net::error::bad_peer_address));
+      }
+      messenger.learned_addr(_my_addr_from_peer, conn);
+      return server_auth();
+    }).then([this] {
+      return frame_assembler->read_main_preamble();
+    }).then([this](auto ret) {
+      switch (ret.tag) {
+        case Tag::CLIENT_IDENT:
+          return server_connect();
+        case Tag::SESSION_RECONNECT:
+          return server_reconnect();
+        default: {
+          unexpected_tag(ret.tag, conn, "post_server_auth");
+          return seastar::make_ready_future<next_step_t>(next_step_t::none);
+        }
+      }
+    }).then([this] (next_step_t next) {
+      switch (next) {
+       case next_step_t::ready:
+        assert(state != state_t::ACCEPTING);
+        break;
+       case next_step_t::wait:
+        if (unlikely(state != state_t::ACCEPTING)) {
+          logger().debug("{} triggered {} at the end of execute_accepting()",
+                         conn, get_state_name(state));
+          abort_protocol();
+        }
+        logger().info("{} execute_accepting(): going to SERVER_WAIT", conn);
+        execute_server_wait();
+        break;
+       default:
+        ceph_abort("impossible next step");
+      }
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
+      logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
+                    conn, get_state_name(state), e_what);
+      do_close(false);
     });
+  });
 }
 
 // CONNECTING or ACCEPTING state
@@ -1609,10 +1752,22 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   };
 
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
+  trigger_state(state_t::ESTABLISHING, io_state_t::delay);
+  bool is_replace;
   if (existing_conn) {
-    static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
-        true /* is_dispatch_reset */, std::move(accept_me));
+    logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+                  "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                  "close existing {}",
+                  conn, global_seq, peer_global_seq, connect_seq,
+                  client_cookie, server_cookie,
+                  io_states, frame_assembler->get_socket_shard_id(),
+                  *existing_conn);
+    is_replace = true;
+    ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+        existing_conn->protocol.get());
+    existing_proto->do_close(
+        true, // is_dispatch_reset
+        std::move(accept_me));
     if (unlikely(state != state_t::ESTABLISHING)) {
       logger().warn("{} triggered {} during execute_establishing(), "
                     "the accept event will not be delivered!",
@@ -1620,18 +1775,48 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
       abort_protocol();
     }
   } else {
+    logger().info("{} start establishing: gs={}, pgs={}, cs={}, "
+                  "client_cookie={}, server_cookie={}, {}, new_sid={}, "
+                  "no existing",
+                  conn, global_seq, peer_global_seq, connect_seq,
+                  client_cookie, server_cookie, io_states,
+                  frame_assembler->get_socket_shard_id());
+    is_replace = false;
     accept_me();
   }
 
-  io_handler.dispatch_accept();
-  if (unlikely(state != state_t::ESTABLISHING)) {
-    logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
-                   conn, get_state_name(state));
-    abort_protocol();
-  }
+  gated_execute("execute_establishing", conn, [this, is_replace] {
+    ceph_assert_always(state == state_t::ESTABLISHING);
+
+    // set io_handler to a new shard
+    auto cc_seq = crosscore.prepare_submit();
+    // there are 2 hops with dispatch_accept()
+    crosscore.prepare_submit();
+    auto new_io_shard = frame_assembler->get_socket_shard_id();
+    logger().debug("{} send {} IOHandler::dispatch_accept({})",
+                   conn, cc_seq, new_io_shard);
+    ConnectionFRef conn_fref = seastar::make_foreign(
+        conn.shared_from_this());
+    ceph_assert_always(!pr_switch_io_shard.has_value());
+    pr_switch_io_shard = seastar::shared_promise<>();
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(),
+        [this, cc_seq, new_io_shard, is_replace,
+         conn_fref=std::move(conn_fref)]() mutable {
+      return io_handler.dispatch_accept(
+          cc_seq, new_io_shard, std::move(conn_fref), is_replace);
+    }).then([this, new_io_shard] {
+      ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+      pr_switch_io_shard->set_value();
+      pr_switch_io_shard = std::nullopt;
+      // user can make changes
+
+      if (unlikely(state != state_t::ESTABLISHING)) {
+        logger().debug("{} triggered {} after dispatch_accept() during execute_establishing()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
 
-  gated_execute("execute_establishing", conn, [this] {
-    return seastar::futurize_invoke([this] {
       return send_server_ident();
     }).then([this] {
       if (unlikely(state != state_t::ESTABLISHING)) {
@@ -1639,11 +1824,7 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} established: gs={}, pgs={}, cs={}, "
-                    "client_cookie={}, server_cookie={}, {}",
-                    conn, global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie,
-                    io_stat_printer{io_handler});
+      logger().info("{} established, going to ready", conn);
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::ESTABLISHING, "execute_establishing", eptr);
@@ -1656,15 +1837,26 @@ void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
 seastar::future<>
 ProtocolV2::send_server_ident()
 {
+  ceph_assert_always(state == state_t::ESTABLISHING ||
+                     state == state_t::REPLACING);
   // send_server_ident() logic
 
   // refered to async-conn v2: not assign gs to global_seq
   global_seq = messenger.get_global_seq();
-  logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+  auto cc_seq = crosscore.prepare_submit();
+  logger().debug("{} UPDATE: gs={} for server ident, "
+                 "send {} IOHandler::reset_peer_state()",
+                 conn, global_seq, cc_seq);
 
   // this is required for the case when this connection is being replaced
-  io_handler.requeue_out_sent_up_to(0);
-  io_handler.reset_session(false);
+  io_states.reset_peer_state();
+  gate.dispatch_in_background(
+      "reset_peer_state", conn, [this, cc_seq] {
+    return seastar::smp::submit_to(
+        io_handler.get_shard_id(), [this, cc_seq] {
+      return io_handler.reset_peer_state(cc_seq);
+    });
+  });
 
   if (!conn.policy.lossy) {
     server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
@@ -1709,13 +1901,21 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
+  ceph_assert_always(state >= state_t::ESTABLISHING);
+  ceph_assert_always(state <= state_t::WAIT);
   ceph_assert_always(has_socket || state == state_t::CONNECTING);
-  ceph_assert_always(!mover.socket->is_shutdown());
-  trigger_state(state_t::REPLACING, io_state_t::delay, false);
+  // mover.socket shouldn't be shutdown
+
+  logger().info("{} start replacing ({}): pgs was {}, cs was {}, "
+                "client_cookie was {}, {}, new_sid={}",
+                conn, reconnect ? "reconnected" : "connected",
+                peer_global_seq, connect_seq, client_cookie,
+                io_states, mover.socket->get_shard_id());
   if (is_socket_valid) {
-    frame_assembler->shutdown_socket();
+    frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
+  trigger_state_phase1(state_t::REPLACING);
   gate.dispatch_in_background(
       "trigger_replacing",
       conn,
@@ -1729,15 +1929,60 @@ void ProtocolV2::trigger_replacing(bool reconnect,
        new_peer_global_seq,
        new_connect_seq, new_msg_seq] () mutable {
     ceph_assert_always(state == state_t::REPLACING);
-    io_handler.dispatch_accept();
-    // state may become CLOSING, close mover.socket and abort later
-    return wait_exit_io(
+    auto new_io_shard = mover.socket->get_shard_id();
+    // state may become CLOSING below, but we cannot abort the chain until
+    // mover.socket is correctly handled (closed or replaced).
+
+    // this is preemptive
+    return wait_switch_io_shard(
     ).then([this] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
+      trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+      return wait_exit_io();
+    }).then([this] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
       ceph_assert_always(frame_assembler);
       protocol_timer.cancel();
       auto done = std::move(execution_done);
       execution_done = seastar::now();
       return done;
+    }).then([this, new_io_shard] {
+      if (unlikely(state != state_t::REPLACING)) {
+        ceph_assert_always(state == state_t::CLOSING);
+        return seastar::now();
+      }
+
+      // set io_handler to a new shard
+      // we should prevent parallel switching core attempts
+      auto cc_seq = crosscore.prepare_submit();
+      // there are 2 hops with dispatch_accept()
+      crosscore.prepare_submit();
+      logger().debug("{} send {} IOHandler::dispatch_accept({})",
+                     conn, cc_seq, new_io_shard);
+      ConnectionFRef conn_fref = seastar::make_foreign(
+          conn.shared_from_this());
+      ceph_assert_always(!pr_switch_io_shard.has_value());
+      pr_switch_io_shard = seastar::shared_promise<>();
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(),
+          [this, cc_seq, new_io_shard,
+           conn_fref=std::move(conn_fref)]() mutable {
+        return io_handler.dispatch_accept(
+            cc_seq, new_io_shard, std::move(conn_fref), false);
+      }).then([this, new_io_shard] {
+        ceph_assert_always(io_handler.get_shard_id() == new_io_shard);
+        pr_switch_io_shard->set_value();
+        pr_switch_io_shard = std::nullopt;
+        // user can make changes
+      });
     }).then([this,
              reconnect,
              do_reset,
@@ -1749,9 +1994,13 @@ void ProtocolV2::trigger_replacing(bool reconnect,
              new_connect_seq, new_msg_seq] () mutable {
       if (state == state_t::REPLACING && do_reset) {
         reset_session(true);
+        // user can make changes
       }
 
       if (unlikely(state != state_t::REPLACING)) {
+        logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+                       conn, get_state_name(state));
+        ceph_assert_always(state == state_t::CLOSING);
         return mover.socket->close(
         ).then([sock = std::move(mover.socket)] {
           abort_protocol();
@@ -1773,9 +2022,21 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
-        io_handler.requeue_out_sent_up_to(new_msg_seq);
-        auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
-        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+
+        auto cc_seq = crosscore.prepare_submit();
+        logger().debug("{} send {} IOHandler::requeue_out_sent_up_to({})",
+                       conn, cc_seq, new_msg_seq);
+        io_states.requeue_out_sent_up_to();
+        gate.dispatch_in_background(
+            "requeue_out_replacing", conn, [this, cc_seq, new_msg_seq] {
+          return seastar::smp::submit_to(
+              io_handler.get_shard_id(), [this, cc_seq, new_msg_seq] {
+            return io_handler.requeue_out_sent_up_to(cc_seq, new_msg_seq);
+          });
+        });
+
+        auto reconnect_ok = ReconnectOkFrame::Encode(io_states.in_seq);
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_states.in_seq);
         return frame_assembler->write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
@@ -1791,16 +2052,17 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       }
     }).then([this, reconnect] {
       if (unlikely(state != state_t::REPLACING)) {
-        logger().debug("{} triggered {} at the end of trigger_replacing()",
+        logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
                        conn, get_state_name(state));
+        ceph_assert_always(state == state_t::CLOSING);
         abort_protocol();
       }
-      logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+      logger().info("{} replaced ({}), going to ready: "
+                    "gs={}, pgs={}, cs={}, "
                     "client_cookie={}, server_cookie={}, {}",
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie,
-                    io_stat_printer{io_handler});
+                    client_cookie, server_cookie, io_states);
       execute_ready();
     }).handle_exception([this](std::exception_ptr eptr) {
       fault(state_t::REPLACING, "trigger_replacing", eptr);
@@ -1810,9 +2072,27 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 
 // READY state
 
-void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
+seastar::future<> ProtocolV2::notify_out_fault(
+    crosscore_t::seq_t cc_seq,
+    const char *where,
+    std::exception_ptr eptr,
+    io_handler_state _io_states)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out_fault(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, where, eptr, _io_states] {
+      return notify_out_fault(cc_seq, where, eptr, _io_states);
+    });
+  }
+
+  io_states = _io_states;
+  logger().debug("{} got {} notify_out_fault(): io_states={}",
+                 conn, cc_seq, io_states);
   fault(state_t::READY, where, eptr);
+  return seastar::now();
 }
 
 void ProtocolV2::execute_ready()
@@ -1820,7 +2100,16 @@ void ProtocolV2::execute_ready()
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   protocol_timer.cancel();
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::READY, io_state_t::open, false);
+  // I'm not responsible for shutting down the socket at READY
+  is_socket_valid = false;
+  trigger_state(state_t::READY, io_state_t::open);
+#ifdef UNIT_TESTS_BUILT
+  if (conn.interceptor) {
+    // FIXME: doesn't support cross-core
+    conn.interceptor->register_conn_ready(
+        conn.get_local_shared_foreign_from_this());
+  }
+#endif
 }
 
 // STANDBY state
@@ -1828,16 +2117,31 @@ void ProtocolV2::execute_ready()
 void ProtocolV2::execute_standby()
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::STANDBY, io_state_t::delay, false);
+  trigger_state(state_t::STANDBY, io_state_t::delay);
 }
 
-void ProtocolV2::notify_out()
+seastar::future<> ProtocolV2::notify_out(
+    crosscore_t::seq_t cc_seq)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_out(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_out(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_out(): at {}",
+                 conn, cc_seq, get_state_name(state));
+  io_states.is_out_queued = true;
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
     logger().info("{} notify_out(): at {}, going to CONNECTING",
                   conn, get_state_name(state));
     execute_connecting();
   }
+  return seastar::now();
 }
 
 // WAIT state
@@ -1845,7 +2149,7 @@ void ProtocolV2::notify_out()
 void ProtocolV2::execute_wait(bool max_backoff)
 {
   ceph_assert_always(!is_socket_valid);
-  trigger_state(state_t::WAIT, io_state_t::delay, false);
+  trigger_state(state_t::WAIT, io_state_t::delay);
   gated_execute("execute_wait", conn, [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
@@ -1883,10 +2187,10 @@ void ProtocolV2::execute_wait(bool max_backoff)
 void ProtocolV2::execute_server_wait()
 {
   ceph_assert_always(is_socket_valid);
-  trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
+  trigger_state(state_t::SERVER_WAIT, io_state_t::none);
   gated_execute("execute_server_wait", conn, [this] {
     return frame_assembler->read_exactly(1
-    ).then([this](auto bl) {
+    ).then([this](auto bptr) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
     }).handle_exception([this](std::exception_ptr eptr) {
@@ -1905,9 +2209,23 @@ void ProtocolV2::execute_server_wait()
 
 // CLOSING state
 
-void ProtocolV2::notify_mark_down()
+seastar::future<> ProtocolV2::notify_mark_down(
+    crosscore_t::seq_t cc_seq)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} notify_mark_down(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return notify_mark_down(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} notify_mark_down()",
+                 conn, cc_seq);
   do_close(false);
+  return seastar::now();
 }
 
 seastar::future<> ProtocolV2::close_clean_yielded()
@@ -1918,22 +2236,21 @@ seastar::future<> ProtocolV2::close_clean_yielded()
   // the container when seastar::parallel_for_each() is still iterating in it.
   // that'd lead to a segfault.
   return seastar::yield(
-  ).then([this, conn_ref = conn.shared_from_this()] {
+  ).then([this] {
     do_close(false);
-    // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
-    // which will otherwise result in deadlock
-    assert(closed_clean_fut.valid());
-    return closed_clean_fut.get_future();
-  });
+    return pr_closed_clean.get_shared_future();
+
+  // connection may be unreferenced from the messenger,
+  // so need to hold the additional reference.
+  }).finally([conn_ref = conn.shared_from_this()] {});;
 }
 
 void ProtocolV2::do_close(
     bool is_dispatch_reset,
     std::optional<std::function<void()>> f_accept_new)
 {
-  if (closed) {
+  if (state == state_t::CLOSING) {
     // already closing
-    assert(state == state_t::CLOSING);
     return;
   }
 
@@ -1946,9 +2263,9 @@ void ProtocolV2::do_close(
    * atomic operations
    */
 
-  closed = true;
+  ceph_assert_always(!gate.is_closed());
 
-  // trigger close
+  // messenger registrations, must happen before user events
   messenger.closing_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
@@ -1964,48 +2281,67 @@ void ProtocolV2::do_close(
     // cannot happen
     ceph_assert(false);
   }
-  protocol_timer.cancel();
-  trigger_state(state_t::CLOSING, io_state_t::drop, false);
-
   if (f_accept_new) {
+    // the replacing connection must be registered after the replaced
+    // connection is unregistered.
     (*f_accept_new)();
   }
+
+  protocol_timer.cancel();
   if (is_socket_valid) {
-    frame_assembler->shutdown_socket();
+    frame_assembler->shutdown_socket<true>(&gate);
     is_socket_valid = false;
   }
-  assert(!gate.is_closed());
-  auto handshake_closed = gate.close();
-  auto io_closed = io_handler.close_io(
-      is_dispatch_reset, is_replace);
-
-  // asynchronous operations
-  assert(!closed_clean_fut.valid());
-  closed_clean_fut = seastar::when_all(
-      std::move(handshake_closed), std::move(io_closed)
-  ).discard_result().then([this] {
-    ceph_assert_always(!exit_io.has_value());
-    if (has_socket) {
-      ceph_assert_always(frame_assembler);
-      return frame_assembler->close_shutdown_socket();
-    } else {
-      return seastar::now();
-    }
-  }).then([this] {
-    logger().debug("{} closed!", conn);
-    messenger.closed_conn(
-        seastar::static_pointer_cast<SocketConnection>(
-          conn.shared_from_this()));
+
+  trigger_state_phase1(state_t::CLOSING);
+  gate.dispatch_in_background(
+      "close_io", conn, [this, is_dispatch_reset, is_replace] {
+    // this is preemptive
+    return wait_switch_io_shard(
+    ).then([this, is_dispatch_reset, is_replace] {
+      trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+      auto cc_seq = crosscore.prepare_submit();
+      logger().debug("{} send {} IOHandler::close_io(reset={}, replace={})",
+                     conn, cc_seq, is_dispatch_reset, is_replace);
+
+      std::ignore = gate.close(
+      ).then([this] {
+        ceph_assert_always(!need_exit_io);
+        ceph_assert_always(!pr_exit_io.has_value());
+        if (has_socket) {
+          ceph_assert_always(frame_assembler);
+          return frame_assembler->close_shutdown_socket();
+        } else {
+          return seastar::now();
+        }
+      }).then([this] {
+        logger().debug("{} closed!", conn);
+        messenger.closed_conn(
+            seastar::static_pointer_cast<SocketConnection>(
+              conn.shared_from_this()));
+        pr_closed_clean.set_value();
 #ifdef UNIT_TESTS_BUILT
-    closed_clean = true;
-    if (conn.interceptor) {
-      conn.interceptor->register_conn_closed(conn);
-    }
+        closed_clean = true;
+        if (conn.interceptor) {
+          conn.interceptor->register_conn_closed(
+              conn.get_local_shared_foreign_from_this());
+        }
 #endif
-  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
-    logger().error("{} closing: closed_clean_fut got unexpected exception {}",
-                   conn, eptr);
-    ceph_abort();
+      // connection is unreferenced from the messenger,
+      // so need to hold the additional reference.
+      }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+        logger().error("{} closing got unexpected exception {}",
+                       conn, eptr);
+        ceph_abort();
+      });
+
+      return seastar::smp::submit_to(
+          io_handler.get_shard_id(),
+          [this, cc_seq, is_dispatch_reset, is_replace] {
+        return io_handler.close_io(cc_seq, is_dispatch_reset, is_replace);
+      });
+      // user can make changes
+    });
   });
 }