]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/RDMAStack.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.cc
index f63a8e7d1f2b4ea37a30d22106bede264e5f8d41..b68aeb1a8efa11a985a6f17c9db4c12d5b2463b7 100644 (file)
@@ -38,14 +38,10 @@ RDMADispatcher::~RDMADispatcher()
   ceph_assert(qp_conns.empty());
   ceph_assert(num_qp_conn == 0);
   ceph_assert(dead_queue_pairs.empty());
-  ceph_assert(num_dead_queue_pair == 0);
-
-  delete async_handler;
 }
 
-RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
-  w_lock("RDMADispatcher::for worker pending list"), stack(s)
+RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib)
+  : cct(c), ib(ib)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
 
@@ -80,20 +76,20 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 void RDMADispatcher::polling_start()
 {
   // take lock because listen/connect can happen from different worker threads
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
 
   if (t.joinable()) 
     return; // dispatcher thread already running 
 
-  get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
+  ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
 
-  tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+  tx_cc = ib->create_comp_channel(cct);
   ceph_assert(tx_cc);
-  rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+  rx_cc = ib->create_comp_channel(cct);
   ceph_assert(rx_cc);
-  tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
+  tx_cq = ib->create_comp_queue(cct, tx_cc);
   ceph_assert(tx_cq);
-  rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
+  rx_cq = ib->create_comp_queue(cct, rx_cc);
   ceph_assert(rx_cq);
 
   t = std::thread(&RDMADispatcher::polling, this);
@@ -103,7 +99,7 @@ void RDMADispatcher::polling_start()
 void RDMADispatcher::polling_stop()
 {
   {
-    Mutex::Locker l(lock);
+    std::lock_guard l{lock};
     done = true;
   }
 
@@ -125,34 +121,128 @@ void RDMADispatcher::handle_async_event()
   ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
-    if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
+    if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
       if (errno != EAGAIN)
        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
                   << " " << cpp_strerror(errno) << ")" << dendl;
       return;
     }
     perf_logger->inc(l_msgr_rdma_total_async_events);
-    // FIXME: Currently we must ensure no other factor make QP in ERROR state,
-    // otherwise this qp can't be deleted in current cleanup flow.
-    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
-      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
-      uint64_t qpn = async_event.element.qp->qp_num;
-      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
-                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
-      Mutex::Locker l(lock);
-      RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
-      if (!conn) {
-        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
-      } else {
-        ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
-        conn->fault();
-        if (!cct->_conf->ms_async_rdma_cm)
-          erase_qpn_lockless(qpn);
-      }
-    } else {
-      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
-                    << " evt: " << ibv_event_type_str(async_event.event_type)
-                    << dendl;
+    ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+
+    switch (async_event.event_type) {
+      /***********************CQ events********************/
+      case IBV_EVENT_CQ_ERR:
+        lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
+                   << " CQ Overflow, dev = " << ib->get_device()->ctxt
+                   << " Need destroy and recreate resource " << dendl;
+        break;
+      /***********************QP events********************/
+      case IBV_EVENT_QP_FATAL:
+        {
+          /* Error occurred on a QP and it transitioned to error state */
+          ibv_qp* ib_qp = async_event.element.qp;
+          uint32_t qpn = ib_qp->qp_num;
+          QueuePair* qp = get_qp(qpn);
+          lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
+                     << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+                     << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+        }
+        break;
+      case IBV_EVENT_QP_LAST_WQE_REACHED:
+        {
+          /*
+           * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
+           *    Reason: QP is force switched into Error before posting Beacon WR.
+           *            The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
+           *            For SRQ, only WRs on the QP which is switched into Error status will be flushed.
+           *    Handle: Only confirm that qp enter into dead queue pairs
+           * 2. The CQE with error was generated for the last WQE
+           *    Handle: output error log
+           */
+          perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+          ibv_qp* ib_qp = async_event.element.qp;
+          uint32_t qpn = ib_qp->qp_num;
+          std::lock_guard l{lock};
+          RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+          QueuePair* qp = get_qp_lockless(qpn);
+
+          if (qp && !qp->is_dead()) {
+            lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
+                       << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+                       << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+          }
+          if (!conn) {
+            ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
+                           << " qp number: " << qpn << dendl;
+          } else {
+             conn->fault();
+             if (qp) {
+                if (!cct->_conf->ms_async_rdma_cm)
+                enqueue_dead_qp(qpn);
+             }
+          }
+        }
+        break;
+      case IBV_EVENT_QP_REQ_ERR:
+        /* Invalid Request Local Work Queue Error */
+        [[fallthrough]];
+      case IBV_EVENT_QP_ACCESS_ERR:
+        /* Local access violation error */
+        [[fallthrough]];
+      case IBV_EVENT_COMM_EST:
+        /* Communication was established on a QP */
+        [[fallthrough]];
+      case IBV_EVENT_SQ_DRAINED:
+        /* Send Queue was drained of outstanding messages in progress */
+        [[fallthrough]];
+      case IBV_EVENT_PATH_MIG:
+        /* A connection has migrated to the alternate path */
+        [[fallthrough]];
+      case IBV_EVENT_PATH_MIG_ERR:
+        /* A connection failed to migrate to the alternate path */
+        break;
+      /***********************SRQ events*******************/
+      case IBV_EVENT_SRQ_ERR:
+        /* Error occurred on an SRQ */
+        [[fallthrough]];
+      case IBV_EVENT_SRQ_LIMIT_REACHED:
+        /* SRQ limit was reached */
+        break;
+      /***********************Port events******************/
+      case IBV_EVENT_PORT_ACTIVE:
+        /* Link became active on a port */
+        [[fallthrough]];
+      case IBV_EVENT_PORT_ERR:
+        /* Link became unavailable on a port */
+        [[fallthrough]];
+      case IBV_EVENT_LID_CHANGE:
+        /* LID was changed on a port */
+        [[fallthrough]];
+      case IBV_EVENT_PKEY_CHANGE:
+        /* P_Key table was changed on a port */
+        [[fallthrough]];
+      case IBV_EVENT_SM_CHANGE:
+        /* SM was changed on a port */
+        [[fallthrough]];
+      case IBV_EVENT_CLIENT_REREGISTER:
+        /* SM sent a CLIENT_REREGISTER request to a port */
+        [[fallthrough]];
+      case IBV_EVENT_GID_CHANGE:
+        /* GID table was changed on a port */
+        break;
+
+      /***********************CA events******************/
+      //CA events:
+      case IBV_EVENT_DEVICE_FATAL:
+        /* CA is in FATAL state */
+        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+        break;
+      default:
+        lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+                   << " unknown event: " << async_event.event_type << dendl;
+        break;
     }
     ibv_ack_async_event(&async_event);
   }
@@ -160,15 +250,15 @@ void RDMADispatcher::handle_async_event()
 
 void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
 {
-  Mutex::Locker l(lock);
-  get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+  std::lock_guard l{lock};
+  ib->post_chunk_to_pool(chunk);
   perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
 }
 
-int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
+int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
 {
-  Mutex::Locker l(lock);
-  return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
+  std::lock_guard l{lock};
+  return ib->post_chunks_to_rq(num, qp);
 }
 
 void RDMADispatcher::polling()
@@ -179,7 +269,6 @@ void RDMADispatcher::polling()
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
   ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
-  RDMAConnectedSocketImpl *conn = nullptr;
   uint64_t last_inactive = Cycles::rdtsc();
   bool rearmed = false;
   int r = 0;
@@ -196,70 +285,35 @@ void RDMADispatcher::polling()
     if (rx_ret > 0) {
       ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
                      << " responses."<< dendl;
-      perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
-      perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
-
-      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-
-      for (int i = 0; i < rx_ret; ++i) {
-        ibv_wc* response = &wc[i];
-        Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
-        if (response->status == IBV_WC_SUCCESS) {
-          ceph_assert(wc[i].opcode == IBV_WC_RECV);
-          conn = get_conn_lockless(response->qp_num);
-          if (!conn) {
-            ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
-            get_stack()->get_infiniband().post_chunk_to_pool(chunk);
-            perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-          } else {
-            conn->post_chunks_to_rq(1);
-            polled[conn].push_back(*response);
-          }
-        } else {
-          perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
-          ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
-              << ") status(" << response->status << ":"
-              << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
-          if (response->status != IBV_WC_WR_FLUSH_ERR) {
-            conn = get_conn_lockless(response->qp_num);
-            if (conn && conn->is_connected())
-              conn->fault();
-          }
-          get_stack()->get_infiniband().post_chunk_to_pool(chunk);
-          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
-        }
-      }
-      for (auto &&i : polled)
-        i.first->pass_wc(std::move(i.second));
-      polled.clear();
+      handle_rx_event(wc, rx_ret);
     }
 
     if (!tx_ret && !rx_ret) {
-      // NOTE: Has TX just transitioned to idle? We should do it when idle!
-      // It's now safe to delete queue pairs (see comment by declaration
-      // for dead_queue_pairs).
-      // Additionally, don't delete qp while outstanding_buffers isn't empty,
-      // because we need to check qp's state before sending
       perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
-      if (num_dead_queue_pair) {
-        Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
-        auto it = dead_queue_pairs.begin();
-        while (it != dead_queue_pairs.end()) {
-          auto i = *it;
-          // Bypass QPs that do not collect all Tx completions yet.
-          if (i->get_tx_wr()) {
-            ldout(cct, 20) << __func__ << " bypass qp=" << i << " tx_wr=" << i->get_tx_wr() << dendl;
-            ++it;
-          } else {
-            ldout(cct, 10) << __func__ << " finally delete qp=" << i << dendl;
-            delete i;
-            it = dead_queue_pairs.erase(it);
-            perf_logger->dec(l_msgr_rdma_active_queue_pair);
-            --num_dead_queue_pair;
-          }
+      //
+      // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
+      // we can destroy QPs even earlier, just when beacon has been received,
+      // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
+      // CQ before other WCs are fully consumed from rx CQ. For safety, we
+      // wait for beacon and then "no-events" from CQs.
+      //
+      // Calling size() on vector without locks is totally fine, since we
+      // use it as a hint (accuracy is not important here)
+      //
+      if (!dead_queue_pairs.empty()) {
+        decltype(dead_queue_pairs) dead_qps;
+        {
+          std::lock_guard l{lock};
+          dead_queue_pairs.swap(dead_qps);
+        }
+
+        for (auto& qp: dead_qps) {
+          perf_logger->dec(l_msgr_rdma_active_queue_pair);
+          ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
+          delete qp;
         }
       }
+
       if (!num_qp_conn && done && dead_queue_pairs.empty())
         break;
 
@@ -277,10 +331,10 @@ void RDMADispatcher::polling()
 
         struct pollfd channel_poll[2];
         channel_poll[0].fd = tx_cc->get_fd();
-        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[0].events = POLLIN;
         channel_poll[0].revents = 0;
         channel_poll[1].fd = rx_cc->get_fd();
-        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[1].events = POLLIN;
         channel_poll[1].revents = 0;
         r = 0;
         perf_logger->set(l_msgr_rdma_polling, 0);
@@ -308,7 +362,7 @@ void RDMADispatcher::notify_pending_workers() {
   if (num_pending_workers) {
     RDMAWorker *w = nullptr;
     {
-      Mutex::Locker l(w_lock);
+      std::lock_guard l{w_lock};
       if (!pending_workers.empty()) {
         w = pending_workers.front();
         pending_workers.pop_front();
@@ -322,7 +376,7 @@ void RDMADispatcher::notify_pending_workers() {
 
 void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
 {
-  Mutex::Locker l(lock);
+  std::lock_guard l{lock};
   ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
   qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
   ++num_qp_conn;
@@ -338,9 +392,8 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
   return it->second.second;
 }
 
-Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
+Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
 {
-  Mutex::Locker l(lock);
   // Try to find the QP in qp_conns firstly.
   auto it = qp_conns.find(qp);
   if (it != qp_conns.end())
@@ -354,21 +407,50 @@ Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
   return nullptr;
 }
 
-void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
+Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
 {
+  std::lock_guard l{lock};
+  return get_qp_lockless(qp);
+}
+
+void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
+{
+  std::lock_guard l{lock};
   auto it = qp_conns.find(qpn);
-  if (it == qp_conns.end())
+  if (it == qp_conns.end()) {
+    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
     return ;
-  ++num_dead_queue_pair;
-  dead_queue_pairs.push_back(it->second.first);
+  }
+  QueuePair *qp = it->second.first;
+  dead_queue_pairs.push_back(qp);
   qp_conns.erase(it);
   --num_qp_conn;
 }
 
-void RDMADispatcher::erase_qpn(uint32_t qpn)
+void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
 {
-  Mutex::Locker l(lock);
-  erase_qpn_lockless(qpn);
+  std::lock_guard l{lock};
+  auto it = qp_conns.find(qpn);
+  if (it == qp_conns.end()) {
+    lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+    return;
+  }
+  QueuePair *qp = it->second.first;
+  if (qp->to_dead()) {
+    //
+    // Failed to switch to dead. This is abnormal, but we can't
+    // do anything, so just destroy QP.
+    //
+    dead_queue_pairs.push_back(qp);
+    qp_conns.erase(it);
+    --num_qp_conn;
+  } else {
+    //
+    // Successfully switched to dead, thus keep entry in the map.
+    // But only zero out socked pointer in order to return null from
+    // get_conn_lockless();
+    it->second.second = nullptr;
+  }
 }
 
 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
@@ -377,44 +459,87 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 
   for (int i = 0; i < n; ++i) {
     ibv_wc* response = &cqe[i];
-    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-    ldout(cct, 25) << __func__ << " QP: " << response->qp_num
-                   << " len: " << response->byte_len << " , addr:" << chunk
-                   << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
 
-    QueuePair *qp = get_qp(response->qp_num);
-    if (qp)
-      qp->dec_tx_wr(1);
+    // If it's beacon WR, enqueue the QP to be destroyed later
+    if (response->wr_id == BEACON_WRID) {
+      enqueue_dead_qp(response->qp_num);
+      continue;
+    }
+
+    ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
+                   << " status: " << ib->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
-      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
-      if (response->status == IBV_WC_RETRY_EXC_ERR) {
-        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
-        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
-      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
-        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
-                      << response->qp_num << " should be down while this WR=" << response->wr_id
-                      << " still in flight." << dendl;
-        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
-      } else {
-        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
-                      << response->wr_id << ") status(" << response->status << "): "
-                      << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
-        Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-        RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
-
-        if (conn && conn->is_connected()) {
-          ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
-          conn->fault();
-        } else {
-          ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
-        }
+      switch(response->status) {
+        case IBV_WC_RETRY_EXC_ERR:
+          {
+            perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
+
+            ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
+                          << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+                          << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+                          << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+            std::lock_guard l{lock};
+            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+            if (conn) {
+              ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+                            << conn->get_peer_qpn() << dendl;
+            }
+          }
+          break;
+        case IBV_WC_WR_FLUSH_ERR:
+          {
+            perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
+
+            std::lock_guard l{lock};
+            QueuePair *qp = get_qp_lockless(response->qp_num);
+            if (qp) {
+              ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+            }
+            if (qp && qp->is_dead()) {
+              ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
+            } else {
+              lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
+                         << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+                         << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+                         << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+              RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+              if (conn) {
+                ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+                              << conn->get_peer_qpn() << dendl;
+              }
+            }
+          }
+          break;
+
+        default:
+          {
+            lderr(cct) << __func__ << " SQ WR return error,"
+                       << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+                       << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+                       << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+            std::lock_guard l{lock};
+            RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+            if (conn && conn->is_connected()) {
+              ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
+                             << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
+              conn->fault();
+            } else {
+              ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
+            }
+          }
+          break;
       }
     }
 
-    //TX completion may come either from regular send message or from 'fin' message.
-    //In the case of 'fin' wr_id points to the QueuePair.
-    if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+    auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    //TX completion may come either from
+    // 1) regular send message, WCE wr_id points to chunk
+    // 2) 'fin' message, wr_id points to the QP
+    if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
@@ -442,16 +567,92 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
     return ;
 
   inflight -= chunks.size();
-  get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
+  ib->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
 }
 
+void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
+{
+  perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
+  perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
+
+  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+  std::lock_guard l{lock};//make sure connected socket alive when pass wc
+
+  for (int i = 0; i < rx_number; ++i) {
+    ibv_wc* response = &cqe[i];
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+    QueuePair *qp = get_qp_lockless(response->qp_num);
+
+    switch (response->status) {
+      case IBV_WC_SUCCESS:
+        ceph_assert(response->opcode == IBV_WC_RECV);
+        if (!conn) {
+          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
+                        << std::hex << chunk << " will be back." << std::dec << dendl;
+          ib->post_chunk_to_pool(chunk);
+          perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+        } else {
+          conn->post_chunks_to_rq(1);
+          polled[conn].push_back(*response);
+
+          if (qp != nullptr && !qp->get_srq()) {
+            qp->remove_rq_wr(chunk);
+            chunk->clear_qp();
+          }
+        }
+        break;
+
+      case IBV_WC_WR_FLUSH_ERR:
+        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+        if (qp) {
+          ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+        }
+        if (qp && qp->is_dead()) {
+          ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
+        } else {
+          ldout(cct, 1) << __func__ << " RQ WR return error,"
+                     << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+                     << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+                     << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+          if (conn) {
+            ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
+                       << conn->get_peer_qpn() << dendl;
+          }
+        }
+
+        ib->post_chunk_to_pool(chunk);
+        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+        break;
 
-RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
-  : Worker(c, i), stack(nullptr),
-    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
+      default:
+        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+        ldout(cct, 1) << __func__ << " RQ WR return error,"
+                      << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+                      << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+                      << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+        if (conn && conn->is_connected())
+          conn->fault();
+
+        ib->post_chunk_to_pool(chunk);
+        perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+        break;
+    }
+  }
+
+  for (auto &i : polled)
+    i.first->pass_wc(std::move(i.second));
+  polled.clear();
+}
+
+RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
+  : Worker(c, worker_id),
+    tx_handler(new C_handle_cq_tx(this))
 {
   // initialize perf_logger
   char name[128];
@@ -479,25 +680,20 @@ RDMAWorker::~RDMAWorker()
 
 void RDMAWorker::initialize()
 {
-  if (!dispatcher) {
-    dispatcher = &stack->get_dispatcher();
-  }
+  ceph_assert(dispatcher);
 }
 
 int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
                       const SocketOptions &opt,ServerSocket *sock)
 {
-  get_stack()->get_infiniband().init();
+  ib->init();
   dispatcher->polling_start();
+
   RDMAServerSocketImpl *p;
   if (cct->_conf->ms_async_rdma_type == "iwarp") {
-    p = new RDMAIWARPServerSocketImpl(
-      cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this,
-      sa, addr_slot);
+    p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
   } else {
-    p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(),
-                                &get_stack()->get_dispatcher(), this, sa,
-                                addr_slot);
+    p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
   }
   int r = p->listen(sa, opt);
   if (r < 0) {
@@ -511,14 +707,14 @@ int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
 
 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
 {
-  get_stack()->get_infiniband().init();
+  ib->init();
   dispatcher->polling_start();
 
   RDMAConnectedSocketImpl* p;
   if (cct->_conf->ms_async_rdma_type == "iwarp") {
-    p = new RDMAIWARPConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+    p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
   } else {
-    p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+    p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
   }
   int r = p->try_connect(addr, opts);
 
@@ -535,11 +731,10 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
   ceph_assert(center.in_thread());
-  int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
-  ceph_assert(r >= 0);
-  size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
+  int r = ib->get_tx_buffers(c, bytes);
+  size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
-  stack->get_dispatcher().inflight += r;
+  dispatcher->inflight += r;
   if (got >= bytes)
     return r;
 
@@ -578,16 +773,18 @@ void RDMAWorker::handle_pending_message()
 }
 
 RDMAStack::RDMAStack(CephContext *cct, const string &t)
-  : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
+  : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
+    rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
 {
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
 
   unsigned num = get_num_worker();
   for (unsigned i = 0; i < num; ++i) {
     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
-    w->set_stack(this);
+    w->set_dispatcher(rdma_dispatcher);
+    w->set_ib(ib);
   }
-  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
 }
 
 RDMAStack::~RDMAStack()