#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix _conn_prefix(_dout)
-ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
+std::ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) {
return *_dout << "--2- " << messenger->get_myaddrs() << " >> "
<< *connection->peer_addrs << " conn(" << connection << " "
<< this
<< " :" << connection->port
<< " s=" << get_state_name(state) << " pgs=" << peer_global_seq
<< " cs=" << connect_seq << " l=" << connection->policy.lossy
- << " rx=" << session_stream_handlers.rx.get()
+ << " rev1=" << HAVE_MSGR2_FEATURE(peer_supported_features,
+ REVISION_1)
+ << " crypto rx=" << session_stream_handlers.rx.get()
<< " tx=" << session_stream_handlers.tx.get()
+ << " comp rx=" << session_compression_handlers.rx.get()
+ << " tx=" << session_compression_handlers.tx.get()
<< ").";
}
void ProtocolV2::run_continuation(CtRef continuation) {
try {
CONTINUATION_RUN(continuation)
- } catch (const buffer::error &e) {
- lderr(cct) << __func__ << " failed decoding of frame header: " << e
+ } catch (const ceph::buffer::error &e) {
+ lderr(cct) << __func__ << " failed decoding of frame header: " << e.what()
<< dendl;
_fault();
} catch (const ceph::crypto::onwire::MsgAuthError &e) {
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)
-#define READ(L, C) read(CONTINUATION(C), buffer::ptr_node::create(buffer::create(L)))
+#define READ(L, C) read(CONTINUATION(C), ceph::buffer::ptr_node::create(ceph::buffer::create(L)))
#define READ_RXBUF(B, C) read(CONTINUATION(C), B)
ProtocolV2::ProtocolV2(AsyncConnection *connection)
: Protocol(2, connection),
state(NONE),
- peer_required_features(0),
+ peer_supported_features(0),
client_cookie(0),
server_cookie(0),
global_seq(0),
replacing(false),
can_write(false),
bannerExchangeCallback(nullptr),
+ tx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+ &session_compression_handlers),
+ rx_frame_asm(&session_stream_handlers, false, cct->_conf->ms_crc_data,
+ &session_compression_handlers),
next_tag(static_cast<Tag>(0)),
keepalive(false) {
}
void ProtocolV2::discard_out_queue() {
ldout(cct, 10) << __func__ << " started" << dendl;
- for (list<Message *>::iterator p = sent.begin(); p != sent.end(); ++p) {
+ for (auto p = sent.begin(); p != sent.end(); ++p) {
ldout(cct, 20) << __func__ << " discard " << *p << dendl;
(*p)->put();
}
}
}
out_queue.clear();
+ write_in_progress = false;
}
void ProtocolV2::reset_session() {
connection->dispatch_queue->discard_queue(connection->conn_id);
discard_out_queue();
- connection->outcoming_bl.clear();
+ connection->outgoing_bl.clear();
connection->dispatch_queue->queue_remote_reset(connection);
void ProtocolV2::fault() { _fault(); }
void ProtocolV2::requeue_sent() {
+ write_in_progress = false;
if (sent.empty()) {
return;
}
ldout(cct, 5) << __func__ << " requeueing message m=" << m
<< " seq=" << m->get_seq() << " type=" << m->get_type() << " "
<< *m << dendl;
+ m->clear_payload();
rq.emplace_front(out_queue_entry_t{false, m});
}
}
return count;
}
+void ProtocolV2::reset_security() {
+ ldout(cct, 5) << __func__ << dendl;
+
+ auth_meta.reset(new AuthConnectionMeta);
+ session_stream_handlers.rx.reset(nullptr);
+ session_stream_handlers.tx.reset(nullptr);
+ pre_auth.rxbuf.clear();
+ pre_auth.txbuf.clear();
+}
+
+// it's expected the `write_lock` is held while calling this method.
void ProtocolV2::reset_recv_state() {
- if ((state >= AUTH_CONNECTING && state <= SESSION_RECONNECTING) ||
- state == READY) {
- auth_meta.reset(new AuthConnectionMeta);
- session_stream_handlers.tx.reset(nullptr);
- session_stream_handlers.rx.reset(nullptr);
- pre_auth.txbuf.clear();
- pre_auth.rxbuf.clear();
+ ldout(cct, 5) << __func__ << dendl;
+
+ if (!connection->center->in_thread()) {
+ // execute in the same thread that uses the rx/tx handlers. We need
+ // to do the warp because holding `write_lock` is not enough as
+ // `write_event()` unlocks it just before calling `write_message()`.
+ // `submit_to()` here is NOT blocking.
+ connection->center->submit_to(connection->center->get_id(), [this] {
+ ldout(cct, 5) << "reset_recv_state (warped) reseting crypto and compression handlers"
+ << dendl;
+ // Possibly unnecessary. See the comment in `deactivate_existing`.
+ std::lock_guard<std::mutex> l(connection->lock);
+ std::lock_guard<std::mutex> wl(connection->write_lock);
+ reset_security();
+ reset_compression();
+ }, /* always_async = */true);
+ } else {
+ reset_security();
+ reset_compression();
}
// clean read and write callbacks
}
size_t ProtocolV2::get_current_msg_size() const {
- ceph_assert(!rx_segments_desc.empty());
+ ceph_assert(rx_frame_asm.get_num_segments() > 0);
size_t sum = 0;
// we don't include SegmentIndex::Msg::HEADER.
- for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
- sum += rx_segments_desc[idx].length;
+ for (size_t i = 1; i < rx_frame_asm.get_num_segments(); i++) {
+ sum += rx_frame_asm.get_segment_logical_len(i);
}
return sum;
}
ldout(cct, 20) << __func__ << " m=" << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
- if (m->empty_payload()) {
- ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
- << " " << *m << dendl;
- } else {
- ldout(cct, 20) << __func__ << " half-reencoding features " << features
- << " " << m << " " << *m << dendl;
- }
+ ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
+ << features << " " << m << " " << *m << dendl;
// encode and copy out of *m
m->encode(features, 0);
} else {
ldout(cct, 5) << __func__ << " enqueueing message m=" << m
<< " type=" << m->get_type() << " " << *m << dendl;
+ m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
out_queue[m->get_priority()].emplace_back(
out_queue_entry_t{is_prepared, m});
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
- if ((!replacing && can_write) || state == STANDBY) {
+ if (((!replacing && can_write) || state == STANDBY) && !write_in_progress) {
+ write_in_progress = true;
connection->center->dispatch_event_external(connection->write_handler);
}
}
ceph_msg_header2 header2{header.seq, header.tid,
header.type, header.priority,
header.version,
- 0, header.data_off,
- ack_seq,
+ ceph_le32(0), header.data_off,
+ ceph_le64(ack_seq),
footer.flags, header.compat_version,
header.reserved};
m->get_payload(),
m->get_middle(),
m->get_data());
- connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
+ if (!append_frame(message)) {
+ m->put();
+ return -EILSEQ;
+ }
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
<< " src=" << entity_name_t(messenger->get_myname())
<< " off=" << header2.data_off
<< dendl;
- ssize_t total_send_size = connection->outcoming_bl.length();
+ ssize_t total_send_size = connection->outgoing_bl.length();
ssize_t rc = connection->_try_send(more);
if (rc < 0) {
ldout(cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(rc) << dendl;
} else {
connection->logger->inc(
- l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+ l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
ldout(cct, 10) << __func__ << " sending " << m
<< (rc ? " continuely." : " done.") << dendl;
}
+
+#if defined(WITH_EVENTTRACE)
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+#endif
m->put();
return rc;
}
-void ProtocolV2::append_keepalive() {
- ldout(cct, 10) << __func__ << dendl;
- auto keepalive_frame = KeepAliveFrame::Encode();
- connection->outcoming_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
-}
+template <class F>
+bool ProtocolV2::append_frame(F& frame) {
+ ceph::bufferlist bl;
+ try {
+ bl = frame.get_buffer(tx_frame_asm);
+ } catch (ceph::crypto::onwire::TxHandlerError &e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return false;
+ }
-void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
- connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
+ connection->outgoing_bl.claim_append(bl);
+ return true;
}
void ProtocolV2::handle_message_ack(uint64_t seq) {
static const int max_pending = 128;
int i = 0;
Message *pending[max_pending];
+ auto now = ceph::mono_clock::now();
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
Message *m = sent.front();
<< dendl;
}
connection->write_lock.unlock();
+ connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
for (int k = 0; k < i; k++) {
pending[k]->put();
}
}
+void ProtocolV2::reset_compression() {
+ ldout(cct, 5) << __func__ << dendl;
+
+ comp_meta = CompConnectionMeta{};
+ session_compression_handlers.rx.reset(nullptr);
+ session_compression_handlers.tx.reset(nullptr);
+}
+
void ProtocolV2::write_event() {
ldout(cct, 10) << __func__ << dendl;
ssize_t r = 0;
connection->write_lock.lock();
if (can_write) {
if (keepalive) {
- append_keepalive();
+ ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ if (!append_frame(keepalive_frame)) {
+ connection->write_lock.unlock();
+ connection->lock.lock();
+ fault();
+ connection->lock.unlock();
+ return;
+ }
keepalive = false;
}
prepare_send_message(connection->get_features(), out_entry.m);
}
+ if (out_entry.m->queue_start != ceph::mono_time()) {
+ connection->logger->tinc(l_msgr_send_messages_queue_lat,
+ ceph::mono_clock::now() -
+ out_entry.m->queue_start);
+ }
+
r = write_message(out_entry.m, more);
connection->write_lock.lock();
} else if (r < 0) {
ldout(cct, 1) << __func__ << " send msg failed" << dendl;
break;
- } else if (r > 0)
+ } else if (r > 0) {
+ // Outbound message in-progress, thread will be re-awoken
+ // when the outbound socket is writeable again
break;
+ }
} while (can_write);
+ write_in_progress = false;
// if r > 0 mean data still lefted, so no need _try_send.
if (r == 0) {
uint64_t left = ack_left;
if (left) {
- ceph_le64 s;
- s = in_seq;
- auto ack = AckFrame::Encode(in_seq);
- connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
- ack_left -= left;
- left = ack_left;
- r = connection->_try_send(left);
+ auto ack_frame = AckFrame::Encode(in_seq);
+ if (append_frame(ack_frame)) {
+ ack_left -= left;
+ left = ack_left;
+ r = connection->_try_send(left);
+ } else {
+ r = -EILSEQ;
+ }
} else if (is_queued()) {
r = connection->_try_send();
}
return;
}
} else {
+ write_in_progress = false;
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
return !out_queue.empty() || connection->is_queued();
}
-uint32_t ProtocolV2::get_onwire_size(const uint32_t logical_size) const {
- if (session_stream_handlers.rx) {
- return segment_onwire_size(logical_size);
- } else {
- return logical_size;
- }
-}
-
-uint32_t ProtocolV2::get_epilogue_size() const {
- // In secure mode size of epilogue is flexible and depends on particular
- // cipher implementation. See the comment for epilogue_secure_block_t or
- // epilogue_plain_block_t.
- if (session_stream_handlers.rx) {
- return FRAME_SECURE_EPILOGUE_SIZE + \
- session_stream_handlers.rx->get_extra_size_at_final();
- } else {
- return FRAME_PLAIN_EPILOGUE_SIZE;
- }
-}
-
CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
rx_buffer_t &&buffer) {
const auto len = buffer->length();
if (unlikely(pre_auth.enabled) && r >= 0) {
pre_auth.rxbuf.append(*next.node);
ceph_assert(!cct->_conf->ms_die_on_bug ||
- pre_auth.rxbuf.length() < 1000000);
+ pre_auth.rxbuf.length() < 20000000);
}
next.r = r;
run_continuation(next);
});
if (r <= 0) {
// error or done synchronously
- if (unlikely(pre_auth.enabled) && r >= 0) {
+ if (unlikely(pre_auth.enabled) && r == 0) {
pre_auth.rxbuf.append(*next.node);
ceph_assert(!cct->_conf->ms_die_on_bug ||
- pre_auth.rxbuf.length() < 1000000);
+ pre_auth.rxbuf.length() < 20000000);
}
next.r = r;
return &next;
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_TYPE<ProtocolV2> &next,
F &frame) {
- ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
+ ceph::bufferlist bl;
+ try {
+ bl = frame.get_buffer(tx_frame_asm);
+ } catch (ceph::crypto::onwire::TxHandlerError &e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ }
+
+ ldout(cct, 25) << __func__ << " assembled frame " << bl.length()
+ << " bytes " << tx_frame_asm << dendl;
return write(desc, next, bl);
}
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_TYPE<ProtocolV2> &next,
- bufferlist &buffer) {
+ ceph::bufferlist &buffer) {
if (unlikely(pre_auth.enabled)) {
pre_auth.txbuf.append(buffer);
ceph_assert(!cct->_conf->ms_die_on_bug ||
- pre_auth.txbuf.length() < 1000000);
+ pre_auth.txbuf.length() < 20000000);
}
ssize_t r =
ldout(cct, 20) << __func__ << dendl;
bannerExchangeCallback = &callback;
- bufferlist banner_payload;
+ ceph::bufferlist banner_payload;
+ using ceph::encode;
encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
- bufferlist bl;
+ ceph::bufferlist bl;
bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
encode((uint16_t)banner_payload.length(), bl, 0);
bl.claim_append(banner_payload);
}
CtPtr ProtocolV2::_wait_for_peer_banner() {
- unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
return READ(banner_len, _handle_peer_banner);
}
}
uint16_t payload_len;
- bufferlist bl;
+ ceph::bufferlist bl;
buffer->set_offset(banner_prefix_len);
- buffer->set_length(sizeof(__le16));
+ buffer->set_length(sizeof(ceph_le16));
bl.push_back(std::move(buffer));
auto ti = bl.cbegin();
+ using ceph::decode;
try {
decode(payload_len, ti);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
return _fault();
}
uint64_t peer_supported_features;
uint64_t peer_required_features;
- bufferlist bl;
+ ceph::bufferlist bl;
+ using ceph::decode;
bl.push_back(std::move(buffer));
auto ti = bl.cbegin();
try {
decode(peer_supported_features, ti);
decode(peer_required_features, ti);
- } catch (const buffer::error &e) {
+ } catch (const ceph::buffer::error &e) {
lderr(cct) << __func__ << " decode banner payload failed " << dendl;
return _fault();
}
return nullptr;
}
- this->peer_required_features = peer_required_features;
- if (this->peer_required_features == 0) {
+ this->peer_supported_features = peer_supported_features;
+ if (peer_required_features == 0) {
this->connection_features = msgr2_required;
}
- // at this point we can change how the client protocol behaves based on
- // this->peer_required_features
+ // if the peer supports msgr2.1, switch to it
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ tx_frame_asm.set_is_rev1(is_rev1);
+ rx_frame_asm.set_is_rev1(is_rev1);
if (state == BANNER_CONNECTING) {
state = HELLO_CONNECTING;
<< " peer_type=" << (int)hello.entity_type()
<< " peer_addr_for_me=" << hello.peer_addr() << dendl;
+ sockaddr_storage ss;
+ socklen_t len = sizeof(ss);
+ getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
+ ldout(cct, 5) << __func__ << " getsockname says I am " << (sockaddr *)&ss
+ << " when talking to " << connection->target_addr << dendl;
+
if (connection->get_peer_type() == -1) {
connection->set_peer_type(hello.entity_type());
}
}
+ if (messenger->get_myaddrs().empty() ||
+ messenger->get_myaddrs().front().is_blank_ip()) {
+ entity_addr_t a;
+ if (cct->_conf->ms_learn_addr_from_peer) {
+ ldout(cct, 1) << __func__ << " peer " << connection->target_addr
+ << " says I am " << hello.peer_addr() << " (socket says "
+ << (sockaddr*)&ss << ")" << dendl;
+ a = hello.peer_addr();
+ } else {
+ ldout(cct, 1) << __func__ << " socket to " << connection->target_addr
+ << " says I am " << (sockaddr*)&ss
+ << " (peer says " << hello.peer_addr() << ")" << dendl;
+ a.set_sockaddr((sockaddr *)&ss);
+ }
+ a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
+ a.set_port(0);
+ connection->lock.unlock();
+ messenger->learned_addr(a);
+ if (cct->_conf->ms_inject_internal_delays &&
+ cct->_conf->ms_inject_socket_failures) {
+ if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
+ ldout(cct, 10) << __func__ << " sleep for "
+ << cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+ }
+ connection->lock.lock();
+ if (state != HELLO_CONNECTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while learned_addr, mark_down or "
+ << " replacing must be happened just now" << dendl;
+ return nullptr;
+ }
+ }
+
+
+
CtPtr callback;
callback = bannerExchangeCallback;
bannerExchangeCallback = nullptr;
}
ldout(cct, 20) << __func__ << dendl;
- return READ(FRAME_PREAMBLE_SIZE, handle_read_frame_preamble_main);
+ rx_preamble.clear();
+ rx_epilogue.clear();
+ rx_segments_data.clear();
+
+ return READ(rx_frame_asm.get_preamble_onwire_len(),
+ handle_read_frame_preamble_main);
}
CtPtr ProtocolV2::handle_read_frame_preamble_main(rx_buffer_t &&buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r
+ ldout(cct, 1) << __func__ << " read frame preamble failed r=" << r
<< " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- ceph::bufferlist preamble;
- preamble.push_back(std::move(buffer));
+ rx_preamble.push_back(std::move(buffer));
ldout(cct, 30) << __func__ << " preamble\n";
- preamble.hexdump(*_dout);
+ rx_preamble.hexdump(*_dout);
*_dout << dendl;
- if (session_stream_handlers.rx) {
- ceph_assert(session_stream_handlers.rx);
-
- session_stream_handlers.rx->reset_rx_handler();
- preamble = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(preamble), segment_t::DEFAULT_ALIGNMENT);
+ try {
+ next_tag = rx_frame_asm.disassemble_preamble(rx_preamble);
+ } catch (FrameError& e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+ return _fault();
+ }
- ldout(cct, 10) << __func__ << " got encrypted preamble."
- << " after decrypt premable.length()=" << preamble.length()
- << dendl;
+ ldout(cct, 25) << __func__ << " disassembled preamble " << rx_frame_asm
+ << dendl;
+ if (session_stream_handlers.rx) {
ldout(cct, 30) << __func__ << " preamble after decrypt\n";
- preamble.hexdump(*_dout);
+ rx_preamble.hexdump(*_dout);
*_dout << dendl;
}
- {
- // I expect ceph_le32 will make the endian conversion for me. Passing
- // everything through ::Decode is unnecessary.
- const auto& main_preamble = \
- reinterpret_cast<preamble_block_t&>(*preamble.c_str());
-
- // verify preamble's CRC before any further processing
- const auto rx_crc = ceph_crc32c(0,
- reinterpret_cast<const unsigned char*>(&main_preamble),
- sizeof(main_preamble) - sizeof(main_preamble.crc));
- if (rx_crc != main_preamble.crc) {
- ldout(cct, 10) << __func__ << " crc mismatch for main preamble"
- << " rx_crc=" << rx_crc
- << " tx_crc=" << main_preamble.crc << dendl;
- return _fault();
- }
-
- // currently we do support between 1 and MAX_NUM_SEGMENTS segments
- if (main_preamble.num_segments < 1 ||
- main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- ldout(cct, 10) << __func__ << " unsupported num_segments="
- << " tx_crc=" << main_preamble.num_segments << dendl;
- return _fault();
- }
-
- next_tag = static_cast<Tag>(main_preamble.tag);
-
- rx_segments_desc.clear();
- rx_segments_data.clear();
-
- if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- ldout(cct, 30) << __func__
- << " num_segments=" << main_preamble.num_segments
- << " is too much" << dendl;
- return _fault();
- }
- for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
- ldout(cct, 10) << __func__ << " got new segment:"
- << " len=" << main_preamble.segments[idx].length
- << " align=" << main_preamble.segments[idx].alignment
- << dendl;
- rx_segments_desc.emplace_back(main_preamble.segments[idx]);
- }
- }
-
// does it need throttle?
if (next_tag == Tag::MESSAGE) {
if (state != READY) {
lderr(cct) << __func__ << " not in ready state!" << dendl;
return _fault();
}
+ recv_stamp = ceph_clock_now();
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
} else {
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
case Tag::WAIT:
+ case Tag::COMPRESSION_REQUEST:
+ case Tag::COMPRESSION_DONE:
return handle_frame_payload();
case Tag::MESSAGE:
return handle_message();
}
CtPtr ProtocolV2::read_frame_segment() {
- ldout(cct, 20) << __func__ << dendl;
- ceph_assert(!rx_segments_desc.empty());
+ size_t seg_idx = rx_segments_data.size();
+ ldout(cct, 20) << __func__ << " seg_idx=" << seg_idx << dendl;
+ rx_segments_data.emplace_back();
+
+ uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
+ if (onwire_len == 0) {
+ return _handle_read_frame_segment();
+ }
- // description of current segment to read
- const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
rx_buffer_t rx_buffer;
+ uint16_t align = rx_frame_asm.get_segment_align(seg_idx);
try {
- rx_buffer = buffer::ptr_node::create(buffer::create_aligned(
- get_onwire_size(cur_rx_desc.length), cur_rx_desc.alignment));
- } catch (std::bad_alloc&) {
+ rx_buffer = ceph::buffer::ptr_node::create(ceph::buffer::create_aligned(
+ onwire_len, align));
+ } catch (const ceph::buffer::bad_alloc&) {
// Catching because of potential issues with satisfying alignment.
- ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer "
- << " len=" << get_onwire_size(cur_rx_desc.length)
- << " align=" << cur_rx_desc.alignment
- << dendl;
+ ldout(cct, 1) << __func__ << " can't allocate aligned rx_buffer"
+ << " len=" << onwire_len
+ << " align=" << align
+ << dendl;
return _fault();
}
return _fault();
}
- rx_segments_data.emplace_back();
rx_segments_data.back().push_back(std::move(rx_buffer));
+ return _handle_read_frame_segment();
+}
- // decrypt incoming data
- // FIXME: if (auth_meta->is_mode_secure()) {
- if (session_stream_handlers.rx) {
- ceph_assert(session_stream_handlers.rx);
-
- auto& new_seg = rx_segments_data.back();
- if (new_seg.length()) {
- auto padded = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(new_seg), segment_t::DEFAULT_ALIGNMENT);
- const auto idx = rx_segments_data.size() - 1;
- new_seg.clear();
- padded.splice(0, rx_segments_desc[idx].length, &new_seg);
-
- ldout(cct, 20) << __func__
- << " unpadded new_seg.length()=" << new_seg.length()
- << dendl;
- }
- }
-
- if (rx_segments_desc.size() == rx_segments_data.size()) {
+CtPtr ProtocolV2::_handle_read_frame_segment() {
+ if (rx_segments_data.size() == rx_frame_asm.get_num_segments()) {
// OK, all segments planned to read are read. Can go with epilogue.
- return READ(get_epilogue_size(), handle_read_frame_epilogue_main);
- } else {
- // TODO: for makeshift only. This will be more generic and throttled
- return read_frame_segment();
+ uint32_t epilogue_onwire_len = rx_frame_asm.get_epilogue_onwire_len();
+ if (epilogue_onwire_len == 0) {
+ return _handle_read_frame_epilogue_main();
+ }
+ return READ(epilogue_onwire_len, handle_read_frame_epilogue_main);
}
+ // TODO: for makeshift only. This will be more generic and throttled
+ return read_frame_segment();
}
CtPtr ProtocolV2::handle_frame_payload() {
return handle_message_ack(payload);
case Tag::WAIT:
return handle_wait(payload);
+ case Tag::COMPRESSION_REQUEST:
+ return handle_compression_request(payload);
+ case Tag::COMPRESSION_DONE:
+ return handle_compression_done(payload);
default:
ceph_abort();
}
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
- ldout(cct, 1) << __func__ << " read data error " << dendl;
+ ldout(cct, 1) << __func__ << " read frame epilogue failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- __u8 late_flags;
+ rx_epilogue.push_back(std::move(buffer));
+ return _handle_read_frame_epilogue_main();
+}
- // FIXME: if (auth_meta->is_mode_secure()) {
- if (session_stream_handlers.rx) {
- ldout(cct, 1) << __func__ << " read frame epilogue bytes="
- << get_epilogue_size() << dendl;
-
- // decrypt epilogue and authenticate entire frame.
- ceph::bufferlist epilogue_bl;
- {
- epilogue_bl.push_back(std::move(buffer));
- try {
- epilogue_bl =
- session_stream_handlers.rx->authenticated_decrypt_update_final(
- std::move(epilogue_bl), segment_t::DEFAULT_ALIGNMENT);
- } catch (ceph::crypto::onwire::MsgAuthError &e) {
- ldout(cct, 5) << __func__ << " message authentication failed: "
- << e.what() << dendl;
- return _fault();
- }
- }
- auto& epilogue =
- reinterpret_cast<epilogue_plain_block_t&>(*epilogue_bl.c_str());
- late_flags = epilogue.late_flags;
- } else {
- auto& epilogue = reinterpret_cast<epilogue_plain_block_t&>(*buffer->c_str());
-
- for (std::uint8_t idx = 0; idx < rx_segments_data.size(); idx++) {
- const __u32 expected_crc = epilogue.crc_values[idx];
- const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
- if (expected_crc != calculated_crc) {
- ldout(cct, 5) << __func__ << " message integrity check failed: "
- << " expected_crc=" << expected_crc
- << " calculated_crc=" << calculated_crc
- << dendl;
- return _fault();
- } else {
- ldout(cct, 20) << __func__ << " message integrity check success: "
- << " expected_crc=" << expected_crc
- << " calculated_crc=" << calculated_crc
- << dendl;
- }
- }
- late_flags = epilogue.late_flags;
+CtPtr ProtocolV2::_handle_read_frame_epilogue_main() {
+ bool ok = false;
+ try {
+ ok = rx_frame_asm.disassemble_segments(rx_preamble, rx_segments_data.data(), rx_epilogue);
+ } catch (FrameError& e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ } catch (ceph::crypto::onwire::MsgAuthError&) {
+ ldout(cct, 1) << __func__ << "bad auth tag" << dendl;
+ return _fault();
}
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
// the kernel client to avoid unnecessary buffering.
- if (late_flags & FRAME_FLAGS_LATEABRT) {
+ if (!ok) {
reset_throttle();
state = READY;
return CONTINUE(read_frame);
- } else {
- return handle_read_frame_dispatch();
}
+ return handle_read_frame_dispatch();
}
CtPtr ProtocolV2::handle_message() {
ldout(cct, 20) << __func__ << dendl;
ceph_assert(state == THROTTLE_DONE);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- ltt_recv_stamp = ceph_clock_now();
-#endif
- recv_stamp = ceph_clock_now();
-
- // we need to get the size before std::moving segments data
const size_t cur_msg_size = get_current_msg_size();
- auto msg_frame = MessageFrame::Decode(std::move(rx_segments_data));
+ auto msg_frame = MessageFrame::Decode(rx_segments_data);
// XXX: paranoid copy just to avoid oops
ceph_msg_header2 current_header = msg_frame.header();
current_header.type,
current_header.priority,
current_header.version,
- msg_frame.front_len(),
- msg_frame.middle_len(),
- msg_frame.data_len(),
+ ceph_le32(msg_frame.front_len()),
+ ceph_le32(msg_frame.middle_len()),
+ ceph_le32(msg_frame.data_len()),
current_header.data_off,
peer_name,
current_header.compat_version,
current_header.reserved,
- 0};
- ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+ ceph_le32(0)};
+ ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
+ ceph_le32(0), ceph_le64(0), current_header.flags};
Message *message = decode_message(cct, 0, header, footer,
msg_frame.front(),
}
}
- message->set_connection(connection);
-
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+#if defined(WITH_EVENTTRACE)
if (message->get_type() == CEPH_MSG_OSD_OP ||
message->get_type() == CEPH_MSG_OSD_OPREPLY) {
utime_t ltt_processed_stamp = ceph_clock_now();
double usecs_elapsed =
- (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000;
+ ((double)(ltt_processed_stamp.to_nsec() - recv_stamp.to_nsec())) / 1000;
ostringstream buf;
if (message->get_type() == CEPH_MSG_OSD_OP)
OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP",
state = READY;
+ ceph::mono_time fast_dispatch_time;
+
+ if (connection->is_blackhole()) {
+ ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
+ message->put();
+ goto out;
+ }
+
connection->logger->inc(l_msgr_recv_messages);
- connection->logger->inc(
- l_msgr_recv_bytes,
- cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+ connection->logger->inc(l_msgr_recv_bytes,
+ rx_frame_asm.get_frame_onwire_len());
messenger->ms_fast_preprocess(message);
- auto fast_dispatch_time = ceph::mono_clock::now();
+ fast_dispatch_time = ceph::mono_clock::now();
connection->logger->tinc(l_msgr_running_recv_time,
- fast_dispatch_time - connection->recv_start_time);
+ fast_dispatch_time - connection->recv_start_time);
if (connection->delay_state) {
double delay_period = 0;
if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
connection->logger->tinc(l_msgr_running_fast_dispatch_time,
connection->recv_start_time - fast_dispatch_time);
connection->lock.lock();
+ // we might have been reused by another connection
+ // let's check if that is the case
+ if (state != READY) {
+ // yes, that was the case, let's do nothing
+ return nullptr;
+ }
} else {
connection->dispatch_queue->enqueue(message, message->get_priority(),
connection->conn_id);
handle_message_ack(current_header.ack_seq);
- // we might have been reused by another connection
- // let's check if that is the case
- if (state != READY) {
- // yes, that was the case, let's do nothing
- return nullptr;
- }
-
+ out:
if (need_dispatch_writer && connection->is_connected()) {
connection->center->dispatch_event_external(connection->write_handler);
}
ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
connection->write_lock.lock();
- append_keepalive_ack(keepalive_frame.timestamp());
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
+ if (!append_frame(keepalive_ack_frame)) {
+ connection->write_lock.unlock();
+ return _fault();
+ }
connection->write_lock.unlock();
ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
}
CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
+ ceph_assert(messenger->auth_client);
ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
<< " auth_client " << messenger->auth_client << dendl;
- ceph_assert(messenger->auth_client);
- bufferlist bl;
- vector<uint32_t> preferred_modes;
+ ceph::bufferlist bl;
+ std::vector<uint32_t> preferred_modes;
auto am = auth_meta;
connection->lock.unlock();
int r = messenger->auth_client->get_auth_request(
return _fault();
}
auth_meta->con_mode = auth_done.con_mode();
- session_stream_handlers = \
- ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, false);
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/false);
state = AUTH_CONNECTING_SIGN;
}
CtPtr ProtocolV2::finish_client_auth() {
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ return send_compression_request();
+ }
+
+ return start_session_connect();
+}
+
+CtPtr ProtocolV2::finish_server_auth() {
+ // server had sent AuthDone and client responded with correct pre-auth
+ // signature.
+ // We can start conditioanl msgr protocol
+ if (HAVE_MSGR2_FEATURE(peer_supported_features, COMPRESSION)) {
+ state = COMPRESSION_ACCEPTING;
+ } else {
+ // No msgr protocol features to process
+ // we can start accepting new sessions/reconnects.
+ state = SESSION_ACCEPTING;
+ }
+
+ return CONTINUE(read_frame);
+}
+
+CtPtr ProtocolV2::start_session_connect() {
if (!server_cookie) {
ceph_assert(connect_seq == 0);
state = SESSION_CONNECTING;
flags |= CEPH_MSG_CONNECT_LOSSY;
}
- if (messenger->get_myaddrs().empty() ||
- messenger->get_myaddrs().front().is_blank_ip()) {
- sockaddr_storage ss;
- socklen_t len = sizeof(ss);
- int r = getsockname(connection->cs.socket_fd(), (sockaddr *)&ss, &len);
- ceph_assert(r == 0);
- ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss
- << " when talking to " << connection->target_addr << dendl;
- entity_addr_t a;
- a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
- a.set_sockaddr((sockaddr *)&ss);
- a.set_port(0);
- connection->lock.unlock();
- messenger->learned_addr(a);
- if (cct->_conf->ms_inject_internal_delays &&
- cct->_conf->ms_inject_socket_failures) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 10) << __func__ << " sleep for "
- << cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(cct->_conf->ms_inject_internal_delays);
- t.sleep();
- }
- }
- connection->lock.lock();
- if (state != SESSION_CONNECTING) {
- ldout(cct, 1) << __func__
- << " state changed while learned_addr, mark_down or "
- << " replacing must be happened just now" << dendl;
- return nullptr;
- }
- }
-
auto client_ident = ClientIdentFrame::Encode(
messenger->get_myaddrs(),
connection->target_addr,
<< " features_supported=" << std::hex
<< server_ident.supported_features()
<< " features_required=" << server_ident.required_features()
- << " flags=" << server_ident.flags() << " cookie=" << std::dec
- << server_ident.cookie() << dendl;
+ << " flags=" << server_ident.flags()
+ << " cookie=" << server_ident.cookie() << std::dec << dendl;
// is this who we intended to talk to?
// be a bit forgiving here, since we may be connecting based on addresses parsed out
return ready();
}
+CtPtr ProtocolV2::send_compression_request() {
+ state = COMPRESSION_CONNECTING;
+
+ const entity_type_t peer_type = connection->get_peer_type();
+ comp_meta.con_mode =
+ static_cast<Compressor::CompressionMode>(
+ messenger->comp_registry.get_mode(peer_type, auth_meta->is_mode_secure()));
+ const auto preferred_methods = messenger->comp_registry.get_methods(peer_type);
+ auto comp_req_frame = CompressionRequestFrame::Encode(comp_meta.is_compress(), preferred_methods);
+
+ INTERCEPT(19);
+ return WRITE(comp_req_frame, "compression request", read_frame);
+}
+
+CtPtr ProtocolV2::handle_compression_done(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_CONNECTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto response = CompressionDoneFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionDoneFrame(is_compress=" << response.is_compress()
+ << ", method=" << response.method() << ")" << dendl;
+
+ comp_meta.con_method = static_cast<Compressor::CompressionAlgorithm>(response.method());
+ if (comp_meta.is_compress() != response.is_compress()) {
+ comp_meta.con_mode = Compressor::COMP_NONE;
+ }
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ return start_session_connect();
+}
+
/* Server Protocol Methods */
CtPtr ProtocolV2::start_server_banner_exchange() {
return WRITE(bad_method, "bad auth method", read_frame);
}
-CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
+CtPtr ProtocolV2::_handle_auth_request(ceph::bufferlist& auth_payload, bool more)
{
if (!messenger->auth_server) {
return _fault();
}
- bufferlist reply;
+ ceph::bufferlist reply;
auto am = auth_meta;
connection->lock.unlock();
int r = messenger->auth_server->handle_auth_request(
ceph_assert(auth_meta);
// TODO: having a possibility to check whether we're server or client could
// allow reusing finish_auth().
- session_stream_handlers = \
- ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
+ bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
+ session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
+ cct, *auth_meta, /*new_nonce_format=*/is_rev1, /*crossed=*/true);
const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
auth_meta->session_key.hmac_sha256(cct, pre_auth.rxbuf);
}
if (state == AUTH_ACCEPTING_SIGN) {
- // server had sent AuthDone and client responded with correct pre-auth
- // signature. we can start accepting new sessions/reconnects.
- state = SESSION_ACCEPTING;
- return CONTINUE(read_frame);
+ // this happened on server side
+ return finish_server_auth();
} else if (state == AUTH_CONNECTING_SIGN) {
// this happened at client side
return finish_client_auth();
} else {
- ceph_assert_always("state corruption" == nullptr);
+ ceph_abort("state corruption");
}
}
peer_global_seq = client_ident.global_seq();
- // Looks good so far, let's check if there is already an existing connection
- // to this peer.
-
- connection->lock.unlock();
- AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
- if (existing &&
- existing->protocol->proto_type != 2) {
- ldout(cct,1) << __func__ << " existing " << existing << " proto "
- << existing->protocol.get() << " version is "
- << existing->protocol->proto_type << ", marking down" << dendl;
- existing->mark_down();
- existing = nullptr;
- }
+ if (connection->policy.server &&
+ connection->policy.lossy &&
+ !connection->policy.register_lossy_clients) {
+ // incoming lossy client, no need to register this connection
+ } else {
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+ connection->lock.unlock();
+ AsyncConnectionRef existing = messenger->lookup_conn(
+ *connection->peer_addrs);
+
+ if (existing &&
+ existing->protocol->proto_type != 2) {
+ ldout(cct,1) << __func__ << " existing " << existing << " proto "
+ << existing->protocol.get() << " version is "
+ << existing->protocol->proto_type << ", marking down"
+ << dendl;
+ existing->mark_down();
+ existing = nullptr;
+ }
- connection->inject_delay();
+ connection->inject_delay();
- connection->lock.lock();
- if (state != SESSION_ACCEPTING) {
- ldout(cct, 1) << __func__
- << " state changed while accept, it must be mark_down"
- << dendl;
- ceph_assert(state == CLOSED);
- return _fault();
- }
+ connection->lock.lock();
+ if (state != SESSION_ACCEPTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED);
+ return _fault();
+ }
- if (existing) {
- return handle_existing_connection(existing);
+ if (existing) {
+ return handle_existing_connection(existing);
+ }
}
// if everything is OK reply with server identification
return reuse_connection(existing, exproto);
}
-CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
std::lock_guard<std::mutex> l(existing->lock);
}
}
-CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
ProtocolV2 *exproto) {
ldout(cct, 20) << __func__ << " existing=" << existing
<< " reconnect=" << reconnecting << dendl;
exproto->pre_auth.enabled = false;
if (!reconnecting) {
+ exproto->peer_supported_features = peer_supported_features;
+ exproto->tx_frame_asm.set_is_rev1(tx_frame_asm.get_is_rev1());
+ exproto->rx_frame_asm.set_is_rev1(rx_frame_asm.get_is_rev1());
+
exproto->client_cookie = client_cookie;
exproto->peer_name = peer_name;
exproto->connection_features = connection_features;
}
exproto->peer_global_seq = peer_global_seq;
+ ceph_assert(connection->center->in_thread());
auto temp_cs = std::move(connection->cs);
EventCenter *new_center = connection->center;
Worker *new_worker = connection->worker;
+ // we can steal the session_stream_handlers under the assumption
+ // this happens in the event center's thread as there should be
+ // no user outside its boundaries (simlarly to e.g. outgoing_bl).
+ auto temp_stream_handlers = std::move(session_stream_handlers);
+ auto temp_compression_handlers = std::move(session_compression_handlers);
+ exproto->auth_meta = auth_meta;
+ exproto->comp_meta = comp_meta;
ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
<< dendl;
+
// avoid _stop shutdown replacing socket
// queue a reset on the new connection, which we're dumping for the old
stop();
connection->dispatch_queue->queue_reset(connection);
exproto->can_write = false;
+ exproto->write_in_progress = false;
exproto->reconnecting = reconnecting;
exproto->replacing = true;
- std::swap(exproto->session_stream_handlers, session_stream_handlers);
- exproto->auth_meta = auth_meta;
existing->state_offset = 0;
// avoid previous thread modify event
exproto->state = NONE;
ceph_assert(connection->recv_start == connection->recv_end);
auto deactivate_existing = std::bind(
- [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+ [ existing,
+ new_worker,
+ new_center,
+ exproto,
+ temp_stream_handlers=std::move(temp_stream_handlers),
+ temp_compression_handlers=std::move(temp_compression_handlers)
+ ](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
existing->write_lock.lock();
exproto->requeue_sent();
- existing->outcoming_bl.clear();
+ // XXX: do we really need the locking for `outgoing_bl`? There is
+ // a comment just above its definition saying "lockfree, only used
+ // in own thread". I'm following lockfull schema just in the case.
+ // From performance point of view it should be fine – this happens
+ // far away from hot paths.
+ existing->outgoing_bl.clear();
existing->open_write = false;
+ exproto->session_stream_handlers = std::move(temp_stream_handlers);
+ exproto->session_compression_handlers = std::move(temp_compression_handlers);
existing->write_lock.unlock();
if (exproto->state == NONE) {
existing->shutdown_socket();
ceph_assert(exproto->state == NONE);
exproto->state = SESSION_ACCEPTING;
+ // we have called shutdown_socket above
+ ceph_assert(existing->last_tick_id == 0);
+ // restart timer since we are going to re-build connection
+ existing->last_connect_started = ceph::coarse_mono_clock::now();
+ existing->last_tick_id = existing->center->create_time_event(
+ existing->connect_timeout_us, existing->tick_handler);
existing->state = AsyncConnection::STATE_CONNECTION_ESTABLISHED;
existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE,
existing->read_handler);
<< connection->policy.features_supported
<< " features_required="
<< (connection->policy.features_required | msgr2_required)
- << " flags=" << flags << " cookie=" << std::dec << server_cookie
- << dendl;
+ << " flags=" << flags
+ << " cookie=" << server_cookie << std::dec << dendl;
connection->lock.unlock();
// Because "replacing" will prevent other connections preempt this addr,
return WRITE(reconnect_ok, "reconnect ok", server_ready);
}
+
+
+CtPtr ProtocolV2::handle_compression_request(ceph::bufferlist &payload) {
+ if (state != COMPRESSION_ACCEPTING) {
+ lderr(cct) << __func__ << " state changed!" << dendl;
+ return _fault();
+ }
+
+ auto request = CompressionRequestFrame::Decode(payload);
+ ldout(cct, 10) << __func__ << " CompressionRequestFrame(is_compress=" << request.is_compress()
+ << ", preferred_methods=" << request.preferred_methods() << ")" << dendl;
+
+ const int peer_type = connection->get_peer_type();
+ if (Compressor::CompressionMode mode = messenger->comp_registry.get_mode(
+ peer_type, auth_meta->is_mode_secure());
+ mode != Compressor::COMP_NONE && request.is_compress()) {
+ comp_meta.con_method = messenger->comp_registry.pick_method(peer_type, request.preferred_methods());
+ if (comp_meta.con_method != Compressor::COMP_ALG_NONE) {
+ comp_meta.con_mode = mode;
+ }
+ } else {
+ comp_meta.con_method = Compressor::COMP_ALG_NONE;
+ }
+
+ auto response = CompressionDoneFrame::Encode(comp_meta.is_compress(), comp_meta.get_method());
+
+ INTERCEPT(20);
+ return WRITE(response, "compression done", finish_compression);
+}
+
+CtPtr ProtocolV2::finish_compression() {
+ // TODO: having a possibility to check whether we're server or client could
+ // allow reusing finish_compression().
+
+ session_compression_handlers = ceph::compression::onwire::rxtx_t::create_handler_pair(
+ cct, comp_meta, messenger->comp_registry.get_min_compression_size(connection->get_peer_type()));
+
+ state = SESSION_ACCEPTING;
+ return CONTINUE(read_frame);
+}