]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/rdma/RDMAStack.h
update sources to v12.1.0
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.h
index ef1188b802375e3538ff7b7b0e3fdfdaaa26ee7d..4f93c157f06670c8a39ee17bc5707845d1802fde 100644 (file)
@@ -28,7 +28,6 @@
 #include "common/errno.h"
 #include "msg/async/Stack.h"
 #include "Infiniband.h"
-#include "RDMAConnectedSocketImpl.h"
 
 class RDMAConnectedSocketImpl;
 class RDMAServerSocketImpl;
@@ -69,6 +68,10 @@ class RDMADispatcher {
 
   std::thread t;
   CephContext *cct;
+  Infiniband::CompletionQueue* tx_cq;
+  Infiniband::CompletionQueue* rx_cq;
+  Infiniband::CompletionChannel *tx_cc, *rx_cc;
+  EventCallbackRef async_handler;
   bool done = false;
   std::atomic<uint64_t> num_dead_queue_pair = {0};
   std::atomic<uint64_t> num_qp_conn = {0};
@@ -101,18 +104,26 @@ class RDMADispatcher {
   std::list<RDMAWorker*> pending_workers;
   RDMAStack* stack;
 
+  class C_handle_cq_async : public EventCallback {
+    RDMADispatcher *dispatcher;
+   public:
+    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+    void do_request(int fd) {
+      // worker->handle_tx_event();
+      dispatcher->handle_async_event();
+    }
+  };
+
  public:
   PerfCounters *perf_logger;
 
   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
   virtual ~RDMADispatcher();
-
-  void process_async_event(Device *ibdev, ibv_async_event &async_event);
+  void handle_async_event();
 
   void polling_start();
   void polling_stop();
   void polling();
-
   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
   void make_pending_worker(RDMAWorker* w) {
     Mutex::Locker l(w_lock);
@@ -125,9 +136,11 @@ class RDMADispatcher {
   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
   void erase_qpn_lockless(uint32_t qpn);
   void erase_qpn(uint32_t qpn);
+  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
   void notify_pending_workers();
-  void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
-  void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
+  void handle_tx_event(ibv_wc *cqe, int n);
+  void post_tx_buffer(std::vector<Chunk*> &chunks);
 
   std::atomic<uint64_t> inflight = {0};
 };
@@ -190,6 +203,95 @@ class RDMAWorker : public Worker {
   }
 };
 
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;
+  int error;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+  std::vector<Chunk*> buffers;
+  int notify_fd = -1;
+  bufferlist pending_bl;
+
+  Mutex lock;
+  std::vector<ibv_wc> wc;
+  bool is_server;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+  bool active;// qp is active ?
+
+  void notify();
+  ssize_t read_buffers(char* buf, size_t len);
+  int post_work_request(std::vector<Chunk*>&);
+
+ public:
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                          RDMAWorker *w);
+  virtual ~RDMAConnectedSocketImpl();
+
+  void pass_wc(std::vector<ibv_wc> &&v);
+  void get_wc(std::vector<ibv_wc> &w);
+  virtual int is_connected() override { return connected; }
+
+  virtual ssize_t read(char* buf, size_t len) override;
+  virtual ssize_t zero_copy_read(bufferptr &data) override;
+  virtual ssize_t send(bufferlist &bl, bool more) override;
+  virtual void shutdown() override;
+  virtual void close() override;
+  virtual int fd() const override { return notify_fd; }
+  void fault();
+  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+  ssize_t submit(bool more);
+  int activate();
+  void fin();
+  void handle_connection();
+  void cleanup();
+  void set_accept_fd(int sd);
+  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
+  class C_handle_connection : public EventCallback {
+    RDMAConnectedSocketImpl *csi;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        csi->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
+};
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+  CephContext *cct;
+  NetHandler net;
+  int server_setup_socket;
+  Infiniband* infiniband;
+  RDMADispatcher *dispatcher;
+  RDMAWorker *worker;
+  entity_addr_t sa;
+
+ public:
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+  int get_fd() { return server_setup_socket; }
+};
 
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
@@ -211,4 +313,5 @@ class RDMAStack : public NetworkStack {
   virtual bool is_ready() override { return fork_finished.load(); };
   virtual void ready() override { fork_finished = true; };
 };
+
 #endif