1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
20 #include <sys/resource.h>
22 #include "include/str_list.h"
23 #include "include/compat.h"
24 #include "common/Cycles.h"
25 #include "common/deleter.h"
26 #include "RDMAStack.h"
28 #define dout_subsys ceph_subsys_ms
30 #define dout_prefix *_dout << "RDMAStack "
// Destructor: logs teardown, then asserts that all queue pairs have been
// released (connection map empty, QP counter zero, no dead QPs awaiting
// deletion).
// NOTE(review): this region is line-mangled and the embedded original line
// numbers skip 35-36 — at least one statement between the log line and the
// asserts (likely stopping the polling thread) was lost in extraction.
// Restore from upstream before building.
32 RDMADispatcher::~RDMADispatcher()
34 ldout(cct
, 20) << __func__
<< " destructing rdma dispatcher" << dendl
;
// Invariants at destruction: every registered QP must be gone.
37 ceph_assert(qp_conns
.empty());
38 ceph_assert(num_qp_conn
== 0);
39 ceph_assert(dead_queue_pairs
.empty());
// Constructor: builds the dispatcher's perf-counter set and registers it
// with the CephContext's collection.
// NOTE(review): line-mangled; the member-initializer list (orig lines 43-44,
// presumably initializing cct and ib from the parameters) is missing from
// this extraction — restore from upstream.
42 RDMADispatcher::RDMADispatcher(CephContext
* c
, std::shared_ptr
<Infiniband
>& ib
)
// Build the l_msgr_rdma_dispatcher_* counter group.
45 PerfCountersBuilder
plb(cct
, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first
, l_msgr_rdma_dispatcher_last
);
47 plb
.add_u64_counter(l_msgr_rdma_polling
, "polling", "Whether dispatcher thread is polling");
48 plb
.add_u64_counter(l_msgr_rdma_inflight_tx_chunks
, "inflight_tx_chunks", "The number of inflight tx chunks");
49 plb
.add_u64_counter(l_msgr_rdma_rx_bufs_in_use
, "rx_bufs_in_use", "The number of rx buffers that are holding data and being processed");
50 plb
.add_u64_counter(l_msgr_rdma_rx_bufs_total
, "rx_bufs_total", "The total number of rx buffers");
52 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc
, "tx_total_wc", "The number of tx work comletions");
53 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc_errors
, "tx_total_wc_errors", "The number of tx errors");
54 plb
.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors
, "tx_retry_errors", "The number of tx retry errors");
55 plb
.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors
, "tx_wr_flush_errors", "The number of tx work request flush errors");
57 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc
, "rx_total_wc", "The number of total rx work completion");
58 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc_errors
, "rx_total_wc_errors", "The number of total rx error work completion");
59 plb
.add_u64_counter(l_msgr_rdma_rx_fin
, "rx_fin", "The number of rx finish work request");
61 plb
.add_u64_counter(l_msgr_rdma_total_async_events
, "total_async_events", "The number of async events");
62 plb
.add_u64_counter(l_msgr_rdma_async_last_wqe_events
, "async_last_wqe_events", "The number of last wqe events");
64 plb
.add_u64_counter(l_msgr_rdma_handshake_errors
, "handshake_errors", "The number of handshake errors");
67 plb
.add_u64_counter(l_msgr_rdma_created_queue_pair
, "created_queue_pair", "Active queue pair number");
68 plb
.add_u64_counter(l_msgr_rdma_active_queue_pair
, "active_queue_pair", "Created queue pair number");
// Materialize the counters and publish them in the context's collection.
70 perf_logger
= plb
.create_perf_counters();
71 cct
->get_perfcounters_collection()->add(perf_logger
);
// Lazily creates the completion channels/queues and launches the polling
// thread. Safe to call from multiple worker threads (guarded by 'lock');
// a second call returns early once the thread is running.
// NOTE(review): line-mangled; the embedded numbering skips 79-80 (the
// already-running check guarding the early return) and several lines after
// each create_* call (likely error checks) — restore from upstream.
75 void RDMADispatcher::polling_start()
77 // take lock because listen/connect can happen from different worker threads
78 std::lock_guard l
{lock
};
81 return; // dispatcher thread already running
// Route rx buffer statistics into this dispatcher's perf counters.
83 ib
->get_memory_manager()->set_rx_stat_logger(perf_logger
);
// Separate completion channel + queue per direction (tx and rx).
85 tx_cc
= ib
->create_comp_channel(cct
);
87 rx_cc
= ib
->create_comp_channel(cct
);
89 tx_cq
= ib
->create_comp_queue(cct
, tx_cc
);
91 rx_cq
= ib
->create_comp_queue(cct
, rx_cc
);
// Spawn the dedicated polling thread and name it for debugging tools.
94 t
= std::thread(&RDMADispatcher::polling
, this);
95 ceph_pthread_setname(t
.native_handle(), "rdma-polling");
// Stops the polling thread and tears down CQs/CCs.
// NOTE(review): almost the entire body (orig lines 102-116 — presumably
// setting the done flag, joining the thread and destroying tx/rx cq/cc) is
// missing from this extraction; only the lock acquisition survived.
// Restore from upstream.
98 void RDMADispatcher::polling_stop()
101 std::lock_guard l
{lock
};
// Drains one event from the device's async event queue (ibv_get_async_event),
// dispatches on its type (CQ / QP / SRQ / port / CA events), and acknowledges
// it with ibv_ack_async_event.
// NOTE(review): line-mangled; gaps in the embedded numbering (e.g. 124,
// 127-128, 138, 149-150, 152-153, 177-186, and the 'break;' lines after each
// case) indicate control-flow statements were lost in extraction — as written
// the cases would fall through. Restore from upstream before building.
118 void RDMADispatcher::handle_async_event()
120 ldout(cct
, 30) << __func__
<< dendl
;
122 ibv_async_event async_event
;
// Pull the next async event off the device context; logs and bails on error.
123 if (ibv_get_async_event(ib
->get_device()->ctxt
, &async_event
)) {
125 lderr(cct
) << __func__
<< " ibv_get_async_event failed. (errno=" << errno
126 << " " << cpp_strerror(errno
) << ")" << dendl
;
129 perf_logger
->inc(l_msgr_rdma_total_async_events
);
130 ldout(cct
, 1) << __func__
<< "Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
132 switch (async_event
.event_type
) {
133 /***********************CQ events********************/
134 case IBV_EVENT_CQ_ERR
:
135 lderr(cct
) << __func__
<< " Fatal Error, effect all QP bound with same CQ, "
136 << " CQ Overflow, dev = " << ib
->get_device()->ctxt
137 << " Need destroy and recreate resource " << dendl
;
139 /***********************QP events********************/
140 case IBV_EVENT_QP_FATAL
:
142 /* Error occurred on a QP and it transitioned to error state */
143 ibv_qp
* ib_qp
= async_event
.element
.qp
;
144 uint32_t qpn
= ib_qp
->qp_num
;
145 QueuePair
* qp
= get_qp(qpn
);
146 lderr(cct
) << __func__
<< " Fatal Error, event associate qp number: " << qpn
147 << " Queue Pair status: " << Infiniband::qp_state_string(qp
->get_state())
148 << " Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
151 case IBV_EVENT_QP_LAST_WQE_REACHED
:
154 * 1. The QP bound with SRQ is in IBV_QPS_ERR state & no more WQE on the RQ of the QP
155 * Reason: QP is force switched into Error before posting Beacon WR.
156 * The QP's WRs will be flushed into CQ with IBV_WC_WR_FLUSH_ERR status
157 * For SRQ, only WRs on the QP which is switched into Error status will be flushed.
158 * Handle: Only confirm that qp enter into dead queue pairs
159 * 2. The CQE with error was generated for the last WQE
160 * Handle: output error log
162 perf_logger
->inc(l_msgr_rdma_async_last_wqe_events
);
163 ibv_qp
* ib_qp
= async_event
.element
.qp
;
164 uint32_t qpn
= ib_qp
->qp_num
;
// Take the dispatcher lock so the conn/qp lookups below stay consistent.
165 std::lock_guard l
{lock
};
166 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(qpn
);
167 QueuePair
* qp
= get_qp_lockless(qpn
);
169 if (qp
&& !qp
->is_dead()) {
170 lderr(cct
) << __func__
<< " QP not dead, event associate qp number: " << qpn
171 << " Queue Pair status: " << Infiniband::qp_state_string(qp
->get_state())
172 << " Event : " << ibv_event_type_str(async_event
.event_type
) << dendl
;
175 ldout(cct
, 20) << __func__
<< " Connection's QP maybe entered into dead status. "
176 << " qp number: " << qpn
<< dendl
;
// In non-rdma-cm mode the dispatcher owns QP teardown itself.
180 if (!cct
->_conf
->ms_async_rdma_cm
) {
181 enqueue_dead_qp_lockless(qpn
);
187 case IBV_EVENT_QP_REQ_ERR
:
188 /* Invalid Request Local Work Queue Error */
190 case IBV_EVENT_QP_ACCESS_ERR
:
191 /* Local access violation error */
193 case IBV_EVENT_COMM_EST
:
194 /* Communication was established on a QP */
196 case IBV_EVENT_SQ_DRAINED
:
197 /* Send Queue was drained of outstanding messages in progress */
199 case IBV_EVENT_PATH_MIG
:
200 /* A connection has migrated to the alternate path */
202 case IBV_EVENT_PATH_MIG_ERR
:
203 /* A connection failed to migrate to the alternate path */
205 /***********************SRQ events*******************/
206 case IBV_EVENT_SRQ_ERR
:
207 /* Error occurred on an SRQ */
209 case IBV_EVENT_SRQ_LIMIT_REACHED
:
210 /* SRQ limit was reached */
212 /***********************Port events******************/
213 case IBV_EVENT_PORT_ACTIVE
:
214 /* Link became active on a port */
216 case IBV_EVENT_PORT_ERR
:
217 /* Link became unavailable on a port */
219 case IBV_EVENT_LID_CHANGE
:
220 /* LID was changed on a port */
222 case IBV_EVENT_PKEY_CHANGE
:
223 /* P_Key table was changed on a port */
225 case IBV_EVENT_SM_CHANGE
:
226 /* SM was changed on a port */
228 case IBV_EVENT_CLIENT_REREGISTER
:
229 /* SM sent a CLIENT_REREGISTER request to a port */
231 case IBV_EVENT_GID_CHANGE
:
232 /* GID table was changed on a port */
235 /***********************CA events******************/
237 case IBV_EVENT_DEVICE_FATAL
:
238 /* CA is in FATAL state */
239 lderr(cct
) << __func__
<< " ibv_get_async_event: dev = " << ib
->get_device()->ctxt
240 << " evt: " << ibv_event_type_str(async_event
.event_type
) << dendl
;
243 lderr(cct
) << __func__
<< " ibv_get_async_event: dev = " << ib
->get_device()->ctxt
244 << " unknown event: " << async_event
.event_type
<< dendl
;
// Every event obtained via ibv_get_async_event must be acked, or the
// destroy of the associated object would block.
247 ibv_ack_async_event(&async_event
);
251 void RDMADispatcher::post_chunk_to_pool(Chunk
* chunk
)
253 std::lock_guard l
{lock
};
254 ib
->post_chunk_to_pool(chunk
);
255 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
258 int RDMADispatcher::post_chunks_to_rq(int num
, QueuePair
*qp
)
260 std::lock_guard l
{lock
};
261 return ib
->post_chunks_to_rq(num
, qp
);
// Body of the dedicated "rdma-polling" thread: busy-polls the tx and rx
// completion queues, hands completions to handle_tx_event/handle_rx_event,
// deletes dead QPs while idle, and falls back to blocking poll(2) on the
// completion channels after ms_async_rdma_polling_us of inactivity.
// NOTE(review): line-mangled; the embedded numbering skips the outer
// while-loop header, the 'delete qp;' in the dead-QP sweep, the loop-exit
// 'break;', the declaration/handling of 'r', and several closing braces —
// restore from upstream before building.
264 void RDMADispatcher::polling()
266 static int MAX_COMPLETIONS
= 32;
// Scratch array reused for both tx and rx poll_cq calls.
267 ibv_wc wc
[MAX_COMPLETIONS
];
269 std::map
<RDMAConnectedSocketImpl
*, std::vector
<ibv_wc
> > polled
;
270 std::vector
<ibv_wc
> tx_cqe
;
271 ldout(cct
, 20) << __func__
<< " going to poll tx cq: " << tx_cq
<< " rx cq: " << rx_cq
<< dendl
;
272 uint64_t last_inactive
= Cycles::rdtsc();
273 bool rearmed
= false;
// Drain tx completions first, then rx.
277 int tx_ret
= tx_cq
->poll_cq(MAX_COMPLETIONS
, wc
);
279 ldout(cct
, 20) << __func__
<< " tx completion queue got " << tx_ret
280 << " responses."<< dendl
;
281 handle_tx_event(wc
, tx_ret
);
284 int rx_ret
= rx_cq
->poll_cq(MAX_COMPLETIONS
, wc
);
286 ldout(cct
, 20) << __func__
<< " rx completion queue got " << rx_ret
287 << " responses."<< dendl
;
288 handle_rx_event(wc
, rx_ret
);
// Both queues empty: idle housekeeping path.
291 if (!tx_ret
&& !rx_ret
) {
292 perf_logger
->set(l_msgr_rdma_inflight_tx_chunks
, inflight
);
294 // Clean up dead QPs when rx/tx CQs are in idle. The thing is that
295 // we can destroy QPs even earlier, just when beacon has been received,
296 // but we have two CQs (rx & tx), thus beacon WC can be poped from tx
297 // CQ before other WCs are fully consumed from rx CQ. For safety, we
298 // wait for beacon and then "no-events" from CQs.
300 // Calling size() on vector without locks is totally fine, since we
301 // use it as a hint (accuracy is not important here)
303 if (!dead_queue_pairs
.empty()) {
// Swap under the lock, destroy outside it.
304 decltype(dead_queue_pairs
) dead_qps
;
306 std::lock_guard l
{lock
};
307 dead_queue_pairs
.swap(dead_qps
);
310 for (auto& qp
: dead_qps
) {
311 perf_logger
->dec(l_msgr_rdma_active_queue_pair
);
312 ldout(cct
, 10) << __func__
<< " finally delete qp = " << qp
<< dendl
;
// Thread exit condition: no live QPs, shutdown requested, nothing pending.
317 if (!num_qp_conn
&& done
&& dead_queue_pairs
.empty())
320 uint64_t now
= Cycles::rdtsc();
// Idle long enough: switch from busy-poll to event-driven waiting.
321 if (Cycles::to_microseconds(now
- last_inactive
) > cct
->_conf
->ms_async_rdma_polling_us
) {
322 handle_async_event();
324 // Clean up cq events after rearm notify ensure no new incoming event
325 // arrived between polling and rearm
326 tx_cq
->rearm_notify();
327 rx_cq
->rearm_notify();
// Block on both completion channel fds until an event or 100ms timeout.
332 struct pollfd channel_poll
[2];
333 channel_poll
[0].fd
= tx_cc
->get_fd();
334 channel_poll
[0].events
= POLLIN
;
335 channel_poll
[0].revents
= 0;
336 channel_poll
[1].fd
= rx_cc
->get_fd();
337 channel_poll
[1].events
= POLLIN
;
338 channel_poll
[1].revents
= 0;
340 perf_logger
->set(l_msgr_rdma_polling
, 0);
341 while (!done
&& r
== 0) {
342 r
= TEMP_FAILURE_RETRY(poll(channel_poll
, 2, 100));
345 lderr(cct
) << __func__
<< " poll failed " << r
<< dendl
;
// Consume the channel events that woke us so the CQs can notify again.
349 if (r
> 0 && tx_cc
->get_cq_event())
350 ldout(cct
, 20) << __func__
<< " got tx cq event." << dendl
;
351 if (r
> 0 && rx_cc
->get_cq_event())
352 ldout(cct
, 20) << __func__
<< " got rx cq event." << dendl
;
353 last_inactive
= Cycles::rdtsc();
354 perf_logger
->set(l_msgr_rdma_polling
, 1);
// Pops one worker (if any) off the pending-worker queue under w_lock.
// NOTE(review): line-mangled; the tail of the function (orig lines 370-375
// — closing braces and, presumably, notifying the dequeued worker outside
// the lock) is missing from this extraction. Restore from upstream.
361 void RDMADispatcher::notify_pending_workers() {
// num_pending_workers is read unlocked as a cheap hint before taking w_lock.
362 if (num_pending_workers
) {
363 RDMAWorker
*w
= nullptr;
365 std::lock_guard l
{w_lock
};
366 if (!pending_workers
.empty()) {
367 w
= pending_workers
.front();
368 pending_workers
.pop_front();
369 --num_pending_workers
;
// Registers a freshly created QP and its owning socket in qp_conns, keyed
// by the local QP number. Asserts the QP number is not already present.
// NOTE(review): line-mangled; orig line 382 is missing — given the asserts
// in the destructor, it presumably incremented num_qp_conn. Restore from
// upstream.
377 void RDMADispatcher::register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
)
379 std::lock_guard l
{lock
};
380 ceph_assert(!qp_conns
.count(qp
->get_local_qp_number()));
381 qp_conns
[qp
->get_local_qp_number()] = std::make_pair(qp
, csi
);
// Looks up the connected socket for a QP number; caller must hold 'lock'.
// Returns the socket only when the QP is registered and not dead.
// NOTE(review): line-mangled; the 'return nullptr;' bodies of both guard
// conditions (orig lines 389 and 391) are missing from this extraction.
// Restore from upstream.
385 RDMAConnectedSocketImpl
* RDMADispatcher::get_conn_lockless(uint32_t qp
)
387 auto it
= qp_conns
.find(qp
);
388 if (it
== qp_conns
.end())
// A dead QP still has a map entry; treat it as "no connection".
390 if (it
->second
.first
->is_dead())
392 return it
->second
.second
;
// Resolves a QP number to the QueuePair object without taking the lock:
// first via the live connection map, then by scanning dead_queue_pairs.
// Caller must hold 'lock'.
// NOTE(review): line-mangled; the returns of the dead-queue scan (orig
// lines 405 and the final not-found return, presumably 'return i;' /
// 'return nullptr;') are missing from this extraction. Restore from
// upstream.
395 Infiniband::QueuePair
* RDMADispatcher::get_qp_lockless(uint32_t qp
)
397 // Try to find the QP in qp_conns firstly.
398 auto it
= qp_conns
.find(qp
);
399 if (it
!= qp_conns
.end())
400 return it
->second
.first
;
402 // Try again in dead_queue_pairs.
403 for (auto &i
: dead_queue_pairs
)
404 if (i
->get_local_qp_number() == qp
)
410 Infiniband::QueuePair
* RDMADispatcher::get_qp(uint32_t qp
)
412 std::lock_guard l
{lock
};
413 return get_qp_lockless(qp
);
// Moves a registered QP onto dead_queue_pairs for deferred destruction by
// the polling thread. Caller must hold 'lock'.
// NOTE(review): line-mangled; missing from this extraction are the early
// 'return;' of the not-registered branch (orig 421) and the tail after
// push_back (orig 425-427, presumably erasing the qp_conns entry and
// decrementing num_qp_conn). Restore from upstream.
416 void RDMADispatcher::enqueue_dead_qp_lockless(uint32_t qpn
)
418 auto it
= qp_conns
.find(qpn
);
419 if (it
== qp_conns
.end()) {
420 lderr(cct
) << __func__
<< " QP [" << qpn
<< "] is not registered." << dendl
;
423 QueuePair
*qp
= it
->second
.first
;
// Actual delete happens later, in the polling thread's idle sweep.
424 dead_queue_pairs
.push_back(qp
);
429 void RDMADispatcher::enqueue_dead_qp(uint32_t qpn
)
431 std::lock_guard l
{lock
};
432 enqueue_dead_qp_lockless(qpn
);
// Initiates destruction of a registered QP: tries to switch it to the dead
// state; on failure pushes it straight onto dead_queue_pairs, on success
// keeps the map entry but clears the socket pointer so lookups return null.
// NOTE(review): line-mangled; missing from this extraction are the early
// 'return;' (orig 441-442), the condition that chooses between the two
// branches (orig 444-445, presumably the to-dead call), and the 'else'
// scaffolding around the success comment. Restore from upstream.
435 void RDMADispatcher::schedule_qp_destroy(uint32_t qpn
)
437 std::lock_guard l
{lock
};
438 auto it
= qp_conns
.find(qpn
);
439 if (it
== qp_conns
.end()) {
440 lderr(cct
) << __func__
<< " QP [" << qpn
<< "] is not registered." << dendl
;
443 QueuePair
*qp
= it
->second
.first
;
446 // Failed to switch to dead. This is abnormal, but we can't
447 // do anything, so just destroy QP.
449 dead_queue_pairs
.push_back(qp
);
454 // Successfully switched to dead, thus keep entry in the map.
455 // But only zero out socked pointer in order to return null from
456 // get_conn_lockless();
457 it
->second
.second
= nullptr;
// Processes 'n' tx work completions: beacon WRs retire their QP, successful
// sends return their chunk to tx_chunks, and each error status gets its own
// diagnosis (retry-exceeded, flush-on-dead-QP, generic). Finishes by
// bumping the tx WC counter and recycling the collected chunks.
// NOTE(review): line-mangled; gaps in the embedded numbering (the
// 'continue;' after beacon handling, the 'break;' after each switch case,
// null-checks around 'conn', and closing braces) indicate lost control-flow
// statements — restore from upstream before building.
461 void RDMADispatcher::handle_tx_event(ibv_wc
*cqe
, int n
)
463 std::vector
<Chunk
*> tx_chunks
;
465 for (int i
= 0; i
< n
; ++i
) {
466 ibv_wc
* response
= &cqe
[i
];
468 // If it's beacon WR, enqueue the QP to be destroyed later
469 if (response
->wr_id
== BEACON_WRID
) {
470 enqueue_dead_qp(response
->qp_num
);
474 ldout(cct
, 20) << __func__
<< " QP number: " << response
->qp_num
<< " len: " << response
->byte_len
475 << " status: " << ib
->wc_status_to_string(response
->status
) << dendl
;
// Non-success completions: classify and log per status code.
477 if (response
->status
!= IBV_WC_SUCCESS
) {
478 switch(response
->status
) {
479 case IBV_WC_RETRY_EXC_ERR
:
481 perf_logger
->inc(l_msgr_rdma_tx_wc_retry_errors
);
483 ldout(cct
, 1) << __func__
<< " Responder ACK timeout, possible disconnect, or Remote QP in bad state "
484 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
485 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
486 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
488 std::lock_guard l
{lock
};
489 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
491 ldout(cct
, 1) << __func__
<< " SQ WR return error, remote Queue Pair, qp number: "
492 << conn
->get_peer_qpn() << dendl
;
496 case IBV_WC_WR_FLUSH_ERR
:
498 perf_logger
->inc(l_msgr_rdma_tx_wc_wr_flush_errors
);
500 std::lock_guard l
{lock
};
501 QueuePair
*qp
= get_qp_lockless(response
->qp_num
);
503 ldout(cct
, 20) << __func__
<< " qp state is " << Infiniband::qp_state_string(qp
->get_state()) << dendl
;
// Flush errors are expected when the QP was deliberately killed.
505 if (qp
&& qp
->is_dead()) {
506 ldout(cct
, 20) << __func__
<< " outstanding SQ WR is flushed into CQ since QueuePair is dead " << dendl
;
508 lderr(cct
) << __func__
<< " Invalid/Unsupported request to consume outstanding SQ WR,"
509 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
510 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
511 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
513 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
515 ldout(cct
, 1) << __func__
<< " SQ WR return error, remote Queue Pair, qp number: "
516 << conn
->get_peer_qpn() << dendl
;
// Default branch: any other error status.
524 lderr(cct
) << __func__
<< " SQ WR return error,"
525 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
526 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
527 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
529 std::lock_guard l
{lock
};
530 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
531 if (conn
&& conn
->is_connected()) {
532 ldout(cct
, 20) << __func__
<< " SQ WR return error Queue Pair error state is : " << conn
->get_qp_state()
533 << " remote Queue Pair, qp number: " << conn
->get_peer_qpn() << dendl
;
536 ldout(cct
, 1) << __func__
<< " Disconnected, qp_num = " << response
->qp_num
<< " discard event" << dendl
;
543 auto chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
544 //TX completion may come either from
545 // 1) regular send message, WCE wr_id points to chunk
546 // 2) 'fin' message, wr_id points to the QP
547 if (ib
->get_memory_manager()->is_valid_chunk(chunk
)) {
548 tx_chunks
.push_back(chunk
);
549 } else if (reinterpret_cast<QueuePair
*>(response
->wr_id
)->get_local_qp_number() == response
->qp_num
) {
550 ldout(cct
, 1) << __func__
<< " sending of the disconnect msg completed" << dendl
;
552 ldout(cct
, 1) << __func__
<< " not tx buffer, chunk " << chunk
<< dendl
;
// Account all completions, then return the reclaimed chunks to the pool.
557 perf_logger
->inc(l_msgr_rdma_tx_total_wc
, n
);
558 post_tx_buffer(tx_chunks
);
// (Remnant of the original doc comment for post_tx_buffer — the /** ... */
// delimiters and @param/@return tags were lost in extraction.)
562 * Add the given Chunks to the given free queue.
565 * The Chunks to enqueue.
567 * 0 if success or -1 for failure
// Returns transmitted chunks to the tx memory pool, decrements the inflight
// byte count, and wakes any worker waiting for tx buffers.
// NOTE(review): line-mangled; orig lines 570-573 are missing — presumably
// the opening brace and an early return for an empty chunk vector. Restore
// from upstream.
569 void RDMADispatcher::post_tx_buffer(std::vector
<Chunk
*> &chunks
)
574 inflight
-= chunks
.size();
575 ib
->get_memory_manager()->return_tx(chunks
);
576 ldout(cct
, 30) << __func__
<< " release " << chunks
.size()
577 << " chunks, inflight " << inflight
<< dendl
;
// Freed buffers may unblock a worker stalled in get_reged_mem().
578 notify_pending_workers();
// Processes 'rx_number' rx work completions under the dispatcher lock:
// successful receives are grouped per connection in 'polled' and handed to
// each socket via pass_wc(); completions whose connection is gone have
// their chunk returned to the pool; error statuses are logged and their
// chunks recycled. Finally every socket gets its batch in one call.
// NOTE(review): line-mangled; gaps in the embedded numbering (the
// IBV_WC_SUCCESS case label, 'break;' statements, null-checks on 'conn',
// and closing braces) indicate lost control-flow — restore from upstream
// before building.
581 void RDMADispatcher::handle_rx_event(ibv_wc
*cqe
, int rx_number
)
583 perf_logger
->inc(l_msgr_rdma_rx_total_wc
, rx_number
);
584 perf_logger
->inc(l_msgr_rdma_rx_bufs_in_use
, rx_number
);
586 std::map
<RDMAConnectedSocketImpl
*, std::vector
<ibv_wc
> > polled
;
587 std::lock_guard l
{lock
};//make sure connected socket alive when pass wc
589 for (int i
= 0; i
< rx_number
; ++i
) {
590 ibv_wc
* response
= &cqe
[i
];
// wr_id of an rx completion always points at the posted chunk.
591 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
592 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
593 QueuePair
*qp
= get_qp_lockless(response
->qp_num
);
595 switch (response
->status
) {
597 ceph_assert(response
->opcode
== IBV_WC_RECV
);
// Connection already torn down: recycle the buffer immediately.
599 ldout(cct
, 1) << __func__
<< " csi with qpn " << response
->qp_num
<< " may be dead. chunk 0x"
600 << std::hex
<< chunk
<< " will be back." << std::dec
<< dendl
;
601 ib
->post_chunk_to_pool(chunk
);
602 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
// Live connection: replenish its RQ and batch the completion for pass_wc.
604 conn
->post_chunks_to_rq(1);
605 polled
[conn
].push_back(*response
);
607 if (qp
!= nullptr && !qp
->get_srq()) {
608 qp
->remove_rq_wr(chunk
);
614 case IBV_WC_WR_FLUSH_ERR
:
615 perf_logger
->inc(l_msgr_rdma_rx_total_wc_errors
);
618 ldout(cct
, 20) << __func__
<< " qp state is " << Infiniband::qp_state_string(qp
->get_state()) << dendl
;
// Expected when the QP was deliberately killed; anything else is an error.
620 if (qp
&& qp
->is_dead()) {
621 ldout(cct
, 20) << __func__
<< " outstanding RQ WR is flushed into CQ since QueuePair is dead " << dendl
;
623 ldout(cct
, 1) << __func__
<< " RQ WR return error,"
624 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
625 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
626 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
628 ldout(cct
, 1) << __func__
<< " RQ WR return error, remote Queue Pair, qp number: "
629 << conn
->get_peer_qpn() << dendl
;
633 ib
->post_chunk_to_pool(chunk
);
634 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
// Default branch: any other rx error status.
638 perf_logger
->inc(l_msgr_rdma_rx_total_wc_errors
);
640 ldout(cct
, 1) << __func__
<< " RQ WR return error,"
641 << " WCE status(" << response
->status
<< "): " << ib
->wc_status_to_string(response
->status
)
642 << " WCE QP number " << response
->qp_num
<< " Opcode " << response
->opcode
643 << " wr_id: 0x" << std::hex
<< response
->wr_id
<< std::dec
<< dendl
;
644 if (conn
&& conn
->is_connected())
647 ib
->post_chunk_to_pool(chunk
);
648 perf_logger
->dec(l_msgr_rdma_rx_bufs_in_use
);
// Deliver each connection's batch of completions in a single call.
653 for (auto &i
: polled
)
654 i
.first
->pass_wc(std::move(i
.second
));
// Worker constructor: sets up the tx-completion event handler and builds
// this worker's l_msgr_rdma_* perf-counter set, named after the worker id.
// NOTE(review): line-mangled; the embedded numbering skips 661, 663, 666,
// 670 and 676 — besides braces/blank lines, counters may have been lost
// (e.g. between tx_failed_post and tx_chunks). Restore from upstream.
658 RDMAWorker::RDMAWorker(CephContext
*c
, unsigned worker_id
)
659 : Worker(c
, worker_id
),
660 tx_handler(new C_handle_cq_tx(this))
662 // initialize perf_logger
664 sprintf(name
, "AsyncMessenger::RDMAWorker-%u", id
);
665 PerfCountersBuilder
plb(cct
, name
, l_msgr_rdma_first
, l_msgr_rdma_last
);
667 plb
.add_u64_counter(l_msgr_rdma_tx_no_mem
, "tx_no_mem", "The count of no tx buffer");
668 plb
.add_u64_counter(l_msgr_rdma_tx_parital_mem
, "tx_parital_mem", "The count of parital tx buffer");
669 plb
.add_u64_counter(l_msgr_rdma_tx_failed
, "tx_failed_post", "The number of tx failed posted");
671 plb
.add_u64_counter(l_msgr_rdma_tx_chunks
, "tx_chunks", "The number of tx chunks transmitted");
672 plb
.add_u64_counter(l_msgr_rdma_tx_bytes
, "tx_bytes", "The bytes of tx chunks transmitted", NULL
, 0, unit_t(UNIT_BYTES
));
673 plb
.add_u64_counter(l_msgr_rdma_rx_chunks
, "rx_chunks", "The number of rx chunks transmitted");
674 plb
.add_u64_counter(l_msgr_rdma_rx_bytes
, "rx_bytes", "The bytes of rx chunks transmitted", NULL
, 0, unit_t(UNIT_BYTES
));
675 plb
.add_u64_counter(l_msgr_rdma_pending_sent_conns
, "pending_sent_conns", "The count of pending sent conns");
// Materialize and publish the counters.
677 perf_logger
= plb
.create_perf_counters();
678 cct
->get_perfcounters_collection()->add(perf_logger
);
// Worker destructor.
// NOTE(review): the entire body (orig lines 682-685) is missing from this
// extraction — presumably it releases tx_handler, which the constructor
// allocates with 'new'. Restore from upstream.
681 RDMAWorker::~RDMAWorker()
686 void RDMAWorker::initialize()
688 ceph_assert(dispatcher
);
// Creates a server socket bound to 'sa': starts the dispatcher's polling
// thread, instantiates the iWARP or plain RDMA server socket implementation
// depending on ms_async_rdma_type, calls its listen(), and on success wraps
// it into *sock.
// NOTE(review): line-mangled; the embedded numbering skips 693-694, 700,
// 702 and 704-708 — the 'else' of the iwarp branch and the error handling
// after p->listen() (delete p / return r on failure) were lost. Restore
// from upstream.
691 int RDMAWorker::listen(entity_addr_t
&sa
, unsigned addr_slot
,
692 const SocketOptions
&opt
,ServerSocket
*sock
)
// Polling must be running before any socket can receive completions.
695 dispatcher
->polling_start();
697 RDMAServerSocketImpl
*p
;
698 if (cct
->_conf
->ms_async_rdma_type
== "iwarp") {
699 p
= new RDMAIWARPServerSocketImpl(cct
, ib
, dispatcher
, this, sa
, addr_slot
);
701 p
= new RDMAServerSocketImpl(cct
, ib
, dispatcher
, this, sa
, addr_slot
);
703 int r
= p
->listen(sa
, opt
);
// Transfer ownership of the implementation into the ServerSocket wrapper.
709 *sock
= ServerSocket(std::unique_ptr
<ServerSocketImpl
>(p
));
// Creates an outgoing connection to 'addr': starts the dispatcher's polling
// thread, instantiates the iWARP or plain RDMA connected-socket
// implementation depending on ms_async_rdma_type, attempts try_connect(),
// and on success wraps the socket into *socket.
// NOTE(review): line-mangled; the embedded numbering skips 721, 723,
// 725-726 and 728-730 — the 'else' scaffolding and the failure path after
// try_connect() (delete p / return r) were lost. Restore from upstream.
713 int RDMAWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
)
716 dispatcher
->polling_start();
718 RDMAConnectedSocketImpl
* p
;
719 if (cct
->_conf
->ms_async_rdma_type
== "iwarp") {
720 p
= new RDMAIWARPConnectedSocketImpl(cct
, ib
, dispatcher
, this);
722 p
= new RDMAConnectedSocketImpl(cct
, ib
, dispatcher
, this);
724 int r
= p
->try_connect(addr
, opts
);
727 ldout(cct
, 1) << __func__
<< " try connecting failed." << dendl
;
// Transfer ownership into the ConnectedSocket wrapper on success.
731 std::unique_ptr
<RDMAConnectedSocketImpl
> csi(p
);
732 *socket
= ConnectedSocket(std::move(csi
));
// Reserves registered tx buffers for socket 'o': pulls chunks from the
// Infiniband tx pool into 'c', accounts them against dispatcher->inflight,
// and — when the reservation fell short — parks the socket on
// pending_sent_conns and registers this worker with the dispatcher to be
// woken when buffers are freed. Must run on the worker's event-center
// thread (asserted).
// NOTE(review): line-mangled; the embedded numbering skips 743-746 and
// 750-751 — the "got enough" early-return path and surrounding braces were
// lost, as was the final return of the bytes obtained. Restore from
// upstream.
736 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
)
738 ceph_assert(center
.in_thread());
739 int r
= ib
->get_tx_buffers(c
, bytes
);
// Convert chunk count into a byte figure for the shortfall check/logging.
740 size_t got
= ib
->get_memory_manager()->get_tx_buffer_size() * r
;
741 ldout(cct
, 30) << __func__
<< " need " << bytes
<< " bytes, reserve " << got
<< " registered bytes, inflight " << dispatcher
->inflight
<< dendl
;
742 dispatcher
->inflight
+= r
;
// Shortfall: remember the socket (once) and wait for freed tx buffers.
747 if (!o
->is_pending()) {
748 pending_sent_conns
.push_back(o
);
749 perf_logger
->inc(l_msgr_rdma_pending_sent_conns
, 1);
752 dispatcher
->make_pending_worker(this);
// Retries submission for every socket parked on pending_sent_conns. A
// socket that still cannot make progress is re-queued and the worker
// re-registers with the dispatcher; sockets that drained are removed from
// the pending count. Finishes by letting the dispatcher wake the next
// pending worker.
// NOTE(review): line-mangled; the embedded numbering skips 766-767 and
// 770-774 — the check on submit()'s result that selects between re-queueing
// (with 'break') and completing, plus closing braces, was lost. Restore
// from upstream.
758 void RDMAWorker::handle_pending_message()
760 ldout(cct
, 20) << __func__
<< " pending conns " << pending_sent_conns
.size() << dendl
;
761 while (!pending_sent_conns
.empty()) {
762 RDMAConnectedSocketImpl
*o
= pending_sent_conns
.front();
763 pending_sent_conns
.pop_front();
// Retry the buffered send without accepting new data (submit(false)).
764 ssize_t r
= o
->submit(false);
765 ldout(cct
, 20) << __func__
<< " sent pending bl socket=" << o
<< " r=" << r
<< dendl
;
// Still blocked: park the socket again and wait for more tx buffers.
768 pending_sent_conns
.push_back(o
);
769 dispatcher
->make_pending_worker(this);
775 perf_logger
->dec(l_msgr_rdma_pending_sent_conns
, 1);
// Give other stalled workers a chance now that buffers may be free.
777 dispatcher
->notify_pending_workers();
780 RDMAStack::RDMAStack(CephContext
*cct
)
781 : NetworkStack(cct
), ib(std::make_shared
<Infiniband
>(cct
)),
782 rdma_dispatcher(std::make_shared
<RDMADispatcher
>(cct
, ib
))
784 ldout(cct
, 20) << __func__
<< " constructing RDMAStack..." << dendl
;
785 ldout(cct
, 20) << " creating RDMAStack:" << this << " with dispatcher:" << rdma_dispatcher
.get() << dendl
;
788 RDMAStack::~RDMAStack()
790 if (cct
->_conf
->ms_async_rdma_enable_hugepage
) {
791 unsetenv("RDMAV_HUGEPAGES_SAFE"); //remove env variable on destruction
795 Worker
* RDMAStack::create_worker(CephContext
*c
, unsigned worker_id
)
797 auto w
= new RDMAWorker(c
, worker_id
);
798 w
->set_dispatcher(rdma_dispatcher
);
803 void RDMAStack::spawn_worker(std::function
<void ()> &&func
)
805 threads
.emplace_back(std::move(func
));
808 void RDMAStack::join_worker(unsigned i
)
810 ceph_assert(threads
.size() > i
&& threads
[i
].joinable());