1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
20 #include <sys/resource.h>
22 #include "include/str_list.h"
23 #include "include/compat.h"
24 #include "common/Cycles.h"
25 #include "common/deleter.h"
26 #include "common/Tub.h"
27 #include "RDMAStack.h"
29 #define dout_subsys ceph_subsys_ms
31 #define dout_prefix *_dout << "RDMAStack "
// Destructor. Emits a level-20 debug line, then asserts that no QP/connection
// entries remain (qp_conns is empty, num_qp_conn == 0) and that
// dead_queue_pairs has been drained.
// NOTE(review): this listing omits embedded source lines 36-37, which sit
// between the log statement and the asserts; any shutdown step performed
// there is not visible here -- confirm against the full file.
33 RDMADispatcher::~RDMADispatcher()
35 ldout(cct
, 20) << __func__
<< " destructing rdma dispatcher" << dendl
;
38 ceph_assert(qp_conns
.empty());
39 ceph_assert(num_qp_conn
== 0);
40 ceph_assert(dead_queue_pairs
.empty());
43 RDMADispatcher::RDMADispatcher(CephContext
* c
, std::shared_ptr
<Infiniband
>& ib
)
46 PerfCountersBuilder
plb(cct
, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first
, l_msgr_rdma_dispatcher_last
);
48 plb
.add_u64_counter(l_msgr_rdma_polling
, "polling", "Whether dispatcher thread is polling");
49 plb
.add_u64_counter(l_msgr_rdma_inflight_tx_chunks
, "inflight_tx_chunks", "The number of inflight tx chunks");
50 plb
.add_u64_counter(l_msgr_rdma_rx_bufs_in_use
, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
51 plb
.add_u64_counter(l_msgr_rdma_rx_bufs_total
, "rx_bufs_total", "The total number of rx buffers");
53 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc
, "tx_total_wc", "The number of tx work comletions");
54 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc_errors
, "tx_total_wc_errors", "The number of tx errors");
55 plb
.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors
, "tx_retry_errors", "The number of tx retry errors");
56 plb
.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors
, "tx_wr_flush_errors", "The number of tx work request flush errors");
58 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc
, "rx_total_wc", "The number of total rx work completion");
59 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc_errors
, "rx_total_wc_errors", "The number of total rx error work completion");
60 plb
.add_u64_counter(l_msgr_rdma_rx_fin
, "rx_fin", "The number of rx finish work request");
62 plb
.add_u64_counter(l_msgr_rdma_total_async_events
, "total_async_events", "The number of async events");
63 plb
.add_u64_counter(l_msgr_rdma_async_last_wqe_events
, "async_last_wqe_events", "The number of last wqe events");
65 plb
.add_u64_counter(l_msgr_rdma_handshake_errors
, "handshake_errors", "The number of handshake errors");
68 plb
.add_u64_counter(l_msgr_rdma_created_queue_pair
, "created_queue_pair", "Active queue pair number");
69 plb
.add_u64_counter(l_msgr_rdma_active_queue_pair
, "active_queue_pair", "Created queue pair number");
71 perf_logger
= plb
.create_perf_counters();
72 cct
->get_perfcounters_collection()->add(perf_logger
);
// Start the dispatcher's polling thread.
// Visible steps: take `lock`, hand perf_logger to the memory manager as its
// rx stat logger, create one completion channel and one completion queue each
// for tx and rx, start a std::thread running RDMADispatcher::polling and name
// it "rdma-polling".
// NOTE(review): the condition guarding the early `return` (embedded line 82)
// and the lines following each create_* call are omitted from this listing.
76 void RDMADispatcher::polling_start()
78 // take lock because listen/connect can happen from different worker threads
79 std::lock_guard l
{lock
};
82 return; // dispatcher thread already running
84 ib
->get_memory_manager()->set_rx_stat_logger(perf_logger
);
86 tx_cc
= ib
->create_comp_channel(cct
);
88 rx_cc
= ib
->create_comp_channel(cct
);
90 tx_cq
= ib
->create_comp_queue(cct
, tx_cc
);
92 rx_cq
= ib
->create_comp_queue(cct
, rx_cc
);
95 t
= std::thread(&RDMADispatcher::polling
, this);
96 ceph_pthread_setname(t
.native_handle(), "rdma-polling");
// Stop the polling thread.
// NOTE(review): only the acquisition of `lock` is visible in this listing;
// embedded lines 103-118 are absent, so the shutdown and cleanup sequence
// cannot be described from here.
99 void RDMADispatcher::polling_stop()
102 std::lock_guard l
{lock
};
// Fetch one asynchronous verbs event from the device context, count and log
// it, react according to its type, and acknowledge it with
// ibv_ack_async_event().
// Visible handling:
//  - IBV_EVENT_CQ_ERR: error log only.
//  - IBV_EVENT_QP_FATAL: looks the QP up by number and logs its state.
//  - IBV_EVENT_QP_LAST_WQE_REACHED: bumps the last-WQE counter, then under
//    `lock` looks up the connection and the QP, logs an error if the QP is
//    not marked dead, and calls enqueue_dead_qp_lockless() when
//    ms_async_rdma_cm is off.
//  - the case labels from IBV_EVENT_QP_REQ_ERR to IBV_EVENT_DEVICE_FATAL are
//    followed by a single error log naming the event (presumably reached by
//    fall-through -- confirm); a further log reports unknown event types.
// NOTE(review): this listing omits a number of embedded source lines (braces,
// break/return statements and some conditions), so the exact branch structure
// must be confirmed against the full file.
119 void RDMADispatcher::handle_async_event()
121 ldout(cct
, 30) << __func__
<< dendl
;
123 ibv_async_event async_event
;
124 if (ibv_get_async_event(ib
->get_device()->ctxt
, &async_event
)) {
126 lderr(cct
) << __func__
<< " ibv_get_async_event failed. (errno=" << errno
127 << " " << cpp_strerror(errno
) << ")" << dendl
;
130 perf_logger
->inc(l_msgr_rdma_total_async_events
);
// NOTE(review): "Event : " has no leading space, so it is printed directly
// after the function name in the log line.
131 ldout(cct
, 1) << __func__
<< "Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
133 switch (async_event
.event_type
) {
134 /***********************CQ events********************/
135 case IBV_EVENT_CQ_ERR
:
136 lderr(cct
) << __func__
<< " Fatal Error, effect all QP bound with same CQ, "
137 << " CQ Overflow, dev = " << ib
->get_device()->ctxt
138 << " Need destroy and recreate resource " << dendl
;
140 /***********************QP events********************/
141 case IBV_EVENT_QP_FATAL
:
143 /* Error occurred on a QP and it transitioned to error state */
144 ibv_qp
* ib_qp
= async_event
.element
.qp
;
145 uint32_t qpn
= ib_qp
->qp_num
;
// NOTE(review): `qp` is dereferenced in the log statement below with no
// visible null check; presumably get_qp() can return null for an unknown
// QP number -- confirm.
146 QueuePair
* qp
= get_qp(qpn
);
147 lderr(cct
) << __func__
<< " Fatal Error, event associate qp number: " << qpn
148 << " Queue Pair status: " << Infiniband::qp_state_string(qp
->get_state())
149 << " Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
152 case IBV_EVENT_QP_LAST_WQE_REACHED
:
155 * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
156 * Reason: QP is force switched into Error before posting Beacon WR.
157 * The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
158 * For SRQ, only WRs on the QP which is switched into Error status will be flushed.
159 * Handle: Only confirm that qp enter into dead queue pairs
160 * 2. The CQE with error was generated for the last WQE
161 * Handle: output error log
163 perf_logger
->inc(l_msgr_rdma_async_last_wqe_events
);
164 ibv_qp
* ib_qp
= async_event
.element
.qp
;
165 uint32_t qpn
= ib_qp
->qp_num
;
166 std::lock_guard l
{lock
};
167 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(qpn
);
168 QueuePair
* qp
= get_qp_lockless(qpn
);
170 if (qp
&& !qp
->is_dead()) {
171 lderr(cct
) << __func__
<< " QP not dead, event associate qp number: " << qpn
172 << " Queue Pair status: " << Infiniband::qp_state_string(qp
->get_state())
173 << " Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
176 ldout(cct
, 20) << __func__
<< " Connection's QP maybe entered into dead status. "
177 << " qp number: " << qpn
<< dendl
;
181 if (!cct
->_conf
->ms_async_rdma_cm
) {
182 enqueue_dead_qp_lockless(qpn
);
188 case IBV_EVENT_QP_REQ_ERR
:
189 /* Invalid Request Local Work Queue Error */
191 case IBV_EVENT_QP_ACCESS_ERR
:
192 /* Local access violation error */
194 case IBV_EVENT_COMM_EST
:
195 /* Communication was established on a QP */
197 case IBV_EVENT_SQ_DRAINED
:
198 /* Send Queue was drained of outstanding messages in progress */
200 case IBV_EVENT_PATH_MIG
:
201 /* A connection has migrated to the alternate path */
203 case IBV_EVENT_PATH_MIG_ERR
:
204 /* A connection failed to migrate to the alternate path */
206 /***********************SRQ events*******************/
207 case IBV_EVENT_SRQ_ERR
:
208 /* Error occurred on an SRQ */
210 case IBV_EVENT_SRQ_LIMIT_REACHED
:
211 /* SRQ limit was reached */
213 /***********************Port events******************/
214 case IBV_EVENT_PORT_ACTIVE
:
215 /* Link became active on a port */
217 case IBV_EVENT_PORT_ERR
:
218 /* Link became unavailable on a port */
220 case IBV_EVENT_LID_CHANGE
:
221 /* LID was changed on a port */
223 case IBV_EVENT_PKEY_CHANGE
:
224 /* P_Key table was changed on a port */
226 case IBV_EVENT_SM_CHANGE
:
227 /* SM was changed on a port */
229 case IBV_EVENT_CLIENT_REREGISTER
:
230 /* SM sent a CLIENT_REREGISTER request to a port */
232 case IBV_EVENT_GID_CHANGE
:
233 /* GID table was changed on a port */
236 /***********************CA events******************/
238 case IBV_EVENT_DEVICE_FATAL
:
239 /* CA is in FATAL state */
240 lderr(cct
) << __func__
<< " ibv_get_async_event: dev = " << ib
->get_device()->ctxt
241 << " evt: " << ibv_event_type_str(async_event
.event_type
) << dendl
;
244 lderr(cct
) << __func__
<< " ibv_get_async_event: dev = " << ib
->get_device()->ctxt
245 << " unknown event: " << async_event
.event_type
<< dendl
;
248 ibv_ack_async_event(&async_event
);
252 void RDMADispatcher::post_chunk_to_pool(Chunk
* chunk
)
254 std::lock_guard l
{lock
};
255 ib
->post_chunk_to_pool(chunk
);
256 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
259 int RDMADispatcher::post_chunks_to_rq(int num
, QueuePair
*qp
)
261 std::lock_guard l
{lock
};
262 return ib
->post_chunks_to_rq(num
, qp
);
// Body of the polling thread.
// Visible work: poll up to MAX_COMPLETIONS entries from the tx CQ and pass
// them to handle_tx_event(), then do the same for the rx CQ with
// handle_rx_event(). When neither produced entries: publish `inflight` to the
// perf counters, swap dead_queue_pairs out under `lock` and process each
// entry, and, once the idle time since last_inactive exceeds
// ms_async_rdma_polling_us, call handle_async_event(), re-arm both CQs and
// wait in poll() (100 ms timeout) on the two completion-channel fds.
// NOTE(review): loop headers, break statements, several conditions and the
// per-QP cleanup statements are on embedded lines this listing omits, so the
// exact control flow must be confirmed against the full file.
265 void RDMADispatcher::polling()
// NOTE(review): MAX_COMPLETIONS is a non-const static int, so `wc` below is a
// variable-length array, which is a compiler extension in C++.
267 static int MAX_COMPLETIONS
= 32;
268 ibv_wc wc
[MAX_COMPLETIONS
];
// NOTE(review): `polled` and `tx_cqe` are not referenced by any line visible
// in this listing.
270 std::map
<RDMAConnectedSocketImpl
*, std::vector
<ibv_wc
> > polled
;
271 std::vector
<ibv_wc
> tx_cqe
;
272 ldout(cct
, 20) << __func__
<< " going to poll tx cq: " << tx_cq
<< " rx cq: " << rx_cq
<< dendl
;
273 uint64_t last_inactive
= Cycles::rdtsc();
274 bool rearmed
= false;
278 int tx_ret
= tx_cq
->poll_cq(MAX_COMPLETIONS
, wc
);
280 ldout(cct
, 20) << __func__
<< " tx completion queue got " << tx_ret
281 << " responses."<< dendl
;
282 handle_tx_event(wc
, tx_ret
);
285 int rx_ret
= rx_cq
->poll_cq(MAX_COMPLETIONS
, wc
);
287 ldout(cct
, 20) << __func__
<< " rx completion queue got " << rx_ret
288 << " responses."<< dendl
;
289 handle_rx_event(wc
, rx_ret
);
292 if (!tx_ret
&& !rx_ret
) {
293 perf_logger
->set(l_msgr_rdma_inflight_tx_chunks
, inflight
);
295 // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
296 // we can destroy QPs even earlier, just when beacon has been received,
297 // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
298 // CQ before other WCs are fully consumed from rx CQ. For safety, we
299 // wait for beacon and then "no-events" from CQs.
301 // Calling size() on vector without locks is totally fine, since we
302 // use it as a hint (accuracy is not important here)
304 if (!dead_queue_pairs
.empty()) {
305 decltype(dead_queue_pairs
) dead_qps
;
307 std::lock_guard l
{lock
};
308 dead_queue_pairs
.swap(dead_qps
);
311 for (auto& qp
: dead_qps
) {
312 perf_logger
->dec(l_msgr_rdma_active_queue_pair
);
313 ldout(cct
, 10) << __func__
<< " finally delete qp = " << qp
<< dendl
;
318 if (!num_qp_conn
&& done
&& dead_queue_pairs
.empty())
321 uint64_t now
= Cycles::rdtsc();
322 if (Cycles::to_microseconds(now
- last_inactive
) > cct
->_conf
->ms_async_rdma_polling_us
) {
323 handle_async_event();
325 // Clean up cq events after rearm notify ensure no new incoming event
326 // arrived between polling and rearm
327 tx_cq
->rearm_notify();
328 rx_cq
->rearm_notify();
333 struct pollfd channel_poll
[2];
334 channel_poll
[0].fd
= tx_cc
->get_fd();
335 channel_poll
[0].events
= POLLIN
;
336 channel_poll
[0].revents
= 0;
337 channel_poll
[1].fd
= rx_cc
->get_fd();
338 channel_poll
[1].events
= POLLIN
;
339 channel_poll
[1].revents
= 0;
341 perf_logger
->set(l_msgr_rdma_polling
, 0);
342 while (!done
&& r
== 0) {
343 r
= TEMP_FAILURE_RETRY(poll(channel_poll
, 2, 100));
346 lderr(cct
) << __func__
<< " poll failed " << r
<< dendl
;
350 if (r
> 0 && tx_cc
->get_cq_event())
351 ldout(cct
, 20) << __func__
<< " got tx cq event." << dendl
;
352 if (r
> 0 && rx_cc
->get_cq_event())
353 ldout(cct
, 20) << __func__
<< " got rx cq event." << dendl
;
354 last_inactive
= Cycles::rdtsc();
355 perf_logger
->set(l_msgr_rdma_polling
, 1);
// Take one worker off the pending list, if any are waiting.
// When num_pending_workers is non-zero, the front of pending_workers is
// popped under w_lock and the pending count is decremented.
// NOTE(review): what is done with the popped worker `w` happens on embedded
// lines 371-377, which this listing omits.
362 void RDMADispatcher::notify_pending_workers() {
363 if (num_pending_workers
) {
364 RDMAWorker
*w
= nullptr;
366 std::lock_guard l
{w_lock
};
367 if (!pending_workers
.empty()) {
368 w
= pending_workers
.front();
369 pending_workers
.pop_front();
370 --num_pending_workers
;
// Register a queue pair together with the connected socket that owns it.
// Under `lock`, asserts that the QP's local number is not already present in
// qp_conns and then stores the (qp, csi) pair under that number.
// NOTE(review): embedded line 383, after the insertion, is omitted from this
// listing; any bookkeeping done there is not visible here.
378 void RDMADispatcher::register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
)
380 std::lock_guard l
{lock
};
381 ceph_assert(!qp_conns
.count(qp
->get_local_qp_number()));
382 qp_conns
[qp
->get_local_qp_number()] = std::make_pair(qp
, csi
);
386 RDMAConnectedSocketImpl
* RDMADispatcher::get_conn_lockless(uint32_t qp
)
388 auto it
= qp_conns
.find(qp
);
389 if (it
== qp_conns
.end())
391 if (it
->second
.first
->is_dead())
393 return it
->second
.second
;
396 Infiniband::QueuePair
* RDMADispatcher::get_qp_lockless(uint32_t qp
)
398 // Try to find the QP in qp_conns firstly.
399 auto it
= qp_conns
.find(qp
);
400 if (it
!= qp_conns
.end())
401 return it
->second
.first
;
403 // Try again in dead_queue_pairs.
404 for (auto &i
: dead_queue_pairs
)
405 if (i
->get_local_qp_number() == qp
)
411 Infiniband::QueuePair
* RDMADispatcher::get_qp(uint32_t qp
)
413 std::lock_guard l
{lock
};
414 return get_qp_lockless(qp
);
// Queue a registered QP for later destruction; takes no lock itself.
// Looks qpn up in qp_conns: an unknown number is reported with an error log,
// otherwise the QueuePair is appended to dead_queue_pairs.
// NOTE(review): embedded lines 422 and 426-427 are omitted from this listing;
// the early exit after the error log and any map/counter update after the
// push_back are therefore not visible here.
417 void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn
)
419 auto it
= qp_conns
.find(qpn
);
420 if (it
== qp_conns
.end()) {
421 lderr(cct
) << __func__
<< " QP [" << qpn
<< "] is not registered." << dendl
;
424 QueuePair
*qp
= it
->second
.first
;
425 dead_queue_pairs
.push_back(qp
);
430 void RDMADispatcher::enqueue_dead_qp(uint32_t qpn
)
432 std::lock_guard l
{lock
};
433 enqueue_dead_qp_lockless(qpn
);
// Schedule destruction of the QP registered under qpn.
// Under `lock`, looks the QP up in qp_conns and logs an error if it is not
// registered. Per the in-line comments there are two outcomes: if switching
// the QP to the dead state fails, the QP is appended to dead_queue_pairs
// directly; if it succeeds, the map entry is kept and only its socket pointer
// is cleared.
// NOTE(review): the call that attempts the switch and the branch structure
// around it (embedded lines 445-446, 449, 451-454) are omitted from this
// listing.
436 void RDMADispatcher::schedule_qp_destroy(uint32_t qpn
)
438 std::lock_guard l
{lock
};
439 auto it
= qp_conns
.find(qpn
);
440 if (it
== qp_conns
.end()) {
441 lderr(cct
) << __func__
<< " QP [" << qpn
<< "] is not registered." << dendl
;
444 QueuePair
*qp
= it
->second
.first
;
447 // Failed to switch to dead. This is abnormal, but we can't
448 // do anything, so just destroy QP.
450 dead_queue_pairs
.push_back(qp
);
455 // Successfully switched to dead, thus keep entry in the map.
456 // But only zero out socked pointer in order to return null from
457 // get_conn_lockless();
458 it
->second
.second
= nullptr;
// Process `n` send-side work completions from `cqe`.
// Visible handling per completion:
//  - a wr_id equal to BEACON_WRID enqueues the QP as dead;
//  - a non-success status is logged per status code, with dedicated counters
//    for IBV_WC_RETRY_EXC_ERR and IBV_WC_WR_FLUSH_ERR, looking up the
//    connection/QP under `lock` for the log text;
//  - wr_id is then interpreted as a Chunk*: valid chunks are collected in
//    tx_chunks, a wr_id that points at the QueuePair of the same qp_num is
//    logged as a completed disconnect message, anything else is logged as
//    "not tx buffer".
// Finally the tx_total_wc counter is bumped by n and the collected chunks are
// released through post_tx_buffer().
// NOTE(review): several embedded lines (for example 491, 503, 515 and the
// break/continue statements) are omitted from this listing, so whether `conn`
// and `qp` are null-checked before being dereferenced in the log statements
// must be confirmed against the full file.
462 void RDMADispatcher::handle_tx_event(ibv_wc
*cqe
, int n
)
464 std::vector
<Chunk
*> tx_chunks
;
466 for (int i
= 0; i
< n
; ++i
) {
467 ibv_wc
* response
= &cqe
[i
];
469 // If it's beacon WR, enqueue the QP to be destroyed later
470 if (response
->wr_id
== BEACON_WRID
) {
471 enqueue_dead_qp(response
->qp_num
);
475 ldout(cct
, 20) << __func__
<< " QP number: " << response
->qp_num
<< " len: " << response
->byte_len
476 << " status: " << ib
->wc_status_to_string(response
->status
) << dendl
;
478 if (response
->status
!= IBV_WC_SUCCESS
) {
479 switch(response
->status
) {
480 case IBV_WC_RETRY_EXC_ERR
:
482 perf_logger
->inc(l_msgr_rdma_tx_wc_retry_errors
);
484 ldout(cct
, 1) << __func__
<< " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
485 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
486 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
487 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
489 std::lock_guard l
{lock
};
490 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
492 ldout(cct
, 1) << __func__
<< " SQ WR return error, remote Queue Pair, qp number: "
493 << conn
->get_peer_qpn() << dendl
;
497 case IBV_WC_WR_FLUSH_ERR
:
499 perf_logger
->inc(l_msgr_rdma_tx_wc_wr_flush_errors
);
501 std::lock_guard l
{lock
};
502 QueuePair
*qp
= get_qp_lockless(response
->qp_num
);
504 ldout(cct
, 20) << __func__
<< " qp state is " << Infiniband::qp_state_string(qp
->get_state()) << dendl
;
506 if (qp
&& qp
->is_dead()) {
507 ldout(cct
, 20) << __func__
<< " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl
;
509 lderr(cct
) << __func__
<< " Invalid/Unsupported request to consume outstanding SQ WR,"
510 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
511 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
512 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
514 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
516 ldout(cct
, 1) << __func__
<< " SQ WR return error, remote Queue Pair, qp number: "
517 << conn
->get_peer_qpn() << dendl
;
525 lderr(cct
) << __func__
<< " SQ WR return error,"
526 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
527 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
528 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
530 std::lock_guard l
{lock
};
531 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
532 if (conn
&& conn
->is_connected()) {
533 ldout(cct
, 20) << __func__
<< " SQ WR return error Queue Pair error state is : " << conn
->get_qp_state()
534 << " remote Queue Pair, qp number: " << conn
->get_peer_qpn() << dendl
;
537 ldout(cct
, 1) << __func__
<< " Disconnected, qp_num = " << response
->qp_num
<< " discard event" << dendl
;
544 auto chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
545 //TX completion may come either from
546 // 1) regular send message, WCE wr_id points to chunk
547 // 2) 'fin' message, wr_id points to the QP
548 if (ib
->get_memory_manager()->is_valid_chunk(chunk
)) {
549 tx_chunks
.push_back(chunk
);
550 } else if (reinterpret_cast<QueuePair
*>(response
->wr_id
)->get_local_qp_number() == response
->qp_num
) {
551 ldout(cct
, 1) << __func__
<< " sending of the disconnect msg completed" << dendl
;
553 ldout(cct
, 1) << __func__
<< " not tx buffer, chunk " << chunk
<< dendl
;
558 perf_logger
->inc(l_msgr_rdma_tx_total_wc
, n
);
559 post_tx_buffer(tx_chunks
);
563 * Add the given Chunks to the given free queue.
566 * The Chunks to enqueue.
568 * Nothing; the function is declared void.
// Release completed tx chunks.
// Subtracts chunks.size() from `inflight`, returns the chunks to the memory
// manager via return_tx(), logs the release at level 30 and then calls
// notify_pending_workers().
// NOTE(review): embedded lines 571-574 at the top of the body are omitted
// from this listing (possibly an early exit for an empty vector -- confirm).
570 void RDMADispatcher::post_tx_buffer(std::vector
<Chunk
*> &chunks
)
575 inflight
-= chunks
.size();
576 ib
->get_memory_manager()->return_tx(chunks
);
577 ldout(cct
, 30) << __func__
<< " release " << chunks
.size()
578 << " chunks, inflight " << inflight
<< dendl
;
579 notify_pending_workers();
// Process `rx_number` receive-side work completions from `cqe`.
// Bumps the rx_total_wc and rx_bufs_in_use counters by rx_number, then, while
// holding `lock`, resolves the connection and QP for each completion and
// switches on its status. Visible actions:
//  - in the first branch the opcode is asserted to be IBV_WC_RECV; the chunk
//    is either returned to the pool (logged as "csi ... may be dead") or the
//    connection posts one replacement chunk and the completion is collected
//    in `polled`; for a QP without an SRQ the chunk is removed from the QP's
//    receive-queue bookkeeping;
//  - IBV_WC_WR_FLUSH_ERR and the remaining statuses count an rx error, log
//    it, and return the chunk to the pool.
// After the loop every collected batch is handed to its connection through
// pass_wc().
// NOTE(review): the case labels, the conditions choosing between the visible
// actions (for example embedded lines 597, 599, 604) and the break statements
// are omitted from this listing; confirm the branch structure, and whether
// `qp`/`conn` are null-checked before the log statements dereference them,
// against the full file.
582 void RDMADispatcher::handle_rx_event(ibv_wc
*cqe
, int rx_number
)
584 perf_logger
->inc(l_msgr_rdma_rx_total_wc
, rx_number
);
585 perf_logger
->inc(l_msgr_rdma_rx_bufs_in_use
, rx_number
);
587 std::map
<RDMAConnectedSocketImpl
*, std::vector
<ibv_wc
> > polled
;
588 std::lock_guard l
{lock
};//make sure connected socket alive when pass wc
590 for (int i
= 0; i
< rx_number
; ++i
) {
591 ibv_wc
* response
= &cqe
[i
];
592 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
593 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
594 QueuePair
*qp
= get_qp_lockless(response
->qp_num
);
596 switch (response
->status
) {
598 ceph_assert(response
->opcode
== IBV_WC_RECV
);
600 ldout(cct
, 1) << __func__
<< " csi with qpn " << response
->qp_num
<< " may be dead. chunk 0x"
601 << std::hex
<< chunk
<< " will be back." << std::dec
<< dendl
;
602 ib
->post_chunk_to_pool(chunk
);
603 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
605 conn
->post_chunks_to_rq(1);
606 polled
[conn
].push_back(*response
);
608 if (qp
!= nullptr && !qp
->get_srq()) {
609 qp
->remove_rq_wr(chunk
);
615 case IBV_WC_WR_FLUSH_ERR
:
616 perf_logger
->inc(l_msgr_rdma_rx_total_wc_errors
);
619 ldout(cct
, 20) << __func__
<< " qp state is " << Infiniband::qp_state_string(qp
->get_state()) << dendl
;
621 if (qp
&& qp
->is_dead()) {
622 ldout(cct
, 20) << __func__
<< " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl
;
624 ldout(cct
, 1) << __func__
<< " RQ WR return error,"
625 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
626 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
627 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
629 ldout(cct
, 1) << __func__
<< " RQ WR return error, remote Queue Pair, qp number: "
630 << conn
->get_peer_qpn() << dendl
;
634 ib
->post_chunk_to_pool(chunk
);
635 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
639 perf_logger
->inc(l_msgr_rdma_rx_total_wc_errors
);
641 ldout(cct
, 1) << __func__
<< " RQ WR return error,"
642 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
643 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
644 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
645 if (conn
&& conn
->is_connected())
648 ib
->post_chunk_to_pool(chunk
);
649 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
654 for (auto &i
: polled
)
655 i
.first
->pass_wc(std::move(i
.second
));
659 RDMAWorker::RDMAWorker(CephContext
*c
, unsigned worker_id
)
660 : Worker(c
, worker_id
),
661 tx_handler(new C_handle_cq_tx(this))
663 // initialize perf_logger
665 sprintf(name
, "AsyncMessenger::RDMAWorker-%u", id
);
666 PerfCountersBuilder
plb(cct
, name
, l_msgr_rdma_first
, l_msgr_rdma_last
);
668 plb
.add_u64_counter(l_msgr_rdma_tx_no_mem
, "tx_no_mem", "The count of no tx buffer");
669 plb
.add_u64_counter(l_msgr_rdma_tx_parital_mem
, "tx_parital_mem", "The count of parital tx buffer");
670 plb
.add_u64_counter(l_msgr_rdma_tx_failed
, "tx_failed_post", "The number of tx failed posted");
672 plb
.add_u64_counter(l_msgr_rdma_tx_chunks
, "tx_chunks", "The number of tx chunks transmitted");
673 plb
.add_u64_counter(l_msgr_rdma_tx_bytes
, "tx_bytes", "The bytes of tx chunks transmitted", NULL
, 0, unit_t(UNIT_BYTES
));
674 plb
.add_u64_counter(l_msgr_rdma_rx_chunks
, "rx_chunks", "The number of rx chunks transmitted");
675 plb
.add_u64_counter(l_msgr_rdma_rx_bytes
, "rx_bytes", "The bytes of rx chunks transmitted", NULL
, 0, unit_t(UNIT_BYTES
));
676 plb
.add_u64_counter(l_msgr_rdma_pending_sent_conns
, "pending_sent_conns", "The count of pending sent conns");
678 perf_logger
= plb
.create_perf_counters();
679 cct
->get_perfcounters_collection()->add(perf_logger
);
// Destructor.
// NOTE(review): the body (embedded lines 683-685) is not present in this
// listing, so its cleanup steps cannot be described from here.
682 RDMAWorker::~RDMAWorker()
687 void RDMAWorker::initialize()
689 ceph_assert(dispatcher
);
// Create a listening RDMA server socket on `sa`.
// Visible steps: make sure the dispatcher's polling thread is started, create
// an RDMAIWARPServerSocketImpl when ms_async_rdma_type is "iwarp" and a plain
// RDMAServerSocketImpl otherwise, call listen(sa, opt) on it, and hand
// ownership to *sock wrapped in a ServerSocket.
// NOTE(review): the handling of the listen() result `r` and the return
// statements (embedded lines 705-709, 711-712), as well as line 695, are
// omitted from this listing.
692 int RDMAWorker::listen(entity_addr_t
&sa
, unsigned addr_slot
,
693 const SocketOptions
&opt
,ServerSocket
*sock
)
696 dispatcher
->polling_start();
698 RDMAServerSocketImpl
*p
;
699 if (cct
->_conf
->ms_async_rdma_type
== "iwarp") {
700 p
= new RDMAIWARPServerSocketImpl(cct
, ib
, dispatcher
, this, sa
, addr_slot
);
702 p
= new RDMAServerSocketImpl(cct
, ib
, dispatcher
, this, sa
, addr_slot
);
704 int r
= p
->listen(sa
, opt
);
710 *sock
= ServerSocket(std::unique_ptr
<ServerSocketImpl
>(p
));
// Open an outgoing RDMA connection to `addr`.
// Visible steps: make sure the dispatcher's polling thread is started, create
// an RDMAIWARPConnectedSocketImpl when ms_async_rdma_type is "iwarp" and a
// plain RDMAConnectedSocketImpl otherwise, call try_connect(addr, opts), log
// at level 1 on the failure path, and hand ownership to *socket wrapped in a
// ConnectedSocket.
// NOTE(review): the test of `r`, the cleanup on failure and the return
// statements (embedded lines 726-727, 729-731, 734-735), as well as line 716,
// are omitted from this listing.
714 int RDMAWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
)
717 dispatcher
->polling_start();
719 RDMAConnectedSocketImpl
* p
;
720 if (cct
->_conf
->ms_async_rdma_type
== "iwarp") {
721 p
= new RDMAIWARPConnectedSocketImpl(cct
, ib
, dispatcher
, this);
723 p
= new RDMAConnectedSocketImpl(cct
, ib
, dispatcher
, this);
725 int r
= p
->try_connect(addr
, opts
);
728 ldout(cct
, 1) << __func__
<< " try connecting failed." << dendl
;
732 std::unique_ptr
<RDMAConnectedSocketImpl
> csi(p
);
733 *socket
= ConnectedSocket(std::move(csi
));
// Reserve registered tx buffers for socket `o`.
// Must run on the worker's own event-center thread (asserted). Requests
// buffers for `bytes` bytes from the Infiniband layer into `c`, logs how many
// registered bytes were obtained, and adds the number of chunks obtained to
// the dispatcher's `inflight` count. On a path whose condition is not visible
// here, a socket that is not yet pending is appended to pending_sent_conns
// (with the matching perf counter) and the worker registers itself with the
// dispatcher as pending.
// NOTE(review): the condition guarding the pending path and the return
// statement (embedded lines 744-747, 751-752, 754-757) are omitted from this
// listing.
737 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
)
739 ceph_assert(center
.in_thread());
740 int r
= ib
->get_tx_buffers(c
, bytes
);
741 size_t got
= ib
->get_memory_manager()->get_tx_buffer_size() * r
;
742 ldout(cct
, 30) << __func__
<< " need " << bytes
<< " bytes, reserve " << got
<< " registered bytes, inflight " << dispatcher
->inflight
<< dendl
;
743 dispatcher
->inflight
+= r
;
748 if (!o
->is_pending()) {
749 pending_sent_conns
.push_back(o
);
750 perf_logger
->inc(l_msgr_rdma_pending_sent_conns
, 1);
753 dispatcher
->make_pending_worker(this);
// Retry sending for sockets that were parked waiting for tx buffers.
// Pops sockets off the front of pending_sent_conns one at a time and calls
// submit(false) on each, logging the result. On a path whose condition is not
// visible here the socket is pushed back and the worker re-registers itself
// as pending; otherwise the pending_sent_conns counter is decremented.
// Finishes by calling the dispatcher's notify_pending_workers().
// NOTE(review): the test on `r` and the statements leaving the loop
// (embedded lines 767-768, 771-775, 777) are omitted from this listing.
759 void RDMAWorker::handle_pending_message()
761 ldout(cct
, 20) << __func__
<< " pending conns " << pending_sent_conns
.size() << dendl
;
762 while (!pending_sent_conns
.empty()) {
763 RDMAConnectedSocketImpl
*o
= pending_sent_conns
.front();
764 pending_sent_conns
.pop_front();
765 ssize_t r
= o
->submit(false);
766 ldout(cct
, 20) << __func__
<< " sent pending bl socket=" << o
<< " r=" << r
<< dendl
;
769 pending_sent_conns
.push_back(o
);
770 dispatcher
->make_pending_worker(this);
776 perf_logger
->dec(l_msgr_rdma_pending_sent_conns
, 1);
778 dispatcher
->notify_pending_workers();
// Construct the RDMA network stack.
// Creates the shared Infiniband object and the shared RDMADispatcher built on
// it, then walks all workers of the stack and attaches the dispatcher to each
// one.
// NOTE(review): embedded line 791, inside the worker loop, is omitted from
// this listing; any further per-worker setup done there is not visible.
781 RDMAStack::RDMAStack(CephContext
*cct
)
782 : NetworkStack(cct
), ib(std::make_shared
<Infiniband
>(cct
)),
783 rdma_dispatcher(std::make_shared
<RDMADispatcher
>(cct
, ib
))
785 ldout(cct
, 20) << __func__
<< " constructing RDMAStack..." << dendl
;
787 unsigned num
= get_num_worker();
788 for (unsigned i
= 0; i
< num
; ++i
) {
// NOTE(review): the dynamic_cast result is used on the very next line without
// a null check; a worker of another type would be dereferenced as null.
789 RDMAWorker
* w
= dynamic_cast<RDMAWorker
*>(get_worker(i
));
790 w
->set_dispatcher(rdma_dispatcher
);
793 ldout(cct
, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher
.get() << dendl
;
796 RDMAStack::~RDMAStack()
798 if (cct
->_conf
->ms_async_rdma_enable_hugepage
) {
799 unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
// Start the thread for worker slot `i`, running `func`.
// NOTE(review): embedded line 805, before the assignment, is omitted from
// this listing (presumably it sizes `threads` -- confirm).
// NOTE(review): `func` is a named rvalue reference, so std::thread(func)
// copies the std::function rather than moving it.
803 void RDMAStack::spawn_worker(unsigned i
, std::function
<void ()> &&func
)
806 threads
[i
] = std::thread(func
);
809 void RDMAStack::join_worker(unsigned i
)
811 ceph_assert(threads
.size() > i
&& threads
[i
].joinable());