]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/Infiniband.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / rdma / Infiniband.h
index 2889cdfc66e970239bad423012d4f37f560e195b..3af89f304fe1e4cda94e985aa28b5682ebdb7013 100644 (file)
 #include <rdma/rdma_cma.h>
 
 #include <atomic>
+#include <functional>
 #include <string>
 #include <vector>
 
+#include "include/common_fwd.h"
 #include "include/int_types.h"
 #include "include/page.h"
+#include "include/scope_guard.h"
 #include "common/debug.h"
 #include "common/errno.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
 #include "common/perf_counters.h"
 #include "msg/msg_types.h"
 #include "msg/async/net_handler.h"
 
-#define HUGE_PAGE_SIZE (2 * 1024 * 1024)
-#define ALIGN_TO_PAGE_SIZE(x) \
-  (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)
+#define HUGE_PAGE_SIZE_2MB (2 * 1024 * 1024)
+#define ALIGN_TO_PAGE_2MB(x) \
+    (((x) + (HUGE_PAGE_SIZE_2MB - 1)) & ~(HUGE_PAGE_SIZE_2MB - 1))
 
-struct IBSYNMsg {
+#define PSN_LEN 24
+#define PSN_MSK ((1 << PSN_LEN) - 1)
+
+#define BEACON_WRID 0xDEADBEEF
+
+struct ib_cm_meta_t {
   uint16_t lid;
-  uint32_t qpn;
+  uint32_t local_qpn;
   uint32_t psn;
   uint32_t peer_qpn;
   union ibv_gid gid;
 } __attribute__((packed));
 
 class RDMAStack;
-class CephContext;
 
 class Port {
   struct ibv_context* ctxt;
   int port_num;
-  struct ibv_port_attr* port_attr;
+  struct ibv_port_attr port_attr;
   uint16_t lid;
-  int gid_idx = 0;
+  int gid_idx;
   union ibv_gid gid;
 
  public:
@@ -65,7 +72,7 @@ class Port {
   uint16_t get_lid() { return lid; }
   ibv_gid  get_gid() { return gid; }
   int get_port_num() { return port_num; }
-  ibv_port_attr* get_port_attr() { return port_attr; }
+  ibv_port_attr* get_port_attr() { return &port_attr; }
   int get_gid_idx() { return gid_idx; }
 };
 
@@ -75,7 +82,8 @@ class Device {
   const char* name;
   uint8_t  port_cnt = 0;
  public:
-  explicit Device(CephContext *c, ibv_device* d, struct ibv_context *dc);
+  explicit Device(CephContext *c, ibv_device* ib_dev);
+  explicit Device(CephContext *c, ibv_context *ib_ctx);
   ~Device() {
     if (active_port) {
       delete active_port;
@@ -88,7 +96,7 @@ class Device {
   int get_gid_idx() { return active_port->get_gid_idx(); }
   void binding_port(CephContext *c, int port_num);
   struct ibv_context *ctxt;
-  ibv_device_attr *device_attr;
+  ibv_device_attr device_attr;
   Port* active_port;
 };
 
@@ -99,16 +107,23 @@ class DeviceList {
   int num;
   Device** devices;
  public:
-  explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)),
-                                device_context_list(rdma_get_devices(&num)) {
-    if (device_list == NULL || num == 0) {
-      lderr(cct) << __func__ << " failed to get rdma device list.  " << cpp_strerror(errno) << dendl;
-      ceph_abort();
+  explicit DeviceList(CephContext *cct): device_list(nullptr), device_context_list(nullptr),
+                                         num(0), devices(nullptr) {
+    device_list = ibv_get_device_list(&num);
+    ceph_assert(device_list);
+    ceph_assert(num);
+    if (cct->_conf->ms_async_rdma_cm) {
+        device_context_list = rdma_get_devices(NULL);
+        ceph_assert(device_context_list);
     }
     devices = new Device*[num];
 
     for (int i = 0;i < num; ++i) {
-      devices[i] = new Device(cct, device_list[i], device_context_list[i]);
+      if (cct->_conf->ms_async_rdma_cm) {
+         devices[i] = new Device(cct, device_context_list[i]);
+      } else {
+         devices[i] = new Device(cct, device_list[i]);
+      }
     }
   }
   ~DeviceList() {
@@ -117,10 +132,10 @@ class DeviceList {
     }
     delete []devices;
     ibv_free_device_list(device_list);
+    rdma_free_devices(device_context_list);
   }
 
   Device* get_device(const char* device_name) {
-    ceph_assert(devices);
     for (int i = 0; i < num; ++i) {
       if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
         return devices[i];
@@ -187,31 +202,34 @@ class Infiniband {
     ibv_pd* const pd;
   };
 
-
+  class QueuePair;
   class MemoryManager {
    public:
     class Chunk {
      public:
-      Chunk(ibv_mr* m, uint32_t len, char* b);
+      Chunk(ibv_mr* m, uint32_t bytes, char* buffer, uint32_t offset = 0, uint32_t bound = 0, uint32_t lkey = 0, QueuePair* qp = nullptr);
       ~Chunk();
 
-      void set_offset(uint32_t o);
       uint32_t get_offset();
-      void set_bound(uint32_t b);
+      uint32_t get_size() const;
       void prepare_read(uint32_t b);
       uint32_t get_bound();
       uint32_t read(char* buf, uint32_t len);
       uint32_t write(char* buf, uint32_t len);
       bool full();
-      bool over();
-      void clear();
+      void reset_read_chunk();
+      void reset_write_chunk();
+      void set_qp(QueuePair *qp) { this->qp = qp; }
+      void clear_qp() { set_qp(nullptr); }
+      QueuePair* get_qp() { return qp; }
 
      public:
       ibv_mr* mr;
-      uint32_t lkey = 0;
+      QueuePair *qp;
+      uint32_t lkey;
       uint32_t bytes;
-      uint32_t bound = 0;
       uint32_t offset;
+      uint32_t bound;
       char* buffer; // TODO: remove buffer/refactor TX
       char  data[0];
     };
@@ -236,7 +254,7 @@ class Infiniband {
       MemoryManager& manager;
       uint32_t buffer_size;
       uint32_t num_chunk = 0;
-      Mutex lock;
+      ceph::mutex lock = ceph::make_mutex("cluster_lock");
       std::vector<Chunk*> free_chunks;
       char *base = nullptr;
       char *end = nullptr;
@@ -274,8 +292,17 @@ class Infiniband {
       static char * malloc(const size_type bytes);
       static void free(char * const block);
 
-      static MemPoolContext  *g_ctx;
-      static Mutex lock;
+      template<typename Func>
+      static std::invoke_result_t<Func> with_context(MemPoolContext* ctx,
+                                                    Func&& func) {
+       std::lock_guard l{get_lock()};
+       g_ctx = ctx;
+       scope_guard reset_ctx{[] { g_ctx = nullptr; }};
+       return std::move(func)();
+      }
+    private:
+      static ceph::mutex& get_lock();
+      static MemPoolContext* g_ctx;
     };
 
     /**
@@ -293,6 +320,7 @@ class Infiniband {
       void *slow_malloc();
 
      public:
+      ceph::mutex lock = ceph::make_mutex("mem_pool_lock");
       explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
           const size_type nnext_size = 32,
           const size_type nmax_size = 0) :
@@ -326,10 +354,13 @@ class Infiniband {
     }
 
     Chunk *get_rx_buffer() {
+       std::lock_guard l{rxbuf_pool.lock};
        return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
     }
 
     void release_rx_buffer(Chunk *chunk) {
+      std::lock_guard l{rxbuf_pool.lock};
+      chunk->clear_qp();
       rxbuf_pool.free(chunk);
     }
 
@@ -363,10 +394,8 @@ class Infiniband {
   Device *device = NULL;
   ProtectionDomain *pd = NULL;
   DeviceList *device_list = nullptr;
-  void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
-  void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
   CephContext *cct;
-  Mutex lock;
+  ceph::mutex lock = ceph::make_mutex("IB lock");
   bool initialized = false;
   const std::string &device_name;
   uint8_t port_num;
@@ -429,6 +458,7 @@ class Infiniband {
   // must call plumb() to bring the queue pair to the RTS state.
   class QueuePair {
    public:
+    typedef MemoryManager::Chunk Chunk;
     QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
               int ib_physical_port,  ibv_srq *srq,
               Infiniband::CompletionQueue* txcq,
@@ -436,6 +466,10 @@ class Infiniband {
               uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
     ~QueuePair();
 
+    int modify_qp_to_error();
+    int modify_qp_to_rts();
+    int modify_qp_to_rtr();
+    int modify_qp_to_init();
     int init();
 
     /**
@@ -464,18 +498,37 @@ class Infiniband {
      * Get the state of a QueuePair.
      */
     int get_state() const;
-    /**
-     * Return true if the queue pair is in an error state, false otherwise.
+    /*
+     * send/receive connection management meta data
      */
-    bool is_error() const;
-    void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
-    void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
-    uint32_t get_tx_wr() const { return tx_wr_inflight; }
+    int send_cm_meta(CephContext *cct, int socket_fd);
+    int recv_cm_meta(CephContext *cct, int socket_fd);
+    void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
+    void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
     ibv_qp* get_qp() const { return qp; }
     Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
     Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
     int to_dead();
     bool is_dead() const { return dead; }
+    ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; }
+    ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; }
+    void add_rq_wr(Chunk* chunk)
+    {
+      if (srq) return;
+
+      std::lock_guard l{lock};
+      recv_queue.push_back(chunk);
+    }
+
+    void remove_rq_wr(Chunk* chunk) {
+      if (srq) return;
+
+      std::lock_guard l{lock};
+      auto it = std::find(recv_queue.begin(), recv_queue.end(), chunk);
+      ceph_assert(it != recv_queue.end());
+      recv_queue.erase(it);
+    }
+    ibv_srq* get_srq() const { return srq; }
 
    private:
     CephContext  *cct;
@@ -487,6 +540,8 @@ class Infiniband {
     ibv_srq*     srq;            // shared receive queue
     ibv_qp*      qp;             // infiniband verbs QP handle
     struct rdma_cm_id *cm_id;
+    ib_cm_meta_t peer_cm_meta;
+    ib_cm_meta_t local_cm_meta;
     Infiniband::CompletionQueue* txcq;
     Infiniband::CompletionQueue* rxcq;
     uint32_t     initial_psn;    // initial packet sequence number
@@ -494,7 +549,8 @@ class Infiniband {
     uint32_t     max_recv_wr;
     uint32_t     q_key;
     bool dead;
-    std::atomic<uint32_t> tx_wr_inflight = {0}; // counter for inflight Tx WQEs
+    vector<Chunk*> recv_queue;
+    ceph::mutex lock = ceph::make_mutex("queue_pair_lock");
   };
 
  public:
@@ -504,16 +560,18 @@ class Infiniband {
       ibv_qp_type type, struct rdma_cm_id *cm_id);
   ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
   // post rx buffers to srq, return number of buffers actually posted
-  int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
+  int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
   void post_chunk_to_pool(Chunk* chunk) {
+    QueuePair *qp = chunk->get_qp();
+    if (qp != nullptr) {
+      qp->remove_rq_wr(chunk);
+    }
     get_memory_manager()->release_rx_buffer(chunk);
   }
   int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
   CompletionChannel *create_comp_channel(CephContext *c);
   CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
   uint8_t get_ib_physical_port() { return ib_physical_port; }
-  int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
-  int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
   uint16_t get_lid() { return device->get_lid(); }
   ibv_gid get_gid() { return device->get_gid(); }
   MemoryManager* get_memory_manager() { return memory_manager; }