git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/ProtocolV2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / ProtocolV2.cc
index 5666b24c7a2946e804a2b9097005f6889d112206..a176fc2c808ac9eb8d465310781daf4eacc20b98 100644 (file)
@@ -16,7 +16,7 @@
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
 #define dout_prefix _conn_prefix(_dout)
-ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
+std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
   return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
                 << *connection->peer_addrs << " conn(" << connection << " "
                 << this
@@ -24,8 +24,12 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
                << " :" << connection->port
                 << " s=" << get_state_name(state) << " pgs=" << peer_global_seq
                 << " cs=" << connect_seq << " l=" << connection->policy.lossy
-                << " rx=" << session_stream_handlers.rx.get()
+                << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
+                                                  REVISION_1)
+                << " crypto rx=" << session_stream_handlers.rx.get()
                 << " tx=" << session_stream_handlers.tx.get()
+                << " comp rx=" << session_compression_handlers.rx.get()
+                << " tx=" << session_compression_handlers.tx.get()
                 << ").";
 }
 
@@ -43,8 +47,8 @@ void ProtocolV2::run_continuation(CtPtr pcontinuation) {
 void ProtocolV2::run_continuation(CtRef continuation) {
   try {
     CONTINUATION_RUN(continuation)
-  } catch (const buffer::error &e) {
-    lderr(cct) << __func__ << " failed decoding of frame header: " << e
+  } catch (const ceph::buffer::error &e) {
+    lderr(cct) << __func__ << " failed decoding of frame header: " << e.what()
                << dendl;
     _fault();
   } catch (const ceph::crypto::onwire::MsgAuthError &e) {
@@ -57,7 +61,7 @@ void ProtocolV2::run_continuation(CtRef continuation) {
 
 #define WRITE(B, D, C) write(D, CONTINUATION(C), B)
 
-#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))
+#define READ(L, C) read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L)))
 
 #define READ_RXBUF(B, C) read(CONTINUATION(C), B)
 
@@ -81,7 +85,7 @@ if(connection->interceptor) { \
 ProtocolV2::ProtocolV2(AsyncConnection *connection)
     : Protocol(2, connection),
       state(NONE),
-      peer_required_features(0),
+      peer_supported_features(0),
       client_cookie(0),
       server_cookie(0),
       global_seq(0),
@@ -92,6 +96,10 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection)
       replacing(false),
       can_write(false),
       bannerExchangeCallback(nullptr),
+      tx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+                   &session_compression_handlers),
+      rx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+                   &session_compression_handlers),
       next_tag(static_cast<Tag>(0)),
       keepalive(false) {
 }
@@ -119,7 +127,7 @@ bool ProtocolV2::is_connected() { return can_write; }
 void ProtocolV2::discard_out_queue() {
   ldout(cct, 10) << __func__ << " started" << dendl;
 
-  for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
+  for (auto p = sent.begin(); p != sent.end(); ++p) {
     ldout(cct, 20) << __func__ << " discard " << *p << dendl;
     (*p)->put();
   }
@@ -145,7 +153,7 @@ void ProtocolV2::reset_session() {
 
   connection->dispatch_queue->discard_queue(connection->conn_id);
   discard_out_queue();
-  connection->outcoming_bl.clear();
+  connection->outgoing_bl.clear();
 
   connection->dispatch_queue->queue_remote_reset(connection);
 
@@ -195,6 +203,7 @@ void ProtocolV2::requeue_sent() {
     ldout(cct, 5) << __func__ << " requeueing message m=" << m
                   << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
                   << *m << dendl;
+    m->clear_payload();
     rq.emplace_front(out_queue_entry_t{false, m});
   }
 }
@@ -221,12 +230,38 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   return count;
 }
 
-void ProtocolV2::reset_recv_state() {
+void ProtocolV2::reset_security() {
+  ldout(cct, 5) << __func__ << dendl;
+
   auth_meta.reset(new AuthConnectionMeta);
-  session_stream_handlers.tx.reset(nullptr);
   session_stream_handlers.rx.reset(nullptr);
-  pre_auth.txbuf.clear();
+  session_stream_handlers.tx.reset(nullptr);
   pre_auth.rxbuf.clear();
+  pre_auth.txbuf.clear();
+}
+
+// The caller is expected to hold `write_lock` while calling this method.
+void ProtocolV2::reset_recv_state() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  if (!connection->center->in_thread()) {
+    // execute in the same thread that uses the rx/tx handlers. We need
+    // to do the warp because holding `write_lock` is not enough as
+    // `write_event()` unlocks it just before calling `write_message()`.
+    // `submit_to()` here is NOT blocking.
+    connection->center->submit_to(connection->center->get_id(), [this] {
+      ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
+                    << dendl;
+      // Possibly unnecessary. See the comment in `deactivate_existing`.
+      std::lock_guard<std::mutex> l(connection->lock);
+      std::lock_guard<std::mutex> wl(connection->write_lock);
+      reset_security();
+      reset_compression();
+    }, /* always_async = */true);
+  } else {
+    reset_security();
+    reset_compression();
+  }
 
   // clean read and write callbacks
   connection->pendingReadLen.reset();
@@ -238,11 +273,11 @@ void ProtocolV2::reset_recv_state() {
 }
 
 size_t ProtocolV2::get_current_msg_size() const {
-  ceph_assert(!rx_segments_desc.empty());
+  ceph_assert(rx_frame_asm.get_num_segments() > 0);
   size_t sum = 0;
   // we don't include SegmentIndex::Msg::HEADER.
-  for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
-    sum += rx_segments_desc[idx].length;
+  for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
+    sum += rx_frame_asm.get_segment_logical_len(i);
   }
   return sum;
 }
@@ -383,13 +418,8 @@ void ProtocolV2::prepare_send_message(uint64_t features,
   ldout(cct, 20) << __func__ << " m=" << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
-  if (m->empty_payload()) {
-    ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
-                   << " " << *m << dendl;
-  } else {
-    ldout(cct, 20) << __func__ << " half-reencoding features " << features
-                   << " " << m << " " << *m << dendl;
-  }
+  ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
+                << features << " " << m  << " " << *m << dendl;
 
   // encode and copy out of *m
   m->encode(features, 0);
@@ -422,6 +452,7 @@ void ProtocolV2::send_message(Message *m) {
   } else {
     ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                   << " type=" << m->get_type() << " " << *m << dendl;
+    m->queue_start = ceph::mono_clock::now();
     m->trace.event("async enqueueing message");
     out_queue[m->get_priority()].emplace_back(
       out_queue_entry_t{is_prepared, m});
@@ -502,8 +533,8 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
   ceph_msg_header2 header2{header.seq,        header.tid,
                            header.type,       header.priority,
                            header.version,
-                           0,                 header.data_off,
-                           ack_seq,
+                           ceph_le32(0),      header.data_off,
+                           ceph_le64(ack_seq),
                            footer.flags,      header.compat_version,
                            header.reserved};
 
@@ -512,7 +543,10 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                             m->get_payload(),
                             m->get_middle(),
                             m->get_data());
-  connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
+  if (!append_frame(message)) {
+    m->put();
+    return -EILSEQ;
+  }
 
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
@@ -522,35 +556,43 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) {
                  << " src=" << entity_name_t(messenger->get_myname())
                  << " off=" << header2.data_off
                  << dendl;
-  ssize_t total_send_size = connection->outcoming_bl.length();
+  ssize_t total_send_size = connection->outgoing_bl.length();
   ssize_t rc = connection->_try_send(more);
   if (rc < 0) {
     ldout(cct, 1) << __func__ << " error sending " << m << ", "
                   << cpp_strerror(rc) << dendl;
   } else {
     connection->logger->inc(
-        l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+        l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
     ldout(cct, 10) << __func__ << " sending " << m
                    << (rc ? " continuely." : " done.") << dendl;
   }
+
+#if defined(WITH_EVENTTRACE)
   if (m->get_type() == CEPH_MSG_OSD_OP)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+#endif
   m->put();
 
   return rc;
 }
 
-void ProtocolV2::append_keepalive() {
-  ldout(cct, 10) << __func__ << dendl;
-  auto keepalive_frame = KeepAliveFrame::Encode();
-  connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
-}
+template <class F>
+bool ProtocolV2::append_frame(F& frame) {  // returns false (and queues nothing) if get_buffer() throws TxHandlerError
+  ceph::bufferlist bl;
+  try {
+    bl = frame.get_buffer(tx_frame_asm);
+  } catch (const ceph::crypto::onwire::TxHandlerError &e) {  // only what() is read: catch by const reference
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return false;
+  }
 
-void ProtocolV2::append_keepalive_ack(utime_t &timestamp) {
-  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
-  connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
+  connection->outgoing_bl.claim_append(bl);
+  return true;
 }
 
 void ProtocolV2::handle_message_ack(uint64_t seq) {
@@ -564,6 +606,7 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
   static const int max_pending = 128;
   int i = 0;
   Message *pending[max_pending];
+  auto now = ceph::mono_clock::now();
   connection->write_lock.lock();
   while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
     Message *m = sent.front();
@@ -574,11 +617,20 @@ void ProtocolV2::handle_message_ack(uint64_t seq) {
                    << dendl;
   }
   connection->write_lock.unlock();
+  connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
   for (int k = 0; k < i; k++) {
     pending[k]->put();
   }
 }
 
+void ProtocolV2::reset_compression() {
+  ldout(cct, 5) << __func__ << dendl;
+
+  comp_meta = CompConnectionMeta{};
+  session_compression_handlers.rx.reset(nullptr);
+  session_compression_handlers.tx.reset(nullptr);
+}
+
 void ProtocolV2::write_event() {
   ldout(cct, 10) << __func__ << dendl;
   ssize_t r = 0;
@@ -586,7 +638,15 @@ void ProtocolV2::write_event() {
   connection->write_lock.lock();
   if (can_write) {
     if (keepalive) {
-      append_keepalive();
+      ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
+      auto keepalive_frame = KeepAliveFrame::Encode();
+      if (!append_frame(keepalive_frame)) {
+        connection->write_lock.unlock();
+        connection->lock.lock();
+        fault();
+        connection->lock.unlock();
+        return;
+      }
       keepalive = false;
     }
 
@@ -611,6 +671,12 @@ void ProtocolV2::write_event() {
         prepare_send_message(connection->get_features(), out_entry.m);
       }
 
+      if (out_entry.m->queue_start != ceph::mono_time()) {
+        connection->logger->tinc(l_msgr_send_messages_queue_lat,
+                                ceph::mono_clock::now() -
+                                out_entry.m->queue_start);
+      }
+
       r = write_message(out_entry.m, more);
 
       connection->write_lock.lock();
@@ -619,8 +685,11 @@ void ProtocolV2::write_event() {
       } else if (r < 0) {
         ldout(cct, 1) << __func__ << " send msg failed" << dendl;
         break;
-      } else if (r > 0)
+      } else if (r > 0) {
+       // Outbound message in-progress, thread will be re-awoken
+       // when the outbound socket is writeable again
         break;
+      }
     } while (can_write);
     write_in_progress = false;
 
@@ -628,15 +697,16 @@ void ProtocolV2::write_event() {
     if (r == 0) {
       uint64_t left = ack_left;
       if (left) {
-        ceph_le64 s;
-        s = in_seq;
-        auto ack = AckFrame::Encode(in_seq);
-        connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
         ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
                        << " messages" << dendl;
-        ack_left -= left;
-        left = ack_left;
-        r = connection->_try_send(left);
+        auto ack_frame = AckFrame::Encode(in_seq);
+        if (append_frame(ack_frame)) {
+          ack_left -= left;
+          left = ack_left;
+          r = connection->_try_send(left);
+        } else {
+          r = -EILSEQ;
+        }
       } else if (is_queued()) {
         r = connection->_try_send();
       }
@@ -683,26 +753,6 @@ bool ProtocolV2::is_queued() {
   return !out_queue.empty() || connection->is_queued();
 }
 
-uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
-  if (session_stream_handlers.rx) {
-    return segment_onwire_size(logical_size);
-  } else {
-    return logical_size;
-  }
-}
-
-uint32_t ProtocolV2::get_epilogue_size() const {
-  // In secure mode size of epilogue is flexible and depends on particular
-  // cipher implementation. See the comment for epilogue_secure_block_t or
-  // epilogue_plain_block_t.
-  if (session_stream_handlers.rx) {
-    return FRAME_SECURE_EPILOGUE_SIZE + \
-        session_stream_handlers.rx->get_extra_size_at_final();
-  } else {
-    return FRAME_PLAIN_EPILOGUE_SIZE;
-  }
-}
-
 CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
                        rx_buffer_t &&buffer) {
   const auto len = buffer->length();
@@ -713,17 +763,17 @@ CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
       if (unlikely(pre_auth.enabled) && r >= 0) {
         pre_auth.rxbuf.append(*next.node);
        ceph_assert(!cct->_conf->ms_die_on_bug ||
-                   pre_auth.rxbuf.length() < 1000000);
+                   pre_auth.rxbuf.length() < 20000000);
       }
       next.r = r;
       run_continuation(next);
     });
   if (r <= 0) {
     // error or done synchronously
-    if (unlikely(pre_auth.enabled) && r >= 0) {
+    if (unlikely(pre_auth.enabled) && r == 0) {
       pre_auth.rxbuf.append(*next.node);
       ceph_assert(!cct->_conf->ms_die_on_bug ||
-                 pre_auth.rxbuf.length() < 1000000);
+                 pre_auth.rxbuf.length() < 20000000);
     }
     next.r = r;
     return &next;
@@ -736,17 +786,26 @@ template <class F>
 CtPtr ProtocolV2::write(const std::string &desc,
                         CONTINUATION_TYPE<ProtocolV2> &next,
                         F &frame) {
-  ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
+  ceph::bufferlist bl;
+  try {
+    bl = frame.get_buffer(tx_frame_asm);
+  } catch (ceph::crypto::onwire::TxHandlerError &e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  }
+
+  ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+                 << " bytes " << tx_frame_asm << dendl;
   return write(desc, next, bl);
 }
 
 CtPtr ProtocolV2::write(const std::string &desc,
                         CONTINUATION_TYPE<ProtocolV2> &next,
-                        bufferlist &buffer) {
+                        ceph::bufferlist &buffer) {
   if (unlikely(pre_auth.enabled)) {
     pre_auth.txbuf.append(buffer);
     ceph_assert(!cct->_conf->ms_die_on_bug ||
-               pre_auth.txbuf.length() < 1000000);
+               pre_auth.txbuf.length() < 20000000);
   }
 
   ssize_t r =
@@ -776,11 +835,12 @@ CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
   ldout(cct, 20) << __func__ << dendl;
   bannerExchangeCallback = &callback;
 
-  bufferlist banner_payload;
+  ceph::bufferlist banner_payload;
+  using ceph::encode;
   encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
   encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
 
-  bufferlist bl;
+  ceph::bufferlist bl;
   bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
   encode((uint16_t)banner_payload.length(), bl, 0);
   bl.claim_append(banner_payload);
@@ -791,7 +851,7 @@ CtPtr ProtocolV2::_banner_exchange(CtRef callback) {
 }
 
 CtPtr ProtocolV2::_wait_for_peer_banner() {
-  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
   return READ(banner_len, _handle_peer_banner);
 }
 
@@ -817,14 +877,15 @@ CtPtr ProtocolV2::_handle_peer_banner(rx_buffer_t &&buffer, int r) {
   }
 
   uint16_t payload_len;
-  bufferlist bl;
+  ceph::bufferlist bl;
   buffer->set_offset(banner_prefix_len);
-  buffer->set_length(sizeof(__le16));
+  buffer->set_length(sizeof(ceph_le16));
   bl.push_back(std::move(buffer));
   auto ti = bl.cbegin();
+  using ceph::decode;
   try {
     decode(payload_len, ti);
-  } catch (const buffer::error &e) {
+  } catch (const ceph::buffer::error &e) {
     lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
     return _fault();
   }
@@ -846,13 +907,14 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
   uint64_t peer_supported_features;
   uint64_t peer_required_features;
 
-  bufferlist bl;
+  ceph::bufferlist bl;
+  using ceph::decode;
   bl.push_back(std::move(buffer));
   auto ti = bl.cbegin();
   try {
     decode(peer_supported_features, ti);
     decode(peer_required_features, ti);
-  } catch (const buffer::error &e) {
+  } catch (const ceph::buffer::error &e) {
     lderr(cct) << __func__ << " decode banner payload failed " << dendl;
     return _fault();
   }
@@ -884,13 +946,15 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(rx_buffer_t &&buffer, int r) {
     return nullptr;
   }
 
-  this->peer_required_features = peer_required_features;
-  if (this->peer_required_features == 0) {
+  this->peer_supported_features = peer_supported_features;
+  if (peer_required_features == 0) {
     this->connection_features = msgr2_required;
   }
 
-  // at this point we can change how the client protocol behaves based on
-  // this->peer_required_features
+  // if the peer supports msgr2.1, switch to it
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  tx_frame_asm.set_is_rev1(is_rev1);
+  rx_frame_asm.set_is_rev1(is_rev1);
 
   if (state == BANNER_CONNECTING) {
     state = HELLO_CONNECTING;
@@ -1006,92 +1070,55 @@ CtPtr ProtocolV2::read_frame() {
   }
 
   ldout(cct, 20) << __func__ << dendl;
-  return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
+  rx_preamble.clear();
+  rx_epilogue.clear();
+  rx_segments_data.clear();
+
+  return READ(rx_frame_asm.get_preamble_onwire_len(),
+              handle_read_frame_preamble_main);
 }
 
 CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+    ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
                   << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
 
-  ceph::bufferlist preamble;
-  preamble.push_back(std::move(buffer));
+  rx_preamble.push_back(std::move(buffer));
 
   ldout(cct, 30) << __func__ << " preamble\n";
-  preamble.hexdump(*_dout);
+  rx_preamble.hexdump(*_dout);
   *_dout << dendl;
 
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    session_stream_handlers.rx->reset_rx_handler();
-    preamble = session_stream_handlers.rx->authenticated_decrypt_update(
-      std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
+  try {
+    next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+  } catch (FrameError& e) {
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+    return _fault();
+  }
 
-    ldout(cct, 10) << __func__ << " got encrypted preamble."
-                   << " after decrypt premable.length()=" << preamble.length()
-                   << dendl;
+  ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
+                 << dendl;
 
+  if (session_stream_handlers.rx) {
     ldout(cct, 30) << __func__ << " preamble after decrypt\n";
-    preamble.hexdump(*_dout);
+    rx_preamble.hexdump(*_dout);
     *_dout << dendl;
   }
 
-  {
-    // I expect ceph_le32 will make the endian conversion for me. Passing
-    // everything through ::Decode is unnecessary.
-    const auto& main_preamble = \
-      reinterpret_cast<preamble_block_t&>(*preamble.c_str());
-
-    // verify preamble's CRC before any further processing
-    const auto rx_crc = ceph_crc32c(0,
-      reinterpret_cast<const unsigned char*>(&main_preamble),
-      sizeof(main_preamble) - sizeof(main_preamble.crc));
-    if (rx_crc != main_preamble.crc) {
-      ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
-                    << " rx_crc=" << rx_crc
-                    << " tx_crc=" << main_preamble.crc << dendl;
-      return _fault();
-    }
-
-    // currently we do support between 1 and MAX_NUM_SEGMENTS segments
-    if (main_preamble.num_segments < 1 ||
-        main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 10) << __func__ << " unsupported num_segments="
-                    << " tx_crc=" << main_preamble.num_segments << dendl;
-      return _fault();
-    }
-
-    next_tag = static_cast<Tag>(main_preamble.tag);
-
-    rx_segments_desc.clear();
-    rx_segments_data.clear();
-
-    if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
-      ldout(cct, 30) << __func__
-                    << " num_segments=" << main_preamble.num_segments
-                    << " is too much" << dendl;
-      return _fault();
-    }
-    for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
-      ldout(cct, 10) << __func__ << " got new segment:"
-                    << " len=" << main_preamble.segments[idx].length
-                    << " align=" << main_preamble.segments[idx].alignment
-                    << dendl;
-      rx_segments_desc.emplace_back(main_preamble.segments[idx]);
-    }
-  }
-
   // does it need throttle?
   if (next_tag == Tag::MESSAGE) {
     if (state != READY) {
       lderr(cct) << __func__ << " not in ready state!" << dendl;
       return _fault();
     }
+    recv_stamp = ceph_clock_now();
     state = THROTTLE_MESSAGE;
     return CONTINUE(throttle_message);
   } else {
@@ -1123,6 +1150,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::KEEPALIVE2_ACK:
     case Tag::ACK:
     case Tag::WAIT:
+    case Tag::COMPRESSION_REQUEST:
+    case Tag::COMPRESSION_DONE:
       return handle_frame_payload();
     case Tag::MESSAGE:
       return handle_message();
@@ -1138,21 +1167,26 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
 }
 
 CtPtr ProtocolV2::read_frame_segment() {
-  ldout(cct, 20) << __func__ << dendl;
-  ceph_assert(!rx_segments_desc.empty());
+  size_t seg_idx = rx_segments_data.size();
+  ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
+  rx_segments_data.emplace_back();
+
+  uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+  if (onwire_len == 0) {
+    return _handle_read_frame_segment();
+  }
 
-  // description of current segment to read
-  const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
   rx_buffer_t rx_buffer;
+  uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
   try {
-    rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
-      get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
-  } catch (std::bad_alloc&) {
+    rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
+        onwire_len, align));
+  } catch (const ceph::buffer::bad_alloc&) {
     // Catching because of potential issues with satisfying alignment.
-    ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
-                  << " len=" << get_onwire_size(cur_rx_desc.length)
-                  << " align=" << cur_rx_desc.alignment
-                  << dendl;
+    ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
+                  << " len=" << onwire_len
+                  << " align=" << align
+                  << dendl;
     return _fault();
   }
 
@@ -1168,35 +1202,21 @@ CtPtr ProtocolV2::handle_read_frame_segment(rx_buffer_t &&rx_buffer, int r) {
     return _fault();
   }
 
-  rx_segments_data.emplace_back();
   rx_segments_data.back().push_back(std::move(rx_buffer));
+  return _handle_read_frame_segment();
+}
 
-  // decrypt incoming data
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ceph_assert(session_stream_handlers.rx);
-
-    auto& new_seg = rx_segments_data.back();
-    if (new_seg.length()) {
-      auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
-          std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
-      const auto idx = rx_segments_data.size() - 1;
-      new_seg.clear();
-      padded.splice(0, rx_segments_desc[idx].length, &new_seg);
-
-      ldout(cct, 20) << __func__
-                     << " unpadded new_seg.length()=" << new_seg.length()
-                     << dendl;
-    }
-  }
-
-  if (rx_segments_desc.size() == rx_segments_data.size()) {
+CtPtr ProtocolV2::_handle_read_frame_segment() {
+  if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
     // OK, all segments planned to read are read. Can go with epilogue.
-    return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
-  } else {
-    // TODO: for makeshift only. This will be more generic and throttled
-    return read_frame_segment();
+    uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
+    if (epilogue_onwire_len == 0) {
+      return _handle_read_frame_epilogue_main();
+    }
+    return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
   }
+  // TODO: for makeshift only. This will be more generic and throttled
+  return read_frame_segment();
 }
 
 CtPtr ProtocolV2::handle_frame_payload() {
@@ -1246,6 +1266,10 @@ CtPtr ProtocolV2::handle_frame_payload() {
       return handle_message_ack(payload);
     case Tag::WAIT:
       return handle_wait(payload);
+    case Tag::COMPRESSION_REQUEST:
+      return handle_compression_request(payload);
+    case Tag::COMPRESSION_DONE:
+      return handle_compression_done(payload);
     default:
       ceph_abort();
   }
@@ -1291,80 +1315,44 @@ CtPtr ProtocolV2::handle_read_frame_epilogue_main(rx_buffer_t &&buffer, int r)
   ldout(cct, 20) << __func__ << " r=" << r << dendl;
 
   if (r < 0) {
-    ldout(cct, 1) << __func__ << " read data error " << dendl;
+    ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
+                  << " (" << cpp_strerror(r) << ")" << dendl;
     return _fault();
   }
 
-  __u8 late_flags;
+  rx_epilogue.push_back(std::move(buffer));
+  return _handle_read_frame_epilogue_main();
+}
 
-  // FIXME: if (auth_meta->is_mode_secure()) {
-  if (session_stream_handlers.rx) {
-    ldout(cct, 1) << __func__ << " read frame epilogue bytes="
-                  << get_epilogue_size() << dendl;
-
-    // decrypt epilogue and authenticate entire frame.
-    ceph::bufferlist epilogue_bl;
-    {
-      epilogue_bl.push_back(std::move(buffer));
-      try {
-        epilogue_bl =
-            session_stream_handlers.rx->authenticated_decrypt_update_final(
-               std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
-      } catch (ceph::crypto::onwire::MsgAuthError &e) {
-        ldout(cct, 5) << __func__ << " message authentication failed: "
-                      << e.what() << dendl;
-        return _fault();
-      }
-    }
-    auto& epilogue =
-        reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
-    late_flags = epilogue.late_flags;
-  } else {
-    auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
-
-    for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
-      const __u32 expected_crc = epilogue.crc_values[idx];
-      const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
-      if (expected_crc != calculated_crc) {
-       ldout(cct, 5) << __func__ << " message integrity check failed: "
-                     << " expected_crc=" << expected_crc
-                     << " calculated_crc=" << calculated_crc
-                     << dendl;
-       return _fault();
-      } else {
-       ldout(cct, 20) << __func__ << " message integrity check success: "
-                      << " expected_crc=" << expected_crc
-                      << " calculated_crc=" << calculated_crc
-                      << dendl;
-      }
-    }
-    late_flags = epilogue.late_flags;
+CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
+  bool ok = false;  // a false result is handled below as a frame aborted late by the sender
+  try {
+    ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+  } catch (const FrameError& e) {  // only what() is read: catch by const reference
+    ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+    return _fault();
+  } catch (const ceph::crypto::onwire::MsgAuthError&) {
+    ldout(cct, 1) << __func__ << " bad auth tag" << dendl;  // leading space keeps the text apart from __func__
+    return _fault();
   }
 
   // we do have a mechanism that allows transmitter to start sending message
   // and abort after putting entire data field on wire. This will be used by
   // the kernel client to avoid unnecessary buffering.
-  if (late_flags & FRAME_FLAGS_LATEABRT) {
+  if (!ok) {
     reset_throttle();
     state = READY;
     return CONTINUE(read_frame);
-  } else {
-    return handle_read_frame_dispatch();
   }
+  return handle_read_frame_dispatch();
 }
 
 CtPtr ProtocolV2::handle_message() {
   ldout(cct, 20) << __func__ << dendl;
   ceph_assert(state == THROTTLE_DONE);
 
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
-  ltt_recv_stamp = ceph_clock_now();
-#endif
-  recv_stamp = ceph_clock_now();
-
-  // we need to get the size before std::moving segments data
   const size_t cur_msg_size = get_current_msg_size();
-  auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+  auto msg_frame = MessageFrame::Decode(rx_segments_data);
 
   // XXX: paranoid copy just to avoid oops
   ceph_msg_header2 current_header = msg_frame.header();
@@ -1385,15 +1373,16 @@ CtPtr ProtocolV2::handle_message() {
                          current_header.type,
                          current_header.priority,
                          current_header.version,
-                         msg_frame.front_len(),
-                         msg_frame.middle_len(),
-                         msg_frame.data_len(),
+                         ceph_le32(msg_frame.front_len()),
+                         ceph_le32(msg_frame.middle_len()),
+                         ceph_le32(msg_frame.data_len()),
                          current_header.data_off,
                          peer_name,
                          current_header.compat_version,
                          current_header.reserved,
-                         0};
-  ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+                         ceph_le32(0)};
+  ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+                        ceph_le32(0), ceph_le64(0), current_header.flags};
 
   Message *message = decode_message(cct, 0, header, footer,
       msg_frame.front(),
@@ -1445,12 +1434,12 @@ CtPtr ProtocolV2::handle_message() {
     }
   }
 
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+#if defined(WITH_EVENTTRACE)
   if (message->get_type() == CEPH_MSG_OSD_OP ||
       message->get_type() == CEPH_MSG_OSD_OPREPLY) {
     utime_t ltt_processed_stamp = ceph_clock_now();
     double usecs_elapsed =
-        (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+      ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
     ostringstream buf;
     if (message->get_type() == CEPH_MSG_OSD_OP)
       OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
@@ -1476,15 +1465,22 @@ CtPtr ProtocolV2::handle_message() {
 
   state = READY;
 
+  ceph::mono_time fast_dispatch_time;
+
+  if (connection->is_blackhole()) {
+    ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
+    message->put();
+    goto out;
+  }
+
   connection->logger->inc(l_msgr_recv_messages);
-  connection->logger->inc(
-      l_msgr_recv_bytes,
-      cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+  connection->logger->inc(l_msgr_recv_bytes,
+                          rx_frame_asm.get_frame_onwire_len());
 
   messenger->ms_fast_preprocess(message);
-  auto fast_dispatch_time = ceph::mono_clock::now();
+  fast_dispatch_time = ceph::mono_clock::now();
   connection->logger->tinc(l_msgr_running_recv_time,
-                           fast_dispatch_time - connection->recv_start_time);
+                          fast_dispatch_time - connection->recv_start_time);
   if (connection->delay_state) {
     double delay_period = 0;
     if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
@@ -1502,6 +1498,12 @@ CtPtr ProtocolV2::handle_message() {
     connection->logger->tinc(l_msgr_running_fast_dispatch_time,
                              connection->recv_start_time - fast_dispatch_time);
     connection->lock.lock();
+    // we might have been reused by another connection
+    // let's check if that is the case
+    if (state != READY) {
+      // yes, that was the case, let's do nothing
+      return nullptr;
+    }
   } else {
     connection->dispatch_queue->enqueue(message, message->get_priority(),
                                         connection->conn_id);
@@ -1509,13 +1511,7 @@ CtPtr ProtocolV2::handle_message() {
 
   handle_message_ack(current_header.ack_seq);
 
-  // we might have been reused by another connection
-  // let's check if that is the case
-  if (state != READY) {
-    // yes, that was the case, let's do nothing
-    return nullptr;
-  }
-
+ out:
   if (need_dispatch_writer && connection->is_connected()) {
     connection->center->dispatch_event_external(connection->write_handler);
   }
@@ -1630,7 +1626,11 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload)
   ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
 
   connection->write_lock.lock();
-  append_keepalive_ack(keepalive_frame.timestamp());
+  auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
+  if (!append_frame(keepalive_ack_frame)) {
+    connection->write_lock.unlock();
+    return _fault();
+  }
   connection->write_lock.unlock();
 
   ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
@@ -1699,12 +1699,12 @@ CtPtr ProtocolV2::post_client_banner_exchange() {
 }
 
 CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
+  ceph_assert(messenger->auth_client);
   ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
                 << " auth_client " << messenger->auth_client << dendl;
-  ceph_assert(messenger->auth_client);
 
-  bufferlist bl;
-  vector<uint32_t> preferred_modes;
+  ceph::bufferlist bl;
+  std::vector<uint32_t> preferred_modes;
   auto am = auth_meta;
   connection->lock.unlock();
   int r = messenger->auth_client->get_auth_request(
@@ -1827,8 +1827,9 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
     return _fault();
   }
   auth_meta->con_mode = auth_done.con_mode();
-  session_stream_handlers = \
-    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+      cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
 
   state = AUTH_CONNECTING_SIGN;
 
@@ -1841,6 +1842,29 @@ CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
 }
 
 CtPtr ProtocolV2::finish_client_auth() {
+  if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+    return send_compression_request();
+  }
+
+  return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+  // The server had sent AuthDone and the client responded with a correct
+  // pre-auth signature. If the peer supports compression we move to
+  // COMPRESSION_ACCEPTING; otherwise there are no conditional msgr protocol
+  // features to process and we can start accepting new sessions/reconnects.
+  const bool peer_has_compression =
+      HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION);
+  state = peer_has_compression ? COMPRESSION_ACCEPTING : SESSION_ACCEPTING;
+
+  return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
   if (!server_cookie) {
     ceph_assert(connect_seq == 0);
     state = SESSION_CONNECTING;
@@ -2063,8 +2087,8 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
                 << " features_supported=" << std::hex
                 << server_ident.supported_features()
                 << " features_required=" << server_ident.required_features()
-                << " flags=" << server_ident.flags() << " cookie=" << std::dec
-                << server_ident.cookie() << dendl;
+                << " flags=" << server_ident.flags()
+                << " cookie=" << server_ident.cookie() << std::dec << dendl;
 
   // is this who we intended to talk to?
   // be a bit forgiving here, since we may be connecting based on addresses parsed out
@@ -2100,6 +2124,40 @@ CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload)
   return ready();
 }
 
+CtPtr ProtocolV2::send_compression_request() {
+  state = COMPRESSION_CONNECTING;
+
+  // Ask the messenger's compression registry for the mode and the preferred
+  // methods that apply to this type of peer.
+  auto &registry = messenger->comp_registry;
+  const entity_type_t peer_type = connection->get_peer_type();
+  const bool secure = auth_meta->is_mode_secure();
+  comp_meta.con_mode = static_cast<Compressor::CompressionMode>(
+      registry.get_mode(peer_type, secure));
+  const auto preferred_methods = registry.get_methods(peer_type);
+
+  // The request carries whether we want compression plus our method list;
+  // once it is written we go back to reading frames.
+  auto request_frame = CompressionRequestFrame::Encode(
+      comp_meta.is_compress(), preferred_methods);
+
+  INTERCEPT(19);
+  return WRITE(request_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+  // This frame is only acceptable while we are waiting for the reply to our
+  // compression request.
+  if (state != COMPRESSION_CONNECTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto done = CompressionDoneFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << done.is_compress()
+                << ", method=" << done.method() << ")" << dendl;
+
+  // Take the method from the reply; if the reply's is_compress differs from
+  // ours, turn compression off for this connection.
+  comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(done.method());
+  if (done.is_compress() != comp_meta.is_compress()) {
+    comp_meta.con_mode = Compressor::COMP_NONE;
+  }
+
+  const auto min_size =
+    messenger->comp_registry.get_min_compression_size(connection->get_peer_type());
+  session_compression_handlers =
+    ceph::compression::onwire::rxtx_t::create_handler_pair(cct, comp_meta, min_size);
+
+  return start_session_connect();
+}
+
 /* Server Protocol Methods */
 
 CtPtr ProtocolV2::start_server_banner_exchange() {
@@ -2161,12 +2219,12 @@ CtPtr ProtocolV2::_auth_bad_method(int r)
   return WRITE(bad_method, "bad auth method", read_frame);
 }
 
-CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more)
 {
   if (!messenger->auth_server) {
     return _fault();
   }
-  bufferlist reply;
+  ceph::bufferlist reply;
   auto am = auth_meta;
   connection->lock.unlock();
   int r = messenger->auth_server->handle_auth_request(
@@ -2207,8 +2265,9 @@ CtPtr ProtocolV2::finish_auth()
   ceph_assert(auth_meta);
   // TODO: having a possibility to check whether we're server or client could
   // allow reusing finish_auth().
-  session_stream_handlers = \
-    ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+  bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+  session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+      cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
 
   const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
     auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
@@ -2262,15 +2321,13 @@ CtPtr ProtocolV2::handle_auth_signature(ceph::bufferlist &payload)
   }
 
   if (state == AUTH_ACCEPTING_SIGN) {
-    // server had sent AuthDone and client responded with correct pre-auth
-    // signature. we can start accepting new sessions/reconnects.
-    state = SESSION_ACCEPTING;
-    return CONTINUE(read_frame);
+    // this happened on server side
+    return finish_server_auth();
   } else if (state == AUTH_CONNECTING_SIGN) {
     // this happened at client side
     return finish_client_auth();
   } else {
-    ceph_assert_always("state corruption" == nullptr);
+    ceph_abort("state corruption");
   }
 }
 
@@ -2335,34 +2392,41 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
 
   peer_global_seq = client_ident.global_seq();
 
-  // Looks good so far, let's check if there is already an existing connection
-  // to this peer.
-
-  connection->lock.unlock();
-  AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
-  if (existing &&
-      existing->protocol->proto_type != 2) {
-    ldout(cct,1) << __func__ << " existing " << existing << " proto "
-                << existing->protocol.get() << " version is "
-                << existing->protocol->proto_type << ", marking down" << dendl;
-    existing->mark_down();
-    existing = nullptr;
-  }
+  if (connection->policy.server &&
+      connection->policy.lossy &&
+      !connection->policy.register_lossy_clients) {
+    // incoming lossy client, no need to register this connection
+  } else {
+    // Looks good so far, let's check if there is already an existing connection
+    // to this peer.
+    connection->lock.unlock();
+    AsyncConnectionRef existing = messenger->lookup_conn(
+      *connection->peer_addrs);
+
+    if (existing &&
+       existing->protocol->proto_type != 2) {
+      ldout(cct,1) << __func__ << " existing " << existing << " proto "
+                  << existing->protocol.get() << " version is "
+                  << existing->protocol->proto_type << ", marking down"
+                  << dendl;
+      existing->mark_down();
+      existing = nullptr;
+    }
 
-  connection->inject_delay();
+    connection->inject_delay();
 
-  connection->lock.lock();
-  if (state != SESSION_ACCEPTING) {
-    ldout(cct, 1) << __func__
-                  << " state changed while accept, it must be mark_down"
-                  << dendl;
-    ceph_assert(state == CLOSED);
-    return _fault();
-  }
+    connection->lock.lock();
+    if (state != SESSION_ACCEPTING) {
+      ldout(cct, 1) << __func__
+                   << " state changed while accept, it must be mark_down"
+                   << dendl;
+      ceph_assert(state == CLOSED);
+      return _fault();
+    }
 
-  if (existing) {
-    return handle_existing_connection(existing);
+    if (existing) {
+      return handle_existing_connection(existing);
+    }
   }
 
   // if everything is OK reply with server identification
@@ -2529,7 +2593,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
   return reuse_connection(existing, exproto);
 }
 
-CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
   ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
 
   std::lock_guard<std::mutex> l(existing->lock);
@@ -2630,7 +2694,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
   }
 }
 
-CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
                                    ProtocolV2 *exproto) {
   ldout(cct, 20) << __func__ << " existing=" << existing
                  << " reconnect=" << reconnecting << dendl;
@@ -2650,6 +2714,10 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   exproto->pre_auth.enabled = false;
 
   if (!reconnecting) {
+    exproto->peer_supported_features = peer_supported_features;
+    exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+    exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
     exproto->client_cookie = client_cookie;
     exproto->peer_name = peer_name;
     exproto->connection_features = connection_features;
@@ -2657,16 +2725,21 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   }
   exproto->peer_global_seq = peer_global_seq;
 
+  ceph_assert(connection->center->in_thread());
   auto temp_cs = std::move(connection->cs);
   EventCenter *new_center = connection->center;
   Worker *new_worker = connection->worker;
+  // we can steal the session_stream_handlers under the assumption
+  // this happens in the event center's thread as there should be
+  // no user outside its boundaries (similarly to e.g. outgoing_bl).
+  auto temp_stream_handlers = std::move(session_stream_handlers);
+  auto temp_compression_handlers = std::move(session_compression_handlers);
+  exproto->auth_meta = auth_meta;
+  exproto->comp_meta = comp_meta;
 
   ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
                            << dendl;
 
-  std::swap(exproto->session_stream_handlers, session_stream_handlers);
-  exproto->auth_meta = auth_meta;
-
   // avoid _stop shutdown replacing socket
   // queue a reset on the new connection, which we're dumping for the old
   stop();
@@ -2687,14 +2760,27 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
   ceph_assert(connection->recv_start == connection->recv_end);
 
   auto deactivate_existing = std::bind(
-      [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+      [ existing,
+        new_worker,
+        new_center,
+        exproto,
+        temp_stream_handlers=std::move(temp_stream_handlers),
+        temp_compression_handlers=std::move(temp_compression_handlers)
+      ](ConnectedSocket &cs) mutable {
         // we need to delete time event in original thread
         {
           std::lock_guard<std::mutex> l(existing->lock);
           existing->write_lock.lock();
           exproto->requeue_sent();
-          existing->outcoming_bl.clear();
+          // XXX: do we really need the locking for `outgoing_bl`? There is
+          // a comment just above its definition saying "lockfree, only used
+          // in own thread". I'm following the lock-holding scheme just in case.
+          // From a performance point of view it should be fine – this happens
+          // far away from hot paths.
+          existing->outgoing_bl.clear();
           existing->open_write = false;
+          exproto->session_stream_handlers = std::move(temp_stream_handlers);
+          exproto->session_compression_handlers = std::move(temp_compression_handlers);
           existing->write_lock.unlock();
           if (exproto->state == NONE) {
             existing->shutdown_socket();
@@ -2788,8 +2874,8 @@ CtPtr ProtocolV2::send_server_ident() {
                 << connection->policy.features_supported
                 << " features_required="
                            << (connection->policy.features_required | msgr2_required)
-                << " flags=" << flags << " cookie=" << std::dec << server_cookie
-                << dendl;
+                << " flags=" << flags
+                << " cookie=" << server_cookie << std::dec << dendl;
 
   connection->lock.unlock();
   // Because "replacing" will prevent other connections preempt this addr,
@@ -2882,3 +2968,43 @@ CtPtr ProtocolV2::send_reconnect_ok() {
 
   return WRITE(reconnect_ok, "reconnect ok", server_ready);
 }
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+  // A compression request is only acceptable in COMPRESSION_ACCEPTING.
+  if (state != COMPRESSION_ACCEPTING) {
+    lderr(cct) << __func__ << " state changed!" << dendl;
+    return _fault();
+  }
+
+  auto request = CompressionRequestFrame::Decode(payload);
+  ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+                << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+  const int peer_type = connection->get_peer_type();
+  const Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+      peer_type, auth_meta->is_mode_secure());
+  const bool both_want_compression =
+      mode != Compressor::COMP_NONE && request.is_compress();
+
+  if (!both_want_compression) {
+    comp_meta.con_method = Compressor::COMP_ALG_NONE;
+  } else {
+    // Pick a method from the peer's preference list; the mode is only
+    // recorded when a usable method was found.
+    comp_meta.con_method = messenger->comp_registry.pick_method(
+        peer_type, request.preferred_methods());
+    if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+      comp_meta.con_mode = mode;
+    }
+  }
+
+  auto done_frame = CompressionDoneFrame::Encode(
+      comp_meta.is_compress(), comp_meta.get_method());
+
+  INTERCEPT(20);
+  return WRITE(done_frame, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+  // TODO: having a possibility to check whether we're server or client could
+  // allow reusing finish_compression().
+
+  // Create the rx/tx compression handlers from comp_meta and the registry's
+  // minimum compression size for this peer type, then start accepting
+  // sessions.
+  const auto peer_type = connection->get_peer_type();
+  const auto min_size =
+    messenger->comp_registry.get_min_compression_size(peer_type);
+  session_compression_handlers =
+    ceph::compression::onwire::rxtx_t::create_handler_pair(cct, comp_meta, min_size);
+
+  state = SESSION_ACCEPTING;
+  return CONTINUE(read_frame);
+}