ceph_assert(qp_conns.empty());
ceph_assert(num_qp_conn == 0);
ceph_assert(dead_queue_pairs.empty());
- ceph_assert(num_dead_queue_pair == 0);
-
- delete async_handler;
}
-RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
- : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
- w_lock("RDMADispatcher::for worker pending list"), stack(s)
+RDMADispatcher::RDMADispatcher(CephContext* c, shared_ptr<Infiniband>& ib)
+ : cct(c), ib(ib)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
void RDMADispatcher::polling_start()
{
// take lock because listen/connect can happen from different worker threads
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
if (t.joinable())
return; // dispatcher thread already running
- get_stack()->get_infiniband().get_memory_manager()->set_rx_stat_logger(perf_logger);
+ ib->get_memory_manager()->set_rx_stat_logger(perf_logger);
- tx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+ tx_cc = ib->create_comp_channel(cct);
ceph_assert(tx_cc);
- rx_cc = get_stack()->get_infiniband().create_comp_channel(cct);
+ rx_cc = ib->create_comp_channel(cct);
ceph_assert(rx_cc);
- tx_cq = get_stack()->get_infiniband().create_comp_queue(cct, tx_cc);
+ tx_cq = ib->create_comp_queue(cct, tx_cc);
ceph_assert(tx_cq);
- rx_cq = get_stack()->get_infiniband().create_comp_queue(cct, rx_cc);
+ rx_cq = ib->create_comp_queue(cct, rx_cc);
ceph_assert(rx_cq);
t = std::thread(&RDMADispatcher::polling, this);
void RDMADispatcher::polling_stop()
{
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
done = true;
}
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
- if (ibv_get_async_event(get_stack()->get_infiniband().get_device()->ctxt, &async_event)) {
+ if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
return;
}
perf_logger->inc(l_msgr_rdma_total_async_events);
- // FIXME: Currently we must ensure no other factor make QP in ERROR state,
- // otherwise this qp can't be deleted in current cleanup flow.
- if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
- perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
- uint64_t qpn = async_event.element.qp->qp_num;
- ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
- << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
- Mutex::Locker l(lock);
- RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
- if (!conn) {
- ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
- } else {
- ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
- conn->fault();
- if (!cct->_conf->ms_async_rdma_cm)
- erase_qpn_lockless(qpn);
- }
- } else {
- ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << get_stack()->get_infiniband().get_device()->ctxt
- << " evt: " << ibv_event_type_str(async_event.event_type)
- << dendl;
+ ldout(cct, 1) << __func__ << "Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+
+ switch (async_event.event_type) {
+ /***********************CQ events********************/
+ case IBV_EVENT_CQ_ERR:
+ lderr(cct) << __func__ << " Fatal Error, effect all QP bound with same CQ, "
+ << " CQ Overflow, dev = " << ib->get_device()->ctxt
+ << " Need destroy and recreate resource " << dendl;
+ break;
+ /***********************QP events********************/
+ case IBV_EVENT_QP_FATAL:
+ {
+ /* Error occurred on a QP and it transitioned to error state */
+ ibv_qp* ib_qp = async_event.element.qp;
+ uint32_t qpn = ib_qp->qp_num;
+ QueuePair* qp = get_qp(qpn);
+ lderr(cct) << __func__ << " Fatal Error, event associate qp number: " << qpn
+ << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+ << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+ }
+ break;
+ case IBV_EVENT_QP_LAST_WQE_REACHED:
+ {
+ /*
+ * This event fires in two situations:
+ * 1. A QP bound to an SRQ is in IBV_QPS_ERR state and has no more WQEs on its RQ.
+ * Reason: the QP was forcibly switched into the error state before the beacon WR was posted.
+ * Its WRs are flushed to the CQ with IBV_WC_WR_FLUSH_ERR status; with an SRQ, only the
+ * WRs of the QP that entered the error state are flushed.
+ * Handling: just confirm that the QP has been put on the dead queue pairs list.
+ * 2. A CQE with an error was generated for the last WQE.
+ * Handling: log an error.
+ */
+ perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
+ ibv_qp* ib_qp = async_event.element.qp;
+ uint32_t qpn = ib_qp->qp_num;
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
+ QueuePair* qp = get_qp_lockless(qpn);
+
+ if (qp && !qp->is_dead()) {
+ lderr(cct) << __func__ << " QP not dead, event associate qp number: " << qpn
+ << " Queue Pair status: " << Infiniband::qp_state_string(qp->get_state())
+ << " Event : " << ibv_event_type_str(async_event.event_type) << dendl;
+ }
+ if (!conn) {
+ ldout(cct, 20) << __func__ << " Connection's QP maybe entered into dead status. "
+ << " qp number: " << qpn << dendl;
+ } else {
+ conn->fault();
+ if (qp) {
+ if (!cct->_conf->ms_async_rdma_cm)
+ enqueue_dead_qp(qpn);
+ }
+ }
+ }
+ break;
+ case IBV_EVENT_QP_REQ_ERR:
+ /* Invalid Request Local Work Queue Error */
+ [[fallthrough]];
+ case IBV_EVENT_QP_ACCESS_ERR:
+ /* Local access violation error */
+ [[fallthrough]];
+ case IBV_EVENT_COMM_EST:
+ /* Communication was established on a QP */
+ [[fallthrough]];
+ case IBV_EVENT_SQ_DRAINED:
+ /* Send Queue was drained of outstanding messages in progress */
+ [[fallthrough]];
+ case IBV_EVENT_PATH_MIG:
+ /* A connection has migrated to the alternate path */
+ [[fallthrough]];
+ case IBV_EVENT_PATH_MIG_ERR:
+ /* A connection failed to migrate to the alternate path */
+ break;
+ /***********************SRQ events*******************/
+ case IBV_EVENT_SRQ_ERR:
+ /* Error occurred on an SRQ */
+ [[fallthrough]];
+ case IBV_EVENT_SRQ_LIMIT_REACHED:
+ /* SRQ limit was reached */
+ break;
+ /***********************Port events******************/
+ case IBV_EVENT_PORT_ACTIVE:
+ /* Link became active on a port */
+ [[fallthrough]];
+ case IBV_EVENT_PORT_ERR:
+ /* Link became unavailable on a port */
+ [[fallthrough]];
+ case IBV_EVENT_LID_CHANGE:
+ /* LID was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_PKEY_CHANGE:
+ /* P_Key table was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_SM_CHANGE:
+ /* SM was changed on a port */
+ [[fallthrough]];
+ case IBV_EVENT_CLIENT_REREGISTER:
+ /* SM sent a CLIENT_REREGISTER request to a port */
+ [[fallthrough]];
+ case IBV_EVENT_GID_CHANGE:
+ /* GID table was changed on a port */
+ break;
+
+ /***********************CA events******************/
+ case IBV_EVENT_DEVICE_FATAL:
+ /* CA is in FATAL state */
+ lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+ << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
+ break;
+ default:
+ lderr(cct) << __func__ << " ibv_get_async_event: dev = " << ib->get_device()->ctxt
+ << " unknown event: " << async_event.event_type << dendl;
+ break;
}
ibv_ack_async_event(&async_event);
}
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
- Mutex::Locker l(lock);
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
+ std::lock_guard l{lock};
+ ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
-int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
+int RDMADispatcher::post_chunks_to_rq(int num, QueuePair *qp)
{
- Mutex::Locker l(lock);
- return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
+ std::lock_guard l{lock};
+ return ib->post_chunks_to_rq(num, qp);
}
void RDMADispatcher::polling()
std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
std::vector<ibv_wc> tx_cqe;
ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
- RDMAConnectedSocketImpl *conn = nullptr;
uint64_t last_inactive = Cycles::rdtsc();
bool rearmed = false;
int r = 0;
if (rx_ret > 0) {
ldout(cct, 20) << __func__ << " rx completion queue got " << rx_ret
<< " responses."<< dendl;
- perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
- perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);
-
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
-
- for (int i = 0; i < rx_ret; ++i) {
- ibv_wc* response = &wc[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
-
- if (response->status == IBV_WC_SUCCESS) {
- ceph_assert(wc[i].opcode == IBV_WC_RECV);
- conn = get_conn_lockless(response->qp_num);
- if (!conn) {
- ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- } else {
- conn->post_chunks_to_rq(1);
- polled[conn].push_back(*response);
- }
- } else {
- perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
- ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
- << ") status(" << response->status << ":"
- << get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
- if (response->status != IBV_WC_WR_FLUSH_ERR) {
- conn = get_conn_lockless(response->qp_num);
- if (conn && conn->is_connected())
- conn->fault();
- }
- get_stack()->get_infiniband().post_chunk_to_pool(chunk);
- perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
- }
- }
- for (auto &&i : polled)
- i.first->pass_wc(std::move(i.second));
- polled.clear();
+ handle_rx_event(wc, rx_ret);
}
if (!tx_ret && !rx_ret) {
- // NOTE: Has TX just transitioned to idle? We should do it when idle!
- // It's now safe to delete queue pairs (see comment by declaration
- // for dead_queue_pairs).
- // Additionally, don't delete qp while outstanding_buffers isn't empty,
- // because we need to check qp's state before sending
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
- if (num_dead_queue_pair) {
- Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
- auto it = dead_queue_pairs.begin();
- while (it != dead_queue_pairs.end()) {
- auto i = *it;
- // Bypass QPs that do not collect all Tx completions yet.
- if (i->get_tx_wr()) {
- ldout(cct, 20) << __func__ << " bypass qp=" << i << " tx_wr=" << i->get_tx_wr() << dendl;
- ++it;
- } else {
- ldout(cct, 10) << __func__ << " finally delete qp=" << i << dendl;
- delete i;
- it = dead_queue_pairs.erase(it);
- perf_logger->dec(l_msgr_rdma_active_queue_pair);
- --num_dead_queue_pair;
- }
+ //
+ // Clean up dead QPs when the rx/tx CQs are idle. We could destroy a QP
+ // even earlier, as soon as its beacon has been received, but since there
+ // are two CQs (rx & tx) the beacon WC can be popped from the tx CQ before
+ // the remaining WCs have been consumed from the rx CQ. For safety, we
+ // wait for the beacon and then for "no events" from both CQs.
+ //
+ // Calling empty() on the vector without the lock is fine, since we only
+ // use it as a hint (accuracy is not important here).
+ //
+ if (!dead_queue_pairs.empty()) {
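+ // Swap the dead list under the lock, then delete the QPs outside of it
+ // to keep the critical section short.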
+ decltype(dead_queue_pairs) dead_qps;
+ {
+ std::lock_guard l{lock};
+ dead_queue_pairs.swap(dead_qps);
+ }
+
+ for (auto& qp: dead_qps) {
+ perf_logger->dec(l_msgr_rdma_active_queue_pair);
+ ldout(cct, 10) << __func__ << " finally delete qp = " << qp << dendl;
+ delete qp;
}
}
+
if (!num_qp_conn && done && dead_queue_pairs.empty())
break;
struct pollfd channel_poll[2];
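+ // Only POLLIN needs to be requested here: POLLERR, POLLHUP and POLLNVAL are
+ // ignored in 'events' and always reported in 'revents' by poll().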
channel_poll[0].fd = tx_cc->get_fd();
- channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[0].events = POLLIN;
channel_poll[0].revents = 0;
channel_poll[1].fd = rx_cc->get_fd();
- channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+ channel_poll[1].events = POLLIN;
channel_poll[1].revents = 0;
r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
if (num_pending_workers) {
RDMAWorker *w = nullptr;
{
- Mutex::Locker l(w_lock);
+ std::lock_guard l{w_lock};
if (!pending_workers.empty()) {
w = pending_workers.front();
pending_workers.pop_front();
void RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
{
- Mutex::Locker l(lock);
+ std::lock_guard l{lock};
ceph_assert(!qp_conns.count(qp->get_local_qp_number()));
qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
++num_qp_conn;
return it->second.second;
}
-Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
+Infiniband::QueuePair* RDMADispatcher::get_qp_lockless(uint32_t qp)
{
- Mutex::Locker l(lock);
// Try to find the QP in qp_conns firstly.
auto it = qp_conns.find(qp);
if (it != qp_conns.end())
return nullptr;
}
-void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
+Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
+ std::lock_guard l{lock};
+ return get_qp_lockless(qp);
+}
+
+void RDMADispatcher::enqueue_dead_qp(uint32_t qpn)
+{
+ std::lock_guard l{lock};
auto it = qp_conns.find(qpn);
- if (it == qp_conns.end())
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
return ;
- ++num_dead_queue_pair;
- dead_queue_pairs.push_back(it->second.first);
+ }
+ QueuePair *qp = it->second.first;
+ dead_queue_pairs.push_back(qp);
qp_conns.erase(it);
--num_qp_conn;
}
-void RDMADispatcher::erase_qpn(uint32_t qpn)
+void RDMADispatcher::schedule_qp_destroy(uint32_t qpn)
{
- Mutex::Locker l(lock);
- erase_qpn_lockless(qpn);
+ std::lock_guard l{lock};
+ auto it = qp_conns.find(qpn);
+ if (it == qp_conns.end()) {
+ lderr(cct) << __func__ << " QP [" << qpn << "] is not registered." << dendl;
+ return;
+ }
+ QueuePair *qp = it->second.first;
+ if (qp->to_dead()) {
+ //
+ // Failed to switch to dead. This is abnormal, but there is nothing
+ // we can do about it, so just destroy the QP.
+ //
+ dead_queue_pairs.push_back(qp);
+ qp_conns.erase(it);
+ --num_qp_conn;
+ } else {
+ //
+ // Successfully switched to dead; keep the entry in the map, but zero
+ // out the socket pointer so that get_conn_lockless() returns null.
+ //
+ it->second.second = nullptr;
+ }
}
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
for (int i = 0; i < n; ++i) {
ibv_wc* response = &cqe[i];
- Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
- ldout(cct, 25) << __func__ << " QP: " << response->qp_num
- << " len: " << response->byte_len << " , addr:" << chunk
- << " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
- QueuePair *qp = get_qp(response->qp_num);
- if (qp)
- qp->dec_tx_wr(1);
+ // If this is the beacon WR, enqueue the QP to be destroyed later
+ if (response->wr_id == BEACON_WRID) {
+ enqueue_dead_qp(response->qp_num);
+ continue;
+ }
+
+ ldout(cct, 20) << __func__ << " QP number: " << response->qp_num << " len: " << response->byte_len
+ << " status: " << ib->wc_status_to_string(response->status) << dendl;
if (response->status != IBV_WC_SUCCESS) {
- perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
- if (response->status == IBV_WC_RETRY_EXC_ERR) {
- ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
- perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
- } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
- ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
- << response->qp_num << " should be down while this WR=" << response->wr_id
- << " still in flight." << dendl;
- perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
- } else {
- ldout(cct, 1) << __func__ << " send work request returned error for buffer("
- << response->wr_id << ") status(" << response->status << "): "
- << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
- Mutex::Locker l(lock);//make sure connected socket alive when pass wc
- RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
-
- if (conn && conn->is_connected()) {
- ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
- conn->fault();
- } else {
- ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
- }
+ switch(response->status) {
+ case IBV_WC_RETRY_EXC_ERR:
+ {
+ perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
+
+ ldout(cct, 1) << __func__ << " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn) {
+ ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+ break;
+ case IBV_WC_WR_FLUSH_ERR:
+ {
+ perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
+
+ std::lock_guard l{lock};
+ QueuePair *qp = get_qp_lockless(response->qp_num);
+ if (qp) {
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ }
+ if (qp && qp->is_dead()) {
+ ldout(cct, 20) << __func__ << " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl;
+ } else {
+ lderr(cct) << __func__ << " Invalid/Unsupported request to consume outstanding SQ WR,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn) {
+ ldout(cct, 1) << __func__ << " SQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+ }
+ break;
+
+ default:
+ {
+ lderr(cct) << __func__ << " SQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+
+ std::lock_guard l{lock};
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ if (conn && conn->is_connected()) {
+ ldout(cct, 20) << __func__ << " SQ WR return error Queue Pair error state is : " << conn->get_qp_state()
+ << " remote Queue Pair, qp number: " << conn->get_peer_qpn() << dendl;
+ conn->fault();
+ } else {
+ ldout(cct, 1) << __func__ << " Disconnected, qp_num = " << response->qp_num << " discard event" << dendl;
+ }
+ }
+ break;
}
}
- //TX completion may come either from regular send message or from 'fin' message.
- //In the case of 'fin' wr_id points to the QueuePair.
- if (get_stack()->get_infiniband().get_memory_manager()->is_tx_buffer(chunk->buffer)) {
+ auto chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ // A TX completion may come from either:
+ // 1) a regular send message, where the WCE wr_id points to a chunk, or
+ // 2) a 'fin' message, where the wr_id points to the QP.
+ if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
return ;
inflight -= chunks.size();
- get_stack()->get_infiniband().get_memory_manager()->return_tx(chunks);
+ ib->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
}
+void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number)
+{
+ perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_number);
+ perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_number);
+
+ std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
+ std::lock_guard l{lock}; // make sure the connected socket stays alive while we pass WCs
+
+ for (int i = 0; i < rx_number; ++i) {
+ ibv_wc* response = &cqe[i];
+ Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+ RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
+ QueuePair *qp = get_qp_lockless(response->qp_num);
+
+ switch (response->status) {
+ case IBV_WC_SUCCESS:
+ ceph_assert(response->opcode == IBV_WC_RECV);
+ if (!conn) {
+ ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk 0x"
+ << std::hex << chunk << " will be back." << std::dec << dendl;
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ } else {
+ conn->post_chunks_to_rq(1);
+ polled[conn].push_back(*response);
+
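+ // QPs that do not use the SRQ track their RQ WRs per-QP; once this chunk
+ // has been consumed, drop it from the QP's accounting and detach it from the QP.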
+ if (qp != nullptr && !qp->get_srq()) {
+ qp->remove_rq_wr(chunk);
+ chunk->clear_qp();
+ }
+ }
+ break;
+
+ case IBV_WC_WR_FLUSH_ERR:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ if (qp) {
+ ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+ }
+ if (qp && qp->is_dead()) {
+ ldout(cct, 20) << __func__ << " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl;
+ } else {
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+ if (conn) {
+ ldout(cct, 1) << __func__ << " RQ WR return error, remote Queue Pair, qp number: "
+ << conn->get_peer_qpn() << dendl;
+ }
+ }
+
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ break;
-RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
- : Worker(c, i), stack(nullptr),
- tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
+ default:
+ perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
+
+ ldout(cct, 1) << __func__ << " RQ WR return error,"
+ << " WCE status(" << response->status << "): " << ib->wc_status_to_string(response->status)
+ << " WCE QP number " << response->qp_num << " Opcode " << response->opcode
+ << " wr_id: 0x" << std::hex << response->wr_id << std::dec << dendl;
+ if (conn && conn->is_connected())
+ conn->fault();
+
+ ib->post_chunk_to_pool(chunk);
+ perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
+ break;
+ }
+ }
+
+ for (auto &i : polled)
+ i.first->pass_wc(std::move(i.second));
+ polled.clear();
+}
+
+RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
+ : Worker(c, worker_id),
+ tx_handler(new C_handle_cq_tx(this))
{
// initialize perf_logger
char name[128];
void RDMAWorker::initialize()
{
- if (!dispatcher) {
- dispatcher = &stack->get_dispatcher();
- }
+ ceph_assert(dispatcher);
}
int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
const SocketOptions &opt,ServerSocket *sock)
{
- get_stack()->get_infiniband().init();
+ ib->init();
dispatcher->polling_start();
+
RDMAServerSocketImpl *p;
if (cct->_conf->ms_async_rdma_type == "iwarp") {
- p = new RDMAIWARPServerSocketImpl(
- cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this,
- sa, addr_slot);
+ p = new RDMAIWARPServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
} else {
- p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(),
- &get_stack()->get_dispatcher(), this, sa,
- addr_slot);
+ p = new RDMAServerSocketImpl(cct, ib, dispatcher, this, sa, addr_slot);
}
int r = p->listen(sa, opt);
if (r < 0) {
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
- get_stack()->get_infiniband().init();
+ ib->init();
dispatcher->polling_start();
RDMAConnectedSocketImpl* p;
if (cct->_conf->ms_async_rdma_type == "iwarp") {
- p = new RDMAIWARPConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+ p = new RDMAIWARPConnectedSocketImpl(cct, ib, dispatcher, this);
} else {
- p = new RDMAConnectedSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this);
+ p = new RDMAConnectedSocketImpl(cct, ib, dispatcher, this);
}
int r = p->try_connect(addr, opts);
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
ceph_assert(center.in_thread());
- int r = get_stack()->get_infiniband().get_tx_buffers(c, bytes);
- ceph_assert(r >= 0);
- size_t got = get_stack()->get_infiniband().get_memory_manager()->get_tx_buffer_size() * r;
+ int r = ib->get_tx_buffers(c, bytes);
+ size_t got = ib->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
- stack->get_dispatcher().inflight += r;
+ dispatcher->inflight += r;
if (got >= bytes)
return r;
}
RDMAStack::RDMAStack(CephContext *cct, const string &t)
- : NetworkStack(cct, t), ib(cct), dispatcher(cct, this)
+ : NetworkStack(cct, t), ib(make_shared<Infiniband>(cct)),
+ rdma_dispatcher(make_shared<RDMADispatcher>(cct, ib))
{
ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
- w->set_stack(this);
+ w->set_dispatcher(rdma_dispatcher);
+ w->set_ib(ib);
}
- ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << &dispatcher << dendl;
+ ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher.get() << dendl;
}
RDMAStack::~RDMAStack()