]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/net/ProtocolV2.cc
import 15.2.5
[ceph.git] / ceph / src / crimson / net / ProtocolV2.cc
index 2535584bb6efbb4a5c0d255c2c5614617099fa61..d68dba174ce593964405cba3edecc9d275fba838 100644 (file)
@@ -178,7 +178,8 @@ ProtocolV2::ProtocolV2(Dispatcher& dispatcher,
                        SocketMessenger& messenger)
   : Protocol(proto_t::v2, dispatcher, conn),
     messenger{messenger},
-    protocol_timer{conn}
+    protocol_timer{conn},
+    tx_frame_asm(&session_stream_handlers, false)
 {}
 
 ProtocolV2::~ProtocolV2() {}
@@ -283,7 +284,7 @@ size_t ProtocolV2::get_current_msg_size() const
 
 seastar::future<Tag> ProtocolV2::read_main_preamble()
 {
-  return read_exactly(FRAME_PREAMBLE_SIZE)
+  return read_exactly(sizeof(preamble_block_t))
     .then([this] (auto bl) {
       if (session_stream_handlers.rx) {
         session_stream_handlers.rx->reset_rx_handler();
@@ -371,7 +372,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
   ).then([this] {
     // TODO: get_epilogue_size()
     ceph_assert(!session_stream_handlers.rx);
-    return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
+    return read_exactly(sizeof(epilogue_crc_rev0_block_t));
   }).then([this] (auto bl) {
     logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
 
@@ -380,7 +381,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
       // TODO
       ceph_assert(false);
     } else {
-      auto& epilogue = *reinterpret_cast<const epilogue_plain_block_t*>(bl.get());
+      auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
       for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
         const __u32 expected_crc = epilogue.crc_values[idx];
         const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
@@ -402,7 +403,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
     // we do have a mechanism that allows transmitter to start sending message
     // and abort after putting entire data field on wire. This will be used by
     // the kernel client to avoid unnecessary buffering.
-    if (late_flags & FRAME_FLAGS_LATEABRT) {
+    if (late_flags & FRAME_LATE_FLAG_ABORTED) {
       // TODO
       ceph_assert(false);
     }
@@ -412,7 +413,7 @@ seastar::future<> ProtocolV2::read_frame_payload()
 template <class F>
 seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
 {
-  auto bl = frame.get_buffer(session_stream_handlers);
+  auto bl = frame.get_buffer(tx_frame_asm);
   const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
   logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
                  conn, bl.length(), (int)main_preamble->tag,
@@ -1852,19 +1853,19 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
 
   if (unlikely(require_keepalive)) {
     auto keepalive_frame = KeepAliveFrame::Encode();
-    bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+    bl.append(keepalive_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
   }
 
   if (unlikely(_keepalive_ack.has_value())) {
     auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
-    bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+    bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
   }
 
   if (require_ack && !num_msgs) {
     auto ack_frame = AckFrame::Encode(conn.in_seq);
-    bl.append(ack_frame.get_buffer(session_stream_handlers));
+    bl.append(ack_frame.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
@@ -1893,7 +1894,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
         msg->get_payload(), msg->get_middle(), msg->get_data());
     logger().debug("{} --> #{} === {} ({})",
                   conn, msg->get_seq(), *msg, msg->get_type());
-    bl.append(message.get_buffer(session_stream_handlers));
+    bl.append(message.get_buffer(tx_frame_asm));
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
   });
 
@@ -1908,7 +1909,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
 
     // we need to get the size before std::moving segments data
     const size_t cur_msg_size = get_current_msg_size();
-    auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+    auto msg_frame = MessageFrame::Decode(rx_segments_data);
     // XXX: paranoid copy just to avoid oops
     ceph_msg_header2 current_header = msg_frame.header();