1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
17 #ifndef CEPH_MSG_RDMASTACK_H
18 #define CEPH_MSG_RDMASTACK_H
#include <sys/eventfd.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"
// Forward declarations: these types are referenced by pointer/reference
// throughout this header before their definitions appear.
// RDMAWorker and RDMAStack are used by RDMADispatcher below, so they must
// be declared here as well.
class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;
37 class RDMADispatcher
{
38 typedef Infiniband::MemoryManager::Chunk Chunk
;
39 typedef Infiniband::QueuePair QueuePair
;
43 Infiniband::CompletionQueue
* tx_cq
= nullptr;
44 Infiniband::CompletionQueue
* rx_cq
= nullptr;
45 Infiniband::CompletionChannel
*tx_cc
= nullptr, *rx_cc
= nullptr;
46 EventCallbackRef async_handler
;
48 std::atomic
<uint64_t> num_dead_queue_pair
= {0};
49 std::atomic
<uint64_t> num_qp_conn
= {0};
50 Mutex lock
; // protect `qp_conns`, `dead_queue_pairs`
51 // qp_num -> InfRcConnection
52 // The main usage of `qp_conns` is looking up connection by qp_num,
53 // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
54 //// make qp queue into dead state
56 * 1. Connection call mark_down
57 * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
58 * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
59 * 4. Wait for CQ to be empty(handle_tx_event)
60 * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
62 * @param qp The qp needed to dead
64 ceph::unordered_map
<uint32_t, std::pair
<QueuePair
*, RDMAConnectedSocketImpl
*> > qp_conns
;
66 /// if a queue pair is closed when transmit buffers are active
67 /// on it, the transmit buffers never get returned via tx_cq. To
68 /// work around this problem, don't delete queue pairs immediately. Instead,
69 /// save them in this vector and delete them at a safe time, when there are
70 /// no outstanding transmit buffers to be lost.
71 std::vector
<QueuePair
*> dead_queue_pairs
;
73 std::atomic
<uint64_t> num_pending_workers
= {0};
74 Mutex w_lock
; // protect pending workers
76 std::list
<RDMAWorker
*> pending_workers
;
79 class C_handle_cq_async
: public EventCallback
{
80 RDMADispatcher
*dispatcher
;
82 explicit C_handle_cq_async(RDMADispatcher
*w
): dispatcher(w
) {}
83 void do_request(uint64_t fd
) {
84 // worker->handle_tx_event();
85 dispatcher
->handle_async_event();
90 PerfCounters
*perf_logger
;
92 explicit RDMADispatcher(CephContext
* c
, RDMAStack
* s
);
93 virtual ~RDMADispatcher();
94 void handle_async_event();
99 void register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
);
100 void make_pending_worker(RDMAWorker
* w
) {
101 Mutex::Locker
l(w_lock
);
102 auto it
= std::find(pending_workers
.begin(), pending_workers
.end(), w
);
103 if (it
!= pending_workers
.end())
105 pending_workers
.push_back(w
);
106 ++num_pending_workers
;
108 RDMAStack
* get_stack() { return stack
; }
109 RDMAConnectedSocketImpl
* get_conn_lockless(uint32_t qp
);
110 QueuePair
* get_qp(uint32_t qp
);
111 void erase_qpn_lockless(uint32_t qpn
);
112 void erase_qpn(uint32_t qpn
);
113 Infiniband::CompletionQueue
* get_tx_cq() const { return tx_cq
; }
114 Infiniband::CompletionQueue
* get_rx_cq() const { return rx_cq
; }
115 void notify_pending_workers();
116 void handle_tx_event(ibv_wc
*cqe
, int n
);
117 void post_tx_buffer(std::vector
<Chunk
*> &chunks
);
119 std::atomic
<uint64_t> inflight
= {0};
121 void post_chunk_to_pool(Chunk
* chunk
);
122 int post_chunks_to_rq(int num
, ibv_qp
*qp
=NULL
);
125 class RDMAWorker
: public Worker
{
126 typedef Infiniband::CompletionQueue CompletionQueue
;
127 typedef Infiniband::CompletionChannel CompletionChannel
;
128 typedef Infiniband::MemoryManager::Chunk Chunk
;
129 typedef Infiniband::MemoryManager MemoryManager
;
130 typedef std::vector
<Chunk
*>::iterator ChunkIter
;
132 EventCallbackRef tx_handler
;
133 std::list
<RDMAConnectedSocketImpl
*> pending_sent_conns
;
134 RDMADispatcher
* dispatcher
= nullptr;
137 class C_handle_cq_tx
: public EventCallback
{
140 explicit C_handle_cq_tx(RDMAWorker
*w
): worker(w
) {}
141 void do_request(uint64_t fd
) {
142 worker
->handle_pending_message();
147 PerfCounters
*perf_logger
;
148 explicit RDMAWorker(CephContext
*c
, unsigned i
);
149 virtual ~RDMAWorker();
150 virtual int listen(entity_addr_t
&addr
,
152 const SocketOptions
&opts
, ServerSocket
*) override
;
153 virtual int connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) override
;
154 virtual void initialize() override
;
155 RDMAStack
*get_stack() { return stack
; }
156 int get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
);
157 void remove_pending_conn(RDMAConnectedSocketImpl
*o
) {
158 ceph_assert(center
.in_thread());
159 pending_sent_conns
.remove(o
);
161 void handle_pending_message();
162 void set_stack(RDMAStack
*s
) { stack
= s
; }
163 void notify_worker() {
164 center
.dispatch_event_external(tx_handler
);
169 RDMACMInfo(rdma_cm_id
*cid
, rdma_event_channel
*cm_channel_
, uint32_t qp_num_
)
170 : cm_id(cid
), cm_channel(cm_channel_
), qp_num(qp_num_
) {}
172 rdma_event_channel
*cm_channel
;
176 class RDMAConnectedSocketImpl
: public ConnectedSocketImpl
{
178 typedef Infiniband::MemoryManager::Chunk Chunk
;
179 typedef Infiniband::CompletionChannel CompletionChannel
;
180 typedef Infiniband::CompletionQueue CompletionQueue
;
184 Infiniband::QueuePair
*qp
;
189 Infiniband
* infiniband
;
190 RDMADispatcher
* dispatcher
;
192 std::vector
<Chunk
*> buffers
;
194 bufferlist pending_bl
;
197 std::vector
<ibv_wc
> wc
;
199 EventCallbackRef con_handler
;
201 bool active
;// qp is active ?
203 int post_backlog
= 0;
206 ssize_t
read_buffers(char* buf
, size_t len
);
207 int post_work_request(std::vector
<Chunk
*>&);
210 RDMAConnectedSocketImpl(CephContext
*cct
, Infiniband
* ib
, RDMADispatcher
* s
,
212 virtual ~RDMAConnectedSocketImpl();
214 void pass_wc(std::vector
<ibv_wc
> &&v
);
215 void get_wc(std::vector
<ibv_wc
> &w
);
216 virtual int is_connected() override
{ return connected
; }
218 virtual ssize_t
read(char* buf
, size_t len
) override
;
219 virtual ssize_t
zero_copy_read(bufferptr
&data
) override
;
220 virtual ssize_t
send(bufferlist
&bl
, bool more
) override
;
221 virtual void shutdown() override
;
222 virtual void close() override
;
223 virtual int fd() const override
{ return notify_fd
; }
224 virtual int socket_fd() const override
{ return tcp_fd
; }
226 const char* get_qp_state() { return Infiniband::qp_state_string(qp
->get_state()); }
227 ssize_t
submit(bool more
);
230 void handle_connection();
232 void set_accept_fd(int sd
);
233 virtual int try_connect(const entity_addr_t
&, const SocketOptions
&opt
);
234 bool is_pending() {return pending
;}
235 void set_pending(bool val
) {pending
= val
;}
236 void post_chunks_to_rq(int num
);
237 void update_post_backlog();
239 class C_handle_connection
: public EventCallback
{
240 RDMAConnectedSocketImpl
*csi
;
243 explicit C_handle_connection(RDMAConnectedSocketImpl
*w
): csi(w
), active(true) {}
244 void do_request(uint64_t fd
) {
246 csi
->handle_connection();
// Connection-manager state machine states for the iWARP socket below.
// NOTE(review): the enumerator list was missing in the recovered copy and
// was reconstructed from upstream Ceph — confirm names and values.
enum RDMA_CM_STATUS {
  IDLE = 1,
  RDMA_ID_CREATED,
  CHANNEL_FD_CREATED,
  RESOURCE_ALLOCATED,
  ADDR_RESOLVED,
  ROUTE_RESOLVED,
  CONNECTED,
  DISCONNECTED,
  ERROR
};
266 class RDMAIWARPConnectedSocketImpl
: public RDMAConnectedSocketImpl
{
268 RDMAIWARPConnectedSocketImpl(CephContext
*cct
, Infiniband
* ib
, RDMADispatcher
* s
,
269 RDMAWorker
*w
, RDMACMInfo
*info
= nullptr);
270 ~RDMAIWARPConnectedSocketImpl();
271 virtual int try_connect(const entity_addr_t
&, const SocketOptions
&opt
) override
;
272 virtual void close() override
;
273 virtual void shutdown() override
;
274 virtual void handle_cm_connection();
275 uint32_t get_local_qpn() const { return local_qpn
; }
277 int alloc_resource();
282 rdma_event_channel
*cm_channel
;
285 EventCallbackRef cm_con_handler
;
287 std::mutex close_mtx
;
288 std::condition_variable close_condition
;
290 RDMA_CM_STATUS status
;
293 class C_handle_cm_connection
: public EventCallback
{
294 RDMAIWARPConnectedSocketImpl
*csi
;
296 C_handle_cm_connection(RDMAIWARPConnectedSocketImpl
*w
): csi(w
) {}
297 void do_request(uint64_t fd
) {
298 csi
->handle_cm_connection();
303 class RDMAServerSocketImpl
: public ServerSocketImpl
{
307 int server_setup_socket
;
308 Infiniband
* infiniband
;
309 RDMADispatcher
*dispatcher
;
314 RDMAServerSocketImpl(CephContext
*cct
, Infiniband
* i
, RDMADispatcher
*s
,
315 RDMAWorker
*w
, entity_addr_t
& a
, unsigned slot
);
317 virtual int listen(entity_addr_t
&sa
, const SocketOptions
&opt
);
318 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
319 virtual void abort_accept() override
;
320 virtual int fd() const override
{ return server_setup_socket
; }
321 int get_fd() { return server_setup_socket
; }
324 class RDMAIWARPServerSocketImpl
: public RDMAServerSocketImpl
{
326 RDMAIWARPServerSocketImpl(
327 CephContext
*cct
, Infiniband
*i
, RDMADispatcher
*s
, RDMAWorker
*w
,
328 entity_addr_t
& addr
, unsigned addr_slot
);
329 virtual int listen(entity_addr_t
&sa
, const SocketOptions
&opt
) override
;
330 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
331 virtual void abort_accept() override
;
334 rdma_event_channel
*cm_channel
;
337 class RDMAStack
: public NetworkStack
{
338 vector
<std::thread
> threads
;
339 PerfCounters
*perf_counter
;
341 RDMADispatcher dispatcher
;
343 std::atomic
<bool> fork_finished
= {false};
346 explicit RDMAStack(CephContext
*cct
, const string
&t
);
347 virtual ~RDMAStack();
348 virtual bool support_zero_copy_read() const override
{ return false; }
349 virtual bool nonblock_connect_need_writable_event() const override
{ return false; }
351 virtual void spawn_worker(unsigned i
, std::function
<void ()> &&func
) override
;
352 virtual void join_worker(unsigned i
) override
;
353 RDMADispatcher
&get_dispatcher() { return dispatcher
; }
354 Infiniband
&get_infiniband() { return ib
; }
355 virtual bool is_ready() override
{ return fork_finished
.load(); };
356 virtual void ready() override
{ fork_finished
= true; };