]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/AsyncConnection.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / AsyncConnection.cc
index cab4145de715711c2722473efa0e050d49643c00..82ef1389734b4a7c6d16fb011001762876faeca9 100644 (file)
@@ -113,7 +113,8 @@ class C_tick_wakeup : public EventCallback {
 
 AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q,
                                  Worker *w, bool m2, bool local)
-  : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
+  : Connection(cct, m),
+    delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()),
     state(STATE_NONE), port(-1),
     dispatch_queue(q), recv_buf(NULL),
@@ -152,10 +153,16 @@ AsyncConnection::~AsyncConnection()
   ceph_assert(!delay_state);
 }
 
-int AsyncConnection::get_con_mode() const {
+int AsyncConnection::get_con_mode() const
+{
   return protocol->get_con_mode();
 }
 
+bool AsyncConnection::is_msgr2() const
+{
+  return protocol->proto_type == 2;
+}
+
 void AsyncConnection::maybe_start_delay_thread()
 {
   if (!delay_state) {
@@ -298,7 +305,7 @@ ssize_t AsyncConnection::write(bufferlist &bl,
                                bool more) {
 
     std::unique_lock<std::mutex> l(write_lock);
-    outcoming_bl.claim_append(bl);
+    outgoing_bl.claim_append(bl);
     ssize_t r = _try_send(more);
     if (r > 0) {
       writeCallback = callback;
@@ -318,16 +325,16 @@ ssize_t AsyncConnection::_try_send(bool more)
   }
 
   ceph_assert(center->in_thread());
-  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outcoming_bl.length()
+  ldout(async_msgr->cct, 25) << __func__ << " cs.send " << outgoing_bl.length()
                              << " bytes" << dendl;
-  ssize_t r = cs.send(outcoming_bl, more);
+  ssize_t r = cs.send(outgoing_bl, more);
   if (r < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
     return r;
   }
 
   ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
-                             << " remaining bytes " << outcoming_bl.length() << dendl;
+                             << " remaining bytes " << outgoing_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
     center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
@@ -342,7 +349,7 @@ ssize_t AsyncConnection::_try_send(bool more)
     }
   }
 
-  return outcoming_bl.length();
+  return outgoing_bl.length();
 }
 
 void AsyncConnection::inject_delay() {
@@ -449,6 +456,8 @@ void AsyncConnection::process() {
           read_buffer = nullptr;
           readCallback(buf_tmp, r);
         }
+       logger->tinc(l_msgr_running_recv_time,
+           ceph::mono_clock::now() - recv_start_time);
         return;
       }
       break;
@@ -516,6 +525,13 @@ int AsyncConnection::send_message(Message *m)
                      << this
                      << dendl;
 
+  if (is_blackhole()) {
+    lgeneric_subdout(async_msgr->cct, ms, 0) << __func__ << ceph_entity_type_name(peer_type)
+      << " blackhole " << *m << dendl;
+    m->put();
+    return 0;
+  }
+
   // optimistic think it's ok to encode(actually may broken now)
   if (!m->get_priority())
     m->set_priority(async_msgr->get_default_send_priority());
@@ -523,12 +539,14 @@ int AsyncConnection::send_message(Message *m)
   m->get_header().src = async_msgr->get_myname();
   m->set_connection(this);
 
+#if defined(WITH_EVENTTRACE)
   if (m->get_type() == CEPH_MSG_OSD_OP)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_BEGIN", true);
   else if (m->get_type() == CEPH_MSG_OSD_OPREPLY)
     OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_BEGIN", true);
+#endif
 
-  if (async_msgr->get_myaddrs() == get_peer_addrs()) { //loopback connection
+  if (is_loopback) { //loopback connection
     ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
     std::lock_guard<std::mutex> l(write_lock);
     if (protocol->is_connected()) {
@@ -578,7 +596,7 @@ void AsyncConnection::fault()
 
   recv_start = recv_end = 0;
   state_offset = 0;
-  outcoming_bl.clear();
+  outgoing_bl.clear();
 }
 
 void AsyncConnection::_stop() {
@@ -596,7 +614,7 @@ void AsyncConnection::_stop() {
 }
 
 bool AsyncConnection::is_queued() const {
-  return outcoming_bl.length();
+  return outgoing_bl.length();
 }
 
 void AsyncConnection::shutdown_socket() {