#include <rdma/rdma_cma.h>
#include <atomic>
+#include <functional>
#include <string>
#include <vector>
+#include "include/common_fwd.h"
#include "include/int_types.h"
#include "include/page.h"
+#include "include/scope_guard.h"
#include "common/debug.h"
#include "common/errno.h"
-#include "common/Mutex.h"
+#include "common/ceph_mutex.h"
#include "common/perf_counters.h"
#include "msg/msg_types.h"
#include "msg/async/net_handler.h"
-#define HUGE_PAGE_SIZE (2 * 1024 * 1024)
-#define ALIGN_TO_PAGE_SIZE(x) \
- (((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)
+#define HUGE_PAGE_SIZE_2MB (2 * 1024 * 1024)
+#define ALIGN_TO_PAGE_2MB(x) \
+ (((x) + (HUGE_PAGE_SIZE_2MB - 1)) & ~(HUGE_PAGE_SIZE_2MB - 1))
-struct IBSYNMsg {
+#define PSN_LEN 24
+#define PSN_MSK ((1 << PSN_LEN) - 1)
+
+#define BEACON_WRID 0xDEADBEEF
+
+struct ib_cm_meta_t {
uint16_t lid;
- uint32_t qpn;
+ uint32_t local_qpn;
uint32_t psn;
uint32_t peer_qpn;
union ibv_gid gid;
} __attribute__((packed));
class RDMAStack;
-class CephContext;
class Port {
struct ibv_context* ctxt;
int port_num;
- struct ibv_port_attr* port_attr;
+ struct ibv_port_attr port_attr;
uint16_t lid;
- int gid_idx = 0;
+ int gid_idx;
union ibv_gid gid;
public:
uint16_t get_lid() { return lid; }
ibv_gid get_gid() { return gid; }
int get_port_num() { return port_num; }
- ibv_port_attr* get_port_attr() { return port_attr; }
+ ibv_port_attr* get_port_attr() { return &port_attr; }
int get_gid_idx() { return gid_idx; }
};
const char* name;
uint8_t port_cnt = 0;
public:
- explicit Device(CephContext *c, ibv_device* d, struct ibv_context *dc);
+ explicit Device(CephContext *c, ibv_device* ib_dev);
+ explicit Device(CephContext *c, ibv_context *ib_ctx);
~Device() {
if (active_port) {
delete active_port;
int get_gid_idx() { return active_port->get_gid_idx(); }
void binding_port(CephContext *c, int port_num);
struct ibv_context *ctxt;
- ibv_device_attr *device_attr;
+ ibv_device_attr device_attr;
Port* active_port;
};
int num;
Device** devices;
public:
- explicit DeviceList(CephContext *cct): device_list(ibv_get_device_list(&num)),
- device_context_list(rdma_get_devices(&num)) {
- if (device_list == NULL || num == 0) {
- lderr(cct) << __func__ << " failed to get rdma device list. " << cpp_strerror(errno) << dendl;
- ceph_abort();
+ explicit DeviceList(CephContext *cct): device_list(nullptr), device_context_list(nullptr),
+ num(0), devices(nullptr) {
+ device_list = ibv_get_device_list(&num);
+ ceph_assert(device_list);
+ ceph_assert(num);
+ if (cct->_conf->ms_async_rdma_cm) {
+ device_context_list = rdma_get_devices(NULL);
+ ceph_assert(device_context_list);
}
devices = new Device*[num];
for (int i = 0;i < num; ++i) {
- devices[i] = new Device(cct, device_list[i], device_context_list[i]);
+ if (cct->_conf->ms_async_rdma_cm) {
+ devices[i] = new Device(cct, device_context_list[i]);
+ } else {
+ devices[i] = new Device(cct, device_list[i]);
+ }
}
}
~DeviceList() {
}
delete []devices;
ibv_free_device_list(device_list);
+ rdma_free_devices(device_context_list);
}
Device* get_device(const char* device_name) {
- ceph_assert(devices);
for (int i = 0; i < num; ++i) {
if (!strlen(device_name) || !strcmp(device_name, devices[i]->get_name())) {
return devices[i];
ibv_pd* const pd;
};
-
+ class QueuePair;
class MemoryManager {
public:
class Chunk {
public:
- Chunk(ibv_mr* m, uint32_t len, char* b);
+ Chunk(ibv_mr* m, uint32_t bytes, char* buffer, uint32_t offset = 0, uint32_t bound = 0, uint32_t lkey = 0, QueuePair* qp = nullptr);
~Chunk();
- void set_offset(uint32_t o);
uint32_t get_offset();
- void set_bound(uint32_t b);
+ uint32_t get_size() const;
void prepare_read(uint32_t b);
uint32_t get_bound();
uint32_t read(char* buf, uint32_t len);
uint32_t write(char* buf, uint32_t len);
bool full();
- bool over();
- void clear();
+ void reset_read_chunk();
+ void reset_write_chunk();
+ void set_qp(QueuePair *qp) { this->qp = qp; }
+ void clear_qp() { set_qp(nullptr); }
+ QueuePair* get_qp() { return qp; }
public:
ibv_mr* mr;
- uint32_t lkey = 0;
+ QueuePair *qp;
+ uint32_t lkey;
uint32_t bytes;
- uint32_t bound = 0;
uint32_t offset;
+ uint32_t bound;
char* buffer; // TODO: remove buffer/refactor TX
char data[0];
};
MemoryManager& manager;
uint32_t buffer_size;
uint32_t num_chunk = 0;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("cluster_lock");
std::vector<Chunk*> free_chunks;
char *base = nullptr;
char *end = nullptr;
static char * malloc(const size_type bytes);
static void free(char * const block);
- static MemPoolContext *g_ctx;
- static Mutex lock;
+/**
+ * Invoke func while g_ctx temporarily points at ctx.
+ *
+ * The mutex returned by get_lock() is held for the whole call, and the
+ * reset_ctx scope_guard is armed with a callback that sets g_ctx back
+ * to nullptr.
+ *
+ * @param ctx  value stored in g_ctx before func is invoked
+ * @param func callable invoked with no arguments
+ * @return whatever func returns
+ */
+template<typename Func>
+static std::invoke_result_t<Func> with_context(MemPoolContext* ctx,
+                                               Func&& func) {
+  std::lock_guard l{get_lock()};
+  g_ctx = ctx;
+  scope_guard reset_ctx{[] { g_ctx = nullptr; }};
+  // Forward instead of move: func is a forwarding reference, and the
+  // declared return type is computed for Func's own value category, so
+  // an lvalue callable must be invoked as an lvalue.
+  return std::forward<Func>(func)();
+}
+ private:
+ static ceph::mutex& get_lock();
+ static MemPoolContext* g_ctx;
};
/**
void *slow_malloc();
public:
+ ceph::mutex lock = ceph::make_mutex("mem_pool_lock");
explicit mem_pool(MemPoolContext *ctx, const size_type nrequested_size,
const size_type nnext_size = 32,
const size_type nmax_size = 0) :
}
Chunk *get_rx_buffer() {
+  // rxbuf_pool.lock is held across the pool allocation.
+  std::lock_guard pool_guard{rxbuf_pool.lock};
  return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
}
void release_rx_buffer(Chunk *chunk) {
+  // Under rxbuf_pool.lock: clear the chunk's qp pointer, then hand the
+  // chunk back to the pool.
+  std::lock_guard pool_guard{rxbuf_pool.lock};
+  chunk->clear_qp();
  rxbuf_pool.free(chunk);
}
Device *device = NULL;
ProtectionDomain *pd = NULL;
DeviceList *device_list = nullptr;
- void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
- void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);
CephContext *cct;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("IB lock");
bool initialized = false;
const std::string &device_name;
uint8_t port_num;
// must call plumb() to bring the queue pair to the RTS state.
class QueuePair {
public:
+ typedef MemoryManager::Chunk Chunk;
QueuePair(CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int ib_physical_port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq,
uint32_t tx_queue_len, uint32_t max_recv_wr, struct rdma_cm_id *cid, uint32_t q_key = 0);
~QueuePair();
+ int modify_qp_to_error();
+ int modify_qp_to_rts();
+ int modify_qp_to_rtr();
+ int modify_qp_to_init();
int init();
/**
* Get the state of a QueuePair.
*/
int get_state() const;
- /**
- * Return true if the queue pair is in an error state, false otherwise.
+ /*
+ * send/receive connection management meta data
*/
- bool is_error() const;
- void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
- void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
- uint32_t get_tx_wr() const { return tx_wr_inflight; }
+ int send_cm_meta(CephContext *cct, int socket_fd);
+ int recv_cm_meta(CephContext *cct, int socket_fd);
+ void wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data);
+ void gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]);
ibv_qp* get_qp() const { return qp; }
Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
int to_dead();
bool is_dead() const { return dead; }
+ ib_cm_meta_t& get_peer_cm_meta() { return peer_cm_meta; }
+ ib_cm_meta_t& get_local_cm_meta() { return local_cm_meta; }
+// Record chunk in recv_queue. Does nothing when this queue pair has an
+// srq set.
+void add_rq_wr(Chunk* chunk) {
+  if (srq == nullptr) {
+    std::lock_guard guard{lock};
+    recv_queue.push_back(chunk);
+  }
+}
+
+// Drop chunk from recv_queue; asserts if chunk is not recorded there.
+// Does nothing when this queue pair has an srq set.
+void remove_rq_wr(Chunk* chunk) {
+  if (srq != nullptr) {
+    return;
+  }
+
+  std::lock_guard guard{lock};
+  const auto last = recv_queue.end();
+  const auto pos = std::find(recv_queue.begin(), last, chunk);
+  ceph_assert(pos != last);
+  recv_queue.erase(pos);
+}
+ ibv_srq* get_srq() const { return srq; }
private:
CephContext *cct;
ibv_srq* srq; // shared receive queue
ibv_qp* qp; // infiniband verbs QP handle
struct rdma_cm_id *cm_id;
+ ib_cm_meta_t peer_cm_meta;
+ ib_cm_meta_t local_cm_meta;
Infiniband::CompletionQueue* txcq;
Infiniband::CompletionQueue* rxcq;
uint32_t initial_psn; // initial packet sequence number
uint32_t max_recv_wr;
uint32_t q_key;
bool dead;
- std::atomic<uint32_t> tx_wr_inflight = {0}; // counter for inflight Tx WQEs
+// Chunks recorded by add_rq_wr() and dropped again by remove_rq_wr();
+// both take `lock` before touching this vector.
+std::vector<Chunk*> recv_queue;
+ ceph::mutex lock = ceph::make_mutex("queue_pair_lock");
};
public:
ibv_qp_type type, struct rdma_cm_id *cm_id);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
// post rx buffers to srq, return number of buffers actually posted
- int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
+ int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
void post_chunk_to_pool(Chunk* chunk) {
+  // A chunk that still names a queue pair is first removed from that
+  // queue pair via remove_rq_wr().
+  if (QueuePair *owner = chunk->get_qp()) {
+    owner->remove_rq_wr(chunk);
+  }
  get_memory_manager()->release_rx_buffer(chunk);
}
int get_tx_buffers(std::vector<Chunk*> &c, size_t bytes);
CompletionChannel *create_comp_channel(CephContext *c);
CompletionQueue *create_comp_queue(CephContext *c, CompletionChannel *cc=NULL);
uint8_t get_ib_physical_port() { return ib_physical_port; }
- int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
- int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
uint16_t get_lid() { return device->get_lid(); }
ibv_gid get_gid() { return device->get_gid(); }
MemoryManager* get_memory_manager() { return memory_manager; }