*
*/
+#include <poll.h>
#include <sys/time.h>
#include <sys/resource.h>
#include "common/deleter.h"
#include "common/Tub.h"
#include "RDMAStack.h"
-#include "RDMAConnTCP.h"
-#include "Device.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
RDMADispatcher::~RDMADispatcher()
{
+ done = true;
polling_stop();
-
ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
- global_infiniband->set_dispatcher(nullptr);
-
assert(qp_conns.empty());
assert(num_qp_conn == 0);
assert(dead_queue_pairs.empty());
assert(num_dead_queue_pair == 0);
+
+ tx_cc->ack_events();
+ rx_cc->ack_events();
+ delete tx_cq;
+ delete rx_cq;
+ delete tx_cc;
+ delete rx_cc;
+ delete async_handler;
+
+ global_infiniband->set_dispatcher(nullptr);
}
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), lock("RDMADispatcher::lock"),
+ : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
void RDMADispatcher::polling_start()
{
+ tx_cc = global_infiniband->create_comp_channel(cct);
+ assert(tx_cc);
+ rx_cc = global_infiniband->create_comp_channel(cct);
+ assert(rx_cc);
+ tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
+ assert(tx_cq);
+ rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
+ assert(rx_cq);
+
t = std::thread(&RDMADispatcher::polling, this);
}
void RDMADispatcher::polling_stop()
{
- if (!t.joinable())
- return;
-
- done = true;
- t.join();
+ if (t.joinable())
+ t.join();
}
-void RDMADispatcher::process_async_event(Device *ibdev, ibv_async_event &async_event)
+void RDMADispatcher::handle_async_event()
{
- perf_logger->inc(l_msgr_rdma_total_async_events);
- // FIXME: Currently we must ensure no other factor make QP in ERROR state,
- // otherwise this qp can't be deleted in current cleanup flow.
- if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
- perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
- uint64_t qpn = async_event.element.qp->qp_num;
- ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
- << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
- Mutex::Locker l(lock);
- RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
- if (!conn) {
- ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+ ldout(cct, 30) << __func__ << dendl;
+ while (1) {
+ ibv_async_event async_event;
+ if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
+ if (errno != EAGAIN)
+ lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
+ << " " << cpp_strerror(errno) << ")" << dendl;
+ return;
+ }
+ perf_logger->inc(l_msgr_rdma_total_async_events);
+ // FIXME: Currently we must ensure no other factor make QP in ERROR state,
+ // otherwise this qp can't be deleted in current cleanup flow.
+ if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+ uint64_t qpn = async_event.element.qp->qp_num;
+ ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+ Mutex::Locker l(lock);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
+ conn->fault();
+ erase_qpn_lockless(qpn);
+ }
} else {
- ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
- conn->fault();
- erase_qpn_lockless(qpn);
+ ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
+ << " evt: " << ibv_event_type_str(async_event.event_type)
+ << dendl;
}
- } else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << *ibdev
- << " evt: " << ibv_event_type_str(async_event.event_type)
- << dendl;
+ ibv_ack_async_event(&async_event);
}
}
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
+ ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
RDMAConnectedSocketImpl *conn = nullptr;
utime_t last_inactive = ceph_clock_now();
bool rearmed = false;
int r = 0;
while (true) {
- Device *ibdev;
-
- int tx_ret = global_infiniband->poll_tx(MAX_COMPLETIONS, &ibdev, wc);
+ int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
if (tx_ret > 0) {
ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
<< " responses."<< dendl;
- handle_tx_event(ibdev, wc, tx_ret);
+ handle_tx_event(wc, tx_ret);
}
- int rx_ret = global_infiniband->poll_rx(MAX_COMPLETIONS, &ibdev, wc);
+ int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
if (rx_ret > 0) {
- ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
+ ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
if (response->status == IBV_WC_SUCCESS) {
conn = get_conn_lockless(response->qp_num);
if (!conn) {
- assert(ibdev->is_rx_buffer(chunk->buffer));
- r = ibdev->post_chunk(chunk);
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
assert(r == 0);
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
- << Infiniband::wc_status_to_string(response->status) << ")" << dendl;
- assert(ibdev->is_rx_buffer(chunk->buffer));
- r = ibdev->post_chunk(chunk);
+ << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
+ assert(global_infiniband->is_rx_buffer(chunk->buffer));
+ r = global_infiniband->post_chunk(chunk);
if (r) {
ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
assert(r == 0);
break;
if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+ handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
- global_infiniband->rearm_notify();
+ tx_cq->rearm_notify();
+ rx_cq->rearm_notify();
rearmed = true;
continue;
}
+ struct pollfd channel_poll[2];
+ channel_poll[0].fd = tx_cc->get_fd();
+ channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[0].revents = 0;
+ channel_poll[1].fd = rx_cc->get_fd();
+ channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[1].revents = 0;
+ r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
-
- r = global_infiniband->poll_blocking(done);
- if (r > 0)
- ldout(cct, 20) << __func__ << " got a cq event." << dendl;
-
+ while (!done && r == 0) {
+ r = poll(channel_poll, 2, 100);
+ if (r < 0) {
+ r = -errno;
+ lderr(cct) << __func__ << " poll failed " << r << dendl;
+ ceph_abort();
+ }
+ }
+ if (r > 0 && tx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
+ if (r > 0 && rx_cc->get_cq_event())
+ ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
erase_qpn_lockless(qpn);
}
-void RDMADispatcher::handle_tx_event(Device *ibdev, ibv_wc *cqe, int n)
+void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
{
std::vector<Chunk*> tx_chunks;
}
}
- // FIXME: why not tx?
- if (ibdev->get_memory_manager()->is_tx_buffer(chunk->buffer))
+ //TX completion may come either from regular send message or from 'fin' message.
+ //In the case of 'fin' wr_id points to the QueuePair.
+ if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
- else
+ } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
+ ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
+ } else {
ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
+ ceph_abort();
+ }
}
perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
- post_tx_buffer(ibdev, tx_chunks);
+ post_tx_buffer(tx_chunks);
}
/**
* \return
* 0 if success or -1 for failure
*/
-void RDMADispatcher::post_tx_buffer(Device *ibdev, std::vector<Chunk*> &chunks)
+void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
{
if (chunks.empty())
return ;
inflight -= chunks.size();
- ibdev->get_memory_manager()->return_tx(chunks);
+ global_infiniband->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
{
global_infiniband->init();
- auto p = new RDMAServerConnTCP(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
+ auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
- Device *ibdev = o->get_device();
-
assert(center.in_thread());
- int r = ibdev->get_tx_buffers(c, bytes);
+ int r = global_infiniband->get_tx_buffers(c, bytes);
assert(r >= 0);
- size_t got = ibdev->get_memory_manager()->get_tx_buffer_size() * r;
+ size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
if (got == bytes)
//
//On RDMA MUST be called before fork
//
+
int rc = ibv_fork_init();
if (rc) {
lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
ceph_abort();
}
+ ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
+ if (cct->_conf->ms_async_rdma_enable_hugepage) {
+ rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
+ ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
+ if (rc) {
+ lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
+ ceph_abort();
+ }
+ }
+
//Check ulimit
struct rlimit limit;
getrlimit(RLIMIT_MEMLOCK, &limit);
}
if (!global_infiniband)
- global_infiniband.construct(cct);
+ global_infiniband.construct(
+ cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
RDMAStack::~RDMAStack()
{
+ if (cct->_conf->ms_async_rdma_enable_hugepage) {
+ unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
+ }
+
delete dispatcher;
}