namespace {
-// TODO: apply the same logging policy to Protocol V1
+// TODO: CEPH_MSGR2_FEATURE_COMPRESSION
+const uint64_t CRIMSON_MSGR2_SUPPORTED_FEATURES =
+ (CEPH_MSGR2_FEATURE_REVISION_1 |
+ // CEPH_MSGR2_FEATURE_COMPRESSION |
+ UINT64_C(0));
+
// Log levels in V2 Protocol:
// * error level, something error that cause connection to terminate:
// - fatal errors;
logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
bool ok = false;
try {
- rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
bufferlist rx_epilogue;
rx_epilogue.append(buffer::create(std::move(bl)));
- ok = rx_frame_asm.disassemble_remaining_segments(rx_segments_data.data(), rx_epilogue);
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
logger().error("read_frame_payload: {} {}", conn, e.what());
abort_in_fault();
{
// 1. prepare and send banner
bufferlist banner_payload;
- encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CRIMSON_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
bufferlist bl;
logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
"required={}, banner=\"{}\"",
conn, bl.length(), len_payload,
- CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+ CRIMSON_MSGR2_SUPPORTED_FEATURES,
+ CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
return write_flush(std::move(bl)).then([this] {
peer_supported_features, peer_required_features);
// Check feature bit compatibility
- uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+ uint64_t supported_features = CRIMSON_MSGR2_SUPPORTED_FEATURES;
uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
if ((required_features & peer_supported_features) != required_features) {
logger().error("{} peer does not support all required features"
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
- // TODO: change peer_addr to entity_addrvec_t
- entity_addr_t paddr = client_ident.addrs().front();
- if ((paddr.is_msgr2() || paddr.is_any()) &&
- paddr.is_same_host(conn.target_addr)) {
- // good
- } else {
- logger().warn("{} peer's address {} is not v2 or not the same host with {}",
- conn, paddr, conn.target_addr);
- throw std::system_error(
- make_error_code(crimson::net::error::bad_peer_address));
- }
- conn.peer_addr = paddr;
+ conn.peer_addr = client_ident.addrs().front();
logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
conn.target_addr = conn.peer_addr;
if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
session_stream_handlers = { nullptr, nullptr };
+ session_comp_handlers = { nullptr, nullptr };
enable_recording();
return banner_exchange(false);
}).then([this] (auto&& ret) {
conn, ceph_entity_type_name(_peer_type),
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
- if (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
- messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce()) {
+ if (!messenger.get_myaddr().is_blank_ip() &&
+ (messenger.get_myaddr().get_port() != _my_addr_from_peer.get_port() ||
+ messenger.get_myaddr().get_nonce() != _my_addr_from_peer.get_nonce())) {
logger().warn("{} my_addr_from_peer {} port/nonce doesn't match myaddr {}",
conn, _my_addr_from_peer, messenger.get_myaddr());
throw std::system_error(
// READY state
ceph::bufferlist ProtocolV2::do_sweep_messages(
- const std::deque<MessageRef>& msgs,
+ const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> _keepalive_ack,
INTERCEPT_FRAME(ceph::msgr::v2::Tag::KEEPALIVE2_ACK, bp_type_t::WRITE);
}
- if (require_ack && !num_msgs) {
+ if (require_ack && num_msgs == 0u) {
auto ack_frame = AckFrame::Encode(conn.in_seq);
bl.append(ack_frame.get_buffer(tx_frame_asm));
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
- std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
// TODO: move to common code
// set priority
msg->get_header().src = messenger.get_myname();
ceph_msg_header2 header2{header.seq, header.tid,
header.type, header.priority,
header.version,
- init_le32(0), header.data_off,
- init_le64(conn.in_seq),
+ ceph_le32(0), header.data_off,
+ ceph_le64(conn.in_seq),
footer.flags, header.compat_version,
header.reserved};
current_header.type,
current_header.priority,
current_header.version,
- init_le32(msg_frame.front_len()),
- init_le32(msg_frame.middle_len()),
- init_le32(msg_frame.data_len()),
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
current_header.data_off,
conn.get_peer_name(),
current_header.compat_version,
current_header.reserved,
- init_le32(0)};
- ceph_msg_footer footer{init_le32(0), init_le32(0),
- init_le32(0), init_le64(0), current_header.flags};
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this());