]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/io_handler.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / io_handler.cc
index 80d578363282e8c4b364cfd7d694328e5be5c10c..c414c48e12f8e89f63d5fcb8a736bc4d86e587f9 100644 (file)
@@ -47,18 +47,28 @@ namespace crimson::net {
 
 IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                      SocketConnection &conn)
-  : dispatchers(dispatchers),
+  : shard_states(shard_states_t::create(
+        seastar::this_shard_id(), io_state_t::none)),
+    dispatchers(dispatchers),
     conn(conn),
     conn_ref(conn.get_local_shared_foreign_from_this())
 {}
 
 IOHandler::~IOHandler()
 {
-  ceph_assert(gate.is_closed());
-  assert(!out_exit_dispatching);
+  // close_io() must be finished
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  // should be true in the according shard
+  // ceph_assert_always(shard_states->assert_closed_and_exit());
+  assert(!conn_ref);
 }
 
-ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
+#ifdef UNIT_TESTS_BUILT
+IOHandler::sweep_ret
+#else
+ceph::bufferlist
+#endif
+IOHandler::sweep_out_pending_msgs_to_sent(
   bool require_keepalive,
   std::optional<utime_t> maybe_keepalive_ack,
   bool require_ack)
@@ -66,25 +76,45 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
   std::size_t num_msgs = out_pending_msgs.size();
   ceph::bufferlist bl;
 
+#ifdef UNIT_TESTS_BUILT
+  std::vector<Tag> tags;
+#endif
+
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
     bl.append(frame_assembler->get_buffer(keepalive_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrame::tag;
+    tags.push_back(tag);
+#endif
   }
 
   if (unlikely(maybe_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
     bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = KeepAliveFrameAck::tag;
+    tags.push_back(tag);
+#endif
   }
 
   if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(get_in_seq());
+    auto ack_frame = AckFrame::Encode(in_seq);
     bl.append(frame_assembler->get_buffer(ack_frame));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = AckFrame::tag;
+    tags.push_back(tag);
+#endif
   }
 
   std::for_each(
       out_pending_msgs.begin(),
       out_pending_msgs.begin()+num_msgs,
-      [this, &bl](const MessageURef& msg) {
+      [this, &bl
+#ifdef UNIT_TESTS_BUILT
+        , &tags
+#endif
+      ](const MessageFRef& msg) {
     // set priority
     msg->get_header().src = conn.messenger.get_myname();
 
@@ -100,7 +130,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
                              header.type,       header.priority,
                              header.version,
                              ceph_le32(0),      header.data_off,
-                             ceph_le64(get_in_seq()),
+                             ceph_le64(in_seq),
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -109,6 +139,10 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
     bl.append(frame_assembler->get_buffer(message));
+#ifdef UNIT_TESTS_BUILT
+    auto tag = MessageFrame::tag;
+    tags.push_back(tag);
+#endif
   });
 
   if (!conn.policy.lossy) {
@@ -118,12 +152,49 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
         std::make_move_iterator(out_pending_msgs.end()));
   }
   out_pending_msgs.clear();
+
+#ifdef UNIT_TESTS_BUILT
+  return sweep_ret{std::move(bl), tags};
+#else
   return bl;
+#endif
 }
 
-seastar::future<> IOHandler::send(MessageURef msg)
+seastar::future<> IOHandler::send(MessageFRef msg)
 {
-  if (io_state != io_state_t::drop) {
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send(std::move(msg));
+  } else {
+    logger().trace("{} send() is directed to {} -- {}",
+                   conn, get_shard_id(), *msg);
+    return seastar::smp::submit_to(
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
+      return send_redirected(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_redirected(MessageFRef msg)
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send(std::move(msg));
+  } else {
+    logger().debug("{} send() is redirected to {} -- {}",
+                   conn, get_shard_id(), *msg);
+    return seastar::smp::submit_to(
+        get_shard_id(), [this, msg=std::move(msg)]() mutable {
+      return send_redirected(std::move(msg));
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send(MessageFRef msg)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send() got message -- {}", conn, *msg);
+  if (get_io_state() != io_state_t::drop) {
     out_pending_msgs.push_back(std::move(msg));
     notify_out_dispatch();
   }
@@ -132,6 +203,36 @@ seastar::future<> IOHandler::send(MessageURef msg)
 
 seastar::future<> IOHandler::send_keepalive()
 {
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send_keepalive();
+  } else {
+    logger().trace("{} send_keepalive() is directed to {}", conn, get_shard_id());
+    return seastar::smp::submit_to(
+        get_shard_id(), [this] {
+      return send_keepalive_redirected();
+    });
+  }
+}
+
+seastar::future<> IOHandler::send_keepalive_redirected()
+{
+  // sid may be changed on-the-fly during the submission
+  if (seastar::this_shard_id() == get_shard_id()) {
+    return do_send_keepalive();
+  } else {
+    logger().debug("{} send_keepalive() is redirected to {}", conn, get_shard_id());
+    return seastar::smp::submit_to(
+        get_shard_id(), [this] {
+      return send_keepalive_redirected();
+    });
+  }
+}
+
+seastar::future<> IOHandler::do_send_keepalive()
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().trace("{} do_send_keeplive(): need_keepalive={}", conn, need_keepalive);
   if (!need_keepalive) {
     need_keepalive = true;
     notify_out_dispatch();
@@ -141,22 +242,31 @@ seastar::future<> IOHandler::send_keepalive()
 
 void IOHandler::mark_down()
 {
-  ceph_assert_always(io_state != io_state_t::none);
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  ceph_assert_always(get_io_state() != io_state_t::none);
   need_dispatch_reset = false;
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
 
-  logger().info("{} mark_down() with {}",
-                conn, io_stat_printer{*this});
-  set_io_state(io_state_t::drop);
-  handshake_listener->notify_mark_down();
+  auto cc_seq = crosscore.prepare_submit();
+  logger().info("{} mark_down() at {}, send {} notify_mark_down()",
+                conn, io_stat_printer{*this}, cc_seq);
+  do_set_io_state(io_state_t::drop);
+  shard_states->dispatch_in_background(
+      "notify_mark_down", conn, [this, cc_seq] {
+    return seastar::smp::submit_to(
+        conn.get_messenger_shard_id(), [this, cc_seq] {
+      return handshake_listener->notify_mark_down(cc_seq);
+    });
+  });
 }
 
 void IOHandler::print_io_stat(std::ostream &out) const
 {
+  assert(seastar::this_shard_id() == get_shard_id());
   out << "io_stat("
-      << "io_state=" << fmt::format("{}", io_state)
+      << "io_state=" << fmt::format("{}", get_io_state())
       << ", in_seq=" << in_seq
       << ", out_seq=" << out_seq
       << ", out_pending_msgs_size=" << out_pending_msgs.size()
@@ -167,49 +277,80 @@ void IOHandler::print_io_stat(std::ostream &out) const
       << ")";
 }
 
-void IOHandler::set_io_state(
-    const IOHandler::io_state_t &new_state,
-    FrameAssemblerV2Ref fa)
+void IOHandler::assign_frame_assembler(FrameAssemblerV2Ref fa)
+{
+  assert(fa != nullptr);
+  ceph_assert_always(frame_assembler == nullptr);
+  frame_assembler = std::move(fa);
+  ceph_assert_always(
+      frame_assembler->get_shard_id() == get_shard_id());
+  // should have been set through dispatch_accept/connect()
+  ceph_assert_always(
+      frame_assembler->get_socket_shard_id() == get_shard_id());
+  ceph_assert_always(frame_assembler->is_socket_valid());
+}
+
+void IOHandler::do_set_io_state(
+    io_state_t new_state,
+    std::optional<crosscore_t::seq_t> cc_seq,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  auto prv_state = get_io_state();
+  logger().debug("{} got {}do_set_io_state(): prv_state={}, new_state={}, "
+                 "fa={}, set_notify_out={}, at {}",
+                 conn,
+                 cc_seq.has_value() ? fmt::format("{} ", *cc_seq) : "",
+                 prv_state, new_state,
+                 fa ? "present" : "N/A", set_notify_out,
+                 io_stat_printer{*this});
   ceph_assert_always(!(
-    (new_state == io_state_t::none && io_state != io_state_t::none) ||
-    (new_state == io_state_t::open && io_state == io_state_t::open) ||
-    (new_state != io_state_t::drop && io_state == io_state_t::drop)
+    (new_state == io_state_t::none && prv_state != io_state_t::none) ||
+    (new_state == io_state_t::open && prv_state == io_state_t::open)
   ));
 
+  if (prv_state == io_state_t::drop) {
+    // only possible due to a racing mark_down() from user
+    if (new_state == io_state_t::open) {
+      assign_frame_assembler(std::move(fa));
+      frame_assembler->shutdown_socket<false>(nullptr);
+    } else {
+      assert(fa == nullptr);
+    }
+    return;
+  }
+
   bool dispatch_in = false;
   if (new_state == io_state_t::open) {
     // to open
     ceph_assert_always(protocol_is_connected == true);
-    assert(fa != nullptr);
-    ceph_assert_always(frame_assembler == nullptr);
-    frame_assembler = std::move(fa);
-    ceph_assert_always(frame_assembler->is_socket_valid());
+    assign_frame_assembler(std::move(fa));
     dispatch_in = true;
-#ifdef UNIT_TESTS_BUILT
-    if (conn.interceptor) {
-      conn.interceptor->register_conn_ready(conn);
-    }
-#endif
-  } else if (io_state == io_state_t::open) {
+  } else if (prv_state == io_state_t::open) {
     // from open
     ceph_assert_always(protocol_is_connected == true);
     protocol_is_connected = false;
     assert(fa == nullptr);
     ceph_assert_always(frame_assembler->is_socket_valid());
-    frame_assembler->shutdown_socket();
-    if (out_dispatching) {
-      ceph_assert_always(!out_exit_dispatching.has_value());
-      out_exit_dispatching = seastar::promise<>();
-    }
+    frame_assembler->shutdown_socket<false>(nullptr);
   } else {
     assert(fa == nullptr);
   }
 
-  if (io_state != new_state) {
-    io_state = new_state;
-    io_state_changed.set_value();
-    io_state_changed = seastar::promise<>();
+  if (new_state == io_state_t::delay) {
+    need_notify_out = set_notify_out;
+    if (need_notify_out) {
+      maybe_notify_out_dispatch();
+    }
+  } else {
+    assert(set_notify_out == false);
+    need_notify_out = false;
+  }
+
+  // FIXME: simplify and drop the prv_state == new_state case
+  if (prv_state != new_state) {
+    shard_states->set_io_state(new_state);
   }
 
   /*
@@ -221,44 +362,141 @@ void IOHandler::set_io_state(
   }
 }
 
-seastar::future<FrameAssemblerV2Ref> IOHandler::wait_io_exit_dispatching()
+seastar::future<> IOHandler::set_io_state(
+    crosscore_t::seq_t cc_seq,
+    io_state_t new_state,
+    FrameAssemblerV2Ref fa,
+    bool set_notify_out)
 {
-  ceph_assert_always(io_state != io_state_t::open);
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} set_io_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_state,
+            fa=std::move(fa), set_notify_out]() mutable {
+      return set_io_state(cc_seq, new_state, std::move(fa), set_notify_out);
+    });
+  }
+
+  do_set_io_state(new_state, cc_seq, std::move(fa), set_notify_out);
+  return seastar::now();
+}
+
+seastar::future<IOHandler::exit_dispatching_ret>
+IOHandler::wait_io_exit_dispatching(
+    crosscore_t::seq_t cc_seq)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} wait_io_exit_dispatching(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return wait_io_exit_dispatching(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} wait_io_exit_dispatching()",
+                 conn, cc_seq);
+  ceph_assert_always(get_io_state() != io_state_t::open);
   ceph_assert_always(frame_assembler != nullptr);
   ceph_assert_always(!frame_assembler->is_socket_valid());
-  return seastar::when_all(
-    [this] {
-      if (out_exit_dispatching) {
-        return out_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }(),
-    [this] {
-      if (in_exit_dispatching) {
-        return in_exit_dispatching->get_future();
-      } else {
-        return seastar::now();
-      }
-    }()
-  ).discard_result().then([this] {
-    return std::move(frame_assembler);
+  return seastar::futurize_invoke([this] {
+    // cannot be running in parallel with to_new_sid()
+    if (maybe_dropped_sid.has_value()) {
+      ceph_assert_always(get_io_state() == io_state_t::drop);
+      assert(shard_states->assert_closed_and_exit());
+      auto prv_sid = *maybe_dropped_sid;
+      return seastar::smp::submit_to(prv_sid, [this] {
+        logger().debug("{} got wait_io_exit_dispatching from prv_sid", conn);
+        assert(maybe_prv_shard_states != nullptr);
+        return maybe_prv_shard_states->wait_io_exit_dispatching();
+      });
+    } else {
+      return shard_states->wait_io_exit_dispatching();
+    }
+  }).then([this] {
+    logger().debug("{} finish wait_io_exit_dispatching at {}",
+                   conn, io_stat_printer{*this});
+    ceph_assert_always(frame_assembler != nullptr);
+    ceph_assert_always(!frame_assembler->is_socket_valid());
+    frame_assembler->set_shard_id(conn.get_messenger_shard_id());
+    return exit_dispatching_ret{
+      std::move(frame_assembler),
+      get_states()};
   });
 }
 
-void IOHandler::reset_session(bool full)
+seastar::future<> IOHandler::reset_session(
+    crosscore_t::seq_t cc_seq,
+    bool full)
 {
-  // reset in
-  in_seq = 0;
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_session(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, full] {
+      return reset_session(cc_seq, full);
+    });
+  }
+
+  logger().debug("{} got {} reset_session({})",
+                 conn, cc_seq, full);
+  assert(get_io_state() != io_state_t::open);
+  reset_in();
   if (full) {
     reset_out();
     dispatch_remote_reset();
   }
+  return seastar::now();
 }
 
-void IOHandler::requeue_out_sent()
+seastar::future<> IOHandler::reset_peer_state(
+    crosscore_t::seq_t cc_seq)
 {
-  assert(io_state != io_state_t::open);
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} reset_peer_state(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return reset_peer_state(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} reset_peer_state()",
+                 conn, cc_seq);
+  assert(get_io_state() != io_state_t::open);
+  reset_in();
+  do_requeue_out_sent_up_to(0);
+  discard_out_sent();
+  return seastar::now();
+}
+
+seastar::future<> IOHandler::requeue_out_sent(
+    crosscore_t::seq_t cc_seq)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq] {
+      return requeue_out_sent(cc_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent()",
+                 conn, cc_seq);
+  do_requeue_out_sent();
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent()
+{
+  assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty()) {
     return;
   }
@@ -266,7 +504,7 @@ void IOHandler::requeue_out_sent()
   out_seq -= out_sent_msgs.size();
   logger().debug("{} requeue {} items, revert out_seq to {}",
                  conn, out_sent_msgs.size(), out_seq);
-  for (MessageURef& msg : out_sent_msgs) {
+  for (MessageFRef& msg : out_sent_msgs) {
     msg->clear_payload();
     msg->set_seq(0);
   }
@@ -275,12 +513,32 @@ void IOHandler::requeue_out_sent()
       std::make_move_iterator(out_sent_msgs.begin()),
       std::make_move_iterator(out_sent_msgs.end()));
   out_sent_msgs.clear();
-  notify_out_dispatch();
+  maybe_notify_out_dispatch();
 }
 
-void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
+seastar::future<> IOHandler::requeue_out_sent_up_to(
+    crosscore_t::seq_t cc_seq,
+    seq_num_t msg_seq)
 {
-  assert(io_state != io_state_t::open);
+  assert(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} requeue_out_sent_up_to(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, msg_seq] {
+      return requeue_out_sent_up_to(cc_seq, msg_seq);
+    });
+  }
+
+  logger().debug("{} got {} requeue_out_sent_up_to({})",
+                 conn, cc_seq, msg_seq);
+  do_requeue_out_sent_up_to(msg_seq);
+  return seastar::now();
+}
+
+void IOHandler::do_requeue_out_sent_up_to(seq_num_t seq)
+{
+  assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
                    conn, out_seq, seq);
@@ -297,57 +555,233 @@ void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
       out_sent_msgs.pop_front();
     }
   }
-  requeue_out_sent();
+  do_requeue_out_sent();
+}
+
+void IOHandler::reset_in()
+{
+  assert(get_io_state() != io_state_t::open);
+  in_seq = 0;
 }
 
 void IOHandler::reset_out()
 {
-  assert(io_state != io_state_t::open);
-  out_seq = 0;
+  assert(get_io_state() != io_state_t::open);
+  discard_out_sent();
   out_pending_msgs.clear();
-  out_sent_msgs.clear();
   need_keepalive = false;
   next_keepalive_ack = std::nullopt;
   ack_left = 0;
 }
 
-void IOHandler::dispatch_accept()
+void IOHandler::discard_out_sent()
 {
-  if (io_state == io_state_t::drop) {
-    return;
-  }
-  // protocol_is_connected can be from true to true here if the replacing is
-  // happening to a connected connection.
-  protocol_is_connected = true;
-  dispatchers.ms_handle_accept(conn_ref);
+  assert(get_io_state() != io_state_t::open);
+  out_seq = 0;
+  out_sent_msgs.clear();
 }
 
-void IOHandler::dispatch_connect()
+seastar::future<>
+IOHandler::dispatch_accept(
+    crosscore_t::seq_t cc_seq,
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref,
+    bool is_replace)
 {
-  if (io_state == io_state_t::drop) {
-    return;
+  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+}
+
+seastar::future<>
+IOHandler::dispatch_connect(
+    crosscore_t::seq_t cc_seq,
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref)
+{
+  return to_new_sid(cc_seq, new_sid, std::move(conn_fref), std::nullopt);
+}
+
+seastar::future<>
+IOHandler::cleanup_prv_shard(seastar::shard_id prv_sid)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  return seastar::smp::submit_to(prv_sid, [this] {
+    logger().debug("{} got cleanup_prv_shard()", conn);
+    assert(maybe_prv_shard_states != nullptr);
+    auto ref_prv_states = std::move(maybe_prv_shard_states);
+    auto &prv_states = *ref_prv_states;
+    return prv_states.close(
+    ).then([ref_prv_states=std::move(ref_prv_states)] {
+      ceph_assert_always(ref_prv_states->assert_closed_and_exit());
+    });
+  }).then([this] {
+    ceph_assert_always(maybe_prv_shard_states == nullptr);
+  });
+}
+
+seastar::future<>
+IOHandler::to_new_sid(
+    crosscore_t::seq_t cc_seq,
+    seastar::shard_id new_sid,
+    ConnectionFRef conn_fref,
+    std::optional<bool> is_replace)
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} to_new_sid(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, new_sid, is_replace,
+            conn_fref=std::move(conn_fref)]() mutable {
+      return to_new_sid(cc_seq, new_sid, std::move(conn_fref), is_replace);
+    });
   }
-  ceph_assert_always(protocol_is_connected == false);
-  protocol_is_connected = true;
-  dispatchers.ms_handle_connect(conn_ref);
+
+  bool is_accept_or_connect = is_replace.has_value();
+  logger().debug("{} got {} to_new_sid_1(new_sid={}, {}) at {}",
+                 conn, cc_seq, new_sid,
+                 fmt::format("{}",
+                   is_accept_or_connect ?
+                   (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                   "connect"),
+                 io_stat_printer{*this});
+  auto next_cc_seq = ++cc_seq;
+
+  if (get_io_state() != io_state_t::drop) {
+    ceph_assert_always(conn_ref);
+    if (new_sid != seastar::this_shard_id()) {
+      dispatchers.ms_handle_shard_change(conn_ref, new_sid, is_accept_or_connect);
+      // user can make changes
+    }
+  } else {
+    // it is possible that both io_handler and protocolv2 are
+    // trying to close each other from different cores simultaneously.
+    assert(!protocol_is_connected);
+  }
+
+  if (get_io_state() != io_state_t::drop) {
+    if (is_accept_or_connect) {
+      // protocol_is_connected can be from true to true here if the replacing is
+      // happening to a connected connection.
+    } else {
+      ceph_assert_always(protocol_is_connected == false);
+    }
+    protocol_is_connected = true;
+  } else {
+    assert(!protocol_is_connected);
+  }
+
+  bool is_dropped = false;
+  if (get_io_state() == io_state_t::drop) {
+    is_dropped = true;
+  }
+  ceph_assert_always(get_io_state() != io_state_t::open);
+
+  // apply the switching atomically
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+  auto prv_sid = get_shard_id();
+  ceph_assert_always(maybe_prv_shard_states == nullptr);
+  maybe_prv_shard_states = std::move(shard_states);
+  shard_states = shard_states_t::create_from_previous(
+      *maybe_prv_shard_states, new_sid);
+  assert(new_sid == get_shard_id());
+
+  return seastar::smp::submit_to(new_sid,
+      [this, next_cc_seq, is_dropped, prv_sid, is_replace, conn_fref=std::move(conn_fref)]() mutable {
+    logger().debug("{} got {} to_new_sid_2(prv_sid={}, is_dropped={}, {}) at {}",
+                   conn, next_cc_seq, prv_sid, is_dropped,
+                   fmt::format("{}",
+                     is_replace.has_value() ?
+                     (*is_replace ? "accept(replace)" : "accept(!replace)") :
+                     "connect"),
+                   io_stat_printer{*this});
+
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+    ceph_assert_always(get_io_state() != io_state_t::open);
+    ceph_assert_always(!maybe_dropped_sid.has_value());
+    ceph_assert_always(crosscore.proceed_or_wait(next_cc_seq));
+
+    if (is_dropped) {
+      ceph_assert_always(get_io_state() == io_state_t::drop);
+      ceph_assert_always(shard_states->assert_closed_and_exit());
+      maybe_dropped_sid = prv_sid;
+      // cleanup_prv_shard() will be done in a follow-up close_io()
+    } else {
+      // possible at io_state_t::drop
+
+      // previous shard is not cleaned,
+      // but close_io() is responsible to clean up the current shard,
+      // so cleanup the previous shard here.
+      shard_states->dispatch_in_background(
+          "cleanup_prv_sid", conn, [this, prv_sid] {
+        return cleanup_prv_shard(prv_sid);
+      });
+      maybe_notify_out_dispatch();
+    }
+
+    ceph_assert_always(!conn_ref);
+    // assign even if already dropping
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+
+    if (get_io_state() != io_state_t::drop) {
+      if (is_replace.has_value()) {
+        dispatchers.ms_handle_accept(conn_ref, prv_sid, *is_replace);
+      } else {
+        dispatchers.ms_handle_connect(conn_ref, prv_sid);
+      }
+      // user can make changes
+    }
+  });
+}
+
+seastar::future<> IOHandler::set_accepted_sid(
+    crosscore_t::seq_t cc_seq,
+    seastar::shard_id sid,
+    ConnectionFRef conn_fref)
+{
+  assert(seastar::this_shard_id() == get_shard_id());
+  assert(get_io_state() == io_state_t::none);
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+  assert(maybe_prv_shard_states == nullptr);
+  shard_states.reset();
+  shard_states = shard_states_t::create(sid, io_state_t::none);
+  return seastar::smp::submit_to(sid,
+      [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
+    // must be the first to proceed
+    ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+
+    logger().debug("{} set accepted sid", conn);
+    ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+    ceph_assert_always(get_io_state() == io_state_t::none);
+    assert(maybe_prv_shard_states == nullptr);
+    ceph_assert_always(!conn_ref);
+    conn_ref = make_local_shared_foreign(std::move(conn_fref));
+  });
 }
 
 void IOHandler::dispatch_reset(bool is_replace)
 {
-  ceph_assert_always(io_state == io_state_t::drop);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
   if (!need_dispatch_reset) {
     return;
   }
   need_dispatch_reset = false;
+  ceph_assert_always(conn_ref);
+
   dispatchers.ms_handle_reset(conn_ref, is_replace);
+  // user can make changes
 }
 
 void IOHandler::dispatch_remote_reset()
 {
-  if (io_state == io_state_t::drop) {
+  if (get_io_state() == io_state_t::drop) {
     return;
   }
+  ceph_assert_always(conn_ref);
+
   dispatchers.ms_handle_remote_reset(conn_ref);
+  // user can make changes
 }
 
 void IOHandler::ack_out_sent(seq_num_t seq)
@@ -364,85 +798,74 @@ void IOHandler::ack_out_sent(seq_num_t seq)
   }
 }
 
-seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
-  assert(!is_out_queued());
-  return frame_assembler->flush(
-  ).then([this] {
-    if (!is_out_queued()) {
-      // still nothing pending to send after flush,
-      // the dispatching can ONLY stop now
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (unlikely(out_exit_dispatching.has_value())) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: nothing queued at {},"
-                      " set out_exit_dispatching",
-                      conn, io_state);
-      }
-      return seastar::make_ready_future<stop_t>(stop_t::yes);
-    } else {
-      // something is pending to send during flushing
-      return seastar::make_ready_future<stop_t>(stop_t::no);
-    }
-  });
-}
-
-seastar::future<> IOHandler::do_out_dispatch()
+seastar::future<>
+IOHandler::do_out_dispatch(shard_states_t &ctx)
 {
-  return seastar::repeat([this] {
-    switch (io_state) {
+  return seastar::repeat([this, &ctx] {
+    switch (ctx.get_io_state()) {
      case io_state_t::open: {
-      bool still_queued = is_out_queued();
-      if (unlikely(!still_queued)) {
-        return try_exit_out_dispatch();
+      if (unlikely(!is_out_queued())) {
+        // try exit open dispatching
+        return frame_assembler->flush<false>(
+        ).then([this, &ctx] {
+          if (ctx.get_io_state() != io_state_t::open || is_out_queued()) {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          }
+          // still nothing pending to send after flush,
+          // open dispatching can ONLY stop now
+          ctx.exit_out_dispatching("exit-open", conn);
+          return seastar::make_ready_future<stop_t>(stop_t::yes);
+        });
       }
+
+      auto require_keepalive = need_keepalive;
+      need_keepalive = false;
+      auto maybe_keepalive_ack = next_keepalive_ack;
+      next_keepalive_ack = std::nullopt;
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
-      return frame_assembler->write(
-        sweep_out_pending_msgs_to_sent(
-          need_keepalive, next_keepalive_ack, to_ack > 0)
-      ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
-        need_keepalive = false;
-        if (next_keepalive_ack == prv_keepalive_ack) {
-          next_keepalive_ack = std::nullopt;
-        }
-        assert(ack_left >= to_ack);
-        ack_left -= to_ack;
-        if (!is_out_queued()) {
-          return try_exit_out_dispatch();
-        } else {
-          // messages were enqueued during socket write
-          return seastar::make_ready_future<stop_t>(stop_t::no);
+      ack_left = 0;
+#ifdef UNIT_TESTS_BUILT
+      auto ret = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->intercept_frames(ret.tags, true
+      ).then([this, bl=std::move(ret.bl)]() mutable {
+        return frame_assembler->write<false>(std::move(bl));
+      }
+#else
+      auto bl = sweep_out_pending_msgs_to_sent(
+          require_keepalive, maybe_keepalive_ack, to_ack > 0);
+      return frame_assembler->write<false>(std::move(bl)
+#endif
+      ).then([this, &ctx] {
+        if (ctx.get_io_state() != io_state_t::open) {
+          return frame_assembler->flush<false>(
+          ).then([] {
+            return seastar::make_ready_future<stop_t>(stop_t::no);
+          });
         }
+
+        // FIXME: may leak a flush if state is changed after return and before
+        // the next repeat body.
+        return seastar::make_ready_future<stop_t>(stop_t::no);
       });
      }
      case io_state_t::delay:
       // delay out dispatching until open
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: delay and set out_exit_dispatching ...", conn);
-      } else {
-        logger().info("{} do_out_dispatch: delay ...", conn);
-      }
-      return io_state_changed.get_future(
+      ctx.notify_out_dispatching_stopped("delay...", conn);
+      return ctx.wait_state_change(
       ).then([] { return stop_t::no; });
      case io_state_t::drop:
-      ceph_assert(out_dispatching);
-      out_dispatching = false;
-      if (out_exit_dispatching) {
-        out_exit_dispatching->set_value();
-        out_exit_dispatching = std::nullopt;
-        logger().info("{} do_out_dispatch: dropped and set out_exit_dispatching", conn);
-      } else {
-        logger().info("{} do_out_dispatch: dropped", conn);
-      }
+      ctx.exit_out_dispatching("dropped", conn);
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+     case io_state_t::switched:
+      ctx.exit_out_dispatching("switched", conn);
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
-      ceph_assert(false);
+      ceph_abort("impossible");
     }
-  }).handle_exception_type([this] (const std::system_error& e) {
+  }).handle_exception_type([this, &ctx](const std::system_error& e) {
+    auto io_state = ctx.get_io_state();
     if (e.code() != std::errc::broken_pipe &&
         e.code() != std::errc::connection_reset &&
         e.code() != error::negotiation_failure) {
@@ -452,58 +875,83 @@ seastar::future<> IOHandler::do_out_dispatch()
     }
 
     if (io_state == io_state_t::open) {
-      logger().info("{} do_out_dispatch(): fault at {}, going to delay -- {}",
-                    conn, io_state, e.what());
+      auto cc_seq = crosscore.prepare_submit();
+      logger().info("{} do_out_dispatch(): fault at {}, {}, going to delay -- {}, "
+                    "send {} notify_out_fault()",
+                    conn, io_state, io_stat_printer{*this}, e.what(), cc_seq);
       std::exception_ptr eptr;
       try {
         throw e;
       } catch(...) {
         eptr = std::current_exception();
       }
-      set_io_state(io_state_t::delay);
-      handshake_listener->notify_out_fault("do_out_dispatch", eptr);
+      do_set_io_state(io_state_t::delay);
+      shard_states->dispatch_in_background(
+          "notify_out_fault(out)", conn, [this, cc_seq, eptr] {
+        auto states = get_states();
+        return seastar::smp::submit_to(
+            conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+          return handshake_listener->notify_out_fault(
+              cc_seq, "do_out_dispatch", eptr, states);
+        });
+      });
     } else {
-      logger().info("{} do_out_dispatch(): fault at {} -- {}",
-                    conn, io_state, e.what());
+      if (io_state != io_state_t::switched) {
+        logger().info("{} do_out_dispatch(): fault at {}, {} -- {}",
+                      conn, io_state, io_stat_printer{*this}, e.what());
+      } else {
+        logger().info("{} do_out_dispatch(): fault at {} -- {}",
+                      conn, io_state, e.what());
+      }
     }
 
-    return do_out_dispatch();
+    return do_out_dispatch(ctx);
   });
 }
 
+void IOHandler::maybe_notify_out_dispatch()
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (is_out_queued()) {
+    notify_out_dispatch();
+  }
+}
+
 void IOHandler::notify_out_dispatch()
 {
-  handshake_listener->notify_out();
-  if (out_dispatching) {
-    // already dispatching
-    return;
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  assert(is_out_queued());
+  if (need_notify_out) {
+    auto cc_seq = crosscore.prepare_submit();
+    logger().debug("{} send {} notify_out()",
+                   conn, cc_seq);
+    shard_states->dispatch_in_background(
+        "notify_out", conn, [this, cc_seq] {
+      return seastar::smp::submit_to(
+          conn.get_messenger_shard_id(), [this, cc_seq] {
+        return handshake_listener->notify_out(cc_seq);
+      });
+    });
   }
-  out_dispatching = true;
-  switch (io_state) {
-   case io_state_t::open:
-     [[fallthrough]];
-   case io_state_t::delay:
-    assert(!gate.is_closed());
-    gate.dispatch_in_background("do_out_dispatch", conn, [this] {
-      return do_out_dispatch();
+  if (shard_states->try_enter_out_dispatching()) {
+    shard_states->dispatch_in_background(
+        "do_out_dispatch", conn, [this] {
+      return do_out_dispatch(*shard_states);
     });
-    return;
-   case io_state_t::drop:
-    out_dispatching = false;
-    return;
-   default:
-    ceph_assert(false);
   }
 }
 
 seastar::future<>
-IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
+IOHandler::read_message(
+    shard_states_t &ctx,
+    utime_t throttle_stamp,
+    std::size_t msg_size)
 {
-  return frame_assembler->read_frame_payload(
-  ).then([this, throttle_stamp, msg_size](auto payload) {
-    if (unlikely(io_state != io_state_t::open)) {
+  return frame_assembler->read_frame_payload<false>(
+  ).then([this, throttle_stamp, msg_size, &ctx](auto payload) {
+    if (unlikely(ctx.get_io_state() != io_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
-                     conn, io_state);
+                     conn, ctx.get_io_state());
       abort_protocol();
     }
 
@@ -561,7 +1009,7 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
     // client side queueing because messages can't be renumbered, but the (kernel)
     // client will occasionally pull a message out of the sent queue to send
     // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = get_in_seq();
+    uint64_t cur_seq = in_seq;
     if (message->get_seq() <= cur_seq) {
       logger().error("{} got old message {} <= {} {}, discarding",
                      conn, message->get_seq(), cur_seq, *message);
@@ -605,20 +1053,24 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
-    assert(io_state == io_state_t::open);
+    assert(ctx.get_io_state() == io_state_t::open);
+    assert(get_io_state() == io_state_t::open);
+    ceph_assert_always(conn_ref);
+
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+    // user can make changes
   });
 }
 
 void IOHandler::do_in_dispatch()
 {
-  ceph_assert_always(!in_exit_dispatching.has_value());
-  in_exit_dispatching = seastar::promise<>();
-  gate.dispatch_in_background("do_in_dispatch", conn, [this] {
-    return seastar::keep_doing([this] {
-      return frame_assembler->read_main_preamble(
-      ).then([this](auto ret) {
+  shard_states->enter_in_dispatching();
+  shard_states->dispatch_in_background(
+      "do_in_dispatch", conn, [this, &ctx=*shard_states] {
+    return seastar::keep_doing([this, &ctx] {
+      return frame_assembler->read_main_preamble<false>(
+      ).then([this, &ctx](auto ret) {
         switch (ret.tag) {
           case Tag::MESSAGE: {
             size_t msg_size = get_msg_size(*ret.rx_frame_asm);
@@ -628,7 +1080,7 @@ void IOHandler::do_in_dispatch()
                 return seastar::now();
               }
               // TODO: message throttler
-              ceph_assert(false);
+              ceph_abort("TODO");
               return seastar::now();
             }).then([this, msg_size] {
               // throttle_bytes() logic
@@ -643,14 +1095,14 @@ void IOHandler::do_in_dispatch()
                              conn.policy.throttler_bytes->get_current(),
                              conn.policy.throttler_bytes->get_max());
               return conn.policy.throttler_bytes->get(msg_size);
-            }).then([this, msg_size] {
+            }).then([this, msg_size, &ctx] {
               // TODO: throttle_dispatch_queue() logic
               utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp, msg_size);
+              return read_message(ctx, throttle_stamp, msg_size);
             });
           }
           case Tag::ACK:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(payload->back());
@@ -658,7 +1110,7 @@ void IOHandler::do_in_dispatch()
               ack_out_sent(ack.seq());
             });
           case Tag::KEEPALIVE2:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_keepalive2() logic
               auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
@@ -666,12 +1118,14 @@ void IOHandler::do_in_dispatch()
                              conn, keepalive_frame.timestamp());
               // notify keepalive ack
               next_keepalive_ack = keepalive_frame.timestamp();
-              notify_out_dispatch();
+              if (seastar::this_shard_id() == get_shard_id()) {
+                notify_out_dispatch();
+              }
 
               last_keepalive = seastar::lowres_system_clock::now();
             });
           case Tag::KEEPALIVE2_ACK:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_keepalive2_ack() logic
               auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
@@ -688,7 +1142,7 @@ void IOHandler::do_in_dispatch()
           }
         }
       });
-    }).handle_exception([this](std::exception_ptr eptr) {
+    }).handle_exception([this, &ctx](std::exception_ptr eptr) {
       const char *e_what;
       try {
         std::rethrow_exception(eptr);
@@ -696,21 +1150,138 @@ void IOHandler::do_in_dispatch()
         e_what = e.what();
       }
 
+      auto io_state = ctx.get_io_state();
       if (io_state == io_state_t::open) {
-        logger().info("{} do_in_dispatch(): fault at {}, going to delay -- {}",
-                      conn, io_state, e_what);
-        set_io_state(io_state_t::delay);
-        handshake_listener->notify_out_fault("do_in_dispatch", eptr);
+        auto cc_seq = crosscore.prepare_submit();
+        logger().info("{} do_in_dispatch(): fault at {}, {}, going to delay -- {}, "
+                      "send {} notify_out_fault()",
+                      conn, io_state, io_stat_printer{*this}, e_what, cc_seq);
+        do_set_io_state(io_state_t::delay);
+        shard_states->dispatch_in_background(
+            "notify_out_fault(in)", conn, [this, cc_seq, eptr] {
+          auto states = get_states();
+          return seastar::smp::submit_to(
+              conn.get_messenger_shard_id(), [this, cc_seq, eptr, states] {
+            return handshake_listener->notify_out_fault(
+                cc_seq, "do_in_dispatch", eptr, states);
+          });
+        });
       } else {
-        logger().info("{} do_in_dispatch(): fault at {} -- {}",
-                      conn, io_state, e_what);
+        if (io_state != io_state_t::switched) {
+          logger().info("{} do_in_dispatch(): fault at {}, {} -- {}",
+                        conn, io_state, io_stat_printer{*this}, e_what);
+        } else {
+          logger().info("{} do_in_dispatch(): fault at {} -- {}",
+                        conn, io_state, e_what);
+        }
       }
-    }).finally([this] {
-      ceph_assert_always(in_exit_dispatching.has_value());
-      in_exit_dispatching->set_value();
-      in_exit_dispatching = std::nullopt;
+    }).finally([&ctx] {
+      ctx.exit_in_dispatching();
     });
   });
 }
 
+seastar::future<>
+IOHandler::close_io(
+    crosscore_t::seq_t cc_seq,
+    bool is_dispatch_reset,
+    bool is_replace)
+{
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+  if (!crosscore.proceed_or_wait(cc_seq)) {
+    logger().debug("{} got {} close_io(), wait at {}",
+                   conn, cc_seq, crosscore.get_in_seq());
+    return crosscore.wait(cc_seq
+    ).then([this, cc_seq, is_dispatch_reset, is_replace] {
+      return close_io(cc_seq, is_dispatch_reset, is_replace);
+    });
+  }
+
+  logger().debug("{} got {} close_io(reset={}, replace={})",
+                 conn, cc_seq, is_dispatch_reset, is_replace);
+  ceph_assert_always(get_io_state() == io_state_t::drop);
+
+  if (is_dispatch_reset) {
+    dispatch_reset(is_replace);
+  }
+
+  ceph_assert_always(conn_ref);
+  conn_ref.reset();
+
+  // cannot be running in parallel with to_new_sid()
+  if (maybe_dropped_sid.has_value()) {
+    assert(shard_states->assert_closed_and_exit());
+    auto prv_sid = *maybe_dropped_sid;
+    return cleanup_prv_shard(prv_sid);
+  } else {
+    return shard_states->close(
+    ).then([this] {
+      assert(shard_states->assert_closed_and_exit());
+    });
+  }
+}
+
+/*
+ * IOHandler::shard_states_t
+ */
+
+void
+IOHandler::shard_states_t::notify_out_dispatching_stopped(
+    const char *what, SocketConnection &conn)
+{
+  assert(seastar::this_shard_id() == sid);
+  if (unlikely(out_exit_dispatching.has_value())) {
+    out_exit_dispatching->set_value();
+    out_exit_dispatching = std::nullopt;
+    logger().info("{} do_out_dispatch: stop({}) at {}, set out_exit_dispatching",
+                  conn, what, io_state);
+  } else {
+    if (unlikely(io_state != io_state_t::open)) {
+      logger().info("{} do_out_dispatch: stop({}) at {}, no out_exit_dispatching",
+                    conn, what, io_state);
+    }
+  }
+}
+
+seastar::future<>
+IOHandler::shard_states_t::wait_io_exit_dispatching()
+{
+  assert(seastar::this_shard_id() == sid);
+  assert(io_state != io_state_t::open);
+  assert(!gate.is_closed());
+  return seastar::when_all(
+    [this] {
+      if (out_exit_dispatching) {
+        return out_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }(),
+    [this] {
+      if (in_exit_dispatching) {
+        return in_exit_dispatching->get_future();
+      } else {
+        return seastar::now();
+      }
+    }()
+  ).discard_result();
+}
+
+IOHandler::shard_states_ref_t
+IOHandler::shard_states_t::create_from_previous(
+    shard_states_t &prv_states,
+    seastar::shard_id new_sid)
+{
+  auto io_state = prv_states.io_state;
+  assert(io_state != io_state_t::open);
+  auto ret = shard_states_t::create(new_sid, io_state);
+  if (io_state == io_state_t::drop) {
+    // the new gate should not never be used
+    auto fut = ret->gate.close();
+    ceph_assert_always(fut.available());
+  }
+  prv_states.set_io_state(io_state_t::switched);
+  return ret;
+}
+
 } // namespace crimson::net