]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/frames_v2.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / async / frames_v2.cc
index 7a0b5907b248944a59ffe60a99020de1baa5d874..e0c41fdb64c173b1de4a0832c0bf43211a50e87c 100644 (file)
@@ -72,6 +72,8 @@ void FrameAssembler::fill_preamble(Tag tag,
     preamble.segments[i].alignment = m_descs[i].align;
   }
   preamble.num_segments = m_descs.size();
+  preamble.flags = m_flags;  
+
   preamble.crc = ceph_crc32c(
       0, reinterpret_cast<const unsigned char*>(&preamble),
       sizeof(preamble) - sizeof(preamble.crc));
@@ -106,7 +108,7 @@ bufferlist FrameAssembler::asm_crc_rev0(const preamble_block_t& preamble,
   frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
   for (size_t i = 0; i < m_descs.size(); i++) {
     ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
-    epilogue.crc_values[i] = segment_bls[i].crc32c(-1);
+    epilogue.crc_values[i] = m_with_data_crc ? segment_bls[i].crc32c(-1) : 0;
     if (segment_bls[i].length() > 0) {
       frame_bl.claim_append(segment_bls[i]);
     }
@@ -159,7 +161,7 @@ bufferlist FrameAssembler::asm_crc_rev1(const preamble_block_t& preamble,
 
   ceph_assert(segment_bls[0].length() == m_descs[0].logical_len);
   if (segment_bls[0].length() > 0) {
-    uint32_t crc = segment_bls[0].crc32c(-1);
+    uint32_t crc = m_with_data_crc ? segment_bls[0].crc32c(-1) : 0;
     frame_bl.claim_append(segment_bls[0]);
     encode(crc, frame_bl);
   }
@@ -169,7 +171,8 @@ bufferlist FrameAssembler::asm_crc_rev1(const preamble_block_t& preamble,
 
   for (size_t i = 1; i < m_descs.size(); i++) {
     ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
-    epilogue.crc_values[i - 1] = segment_bls[i].crc32c(-1);
+    epilogue.crc_values[i - 1] =
+            m_with_data_crc ? segment_bls[i].crc32c(-1) : 0;
     if (segment_bls[i].length() > 0) {
       frame_bl.claim_append(segment_bls[i]);
     }
@@ -239,13 +242,18 @@ bufferlist FrameAssembler::asm_secure_rev1(const preamble_block_t& preamble,
 
 bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
                                           const uint16_t segment_aligns[],
-                                          size_t segment_count) {
+                                          size_t segment_count) {  
+  m_flags = 0;
   m_descs.resize(calc_num_segments(segment_bls, segment_count));
   for (size_t i = 0; i < m_descs.size(); i++) {
     m_descs[i].logical_len = segment_bls[i].length();
     m_descs[i].align = segment_aligns[i];
   }
 
+  if (m_compression->tx) {   
+    asm_compress(segment_bls);
+  }
+
   preamble_block_t preamble;
   fill_preamble(tag, preamble);
 
@@ -317,6 +325,12 @@ Tag FrameAssembler::disassemble_preamble(bufferlist& preamble_bl) {
     m_descs[i].logical_len = preamble->segments[i].length;
     m_descs[i].align = preamble->segments[i].alignment;
   }
+
+  m_flags = preamble->flags;
+  // If frame has been compressed, 
+  // we need to make sure the compression handler has been setup
+  ceph_assert_always(!is_compressed() || m_compression->rx);
+
   return static_cast<Tag>(preamble->tag);
 }
 
@@ -328,7 +342,9 @@ bool FrameAssembler::disasm_all_crc_rev0(bufferlist segment_bls[],
 
   for (size_t i = 0; i < m_descs.size(); i++) {
     ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
-    check_segment_crc(segment_bls[i], epilogue->crc_values[i]);
+    if (m_with_data_crc) {
+      check_segment_crc(segment_bls[i], epilogue->crc_values[i]);
+    }
   }
   return !(epilogue->late_flags & FRAME_LATE_FLAG_ABORTED);
 }
@@ -361,7 +377,9 @@ void FrameAssembler::disasm_first_crc_rev1(bufferlist& preamble_bl,
     uint32_t expected_crc;
     decode(expected_crc, it);
     segment_bl.splice(m_descs[0].logical_len, FRAME_CRC_SIZE);
-    check_segment_crc(segment_bl, expected_crc);
+    if (m_with_data_crc) {
+      check_segment_crc(segment_bl, expected_crc);
+    }
   } else {
     ceph_assert(segment_bl.length() == 0);
   }
@@ -375,7 +393,9 @@ bool FrameAssembler::disasm_remaining_crc_rev1(bufferlist segment_bls[],
 
   for (size_t i = 1; i < m_descs.size(); i++) {
     ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
-    check_segment_crc(segment_bls[i], epilogue->crc_values[i - 1]);
+    if (m_with_data_crc) {
+      check_segment_crc(segment_bls[i], epilogue->crc_values[i - 1]);
+    }
   }
   return check_epilogue_late_status(epilogue->late_status);
 }
@@ -423,6 +443,19 @@ bool FrameAssembler::disasm_remaining_secure_rev1(
   return check_epilogue_late_status(epilogue->late_status);
 }
 
+bool FrameAssembler::disassemble_segments(bufferlist& preamble_bl, 
+  bufferlist segments_bls[], bufferlist& epilogue_bl) const {
+  disassemble_first_segment(preamble_bl, segments_bls[0]);
+  if (disassemble_remaining_segments(segments_bls, epilogue_bl)) {
+    if (is_compressed()) {
+      disassemble_decompress(segments_bls);
+    }
+    return true;
+  }
+
+  return false;
+}
+
 void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl,
                                                bufferlist& segment_bl) const {
   ceph_assert(!m_descs.empty());
@@ -445,15 +478,15 @@ bool FrameAssembler::disassemble_remaining_segments(
       // no epilogue if only one segment
       ceph_assert(epilogue_bl.length() == 0);
       return true;
-    }
-    if (m_crypto->rx) {
+    } else if (m_crypto->rx) {
       return disasm_remaining_secure_rev1(segment_bls, epilogue_bl);
+    } else {
+      return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
     }
-    return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
-  }
-  if (m_crypto->rx) {
+  } else if (m_crypto->rx) {
     return disasm_all_secure_rev0(segment_bls, epilogue_bl);
-  }
+  } 
+  
   return disasm_all_crc_rev0(segment_bls, epilogue_bl);
 }
 
@@ -469,8 +502,49 @@ std::ostream& operator<<(std::ostream& os, const FrameAssembler& frame_asm) {
   }
   os << "rev1=" << frame_asm.m_is_rev1
      << " rx=" << frame_asm.m_crypto->rx.get()
-     << " tx=" << frame_asm.m_crypto->tx.get();
+     << " tx=" << frame_asm.m_crypto->tx.get()
+     << " comp rx=" << frame_asm.m_compression->rx.get()
+     << " comp tx=" << frame_asm.m_compression->tx.get()
+     << " compressed=" << frame_asm.is_compressed();
   return os;
 }
 
+void FrameAssembler::asm_compress(bufferlist segment_bls[]) {
+  std::array<bufferlist, MAX_NUM_SEGMENTS> compressed;
+
+  m_compression->tx->reset_handler(m_descs.size(), get_frame_logical_len());
+
+  bool abort = false;
+  for (size_t i = 0; (i < m_descs.size()) && !abort; i++) {
+      auto out = m_compression->tx->compress(segment_bls[i]);
+      if (!out) {
+        abort = true;
+      } else {
+        compressed[i] = std::move(*out);
+      }
+  }
+
+  if (!abort) {
+    m_compression->tx->done();
+
+    for (size_t i = 0; i < m_descs.size(); i++) {
+      segment_bls[i].swap(compressed[i]);
+      m_descs[i].logical_len = segment_bls[i].length();
+    }
+
+    m_flags |= FRAME_EARLY_DATA_COMPRESSED;
+  }
+}
+
+void FrameAssembler::disassemble_decompress(bufferlist segment_bls[]) const {
+  for (size_t i = 0; i < m_descs.size(); i++) {
+    auto out = m_compression->rx->decompress(segment_bls[i]);
+    if (!out) {
+      throw FrameError("Segment decompression failed");
+    } else {
+      segment_bls[i] = std::move(*out);
+    }
+  }
+}
+
 }  // namespace ceph::msgr::v2