AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
Worker *w, bool m2, bool local)
- : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
+ : Connection(cct, m),
+ delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
logger(w->get_perf_counter()),
state(STATE_NONE), port(-1),
dispatch_queue(q), recv_buf(NULL),
ceph_assert(!delay_state);
}
-int AsyncConnection::get_con_mode() const {
+int AsyncConnection::get_con_mode() const
+{
return protocol->get_con_mode();
}
+bool AsyncConnection::is_msgr2() const
+{
+ return protocol->proto_type == 2;
+}
+
void AsyncConnection::maybe_start_delay_thread()
{
if (!delay_state) {
bool more) {
std::unique_lock<std::mutex> l(write_lock);
- outcoming_bl.claim_append(bl);
+ outgoing_bl.claim_append(bl);
ssize_t r = _try_send(more);
if (r > 0) {
writeCallback = callback;
}
ceph_assert(center->in_thread());
- ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
+ ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
<< " bytes" << dendl;
- ssize_t r = cs.send(outcoming_bl, more);
+ ssize_t r = cs.send(outgoing_bl, more);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
return r;
}
ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
- << " remaining bytes " << outcoming_bl.length() << dendl;
+ << " remaining bytes " << outgoing_bl.length() << dendl;
if (!open_write && is_queued()) {
center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
}
}
- return outcoming_bl.length();
+ return outgoing_bl.length();
}
void AsyncConnection::inject_delay() {
read_buffer = nullptr;
readCallback(buf_tmp, r);
}
+ logger->tinc(l_msgr_running_recv_time,
+ ceph::mono_clock::now() - recv_start_time);
return;
}
break;
<< this
<< dendl;
+ if (is_blackhole()) {
+ lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
+ << " blackhole " << *m << dendl;
+ m->put();
+ return 0;
+ }
+
// optimistic think it's ok to encode(actually may broken now)
if (!m->get_priority())
m->set_priority(async_msgr->get_default_send_priority());
m->get_header().src = async_msgr->get_myname();
m->set_connection(this);
+#if defined(WITH_EVENTTRACE)
if (m->get_type() == CEPH_MSG_OSD_OP)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
+#endif
- if (async_msgr->get_myaddrs() == get_peer_addrs()) { //loopback connection
+ if (is_loopback) { //loopback connection
ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
std::lock_guard<std::mutex> l(write_lock);
if (protocol->is_connected()) {
recv_start = recv_end = 0;
state_offset = 0;
- outcoming_bl.clear();
+ outgoing_bl.clear();
}
void AsyncConnection::_stop() {
}
bool AsyncConnection::is_queued() const {
- return outcoming_bl.length();
+ return outgoing_bl.length();
}
void AsyncConnection::shutdown_socket() {