git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/ProtocolV2.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
index fa47da50832c0956441af3af05033116d66ead35..95b756637f4e8a8de6ab2e2d49d3a2c9e65aa2f6 100644 (file)
@@ -3,19 +3,18 @@
 
 #include "ProtocolV2.h"
 
-#include <seastar/core/lowres_clock.hh>
 #include <fmt/format.h>
+#include <fmt/ranges.h>
 #include "include/msgr.h"
 #include "include/random.h"
+#include "msg/msg_fmt.h"
 
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
 #include "crimson/common/formatter.h"
+#include "crimson/common/log.h"
 
-#include "chained_dispatchers.h"
 #include "Errors.h"
-#include "Socket.h"
-#include "SocketConnection.h"
 #include "SocketMessenger.h"
 
 #ifdef UNIT_TESTS_BUILT
@@ -24,6 +23,8 @@
 
 using namespace ceph::msgr::v2;
 using crimson::common::local_conf;
+using io_state_t = crimson::net::IOHandler::io_state_t;
+using io_stat_printer = crimson::net::IOHandler::io_stat_printer;
 
 namespace {
 
@@ -67,9 +68,9 @@ seastar::logger& logger() {
   throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
 }
 
-[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
-  proto.close(dispatch_reset);
-  abort_protocol();
+#define ABORT_IN_CLOSE(is_dispatch_reset) { /* calls the unqualified do_close(), so expand it only where do_close() is in scope */ \
+  do_close(is_dispatch_reset);              \
+  abort_protocol();                         /* NOTE(review): expands to a bare {...} block, so "ABORT_IN_CLOSE(x);" leaves an empty statement; keep call sites inside braces */ \
 }
 
 inline void expect_tag(const Tag& expected,
@@ -103,34 +104,23 @@ inline uint64_t generate_client_cookie() {
 namespace crimson::net {
 
 #ifdef UNIT_TESTS_BUILT
-void intercept(Breakpoint bp, bp_type_t type,
-               SocketConnection& conn, SocketRef& socket) {
-  if (conn.interceptor) {
-    auto action = conn.interceptor->intercept(conn, Breakpoint(bp));
-    socket->set_trap(type, action, &conn.interceptor->blocker);
+// unit-test hook; should be kept consistent with intercept_frame() in FrameAssemblerV2.cc
+void intercept(Breakpoint bp,
+               bp_type_t type,
+               SocketConnection& conn,
+               Interceptor *interceptor,
+               SocketRef& socket) {
+  if (interceptor) {  // does nothing when no interceptor is given
+    auto action = interceptor->intercept(conn, Breakpoint(bp));  // ask the interceptor which action applies at this breakpoint
+    socket->set_trap(type, action, &interceptor->blocker);  // pass that action and the interceptor's blocker on to the socket
   }
 }
 
 #define INTERCEPT_CUSTOM(bp, type)       \
-intercept({bp}, type, conn, socket)
-
-#define INTERCEPT_FRAME(tag, type)       \
-intercept({static_cast<Tag>(tag), type}, \
-          type, conn, socket)
-
-#define INTERCEPT_N_RW(bp)                               \
-if (conn.interceptor) {                                  \
-  auto action = conn.interceptor->intercept(conn, {bp}); \
-  ceph_assert(action != bp_action_t::BLOCK);             \
-  if (action == bp_action_t::FAULT) {                    \
-    abort_in_fault();                                    \
-  }                                                      \
-}
-
+intercept({bp}, type, conn,              \
+          conn.interceptor, conn.socket)  // needs a variable named conn at the expansion site; expands to nothing when UNIT_TESTS_BUILT is not defined
 #else
 #define INTERCEPT_CUSTOM(bp, type)
-#define INTERCEPT_FRAME(tag, type)
-#define INTERCEPT_N_RW(bp)
 #endif
 
 seastar::future<> ProtocolV2::Timer::backoff(double seconds)
@@ -148,27 +138,22 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
   });
 }
 
-ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
-                       SocketConnection& conn,
-                       SocketMessenger& messenger)
-  : Protocol(proto_t::v2, dispatchers, conn),
-    messenger{messenger},
+ProtocolV2::ProtocolV2(SocketConnection& conn,  // the connection this protocol instance works on
+                       IOHandler &io_handler)  // collaborator that receives the io-state changes made by this class
+  : conn{conn},
+    messenger{conn.messenger},  // the messenger is taken from the connection instead of a constructor argument
+    io_handler{io_handler},
+    frame_assembler{FrameAssemblerV2::create(conn)},  // frame assembler created for this connection
+    auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},  // default-constructed auth metadata
     protocol_timer{conn}
 {}
 
 ProtocolV2::~ProtocolV2() {}
 
-bool ProtocolV2::is_connected() const {
-  return state == state_t::READY ||
-         state == state_t::ESTABLISHING ||
-         state == state_t::REPLACING;
-}
-
 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!socket);
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
@@ -185,198 +170,163 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
   execute_connecting();
 }
 
-void ProtocolV2::start_accept(SocketRef&& sock,
+void ProtocolV2::start_accept(SocketRef&& new_socket,  // entry point for an accepted socket; only valid from state NONE
                               const entity_addr_t& _peer_addr)
 {
   ceph_assert(state == state_t::NONE);
-  ceph_assert(!socket);
   // until we know better
   conn.target_addr = _peer_addr;
-  socket = std::move(sock);
+  frame_assembler->set_socket(std::move(new_socket));  // the accepted socket is handed to the frame assembler
+  has_socket = true;  // a socket is now attached
+  is_socket_valid = true;  // cleared again in fault() after shutdown_socket() and in trigger_state() on entering READY
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
     seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   execute_accepting();
 }
 
-// TODO: Frame related implementations, probably to a separate class.
-
-void ProtocolV2::enable_recording()
-{
-  rxbuf.clear();
-  txbuf.clear();
-  record_io = true;
-}
-
-seastar::future<Socket::tmp_buf> ProtocolV2::read_exactly(size_t bytes)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)  // switch to new_state and pass new_io_state to io_handler; aborts on a disallowed transition
 {
-  if (unlikely(record_io)) {
-    return socket->read_exactly(bytes)
-    .then([this] (auto bl) {
-      rxbuf.append(buffer::create(bl.share()));
-      return bl;
-    });
+  if (!reentrant && new_state == state) {  // re-triggering the current state is only allowed when the caller passes reentrant
+    logger().error("{} is not allowed to re-trigger state {}",
+                   conn, get_state_name(state));
+    ceph_abort();
+  }
+  if (state == state_t::CLOSING) {  // no transition out of CLOSING is allowed
+    logger().error("{} CLOSING is not allowed to trigger state {}",
+                   conn, get_state_name(new_state));
+    ceph_abort();
+  }
+  logger().debug("{} TRIGGER {}, was {}",
+                 conn, get_state_name(new_state), get_state_name(state));
+  auto pre_state = state;  // remembered because state is overwritten below
+  if (pre_state == state_t::READY) {  // leaving READY: set up the promise that is resolved at the end of this function's background task
+    assert(!gate.is_closed());
+    ceph_assert_always(!exit_io.has_value());  // at most one pending READY exit at a time
+    exit_io = seastar::shared_promise<>();
+  }
+  state = new_state;
+  if (new_state == state_t::READY) {
+    // the protocol is no longer responsible for shutting down the socket once READY
+    is_socket_valid = false;
+    io_handler.set_io_state(new_io_state, std::move(frame_assembler));  // the frame assembler moves to io_handler on entering READY
   } else {
-    return socket->read_exactly(bytes);
-  };
-}
+    io_handler.set_io_state(new_io_state, nullptr);  // every other state passes no frame assembler
+  }
 
-seastar::future<bufferlist> ProtocolV2::read(size_t bytes)
-{
-  if (unlikely(record_io)) {
-    return socket->read(bytes)
-    .then([this] (auto buf) {
-      rxbuf.append(buf);
-      return buf;
+  /*
+   * not atomic below: the remaining work is dispatched in the background
+   */
+
+  if (pre_state == state_t::READY) {
+    gate.dispatch_in_background("exit_io", conn, [this] {
+      return io_handler.wait_io_exit_dispatching(
+      ).then([this](FrameAssemblerV2Ref fa) {
+        frame_assembler = std::move(fa);  // take the frame assembler back from io_handler
+        exit_io->set_value();  // resolve the promise created above
+        exit_io = std::nullopt;  // clear it so the next exit from READY can create a new one
+      });
     });
-  } else {
-    return socket->read(bytes);
   }
 }
 
-seastar::future<> ProtocolV2::write(bufferlist&& buf)
+void ProtocolV2::fault(  // react to a failure raised while running in expected_state: log it, then close, stand by, wait or reconnect
+    state_t expected_state,  // state the failing code path was running in
+    const char *where,  // label of the failing site; only used in log messages
+    std::exception_ptr eptr)  // the failure; only its what() text is used
 {
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
+  assert(expected_state == state_t::CONNECTING ||  // only these four states may report a fault
+         expected_state == state_t::ESTABLISHING ||
+         expected_state == state_t::REPLACING ||
+         expected_state == state_t::READY);
+  const char *e_what;
+  try {
+    std::rethrow_exception(eptr);  // rethrown only to reach what()
+  } catch (std::exception &e) {  // NOTE(review): anything not derived from std::exception propagates out of fault()
+    e_what = e.what();  // NOTE(review): read after this handler ends; presumably safe because eptr still owns the exception -- confirm
   }
-  return socket->write(std::move(buf));
-}
 
-seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
-{
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
+  if (state != expected_state) {  // the state changed since the failing path started: log and leave the current state alone
+    logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+                  conn,
+                  get_state_name(expected_state),
+                  where,
+                  get_state_name(state),
+                  e_what);
+#ifndef NDEBUG  // debug builds also check which states may have replaced expected_state
+    if (expected_state == state_t::REPLACING) {
+      assert(state == state_t::CLOSING);
+    } else if (expected_state == state_t::READY) {
+      assert(state == state_t::CLOSING ||
+             state == state_t::REPLACING ||
+             state == state_t::CONNECTING ||
+             state == state_t::STANDBY);
+    } else {
+      assert(state == state_t::CLOSING ||
+             state == state_t::REPLACING);
+    }
+#endif
+    return;
   }
-  return socket->write_flush(std::move(buf));
-}
-
-size_t ProtocolV2::get_current_msg_size() const
-{
-  ceph_assert(rx_frame_asm.get_num_segments() > 0);
-  size_t sum = 0;
-  // we don't include SegmentIndex::Msg::HEADER.
-  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
-    sum += rx_frame_asm.get_segment_logical_len(idx);
+  assert(state == expected_state);
+
+  if (state != state_t::CONNECTING && conn.policy.lossy) {  // lossy policy: close instead of recovering, unless still CONNECTING
+    // socket will be shutdown in do_close()
+    logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+                  conn, get_state_name(state), where, e_what);
+    do_close(true);
+    return;
   }
-  return sum;
-}
-
-seastar::future<Tag> ProtocolV2::read_main_preamble()
-{
-  rx_preamble.clear();
-  return read_exactly(rx_frame_asm.get_preamble_onwire_len())
-    .then([this] (auto bl) {
-      rx_segments_data.clear();
-      try {
-        rx_preamble.append(buffer::create(std::move(bl)));
-        const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
-        INTERCEPT_FRAME(tag, bp_type_t::READ);
-        return tag;
-      } catch (FrameError& e) {
-        logger().warn("{} read_main_preamble: {}", conn, e.what());
-        abort_in_fault();
-      }
-    });
-}
 
-seastar::future<> ProtocolV2::read_frame_payload()
-{
-  ceph_assert(rx_segments_data.empty());
-
-  return seastar::do_until(
-    [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
-    [this] {
-      // TODO: create aligned and contiguous buffer from socket
-      const size_t seg_idx = rx_segments_data.size();
-      if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
-         alignment != segment_t::DEFAULT_ALIGNMENT) {
-        logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
-                       conn, alignment, rx_segments_data.size());
-      }
-      uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
-      // TODO: create aligned and contiguous buffer from socket
-      return read_exactly(onwire_len).then([this] (auto tmp_bl) {
-        logger().trace("{} RECV({}) frame segment[{}]",
-                       conn, tmp_bl.size(), rx_segments_data.size());
-        bufferlist segment;
-        segment.append(buffer::create(std::move(tmp_bl)));
-        rx_segments_data.emplace_back(std::move(segment));
-      });
-    }
-  ).then([this] {
-    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
-  }).then([this] (auto bl) {
-    logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-    bool ok = false;
-    try {
-      bufferlist rx_epilogue;
-      rx_epilogue.append(buffer::create(std::move(bl)));
-      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
-    } catch (FrameError& e) {
-      logger().error("read_frame_payload: {} {}", conn, e.what());
-      abort_in_fault();
-    } catch (ceph::crypto::onwire::MsgAuthError&) {
-      logger().error("read_frame_payload: {} bad auth tag", conn);
-      abort_in_fault();
-    }
-    // we do have a mechanism that allows transmitter to start sending message
-    // and abort after putting entire data field on wire. This will be used by
-    // the kernel client to avoid unnecessary buffering.
-    if (!ok) {
-      // TODO
-      ceph_assert(false);
+  if (likely(has_socket)) {
+    if (likely(is_socket_valid)) {  // the socket has not been shut down by this class yet
+      ceph_assert_always(state != state_t::READY);
+      frame_assembler->shutdown_socket();
+      is_socket_valid = false;  // recorded so a later fault takes the else branch instead
+    } else {
+      ceph_assert_always(state != state_t::ESTABLISHING);
     }
-  });
-}
-
-template <class F>
-seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
-{
-  auto bl = frame.get_buffer(tx_frame_asm);
-  const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
-  logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
-                 conn, bl.length(), (int)main_preamble->tag,
-                 (int)main_preamble->num_segments, main_preamble->crc);
-  INTERCEPT_FRAME(main_preamble->tag, bp_type_t::WRITE);
-  if (flush) {
-    return write_flush(std::move(bl));
-  } else {
-    return write(std::move(bl));
+  } else { // !has_socket
+    ceph_assert_always(state == state_t::CONNECTING);
+    assert(!is_socket_valid);
   }
-}
 
-void ProtocolV2::trigger_state(state_t _state, write_state_t _write_state, bool reentrant)
-{
-  if (!reentrant && _state == state) {
-    logger().error("{} is not allowed to re-trigger state {}",
-                   conn, get_state_name(state));
-    ceph_assert(false);
-  }
-  logger().debug("{} TRIGGER {}, was {}",
-                 conn, get_state_name(_state), get_state_name(state));
-  state = _state;
-  set_write_state(_write_state);
-}
-
-void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
-{
-  if (conn.policy.lossy) {
-    logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
-                  conn, func_name, get_state_name(state), eptr);
-    close(true);
-  } else if (conn.policy.server ||
-             (conn.policy.standby &&
-              (!is_queued() && conn.sent.empty()))) {
-    logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+  if (conn.policy.server ||  // server policy always stands by; standby policy only when io_handler reports nothing queued or sent
+      (conn.policy.standby && !io_handler.is_out_queued_or_sent())) {
+    if (conn.policy.server) {  // the two branches differ only in the log text
+      logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+                    conn,
+                    get_state_name(state),
+                    where,
+                    io_stat_printer{io_handler},
+                    e_what);
+    } else {
+      logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+                    conn,
+                    get_state_name(state),
+                    where,
+                    io_stat_printer{io_handler},
+                    e_what);
+    }
     execute_standby();
-  } else if (backoff) {
-    logger().info("{} {}: fault at {}, going to WAIT -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+  } else if (state == state_t::CONNECTING ||  // faults while CONNECTING or REPLACING go through WAIT
+             state == state_t::REPLACING) {
+    logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+                  conn,
+                  get_state_name(state),
+                  where,
+                  io_stat_printer{io_handler},
+                  e_what);
     execute_wait(false);
   } else {
-    logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
-                  conn, func_name, get_state_name(state), eptr);
+    assert(state == state_t::READY ||  // the remaining states go straight to CONNECTING
+           state == state_t::ESTABLISHING);
+    logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+                  conn,
+                  get_state_name(state),
+                  where,
+                  io_stat_printer{io_handler},
+                  e_what);
     execute_connecting();
   }
 }
@@ -385,14 +335,11 @@ void ProtocolV2::reset_session(bool full)
 {
   server_cookie = 0;
   connect_seq = 0;
-  conn.in_seq = 0;
   if (full) {
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
-    reset_write();
-    dispatchers.ms_handle_remote_reset(
-       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
+  io_handler.reset_session(full);  // called for both full and partial resets; presumably covers the in_seq / reset_write / remote-reset handling removed above -- confirm in IOHandler
 }
 
 seastar::future<std::tuple<entity_type_t, entity_addr_t>>
@@ -415,11 +362,11 @@ ProtocolV2::banner_exchange(bool is_connect)
                  CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
   INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
-  return write_flush(std::move(bl)).then([this] {
+  return frame_assembler->write_flush(std::move(bl)).then([this] {
       // 2. read peer banner
       unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
-      return read_exactly(banner_len); // or read exactly?
+      return frame_assembler->read_exactly(banner_len); // or read exactly?
     }).then([this] (auto bl) {
       // 3. process peer banner and read banner_payload
       unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
@@ -449,61 +396,57 @@ ProtocolV2::banner_exchange(bool is_connect)
       }
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
-      return read(payload_len);
+      return frame_assembler->read(payload_len);
     }).then([this, is_connect] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
-      uint64_t peer_supported_features;
-      uint64_t peer_required_features;
+      uint64_t _peer_supported_features;
+      uint64_t _peer_required_features;
       try {
-        decode(peer_supported_features, p);
-        decode(peer_required_features, p);
+        decode(_peer_supported_features, p);
+        decode(_peer_required_features, p);
       } catch (const buffer::error &e) {
         logger().warn("{} decode banner payload failed", conn);
         abort_in_fault();
       }
       logger().debug("{} RECV({}) banner features: supported={} required={}",
                      conn, bl.length(),
-                     peer_supported_features, peer_required_features);
+                     _peer_supported_features, _peer_required_features);
 
       // Check feature bit compatibility
       uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
       uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
-      if ((required_features & peer_supported_features) != required_features) {
+      if ((required_features & _peer_supported_features) != required_features) {
         logger().error("{} peer does not support all required features"
                        " required={} peer_supported={}",
-                       conn, required_features, peer_supported_features);
-        abort_in_close(*this, is_connect);
+                       conn, required_features, _peer_supported_features);
+        ABORT_IN_CLOSE(is_connect);
       }
-      if ((supported_features & peer_required_features) != peer_required_features) {
+      if ((supported_features & _peer_required_features) != _peer_required_features) {
         logger().error("{} we do not support all peer required features"
                        " peer_required={} supported={}",
-                       conn, peer_required_features, supported_features);
-        abort_in_close(*this, is_connect);
-      }
-      this->peer_required_features = peer_required_features;
-      if (this->peer_required_features == 0) {
-        this->connection_features = msgr2_required;
+                       conn, _peer_required_features, supported_features);
+        ABORT_IN_CLOSE(is_connect);
       }
-      const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
-      tx_frame_asm.set_is_rev1(is_rev1);
-      rx_frame_asm.set_is_rev1(is_rev1);
+      peer_supported_features = _peer_supported_features;
+      bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+      frame_assembler->set_is_rev1(is_rev1);
 
       auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                       conn.target_addr);
       logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
                      conn, ceph_entity_type_name(messenger.get_mytype()),
                      conn.target_addr);
-      return write_frame(hello);
+      return frame_assembler->write_flush_frame(hello);
     }).then([this] {
       //5. read peer HelloFrame
-      return read_main_preamble();
-    }).then([this] (Tag tag) {
-      expect_tag(Tag::HELLO, tag, conn, __func__);
-      return read_frame_payload();
-    }).then([this] {
+      return frame_assembler->read_main_preamble();
+    }).then([this](auto ret) {
+      expect_tag(Tag::HELLO, ret.tag, conn, "read_hello_frame");
+      return frame_assembler->read_frame_payload();
+    }).then([this](auto payload) {
       // 6. process peer HelloFrame
-      auto hello = HelloFrame::Decode(rx_segments_data.back());
+      auto hello = HelloFrame::Decode(payload->back());
       logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
                      conn, ceph_entity_type_name(hello.entity_type()),
                      hello.peer_addr());
@@ -516,20 +459,21 @@ ProtocolV2::banner_exchange(bool is_connect)
 
 seastar::future<> ProtocolV2::handle_auth_reply()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    switch (tag) {
+  return frame_assembler->read_main_preamble(
+  ).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::AUTH_BAD_METHOD:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_bad_method() logic
-          auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
+          auto bad_method = AuthBadMethodFrame::Decode(payload->back());
           logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
                         "allowed_methods={}, allowed_modes={}",
                         conn, bad_method.method(), cpp_strerror(bad_method.result()),
                         bad_method.allowed_methods(), bad_method.allowed_modes());
           ceph_assert(messenger.get_auth_client());
           int r = messenger.get_auth_client()->handle_auth_bad_method(
-              conn.shared_from_this(), auth_meta,
+              conn, *auth_meta,
               bad_method.method(), bad_method.result(),
               bad_method.allowed_methods(), bad_method.allowed_modes());
           if (r < 0) {
@@ -540,33 +484,36 @@ seastar::future<> ProtocolV2::handle_auth_reply()
           return client_auth(bad_method.allowed_methods());
         });
       case Tag::AUTH_REPLY_MORE:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_reply_more() logic
-          auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
+          auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
           logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
                          conn, auth_more.auth_payload().length());
           ceph_assert(messenger.get_auth_client());
           // let execute_connecting() take care of the thrown exception
           auto reply = messenger.get_auth_client()->handle_auth_reply_more(
-            conn.shared_from_this(), auth_meta, auth_more.auth_payload());
+            conn, *auth_meta, auth_more.auth_payload());
           auto more_reply = AuthRequestMoreFrame::Encode(reply);
           logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
                          conn, reply.length());
-          return write_frame(more_reply);
+          return frame_assembler->write_flush_frame(more_reply);
         }).then([this] {
           return handle_auth_reply();
         });
       case Tag::AUTH_DONE:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_auth_done() logic
-          auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+          auto auth_done = AuthDoneFrame::Decode(payload->back());
           logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                          conn, auth_done.global_id(),
                          ceph_con_mode_name(auth_done.con_mode()),
                          auth_done.auth_payload().length());
           ceph_assert(messenger.get_auth_client());
           int r = messenger.get_auth_client()->handle_auth_done(
-              conn.shared_from_this(), auth_meta,
+              conn,
+              *auth_meta,
               auth_done.global_id(),
               auth_done.con_mode(),
               auth_done.auth_payload());
@@ -575,12 +522,11 @@ seastar::future<> ProtocolV2::handle_auth_reply()
             abort_in_fault();
           }
           auth_meta->con_mode = auth_done.con_mode();
-          session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
-              nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
+          frame_assembler->create_session_stream_handlers(*auth_meta, false);
           return finish_auth();
         });
       default: {
-        unexpected_tag(tag, conn, __func__);
+        unexpected_tag(ret.tag, conn, "handle_auth_reply");
         return seastar::now();
       }
     }
@@ -594,18 +540,19 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
 
   try {
     auto [auth_method, preferred_modes, bl] =
-      messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
+      messenger.get_auth_client()->get_auth_request(conn, *auth_meta);
     auth_meta->auth_method = auth_method;
     auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
     logger().debug("{} WRITE AuthRequestFrame: method={},"
                    " preferred_modes={}, payload_len={}",
                    conn, auth_method, preferred_modes, bl.length());
-    return write_frame(frame).then([this] {
+    return frame_assembler->write_flush_frame(frame
+    ).then([this] {
       return handle_auth_reply();
     });
   } catch (const crimson::auth::error& e) {
-    logger().error("{} get_initial_auth_request returned {}", conn, e);
-    abort_in_close(*this, true);
+    logger().error("{} get_initial_auth_request returned {}", conn, e.what());
+    ABORT_IN_CLOSE(true);
     return seastar::now();
   }
 }
@@ -613,10 +560,11 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::process_wait()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler->read_frame_payload(  // callers reach this after read_main_preamble() returned Tag::WAIT
+  ).then([this](auto payload) {  // the payload is now passed in by the frame assembler instead of read from rx_segments_data
     // handle_wait() logic
     logger().debug("{} GOT WaitFrame", conn);
-    WaitFrame::Decode(rx_segments_data.back());
+    WaitFrame::Decode(payload->back());  // the decode result is discarded; only next_step_t::wait is reported
     return next_step_t::wait;
   });
 }
@@ -647,14 +595,16 @@ ProtocolV2::client_connect()
                  conn.policy.features_supported,
                  conn.policy.features_required | msgr2_required,
                  flags, client_cookie);
-  return write_frame(client_ident).then([this] {
-    return read_main_preamble();
-  }).then([this] (Tag tag) {
-    switch (tag) {
+  return frame_assembler->write_flush_frame(client_ident
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::IDENT_MISSING_FEATURES:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_ident_missing_features() logic
-          auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
+          auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
           logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
                         " (client does not support all server features)",
                         conn, ident_missing.features());
@@ -664,10 +614,11 @@ ProtocolV2::client_connect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SERVER_IDENT:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_server_ident() logic
-          requeue_sent();
-          auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
+          io_handler.requeue_out_sent();
+          auto server_ident = ServerIdentFrame::Decode(payload->back());
           logger().debug("{} GOT ServerIdentFrame:"
                          " addrs={}, gid={}, gs={},"
                          " features_supported={}, features_required={},"
@@ -703,11 +654,12 @@ ProtocolV2::client_connect()
             logger().error("{} connection peer id ({}) does not match "
                            "what it should be ({}) during connecting, close",
                             conn, server_ident.gid(), conn.get_peer_id());
-            abort_in_close(*this, true);
+            ABORT_IN_CLOSE(true);
           }
           conn.set_peer_id(server_ident.gid());
           conn.set_features(server_ident.supported_features() &
                             conn.policy.features_supported);
+          logger().debug("{} UPDATE: features={}", conn, conn.get_features());
           peer_global_seq = server_ident.global_seq();
 
           bool lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
@@ -725,7 +677,7 @@ ProtocolV2::client_connect()
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
-        unexpected_tag(tag, conn, "post_client_connect");
+        unexpected_tag(ret.tag, conn, "post_client_connect");
         return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
@@ -741,32 +693,32 @@ ProtocolV2::client_reconnect()
                                           server_cookie,
                                           global_seq,
                                           connect_seq,
-                                          conn.in_seq);
+                                          io_handler.get_in_seq());
   logger().debug("{} WRITE ReconnectFrame: addrs={}, client_cookie={},"
-                 " server_cookie={}, gs={}, cs={}, msg_seq={}",
+                 " server_cookie={}, gs={}, cs={}, in_seq={}",
                  conn, messenger.get_myaddrs(),
                  client_cookie, server_cookie,
-                 global_seq, connect_seq, conn.in_seq);
-  return write_frame(reconnect).then([this] {
-    return read_main_preamble();
-  }).then([this] (Tag tag) {
-    switch (tag) {
+                 global_seq, connect_seq, io_handler.get_in_seq());
+  return frame_assembler->write_flush_frame(reconnect).then([this] {
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    switch (ret.tag) {
       case Tag::SESSION_RETRY_GLOBAL:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_session_retry_global() logic
-          auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+          auto retry = RetryGlobalFrame::Decode(payload->back());
           logger().warn("{} GOT RetryGlobalFrame: gs={}",
                         conn, retry.global_seq());
-          return messenger.get_global_seq(retry.global_seq()).then([this] (auto gs) {
-            global_seq = gs;
-            logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
-            return client_reconnect();
-          });
+          global_seq = messenger.get_global_seq(retry.global_seq());
+          logger().warn("{} UPDATE: gs={} for retry global", conn, global_seq);
+          return client_reconnect();
         });
       case Tag::SESSION_RETRY:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_session_retry() logic
-          auto retry = RetryFrame::Decode(rx_segments_data.back());
+          auto retry = RetryFrame::Decode(payload->back());
           logger().warn("{} GOT RetryFrame: cs={}",
                         conn, retry.connect_seq());
           connect_seq = retry.connect_seq() + 1;
@@ -774,9 +726,15 @@ ProtocolV2::client_reconnect()
           return client_reconnect();
         });
       case Tag::SESSION_RESET:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} before reset_session()",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
           // handle_session_reset() logic
-          auto reset = ResetFrame::Decode(rx_segments_data.back());
+          auto reset = ResetFrame::Decode(payload->back());
           logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
           reset_session(reset.full());
           return client_connect();
@@ -784,16 +742,17 @@ ProtocolV2::client_reconnect()
       case Tag::WAIT:
         return process_wait();
       case Tag::SESSION_RECONNECT_OK:
-        return read_frame_payload().then([this] {
+        return frame_assembler->read_frame_payload(
+        ).then([this](auto payload) {
           // handle_reconnect_ok() logic
-          auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
+          auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
           logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
                          conn, reconnect_ok.msg_seq());
-          requeue_up_to(reconnect_ok.msg_seq());
+          io_handler.requeue_out_sent_up_to(reconnect_ok.msg_seq());
           return seastar::make_ready_future<next_step_t>(next_step_t::ready);
         });
       default: {
-        unexpected_tag(tag, conn, "post_client_reconnect");
+        unexpected_tag(ret.tag, conn, "post_client_reconnect");
         return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
@@ -802,54 +761,78 @@ ProtocolV2::client_reconnect()
 
 void ProtocolV2::execute_connecting()
 {
-  trigger_state(state_t::CONNECTING, write_state_t::delay, true);
-  if (socket) {
-    socket->shutdown();
-  }
-  gated_execute("execute_connecting", [this] {
-      return messenger.get_global_seq().then([this] (auto gs) {
-          global_seq = gs;
-          assert(client_cookie != 0);
-          if (!conn.policy.lossy && server_cookie != 0) {
-            ++connect_seq;
-            logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
-                           conn, global_seq, connect_seq);
-          } else { // conn.policy.lossy || server_cookie == 0
-            assert(connect_seq == 0);
-            assert(server_cookie == 0);
-            logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+  ceph_assert_always(!is_socket_valid);
+  trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+  gated_execute("execute_connecting", conn, [this] {
+      global_seq = messenger.get_global_seq();
+      assert(client_cookie != 0);
+      if (!conn.policy.lossy && server_cookie != 0) {
+        ++connect_seq;
+        logger().debug("{} UPDATE: gs={}, cs={} for reconnect",
+                       conn, global_seq, connect_seq);
+      } else { // conn.policy.lossy || server_cookie == 0
+        assert(connect_seq == 0);
+        assert(server_cookie == 0);
+        logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
+      }
+      return wait_exit_io().then([this] {
+#ifdef UNIT_TESTS_BUILT
+          // process custom_bp_t::SOCKET_CONNECTING
+          // supports CONTINUE/FAULT/BLOCK
+          if (conn.interceptor) {
+            auto action = conn.interceptor->intercept(
+                conn, {custom_bp_t::SOCKET_CONNECTING});
+            switch (action) {
+            case bp_action_t::CONTINUE:
+              return seastar::now();
+            case bp_action_t::FAULT:
+              logger().info("[Test] got FAULT");
+              abort_in_fault();
+            case bp_action_t::BLOCK:
+              logger().info("[Test] got BLOCK");
+              return conn.interceptor->blocker.block();
+            default:
+              ceph_abort("unexpected action from trap");
+            }
+          } else {
+            return seastar::now();
           }
-
-          return wait_write_exit();
         }).then([this] {
+#endif
+          ceph_assert_always(frame_assembler);
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} before Socket::connect()",
                            conn, get_state_name(state));
             abort_protocol();
           }
-          if (socket) {
-            gate.dispatch_in_background("close_sockect_connecting", *this,
-                           [sock = std::move(socket)] () mutable {
-              return sock->close().then([sock = std::move(sock)] {});
-            });
-          }
-          INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
           return Socket::connect(conn.peer_addr);
-        }).then([this](SocketRef sock) {
+        }).then([this](SocketRef new_socket) {
           logger().debug("{} socket connected", conn);
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} during Socket::connect()",
                            conn, get_state_name(state));
-            return sock->close().then([sock = std::move(sock)] {
+            return new_socket->close().then([sock=std::move(new_socket)] {
               abort_protocol();
             });
           }
-          socket = std::move(sock);
+          if (!has_socket) {
+            frame_assembler->set_socket(std::move(new_socket));
+            has_socket = true;
+          } else {
+            gate.dispatch_in_background(
+              "replace_socket_connecting",
+              conn,
+              [this, new_socket=std::move(new_socket)]() mutable {
+                return frame_assembler->replace_shutdown_socket(std::move(new_socket));
+              }
+            );
+          }
+          is_socket_valid = true;
           return seastar::now();
         }).then([this] {
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          session_stream_handlers = { nullptr, nullptr };
-          enable_recording();
+          frame_assembler->reset_handlers();
+          frame_assembler->start_recording();
           return banner_exchange(true);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -857,14 +840,15 @@ void ProtocolV2::execute_connecting()
             logger().warn("{} connection peer type does not match what peer advertises {} != {}",
                           conn, ceph_entity_type_name(conn.get_peer_type()),
                           ceph_entity_type_name(_peer_type));
-            abort_in_close(*this, true);
+            ABORT_IN_CLOSE(true);
           }
           if (unlikely(state != state_t::CONNECTING)) {
             logger().debug("{} triggered {} during banner_exchange(), abort",
                            conn, get_state_name(state));
             abort_protocol();
           }
-          socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
+          frame_assembler->learn_socket_ephemeral_port_as_connector(
+              _my_addr_from_peer.get_port());
           if (unlikely(_my_addr_from_peer.is_legacy())) {
             logger().warn("{} peer sent a legacy address for me: {}",
                           conn, _my_addr_from_peer);
@@ -872,8 +856,7 @@ void ProtocolV2::execute_connecting()
                 make_error_code(crimson::net::error::bad_peer_address));
           }
           _my_addr_from_peer.set_type(entity_addr_t::TYPE_MSGR2);
-          return messenger.learned_addr(_my_addr_from_peer, conn);
-        }).then([this] {
+          messenger.learned_addr(_my_addr_from_peer, conn);
           return client_auth();
         }).then([this] {
           if (server_cookie == 0) {
@@ -891,17 +874,25 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            logger().info("{} connected:"
-                          " gs={}, pgs={}, cs={}, client_cookie={},"
-                          " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+            logger().info("{} connected: gs={}, pgs={}, cs={}, "
+                          "client_cookie={}, server_cookie={}, {}",
                           conn, global_seq, peer_global_seq, connect_seq,
-                          client_cookie, server_cookie, conn.in_seq,
-                          conn.out_seq, conn.out_q.size());
-            execute_ready(true);
+                          client_cookie, server_cookie,
+                          io_stat_printer{io_handler});
+            io_handler.dispatch_connect();
+            if (unlikely(state != state_t::CONNECTING)) {
+              logger().debug("{} triggered {} after ms_handle_connect(), abort",
+                             conn, get_state_name(state));
+              abort_protocol();
+            }
+            execute_ready();
             break;
            }
            case next_step_t::wait: {
-            logger().info("{} execute_connecting(): going to WAIT", conn);
+            logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+            ceph_assert_always(is_socket_valid);
+            frame_assembler->shutdown_socket();
+            is_socket_valid = false;
             execute_wait(true);
             break;
            }
@@ -909,27 +900,8 @@ void ProtocolV2::execute_connecting()
             ceph_abort("impossible next step");
            }
           }
-        }).handle_exception([this] (std::exception_ptr eptr) {
-          if (state != state_t::CONNECTING) {
-            logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
-                          conn, get_state_name(state), eptr);
-            assert(state == state_t::CLOSING ||
-                   state == state_t::REPLACING);
-            return;
-          }
-
-          if (conn.policy.server ||
-              (conn.policy.standby &&
-               (!is_queued() && conn.sent.empty()))) {
-            logger().info("{} execute_connecting(): fault at {} with nothing to send,"
-                          " going to STANDBY -- {}",
-                          conn, get_state_name(state), eptr);
-            execute_standby();
-          } else {
-            logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
-                          conn, get_state_name(state), eptr);
-            execute_wait(false);
-          }
+        }).handle_exception([this](std::exception_ptr eptr) {
+          fault(state_t::CONNECTING, "execute_connecting", eptr);
         });
     });
 }
@@ -948,7 +920,8 @@ seastar::future<> ProtocolV2::_auth_bad_method(int r)
                 "allowed_methods={}, allowed_modes={})",
                 conn, auth_meta->auth_method, cpp_strerror(r),
                 allowed_methods, allowed_modes);
-  return write_frame(bad_method).then([this] {
+  return frame_assembler->write_flush_frame(bad_method
+  ).then([this] {
     return server_auth();
   });
 }
@@ -959,8 +932,12 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
   ceph_assert(messenger.get_auth_server());
   bufferlist reply;
   int r = messenger.get_auth_server()->handle_auth_request(
-      conn.shared_from_this(), auth_meta,
-      more, auth_meta->auth_method, auth_payload,
+      conn,
+      *auth_meta,
+      more,
+      auth_meta->auth_method,
+      auth_payload,
+      &conn.peer_global_id,
       &reply);
   switch (r) {
    // successful
@@ -970,10 +947,10 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
                    conn, conn.peer_global_id,
                    ceph_con_mode_name(auth_meta->con_mode), reply.length());
-    return write_frame(auth_done).then([this] {
+    return frame_assembler->write_flush_frame(auth_done
+    ).then([this] {
       ceph_assert(auth_meta);
-      session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
-          nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
+      frame_assembler->create_session_stream_handlers(*auth_meta, true);
       return finish_auth();
     });
    }
@@ -982,13 +959,14 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
     auto more = AuthReplyMoreFrame::Encode(reply);
     logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
                    conn, reply.length());
-    return write_frame(more).then([this] {
-      return read_main_preamble();
-    }).then([this] (Tag tag) {
-      expect_tag(Tag::AUTH_REQUEST_MORE, tag, conn, __func__);
-      return read_frame_payload();
-    }).then([this] {
-      auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+    return frame_assembler->write_flush_frame(more
+    ).then([this] {
+      return frame_assembler->read_main_preamble();
+    }).then([this](auto ret) {
+      expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, "read_auth_request_more");
+      return frame_assembler->read_frame_payload();
+    }).then([this](auto payload) {
+      auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
       logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
                      conn, auth_more.auth_payload().length());
       return _handle_auth_request(auth_more.auth_payload(), true);
@@ -1008,13 +986,13 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
 
 seastar::future<> ProtocolV2::server_auth()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    expect_tag(Tag::AUTH_REQUEST, tag, conn, __func__);
-    return read_frame_payload();
-  }).then([this] {
+  return frame_assembler->read_main_preamble(
+  ).then([this](auto ret) {
+    expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, "read_auth_request");
+    return frame_assembler->read_frame_payload();
+  }).then([this](auto payload) {
     // handle_auth_request() logic
-    auto request = AuthRequestFrame::Decode(rx_segments_data.back());
+    auto request = AuthRequestFrame::Decode(payload->back());
     logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
                    " payload_len={}",
                    conn, request.method(), request.preferred_modes(),
@@ -1050,7 +1028,8 @@ ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
   logger().debug("{} WRITE WaitFrame", conn);
-  return write_frame(wait).then([] {
+  return frame_assembler->write_flush_frame(wait
+  ).then([] {
     return next_step_t::wait;
   });
 }
@@ -1060,19 +1039,26 @@ ProtocolV2::reuse_connection(
     ProtocolV2* existing_proto, bool do_reset,
     bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
 {
+  if (unlikely(state != state_t::ACCEPTING)) {
+    logger().debug("{} triggered {} before trigger_replacing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
+
   existing_proto->trigger_replacing(reconnect,
                                     do_reset,
-                                    std::move(socket),
+                                    frame_assembler->to_replace(),
                                     std::move(auth_meta),
-                                    std::move(session_stream_handlers),
                                     peer_global_seq,
                                     client_cookie,
                                     conn.get_peer_name(),
-                                    connection_features,
-                                    tx_frame_asm.get_is_rev1(),
-                                    rx_frame_asm.get_is_rev1(),
+                                    conn.get_features(),
+                                    peer_supported_features,
                                     conn_seq,
                                     msg_seq);
+  ceph_assert_always(has_socket && is_socket_valid);
+  is_socket_valid = false;
+  has_socket = false;
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
     conn.interceptor->register_conn_replaced(conn);
@@ -1081,7 +1067,7 @@ ProtocolV2::reuse_connection(
   // close this connection because all the necessary information is delivered
   // to the exisiting connection, and jump to error handling code to abort the
   // current state.
-  abort_in_close(*this, false);
+  ABORT_IN_CLOSE(false);
   return seastar::make_ready_future<next_step_t>(next_step_t::none);
 }
 
@@ -1096,7 +1082,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                  " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
                  conn, global_seq, peer_global_seq, connect_seq,
                  client_cookie, server_cookie,
-                 existing_conn, get_state_name(existing_proto->state),
+                 fmt::ptr(existing_conn.get()), get_state_name(existing_proto->state),
                  existing_proto->global_seq,
                  existing_proto->peer_global_seq,
                  existing_proto->connect_seq,
@@ -1105,7 +1091,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 
   if (!validate_peer_name(existing_conn->get_peer_name())) {
     logger().error("{} server_connect: my peer_name doesn't match"
-                   " the existing connection {}, abort", conn, existing_conn);
+                   " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
     abort_in_fault();
   }
 
@@ -1131,7 +1117,12 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     logger().warn("{} server_connect:"
                   " existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing_conn);
-    execute_establishing(existing_conn, true);
+    if (unlikely(state != state_t::ACCEPTING)) {
+      logger().debug("{} triggered {} before execute_establishing()",
+                     conn, get_state_name(state));
+      abort_protocol();
+    }
+    execute_establishing(existing_conn);
     return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   }
 
@@ -1142,18 +1133,25 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
       // by replacing the socket
       logger().warn("{} server_connect:"
                     " found new session (cs={})"
-                    " when existing {} is with stale session (cs={}, ss={}),"
+                    " when existing {} {} is with stale session (cs={}, ss={}),"
                     " peer must have reset",
-                    conn, client_cookie,
-                    *existing_conn, existing_proto->client_cookie,
+                    conn,
+                    client_cookie,
+                    get_state_name(existing_proto->state),
+                    *existing_conn,
+                    existing_proto->client_cookie,
                     existing_proto->server_cookie);
       return reuse_connection(existing_proto, conn.policy.resetcheck);
     } else {
       // session establishment interrupted between client_ident and server_ident,
       // continuing...
-      logger().warn("{} server_connect: found client session with existing {}"
+      logger().warn("{} server_connect: found client session with existing {} {}"
                     " matched (cs={}, ss={}), continuing session establishment",
-                    conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+                    conn,
+                    get_state_name(existing_proto->state),
+                    *existing_conn,
+                    client_cookie,
+                    existing_proto->server_cookie);
       return reuse_connection(existing_proto);
     }
   } else {
@@ -1161,22 +1159,32 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     // each other at the same time.
     if (existing_proto->client_cookie != client_cookie) {
       if (existing_conn->peer_wins()) {
+        // acceptor (this connection, the peer) wins
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
-                      " and win, reusing existing {}",
-                      conn, client_cookie, existing_proto->client_cookie, *existing_conn);
+                      " and win, reusing existing {} {}",
+                      conn,
+                      client_cookie,
+                      existing_proto->client_cookie,
+                      get_state_name(existing_proto->state),
+                      *existing_conn);
         return reuse_connection(existing_proto);
       } else {
+        // acceptor (this connection, the peer) loses
         logger().warn("{} server_connect: connection race detected (cs={}, e_cs={}, ss=0)"
                       " and lose to existing {}, ask client to wait",
                       conn, client_cookie, existing_proto->client_cookie, *existing_conn);
-        return existing_conn->keepalive().then([this] {
+        return existing_conn->send_keepalive().then([this] {
           return send_wait();
         });
       }
     } else {
-      logger().warn("{} server_connect: found client session with existing {}"
+      logger().warn("{} server_connect: found client session with existing {} {}"
                     " matched (cs={}, ss={}), continuing session establishment",
-                    conn, *existing_conn, client_cookie, existing_proto->server_cookie);
+                    conn,
+                    get_state_name(existing_proto->state),
+                    *existing_conn,
+                    client_cookie,
+                    existing_proto->server_cookie);
       return reuse_connection(existing_proto);
     }
   }
@@ -1185,9 +1193,10 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_connect()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler->read_frame_payload(
+  ).then([this](auto payload) {
     // handle_client_ident() logic
-    auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
+    auto client_ident = ClientIdentFrame::Decode(payload->back());
     logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
                    " gid={}, gs={}, features_supported={},"
                    " features_required={}, flags={}, cookie={}",
@@ -1236,35 +1245,37 @@ ProtocolV2::server_connect()
       auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
       logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
                     conn, feat_missing);
-      return write_frame(ident_missing_features).then([] {
+      return frame_assembler->write_flush_frame(ident_missing_features
+      ).then([] {
         return next_step_t::wait;
       });
     }
-    connection_features =
-        client_ident.supported_features() & conn.policy.features_supported;
-    logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
+    conn.set_features(client_ident.supported_features() &
+                      conn.policy.features_supported);
+    logger().debug("{} UPDATE: features={}", conn, conn.get_features());
 
     peer_global_seq = client_ident.global_seq();
 
+    bool lossy = client_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
+    if (lossy != conn.policy.lossy) {
+      logger().warn("{} my lossy policy {} doesn't match client {}, ignore",
+                    conn, conn.policy.lossy, lossy);
+    }
+
     // Looks good so far, let's check if there is already an existing connection
     // to this peer.
 
     SocketConnectionRef existing_conn = messenger.lookup_conn(conn.peer_addr);
 
     if (existing_conn) {
-      if (existing_conn->protocol->proto_type != proto_t::v2) {
-        logger().warn("{} existing connection {} proto version is {}, close existing",
-                      conn, *existing_conn,
-                      static_cast<int>(existing_conn->protocol->proto_type));
-        // should unregister the existing from msgr atomically
-        // NOTE: this is following async messenger logic, but we may miss the reset event.
-        execute_establishing(existing_conn, false);
-        return seastar::make_ready_future<next_step_t>(next_step_t::ready);
-      } else {
-        return handle_existing_connection(existing_conn);
-      }
+      return handle_existing_connection(existing_conn);
     } else {
-      execute_establishing(nullptr, true);
+      if (unlikely(state != state_t::ACCEPTING)) {
+        logger().debug("{} triggered {} before execute_establishing()",
+                       conn, get_state_name(state));
+        abort_protocol();
+      }
+      execute_establishing(nullptr);
       return seastar::make_ready_future<next_step_t>(next_step_t::ready);
     }
   });
@@ -1273,9 +1284,9 @@ ProtocolV2::server_connect()
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::read_reconnect()
 {
-  return read_main_preamble()
-  .then([this] (Tag tag) {
-    expect_tag(Tag::SESSION_RECONNECT, tag, conn, "read_reconnect");
+  return frame_assembler->read_main_preamble(
+  ).then([this](auto ret) {
+    expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_session_reconnect");
     return server_reconnect();
   });
 }
@@ -1285,7 +1296,8 @@ ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
   logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
-  return write_frame(retry).then([this] {
+  return frame_assembler->write_flush_frame(retry
+  ).then([this] {
     return read_reconnect();
   });
 }
@@ -1295,7 +1307,8 @@ ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
   logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
-  return write_frame(retry).then([this] {
+  return frame_assembler->write_flush_frame(retry
+  ).then([this] {
     return read_reconnect();
   });
 }
@@ -1305,10 +1318,11 @@ ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
   logger().warn("{} WRITE ResetFrame: full={}", conn, full);
-  return write_frame(reset).then([this] {
-    return read_main_preamble();
-  }).then([this] (Tag tag) {
-    expect_tag(Tag::CLIENT_IDENT, tag, conn, "post_send_reset");
+  return frame_assembler->write_flush_frame(reset
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
     return server_connect();
   });
 }
@@ -1316,9 +1330,10 @@ ProtocolV2::send_reset(bool full)
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::server_reconnect()
 {
-  return read_frame_payload().then([this] {
+  return frame_assembler->read_frame_payload(
+  ).then([this](auto payload) {
     // handle_reconnect() logic
-    auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
+    auto reconnect = ReconnectFrame::Decode(payload->back());
 
     logger().debug("{} GOT ReconnectFrame: addrs={}, client_cookie={},"
                    " server_cookie={}, gs={}, cs={}, msg_seq={}",
@@ -1358,16 +1373,6 @@ ProtocolV2::server_reconnect()
       return send_reset(true);
     }
 
-    if (existing_conn->protocol->proto_type != proto_t::v2) {
-      logger().warn("{} server_reconnect: existing connection {} proto version is {},"
-                    "close existing and reset client.",
-                    conn, *existing_conn,
-                    static_cast<int>(existing_conn->protocol->proto_type));
-      // NOTE: this is following async messenger logic, but we may miss the reset event.
-      existing_conn->mark_down();
-      return send_reset(true);
-    }
-
     ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
         existing_conn->protocol.get());
     ceph_assert(existing_proto);
@@ -1375,7 +1380,7 @@ ProtocolV2::server_reconnect()
                    " found existing {}(state={}, gs={}, pgs={}, cs={}, cc={}, sc={})",
                    conn, global_seq, peer_global_seq, reconnect.connect_seq(),
                    reconnect.client_cookie(), reconnect.server_cookie(),
-                   existing_conn,
+                   fmt::ptr(existing_conn.get()),
                    get_state_name(existing_proto->state),
                    existing_proto->global_seq,
                    existing_proto->peer_global_seq,
@@ -1385,7 +1390,7 @@ ProtocolV2::server_reconnect()
 
     if (!validate_peer_name(existing_conn->get_peer_name())) {
       logger().error("{} server_reconnect: my peer_name doesn't match"
-                     " the existing connection {}, abort", conn, existing_conn);
+                     " the existing connection {}, abort", conn, fmt::ptr(existing_conn.get()));
       abort_in_fault();
     }
 
@@ -1436,13 +1441,18 @@ ProtocolV2::server_reconnect()
     } else if (existing_proto->connect_seq == reconnect.connect_seq()) {
       // reconnect race: both peers are sending reconnect messages
       if (existing_conn->peer_wins()) {
+        // acceptor (this connection, the peer) wins
         logger().warn("{} server_reconnect: reconnect race detected (cs={})"
-                      " and win, reusing existing {}",
-                      conn, reconnect.connect_seq(), *existing_conn);
+                      " and win, reusing existing {} {}",
+                      conn,
+                      reconnect.connect_seq(),
+                      get_state_name(existing_proto->state),
+                      *existing_conn);
         return reuse_connection(
             existing_proto, false,
             true, reconnect.connect_seq(), reconnect.msg_seq());
       } else {
+        // acceptor (this connection, the peer) loses
         logger().warn("{} server_reconnect: reconnect race detected (cs={})"
                       " and lose to existing {}, ask client to wait",
                       conn, reconnect.connect_seq(), *existing_conn);
@@ -1450,9 +1460,12 @@ ProtocolV2::server_reconnect()
       }
     } else { // existing_proto->connect_seq < reconnect.connect_seq()
       logger().warn("{} server_reconnect: stale exsiting connect_seq exist_cs({}) < peer_cs({}),"
-                    " reusing existing {}",
-                    conn, existing_proto->connect_seq,
-                    reconnect.connect_seq(), *existing_conn);
+                    " reusing existing {} {}",
+                    conn,
+                    existing_proto->connect_seq,
+                    reconnect.connect_seq(),
+                    get_state_name(existing_proto->state),
+                    *existing_conn);
       return reuse_connection(
           existing_proto, false,
           true, reconnect.connect_seq(), reconnect.msg_seq());
@@ -1462,14 +1475,28 @@ ProtocolV2::server_reconnect()
 
 void ProtocolV2::execute_accepting()
 {
-  trigger_state(state_t::ACCEPTING, write_state_t::none, false);
-  gate.dispatch_in_background("execute_accepting", *this, [this] {
+  assert(is_socket_valid);
+  trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+  gate.dispatch_in_background("execute_accepting", conn, [this] {
       return seastar::futurize_invoke([this] {
-          INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
+#ifdef UNIT_TESTS_BUILT
+          if (conn.interceptor) {
+            auto action = conn.interceptor->intercept(
+                conn, {custom_bp_t::SOCKET_ACCEPTED});
+            switch (action) {
+            case bp_action_t::CONTINUE:
+              break;
+            case bp_action_t::FAULT:
+              logger().info("[Test] got FAULT");
+              abort_in_fault();
+            default:
+              ceph_abort("unexpected action from trap");
+            }
+          }
+#endif
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
-          session_stream_handlers = { nullptr, nullptr };
-          session_comp_handlers = { nullptr, nullptr };
-          enable_recording();
+          frame_assembler->reset_handlers();
+          frame_assembler->start_recording();
           return banner_exchange(false);
         }).then([this] (auto&& ret) {
           auto [_peer_type, _my_addr_from_peer] = std::move(ret);
@@ -1490,19 +1517,18 @@ void ProtocolV2::execute_accepting()
             throw std::system_error(
                 make_error_code(crimson::net::error::bad_peer_address));
           }
-          return messenger.learned_addr(_my_addr_from_peer, conn);
-        }).then([this] {
+          messenger.learned_addr(_my_addr_from_peer, conn);
           return server_auth();
         }).then([this] {
-          return read_main_preamble();
-        }).then([this] (Tag tag) {
-          switch (tag) {
+          return frame_assembler->read_main_preamble();
+        }).then([this](auto ret) {
+          switch (ret.tag) {
             case Tag::CLIENT_IDENT:
               return server_connect();
             case Tag::SESSION_RECONNECT:
               return server_reconnect();
             default: {
-              unexpected_tag(tag, conn, "post_server_auth");
+              unexpected_tag(ret.tag, conn, "post_server_auth");
               return seastar::make_ready_future<next_step_t>(next_step_t::none);
             }
           }
@@ -1523,10 +1549,16 @@ void ProtocolV2::execute_accepting()
            default:
             ceph_abort("impossible next step");
           }
-        }).handle_exception([this] (std::exception_ptr eptr) {
+        }).handle_exception([this](std::exception_ptr eptr) {
+          const char *e_what;
+          try {
+            std::rethrow_exception(eptr);
+          } catch (std::exception &e) {
+            e_what = e.what();
+          }
           logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
-                        conn, get_state_name(state), eptr);
-          close(false);
+                        conn, get_state_name(state), e_what);
+          do_close(false);
         });
     });
 }
@@ -1537,21 +1569,20 @@ seastar::future<> ProtocolV2::finish_auth()
 {
   ceph_assert(auth_meta);
 
+  auto records = frame_assembler->stop_recording();
   const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
-    auth_meta->session_key.hmac_sha256(nullptr, rxbuf);
+    auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
   auto sig_frame = AuthSignatureFrame::Encode(sig);
-  ceph_assert(record_io);
-  record_io = false;
-  rxbuf.clear();
   logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
-  return write_frame(sig_frame).then([this] {
-    return read_main_preamble();
-  }).then([this] (Tag tag) {
-    expect_tag(Tag::AUTH_SIGNATURE, tag, conn, "post_finish_auth");
-    return read_frame_payload();
-  }).then([this] {
+  return frame_assembler->write_flush_frame(sig_frame
+  ).then([this] {
+    return frame_assembler->read_main_preamble();
+  }).then([this](auto ret) {
+    expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
+    return frame_assembler->read_frame_payload();
+  }).then([this, txbuf=std::move(records.txbuf)](auto payload) {
     // handle_auth_signature() logic
-    auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+    auto sig_frame = AuthSignatureFrame::Decode(payload->back());
     logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
 
     const auto actual_tx_sig = auth_meta->session_key.empty() ?
@@ -1562,20 +1593,12 @@ seastar::future<> ProtocolV2::finish_auth()
                     conn, actual_tx_sig, sig_frame.signature());
       abort_in_fault();
     }
-    txbuf.clear();
   });
 }
 
 // ESTABLISHING
 
-void ProtocolV2::execute_establishing(
-    SocketConnectionRef existing_conn, bool dispatch_reset) {
-  if (unlikely(state != state_t::ACCEPTING)) {
-    logger().debug("{} triggered {} before execute_establishing()",
-                   conn, get_state_name(state));
-    abort_protocol();
-  }
-
+void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
   auto accept_me = [this] {
     messenger.register_conn(
       seastar::static_pointer_cast<SocketConnection>(
@@ -1585,9 +1608,11 @@ void ProtocolV2::execute_establishing(
         conn.shared_from_this()));
   };
 
-  trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
+  ceph_assert_always(is_socket_valid);
+  trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
   if (existing_conn) {
-    existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+    static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
+        true /* is_dispatch_reset */, std::move(accept_me));
     if (unlikely(state != state_t::ESTABLISHING)) {
       logger().warn("{} triggered {} during execute_establishing(), "
                     "the accept event will not be delivered!",
@@ -1598,10 +1623,14 @@ void ProtocolV2::execute_establishing(
     accept_me();
   }
 
-  dispatchers.ms_handle_accept(
-      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  io_handler.dispatch_accept();
+  if (unlikely(state != state_t::ESTABLISHING)) {
+    logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
 
-  gated_execute("execute_establishing", [this] {
+  gated_execute("execute_establishing", conn, [this] {
     return seastar::futurize_invoke([this] {
       return send_server_ident();
     }).then([this] {
@@ -1610,21 +1639,14 @@ void ProtocolV2::execute_establishing(
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} established: gs={}, pgs={}, cs={}, client_cookie={},"
-                    " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
+      logger().info("{} established: gs={}, pgs={}, cs={}, "
+                    "client_cookie={}, server_cookie={}, {}",
                     conn, global_seq, peer_global_seq, connect_seq,
-                    client_cookie, server_cookie, conn.in_seq,
-                    conn.out_seq, conn.out_q.size());
-      execute_ready(false);
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::ESTABLISHING) {
-        logger().info("{} execute_establishing() protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::CLOSING ||
-               state == state_t::REPLACING);
-        return;
-      }
-      fault(false, "execute_establishing()", eptr);
+                    client_cookie, server_cookie,
+                    io_stat_printer{io_handler});
+      execute_ready();
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::ESTABLISHING, "execute_establishing", eptr);
     });
   });
 }
@@ -1637,127 +1659,134 @@ ProtocolV2::send_server_ident()
   // send_server_ident() logic
 
   // referred to async-conn v2: not assign gs to global_seq
-  return messenger.get_global_seq().then([this] (auto gs) {
-    logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
+  global_seq = messenger.get_global_seq();
+  logger().debug("{} UPDATE: gs={} for server ident", conn, global_seq);
 
-    // this is required for the case when this connection is being replaced
-    requeue_up_to(0);
-    conn.in_seq = 0;
+  // this is required for the case when this connection is being replaced
+  io_handler.requeue_out_sent_up_to(0);
+  io_handler.reset_session(false);
 
-    if (!conn.policy.lossy) {
-      server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
-    }
+  if (!conn.policy.lossy) {
+    server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
+  }
 
-    uint64_t flags = 0;
-    if (conn.policy.lossy) {
-      flags = flags | CEPH_MSG_CONNECT_LOSSY;
-    }
+  uint64_t flags = 0;
+  if (conn.policy.lossy) {
+    flags = flags | CEPH_MSG_CONNECT_LOSSY;
+  }
 
-    auto server_ident = ServerIdentFrame::Encode(
-            messenger.get_myaddrs(),
-            messenger.get_myname().num(),
-            gs,
-            conn.policy.features_supported,
-            conn.policy.features_required | msgr2_required,
-            flags,
-            server_cookie);
-
-    logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
-                   " gs={}, features_supported={}, features_required={},"
-                   " flags={}, cookie={}",
-                   conn, messenger.get_myaddrs(), messenger.get_myname().num(),
-                   gs, conn.policy.features_supported,
-                   conn.policy.features_required | msgr2_required,
-                   flags, server_cookie);
-
-    conn.set_features(connection_features);
-
-    return write_frame(server_ident);
-  });
+  auto server_ident = ServerIdentFrame::Encode(
+          messenger.get_myaddrs(),
+          messenger.get_myname().num(),
+          global_seq,
+          conn.policy.features_supported,
+          conn.policy.features_required | msgr2_required,
+          flags,
+          server_cookie);
+
+  logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+                 " gs={}, features_supported={}, features_required={},"
+                 " flags={}, cookie={}",
+                 conn, messenger.get_myaddrs(), messenger.get_myname().num(),
+                 global_seq, conn.policy.features_supported,
+                 conn.policy.features_required | msgr2_required,
+                 flags, server_cookie);
+
+  return frame_assembler->write_flush_frame(server_ident);
 }
 
 // REPLACING state
 
 void ProtocolV2::trigger_replacing(bool reconnect,
                                    bool do_reset,
-                                   SocketRef&& new_socket,
+                                   FrameAssemblerV2::mover_t &&mover,
                                    AuthConnectionMetaRef&& new_auth_meta,
-                                   ceph::crypto::onwire::rxtx_t new_rxtx,
                                    uint64_t new_peer_global_seq,
                                    uint64_t new_client_cookie,
                                    entity_name_t new_peer_name,
                                    uint64_t new_conn_features,
-                                   bool tx_is_rev1,
-                                   bool rx_is_rev1,
+                                   uint64_t new_peer_supported_features,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
-  trigger_state(state_t::REPLACING, write_state_t::delay, false);
-  if (socket) {
-    socket->shutdown();
+  ceph_assert_always(has_socket || state == state_t::CONNECTING);
+  ceph_assert_always(!mover.socket->is_shutdown());
+  trigger_state(state_t::REPLACING, io_state_t::delay, false);
+  if (is_socket_valid) {
+    frame_assembler->shutdown_socket();
+    is_socket_valid = false;
   }
-  dispatchers.ms_handle_accept(
-      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  gate.dispatch_in_background("trigger_replacing", *this,
-                 [this,
-                  reconnect,
-                  do_reset,
-                  new_socket = std::move(new_socket),
-                  new_auth_meta = std::move(new_auth_meta),
-                  new_rxtx = std::move(new_rxtx),
-                  tx_is_rev1, rx_is_rev1,
-                  new_client_cookie, new_peer_name,
-                  new_conn_features, new_peer_global_seq,
-                  new_connect_seq, new_msg_seq] () mutable {
-    return wait_write_exit().then([this, do_reset] {
-      if (do_reset) {
-        reset_session(true);
-      }
+  gate.dispatch_in_background(
+      "trigger_replacing",
+      conn,
+      [this,
+       reconnect,
+       do_reset,
+       mover = std::move(mover),
+       new_auth_meta = std::move(new_auth_meta),
+       new_client_cookie, new_peer_name,
+       new_conn_features, new_peer_supported_features,
+       new_peer_global_seq,
+       new_connect_seq, new_msg_seq] () mutable {
+    ceph_assert_always(state == state_t::REPLACING);
+    io_handler.dispatch_accept();
+    // state may become CLOSING; if so, mover.socket is closed and we abort later
+    return wait_exit_io(
+    ).then([this] {
+      ceph_assert_always(frame_assembler);
       protocol_timer.cancel();
-      return execution_done.get_future();
+      auto done = std::move(execution_done);
+      execution_done = seastar::now();
+      return done;
     }).then([this,
              reconnect,
-             new_socket = std::move(new_socket),
+             do_reset,
+             mover = std::move(mover),
              new_auth_meta = std::move(new_auth_meta),
-             new_rxtx = std::move(new_rxtx),
-             tx_is_rev1, rx_is_rev1,
              new_client_cookie, new_peer_name,
-             new_conn_features, new_peer_global_seq,
+             new_conn_features, new_peer_supported_features,
+             new_peer_global_seq,
              new_connect_seq, new_msg_seq] () mutable {
+      if (state == state_t::REPLACING && do_reset) {
+        reset_session(true);
+      }
+
       if (unlikely(state != state_t::REPLACING)) {
-        return new_socket->close().then([sock = std::move(new_socket)] {
+        return mover.socket->close(
+        ).then([sock = std::move(mover.socket)] {
           abort_protocol();
         });
       }
 
-      if (socket) {
-        gate.dispatch_in_background("close_socket_replacing", *this,
-                       [sock = std::move(socket)] () mutable {
-          return sock->close().then([sock = std::move(sock)] {});
-        });
-      }
-      socket = std::move(new_socket);
       auth_meta = std::move(new_auth_meta);
-      session_stream_handlers = std::move(new_rxtx);
-      record_io = false;
       peer_global_seq = new_peer_global_seq;
+      gate.dispatch_in_background(
+        "replace_frame_assembler",
+        conn,
+        [this, mover=std::move(mover)]() mutable {
+          return frame_assembler->replace_by(std::move(mover));
+        }
+      );
+      is_socket_valid = true;
+      has_socket = true;
 
       if (reconnect) {
         connect_seq = new_connect_seq;
         // send_reconnect_ok() logic
-        requeue_up_to(new_msg_seq);
-        auto reconnect_ok = ReconnectOkFrame::Encode(conn.in_seq);
-        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, conn.in_seq);
-        return write_frame(reconnect_ok);
+        io_handler.requeue_out_sent_up_to(new_msg_seq);
+        auto reconnect_ok = ReconnectOkFrame::Encode(io_handler.get_in_seq());
+        logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, io_handler.get_in_seq());
+        return frame_assembler->write_flush_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
         assert(conn.get_peer_type() == new_peer_name.type());
         if (conn.get_peer_id() == entity_name_t::NEW) {
           conn.set_peer_id(new_peer_name.num());
         }
-        connection_features = new_conn_features;
-        tx_frame_asm.set_is_rev1(tx_is_rev1);
-        rx_frame_asm.set_is_rev1(rx_is_rev1);
+        conn.set_features(new_conn_features);
+        peer_supported_features = new_peer_supported_features;
+        bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+        frame_assembler->set_is_rev1(is_rev1);
         return send_server_ident();
       }
     }).then([this, reconnect] {
@@ -1766,279 +1795,46 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                        conn, get_state_name(state));
         abort_protocol();
       }
-      logger().info("{} replaced ({}):"
-                    " gs={}, pgs={}, cs={}, client_cookie={}, server_cookie={},"
-                    " in_seq={}, out_seq={}, out_q={}",
+      logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+                    "client_cookie={}, server_cookie={}, {}",
                     conn, reconnect ? "reconnected" : "connected",
-                    global_seq, peer_global_seq, connect_seq, client_cookie,
-                    server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
-      execute_ready(false);
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::REPLACING) {
-        logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::CLOSING);
-        return;
-      }
-      fault(true, "trigger_replacing()", eptr);
+                    global_seq, peer_global_seq, connect_seq,
+                    client_cookie, server_cookie,
+                    io_stat_printer{io_handler});
+      execute_ready();
+    }).handle_exception([this](std::exception_ptr eptr) {
+      fault(state_t::REPLACING, "trigger_replacing", eptr);
     });
   });
 }
 
 // READY state
 
-ceph::bufferlist ProtocolV2::do_sweep_messages(
-    const std::deque<MessageURef>& msgs,
-    size_t num_msgs,
-    bool require_keepalive,
-    std::optional<utime_t> _keepalive_ack,
-    bool require_ack)
+void ProtocolV2::notify_out_fault(const char *where, std::exception_ptr eptr)
 {
-  ceph::bufferlist bl;
-
-  if (unlikely(require_keepalive)) {
-    auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(keepalive_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
-  }
-
-  if (unlikely(_keepalive_ack.has_value())) {
-    auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
-    bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
-  }
-
-  if (require_ack && num_msgs == 0u) {
-    auto ack_frame = AckFrame::Encode(conn.in_seq);
-    bl.append(ack_frame.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
-  }
-
-  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
-    // TODO: move to common code
-    // set priority
-    msg->get_header().src = messenger.get_myname();
-
-    msg->encode(conn.features, 0);
-
-    ceph_assert(!msg->get_seq() && "message already has seq");
-    msg->set_seq(++conn.out_seq);
-
-    ceph_msg_header &header = msg->get_header();
-    ceph_msg_footer &footer = msg->get_footer();
-
-    ceph_msg_header2 header2{header.seq,        header.tid,
-                             header.type,       header.priority,
-                             header.version,
-                             ceph_le32(0),      header.data_off,
-                             ceph_le64(conn.in_seq),
-                             footer.flags,      header.compat_version,
-                             header.reserved};
-
-    auto message = MessageFrame::Encode(header2,
-        msg->get_payload(), msg->get_middle(), msg->get_data());
-    logger().debug("{} --> #{} === {} ({})",
-                  conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(message.get_buffer(tx_frame_asm));
-    INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
-  });
-
-  return bl;
+  fault(state_t::READY, where, eptr);  // delegate to fault(); READY is presumably the state the fault is expected in (confirm)
 }
 
-seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
-{
-  return read_frame_payload()
-  .then([this, throttle_stamp] {
-    utime_t recv_stamp{seastar::lowres_system_clock::now()};
-
-    // we need to get the size before std::moving segments data
-    const size_t cur_msg_size = get_current_msg_size();
-    auto msg_frame = MessageFrame::Decode(rx_segments_data);
-    // XXX: paranoid copy just to avoid oops
-    ceph_msg_header2 current_header = msg_frame.header();
-
-    logger().trace("{} got {} + {} + {} byte message,"
-                   " envelope type={} src={} off={} seq={}",
-                   conn, msg_frame.front_len(), msg_frame.middle_len(),
-                   msg_frame.data_len(), current_header.type, conn.get_peer_name(),
-                   current_header.data_off, current_header.seq);
-
-    ceph_msg_header header{current_header.seq,
-                           current_header.tid,
-                           current_header.type,
-                           current_header.priority,
-                           current_header.version,
-                           ceph_le32(msg_frame.front_len()),
-                           ceph_le32(msg_frame.middle_len()),
-                           ceph_le32(msg_frame.data_len()),
-                           current_header.data_off,
-                           conn.get_peer_name(),
-                           current_header.compat_version,
-                           current_header.reserved,
-                           ceph_le32(0)};
-    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
-                           ceph_le32(0), ceph_le64(0), current_header.flags};
-
-    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
-        conn.shared_from_this());
-    Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
-    if (!message) {
-      logger().warn("{} decode message failed", conn);
-      abort_in_fault();
-    }
-
-    // store reservation size in message, so we don't get confused
-    // by messages entering the dispatch queue through other paths.
-    message->set_dispatch_throttle_size(cur_msg_size);
-
-    message->set_throttle_stamp(throttle_stamp);
-    message->set_recv_stamp(recv_stamp);
-    message->set_recv_complete_stamp(utime_t{seastar::lowres_system_clock::now()});
-
-    // check received seq#.  if it is old, drop the message.
-    // note that incoming messages may skip ahead.  this is convenient for the
-    // client side queueing because messages can't be renumbered, but the (kernel)
-    // client will occasionally pull a message out of the sent queue to send
-    // elsewhere.  in that case it doesn't matter if we "got" it or not.
-    uint64_t cur_seq = conn.in_seq;
-    if (message->get_seq() <= cur_seq) {
-      logger().error("{} got old message {} <= {} {}, discarding",
-                     conn, message->get_seq(), cur_seq, *message);
-      if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
-          local_conf()->ms_die_on_old_message) {
-        ceph_assert(0 == "old msgs despite reconnect_seq feature");
-      }
-      return seastar::now();
-    } else if (message->get_seq() > cur_seq + 1) {
-      logger().error("{} missed message? skipped from seq {} to {}",
-                     conn, cur_seq, message->get_seq());
-      if (local_conf()->ms_die_on_skipped_message) {
-        ceph_assert(0 == "skipped incoming seq");
-      }
-    }
-
-    // note last received message.
-    conn.in_seq = message->get_seq();
-    logger().debug("{} <== #{} === {} ({})",
-                  conn, message->get_seq(), *message, message->get_type());
-    notify_ack();
-    ack_writes(current_header.ack_seq);
-
-    // TODO: change MessageRef with seastar::shared_ptr
-    auto msg_ref = MessageRef{message, false};
-    // throttle the reading process by the returned future
-    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
-  });
-}
-
-void ProtocolV2::execute_ready(bool dispatch_connect)
+void ProtocolV2::execute_ready()
 {
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
-  trigger_state(state_t::READY, write_state_t::open, false);
-  if (dispatch_connect) {
-    dispatchers.ms_handle_connect(
-       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }
-#ifdef UNIT_TESTS_BUILT
-  if (conn.interceptor) {
-    conn.interceptor->register_conn_ready(conn);
-  }
-#endif
-  gated_execute("execute_ready", [this] {
-    protocol_timer.cancel();
-    return seastar::keep_doing([this] {
-      return read_main_preamble()
-      .then([this] (Tag tag) {
-        switch (tag) {
-          case Tag::MESSAGE: {
-            return seastar::futurize_invoke([this] {
-              // throttle_message() logic
-              if (!conn.policy.throttler_messages) {
-                return seastar::now();
-              }
-              // TODO: message throttler
-              ceph_assert(false);
-              return seastar::now();
-            }).then([this] {
-              // throttle_bytes() logic
-              if (!conn.policy.throttler_bytes) {
-                return seastar::now();
-              }
-              size_t cur_msg_size = get_current_msg_size();
-              if (!cur_msg_size) {
-                return seastar::now();
-              }
-              logger().trace("{} wants {} bytes from policy throttler {}/{}",
-                             conn, cur_msg_size,
-                             conn.policy.throttler_bytes->get_current(),
-                             conn.policy.throttler_bytes->get_max());
-              return conn.policy.throttler_bytes->get(cur_msg_size);
-            }).then([this] {
-              // TODO: throttle_dispatch_queue() logic
-              utime_t throttle_stamp{seastar::lowres_system_clock::now()};
-              return read_message(throttle_stamp);
-            });
-          }
-          case Tag::ACK:
-            return read_frame_payload().then([this] {
-              // handle_message_ack() logic
-              auto ack = AckFrame::Decode(rx_segments_data.back());
-              logger().debug("{} GOT AckFrame: seq={}", conn, ack.seq());
-              ack_writes(ack.seq());
-            });
-          case Tag::KEEPALIVE2:
-            return read_frame_payload().then([this] {
-              // handle_keepalive2() logic
-              auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
-              logger().debug("{} GOT KeepAliveFrame: timestamp={}",
-                             conn, keepalive_frame.timestamp());
-              notify_keepalive_ack(keepalive_frame.timestamp());
-              conn.set_last_keepalive(seastar::lowres_system_clock::now());
-            });
-          case Tag::KEEPALIVE2_ACK:
-            return read_frame_payload().then([this] {
-              // handle_keepalive2_ack() logic
-              auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
-              conn.set_last_keepalive_ack(
-                seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
-              logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
-                             conn, conn.last_keepalive_ack);
-            });
-          default: {
-            unexpected_tag(tag, conn, "execute_ready");
-            return seastar::now();
-          }
-        }
-      });
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      if (state != state_t::READY) {
-        logger().info("{} execute_ready(): protocol aborted at {} -- {}",
-                      conn, get_state_name(state), eptr);
-        assert(state == state_t::REPLACING ||
-               state == state_t::CLOSING);
-        return;
-      }
-      fault(false, "execute_ready()", eptr);
-    });
-  });
+  protocol_timer.cancel();  // the protocol timer is cancelled before the switch to READY
+  ceph_assert_always(is_socket_valid);  // READY may only be entered while the socket is marked valid
+  trigger_state(state_t::READY, io_state_t::open, false);  // switch to READY with io_state_t::open
 }
 
 // STANDBY state
 
 void ProtocolV2::execute_standby()
 {
-  trigger_state(state_t::STANDBY, write_state_t::delay, true);
-  if (socket) {
-    socket->shutdown();
-  }
+  ceph_assert_always(!is_socket_valid);  // STANDBY may only be entered once the socket is marked invalid
+  trigger_state(state_t::STANDBY, io_state_t::delay, false);  // switch to STANDBY with io_state_t::delay
 }
 
-void ProtocolV2::notify_write()
+void ProtocolV2::notify_out()
 {
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
-    logger().info("{} notify_write(): at {}, going to CONNECTING",
+    logger().info("{} notify_out(): at {}, going to CONNECTING",
                   conn, get_state_name(state));
     execute_connecting();
   }
@@ -2048,11 +1844,9 @@ void ProtocolV2::notify_write()
 
 void ProtocolV2::execute_wait(bool max_backoff)
 {
-  trigger_state(state_t::WAIT, write_state_t::delay, true);
-  if (socket) {
-    socket->shutdown();
-  }
-  gated_execute("execute_wait", [this, max_backoff] {
+  ceph_assert_always(!is_socket_valid);
+  trigger_state(state_t::WAIT, io_state_t::delay, false);
+  gated_execute("execute_wait", conn, [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
       backoff = local_conf().get_val<double>("ms_max_backoff");
@@ -2069,9 +1863,15 @@ void ProtocolV2::execute_wait(bool max_backoff)
       }
       logger().info("{} execute_wait(): going to CONNECTING", conn);
       execute_connecting();
-    }).handle_exception([this] (std::exception_ptr eptr) {
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
       logger().info("{} execute_wait(): protocol aborted at {} -- {}",
-                    conn, get_state_name(state), eptr);
+                    conn, get_state_name(state), e_what);
       assert(state == state_t::REPLACING ||
              state == state_t::CLOSING);
     });
@@ -2082,27 +1882,76 @@ void ProtocolV2::execute_wait(bool max_backoff)
 
 void ProtocolV2::execute_server_wait()
 {
-  trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
-  gated_execute("execute_server_wait", [this] {
-    return read_exactly(1).then([this] (auto bl) {
+  ceph_assert_always(is_socket_valid);  // SERVER_WAIT may only be entered while the socket is marked valid
+  trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);  // switch to SERVER_WAIT with io_state_t::none
+  gated_execute("execute_server_wait", conn, [this] {
+    return frame_assembler->read_exactly(1  // wait for a single byte; a successful read is handled as a fault below
+    ).then([this](auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
-    }).handle_exception([this] (std::exception_ptr eptr) {
+    }).handle_exception([this](std::exception_ptr eptr) {
+      const char *e_what;  // only assigned for std::exception-derived errors; others propagate out of the rethrow
+      try {
+        std::rethrow_exception(eptr);
+      } catch (std::exception &e) {
+        e_what = e.what();
+      }
       logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
-                    conn, get_state_name(state), eptr);
-      close(false);
+                    conn, get_state_name(state), e_what);
+      do_close(false);  // close with is_dispatch_reset == false
     });
   });
 }
 
 // CLOSING state
 
-void ProtocolV2::trigger_close()
+void ProtocolV2::notify_mark_down()
 {
+  do_close(false);  // close the connection with is_dispatch_reset == false and no f_accept_new callback
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+  // yield() so that do_close() is only called *after* close_clean_yielded()
+  // has been applied to all connections in a container using
+  // seastar::parallel_for_each(). Otherwise, we could erase a connection from
+  // the container while seastar::parallel_for_each() is still iterating over
+  // it, which would lead to a segfault.
+  return seastar::yield(
+  ).then([this, conn_ref = conn.shared_from_this()] {  // conn_ref presumably keeps conn alive across the yield (confirm)
+    do_close(false);
+    // this assert can presumably fire if close_clean() is called inside
+    // Dispatcher::ms_handle_reset(), which would otherwise result in a deadlock
+    assert(closed_clean_fut.valid());
+    return closed_clean_fut.get_future();
+  });
+}
+
+void ProtocolV2::do_close(
+    bool is_dispatch_reset,
+    std::optional<std::function<void()>> f_accept_new)
+{
+  if (closed) {
+    // already closing
+    assert(state == state_t::CLOSING);
+    return;
+  }
+
+  bool is_replace = f_accept_new ? true : false;
+  logger().info("{} closing: reset {}, replace {}", conn,
+                is_dispatch_reset ? "yes" : "no",
+                is_replace ? "yes" : "no");
+
+  /*
+   * atomic operations
+   */
+
+  closed = true;
+
+  // trigger close
   messenger.closing_conn(
       seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this()));
-
   if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
     messenger.unaccept_conn(
       seastar::static_pointer_cast<SocketConnection>(
@@ -2115,21 +1964,49 @@ void ProtocolV2::trigger_close()
     // cannot happen
     ceph_assert(false);
   }
-
   protocol_timer.cancel();
-  trigger_state(state_t::CLOSING, write_state_t::drop, false);
-}
+  trigger_state(state_t::CLOSING, io_state_t::drop, false);
 
-void ProtocolV2::on_closed()
-{
-  messenger.closed_conn(
-      seastar::static_pointer_cast<SocketConnection>(
-       conn.shared_from_this()));
-}
-
-void ProtocolV2::print(std::ostream& out) const
-{
-  out << conn;
+  if (f_accept_new) {
+    (*f_accept_new)();
+  }
+  if (is_socket_valid) {
+    frame_assembler->shutdown_socket();
+    is_socket_valid = false;
+  }
+  assert(!gate.is_closed());
+  auto handshake_closed = gate.close();
+  auto io_closed = io_handler.close_io(
+      is_dispatch_reset, is_replace);
+
+  // asynchronous operations
+  assert(!closed_clean_fut.valid());
+  closed_clean_fut = seastar::when_all(
+      std::move(handshake_closed), std::move(io_closed)
+  ).discard_result().then([this] {
+    ceph_assert_always(!exit_io.has_value());
+    if (has_socket) {
+      ceph_assert_always(frame_assembler);
+      return frame_assembler->close_shutdown_socket();
+    } else {
+      return seastar::now();
+    }
+  }).then([this] {
+    logger().debug("{} closed!", conn);
+    messenger.closed_conn(
+        seastar::static_pointer_cast<SocketConnection>(
+          conn.shared_from_this()));
+#ifdef UNIT_TESTS_BUILT
+    closed_clean = true;
+    if (conn.interceptor) {
+      conn.interceptor->register_conn_closed(conn);
+    }
+#endif
+  }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+    logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+                   conn, eptr);
+    ceph_abort();
+  });
 }
 
 } // namespace crimson::net