1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
18 #include <sys/resource.h>
20 #include "include/str_list.h"
21 #include "common/deleter.h"
22 #include "common/Tub.h"
23 #include "RDMAStack.h"
24 #include "RDMAConnTCP.h"
27 #define dout_subsys ceph_subsys_ms
29 #define dout_prefix *_dout << "RDMAStack "
// File-scope singleton holding the Infiniband device state (Tub is an
// optional-like holder).  Constructed lazily in RDMAStack's constructor and
// shared by the dispatcher and all workers in this translation unit.
31 static Tub
<Infiniband
> global_infiniband
;
33 RDMADispatcher::~RDMADispatcher()
37 ldout(cct
, 20) << __func__
<< " destructing rdma dispatcher" << dendl
;
39 global_infiniband
->set_dispatcher(nullptr);
41 assert(qp_conns
.empty());
42 assert(num_qp_conn
== 0);
43 assert(dead_queue_pairs
.empty());
44 assert(num_dead_queue_pair
== 0);
47 RDMADispatcher::RDMADispatcher(CephContext
* c
, RDMAStack
* s
)
48 : cct(c
), lock("RDMADispatcher::lock"),
49 w_lock("RDMADispatcher::for worker pending list"), stack(s
)
51 PerfCountersBuilder
plb(cct
, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first
, l_msgr_rdma_dispatcher_last
);
53 plb
.add_u64_counter(l_msgr_rdma_polling
, "polling", "Whether dispatcher thread is polling");
54 plb
.add_u64_counter(l_msgr_rdma_inflight_tx_chunks
, "inflight_tx_chunks", "The number of inflight tx chunks");
55 plb
.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks
, "inqueue_rx_chunks", "The number of inqueue rx chunks");
57 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc
, "tx_total_wc", "The number of tx work comletions");
58 plb
.add_u64_counter(l_msgr_rdma_tx_total_wc_errors
, "tx_total_wc_errors", "The number of tx errors");
59 plb
.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors
, "tx_retry_errors", "The number of tx retry errors");
60 plb
.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors
, "tx_wr_flush_errors", "The number of tx work request flush errors");
62 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc
, "rx_total_wc", "The number of total rx work completion");
63 plb
.add_u64_counter(l_msgr_rdma_rx_total_wc_errors
, "rx_total_wc_errors", "The number of total rx error work completion");
64 plb
.add_u64_counter(l_msgr_rdma_rx_fin
, "rx_fin", "The number of rx finish work request");
66 plb
.add_u64_counter(l_msgr_rdma_total_async_events
, "total_async_events", "The number of async events");
67 plb
.add_u64_counter(l_msgr_rdma_async_last_wqe_events
, "async_last_wqe_events", "The number of last wqe events");
69 plb
.add_u64_counter(l_msgr_rdma_handshake_errors
, "handshake_errors", "The number of handshake errors");
72 plb
.add_u64_counter(l_msgr_rdma_created_queue_pair
, "created_queue_pair", "Active queue pair number");
73 plb
.add_u64_counter(l_msgr_rdma_active_queue_pair
, "active_queue_pair", "Created queue pair number");
75 perf_logger
= plb
.create_perf_counters();
76 cct
->get_perfcounters_collection()->add(perf_logger
);
// Launch the dispatcher's polling loop (RDMADispatcher::polling) on its own
// thread, stored in member `t`.
// NOTE(review): original lines 80 and 82-83 are elided from this extraction
// (braces, possibly perf-counter setup) — verify against the full file.
79 void RDMADispatcher::polling_start()
81 t
= std::thread(&RDMADispatcher::polling
, this);
// Stop the polling thread.  The entire body (original lines 85-92) is elided
// from this extraction — presumably it sets a done flag, wakes the poller and
// joins `t`, but verify against the full file before relying on that.
84 void RDMADispatcher::polling_stop()
// Handle a device async event delivered by ibv_get_async_event().  For
// IBV_EVENT_QP_LAST_WQE_REACHED, look up the connection that owns the QP and
// retire the QP via erase_qpn_lockless(); other event types are only logged.
// NOTE(review): original lines 94, 105, 107, 109, 111-112 and 115-117 are
// elided from this extraction (braces, the `if (!conn)`/else branches and the
// final log tail) — the visible fragments are not the complete control flow.
93 void RDMADispatcher::process_async_event(Device
*ibdev
, ibv_async_event
&async_event
)
// Count every async event observed.
95 perf_logger
->inc(l_msgr_rdma_total_async_events
);
96 // FIXME: Currently we must ensure no other factor make QP in ERROR state,
97 // otherwise this qp can't be deleted in current cleanup flow.
98 if (async_event
.event_type
== IBV_EVENT_QP_LAST_WQE_REACHED
) {
99 perf_logger
->inc(l_msgr_rdma_async_last_wqe_events
);
// The event carries the affected QP; its number is the lookup key.
100 uint64_t qpn
= async_event
.element
.qp
->qp_num
;
101 ldout(cct
, 10) << __func__
<< " event associated qp=" << async_event
.element
.qp
102 << " evt: " << ibv_event_type_str(async_event
.event_type
) << dendl
;
// Serialize with the polling thread before touching qp_conns.
103 Mutex::Locker
l(lock
);
104 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(qpn
);
// Branch (guard elided): no live connection for this qpn -> discard event.
106 ldout(cct
, 1) << __func__
<< " missing qp_num=" << qpn
<< " discard event" << dendl
;
// Branch (guard elided): connection still alive -> QP stopped by HW/peer,
// move it to the dead list.
108 ldout(cct
, 1) << __func__
<< " it's not forwardly stopped by us, reenable=" << conn
<< dendl
;
110 erase_qpn_lockless(qpn
);
// Fallback (guard elided): any other async event type is only logged.
113 ldout(cct
, 1) << __func__
<< " ibv_get_async_event: dev=" << *ibdev
114 << " evt: " << ibv_event_type_str(async_event
.event_type
)
// Dispatcher main loop: busy-polls the TX and RX completion queues, routes
// received chunks to their owning RDMAConnectedSocketImpl, reaps dead queue
// pairs when idle, and falls back to blocking on the completion channel after
// ms_async_rdma_polling_us microseconds of inactivity.
// NOTE(review): many original lines are elided from this extraction (the
// enclosing `while` loop, the `if (tx_ret > 0)` / `if (rx_ret > 0)` guards,
// `int r` declarations, loop-exit conditions and closing braces) — do not
// assume the visible fragments are the complete control flow.
119 void RDMADispatcher::polling()
121 static int MAX_COMPLETIONS
= 32;
// Scratch array reused for both TX and RX completion batches.
122 ibv_wc wc
[MAX_COMPLETIONS
];
// Per-connection batches of RX completions collected under `lock`, then
// handed to each socket outside the per-entry processing.
124 std::map
<RDMAConnectedSocketImpl
*, std::vector
<ibv_wc
> > polled
;
125 std::vector
<ibv_wc
> tx_cqe
;
126 RDMAConnectedSocketImpl
*conn
= nullptr;
// Timestamp of the last time the CQs had work; drives the idle->blocking
// transition below.
127 utime_t last_inactive
= ceph_clock_now();
128 bool rearmed
= false;
// Drain TX completions first (guard/loop structure elided).
134 int tx_ret
= global_infiniband
->poll_tx(MAX_COMPLETIONS
, &ibdev
, wc
);
136 ldout(cct
, 20) << __func__
<< " tx completion queue got " << tx_ret
137 << " responses."<< dendl
;
138 handle_tx_event(ibdev
, wc
, tx_ret
);
// Then drain RX completions (guard elided).
141 int rx_ret
= global_infiniband
->poll_rx(MAX_COMPLETIONS
, &ibdev
, wc
);
143 ldout(cct
, 20) << __func__
<< " rx completion queue got " << rx_ret
144 << " responses."<< dendl
;
145 perf_logger
->inc(l_msgr_rdma_rx_total_wc
, rx_ret
);
147 Mutex::Locker
l(lock
);//make sure connected socket alive when pass wc
148 for (int i
= 0; i
< rx_ret
; ++i
) {
149 ibv_wc
* response
= &wc
[i
];
// wr_id was set to the Chunk pointer when the receive was posted.
150 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
151 ldout(cct
, 25) << __func__
<< " got chunk=" << chunk
<< " bytes:" << response
->byte_len
<< " opcode:" << response
->opcode
<< dendl
;
153 assert(wc
[i
].opcode
== IBV_WC_RECV
);
155 if (response
->status
== IBV_WC_SUCCESS
) {
156 conn
= get_conn_lockless(response
->qp_num
);
// Owner is gone (guard elided): give the chunk straight back to the device.
158 assert(ibdev
->is_rx_buffer(chunk
->buffer
));
159 r
= ibdev
->post_chunk(chunk
);
160 ldout(cct
, 1) << __func__
<< " csi with qpn " << response
->qp_num
<< " may be dead. chunk " << chunk
<< " will be back ? " << r
<< dendl
;
// Owner alive (branch elided): queue the completion for delivery below.
163 polled
[conn
].push_back(*response
);
// Error completion path (else branch; guard elided).
166 perf_logger
->inc(l_msgr_rdma_rx_total_wc_errors
);
167 ldout(cct
, 1) << __func__
<< " work request returned error for buffer(" << chunk
168 << ") status(" << response
->status
<< ":"
169 << Infiniband::wc_status_to_string(response
->status
) << ")" << dendl
;
// Re-post the buffer even on error so the receive queue stays full.
170 assert(ibdev
->is_rx_buffer(chunk
->buffer
));
171 r
= ibdev
->post_chunk(chunk
);
173 ldout(cct
, 0) << __func__
<< " post chunk failed, error: " << cpp_strerror(r
) << dendl
;
// For errored completions, tear down the owning connection if still up
// (surrounding statements elided).
177 conn
= get_conn_lockless(response
->qp_num
);
178 if (conn
&& conn
->is_connected())
// Deliver each connection's batch of completions.
183 for (auto &&i
: polled
) {
184 perf_logger
->inc(l_msgr_rdma_inqueue_rx_chunks
, i
.second
.size());
185 i
.first
->pass_wc(std::move(i
.second
));
// Idle housekeeping: only runs when neither CQ produced work.
190 if (!tx_ret
&& !rx_ret
) {
191 // NOTE: Has TX just transitioned to idle? We should do it when idle!
192 // It's now safe to delete queue pairs (see comment by declaration
193 // for dead_queue_pairs).
194 // Additionally, don't delete qp while outstanding_buffers isn't empty,
195 // because we need to check qp's state before sending
196 perf_logger
->set(l_msgr_rdma_inflight_tx_chunks
, inflight
);
197 if (num_dead_queue_pair
) {
198 Mutex::Locker
l(lock
); // FIXME reuse dead qp because creating one qp costs 1 ms
199 while (!dead_queue_pairs
.empty()) {
200 ldout(cct
, 10) << __func__
<< " finally delete qp=" << dead_queue_pairs
.back() << dendl
;
201 delete dead_queue_pairs
.back();
202 perf_logger
->dec(l_msgr_rdma_active_queue_pair
);
203 dead_queue_pairs
.pop_back();
204 --num_dead_queue_pair
;
// Exit condition for the loop (body elided): no QPs left and stop requested.
207 if (!num_qp_conn
&& done
)
// Idle long enough -> stop busy-polling and block on the completion channel.
210 if ((ceph_clock_now() - last_inactive
).to_nsec() / 1000 > cct
->_conf
->ms_async_rdma_polling_us
) {
212 // Clean up cq events after rearm notify ensure no new incoming event
213 // arrived between polling and rearm
214 global_infiniband
->rearm_notify();
219 perf_logger
->set(l_msgr_rdma_polling
, 0);
221 r
= global_infiniband
->poll_blocking(done
);
223 ldout(cct
, 20) << __func__
<< " got a cq event." << dendl
;
225 last_inactive
= ceph_clock_now();
226 perf_logger
->set(l_msgr_rdma_polling
, 1);
// Pop one waiting RDMAWorker off the pending list (under w_lock) so it can be
// told that tx buffers became available again.
// NOTE(review): the tail of this function (original lines 242-248, presumably
// releasing w_lock and notifying `w` when non-null) is elided from this
// extraction — verify against the full file.
233 void RDMADispatcher::notify_pending_workers() {
// Cheap unlocked pre-check; the authoritative check happens under w_lock.
234 if (num_pending_workers
) {
235 RDMAWorker
*w
= nullptr;
237 Mutex::Locker
l(w_lock
);
238 if (!pending_workers
.empty()) {
239 w
= pending_workers
.front();
240 pending_workers
.pop_front();
241 --num_pending_workers
;
// Register a newly created queue pair and its owning socket under the QP's
// local number, returning an eventfd the socket can be notified through.
// NOTE(review): original lines 250, 252 and 256-259 are elided — the missing
// code presumably validates the eventfd, bumps num_qp_conn and the
// created/active perf counters, and returns fd; verify against the full file.
249 int RDMADispatcher::register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
)
// Non-blocking, close-on-exec notification fd for this connection.
251 int fd
= eventfd(0, EFD_CLOEXEC
|EFD_NONBLOCK
);
253 Mutex::Locker
l(lock
);
// Each local QP number may be registered only once.
254 assert(!qp_conns
.count(qp
->get_local_qp_number()));
255 qp_conns
[qp
->get_local_qp_number()] = std::make_pair(qp
, csi
);
// Look up the socket registered for QP number `qp`.  Caller must hold `lock`.
// NOTE(review): the early-return bodies after the two `if` checks (original
// lines 264 and 266, presumably `return nullptr;` for an unknown or dead QP)
// are elided from this extraction.
260 RDMAConnectedSocketImpl
* RDMADispatcher::get_conn_lockless(uint32_t qp
)
262 auto it
= qp_conns
.find(qp
);
// Unknown QP number (return elided).
263 if (it
== qp_conns
.end())
// QP known but already marked dead (return elided).
265 if (it
->second
.first
->is_dead())
// Live QP: return the owning socket.
267 return it
->second
.second
;
// Move the QP registered under `qpn` onto the dead-QP list so the polling
// thread can delete it when idle.  Caller must hold `lock`.
// NOTE(review): original lines 271, 274 and 277-280 are elided — likely the
// early `return;` when qpn is unknown plus the map erase and counter
// bookkeeping after the push_back; verify against the full file.
270 void RDMADispatcher::erase_qpn_lockless(uint32_t qpn
)
272 auto it
= qp_conns
.find(qpn
);
// Unknown QP number (early return elided).
273 if (it
== qp_conns
.end())
275 ++num_dead_queue_pair
;
276 dead_queue_pairs
.push_back(it
->second
.first
);
281 void RDMADispatcher::erase_qpn(uint32_t qpn
)
283 Mutex::Locker
l(lock
);
284 erase_qpn_lockless(qpn
);
// Process `n` TX work completions from `cqe`: count/log errors, mark broken
// connections, and collect the completed tx chunks so they can be returned to
// the memory manager via post_tx_buffer().
// NOTE(review): original lines 288, 290, 297, 308, 312-313, 316, 319-324,
// 328, 330-331 and 334-336 are elided from this extraction (braces, else
// branches and the connection fault handling) — the visible fragments are not
// the complete control flow.
287 void RDMADispatcher::handle_tx_event(Device
*ibdev
, ibv_wc
*cqe
, int n
)
// Completed chunks accumulated here and returned in bulk at the end.
289 std::vector
<Chunk
*> tx_chunks
;
291 for (int i
= 0; i
< n
; ++i
) {
292 ibv_wc
* response
= &cqe
[i
];
// wr_id carries the Chunk pointer set when the send was posted.
293 Chunk
* chunk
= reinterpret_cast<Chunk
*>(response
->wr_id
);
294 ldout(cct
, 25) << __func__
<< " QP: " << response
->qp_num
295 << " len: " << response
->byte_len
<< " , addr:" << chunk
296 << " " << global_infiniband
->wc_status_to_string(response
->status
) << dendl
;
// Error completion path: classify by status and count per-kind.
298 if (response
->status
!= IBV_WC_SUCCESS
) {
299 perf_logger
->inc(l_msgr_rdma_tx_total_wc_errors
);
300 if (response
->status
== IBV_WC_RETRY_EXC_ERR
) {
301 ldout(cct
, 1) << __func__
<< " connection between server and client not working. Disconnect this now" << dendl
;
302 perf_logger
->inc(l_msgr_rdma_tx_wc_retry_errors
);
303 } else if (response
->status
== IBV_WC_WR_FLUSH_ERR
) {
304 ldout(cct
, 1) << __func__
<< " Work Request Flushed Error: this connection's qp="
305 << response
->qp_num
<< " should be down while this WR=" << response
->wr_id
306 << " still in flight." << dendl
;
307 perf_logger
->inc(l_msgr_rdma_tx_wc_wr_flush_errors
);
// Catch-all error branch (else elided above).
309 ldout(cct
, 1) << __func__
<< " send work request returned error for buffer("
310 << response
->wr_id
<< ") status(" << response
->status
<< "): "
311 << global_infiniband
->wc_status_to_string(response
->status
) << dendl
;
314 Mutex::Locker
l(lock
);//make sure connected socket alive when pass wc
315 RDMAConnectedSocketImpl
*conn
= get_conn_lockless(response
->qp_num
);
317 if (conn
&& conn
->is_connected()) {
318 ldout(cct
, 25) << __func__
<< " qp state is : " << conn
->get_qp_state() << dendl
;//wangzhi
// Connection missing branch (guard elided): drop the event.
321 ldout(cct
, 1) << __func__
<< " missing qp_num=" << response
->qp_num
<< " discard event" << dendl
;
325 // FIXME: why not tx?
// Only chunks from the registered tx pool are recycled; anything else
// (e.g. inline/raw sends) is logged and skipped.
326 if (ibdev
->get_memory_manager()->is_tx_buffer(chunk
->buffer
))
327 tx_chunks
.push_back(chunk
);
329 ldout(cct
, 1) << __func__
<< " not tx buffer, chunk " << chunk
<< dendl
;
332 perf_logger
->inc(l_msgr_rdma_tx_total_wc
, n
);
// Return all recycled chunks and wake any buffer-starved workers.
333 post_tx_buffer(ibdev
, tx_chunks
);
/*
 * Add the given Chunks back to the free tx-buffer queue.
 *
 * @param chunks  The Chunks to enqueue.
 *
 * @return historically 0 for success or -1 for failure; the current
 *         implementation returns void, so the return note is retained only
 *         for reference.
 */
// Return the given tx chunks to the device's free tx-buffer pool, decrement
// the in-flight count and wake one worker waiting for tx buffers.
// NOTE(review): original lines 345-348 are elided — likely an early return
// when `chunks` is empty; verify before assuming empty input is handled here.
344 void RDMADispatcher::post_tx_buffer(Device
*ibdev
, std::vector
<Chunk
*> &chunks
)
349 inflight
-= chunks
.size();
350 ibdev
->get_memory_manager()->return_tx(chunks
);
351 ldout(cct
, 30) << __func__
<< " release " << chunks
.size()
352 << " chunks, inflight " << inflight
<< dendl
;
// A worker parked for lack of tx buffers can now retry.
353 notify_pending_workers();
// Construct worker `i`: wire the tx completion handler and build this
// worker's per-instance perf counters (named "AsyncMessenger::RDMAWorker-<id>").
// NOTE(review): original lines 360 and 362 are elided — line 362 presumably
// declares the local buffer (`char name[128];` in upstream) that the sprintf
// below writes into; verify, and consider snprintf to guard truncation.
357 RDMAWorker::RDMAWorker(CephContext
*c
, unsigned i
)
358 : Worker(c
, i
), stack(nullptr),
359 tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
361 // initialize perf_logger
363 sprintf(name
, "AsyncMessenger::RDMAWorker-%u", id
);
364 PerfCountersBuilder
plb(cct
, name
, l_msgr_rdma_first
, l_msgr_rdma_last
);
366 plb
.add_u64_counter(l_msgr_rdma_tx_no_mem
, "tx_no_mem", "The count of no tx buffer");
367 plb
.add_u64_counter(l_msgr_rdma_tx_parital_mem
, "tx_parital_mem", "The count of parital tx buffer");
368 plb
.add_u64_counter(l_msgr_rdma_tx_failed
, "tx_failed_post", "The number of tx failed posted");
369 plb
.add_u64_counter(l_msgr_rdma_rx_no_registered_mem
, "rx_no_registered_mem", "The count of no registered buffer when receiving");
371 plb
.add_u64_counter(l_msgr_rdma_tx_chunks
, "tx_chunks", "The number of tx chunks transmitted");
372 plb
.add_u64_counter(l_msgr_rdma_tx_bytes
, "tx_bytes", "The bytes of tx chunks transmitted");
373 plb
.add_u64_counter(l_msgr_rdma_rx_chunks
, "rx_chunks", "The number of rx chunks transmitted");
374 plb
.add_u64_counter(l_msgr_rdma_rx_bytes
, "rx_bytes", "The bytes of rx chunks transmitted");
376 perf_logger
= plb
.create_perf_counters();
377 cct
->get_perfcounters_collection()->add(perf_logger
);
// Destructor.  The body (original lines 381-384) is elided from this
// extraction — presumably it deletes tx_handler and deregisters/deletes the
// perf counters; verify against the full file.
380 RDMAWorker::~RDMAWorker()
// Late initialization after the stack pointer is set: cache the dispatcher.
// NOTE(review): original lines 386-387 and 389-391 are elided (braces and a
// possible guard around the assignment) — verify against the full file.
385 void RDMAWorker::initialize()
388 dispatcher
= stack
->get_dispatcher();
// Create a TCP-exchange RDMA server socket bound to `sa` and hand ownership
// to *sock on success.
// NOTE(review): original lines 393, 395 and 398-402 are elided — the missing
// code presumably checks `r` and deletes `p` on failure (returning r) before
// the success path below; verify against the full file.
392 int RDMAWorker::listen(entity_addr_t
&sa
, const SocketOptions
&opt
,ServerSocket
*sock
)
// Lazily bring up the shared Infiniband device state.
394 global_infiniband
->init();
396 auto p
= new RDMAServerConnTCP(cct
, global_infiniband
.get(), get_stack()->get_dispatcher(), this, sa
);
397 int r
= p
->listen(sa
, opt
);
// Success: *sock takes ownership of p.
403 *sock
= ServerSocket(std::unique_ptr
<ServerSocketImpl
>(p
));
// Open an outgoing RDMA connection to `addr`, handing the connected socket to
// *socket on success.
// NOTE(review): original lines 408, 410, 413-414 and 416-418 are elided — the
// missing code presumably guards the failure log below with `if (r < 0)` and
// deletes `p` before returning r; verify against the full file.
407 int RDMAWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
)
// Lazily bring up the shared Infiniband device state.
409 global_infiniband
->init();
411 RDMAConnectedSocketImpl
* p
= new RDMAConnectedSocketImpl(cct
, global_infiniband
.get(), get_stack()->get_dispatcher(), this);
412 int r
= p
->try_connect(addr
, opts
);
// Failure branch (guard elided).
415 ldout(cct
, 1) << __func__
<< " try connecting failed." << dendl
;
// Success: transfer ownership of p into *socket.
419 std::unique_ptr
<RDMAConnectedSocketImpl
> csi(p
);
420 *socket
= ConnectedSocket(std::move(csi
));
// Reserve registered tx buffer chunks for socket `o` (up to `bytes` worth),
// appending them to `c`.  Must run in this worker's event-center thread.
// NOTE(review): original lines 425, 427, 430 and 434-445 are elided —
// including the branch deciding when the worker parks itself on the
// dispatcher's pending list and the final return; the visible
// `pending_sent_conns.back()` check is presumably guarded by a
// buffers-exhausted / non-empty-list condition.  Verify against the full file.
424 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
)
426 Device
*ibdev
= o
->get_device();
// Buffer accounting is only safe from the owning event-center thread.
428 assert(center
.in_thread());
429 int r
= ibdev
->get_tx_buffers(c
, bytes
);
431 size_t got
= ibdev
->get_memory_manager()->get_tx_buffer_size() * r
;
432 ldout(cct
, 30) << __func__
<< " need " << bytes
<< " bytes, reserve " << got
<< " registered bytes, inflight " << dispatcher
->inflight
<< dendl
;
433 stack
->get_dispatcher()->inflight
+= r
;
// Shortage path (guard elided): remember this socket and park the worker
// until the dispatcher frees tx buffers.
438 if (pending_sent_conns
.back() != o
)
439 pending_sent_conns
.push_back(o
);
440 dispatcher
->make_pending_worker(this);
// Drain pending sockets: retry submit() once per queued connection; if a
// socket still cannot make progress, put it (and this worker) back on the
// pending lists.
// NOTE(review): original lines 447, 454, 457-458 and 461-467 are elided —
// including the r<0 handling, the `done` set insertion and the loop/branch
// closings; the visible fragments are not the complete control flow.
446 void RDMAWorker::handle_pending_message()
448 ldout(cct
, 20) << __func__
<< " pending conns " << pending_sent_conns
.size() << dendl
;
// Tracks sockets already retried this pass so each is submitted once.
449 std::set
<RDMAConnectedSocketImpl
*> done
;
450 while (!pending_sent_conns
.empty()) {
451 RDMAConnectedSocketImpl
*o
= pending_sent_conns
.front();
452 pending_sent_conns
.pop_front();
453 if (!done
.count(o
)) {
// Non-blocking retry of the socket's queued data.
455 ssize_t r
= o
->submit(false);
456 ldout(cct
, 20) << __func__
<< " sent pending bl socket=" << o
<< " r=" << r
<< dendl
;
// Still out of buffers (guard elided): requeue socket and park the worker.
459 pending_sent_conns
.push_front(o
);
460 dispatcher
->make_pending_worker(this);
// Let the next parked worker have a turn.
468 dispatcher
->notify_pending_workers();
// Construct the RDMA network stack: make libibverbs fork-safe, warn when the
// memlock rlimit could prevent memory registration, lazily construct the
// global Infiniband instance, then create the dispatcher and wire it to the
// workers.
// NOTE(review): numerous original lines are elided from this extraction —
// e.g. 472-473/475/477 (braces, the `if (rc)` guard), 479-483 (abort after
// the lderr below plus the `struct rlimit limit;` declaration), 488-489 and
// 499-501 (per-worker stack/dispatcher wiring).  Verify control flow against
// the full file.
471 RDMAStack::RDMAStack(CephContext
*cct
, const string
&t
): NetworkStack(cct
, t
)
474 //On RDMA MUST be called before fork
476 int rc
= ibv_fork_init();
// Failure branch (guard elided; presumably followed by ceph_abort()).
478 lderr(cct
) << __func__
<< " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl
;
// Check the locked-memory limit (declaration of `limit` elided above).
484 getrlimit(RLIMIT_MEMLOCK
, &limit
);
485 if (limit
.rlim_cur
!= RLIM_INFINITY
|| limit
.rlim_max
!= RLIM_INFINITY
) {
486 lderr(cct
) << __func__
<< "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
487 " We recommend setting this parameter to infinity" << dendl
;
// Construct the shared Infiniband state exactly once per process.
490 if (!global_infiniband
)
491 global_infiniband
.construct(cct
);
492 ldout(cct
, 20) << __func__
<< " constructing RDMAStack..." << dendl
;
493 dispatcher
= new RDMADispatcher(cct
, this);
494 global_infiniband
->set_dispatcher(dispatcher
);
// Hand each worker its back-pointers (loop body partially elided).
496 unsigned num
= get_num_worker();
497 for (unsigned i
= 0; i
< num
; ++i
) {
498 RDMAWorker
* w
= dynamic_cast<RDMAWorker
*>(get_worker(i
));
502 ldout(cct
, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher
<< dendl
;
// Destructor.  The body (original lines 506-509) is elided from this
// extraction — presumably it deletes the dispatcher; verify against the
// full file.
505 RDMAStack::~RDMAStack()
// Start worker i's event loop `func` on a dedicated thread stored in
// `threads[i]`.
// NOTE(review): original lines 511-512 are elided — upstream resizes
// `threads` to i+1 before the assignment; verify.  Since `func` is taken by
// rvalue reference it could be std::move()'d into the thread to avoid a copy.
510 void RDMAStack::spawn_worker(unsigned i
, std::function
<void ()> &&func
)
513 threads
[i
] = std::thread(func
);
// Join worker i's thread; asserts the thread exists and is joinable.
// NOTE(review): the actual `threads[i].join();` call (original line 519) is
// elided from this extraction.
516 void RDMAStack::join_worker(unsigned i
)
518 assert(threads
.size() > i
&& threads
[i
].joinable());