preamble.segments[i].alignment = m_descs[i].align;
}
preamble.num_segments = m_descs.size();
+ preamble.flags = m_flags;
+
preamble.crc = ceph_crc32c(
0, reinterpret_cast<const unsigned char*>(&preamble),
sizeof(preamble) - sizeof(preamble.crc));
frame_bl.append(reinterpret_cast<const char*>(&preamble), sizeof(preamble));
for (size_t i = 0; i < m_descs.size(); i++) {
ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
- epilogue.crc_values[i] = segment_bls[i].crc32c(-1);
+ epilogue.crc_values[i] = m_with_data_crc ? segment_bls[i].crc32c(-1) : 0;
if (segment_bls[i].length() > 0) {
frame_bl.claim_append(segment_bls[i]);
}
ceph_assert(segment_bls[0].length() == m_descs[0].logical_len);
if (segment_bls[0].length() > 0) {
- uint32_t crc = segment_bls[0].crc32c(-1);
+ uint32_t crc = m_with_data_crc ? segment_bls[0].crc32c(-1) : 0;
frame_bl.claim_append(segment_bls[0]);
encode(crc, frame_bl);
}
for (size_t i = 1; i < m_descs.size(); i++) {
ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
- epilogue.crc_values[i - 1] = segment_bls[i].crc32c(-1);
+ epilogue.crc_values[i - 1] =
+ m_with_data_crc ? segment_bls[i].crc32c(-1) : 0;
if (segment_bls[i].length() > 0) {
frame_bl.claim_append(segment_bls[i]);
}
bufferlist FrameAssembler::assemble_frame(Tag tag, bufferlist segment_bls[],
const uint16_t segment_aligns[],
- size_t segment_count) {
+ size_t segment_count) {
+ m_flags = 0;
m_descs.resize(calc_num_segments(segment_bls, segment_count));
for (size_t i = 0; i < m_descs.size(); i++) {
m_descs[i].logical_len = segment_bls[i].length();
m_descs[i].align = segment_aligns[i];
}
+ if (m_compression->tx) {
+ asm_compress(segment_bls);
+ }
+
preamble_block_t preamble;
fill_preamble(tag, preamble);
m_descs[i].logical_len = preamble->segments[i].length;
m_descs[i].align = preamble->segments[i].alignment;
}
+
+ m_flags = preamble->flags;
+ // If frame has been compressed,
+ // we need to make sure the compression handler has been setup
+ ceph_assert_always(!is_compressed() || m_compression->rx);
+
return static_cast<Tag>(preamble->tag);
}
for (size_t i = 0; i < m_descs.size(); i++) {
ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
- check_segment_crc(segment_bls[i], epilogue->crc_values[i]);
+ if (m_with_data_crc) {
+ check_segment_crc(segment_bls[i], epilogue->crc_values[i]);
+ }
}
return !(epilogue->late_flags & FRAME_LATE_FLAG_ABORTED);
}
uint32_t expected_crc;
decode(expected_crc, it);
segment_bl.splice(m_descs[0].logical_len, FRAME_CRC_SIZE);
- check_segment_crc(segment_bl, expected_crc);
+ if (m_with_data_crc) {
+ check_segment_crc(segment_bl, expected_crc);
+ }
} else {
ceph_assert(segment_bl.length() == 0);
}
for (size_t i = 1; i < m_descs.size(); i++) {
ceph_assert(segment_bls[i].length() == m_descs[i].logical_len);
- check_segment_crc(segment_bls[i], epilogue->crc_values[i - 1]);
+ if (m_with_data_crc) {
+ check_segment_crc(segment_bls[i], epilogue->crc_values[i - 1]);
+ }
}
return check_epilogue_late_status(epilogue->late_status);
}
return check_epilogue_late_status(epilogue->late_status);
}
+bool FrameAssembler::disassemble_segments(bufferlist& preamble_bl,
+ bufferlist segments_bls[], bufferlist& epilogue_bl) const {
+ disassemble_first_segment(preamble_bl, segments_bls[0]);
+ if (disassemble_remaining_segments(segments_bls, epilogue_bl)) {
+ if (is_compressed()) {
+ disassemble_decompress(segments_bls);
+ }
+ return true;
+ }
+
+ return false;
+}
+
void FrameAssembler::disassemble_first_segment(bufferlist& preamble_bl,
bufferlist& segment_bl) const {
ceph_assert(!m_descs.empty());
// no epilogue if only one segment
ceph_assert(epilogue_bl.length() == 0);
return true;
- }
- if (m_crypto->rx) {
+ } else if (m_crypto->rx) {
return disasm_remaining_secure_rev1(segment_bls, epilogue_bl);
+ } else {
+ return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
}
- return disasm_remaining_crc_rev1(segment_bls, epilogue_bl);
- }
- if (m_crypto->rx) {
+ } else if (m_crypto->rx) {
return disasm_all_secure_rev0(segment_bls, epilogue_bl);
- }
+ }
+
return disasm_all_crc_rev0(segment_bls, epilogue_bl);
}
}
os << "rev1=" << frame_asm.m_is_rev1
<< " rx=" << frame_asm.m_crypto->rx.get()
- << " tx=" << frame_asm.m_crypto->tx.get();
+ << " tx=" << frame_asm.m_crypto->tx.get()
+ << " comp rx=" << frame_asm.m_compression->rx.get()
+ << " comp tx=" << frame_asm.m_compression->tx.get()
+ << " compressed=" << frame_asm.is_compressed();
return os;
}
+void FrameAssembler::asm_compress(bufferlist segment_bls[]) {
+ std::array<bufferlist, MAX_NUM_SEGMENTS> compressed;
+
+ m_compression->tx->reset_handler(m_descs.size(), get_frame_logical_len());
+
+ bool abort = false;
+ for (size_t i = 0; (i < m_descs.size()) && !abort; i++) {
+ auto out = m_compression->tx->compress(segment_bls[i]);
+ if (!out) {
+ abort = true;
+ } else {
+ compressed[i] = std::move(*out);
+ }
+ }
+
+ if (!abort) {
+ m_compression->tx->done();
+
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ segment_bls[i].swap(compressed[i]);
+ m_descs[i].logical_len = segment_bls[i].length();
+ }
+
+ m_flags |= FRAME_EARLY_DATA_COMPRESSED;
+ }
+}
+
+void FrameAssembler::disassemble_decompress(bufferlist segment_bls[]) const {
+ for (size_t i = 0; i < m_descs.size(); i++) {
+ auto out = m_compression->rx->decompress(segment_bls[i]);
+ if (!out) {
+ throw FrameError("Segment decompression failed");
+ } else {
+ segment_bls[i] = std::move(*out);
+ }
+ }
+}
+
} // namespace ceph::msgr::v2