connection->dispatch_queue->discard_queue(connection->conn_id);
discard_out_queue();
- connection->outcoming_bl.clear();
+ connection->outgoing_bl.clear();
connection->dispatch_queue->queue_remote_reset(connection);
ldout(cct, 5) << __func__ << " requeueing message m=" << m
<< " seq=" << m->get_seq() << " type=" << m->get_type() << " "
<< *m << dendl;
+ m->clear_payload();
rq.emplace_front(out_queue_entry_t{false, m});
}
}
return count;
}
-void ProtocolV2::reset_recv_state() {
+void ProtocolV2::reset_security() {
+ ldout(cct, 5) << __func__ << dendl;
+
auth_meta.reset(new AuthConnectionMeta);
- session_stream_handlers.tx.reset(nullptr);
session_stream_handlers.rx.reset(nullptr);
- pre_auth.txbuf.clear();
+ session_stream_handlers.tx.reset(nullptr);
pre_auth.rxbuf.clear();
+ pre_auth.txbuf.clear();
+}
+
+// it's expected the `write_lock` is held while calling this method.
+void ProtocolV2::reset_recv_state() {
+ ldout(cct, 5) << __func__ << dendl;
+
+ if (!connection->center->in_thread()) {
+ // execute in the same thread that uses the rx/tx handlers. We need
+ // to do the warp because holding `write_lock` is not enough as
+ // `write_event()` unlocks it just before calling `write_message()`.
+ // `submit_to()` here is NOT blocking.
+ connection->center->submit_to(connection->center->get_id(), [this] {
+ ldout(cct, 5) << "reset_recv_state (warped) reseting crypto handlers"
+ << dendl;
+ // Possibly unnecessary. See the comment in `deactivate_existing`.
+ std::lock_guard<std::mutex> l(connection->lock);
+ std::lock_guard<std::mutex> wl(connection->write_lock);
+ reset_security();
+ }, /* always_async = */true);
+ } else {
+ reset_security();
+ }
// clean read and write callbacks
connection->pendingReadLen.reset();
ldout(cct, 20) << __func__ << " m=" << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
- if (m->empty_payload()) {
- ldout(cct, 20) << __func__ << " encoding features " << features << " " << m
- << " " << *m << dendl;
- } else {
- ldout(cct, 20) << __func__ << " half-reencoding features " << features
- << " " << m << " " << *m << dendl;
- }
+ ldout(cct, 20) << __func__ << (m->empty_payload() ? " encoding features " : " half-reencoding features ")
+ << features << " " << m << " " << *m << dendl;
// encode and copy out of *m
m->encode(features, 0);
} else {
ldout(cct, 5) << __func__ << " enqueueing message m=" << m
<< " type=" << m->get_type() << " " << *m << dendl;
+ m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
out_queue[m->get_priority()].emplace_back(
out_queue_entry_t{is_prepared, m});
m->get_payload(),
m->get_middle(),
m->get_data());
- connection->outcoming_bl.append(message.get_buffer(session_stream_handlers));
+ connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
<< " src=" << entity_name_t(messenger->get_myname())
<< " off=" << header2.data_off
<< dendl;
- ssize_t total_send_size = connection->outcoming_bl.length();
+ ssize_t total_send_size = connection->outgoing_bl.length();
ssize_t rc = connection->_try_send(more);
if (rc < 0) {
ldout(cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(rc) << dendl;
} else {
connection->logger->inc(
- l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length());
+ l_msgr_send_bytes, total_send_size - connection->outgoing_bl.length());
ldout(cct, 10) << __func__ << " sending " << m
<< (rc ? " continuely." : " done.") << dendl;
}
+
+#if defined(WITH_EVENTTRACE)
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false);
+#endif
m->put();
return rc;
// Encode a KeepAliveFrame and append it to the connection's pending
// output buffer; the regular write path is responsible for flushing it.
void ProtocolV2::append_keepalive() {
  ldout(cct, 10) << __func__ << dendl;
  auto keepalive_frame = KeepAliveFrame::Encode();
  connection->outgoing_bl.append(
      keepalive_frame.get_buffer(session_stream_handlers));
}
void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
- connection->outcoming_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+ connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
}
void ProtocolV2::handle_message_ack(uint64_t seq) {
static const int max_pending = 128;
int i = 0;
Message *pending[max_pending];
+ auto now = ceph::mono_clock::now();
connection->write_lock.lock();
while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) {
Message *m = sent.front();
<< dendl;
}
connection->write_lock.unlock();
+ connection->logger->tinc(l_msgr_handle_ack_lat, ceph::mono_clock::now() - now);
for (int k = 0; k < i; k++) {
pending[k]->put();
}
prepare_send_message(connection->get_features(), out_entry.m);
}
+ if (out_entry.m->queue_start != ceph::mono_time()) {
+ connection->logger->tinc(l_msgr_send_messages_queue_lat,
+ ceph::mono_clock::now() -
+ out_entry.m->queue_start);
+ }
+
r = write_message(out_entry.m, more);
connection->write_lock.lock();
} else if (r < 0) {
ldout(cct, 1) << __func__ << " send msg failed" << dendl;
break;
- } else if (r > 0)
+ } else if (r > 0) {
+ // Outbound message in-progress, thread will be re-awoken
+ // when the outbound socket is writeable again
break;
+ }
} while (can_write);
write_in_progress = false;
if (r == 0) {
uint64_t left = ack_left;
if (left) {
- ceph_le64 s;
- s = in_seq;
auto ack = AckFrame::Encode(in_seq);
- connection->outcoming_bl.append(ack.get_buffer(session_stream_handlers));
+ connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
ack_left -= left;
}
// Schedule a read of the peer's banner: the fixed CEPH_BANNER_V2_PREFIX
// string followed by a little-endian 16-bit payload-length field
// (ceph_le16, replacing the non-portable __le16).
CtPtr ProtocolV2::_wait_for_peer_banner() {
  unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
  return READ(banner_len, _handle_peer_banner);
}
uint16_t payload_len;
bufferlist bl;
buffer->set_offset(banner_prefix_len);
- buffer->set_length(sizeof(__le16));
+ buffer->set_length(sizeof(ceph_le16));
bl.push_back(std::move(buffer));
auto ti = bl.cbegin();
try {
ldout(cct, 20) << __func__ << dendl;
ceph_assert(state == THROTTLE_DONE);
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
- ltt_recv_stamp = ceph_clock_now();
+#if defined(WITH_EVENTTRACE)
+ utime_t ltt_recv_stamp = ceph_clock_now();
#endif
recv_stamp = ceph_clock_now();
}
}
-#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
+#if defined(WITH_EVENTTRACE)
if (message->get_type() == CEPH_MSG_OSD_OP ||
message->get_type() == CEPH_MSG_OSD_OPREPLY) {
utime_t ltt_processed_stamp = ceph_clock_now();
state = READY;
+ ceph::mono_time fast_dispatch_time;
+
+ if (connection->is_blackhole()) {
+ ldout(cct, 10) << __func__ << " blackhole " << *message << dendl;
+ message->put();
+ goto out;
+ }
+
connection->logger->inc(l_msgr_recv_messages);
connection->logger->inc(
l_msgr_recv_bytes,
cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
messenger->ms_fast_preprocess(message);
- auto fast_dispatch_time = ceph::mono_clock::now();
+ fast_dispatch_time = ceph::mono_clock::now();
connection->logger->tinc(l_msgr_running_recv_time,
- fast_dispatch_time - connection->recv_start_time);
+ fast_dispatch_time - connection->recv_start_time);
if (connection->delay_state) {
double delay_period = 0;
if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) {
connection->logger->tinc(l_msgr_running_fast_dispatch_time,
connection->recv_start_time - fast_dispatch_time);
connection->lock.lock();
+ // we might have been reused by another connection
+ // let's check if that is the case
+ if (state != READY) {
+ // yes, that was the case, let's do nothing
+ return nullptr;
+ }
} else {
connection->dispatch_queue->enqueue(message, message->get_priority(),
connection->conn_id);
handle_message_ack(current_header.ack_seq);
- // we might have been reused by another connection
- // let's check if that is the case
- if (state != READY) {
- // yes, that was the case, let's do nothing
- return nullptr;
- }
-
+ out:
if (need_dispatch_writer && connection->is_connected()) {
connection->center->dispatch_event_external(connection->write_handler);
}
}
CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
+ ceph_assert(messenger->auth_client);
ldout(cct, 20) << __func__ << " peer_type " << (int)connection->peer_type
<< " auth_client " << messenger->auth_client << dendl;
- ceph_assert(messenger->auth_client);
bufferlist bl;
vector<uint32_t> preferred_modes;
// this happened at client side
return finish_client_auth();
} else {
- ceph_assert_always("state corruption" == nullptr);
+ ceph_abort("state corruption");
}
}
peer_global_seq = client_ident.global_seq();
- // Looks good so far, let's check if there is already an existing connection
- // to this peer.
-
- connection->lock.unlock();
- AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
- if (existing &&
- existing->protocol->proto_type != 2) {
- ldout(cct,1) << __func__ << " existing " << existing << " proto "
- << existing->protocol.get() << " version is "
- << existing->protocol->proto_type << ", marking down" << dendl;
- existing->mark_down();
- existing = nullptr;
- }
+ if (connection->policy.server &&
+ connection->policy.lossy &&
+ !connection->policy.register_lossy_clients) {
+ // incoming lossy client, no need to register this connection
+ } else {
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+ connection->lock.unlock();
+ AsyncConnectionRef existing = messenger->lookup_conn(
+ *connection->peer_addrs);
+
+ if (existing &&
+ existing->protocol->proto_type != 2) {
+ ldout(cct,1) << __func__ << " existing " << existing << " proto "
+ << existing->protocol.get() << " version is "
+ << existing->protocol->proto_type << ", marking down"
+ << dendl;
+ existing->mark_down();
+ existing = nullptr;
+ }
- connection->inject_delay();
+ connection->inject_delay();
- connection->lock.lock();
- if (state != SESSION_ACCEPTING) {
- ldout(cct, 1) << __func__
- << " state changed while accept, it must be mark_down"
- << dendl;
- ceph_assert(state == CLOSED);
- return _fault();
- }
+ connection->lock.lock();
+ if (state != SESSION_ACCEPTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED);
+ return _fault();
+ }
- if (existing) {
- return handle_existing_connection(existing);
+ if (existing) {
+ return handle_existing_connection(existing);
+ }
}
// if everything is OK reply with server identification
return reuse_connection(existing, exproto);
}
-CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
+CtPtr ProtocolV2::handle_existing_connection(const AsyncConnectionRef& existing) {
ldout(cct, 20) << __func__ << " existing=" << existing << dendl;
std::lock_guard<std::mutex> l(existing->lock);
}
}
-CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing,
+CtPtr ProtocolV2::reuse_connection(const AsyncConnectionRef& existing,
ProtocolV2 *exproto) {
ldout(cct, 20) << __func__ << " existing=" << existing
<< " reconnect=" << reconnecting << dendl;
}
exproto->peer_global_seq = peer_global_seq;
+ ceph_assert(connection->center->in_thread());
auto temp_cs = std::move(connection->cs);
EventCenter *new_center = connection->center;
Worker *new_worker = connection->worker;
+ // we can steal the session_stream_handlers under the assumption
+ // this happens in the event center's thread as there should be
+ // no user outside its boundaries (simlarly to e.g. outgoing_bl).
+ auto temp_stream_handlers = std::move(session_stream_handlers);
+ exproto->auth_meta = auth_meta;
ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
<< dendl;
- std::swap(exproto->session_stream_handlers, session_stream_handlers);
- exproto->auth_meta = auth_meta;
-
// avoid _stop shutdown replacing socket
// queue a reset on the new connection, which we're dumping for the old
stop();
ceph_assert(connection->recv_start == connection->recv_end);
auto deactivate_existing = std::bind(
- [existing, new_worker, new_center, exproto](ConnectedSocket &cs) mutable {
+ [ existing,
+ new_worker,
+ new_center,
+ exproto,
+ temp_stream_handlers=std::move(temp_stream_handlers)
+ ](ConnectedSocket &cs) mutable {
// we need to delete time event in original thread
{
std::lock_guard<std::mutex> l(existing->lock);
existing->write_lock.lock();
exproto->requeue_sent();
- existing->outcoming_bl.clear();
+ // XXX: do we really need the locking for `outgoing_bl`? There is
+ // a comment just above its definition saying "lockfree, only used
+ // in own thread". I'm following lockfull schema just in the case.
+ // From performance point of view it should be fine – this happens
+ // far away from hot paths.
+ existing->outgoing_bl.clear();
existing->open_write = false;
+ exproto->session_stream_handlers = std::move(temp_stream_handlers);
existing->write_lock.unlock();
if (exproto->state == NONE) {
existing->shutdown_socket();