<< " :" << connection->port
<< " s=" << get_state_name(state) << " pgs=" << peer_global_seq
<< " cs=" << connect_seq << " l=" << connection->policy.lossy
+ << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
+ REVISION_1)
<< " rx=" << session_stream_handlers.rx.get()
<< " tx=" << session_stream_handlers.tx.get()
<< ").";
ProtocolV2::ProtocolV2(AsyncConnection *connection)
: Protocol(2, connection),
state(NONE),
- peer_required_features(0),
+ peer_supported_features(0),
client_cookie(0),
server_cookie(0),
global_seq(0),
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
+ tx_frame_asm(&session_stream_handlers, false),
+ rx_frame_asm(&session_stream_handlers, false),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
}
}
size_t ProtocolV2::get_current_msg_size() const {
- ceph_assert(!rx_segments_desc.empty());
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
size_t sum = 0;
// we don't include SegmentIndex::Msg::HEADER.
- for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
- sum += rx_segments_desc[idx].length;
+ for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
+ sum += rx_frame_asm.get_segment_logical_len(i);
}
return sum;
}
bool ProtocolV2::append_frame(F& frame) {
ceph::bufferlist bl;
try {
- bl = frame.get_buffer(session_stream_handlers);
+ bl = frame.get_buffer(tx_frame_asm);
} catch (ceph::crypto::onwire::TxHandlerError &e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return false;
}
+
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
connection->outgoing_bl.append(bl);
return true;
}
return !out_queue.empty() || connection->is_queued();
}
-uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
- if (session_stream_handlers.rx) {
- return segment_onwire_size(logical_size);
- } else {
- return logical_size;
- }
-}
-
-uint32_t ProtocolV2::get_epilogue_size() const {
- // In secure mode size of epilogue is flexible and depends on particular
- // cipher implementation. See the comment for epilogue_secure_block_t or
- // epilogue_plain_block_t.
- if (session_stream_handlers.rx) {
- return FRAME_SECURE_EPILOGUE_SIZE + \
- session_stream_handlers.rx->get_extra_size_at_final();
- } else {
- return FRAME_PLAIN_EPILOGUE_SIZE;
- }
-}
-
CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
rx_buffer_t &&buffer) {
const auto len = buffer->length();
F &frame) {
ceph::bufferlist bl;
try {
- bl = frame.get_buffer(session_stream_handlers);
+ bl = frame.get_buffer(tx_frame_asm);
} catch (ceph::crypto::onwire::TxHandlerError &e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return _fault();
}
+
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
return write(desc, next, bl);
}
return nullptr;
}
- this->peer_required_features = peer_required_features;
- if (this->peer_required_features == 0) {
+ this->peer_supported_features = peer_supported_features;
+ if (peer_required_features == 0) {
this->connection_features = msgr2_required;
}
- // at this point we can change how the client protocol behaves based on
- // this->peer_required_features
+ // if the peer supports msgr2.1, switch to it
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ tx_frame_asm.set_is_rev1(is_rev1);
+ rx_frame_asm.set_is_rev1(is_rev1);
if (state == BANNER_CONNECTING) {
state = HELLO_CONNECTING;
}
ldout(cct, 20) << __func__ << dendl;
- return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
+ rx_preamble.clear();
+ rx_epilogue.clear();
+ rx_segments_data.clear();
+
+ return READ(rx_frame_asm.get_preamble_onwire_len(),
+ handle_read_frame_preamble_main);
}
CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+ ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
<< " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- ceph::bufferlist preamble;
- preamble.push_back(std::move(buffer));
+ rx_preamble.push_back(std::move(buffer));
ldout(cct, 30) << __func__ << " preamble\n";
- preamble.hexdump(*_dout);
+ rx_preamble.hexdump(*_dout);
*_dout << dendl;
- if (session_stream_handlers.rx) {
- ceph_assert(session_stream_handlers.rx);
-
- session_stream_handlers.rx->reset_rx_handler();
- preamble = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
+ try {
+ next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ } catch (FrameError& e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+ return _fault();
+ }
- ldout(cct, 10) << __func__ << " got encrypted preamble."
- << " after decrypt premable.length()=" << preamble.length()
- << dendl;
+ ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
+ << dendl;
+ if (session_stream_handlers.rx) {
ldout(cct, 30) << __func__ << " preamble after decrypt\n";
- preamble.hexdump(*_dout);
+ rx_preamble.hexdump(*_dout);
*_dout << dendl;
}
- {
- // I expect ceph_le32 will make the endian conversion for me. Passing
- // everything through ::Decode is unnecessary.
- const auto& main_preamble = \
- reinterpret_cast<preamble_block_t&>(*preamble.c_str());
-
- // verify preamble's CRC before any further processing
- const auto rx_crc = ceph_crc32c(0,
- reinterpret_cast<const unsigned char*>(&main_preamble),
- sizeof(main_preamble) - sizeof(main_preamble.crc));
- if (rx_crc != main_preamble.crc) {
- ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
- << " rx_crc=" << rx_crc
- << " tx_crc=" << main_preamble.crc << dendl;
- return _fault();
- }
-
- // currently we do support between 1 and MAX_NUM_SEGMENTS segments
- if (main_preamble.num_segments < 1 ||
- main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- ldout(cct, 10) << __func__ << " unsupported num_segments="
- << " tx_crc=" << main_preamble.num_segments << dendl;
- return _fault();
- }
-
- next_tag = static_cast<Tag>(main_preamble.tag);
-
- rx_segments_desc.clear();
- rx_segments_data.clear();
-
- if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- ldout(cct, 30) << __func__
- << " num_segments=" << main_preamble.num_segments
- << " is too much" << dendl;
- return _fault();
- }
- for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
- ldout(cct, 10) << __func__ << " got new segment:"
- << " len=" << main_preamble.segments[idx].length
- << " align=" << main_preamble.segments[idx].alignment
- << dendl;
- rx_segments_desc.emplace_back(main_preamble.segments[idx]);
- }
- }
-
// does it need throttle?
if (next_tag == Tag::MESSAGE) {
if (state != READY) {
}
CtPtr ProtocolV2::read_frame_segment() {
- ldout(cct, 20) << __func__ << dendl;
- ceph_assert(!rx_segments_desc.empty());
+ size_t seg_idx = rx_segments_data.size();
+ ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
+ rx_segments_data.emplace_back();
+
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+ if (onwire_len == 0) {
+ return _handle_read_frame_segment();
+ }
- // description of current segment to read
- const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
rx_buffer_t rx_buffer;
+ uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
try {
rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
- get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
+ onwire_len, align));
} catch (std::bad_alloc&) {
// Catching because of potential issues with satisfying alignment.
- ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
- << " len=" << get_onwire_size(cur_rx_desc.length)
- << " align=" << cur_rx_desc.alignment
- << dendl;
+ ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
+ << " len=" << onwire_len
+ << " align=" << align
+ << dendl;
return _fault();
}
return _fault();
}
- rx_segments_data.emplace_back();
rx_segments_data.back().push_back(std::move(rx_buffer));
+ return _handle_read_frame_segment();
+}
- // decrypt incoming data
- // FIXME: if (auth_meta->is_mode_secure()) {
- if (session_stream_handlers.rx) {
- ceph_assert(session_stream_handlers.rx);
-
- auto& new_seg = rx_segments_data.back();
- if (new_seg.length()) {
- auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
- const auto idx = rx_segments_data.size() - 1;
- new_seg.clear();
- padded.splice(0, rx_segments_desc[idx].length, &new_seg);
-
- ldout(cct, 20) << __func__
- << " unpadded new_seg.length()=" << new_seg.length()
- << dendl;
- }
- }
-
- if (rx_segments_desc.size() == rx_segments_data.size()) {
+CtPtr ProtocolV2::_handle_read_frame_segment() {
+ if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
// OK, all segments planned to read are read. Can go with epilogue.
- return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
- } else {
- // TODO: for makeshift only. This will be more generic and throttled
- return read_frame_segment();
+ uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
+ if (epilogue_onwire_len == 0) {
+ return _handle_read_frame_epilogue_main();
+ }
+ return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
}
+ // TODO: for makeshift only. This will be more generic and throttled
+ return read_frame_segment();
}
CtPtr ProtocolV2::handle_frame_payload() {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " read data error " << dendl;
+ ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- __u8 late_flags;
+ rx_epilogue.push_back(std::move(buffer));
+ return _handle_read_frame_epilogue_main();
+}
- // FIXME: if (auth_meta->is_mode_secure()) {
- if (session_stream_handlers.rx) {
- ldout(cct, 1) << __func__ << " read frame epilogue bytes="
- << get_epilogue_size() << dendl;
-
- // decrypt epilogue and authenticate entire frame.
- ceph::bufferlist epilogue_bl;
- {
- epilogue_bl.push_back(std::move(buffer));
- try {
- epilogue_bl =
- session_stream_handlers.rx->authenticated_decrypt_update_final(
- std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
- } catch (ceph::crypto::onwire::MsgAuthError &e) {
- ldout(cct, 5) << __func__ << " message authentication failed: "
- << e.what() << dendl;
- return _fault();
- }
- }
- auto& epilogue =
- reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
- late_flags = epilogue.late_flags;
- } else {
- auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
-
- for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
- const __u32 expected_crc = epilogue.crc_values[idx];
- const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
- if (expected_crc != calculated_crc) {
- ldout(cct, 5) << __func__ << " message integrity check failed: "
- << " expected_crc=" << expected_crc
- << " calculated_crc=" << calculated_crc
- << dendl;
- return _fault();
- } else {
- ldout(cct, 20) << __func__ << " message integrity check success: "
- << " expected_crc=" << expected_crc
- << " calculated_crc=" << calculated_crc
- << dendl;
- }
- }
- late_flags = epilogue.late_flags;
+CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
+ bool aborted;
+ try {
+ rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
+ aborted = !rx_frame_asm.disassemble_remaining_segments(
+ rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+ return _fault();
}
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (late_flags & FRAME_FLAGS_LATEABRT) {
+ if (aborted) {
reset_throttle();
state = READY;
return CONTINUE(read_frame);
- } else {
- return handle_read_frame_dispatch();
}
+ return handle_read_frame_dispatch();
}
CtPtr ProtocolV2::handle_message() {
#endif
recv_stamp = ceph_clock_now();
- // we need to get the size before std::moving segments data
const size_t cur_msg_size = get_current_msg_size();
- auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+ auto msg_frame = MessageFrame::Decode(rx_segments_data);
// XXX: paranoid copy just to avoid oops
ceph_msg_header2 current_header = msg_frame.header();
}
connection->logger->inc(l_msgr_recv_messages);
- connection->logger->inc(
- l_msgr_recv_bytes,
- cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+ connection->logger->inc(l_msgr_recv_bytes,
+ rx_frame_asm.get_frame_onwire_len());
messenger->ms_fast_preprocess(message);
fast_dispatch_time = ceph::mono_clock::now();
return _fault();
}
auth_meta->con_mode = auth_done.con_mode();
- session_stream_handlers = \
- ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
state = AUTH_CONNECTING_SIGN;
<< " features_supported=" << std::hex
<< server_ident.supported_features()
<< " features_required=" << server_ident.required_features()
- << " flags=" << server_ident.flags() << " cookie=" << std::dec
- << server_ident.cookie() << dendl;
+ << " flags=" << server_ident.flags()
+ << " cookie=" << server_ident.cookie() << std::dec << dendl;
// is this who we intended to talk to?
// be a bit forgiving here, since we may be connecting based on addresses parsed out
ceph_assert(auth_meta);
// TODO: having a possibility to check whether we're server or client could
// allow reusing finish_auth().
- session_stream_handlers = \
- ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
exproto->pre_auth.enabled = false;
if (!reconnecting) {
+ exproto->peer_supported_features = peer_supported_features;
+ exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+ exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
exproto->client_cookie = client_cookie;
exproto->peer_name = peer_name;
exproto->connection_features = connection_features;
<< connection->policy.features_supported
<< " features_required="
<< (connection->policy.features_required | msgr2_required)
- << " flags=" << flags << " cookie=" << std::dec << server_cookie
- << dendl;
+ << " flags=" << flags
+ << " cookie=" << server_cookie << std::dec << dendl;
connection->lock.unlock();
// Because "replacing" will prevent other connections preempt this addr,