]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/Infiniband.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / rdma / Infiniband.h
index 068896c982ac7e68602395316b7067f45a30a696..2889cdfc66e970239bad423012d4f37f560e195b 100644 (file)
 #ifndef CEPH_INFINIBAND_H
 #define CEPH_INFINIBAND_H
 
-#include <string>
-#include <vector>
+#include <boost/pool/pool.hpp>
+// need this because boost messes with ceph log/assert definitions
+#include "include/ceph_assert.h"
 
 #include <infiniband/verbs.h>
+#include <rdma/rdma_cma.h>
+
+#include <atomic>
+#include <string>
+#include <vector>
 
 #include "include/int_types.h"
 #include "include/page.h"
 #include "common/debug.h"
 #include "common/errno.h"
 #include "common/Mutex.h"
+#include "common/perf_counters.h"
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
 
@@ -50,7 +57,7 @@ class Port {
   int port_num;
   struct ibv_port_attr* port_attr;
   uint16_t lid;
-  int gid_idx;
+  int gid_idx = 0;
   union ibv_gid gid;
 
  public:
@@ -66,13 +73,13 @@ class Port {
 class Device {
   ibv_device *device;
   const char* name;
-  uint8_t  port_cnt;
+  uint8_t  port_cnt = 0;
  public:
-  explicit Device(CephContext *c, ibv_device* d);
+  explicit Device(CephContext *c, ibv_device* d, struct ibv_context *dc);
   ~Device() {
     if (active_port) {
       delete active_port;
-      assert(ibv_close_device(ctxt) == 0);
+      ceph_assert(ibv_close_device(ctxt) == 0);
     }
   }
   const char* get_name() { return name;}
@@ -88,10 +95,12 @@ class Device {
 
 class DeviceList {
   struct ibv_device ** device_list;
+  struct ibv_context ** device_context_list;
   int num;
   Device** devices;
  public:
-  DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)) {
+  explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)),
+                                device_context_list(rdma_get_devices(&num)) {
     if (device_list == NULL || num == 0) {
       lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;
       ceph_abort();
@@ -99,7 +108,7 @@ class DeviceList {
     devices = new Device*[num];
 
     for (int i = 0;i < num; ++i) {
-      devices[i] = new Device(cct, device_list[i]);
+      devices[i] = new Device(cct, device_list[i], device_context_list[i]);
     }
   }
   ~DeviceList() {
@@ -111,7 +120,7 @@ class DeviceList {
   }
 
   Device* get_device(const char* device_name) {
-    assert(devices);
+    ceph_assert(devices);
     for (int i = 0; i < num; ++i) {
       if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
         return devices[i];
@@ -121,6 +130,50 @@ class DeviceList {
   }
 };
 
+// stat counters
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+  l_msgr_rdma_rx_bufs_in_use,
+  l_msgr_rdma_rx_bufs_total,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+  l_msgr_rdma_pending_sent_conns,
+
+  l_msgr_rdma_last,
+};
 
 class RDMADispatcher;
 
@@ -152,14 +205,15 @@ class Infiniband {
       bool full();
       bool over();
       void clear();
-      void post_srq(Infiniband *ib);
 
      public:
       ibv_mr* mr;
+      uint32_t lkey = 0;
       uint32_t bytes;
-      uint32_t bound;
+      uint32_t bound = 0;
       uint32_t offset;
-      char* buffer;
+      char* buffer; // TODO: remove buffer/refactor TX
+      char  data[0];
     };
 
     class Cluster {
@@ -181,7 +235,7 @@ class Infiniband {
 
       MemoryManager& manager;
       uint32_t buffer_size;
-      uint32_t num_chunk;
+      uint32_t num_chunk = 0;
       Mutex lock;
       std::vector<Chunk*> free_chunks;
       char *base = nullptr;
@@ -189,17 +243,81 @@ class Infiniband {
       Chunk* chunk_base = nullptr;
     };
 
-    MemoryManager(Device *d, ProtectionDomain *p, bool hugepage);
+    class MemPoolContext {
+      PerfCounters *perf_logger;
+
+     public:
+      MemoryManager *manager;
+      unsigned n_bufs_allocated;
+      // true if it is possible to alloc
+      // more memory for the pool
+      explicit MemPoolContext(MemoryManager *m) :
+        perf_logger(nullptr),
+        manager(m),
+        n_bufs_allocated(0) {}
+      bool can_alloc(unsigned nbufs);
+      void update_stats(int val);
+      void set_stat_logger(PerfCounters *logger);
+    };
+
+    class PoolAllocator {
+      struct mem_info {
+        ibv_mr   *mr;
+        MemPoolContext *ctx;
+        unsigned nbufs;
+        Chunk    chunks[0];
+      };
+     public:
+      typedef std::size_t size_type;
+      typedef std::ptrdiff_t difference_type;
+
+      static char * malloc(const size_type bytes);
+      static void free(char * const block);
+
+      static MemPoolContext  *g_ctx;
+      static Mutex lock;
+    };
+
+    /**
+     * modify boost pool so that it is possible to
+     * have a thread safe 'context' when allocating/freeing
+     * the memory. It is needed to allow a different pool
+     * configurations and bookkeeping per CephContext and
+     * also to be able to use same allocator to deal with
+     * RX and TX pool.
+     * TODO: use boost pool to allocate TX chunks too
+     */
+    class mem_pool : public boost::pool<PoolAllocator> {
+     private:
+      MemPoolContext *ctx;
+      void *slow_malloc();
+
+     public:
+      explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
+          const size_type nnext_size = 32,
+          const size_type nmax_size = 0) :
+        pool(nrequested_size, nnext_size, nmax_size),
+        ctx(ctx) { }
+
+      void *malloc() {
+        if (!store().empty())
+          return (store().malloc)();
+        // need to alloc more memory...
+        // slow path code
+        return slow_malloc();
+      }
+    };
+
+    MemoryManager(CephContext *c, Device *d, ProtectionDomain *p);
     ~MemoryManager();
 
-    void* malloc_huge_pages(size_t size);
-    void free_huge_pages(void *ptr);
-    void register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num);
+    void* malloc(size_t size);
+    void  free(void *ptr);
+
+    void create_tx_pool(uint32_t size, uint32_t tx_num);
     void return_tx(std::vector<Chunk*> &chunks);
     int get_send_buffers(std::vector<Chunk*> &c, size_t bytes);
-    int get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes);
     bool is_tx_buffer(const char* c) { return send->is_my_buffer(c); }
-    bool is_rx_buffer(const char* c) { return channel->is_my_buffer(c); }
     Chunk *get_tx_chunk_by_buffer(const char *c) {
       return send->get_chunk_by_buffer(c);
     }
@@ -207,18 +325,37 @@ class Infiniband {
       return send->buffer_size;
     }
 
-    bool enabled_huge_page;
+    Chunk *get_rx_buffer() {
+       return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
+    }
+
+    void release_rx_buffer(Chunk *chunk) {
+      rxbuf_pool.free(chunk);
+    }
+
+    void set_rx_stat_logger(PerfCounters *logger) {
+      rxbuf_pool_ctx.set_stat_logger(logger);
+    }
 
+    CephContext  *cct;
    private:
-    Cluster* channel;//RECV
-    Cluster* send;// SEND
+    // TODO: Cluster -> TxPool txbuf_pool
+    // chunk layout fix
+    //  
+    Cluster* send = nullptr;// SEND
     Device *device;
     ProtectionDomain *pd;
+    MemPoolContext rxbuf_pool_ctx;
+    mem_pool     rxbuf_pool;
+
+
+    void* huge_pages_malloc(size_t size);
+    void  huge_pages_free(void *ptr);
   };
 
  private:
-  uint32_t max_send_wr = 0;
-  uint32_t max_recv_wr = 0;
+  uint32_t tx_queue_len = 0;
+  uint32_t rx_queue_len = 0;
   uint32_t max_sge = 0;
   uint8_t  ib_physical_port = 0;
   MemoryManager* memory_manager = nullptr;
@@ -226,7 +363,6 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  RDMADispatcher *dispatcher = nullptr;
   void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
   void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
   CephContext *cct;
@@ -234,13 +370,13 @@ class Infiniband {
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
+  bool support_srq = false;
 
  public:
-  explicit Infiniband(CephContext *c, const std::string &device_name, uint8_t p);
+  explicit Infiniband(CephContext *c);
   ~Infiniband();
   void init();
-
-  void set_dispatcher(RDMADispatcher *d);
+  static void verify_prereq(CephContext *cct);
 
   class CompletionChannel {
     static const uint32_t MAX_ACK_EVENT = 5000;
@@ -297,7 +433,7 @@ class Infiniband {
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
               Infiniband::CompletionQueue* rxcq,
-              uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key = 0);
+              uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
     ~QueuePair();
 
     int init();
@@ -332,6 +468,9 @@ class Infiniband {
      * Return true if the queue pair is in an error state, false otherwise.
      */
     bool is_error() const;
+    void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
+    void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
+    uint32_t get_tx_wr() const { return tx_wr_inflight; }
     ibv_qp* get_qp() const { return qp; }
     Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
     Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
@@ -347,6 +486,7 @@ class Infiniband {
     ibv_pd*      pd;             // protection domain
     ibv_srq*     srq;            // shared receive queue
     ibv_qp*      qp;             // infiniband verbs QP handle
+    struct rdma_cm_id *cm_id;
     Infiniband::CompletionQueue* txcq;
     Infiniband::CompletionQueue* rxcq;
     uint32_t     initial_psn;    // initial packet sequence number
@@ -354,15 +494,20 @@ class Infiniband {
     uint32_t     max_recv_wr;
     uint32_t     q_key;
     bool dead;
+    std::atomic<uint32_t> tx_wr_inflight = {0}; // counter for inflight Tx WQEs
   };
 
  public:
   typedef MemoryManager::Cluster Cluster;
   typedef MemoryManager::Chunk Chunk;
-  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*, ibv_qp_type type);
+  QueuePair* create_queue_pair(CephContext *c, CompletionQueue*, CompletionQueue*,
+      ibv_qp_type type, struct rdma_cm_id *cm_id);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
-  int post_chunk(Chunk* chunk);
-  int post_channel_cluster();
+  // post rx buffers to srq, return number of buffers actually posted
+  int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
+  void post_chunk_to_pool(Chunk* chunk) {
+    get_memory_manager()->release_rx_buffer(chunk);
+  }
   int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
@@ -375,10 +520,10 @@ class Infiniband {
   Device* get_device() { return device; }
   int get_async_fd() { return device->ctxt->async_fd; }
   bool is_tx_buffer(const char* c) { return memory_manager->is_tx_buffer(c);}
-  bool is_rx_buffer(const char* c) { return memory_manager->is_rx_buffer(c);}
   Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
   static const char* wc_status_to_string(int status);
   static const char* qp_state_string(int status);
+  uint32_t get_rx_queue_len() const { return rx_queue_len; }
 };
 
 #endif