1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
19 #include <sys/resource.h>
21 #include "include/str_list.h"
22 #include "common/deleter.h"
23 #include "common/Tub.h"
24 #include "RDMAStack.h"
26 #define dout_subsys ceph_subsys_ms
28 #define dout_prefix *_dout << "RDMAStack "
// Process-wide Infiniband device context. A single device/PD is shared by
// every RDMAStack/RDMADispatcher/RDMAWorker in this process; it is lazily
// constructed in RDMAStack's constructor and lazily init()'ed on first use.
static Tub<Infiniband> global_infiniband;
// Destructor: by this point the polling thread must have been stopped and
// every queue pair drained and reaped; the asserts enforce that invariant.
// NOTE(review): this copy lost several physical lines during extraction
// (opening brace, the lines before the ldout, and the cleanup block between
// the asserts and set_dispatcher()); restore them before compiling.
RDMADispatcher::~RDMADispatcher()
  ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;

  // No live or pending-delete qps may remain.
  assert(qp_conns.empty());
  assert(num_qp_conn == 0);
  assert(dead_queue_pairs.empty());
  assert(num_dead_queue_pair == 0);

  // Detach from the shared device so it stops routing events to us.
  global_infiniband->set_dispatcher(nullptr);
54 RDMADispatcher::RDMADispatcher(CephContext
* c
, RDMAStack
* s
)
55 : cct(c
), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
56 w_lock("RDMADispatcher::for worker pending list"), stack(s
)
58 PerfCountersBuilder
plb(cct
, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first
, l_msgr_rdma_dispatcher_last
);
60 plb
.add_u64_counter(l_msgr_rdma_polling
, "polling", "Whether dispatcher thread is polling");
61 plb
.add_u64_counter(l_msgr_rdma_inflight_tx_chunks
, "inflight_tx_chunks", "The number of inflight tx chunks");
62 plb
.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks
, "inqueue_rx_chunks", "The number of inqueue rx chunks");
64 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc
, "tx_total_wc", "The number of tx work comletions");
65 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc_errors
, "tx_total_wc_errors", "The number of tx errors");
66 plb
.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors
, "tx_retry_errors", "The number of tx retry errors");
67 plb
.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors
, "tx_wr_flush_errors", "The number of tx work request flush errors");
69 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc
, "rx_total_wc", "The number of total rx work completion");
70 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc_errors
, "rx_total_wc_errors", "The number of total rx error work completion");
71 plb
.add_u64_counter(l_msgr_rdma_rx_fin
, "rx_fin", "The number of rx finish work request");
73 plb
.add_u64_counter(l_msgr_rdma_total_async_events
, "total_async_events", "The number of async events");
74 plb
.add_u64_counter(l_msgr_rdma_async_last_wqe_events
, "async_last_wqe_events", "The number of last wqe events");
76 plb
.add_u64_counter(l_msgr_rdma_handshake_errors
, "handshake_errors", "The number of handshake errors");
79 plb
.add_u64_counter(l_msgr_rdma_created_queue_pair
, "created_queue_pair", "Active queue pair number");
80 plb
.add_u64_counter(l_msgr_rdma_active_queue_pair
, "active_queue_pair", "Created queue pair number");
82 perf_logger
= plb
.create_perf_counters();
83 cct
->get_perfcounters_collection()->add(perf_logger
);
// Create the shared TX/RX completion channels and queues, then launch the
// dedicated polling thread that services them.
// NOTE(review): extraction dropped the lines between the statements below
// (opening brace and, presumably, asserts on each created object — confirm
// against upstream before compiling).
void RDMADispatcher::polling_start()
  // Completion channels deliver fd-based notifications when a CQ has events.
  tx_cc = global_infiniband->create_comp_channel(cct);
  rx_cc = global_infiniband->create_comp_channel(cct);
  // Completion queues are bound to their channel so rearm_notify() works.
  tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
  rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);

  // Spawn the polling loop; it runs until `done` is set by polling_stop().
  t = std::thread(&RDMADispatcher::polling, this);
// Stop the polling thread started by polling_start().
// NOTE(review): the body of this function was lost in extraction —
// presumably it sets `done` and joins `t`; restore from upstream.
void RDMADispatcher::polling_stop()
// Drain pending verbs async events from the device context. The event of
// interest is IBV_EVENT_QP_LAST_WQE_REACHED, which signals that a qp in
// ERROR state has flushed its last work request and can be retired.
// NOTE(review): extraction dropped braces and several branch bodies here
// (e.g. the error-path return, the loop structure, the conn==nullptr vs
// conn!=nullptr alternatives); the visible statements are annotated below.
void RDMADispatcher::handle_async_event()
  ldout(cct, 30) << __func__ << dendl;
  ibv_async_event async_event;
  if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
    // errno==EAGAIN just means "no more events"; other errnos are real
    // failures — both land here, only the log distinguishes them.
    lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
               << " " << cpp_strerror(errno) << ")" << dendl;
  perf_logger->inc(l_msgr_rdma_total_async_events);
  // FIXME: Currently we must ensure no other factor make QP in ERROR state,
  // otherwise this qp can't be deleted in current cleanup flow.
  if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
    perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
    uint64_t qpn = async_event.element.qp->qp_num;
    ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                   << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
    // Hold the dispatcher lock while touching qp_conns.
    Mutex::Locker l(lock);
    RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
      // Unknown qpn: the connection was already cleaned up; drop the event.
      ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
      // Event not triggered by our own shutdown path.
      ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
    // Queue the qp for deferred deletion by the polling loop.
    erase_qpn_lockless(qpn);
    // Non-LAST_WQE events are only logged.
    ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
                  << " evt: " << ibv_event_type_str(async_event.event_type)
  // Every successfully fetched event must be acked back to the verbs layer.
  ibv_ack_async_event(&async_event);
// Dedicated dispatcher thread. Busy-polls the shared TX and RX completion
// queues; when both stay idle longer than ms_async_rdma_polling_us it rearms
// CQ notifications and blocks in poll(2) on the completion-channel fds until
// new events (or shutdown) arrive, then resumes busy-polling.
// NOTE(review): extraction dropped many physical lines of this function
// (outer while(true) loop, declaration of `r`, several branch bodies,
// closing braces, the `rearmed` handling and the shutdown break). The
// statements below are the visible remainder, annotated in order.
// NOTE(review): `static int MAX_COMPLETIONS = 32;` is a mutable function
// static used as an array bound (`ibv_wc wc[MAX_COMPLETIONS]` is a VLA in
// standard C++) — should be `static const`/constexpr; flagged, not changed.
void RDMADispatcher::polling()
  static int MAX_COMPLETIONS = 32;
  ibv_wc wc[MAX_COMPLETIONS];

  // Completions are batched per connection so each socket gets one pass_wc().
  std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
  std::vector<ibv_wc> tx_cqe;
  ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
  RDMAConnectedSocketImpl *conn = nullptr;
  utime_t last_inactive = ceph_clock_now();
  bool rearmed = false;

  // --- TX side: drain up to MAX_COMPLETIONS completions and hand them off.
  int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
    ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
                   << " responses."<< dendl;
    handle_tx_event(wc, tx_ret);

  // --- RX side: drain completions and route chunks to their sockets.
  int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
    ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
                   << " responses."<< dendl;
    perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);

    Mutex::Locker l(lock);//make sure connected socket alive when pass wc
    for (int i = 0; i < rx_ret; ++i) {
      ibv_wc* response = &wc[i];
      // wr_id of an RX completion is the posted receive Chunk.
      Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
      ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
      assert(wc[i].opcode == IBV_WC_RECV);

      if (response->status == IBV_WC_SUCCESS) {
        conn = get_conn_lockless(response->qp_num);
          // Owning socket already gone: recycle the chunk immediately.
          assert(global_infiniband->is_rx_buffer(chunk->buffer));
          r = global_infiniband->post_chunk(chunk);
          ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
          // Otherwise queue the completion for the socket.
          polled[conn].push_back(*response);
        // Error completion: count it, log it, and return the chunk to the
        // receive pool so the buffer is not leaked.
        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
        ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
            << ") status(" << response->status << ":"
            << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
        assert(global_infiniband->is_rx_buffer(chunk->buffer));
        r = global_infiniband->post_chunk(chunk);
          ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
      // Presumably the peer-close ('fin') path — confirm against upstream.
      conn = get_conn_lockless(response->qp_num);
      if (conn && conn->is_connected())

    // Deliver each connection's batch in one call.
    for (auto &&i : polled) {
      perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
      i.first->pass_wc(std::move(i.second));

  if (!tx_ret && !rx_ret) {
    // NOTE: Has TX just transitioned to idle? We should do it when idle!
    // It's now safe to delete queue pairs (see comment by declaration
    // for dead_queue_pairs).
    // Additionally, don't delete qp while outstanding_buffers isn't empty,
    // because we need to check qp's state before sending
    perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
    if (num_dead_queue_pair) {
      Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
      while (!dead_queue_pairs.empty()) {
        ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
        delete dead_queue_pairs.back();
        perf_logger->dec(l_msgr_rdma_active_queue_pair);
        dead_queue_pairs.pop_back();
        --num_dead_queue_pair;
    // Shutdown: leave once no connections remain and stop was requested.
    if (!num_qp_conn && done)

    // Idle longer than the configured busy-poll window: switch to blocking.
    if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
      handle_async_event();
      // Clean up cq events after rearm notify ensure no new incoming event
      // arrived between polling and rearm
      tx_cq->rearm_notify();
      rx_cq->rearm_notify();

      // Block on both completion-channel fds until events or shutdown.
      struct pollfd channel_poll[2];
      channel_poll[0].fd = tx_cc->get_fd();
      channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
      channel_poll[0].revents = 0;
      channel_poll[1].fd = rx_cc->get_fd();
      channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
      channel_poll[1].revents = 0;

      perf_logger->set(l_msgr_rdma_polling, 0);
      // 100 ms timeout lets the loop notice `done` promptly.
      while (!done && r == 0) {
        r = poll(channel_poll, 2, 100);
          lderr(cct) << __func__ << " poll failed " << r << dendl;
      // Ack whichever channel(s) fired so they can signal again.
      if (r > 0 && tx_cc->get_cq_event())
        ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
      if (r > 0 && rx_cc->get_cq_event())
        ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
      last_inactive = ceph_clock_now();
      perf_logger->set(l_msgr_rdma_polling, 1);
// Wake one worker that parked itself waiting for tx buffers to be returned.
// Uses FIFO order so no waiter starves. num_pending_workers is checked
// unlocked as a cheap fast path; the queue itself is only touched under
// w_lock.
// NOTE(review): the tail of this function (closing braces and, presumably,
// the actual wakeup call on the popped worker) was lost in extraction —
// restore from upstream before compiling.
void RDMADispatcher::notify_pending_workers() {
  if (num_pending_workers) {
    RDMAWorker *w = nullptr;
      Mutex::Locker l(w_lock);
      if (!pending_workers.empty()) {
        w = pending_workers.front();
        pending_workers.pop_front();
        --num_pending_workers;
// Register a freshly created queue pair together with its owning socket so
// that completions arriving on the shared CQs can be routed back to it.
// Creates a non-blocking eventfd — presumably returned to the caller for
// notification, but the function tail was lost in extraction (likely
// `++num_qp_conn;` and `return fd;`); confirm against upstream.
int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
  int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
  Mutex::Locker l(lock);
  // Each qp number may be registered at most once.
  assert(!qp_conns.count(qp->get_local_qp_number()));
  qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
// Look up the connected socket for a qp number. Caller must hold `lock`.
// A qp that is absent, or present but already marked dead, yields no socket.
// NOTE(review): the bodies of both early-out `if`s were lost in extraction
// (presumably `return nullptr;` in each case — confirm against upstream).
RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
  auto it = qp_conns.find(qp);
  if (it == qp_conns.end())
  if (it->second.first->is_dead())
  return it->second.second;
// Retire the qp registered under `qpn`: move it onto dead_queue_pairs so the
// polling thread can delete it once the CQs go idle (deleting immediately
// could race with in-flight completions). Caller must hold `lock`.
// NOTE(review): the unknown-qpn early return and the trailing
// `qp_conns.erase(it);` appear to have been lost in extraction.
void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
  auto it = qp_conns.find(qpn);
  if (it == qp_conns.end())
  ++num_dead_queue_pair;
  dead_queue_pairs.push_back(it->second.first);
321 void RDMADispatcher::erase_qpn(uint32_t qpn
)
323 Mutex::Locker
l(lock
);
324 erase_qpn_lockless(qpn
);
// Process a batch of `n` TX work completions from the shared TX CQ.
// Successful send chunks are collected and returned to the tx buffer pool in
// one call; error completions are counted and classified. A completion whose
// wr_id is not a tx buffer is the 'fin' (disconnect) message, whose wr_id is
// the QueuePair itself.
// NOTE(review): extraction dropped braces and the else-branches of several
// conditionals below (error cleanup, the conn==nullptr logging path, the
// fin-handling tail); the visible statements are annotated in order.
void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
  std::vector<Chunk*> tx_chunks;

  for (int i = 0; i < n; ++i) {
    ibv_wc* response = &cqe[i];
    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
    ldout(cct, 25) << __func__ << " QP: " << response->qp_num
        << " len: " << response->byte_len << " , addr:" << chunk
        << " " << global_infiniband->wc_status_to_string(response->status) << dendl;

    if (response->status != IBV_WC_SUCCESS) {
      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
      if (response->status == IBV_WC_RETRY_EXC_ERR) {
        // Transport retries exhausted: the peer is unreachable.
        ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
      } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
        // WRs flushed because the qp transitioned to ERROR while they were
        // still outstanding.
        ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
            << response->qp_num << " should be down while this WR=" << response->wr_id
            << " still in flight." << dendl;
        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
        // Any other error status: generic log.
        ldout(cct, 1) << __func__ << " send work request returned error for buffer("
            << response->wr_id << ") status(" << response->status << "): "
            << global_infiniband->wc_status_to_string(response->status) << dendl;

      Mutex::Locker l(lock);//make sure connected socket alive when pass wc
      RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
      if (conn && conn->is_connected()) {
        ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
        // Socket already gone: nothing to fault, just drop the event.
        ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;

    //TX completion may come either from regular send message or from 'fin' message.
    //In the case of 'fin' wr_id points to the QueuePair.
    if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
      tx_chunks.push_back(chunk);
    } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
      ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
      // Neither a tx buffer nor our own fin message — unexpected wr_id.
      ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;

  perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
  // Return all successfully completed chunks to the pool in one batch.
  post_tx_buffer(tx_chunks);
/**
 * Add the given Chunks to the given free queue.
 *
 * \param chunks
 *      The Chunks to enqueue.
 *
 * NOTE(review): the original doxygen claimed "0 if success or -1 for
 * failure", but the function returns void — comment fixed, not the code.
 * NOTE(review): the lines between the signature and `inflight -= ...`
 * (opening brace and, presumably, an empty-vector early return) were lost
 * in extraction.
 */
void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
  // These chunks are no longer in flight; return them to the tx pool.
  inflight -= chunks.size();
  global_infiniband->get_memory_manager()->return_tx(chunks);
  ldout(cct, 30) << __func__ << " release " << chunks.size()
                 << " chunks, inflight " << inflight << dendl;
  // Buffers became available — wake a worker stalled on get_reged_mem().
  notify_pending_workers();
// Per-thread RDMA worker. Registers its own perf-counter set named
// "AsyncMessenger::RDMAWorker-<id>".
// NOTE(review): the declaration of the `name` buffer used by sprintf below
// (and the opening brace) were lost in extraction — restore before
// compiling.
RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
  : Worker(c, i), stack(nullptr),
    tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
  // initialize perf_logger
  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
  plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");

  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted");
  plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);
// Worker destructor.
// NOTE(review): the body was lost in extraction — presumably it releases
// tx_handler and deregisters perf_logger; restore from upstream.
RDMAWorker::~RDMAWorker()
// Late binding of the worker to the stack's shared dispatcher (the stack
// pointer is set after construction).
// NOTE(review): the lines surrounding this assignment (braces and any
// guard condition) were lost in extraction.
void RDMAWorker::initialize()
  dispatcher = stack->get_dispatcher();
// Create a listening RDMA server socket bound to `sa` and hand ownership to
// the caller through `*sock`.
// NOTE(review): extraction dropped the lines between p->listen() and the
// *sock assignment (presumably the r<0 error path that deletes `p` and
// returns r) plus the final return — restore before compiling.
int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
  // Lazy one-time init of the shared device (idempotent).
  global_infiniband->init();

  auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
  int r = p->listen(sa, opt);

  // Success: transfer ownership of the impl into the ServerSocket handle.
  *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
// Initiate an outgoing RDMA connection to `addr` and hand the connected
// socket to the caller through `*socket`.
// NOTE(review): extraction dropped the error-path lines around the failure
// log (presumably delete `p` / return r) and the final return.
int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
  // Lazy one-time init of the shared device (idempotent).
  global_infiniband->init();

  RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
  int r = p->try_connect(addr, opts);

    ldout(cct, 1) << __func__ << " try connecting failed." << dendl;

  // Success: transfer ownership of the impl into the ConnectedSocket handle.
  std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
  *socket = ConnectedSocket(std::move(csi));
// Reserve up to `bytes` worth of registered tx buffers for socket `o`,
// appending the obtained chunks to `c`. Must run on the worker's own event
// center thread. When the pool cannot satisfy the request, the socket is
// queued on pending_sent_conns and this worker parks itself with the
// dispatcher so it is re-notified when buffers are returned.
// NOTE(review): extraction dropped the conditional structure between the
// ldout and the pending handling (the got>=bytes success return and the
// shortfall branch) as well as the final return — restore from upstream.
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
  assert(center.in_thread());
  int r = global_infiniband->get_tx_buffers(c, bytes);
  // r is a chunk count; convert to bytes actually reserved.
  size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
  ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got
                 << " registered bytes, inflight " << dispatcher->inflight << dendl;
  stack->get_dispatcher()->inflight += r;

  // Shortfall path: remember the socket (once) and park this worker.
  if (!o->is_pending()) {
    pending_sent_conns.push_back(o);
    perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
  dispatcher->make_pending_worker(this);
// Retry sends for sockets that previously stalled for lack of tx buffers.
// Each socket is popped and submit()'ed; the visible re-queue path puts a
// still-blocked socket back and re-parks this worker.
// NOTE(review): extraction dropped the conditional structure around the
// re-queue (presumably the r<0 / -EAGAIN checks and loop braces) —
// restore from upstream before compiling.
void RDMAWorker::handle_pending_message()
  ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
  while (!pending_sent_conns.empty()) {
    RDMAConnectedSocketImpl *o = pending_sent_conns.front();
    pending_sent_conns.pop_front();
    ssize_t r = o->submit(false);
    ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
      // Still out of buffers: put it back and wait for another wakeup.
      pending_sent_conns.push_back(o);
      dispatcher->make_pending_worker(this);
    perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);

  // Propagate the wakeup in case other workers are still parked.
  dispatcher->notify_pending_workers();
// Stack constructor: prepares the process for RDMA (fork safety, hugepage
// env, memlock limit check), constructs the process-wide Infiniband device
// on first use, then creates the shared dispatcher and wires it to every
// worker.
// NOTE(review): extraction dropped several lines: the error/abort branches
// after ibv_fork_init() and setenv(), the `struct rlimit limit;`
// declaration used by getrlimit(), the per-worker loop body (presumably
// w->set_stack(this) / w->set_dispatcher(...)), and various braces —
// restore from upstream before compiling.
RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
  //On RDMA MUST be called before fork
  int rc = ibv_fork_init();
    // Failure here is fatal: forking after verbs init corrupts state.
    lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;

  ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    // Must be exported before any hugepage-backed registration happens.
    rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1);
    ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
    lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;

  // Registered memory counts against RLIMIT_MEMLOCK; warn if it is capped.
  getrlimit(RLIMIT_MEMLOCK, &limit);
  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
    lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
        " We recommend setting this parameter to infinity" << dendl;

  // Construct the process-wide device context exactly once.
  if (!global_infiniband)
    global_infiniband.construct(
      cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
  ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
  dispatcher = new RDMADispatcher(cct, this);
  global_infiniband->set_dispatcher(dispatcher);

  // Wire every worker to this stack/dispatcher.
  unsigned num = get_num_worker();
  for (unsigned i = 0; i < num; ++i) {
    RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));

  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
// Stack destructor.
// NOTE(review): extraction dropped the braces and any trailing cleanup
// lines after unsetenv() — confirm against upstream whether anything else
// (e.g. dispatcher teardown) belongs here.
RDMAStack::~RDMAStack()
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    // Undo the env override installed by the constructor.
    unsetenv("RDMAV_HUGEPAGES_SAFE");	//remove env variable on destruction
// Launch worker `i`'s event loop on its own thread.
// NOTE(review): the line(s) between the signature and the assignment were
// lost in extraction — presumably the opening brace plus a
// threads-container resize to make index `i` valid; restore before
// compiling.
void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
  threads[i] = std::thread(func);
576 void RDMAStack::join_worker(unsigned i
)
578 assert(threads
.size() > i
&& threads
[i
].joinable());