SocketMessenger& messenger)
: Protocol(proto_t::v2, dispatcher, conn),
messenger{messenger},
- protocol_timer{conn}
+ protocol_timer{conn},
+ tx_frame_asm(&session_stream_handlers, false)
{}
ProtocolV2::~ProtocolV2() {}
seastar::future<Tag> ProtocolV2::read_main_preamble()
{
- return read_exactly(FRAME_PREAMBLE_SIZE)
+ return read_exactly(sizeof(preamble_block_t))
.then([this] (auto bl) {
if (session_stream_handlers.rx) {
session_stream_handlers.rx->reset_rx_handler();
).then([this] {
// TODO: get_epilogue_size()
ceph_assert(!session_stream_handlers.rx);
- return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
+ return read_exactly(sizeof(epilogue_crc_rev0_block_t));
}).then([this] (auto bl) {
logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
// TODO
ceph_assert(false);
} else {
- auto& epilogue = *reinterpret_cast<const epilogue_plain_block_t*>(bl.get());
+ auto& epilogue = *reinterpret_cast<const epilogue_crc_rev0_block_t*>(bl.get());
for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
const __u32 expected_crc = epilogue.crc_values[idx];
const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (late_flags & FRAME_FLAGS_LATEABRT) {
+ if (late_flags & FRAME_LATE_FLAG_ABORTED) {
// TODO
ceph_assert(false);
}
template <class F>
seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
{
- auto bl = frame.get_buffer(session_stream_handlers);
+ auto bl = frame.get_buffer(tx_frame_asm);
const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
conn, bl.length(), (int)main_preamble->tag,
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(keepalive_frame.get_buffer(session_stream_handlers));
+ bl.append(keepalive_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2, bp_type_t::WRITE);
}
if (unlikely(_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*_keepalive_ack);
- bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ bl.append(keepalive_ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
if (require_ack && !num_msgs) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
- bl.append(ack_frame.get_buffer(session_stream_handlers));
+ bl.append(ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
msg->get_payload(), msg->get_middle(), msg->get_data());
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(message.get_buffer(session_stream_handlers));
+ bl.append(message.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::MESSAGE, bp_type_t::WRITE);
});
// we need to get the size before std::moving segments data
const size_t cur_msg_size = get_current_msg_size();
- auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+ auto msg_frame = MessageFrame::Decode(rx_segments_data);
// XXX: paranoid copy just to avoid oops
ceph_msg_header2 current_header = msg_frame.header();