1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_RDMASTACK_H
18 #define CEPH_MSG_RDMASTACK_H
20 #include <sys/eventfd.h>
26 #include "common/ceph_context.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "msg/async/Stack.h"
30 #include "Infiniband.h"
// Forward declarations: the classes below refer to one another through
// pointers before their definitions appear in this header (e.g. the
// dispatcher holds RDMAWorker* and RDMAStack* before those classes exist).
class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;
38 l_msgr_rdma_dispatcher_first
= 94000,
41 l_msgr_rdma_inflight_tx_chunks
,
42 l_msgr_rdma_inqueue_rx_chunks
,
44 l_msgr_rdma_tx_total_wc
,
45 l_msgr_rdma_tx_total_wc_errors
,
46 l_msgr_rdma_tx_wc_retry_errors
,
47 l_msgr_rdma_tx_wc_wr_flush_errors
,
49 l_msgr_rdma_rx_total_wc
,
50 l_msgr_rdma_rx_total_wc_errors
,
53 l_msgr_rdma_handshake_errors
,
55 l_msgr_rdma_total_async_events
,
56 l_msgr_rdma_async_last_wqe_events
,
58 l_msgr_rdma_created_queue_pair
,
59 l_msgr_rdma_active_queue_pair
,
61 l_msgr_rdma_dispatcher_last
,
65 class RDMADispatcher
{
66 typedef Infiniband::MemoryManager::Chunk Chunk
;
67 typedef Infiniband::QueuePair QueuePair
;
71 Infiniband::CompletionQueue
* tx_cq
;
72 Infiniband::CompletionQueue
* rx_cq
;
73 Infiniband::CompletionChannel
*tx_cc
, *rx_cc
;
74 EventCallbackRef async_handler
;
76 std::atomic
<uint64_t> num_dead_queue_pair
= {0};
77 std::atomic
<uint64_t> num_qp_conn
= {0};
78 Mutex lock
; // protect `qp_conns`, `dead_queue_pairs`
// qp_num -> (QueuePair*, RDMAConnectedSocketImpl*)
// The main usage of `qp_conns` is looking up a connection by qp_num,
// so the lifecycle of an element in `qp_conns` is the lifecycle of its QP.
// Steps for moving a QP into the dead state:
//  1. The connection calls mark_down.
//  2. Move the QueuePair into the Error state (QueuePair::to_dead).
//  3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED (handle_async_event).
//  4. Wait for the CQ to be empty (handle_tx_event).
//  5. Destroy the QP by calling ibv_destroy_qp() (handle_tx_event).
//
// qp: the QP being moved into the dead state.
92 ceph::unordered_map
<uint32_t, std::pair
<QueuePair
*, RDMAConnectedSocketImpl
*> > qp_conns
;
94 /// if a queue pair is closed when transmit buffers are active
95 /// on it, the transmit buffers never get returned via tx_cq. To
96 /// work around this problem, don't delete queue pairs immediately. Instead,
97 /// save them in this vector and delete them at a safe time, when there are
98 /// no outstanding transmit buffers to be lost.
99 std::vector
<QueuePair
*> dead_queue_pairs
;
101 std::atomic
<uint64_t> num_pending_workers
= {0};
102 Mutex w_lock
; // protect pending workers
104 std::list
<RDMAWorker
*> pending_workers
;
107 class C_handle_cq_async
: public EventCallback
{
108 RDMADispatcher
*dispatcher
;
110 C_handle_cq_async(RDMADispatcher
*w
): dispatcher(w
) {}
111 void do_request(int fd
) {
112 // worker->handle_tx_event();
113 dispatcher
->handle_async_event();
118 PerfCounters
*perf_logger
;
120 explicit RDMADispatcher(CephContext
* c
, RDMAStack
* s
);
121 virtual ~RDMADispatcher();
122 void handle_async_event();
124 void polling_start();
127 int register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
);
128 void make_pending_worker(RDMAWorker
* w
) {
129 Mutex::Locker
l(w_lock
);
130 auto it
= std::find(pending_workers
.begin(), pending_workers
.end(), w
);
131 if (it
!= pending_workers
.end())
133 pending_workers
.push_back(w
);
134 ++num_pending_workers
;
136 RDMAStack
* get_stack() { return stack
; }
137 RDMAConnectedSocketImpl
* get_conn_lockless(uint32_t qp
);
138 void erase_qpn_lockless(uint32_t qpn
);
139 void erase_qpn(uint32_t qpn
);
140 Infiniband::CompletionQueue
* get_tx_cq() const { return tx_cq
; }
141 Infiniband::CompletionQueue
* get_rx_cq() const { return rx_cq
; }
142 void notify_pending_workers();
143 void handle_tx_event(ibv_wc
*cqe
, int n
);
144 void post_tx_buffer(std::vector
<Chunk
*> &chunks
);
146 std::atomic
<uint64_t> inflight
= {0};
151 l_msgr_rdma_first
= 95000,
153 l_msgr_rdma_tx_no_mem
,
154 l_msgr_rdma_tx_parital_mem
,
155 l_msgr_rdma_tx_failed
,
156 l_msgr_rdma_rx_no_registered_mem
,
158 l_msgr_rdma_tx_chunks
,
159 l_msgr_rdma_tx_bytes
,
160 l_msgr_rdma_rx_chunks
,
161 l_msgr_rdma_rx_bytes
,
162 l_msgr_rdma_pending_sent_conns
,
167 class RDMAWorker
: public Worker
{
168 typedef Infiniband::CompletionQueue CompletionQueue
;
169 typedef Infiniband::CompletionChannel CompletionChannel
;
170 typedef Infiniband::MemoryManager::Chunk Chunk
;
171 typedef Infiniband::MemoryManager MemoryManager
;
172 typedef std::vector
<Chunk
*>::iterator ChunkIter
;
174 EventCallbackRef tx_handler
;
175 std::list
<RDMAConnectedSocketImpl
*> pending_sent_conns
;
176 RDMADispatcher
* dispatcher
= nullptr;
179 class C_handle_cq_tx
: public EventCallback
{
182 C_handle_cq_tx(RDMAWorker
*w
): worker(w
) {}
183 void do_request(int fd
) {
184 worker
->handle_pending_message();
189 PerfCounters
*perf_logger
;
190 explicit RDMAWorker(CephContext
*c
, unsigned i
);
191 virtual ~RDMAWorker();
192 virtual int listen(entity_addr_t
&addr
, const SocketOptions
&opts
, ServerSocket
*) override
;
193 virtual int connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) override
;
194 virtual void initialize() override
;
195 RDMAStack
*get_stack() { return stack
; }
196 int get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
);
197 void remove_pending_conn(RDMAConnectedSocketImpl
*o
) {
198 assert(center
.in_thread());
199 pending_sent_conns
.remove(o
);
201 void handle_pending_message();
202 void set_stack(RDMAStack
*s
) { stack
= s
; }
203 void notify_worker() {
204 center
.dispatch_event_external(tx_handler
);
208 class RDMAConnectedSocketImpl
: public ConnectedSocketImpl
{
210 typedef Infiniband::MemoryManager::Chunk Chunk
;
211 typedef Infiniband::CompletionChannel CompletionChannel
;
212 typedef Infiniband::CompletionQueue CompletionQueue
;
216 Infiniband::QueuePair
*qp
;
221 Infiniband
* infiniband
;
222 RDMADispatcher
* dispatcher
;
224 std::vector
<Chunk
*> buffers
;
226 bufferlist pending_bl
;
229 std::vector
<ibv_wc
> wc
;
231 EventCallbackRef con_handler
;
233 bool active
;// qp is active ?
237 ssize_t
read_buffers(char* buf
, size_t len
);
238 int post_work_request(std::vector
<Chunk
*>&);
241 RDMAConnectedSocketImpl(CephContext
*cct
, Infiniband
* ib
, RDMADispatcher
* s
,
243 virtual ~RDMAConnectedSocketImpl();
245 void pass_wc(std::vector
<ibv_wc
> &&v
);
246 void get_wc(std::vector
<ibv_wc
> &w
);
247 virtual int is_connected() override
{ return connected
; }
249 virtual ssize_t
read(char* buf
, size_t len
) override
;
250 virtual ssize_t
zero_copy_read(bufferptr
&data
) override
;
251 virtual ssize_t
send(bufferlist
&bl
, bool more
) override
;
252 virtual void shutdown() override
;
253 virtual void close() override
;
254 virtual int fd() const override
{ return notify_fd
; }
256 const char* get_qp_state() { return Infiniband::qp_state_string(qp
->get_state()); }
257 ssize_t
submit(bool more
);
260 void handle_connection();
262 void set_accept_fd(int sd
);
263 int try_connect(const entity_addr_t
&, const SocketOptions
&opt
);
264 bool is_pending() {return pending
;}
265 void set_pending(bool val
) {pending
= val
;}
266 class C_handle_connection
: public EventCallback
{
267 RDMAConnectedSocketImpl
*csi
;
270 C_handle_connection(RDMAConnectedSocketImpl
*w
): csi(w
), active(true) {}
271 void do_request(int fd
) {
273 csi
->handle_connection();
281 class RDMAServerSocketImpl
: public ServerSocketImpl
{
284 int server_setup_socket
;
285 Infiniband
* infiniband
;
286 RDMADispatcher
*dispatcher
;
291 RDMAServerSocketImpl(CephContext
*cct
, Infiniband
* i
, RDMADispatcher
*s
, RDMAWorker
*w
, entity_addr_t
& a
);
293 int listen(entity_addr_t
&sa
, const SocketOptions
&opt
);
294 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
295 virtual void abort_accept() override
;
296 virtual int fd() const override
{ return server_setup_socket
; }
297 int get_fd() { return server_setup_socket
; }
300 class RDMAStack
: public NetworkStack
{
301 vector
<std::thread
> threads
;
302 RDMADispatcher
*dispatcher
;
303 PerfCounters
*perf_counter
;
305 std::atomic
<bool> fork_finished
= {false};
308 explicit RDMAStack(CephContext
*cct
, const string
&t
);
309 virtual ~RDMAStack();
310 virtual bool support_zero_copy_read() const override
{ return false; }
311 virtual bool nonblock_connect_need_writable_event() const { return false; }
313 virtual void spawn_worker(unsigned i
, std::function
<void ()> &&func
) override
;
314 virtual void join_worker(unsigned i
) override
;
315 RDMADispatcher
*get_dispatcher() { return dispatcher
; }
317 virtual bool is_ready() override
{ return fork_finished
.load(); };
318 virtual void ready() override
{ fork_finished
= true; };