#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"
-#include "RDMAConnectedSocketImpl.h"
class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
std::thread t;
CephContext *cct;
+ Infiniband::CompletionQueue* tx_cq;
+ Infiniband::CompletionQueue* rx_cq;
+ Infiniband::CompletionChannel *tx_cc, *rx_cc;
+ EventCallbackRef async_handler;
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
std::list<RDMAWorker*> pending_workers;
RDMAStack* stack;
+ class C_handle_cq_async : public EventCallback {
+ RDMADispatcher *dispatcher;
+ public:
+ C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+ void do_request(int fd) {
+ // worker->handle_tx_event();
+ dispatcher->handle_async_event();
+ }
+ };
+
public:
PerfCounters *perf_logger;
explicit RDMADispatcher(CephContext* c, RDMAStack* s);
virtual ~RDMADispatcher();
-
- void process_async_event(Device *ibdev, ibv_async_event &async_event);
+ void handle_async_event();
void polling_start();
void polling_stop();
void polling();
-
int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Mutex::Locker l(w_lock);
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
+ Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+ Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
void notify_pending_workers();
- void handle_tx_event(Device *ibdev, ibv_wc *cqe, int n);
- void post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks);
+ void handle_tx_event(ibv_wc *cqe, int n);
+ void post_tx_buffer(std::vector<Chunk*> &chunks);
std::atomic<uint64_t> inflight = {0};
};
}
};
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+ typedef Infiniband::MemoryManager::Chunk Chunk;
+ typedef Infiniband::CompletionChannel CompletionChannel;
+ typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+ CephContext *cct;
+ Infiniband::QueuePair *qp;
+ IBSYNMsg peer_msg;
+ IBSYNMsg my_msg;
+ int connected;
+ int error;
+ Infiniband* infiniband;
+ RDMADispatcher* dispatcher;
+ RDMAWorker* worker;
+ std::vector<Chunk*> buffers;
+ int notify_fd = -1;
+ bufferlist pending_bl;
+
+ Mutex lock;
+ std::vector<ibv_wc> wc;
+ bool is_server;
+ EventCallbackRef con_handler;
+ int tcp_fd = -1;
+ bool active;// qp is active ?
+
+ void notify();
+ ssize_t read_buffers(char* buf, size_t len);
+ int post_work_request(std::vector<Chunk*>&);
+
+ public:
+ RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+ RDMAWorker *w);
+ virtual ~RDMAConnectedSocketImpl();
+
+ void pass_wc(std::vector<ibv_wc> &&v);
+ void get_wc(std::vector<ibv_wc> &w);
+ virtual int is_connected() override { return connected; }
+
+ virtual ssize_t read(char* buf, size_t len) override;
+ virtual ssize_t zero_copy_read(bufferptr &data) override;
+ virtual ssize_t send(bufferlist &bl, bool more) override;
+ virtual void shutdown() override;
+ virtual void close() override;
+ virtual int fd() const override { return notify_fd; }
+ void fault();
+ const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+ ssize_t submit(bool more);
+ int activate();
+ void fin();
+ void handle_connection();
+ void cleanup();
+ void set_accept_fd(int sd);
+ int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
+ class C_handle_connection : public EventCallback {
+ RDMAConnectedSocketImpl *csi;
+ bool active;
+ public:
+ C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+ void do_request(int fd) {
+ if (active)
+ csi->handle_connection();
+ }
+ void close() {
+ active = false;
+ }
+ };
+};
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+ CephContext *cct;
+ NetHandler net;
+ int server_setup_socket;
+ Infiniband* infiniband;
+ RDMADispatcher *dispatcher;
+ RDMAWorker *worker;
+ entity_addr_t sa;
+
+ public:
+ RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+ int listen(entity_addr_t &sa, const SocketOptions &opt);
+ virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+ virtual void abort_accept() override;
+ virtual int fd() const override { return server_setup_socket; }
+ int get_fd() { return server_setup_socket; }
+};
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
virtual bool is_ready() override { return fork_finished.load(); };
virtual void ready() override { fork_finished = true; };
};
+
#endif