<< " cs=" << connect_seq << " l=" << connection->policy.lossy
<< " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
REVISION_1)
- << " rx=" << session_stream_handlers.rx.get()
+ << " crypto rx=" << session_stream_handlers.rx.get()
<< " tx=" << session_stream_handlers.tx.get()
+ << " comp rx=" << session_compression_handlers.rx.get()
+ << " tx=" << session_compression_handlers.tx.get()
<< ").";
}
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
- tx_frame_asm(&session_stream_handlers, false),
- rx_frame_asm(&session_stream_handlers, false),
+ tx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+ &session_compression_handlers),
+ rx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+ &session_compression_handlers),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
}
// `write_event()` unlocks it just before calling `write_message()`.
// `submit_to()` here is NOT blocking.
connection->center->submit_to(connection->center->get_id(), [this] {
- ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+ ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
<< dendl;
// Possibly unnecessary. See the comment in `deactivate_existing`.
std::lock_guard<std::mutex> l(connection->lock);
std::lock_guard<std::mutex> wl(connection->write_lock);
reset_security();
+ reset_compression();
}, /* always_async = */true);
} else {
reset_security();
+ reset_compression();
}
// clean read and write callbacks
ceph_msg_header2 header2{header.seq, header.tid,
header.type, header.priority,
header.version,
- init_le32(0), header.data_off,
- init_le64(ack_seq),
+ ceph_le32(0), header.data_off,
+ ceph_le64(ack_seq),
footer.flags, header.compat_version,
header.reserved};
ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
<< " bytes " << tx_frame_asm << dendl;
- connection->outgoing_bl.append(bl);
+ connection->outgoing_bl.claim_append(bl);
return true;
}
}
}
+void ProtocolV2::reset_compression() {
+ ldout(cct, 5) << __func__ << dendl;
+
+ comp_meta = CompConnectionMeta{};
+ session_compression_handlers.rx.reset(nullptr);
+ session_compression_handlers.tx.reset(nullptr);
+}
+
void ProtocolV2::write_event() {
ldout(cct, 10) << __func__ << dendl;
ssize_t r = 0;
lderr(cct) << __func__ << " not in ready state!" << dendl;
return _fault();
}
+ recv_stamp = ceph_clock_now();
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
} else {
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
case Tag::WAIT:
+ case Tag::COMPRESSION_REQUEST:
+ case Tag::COMPRESSION_DONE:
return handle_frame_payload();
case Tag::MESSAGE:
return handle_message();
return handle_message_ack(payload);
case Tag::WAIT:
return handle_wait(payload);
+ case Tag::COMPRESSION_REQUEST:
+ return handle_compression_request(payload);
+ case Tag::COMPRESSION_DONE:
+ return handle_compression_done(payload);
default:
ceph_abort();
}
}
CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
- bool aborted;
+ bool ok = false;
try {
- rx_frame_asm.disassemble_first_segment(rx_preamble, rx_segments_data[0]);
- aborted = !rx_frame_asm.disassemble_remaining_segments(
- rx_segments_data.data(), rx_epilogue);
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
} catch (FrameError& e) {
ldout(cct, 1) << __func__ << " " << e.what() << dendl;
return _fault();
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (aborted) {
+ if (!ok) {
reset_throttle();
state = READY;
return CONTINUE(read_frame);
ldout(cct, 20) << __func__ << dendl;
ceph_assert(state == THROTTLE_DONE);
-#if defined(WITH_EVENTTRACE)
- utime_t ltt_recv_stamp = ceph_clock_now();
-#endif
- recv_stamp = ceph_clock_now();
-
const size_t cur_msg_size = get_current_msg_size();
auto msg_frame = MessageFrame::Decode(rx_segments_data);
current_header.type,
current_header.priority,
current_header.version,
- init_le32(msg_frame.front_len()),
- init_le32(msg_frame.middle_len()),
- init_le32(msg_frame.data_len()),
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
current_header.data_off,
peer_name,
current_header.compat_version,
current_header.reserved,
- init_le32(0)};
- ceph_msg_footer footer{init_le32(0), init_le32(0),
- init_le32(0), init_le64(0), current_header.flags};
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
Message *message = decode_message(cct, 0, header, footer,
msg_frame.front(),
message->get_type() == CEPH_MSG_OSD_OPREPLY) {
utime_t ltt_processed_stamp = ceph_clock_now();
double usecs_elapsed =
- (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+ ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
ostringstream buf;
if (message->get_type() == CEPH_MSG_OSD_OP)
OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
}
CtPtr ProtocolV2::finish_client_auth() {
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ return send_compression_request();
+ }
+
+ return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+ // server had sent AuthDone and client responded with correct pre-auth
+ // signature.
+ // We can start conditioanl msgr protocol
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ state = COMPRESSION_ACCEPTING;
+ } else {
+ // No msgr protocol features to process
+ // we can start accepting new sessions/reconnects.
+ state = SESSION_ACCEPTING;
+ }
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
if (!server_cookie) {
ceph_assert(connect_seq == 0);
state = SESSION_CONNECTING;
return ready();
}
+CtPtr ProtocolV2::send_compression_request() {
+ state = COMPRESSION_CONNECTING;
+
+ const entity_type_t peer_type = connection->get_peer_type();
+ comp_meta.con_mode =
+ static_cast<Compressor::CompressionMode>(
+ messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
+ const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
+ auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
+
+ INTERCEPT(19);
+ return WRITE(comp_req_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_CONNECTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto response = CompressionDoneFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
+ << ", method=" << response.method() << ")" << dendl;
+
+ comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
+ if (comp_meta.is_compress() != response.is_compress()) {
+ comp_meta.con_mode = Compressor::COMP_NONE;
+ }
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ return start_session_connect();
+}
+
/* Server Protocol Methods */
CtPtr ProtocolV2::start_server_banner_exchange() {
}
if (state == AUTH_ACCEPTING_SIGN) {
- // server had sent AuthDone and client responded with correct pre-auth
- // signature. we can start accepting new sessions/reconnects.
- state = SESSION_ACCEPTING;
- return CONTINUE(read_frame);
+ // this happened on server side
+ return finish_server_auth();
} else if (state == AUTH_CONNECTING_SIGN) {
// this happened at client side
return finish_client_auth();
// this happens in the event center's thread as there should be
// no user outside its boundaries (simlarly to e.g. outgoing_bl).
auto temp_stream_handlers = std::move(session_stream_handlers);
+ auto temp_compression_handlers = std::move(session_compression_handlers);
exproto->auth_meta = auth_meta;
+ exproto->comp_meta = comp_meta;
ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
<< dendl;
new_worker,
new_center,
exproto,
- temp_stream_handlers=std::move(temp_stream_handlers)
+ temp_stream_handlers=std::move(temp_stream_handlers),
+ temp_compression_handlers=std::move(temp_compression_handlers)
](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
existing->outgoing_bl.clear();
existing->open_write = false;
exproto->session_stream_handlers = std::move(temp_stream_handlers);
+ exproto->session_compression_handlers = std::move(temp_compression_handlers);
existing->write_lock.unlock();
if (exproto->state == NONE) {
existing->shutdown_socket();
return WRITE(reconnect_ok, "reconnect ok", server_ready);
}
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto request = CompressionRequestFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+ << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+ const int peer_type = connection->get_peer_type();
+ if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+ peer_type, auth_meta->is_mode_secure());
+ mode != Compressor::COMP_NONE && request.is_compress()) {
+ comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
+ if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+ comp_meta.con_mode = mode;
+ }
+ } else {
+ comp_meta.con_method = Compressor::COMP_ALG_NONE;
+ }
+
+ auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
+
+ INTERCEPT(20);
+ return WRITE(response, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+ // TODO: having a possibility to check whether we're server or client could
+ // allow reusing finish_compression().
+
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ state = SESSION_ACCEPTING;
+ return CONTINUE(read_frame);
+}