]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/RDMAStack.cc
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.cc
index b1dae3bcf5c2f02fa46d8ba9e4efc974c77e3391..020dd9853edde2655d52c378414a988c22f29d27 100644 (file)
@@ -14,6 +14,7 @@
  *
  */
 
+#include <poll.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 
@@ -21,8 +22,6 @@
 #include "common/deleter.h"
 #include "common/Tub.h"
 #include "RDMAStack.h"
-#include "RDMAConnTCP.h"
-#include "Device.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -32,20 +31,28 @@ static Tub<Infiniband> global_infiniband;
 
 RDMADispatcher::~RDMADispatcher()
 {
+  done = true;
   polling_stop();
-
   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
 
-  global_infiniband->set_dispatcher(nullptr);
-
   assert(qp_conns.empty());
   assert(num_qp_conn == 0);
   assert(dead_queue_pairs.empty());
   assert(num_dead_queue_pair == 0);
+
+  tx_cc->ack_events();
+  rx_cc->ack_events();
+  delete tx_cq;
+  delete rx_cq;
+  delete tx_cc;
+  delete rx_cc;
+  delete async_handler;
+
+  global_infiniband->set_dispatcher(nullptr);
 }
 
 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
-  : cct(c), lock("RDMADispatcher::lock"),
+  : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
   w_lock("RDMADispatcher::for worker pending list"), stack(s)
 {
   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
@@ -78,41 +85,58 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
 
 void RDMADispatcher::polling_start()
 {
+  tx_cc = global_infiniband->create_comp_channel(cct);
+  assert(tx_cc);
+  rx_cc = global_infiniband->create_comp_channel(cct);
+  assert(rx_cc);
+  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+  assert(tx_cq);
+  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+  assert(rx_cq);
+
   t = std::thread(&RDMADispatcher::polling, this);
 }
 
 void RDMADispatcher::polling_stop()
 {
-  if (!t.joinable())
-    return;
-
-  done = true;
-  t.join();
+  if (t.joinable())
+    t.join();
 }
 
-void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
+void RDMADispatcher::handle_async_event()
 {
-  perf_logger->inc(l_msgr_rdma_total_async_events);
-  // FIXME: Currently we must ensure no other factor make QP in ERROR state,
-  // otherwise this qp can't be deleted in current cleanup flow.
-  if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
-    perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
-    uint64_t qpn = async_event.element.qp->qp_num;
-    ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
-      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
-    Mutex::Locker l(lock);
-    RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
-    if (!conn) {
-      ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+  ldout(cct, 30) << __func__ << dendl;
+  while (1) {
+    ibv_async_event async_event;
+    if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+      if (errno != EAGAIN)
+       lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+                  << " " << cpp_strerror(errno) << ")" << dendl;
+      return;
+    }
+    perf_logger->inc(l_msgr_rdma_total_async_events);
+    // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+    // otherwise this qp can't be deleted in current cleanup flow.
+    if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+      uint64_t qpn = async_event.element.qp->qp_num;
+      ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+                     << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+      Mutex::Locker l(lock);
+      RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+      if (!conn) {
+        ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+      } else {
+        ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+        conn->fault();
+        erase_qpn_lockless(qpn);
+      }
     } else {
-      ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
-      conn->fault();
-      erase_qpn_lockless(qpn);
+      ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+                    << " evt: " << ibv_event_type_str(async_event.event_type)
+                    << dendl;
     }
-  } else {
-    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
-      << " evt: " << ibv_event_type_str(async_event.event_type)
-      << dendl;
+    ibv_ack_async_event(&async_event);
   }
 }
 
@@ -123,24 +147,23 @@ void RDMADispatcher::polling()
 
   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
   std::vector<ibv_wc> tx_cqe;
+  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
   RDMAConnectedSocketImpl *conn = nullptr;
   utime_t last_inactive = ceph_clock_now();
   bool rearmed = false;
   int r = 0;
 
   while (true) {
-    Device *ibdev;
-
-    int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
+    int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
     if (tx_ret > 0) {
       ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                      << " responses."<< dendl;
-      handle_tx_event(ibdev, wc, tx_ret);
+      handle_tx_event(wc, tx_ret);
     }
 
-    int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
+    int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
     if (rx_ret > 0) {
-      ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
+      ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
                      << " responses."<< dendl;
       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
 
@@ -155,8 +178,8 @@ void RDMADispatcher::polling()
         if (response->status == IBV_WC_SUCCESS) {
           conn = get_conn_lockless(response->qp_num);
           if (!conn) {
-            assert(ibdev->is_rx_buffer(chunk->buffer));
-            r = ibdev->post_chunk(chunk);
+            assert(global_infiniband->is_rx_buffer(chunk->buffer));
+            r = global_infiniband->post_chunk(chunk);
             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
             assert(r == 0);
           } else {
@@ -166,9 +189,9 @@ void RDMADispatcher::polling()
           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
               << ") status(" << response->status << ":"
-              << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
-          assert(ibdev->is_rx_buffer(chunk->buffer));
-          r = ibdev->post_chunk(chunk);
+              << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+          assert(global_infiniband->is_rx_buffer(chunk->buffer));
+          r = global_infiniband->post_chunk(chunk);
           if (r) {
             ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
             assert(r == 0);
@@ -208,20 +231,37 @@ void RDMADispatcher::polling()
         break;
 
       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+        handle_async_event();
         if (!rearmed) {
           // Clean up cq events after rearm notify ensure no new incoming event
           // arrived between polling and rearm
-         global_infiniband->rearm_notify();
+          tx_cq->rearm_notify();
+          rx_cq->rearm_notify();
           rearmed = true;
           continue;
         }
 
+        struct pollfd channel_poll[2];
+        channel_poll[0].fd = tx_cc->get_fd();
+        channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[0].revents = 0;
+        channel_poll[1].fd = rx_cc->get_fd();
+        channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+        channel_poll[1].revents = 0;
+        r = 0;
         perf_logger->set(l_msgr_rdma_polling, 0);
-
-       r = global_infiniband->poll_blocking(done);
-        if (r > 0)
-          ldout(cct, 20) << __func__ << " got a cq event." << dendl;
-
+        while (!done && r == 0) {
+          r = poll(channel_poll, 2, 100);
+          if (r < 0) {
+            r = -errno;
+            lderr(cct) << __func__ << " poll failed " << r << dendl;
+            ceph_abort();
+          }
+        }
+        if (r > 0 && tx_cc->get_cq_event())
+          ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
+        if (r > 0 && rx_cc->get_cq_event())
+          ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
         last_inactive = ceph_clock_now();
         perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
@@ -284,7 +324,7 @@ void RDMADispatcher::erase_qpn(uint32_t qpn)
   erase_qpn_lockless(qpn);
 }
 
-void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
 {
   std::vector<Chunk*> tx_chunks;
 
@@ -322,15 +362,20 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
       }
     }
 
-    // FIXME: why not tx?
-    if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
+    //TX completion may come either from regular send message or from 'fin' message.
+    //In the case of 'fin' wr_id points to the QueuePair.
+    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
       tx_chunks.push_back(chunk);
-    else
+    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
+      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
+    } else {
       ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
+      ceph_abort();
+    }
   }
 
   perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
-  post_tx_buffer(ibdev, tx_chunks);
+  post_tx_buffer(tx_chunks);
 }
 
 /**
@@ -341,13 +386,13 @@ void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
  * \return
  *      0 if success or -1 for failure
  */
-void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
 {
   if (chunks.empty())
     return ;
 
   inflight -= chunks.size();
-  ibdev->get_memory_manager()->return_tx(chunks);
+  global_infiniband->get_memory_manager()->return_tx(chunks);
   ldout(cct, 30) << __func__ << " release " << chunks.size()
                  << " chunks, inflight " << inflight << dendl;
   notify_pending_workers();
@@ -393,7 +438,7 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket
 {
   global_infiniband->init();
 
-  auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
   int r = p->listen(sa, opt);
   if (r < 0) {
     delete p;
@@ -423,12 +468,10 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
 
 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
 {
-  Device *ibdev = o->get_device();
-
   assert(center.in_thread());
-  int r = ibdev->get_tx_buffers(c, bytes);
+  int r = global_infiniband->get_tx_buffers(c, bytes);
   assert(r >= 0);
-  size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
+  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
   stack->get_dispatcher()->inflight += r;
   if (got == bytes)
@@ -473,12 +516,23 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
   //
   //On RDMA MUST be called before fork
   //
+
   int rc = ibv_fork_init();
   if (rc) {
      lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
      ceph_abort();
   }
 
+  ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage <<  dendl;
+  if (cct->_conf->ms_async_rdma_enable_hugepage) {
+    rc =  setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+    ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") <<  dendl;
+    if (rc) {
+      lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
+      ceph_abort();
+    }
+  }
+
   //Check ulimit
   struct rlimit limit;
   getrlimit(RLIMIT_MEMLOCK, &limit);
@@ -488,7 +542,8 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
   }
 
   if (!global_infiniband)
-    global_infiniband.construct(cct);
+    global_infiniband.construct(
+      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
   dispatcher = new RDMADispatcher(cct, this);
   global_infiniband->set_dispatcher(dispatcher);
@@ -504,6 +559,10 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
 
 RDMAStack::~RDMAStack()
 {
+  if (cct->_conf->ms_async_rdma_enable_hugepage) {
+    unsetenv("RDMAV_HUGEPAGES_SAFE");  //remove env variable on destruction
+  }
+
   delete dispatcher;
 }