]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/ProtocolV2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
index d68dba174ce593964405cba3edecc9d275fba838..fa47da50832c0956441af3af05033116d66ead35 100644 (file)
@@ -5,19 +5,14 @@
 
 #include <seastar/core/lowres_clock.hh>
 #include <fmt/format.h>
-#if FMT_VERSION >= 60000
-#include <fmt/chrono.h>
-#else
-#include <fmt/time.h>
-#endif
 #include "include/msgr.h"
 #include "include/random.h"
 
 #include "crimson/auth/AuthClient.h"
 #include "crimson/auth/AuthServer.h"
+#include "crimson/common/formatter.h"
 
-#include "Config.h"
-#include "Dispatcher.h"
+#include "chained_dispatchers.h"
 #include "Errors.h"
 #include "Socket.h"
 #include "SocketConnection.h"
 #endif
 
 using namespace ceph::msgr::v2;
+using crimson::common::local_conf;
 
 namespace {
 
-// TODO: apply the same logging policy to Protocol V1
+// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
+  (CEPH_MSGR2_FEATURE_REVISION_1 |
+   // CEPH_MSGR2_FEATURE_COMPRESSION |
+   UINT64_C(0));
+
 // Log levels in V2 Protocol:
 // * error level, something error that cause connection to terminate:
 //   - fatal errors;
@@ -58,16 +59,16 @@ seastar::logger& logger() {
   return crimson::get_logger(ceph_subsys_ms);
 }
 
-void abort_in_fault() {
+[[noreturn]] void abort_in_fault() {
   throw std::system_error(make_error_code(crimson::net::error::negotiation_failure));
 }
 
-void abort_protocol() {
+[[noreturn]] void abort_protocol() {
   throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
 }
 
-void abort_in_close(crimson::net::ProtocolV2& proto) {
-  (void) proto.close();
+[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
+  proto.close(dispatch_reset);
   abort_protocol();
 }
 
@@ -99,32 +100,6 @@ inline uint64_t generate_client_cookie() {
 
 } // namespace anonymous
 
-template <>
-struct fmt::formatter<seastar::lowres_system_clock::time_point> {
-  // ignore the format string
-  template <typename ParseContext>
-  constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
-
-  template <typename FormatContext>
-  auto format(const seastar::lowres_system_clock::time_point& t,
-             FormatContext& ctx) {
-    std::time_t tt = std::chrono::duration_cast<std::chrono::seconds>(
-      t.time_since_epoch()).count();
-    auto milliseconds = (t.time_since_epoch() %
-                        std::chrono::seconds(1)).count();
-    return fmt::format_to(ctx.out(), "{:%Y-%m-%d %H:%M:%S} {:03d}",
-                         fmt::localtime(tt), milliseconds);
-  }
-};
-
-namespace std {
-inline ostream& operator<<(
-  ostream& out, const seastar::lowres_system_clock::time_point& t)
-{
-  return out << fmt::format("{}", t);
-}
-}
-
 namespace crimson::net {
 
 #ifdef UNIT_TESTS_BUILT
@@ -173,30 +148,36 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds)
   });
 }
 
-ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
+ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers,
                        SocketConnection& conn,
                        SocketMessenger& messenger)
-  : Protocol(proto_t::v2, dispatcher, conn),
+  : Protocol(proto_t::v2, dispatchers, conn),
     messenger{messenger},
-    protocol_timer{conn},
-    tx_frame_asm(&session_stream_handlers, false)
+    protocol_timer{conn}
 {}
 
 ProtocolV2::~ProtocolV2() {}
 
+bool ProtocolV2::is_connected() const {
+  return state == state_t::READY ||
+         state == state_t::ESTABLISHING ||
+         state == state_t::REPLACING;
+}
+
 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
-                               const entity_type_t& _peer_type)
+                               const entity_name_t& _peer_name)
 {
   ceph_assert(state == state_t::NONE);
   ceph_assert(!socket);
+  ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
   conn.target_addr = _peer_addr;
-  conn.set_peer_type(_peer_type);
-  conn.policy = messenger.get_policy(_peer_type);
+  conn.set_peer_name(_peer_name);
+  conn.policy = messenger.get_policy(_peer_name.type());
   client_cookie = generate_client_cookie();
-  logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
+  logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
                 " policy(lossy={}, server={}, standby={}, resetcheck={})",
-                conn, _peer_addr, ceph_entity_type_name(_peer_type), client_cookie,
+                conn, _peer_addr, _peer_name, client_cookie,
                 conn.policy.lossy, conn.policy.server,
                 conn.policy.standby, conn.policy.resetcheck);
   messenger.register_conn(
@@ -211,8 +192,6 @@ void ProtocolV2::start_accept(SocketRef&& sock,
   ceph_assert(!socket);
   // until we know better
   conn.target_addr = _peer_addr;
-  conn.set_ephemeral_port(_peer_addr.get_port(),
-                          SocketConnection::side_t::acceptor);
   socket = std::move(sock);
   logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
   messenger.accept_conn(
@@ -273,137 +252,77 @@ seastar::future<> ProtocolV2::write_flush(bufferlist&& buf)
 
 size_t ProtocolV2::get_current_msg_size() const
 {
-  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
   size_t sum = 0;
   // we don't include SegmentIndex::Msg::HEADER.
-  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
-    sum += rx_segments_desc[idx].length;
+  for (size_t idx = 1; idx < rx_frame_asm.get_num_segments(); idx++) {
+    sum += rx_frame_asm.get_segment_logical_len(idx);
   }
   return sum;
 }
 
 seastar::future<Tag> ProtocolV2::read_main_preamble()
 {
-  return read_exactly(sizeof(preamble_block_t))
+  rx_preamble.clear();
+  return read_exactly(rx_frame_asm.get_preamble_onwire_len())
     .then([this] (auto bl) {
-      if (session_stream_handlers.rx) {
-        session_stream_handlers.rx->reset_rx_handler();
-        /*
-        bl = session_stream_handlers.rx->authenticated_decrypt_update(
-            std::move(bl), segment_t::DEFAULT_ALIGNMENT);
-        */
-      }
-
-      // I expect ceph_le32 will make the endian conversion for me. Passing
-      // everything through ::Decode is unnecessary.
-      const auto& main_preamble = \
-        *reinterpret_cast<const preamble_block_t*>(bl.get());
-      logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
-                     conn, bl.size(), (int)main_preamble.tag,
-                     (int)main_preamble.num_segments, main_preamble.crc);
-
-      // verify preamble's CRC before any further processing
-      const auto rx_crc = ceph_crc32c(0,
-        reinterpret_cast<const unsigned char*>(&main_preamble),
-        sizeof(main_preamble) - sizeof(main_preamble.crc));
-      if (rx_crc != main_preamble.crc) {
-        logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
-                      conn, rx_crc, main_preamble.crc);
-        abort_in_fault();
-      }
-
-      // currently we do support between 1 and MAX_NUM_SEGMENTS segments
-      if (main_preamble.num_segments < 1 ||
-          main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().warn("{} unsupported num_segments={}",
-                      conn, main_preamble.num_segments);
-        abort_in_fault();
-      }
-      if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-        logger().warn("{} num_segments too much: {}",
-                      conn, main_preamble.num_segments);
-        abort_in_fault();
-      }
-
-      rx_segments_desc.clear();
       rx_segments_data.clear();
-
-      for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-        logger().trace("{} GOT frame segment: len={} align={}",
-                       conn, main_preamble.segments[idx].length,
-                       main_preamble.segments[idx].alignment);
-        rx_segments_desc.emplace_back(main_preamble.segments[idx]);
+      try {
+        rx_preamble.append(buffer::create(std::move(bl)));
+        const Tag tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+        INTERCEPT_FRAME(tag, bp_type_t::READ);
+        return tag;
+      } catch (FrameError& e) {
+        logger().warn("{} read_main_preamble: {}", conn, e.what());
+        abort_in_fault();
       }
-
-      INTERCEPT_FRAME(main_preamble.tag, bp_type_t::READ);
-      return static_cast<Tag>(main_preamble.tag);
     });
 }
 
 seastar::future<> ProtocolV2::read_frame_payload()
 {
-  ceph_assert(!rx_segments_desc.empty());
   ceph_assert(rx_segments_data.empty());
 
   return seastar::do_until(
-    [this] { return rx_segments_desc.size() == rx_segments_data.size(); },
+    [this] { return rx_frame_asm.get_num_segments() == rx_segments_data.size(); },
     [this] {
-      // description of current segment to read
-      const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
       // TODO: create aligned and contiguous buffer from socket
-      if (cur_rx_desc.alignment != segment_t::DEFAULT_ALIGNMENT) {
+      const size_t seg_idx = rx_segments_data.size();
+      if (uint16_t alignment = rx_frame_asm.get_segment_align(seg_idx);
+         alignment != segment_t::DEFAULT_ALIGNMENT) {
         logger().trace("{} cannot allocate {} aligned buffer at segment desc index {}",
-                       conn, cur_rx_desc.alignment, rx_segments_data.size());
+                       conn, alignment, rx_segments_data.size());
       }
+      uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
       // TODO: create aligned and contiguous buffer from socket
-      return read_exactly(cur_rx_desc.length)
-      .then([this] (auto tmp_bl) {
+      return read_exactly(onwire_len).then([this] (auto tmp_bl) {
         logger().trace("{} RECV({}) frame segment[{}]",
                        conn, tmp_bl.size(), rx_segments_data.size());
-        bufferlist data;
-        data.append(buffer::create(std::move(tmp_bl)));
-        if (session_stream_handlers.rx) {
-          // TODO
-          ceph_assert(false);
-        }
-        rx_segments_data.emplace_back(std::move(data));
+        bufferlist segment;
+        segment.append(buffer::create(std::move(tmp_bl)));
+        rx_segments_data.emplace_back(std::move(segment));
       });
     }
   ).then([this] {
-    // TODO: get_epilogue_size()
-    ceph_assert(!session_stream_handlers.rx);
-    return read_exactly(sizeof(epilogue_crc_rev0_block_t));
+    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
   }).then([this] (auto bl) {
     logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
-
-    __u8 late_flags;
-    if (session_stream_handlers.rx) {
-      // TODO
-      ceph_assert(false);
-    } else {
-      auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
-      for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
-        const __u32 expected_crc = epilogue.crc_values[idx];
-        const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
-        if (expected_crc != calculated_crc) {
-          logger().warn("{} message integrity check failed at index {}:"
-                        " expected_crc={} calculated_crc={}",
-                        conn, (unsigned int)idx, expected_crc, calculated_crc);
-          abort_in_fault();
-        } else {
-          logger().trace("{} message integrity check success at index {}: crc={}",
-                         conn, (unsigned int)idx, expected_crc);
-        }
-      }
-      late_flags = epilogue.late_flags;
+    bool ok = false;
+    try {
+      bufferlist rx_epilogue;
+      rx_epilogue.append(buffer::create(std::move(bl)));
+      ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+    } catch (FrameError& e) {
+      logger().error("read_frame_payload: {} {}", conn, e.what());
+      abort_in_fault();
+    } catch (ceph::crypto::onwire::MsgAuthError&) {
+      logger().error("read_frame_payload: {} bad auth tag", conn);
+      abort_in_fault();
     }
-    logger().trace("{} GOT frame epilogue: late_flags={}",
-                   conn, (unsigned)late_flags);
-
     // we do have a mechanism that allows transmitter to start sending message
     // and abort after putting entire data field on wire. This will be used by
     // the kernel client to avoid unnecessary buffering.
-    if (late_flags & FRAME_LATE_FLAG_ABORTED) {
+    if (!ok) {
       // TODO
       ceph_assert(false);
     }
@@ -444,8 +363,7 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
   if (conn.policy.lossy) {
     logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
                   conn, func_name, get_state_name(state), eptr);
-    dispatch_reset();
-    (void) close();
+    close(true);
   } else if (conn.policy.server ||
              (conn.policy.standby &&
               (!is_queued() && conn.sent.empty()))) {
@@ -463,17 +381,6 @@ void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr e
   }
 }
 
-void ProtocolV2::dispatch_reset()
-{
-  (void) seastar::with_gate(pending_dispatch, [this] {
-    return dispatcher.ms_handle_reset(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_reset caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_reset()");
-  });
-}
-
 void ProtocolV2::reset_session(bool full)
 {
   server_cookie = 0;
@@ -483,21 +390,17 @@ void ProtocolV2::reset_session(bool full)
     client_cookie = generate_client_cookie();
     peer_global_seq = 0;
     reset_write();
-    (void) seastar::with_gate(pending_dispatch, [this] {
-      return dispatcher.ms_handle_remote_reset(
-          seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} ms_handle_remote_reset caught exception: {}", conn, eptr);
-      ceph_abort("unexpected exception from ms_handle_remote_reset()");
-    });
+    dispatchers.ms_handle_remote_reset(
+       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
   }
 }
 
-seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
+seastar::future<std::tuple<entity_type_t, entity_addr_t>>
+ProtocolV2::banner_exchange(bool is_connect)
 {
   // 1. prepare and send banner
   bufferlist banner_payload;
-  encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+  encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
   encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
 
   bufferlist bl;
@@ -508,7 +411,8 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
   logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
                  "required={}, banner=\"{}\"",
                  conn, bl.length(), len_payload,
-                 CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+                 CRIMSON_MSGR2_SUPPORTED_FEATURES,
+                 CEPH_MSGR2_REQUIRED_FEATURES,
                  CEPH_BANNER_V2_PREFIX);
   INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
   return write_flush(std::move(bl)).then([this] {
@@ -546,7 +450,7 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
       logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
       INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
       return read(payload_len);
-    }).then([this] (bufferlist bl) {
+    }).then([this, is_connect] (bufferlist bl) {
       // 4. process peer banner_payload and send HelloFrame
       auto p = bl.cbegin();
       uint64_t peer_supported_features;
@@ -563,24 +467,27 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
                      peer_supported_features, peer_required_features);
 
       // Check feature bit compatibility
-      uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+      uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
       uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
       if ((required_features & peer_supported_features) != required_features) {
         logger().error("{} peer does not support all required features"
                        " required={} peer_supported={}",
                        conn, required_features, peer_supported_features);
-        abort_in_close(*this);
+        abort_in_close(*this, is_connect);
       }
       if ((supported_features & peer_required_features) != peer_required_features) {
         logger().error("{} we do not support all peer required features"
                        " peer_required={} supported={}",
                        conn, peer_required_features, supported_features);
-        abort_in_close(*this);
+        abort_in_close(*this, is_connect);
       }
       this->peer_required_features = peer_required_features;
       if (this->peer_required_features == 0) {
         this->connection_features = msgr2_required;
       }
+      const bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+      tx_frame_asm.set_is_rev1(is_rev1);
+      rx_frame_asm.set_is_rev1(is_rev1);
 
       auto hello = HelloFrame::Encode(messenger.get_mytype(),
                                       conn.target_addr);
@@ -600,8 +507,8 @@ seastar::future<entity_type_t, entity_addr_t> ProtocolV2::banner_exchange()
       logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
                      conn, ceph_entity_type_name(hello.entity_type()),
                      hello.peer_addr());
-      return seastar::make_ready_future<entity_type_t, entity_addr_t>(
-          hello.entity_type(), hello.peer_addr());
+      return seastar::make_ready_future<std::tuple<entity_type_t, entity_addr_t>>(
+        std::make_tuple(hello.entity_type(), hello.peer_addr()));
     });
 }
 
@@ -668,9 +575,8 @@ seastar::future<> ProtocolV2::handle_auth_reply()
             abort_in_fault();
           }
           auth_meta->con_mode = auth_done.con_mode();
-          // TODO
-          ceph_assert(!auth_meta->is_mode_secure());
-          session_stream_handlers = { nullptr, nullptr };
+          session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+              nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), false);
           return finish_auth();
         });
       default: {
@@ -699,8 +605,7 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
     });
   } catch (const crimson::auth::error& e) {
     logger().error("{} get_initial_auth_request returned {}", conn, e);
-    dispatch_reset();
-    abort_in_close(*this);
+    abort_in_close(*this, true);
     return seastar::now();
   }
 }
@@ -793,6 +698,13 @@ ProtocolV2::client_connect()
             throw std::system_error(
                 make_error_code(crimson::net::error::bad_peer_address));
           }
+          if (conn.get_peer_id() != entity_name_t::NEW &&
+              conn.get_peer_id() != server_ident.gid()) {
+            logger().error("{} connection peer id ({}) does not match "
+                           "what it should be ({}) during connecting, close",
+                            conn, server_ident.gid(), conn.get_peer_id());
+            abort_in_close(*this, true);
+          }
           conn.set_peer_id(server_ident.gid());
           conn.set_features(server_ident.supported_features() &
                             conn.policy.features_supported);
@@ -894,9 +806,7 @@ void ProtocolV2::execute_connecting()
   if (socket) {
     socket->shutdown();
   }
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
-      // we don't know my socket_port yet
-      conn.set_ephemeral_port(0, SocketConnection::side_t::none);
+  gated_execute("execute_connecting", [this] {
       return messenger.get_global_seq().then([this] (auto gs) {
           global_seq = gs;
           assert(client_cookie != 0);
@@ -918,7 +828,8 @@ void ProtocolV2::execute_connecting()
             abort_protocol();
           }
           if (socket) {
-            (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+            gate.dispatch_in_background("close_sockect_connecting", *this,
+                           [sock = std::move(socket)] () mutable {
               return sock->close().then([sock = std::move(sock)] {});
             });
           }
@@ -939,18 +850,21 @@ void ProtocolV2::execute_connecting()
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
           enable_recording();
-          return banner_exchange();
-        }).then([this] (entity_type_t _peer_type,
-                        entity_addr_t _my_addr_from_peer) {
+          return banner_exchange(true);
+        }).then([this] (auto&& ret) {
+          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
           if (conn.get_peer_type() != _peer_type) {
             logger().warn("{} connection peer type does not match what peer advertises {} != {}",
                           conn, ceph_entity_type_name(conn.get_peer_type()),
                           ceph_entity_type_name(_peer_type));
-            dispatch_reset();
-            abort_in_close(*this);
+            abort_in_close(*this, true);
           }
-          conn.set_ephemeral_port(_my_addr_from_peer.get_port(),
-                                  SocketConnection::side_t::connector);
+          if (unlikely(state != state_t::CONNECTING)) {
+            logger().debug("{} triggered {} during banner_exchange(), abort",
+                           conn, get_state_name(state));
+            abort_protocol();
+          }
+          socket->learn_ephemeral_port_as_connector(_my_addr_from_peer.get_port());
           if (unlikely(_my_addr_from_peer.is_legacy())) {
             logger().warn("{} peer sent a legacy address for me: {}",
                           conn, _my_addr_from_peer);
@@ -977,20 +891,13 @@ void ProtocolV2::execute_connecting()
           }
           switch (next) {
            case next_step_t::ready: {
-            (void) seastar::with_gate(pending_dispatch, [this] {
-              return dispatcher.ms_handle_connect(
-                  seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-            }).handle_exception([this] (std::exception_ptr eptr) {
-              logger().error("{} ms_handle_connect caught exception: {}", conn, eptr);
-              ceph_abort("unexpected exception from ms_handle_connect()");
-            });
             logger().info("{} connected:"
                           " gs={}, pgs={}, cs={}, client_cookie={},"
                           " server_cookie={}, in_seq={}, out_seq={}, out_q={}",
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie, conn.in_seq,
                           conn.out_seq, conn.out_q.size());
-            execute_ready();
+            execute_ready(true);
             break;
            }
            case next_step_t::wait: {
@@ -1065,9 +972,8 @@ seastar::future<> ProtocolV2::_handle_auth_request(bufferlist& auth_payload, boo
                    ceph_con_mode_name(auth_meta->con_mode), reply.length());
     return write_frame(auth_done).then([this] {
       ceph_assert(auth_meta);
-      // TODO
-      ceph_assert(!auth_meta->is_mode_secure());
-      session_stream_handlers = { nullptr, nullptr };
+      session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+          nullptr, *auth_meta, tx_frame_asm.get_is_rev1(), true);
       return finish_auth();
     });
    }
@@ -1125,6 +1031,20 @@ seastar::future<> ProtocolV2::server_auth()
   });
 }
 
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+  auto my_peer_name = conn.get_peer_name();
+  if (my_peer_name.type() != peer_name.type()) {
+    return false;
+  }
+  if (my_peer_name.num() != entity_name_t::NEW &&
+      peer_name.num() != entity_name_t::NEW &&
+      my_peer_name.num() != peer_name.num()) {
+    return false;
+  }
+  return true;
+}
+
 seastar::future<ProtocolV2::next_step_t>
 ProtocolV2::send_wait()
 {
@@ -1149,6 +1069,8 @@ ProtocolV2::reuse_connection(
                                     client_cookie,
                                     conn.get_peer_name(),
                                     connection_features,
+                                    tx_frame_asm.get_is_rev1(),
+                                    rx_frame_asm.get_is_rev1(),
                                     conn_seq,
                                     msg_seq);
 #ifdef UNIT_TESTS_BUILT
@@ -1159,7 +1081,7 @@ ProtocolV2::reuse_connection(
   // close this connection because all the necessary information is delivered
   // to the exisiting connection, and jump to error handling code to abort the
   // current state.
-  abort_in_close(*this);
+  abort_in_close(*this, false);
   return seastar::make_ready_future<next_step_t>(next_step_t::none);
 }
 
@@ -1181,6 +1103,12 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
                  existing_proto->client_cookie,
                  existing_proto->server_cookie);
 
+  if (!validate_peer_name(existing_conn->get_peer_name())) {
+    logger().error("{} server_connect: my peer_name doesn't match"
+                   " the existing connection {}, abort", conn, existing_conn);
+    abort_in_fault();
+  }
+
   if (existing_proto->state == state_t::REPLACING) {
     logger().warn("{} server_connect: racing replace happened while"
                   " replacing existing connection {}, send wait.",
@@ -1203,15 +1131,7 @@ ProtocolV2::handle_existing_connection(SocketConnectionRef existing_conn)
     logger().warn("{} server_connect:"
                   " existing connection {} is a lossy channel. Close existing in favor of"
                   " this connection", conn, *existing_conn);
-    existing_proto->dispatch_reset();
-    (void) existing_proto->close();
-
-    if (unlikely(state != state_t::ACCEPTING)) {
-      logger().debug("{} triggered {} in execute_accepting()",
-                     conn, get_state_name(state));
-      abort_protocol();
-    }
-    execute_establishing();
+    execute_establishing(existing_conn, true);
     return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   }
 
@@ -1289,18 +1209,7 @@ ProtocolV2::server_connect()
       throw std::system_error(
           make_error_code(crimson::net::error::bad_peer_address));
     }
-    // TODO: change peer_addr to entity_addrvec_t
-    entity_addr_t paddr = client_ident.addrs().front();
-    if ((paddr.is_msgr2() || paddr.is_any()) &&
-        paddr.is_same_host(conn.target_addr)) {
-      // good
-    } else {
-      logger().warn("{} peer's address {} is not v2 or not the same host with {}",
-                    conn, paddr, conn.target_addr);
-      throw std::system_error(
-          make_error_code(crimson::net::error::bad_peer_address));
-    }
-    conn.peer_addr = paddr;
+    conn.peer_addr = client_ident.addrs().front();
     logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
     conn.target_addr = conn.peer_addr;
     if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
@@ -1310,6 +1219,13 @@ ProtocolV2::server_connect()
           make_error_code(crimson::net::error::bad_peer_address));
     }
 
+    if (conn.get_peer_id() != entity_name_t::NEW &&
+        conn.get_peer_id() != client_ident.gid()) {
+      logger().error("{} client_ident peer_id ({}) does not match"
+                     " what it should be ({}) during accepting, abort",
+                      conn, client_ident.gid(), conn.get_peer_id());
+      abort_in_fault();
+    }
     conn.set_peer_id(client_ident.gid());
     client_cookie = client_ident.cookie();
 
@@ -1341,19 +1257,16 @@ ProtocolV2::server_connect()
                       conn, *existing_conn,
                       static_cast<int>(existing_conn->protocol->proto_type));
         // should unregister the existing from msgr atomically
-        (void) existing_conn->close();
+        // NOTE: this is following async messenger logic, but we may miss the reset event.
+        execute_establishing(existing_conn, false);
+        return seastar::make_ready_future<next_step_t>(next_step_t::ready);
       } else {
         return handle_existing_connection(existing_conn);
       }
+    } else {
+      execute_establishing(nullptr, true);
+      return seastar::make_ready_future<next_step_t>(next_step_t::ready);
     }
-
-    if (unlikely(state != state_t::ACCEPTING)) {
-      logger().debug("{} triggered {} in execute_accepting()",
-                     conn, get_state_name(state));
-      abort_protocol();
-    }
-    execute_establishing();
-    return seastar::make_ready_future<next_step_t>(next_step_t::ready);
   });
 }
 
@@ -1450,7 +1363,8 @@ ProtocolV2::server_reconnect()
                     "close existing and reset client.",
                     conn, *existing_conn,
                     static_cast<int>(existing_conn->protocol->proto_type));
-      (void) existing_conn->close();
+      // NOTE: this is following async messenger logic, but we may miss the reset event.
+      existing_conn->mark_down();
       return send_reset(true);
     }
 
@@ -1469,6 +1383,12 @@ ProtocolV2::server_reconnect()
                    existing_proto->client_cookie,
                    existing_proto->server_cookie);
 
+    if (!validate_peer_name(existing_conn->get_peer_name())) {
+      logger().error("{} server_reconnect: my peer_name doesn't match"
+                     " the existing connection {}, abort", conn, existing_conn);
+      abort_in_fault();
+    }
+
     if (existing_proto->state == state_t::REPLACING) {
       logger().warn("{} server_reconnect: racing replace happened while "
                     " replacing existing connection {}, retry global.",
@@ -1543,15 +1463,16 @@ ProtocolV2::server_reconnect()
 void ProtocolV2::execute_accepting()
 {
   trigger_state(state_t::ACCEPTING, write_state_t::none, false);
-  (void) seastar::with_gate(pending_dispatch, [this] {
-      return seastar::futurize_apply([this] {
+  gate.dispatch_in_background("execute_accepting", *this, [this] {
+      return seastar::futurize_invoke([this] {
           INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
           auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
           session_stream_handlers = { nullptr, nullptr };
+          session_comp_handlers = { nullptr, nullptr };
           enable_recording();
-          return banner_exchange();
-        }).then([this] (entity_type_t _peer_type,
-                        entity_addr_t _my_addr_from_peer) {
+          return banner_exchange(false);
+        }).then([this] (auto&& ret) {
+          auto [_peer_type, _my_addr_from_peer] = std::move(ret);
           ceph_assert(conn.get_peer_type() == 0);
           conn.set_peer_type(_peer_type);
 
@@ -1561,8 +1482,9 @@ void ProtocolV2::execute_accepting()
                         conn, ceph_entity_type_name(_peer_type),
                         conn.policy.lossy, conn.policy.server,
                         conn.policy.standby, conn.policy.resetcheck);
-          if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
-              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+          if (!messenger.get_myaddr().is_blank_ip() &&
+              (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+              messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
             logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
                           conn, _my_addr_from_peer, messenger.get_myaddr());
             throw std::system_error(
@@ -1604,7 +1526,7 @@ void ProtocolV2::execute_accepting()
         }).handle_exception([this] (std::exception_ptr eptr) {
           logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
                         conn, get_state_name(state), eptr);
-          (void) close();
+          close(false);
         });
     });
 }
@@ -1646,23 +1568,41 @@ seastar::future<> ProtocolV2::finish_auth()
 
 // ESTABLISHING
 
-void ProtocolV2::execute_establishing() {
+void ProtocolV2::execute_establishing(
+    SocketConnectionRef existing_conn, bool dispatch_reset) {
+  if (unlikely(state != state_t::ACCEPTING)) {
+    logger().debug("{} triggered {} before execute_establishing()",
+                   conn, get_state_name(state));
+    abort_protocol();
+  }
+
+  auto accept_me = [this] {
+    messenger.register_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+    messenger.unaccept_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+  };
+
   trigger_state(state_t::ESTABLISHING, write_state_t::delay, false);
-  (void) seastar::with_gate(pending_dispatch, [this] {
-    return dispatcher.ms_handle_accept(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_accept()");
-  });
-  messenger.register_conn(
-    seastar::static_pointer_cast<SocketConnection>(
-      conn.shared_from_this()));
-  messenger.unaccept_conn(
-    seastar::static_pointer_cast<SocketConnection>(
-      conn.shared_from_this()));
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
-    return seastar::futurize_apply([this] {
+  if (existing_conn) {
+    existing_conn->protocol->close(dispatch_reset, std::move(accept_me));
+    if (unlikely(state != state_t::ESTABLISHING)) {
+      logger().warn("{} triggered {} during execute_establishing(), "
+                    "the accept event will not be delivered!",
+                    conn, get_state_name(state));
+      abort_protocol();
+    }
+  } else {
+    accept_me();
+  }
+
+  dispatchers.ms_handle_accept(
+      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+  gated_execute("execute_establishing", [this] {
+    return seastar::futurize_invoke([this] {
       return send_server_ident();
     }).then([this] {
       if (unlikely(state != state_t::ESTABLISHING)) {
@@ -1675,7 +1615,7 @@ void ProtocolV2::execute_establishing() {
                     conn, global_seq, peer_global_seq, connect_seq,
                     client_cookie, server_cookie, conn.in_seq,
                     conn.out_seq, conn.out_q.size());
-      execute_ready();
+      execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::ESTABLISHING) {
         logger().info("{} execute_establishing() protocol aborted at {} -- {}",
@@ -1747,6 +1687,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                                    uint64_t new_client_cookie,
                                    entity_name_t new_peer_name,
                                    uint64_t new_conn_features,
+                                   bool tx_is_rev1,
+                                   bool rx_is_rev1,
                                    uint64_t new_connect_seq,
                                    uint64_t new_msg_seq)
 {
@@ -1754,23 +1696,19 @@ void ProtocolV2::trigger_replacing(bool reconnect,
   if (socket) {
     socket->shutdown();
   }
-  (void) seastar::with_gate(pending_dispatch, [this] {
-    return dispatcher.ms_handle_accept(
-        seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
-  }).handle_exception([this] (std::exception_ptr eptr) {
-    logger().error("{} ms_handle_accept caught exception: {}", conn, eptr);
-    ceph_abort("unexpected exception from ms_handle_accept()");
-  });
-  (void) seastar::with_gate(pending_dispatch,
-                            [this,
-                             reconnect,
-                             do_reset,
-                             new_socket = std::move(new_socket),
-                             new_auth_meta = std::move(new_auth_meta),
-                             new_rxtx = std::move(new_rxtx),
-                             new_client_cookie, new_peer_name,
-                             new_conn_features, new_peer_global_seq,
-                             new_connect_seq, new_msg_seq] () mutable {
+  dispatchers.ms_handle_accept(
+      seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  gate.dispatch_in_background("trigger_replacing", *this,
+                 [this,
+                  reconnect,
+                  do_reset,
+                  new_socket = std::move(new_socket),
+                  new_auth_meta = std::move(new_auth_meta),
+                  new_rxtx = std::move(new_rxtx),
+                  tx_is_rev1, rx_is_rev1,
+                  new_client_cookie, new_peer_name,
+                  new_conn_features, new_peer_global_seq,
+                  new_connect_seq, new_msg_seq] () mutable {
     return wait_write_exit().then([this, do_reset] {
       if (do_reset) {
         reset_session(true);
@@ -1782,6 +1720,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
              new_socket = std::move(new_socket),
              new_auth_meta = std::move(new_auth_meta),
              new_rxtx = std::move(new_rxtx),
+             tx_is_rev1, rx_is_rev1,
              new_client_cookie, new_peer_name,
              new_conn_features, new_peer_global_seq,
              new_connect_seq, new_msg_seq] () mutable {
@@ -1792,7 +1731,8 @@ void ProtocolV2::trigger_replacing(bool reconnect,
       }
 
       if (socket) {
-        (void) with_gate(pending_dispatch, [sock = std::move(socket)] () mutable {
+        gate.dispatch_in_background("close_socket_replacing", *this,
+                       [sock = std::move(socket)] () mutable {
           return sock->close().then([sock = std::move(sock)] {});
         });
       }
@@ -1811,8 +1751,13 @@ void ProtocolV2::trigger_replacing(bool reconnect,
         return write_frame(reconnect_ok);
       } else {
         client_cookie = new_client_cookie;
-        conn.set_peer_name(new_peer_name);
+        assert(conn.get_peer_type() == new_peer_name.type());
+        if (conn.get_peer_id() == entity_name_t::NEW) {
+          conn.set_peer_id(new_peer_name.num());
+        }
         connection_features = new_conn_features;
+        tx_frame_asm.set_is_rev1(tx_is_rev1);
+        rx_frame_asm.set_is_rev1(rx_is_rev1);
         return send_server_ident();
       }
     }).then([this, reconnect] {
@@ -1827,7 +1772,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
                     conn, reconnect ? "reconnected" : "connected",
                     global_seq, peer_global_seq, connect_seq, client_cookie,
                     server_cookie, conn.in_seq, conn.out_seq, conn.out_q.size());
-      execute_ready();
+      execute_ready(false);
     }).handle_exception([this] (std::exception_ptr eptr) {
       if (state != state_t::REPLACING) {
         logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
@@ -1843,7 +1788,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 // READY state
 
 ceph::bufferlist ProtocolV2::do_sweep_messages(
-    const std::deque<MessageRef>& msgs,
+    const std::deque<MessageURef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
     std::optional<utime_t> _keepalive_ack,
@@ -1863,13 +1808,13 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
 
-  if (require_ack && !num_msgs) {
+  if (require_ack && num_msgs == 0u) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
     bl.append(ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
-  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
     // TODO: move to common code
     // set priority
     msg->get_header().src = messenger.get_myname();
@@ -1885,8 +1830,8 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     ceph_msg_header2 header2{header.seq,        header.tid,
                              header.type,       header.priority,
                              header.version,
-                             init_le32(0),      header.data_off,
-                             init_le64(conn.in_seq),
+                             ceph_le32(0),      header.data_off,
+                             ceph_le64(conn.in_seq),
                              footer.flags,      header.compat_version,
                              header.reserved};
 
@@ -1924,22 +1869,21 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
                            current_header.type,
                            current_header.priority,
                            current_header.version,
-                           init_le32(msg_frame.front_len()),
-                           init_le32(msg_frame.middle_len()),
-                           init_le32(msg_frame.data_len()),
+                           ceph_le32(msg_frame.front_len()),
+                           ceph_le32(msg_frame.middle_len()),
+                           ceph_le32(msg_frame.data_len()),
                            current_header.data_off,
                            conn.get_peer_name(),
                            current_header.compat_version,
                            current_header.reserved,
-                           init_le32(0)};
-    ceph_msg_footer footer{init_le32(0), init_le32(0),
-                           init_le32(0), init_le64(0), current_header.flags};
+                           ceph_le32(0)};
+    ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                           ceph_le32(0), ceph_le64(0), current_header.flags};
 
-    auto pconn = seastar::static_pointer_cast<SocketConnection>(
+    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this());
     Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(),
-        std::move(pconn));
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
     if (!message) {
       logger().warn("{} decode message failed", conn);
       abort_in_fault();
@@ -1960,17 +1904,17 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     // elsewhere.  in that case it doesn't matter if we "got" it or not.
     uint64_t cur_seq = conn.in_seq;
     if (message->get_seq() <= cur_seq) {
-      logger().error("{} got old message {} <= {} {} {}, discarding",
-                     conn, message->get_seq(), cur_seq, message, *message);
+      logger().error("{} got old message {} <= {} {}, discarding",
+                     conn, message->get_seq(), cur_seq, *message);
       if (HAVE_FEATURE(conn.features, RECONNECT_SEQ) &&
-          conf.ms_die_on_old_message) {
+          local_conf()->ms_die_on_old_message) {
         ceph_assert(0 == "old msgs despite reconnect_seq feature");
       }
-      return;
+      return seastar::now();
     } else if (message->get_seq() > cur_seq + 1) {
       logger().error("{} missed message? skipped from seq {} to {}",
                      conn, cur_seq, message->get_seq());
-      if (conf.ms_die_on_skipped_message) {
+      if (local_conf()->ms_die_on_skipped_message) {
         ceph_assert(0 == "skipped incoming seq");
       }
     }
@@ -1984,32 +1928,32 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
-    (void) seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
-      return dispatcher.ms_dispatch(&conn, std::move(msg));
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
-      ceph_abort("unexpected exception from ms_dispatch()");
-    });
+    // throttle the reading process by the returned future
+    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
   });
 }
 
-void ProtocolV2::execute_ready()
+void ProtocolV2::execute_ready(bool dispatch_connect)
 {
   assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
   trigger_state(state_t::READY, write_state_t::open, false);
+  if (dispatch_connect) {
+    dispatchers.ms_handle_connect(
+       seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  }
 #ifdef UNIT_TESTS_BUILT
   if (conn.interceptor) {
     conn.interceptor->register_conn_ready(conn);
   }
 #endif
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_ready", [this] {
     protocol_timer.cancel();
     return seastar::keep_doing([this] {
       return read_main_preamble()
       .then([this] (Tag tag) {
         switch (tag) {
           case Tag::MESSAGE: {
-            return seastar::futurize_apply([this] {
+            return seastar::futurize_invoke([this] {
               // throttle_message() logic
               if (!conn.policy.throttler_messages) {
                 return seastar::now();
@@ -2108,15 +2052,14 @@ void ProtocolV2::execute_wait(bool max_backoff)
   if (socket) {
     socket->shutdown();
   }
-  execution_done = seastar::with_gate(pending_dispatch,
-                                      [this, max_backoff] {
+  gated_execute("execute_wait", [this, max_backoff] {
     double backoff = protocol_timer.last_dur();
     if (max_backoff) {
-      backoff = conf.ms_max_backoff;
+      backoff = local_conf().get_val<double>("ms_max_backoff");
     } else if (backoff > 0) {
-      backoff = std::min(conf.ms_max_backoff, 2 * backoff);
+      backoff = std::min(local_conf().get_val<double>("ms_max_backoff"), 2 * backoff);
     } else {
-      backoff = conf.ms_initial_backoff;
+      backoff = local_conf().get_val<double>("ms_initial_backoff");
     }
     return protocol_timer.backoff(backoff).then([this] {
       if (unlikely(state != state_t::WAIT)) {
@@ -2140,14 +2083,14 @@ void ProtocolV2::execute_wait(bool max_backoff)
 void ProtocolV2::execute_server_wait()
 {
   trigger_state(state_t::SERVER_WAIT, write_state_t::delay, false);
-  execution_done = seastar::with_gate(pending_dispatch, [this] {
+  gated_execute("execute_server_wait", [this] {
     return read_exactly(1).then([this] (auto bl) {
       logger().warn("{} SERVER_WAIT got read, abort", conn);
       abort_in_fault();
     }).handle_exception([this] (std::exception_ptr eptr) {
       logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
                     conn, get_state_name(state), eptr);
-      (void) close();
+      close(false);
     });
   });
 }
@@ -2156,6 +2099,10 @@ void ProtocolV2::execute_server_wait()
 
 void ProtocolV2::trigger_close()
 {
+  messenger.closing_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+        conn.shared_from_this()));
+
   if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
     messenger.unaccept_conn(
       seastar::static_pointer_cast<SocketConnection>(
@@ -2170,13 +2117,19 @@ void ProtocolV2::trigger_close()
   }
 
   protocol_timer.cancel();
-
   trigger_state(state_t::CLOSING, write_state_t::drop, false);
-#ifdef UNIT_TESTS_BUILT
-  if (conn.interceptor) {
-    conn.interceptor->register_conn_closed(conn);
-  }
-#endif
+}
+
+void ProtocolV2::on_closed()
+{
+  messenger.closed_conn(
+      seastar::static_pointer_cast<SocketConnection>(
+       conn.shared_from_this()));
+}
+
+void ProtocolV2::print(std::ostream& out) const
+{
+  out << conn;
 }
 
 } // namespace crimson::net