1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_RDMASTACK_H
18 #define CEPH_MSG_RDMASTACK_H
20 #include <sys/eventfd.h>
26 #include "common/ceph_context.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "msg/async/Stack.h"
30 #include "Infiniband.h"
32 class RDMAConnectedSocketImpl
;
33 class RDMAServerSocketImpl
;
37 class RDMADispatcher
{
38 typedef Infiniband::MemoryManager::Chunk Chunk
;
39 typedef Infiniband::QueuePair QueuePair
;
43 std::shared_ptr
<Infiniband
> ib
;
44 Infiniband::CompletionQueue
* tx_cq
= nullptr;
45 Infiniband::CompletionQueue
* rx_cq
= nullptr;
46 Infiniband::CompletionChannel
*tx_cc
= nullptr, *rx_cc
= nullptr;
48 std::atomic
<uint64_t> num_qp_conn
= {0};
49 // protects `qp_conns` and `dead_queue_pairs`
50 ceph::mutex lock
= ceph::make_mutex("RDMADispatcher::lock");
51 // qp_num -> InfRcConnection
52 // The main usage of `qp_conns` is looking up connection by qp_num,
53 // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
54 //// make qp queue into dead state
56 * 1. Connection call mark_down
57 * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
59 * 4. Wait for beacon which indicates queues are drained
60 * 5. Destroy the QP by calling ibv_destroy_qp()
62 * @param qp The queue pair that needs to be made dead
64 ceph::unordered_map
<uint32_t, std::pair
<QueuePair
*, RDMAConnectedSocketImpl
*> > qp_conns
;
66 /// if a queue pair is closed when transmit buffers are active
67 /// on it, the transmit buffers never get returned via tx_cq. To
68 /// work around this problem, don't delete queue pairs immediately. Instead,
69 /// save them in this vector and delete them at a safe time, when there are
70 /// no outstanding transmit buffers to be lost.
71 std::vector
<QueuePair
*> dead_queue_pairs
;
73 std::atomic
<uint64_t> num_pending_workers
= {0};
74 // protects `pending_workers`
76 ceph::make_mutex("RDMADispatcher::for worker pending list");
78 std::list
<RDMAWorker
*> pending_workers
;
79 void enqueue_dead_qp_lockless(uint32_t qp
);
80 void enqueue_dead_qp(uint32_t qpn
);
83 PerfCounters
*perf_logger
;
85 explicit RDMADispatcher(CephContext
* c
, std::shared_ptr
<Infiniband
>& ib
);
86 virtual ~RDMADispatcher();
87 void handle_async_event();
92 void register_qp(QueuePair
*qp
, RDMAConnectedSocketImpl
* csi
);
93 void make_pending_worker(RDMAWorker
* w
) {
94 std::lock_guard l
{w_lock
};
95 auto it
= std::find(pending_workers
.begin(), pending_workers
.end(), w
);
96 if (it
!= pending_workers
.end())
98 pending_workers
.push_back(w
);
99 ++num_pending_workers
;
101 RDMAConnectedSocketImpl
* get_conn_lockless(uint32_t qp
);
102 QueuePair
* get_qp_lockless(uint32_t qp
);
103 QueuePair
* get_qp(uint32_t qp
);
104 void schedule_qp_destroy(uint32_t qp
);
// Accessor for the transmit completion queue pointer. Returns the `tx_cq`
// member unchanged; that member has a nullptr in-class initializer, so a
// caller can observe nullptr until it is assigned elsewhere.
105 Infiniband::CompletionQueue
* get_tx_cq() const { return tx_cq
; }
// Accessor for the receive completion queue pointer. Returns the `rx_cq`
// member unchanged; that member has a nullptr in-class initializer, so a
// caller can observe nullptr until it is assigned elsewhere.
106 Infiniband::CompletionQueue
* get_rx_cq() const { return rx_cq
; }
107 void notify_pending_workers();
108 void handle_tx_event(ibv_wc
*cqe
, int n
);
109 void post_tx_buffer(std::vector
<Chunk
*> &chunks
);
110 void handle_rx_event(ibv_wc
*cqe
, int rx_number
);
112 std::atomic
<uint64_t> inflight
= {0};
114 void post_chunk_to_pool(Chunk
* chunk
);
115 int post_chunks_to_rq(int num
, QueuePair
*qp
= nullptr);
118 class RDMAWorker
: public Worker
{
119 typedef Infiniband::CompletionQueue CompletionQueue
;
120 typedef Infiniband::CompletionChannel CompletionChannel
;
121 typedef Infiniband::MemoryManager::Chunk Chunk
;
122 typedef Infiniband::MemoryManager MemoryManager
;
123 typedef std::vector
<Chunk
*>::iterator ChunkIter
;
124 std::shared_ptr
<Infiniband
> ib
;
125 EventCallbackRef tx_handler
;
126 std::list
<RDMAConnectedSocketImpl
*> pending_sent_conns
;
127 std::shared_ptr
<RDMADispatcher
> dispatcher
;
128 ceph::mutex lock
= ceph::make_mutex("RDMAWorker::lock");
// EventCallback subclass tied to a single RDMAWorker: do_request() simply
// forwards to that worker's handle_pending_message().
// NOTE(review): the `worker` member declaration and the closing braces are
// not visible in this chunk.
130 class C_handle_cq_tx
: public EventCallback
{
// Stores the worker pointer to forward to; `explicit` blocks implicit
// conversion from RDMAWorker*.
133 explicit C_handle_cq_tx(RDMAWorker
*w
): worker(w
) {}
// The `fd` argument is not used by the visible body.
134 void do_request(uint64_t fd
) {
135 worker
->handle_pending_message();
140 PerfCounters
*perf_logger
;
141 explicit RDMAWorker(CephContext
*c
, unsigned i
);
142 virtual ~RDMAWorker();
143 virtual int listen(entity_addr_t
&addr
,
145 const SocketOptions
&opts
, ServerSocket
*) override
;
146 virtual int connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) override
;
147 virtual void initialize() override
;
148 int get_reged_mem(RDMAConnectedSocketImpl
*o
, std::vector
<Chunk
*> &c
, size_t bytes
);
// Drops connection `o` from `pending_sent_conns`. That member is a std::list,
// so remove() erases every element equal to `o` and is a no-op when absent.
// No lock is taken here; instead the call asserts center.in_thread() first
// (presumably restricting callers to the worker's own event-center thread —
// confirm against EventCenter).
149 void remove_pending_conn(RDMAConnectedSocketImpl
*o
) {
150 ceph_assert(center
.in_thread());
151 pending_sent_conns
.remove(o
);
153 void handle_pending_message();
// Copies the given shared_ptr into the `dispatcher` member. The parameter
// shadows the member name, hence the explicit `this->` on the left-hand side.
154 void set_dispatcher(std::shared_ptr
<RDMADispatcher
>& dispatcher
) { this->dispatcher
= dispatcher
; }
// Copies the given shared_ptr into the `ib` member. The parameter shadows the
// member name, hence the explicit `this->` on the left-hand side.
155 void set_ib(std::shared_ptr
<Infiniband
> &ib
) {this->ib
= ib
;}
// Passes `tx_handler` to center.dispatch_event_external().
// NOTE(review): presumably this schedules handle_pending_message() on the
// worker's event loop via C_handle_cq_tx — confirm where tx_handler is created.
156 void notify_worker() {
157 center
.dispatch_event_external(tx_handler
);
// Constructor: initializes the `cm_id`, `cm_channel` and `qp_num` members
// directly from the three arguments; the body is empty.
// NOTE(review): the enclosing struct/class header is not visible in this chunk.
162 RDMACMInfo(rdma_cm_id
*cid
, rdma_event_channel
*cm_channel_
, uint32_t qp_num_
)
163 : cm_id(cid
), cm_channel(cm_channel_
), qp_num(qp_num_
) {}
165 rdma_event_channel
*cm_channel
;
169 class RDMAConnectedSocketImpl
: public ConnectedSocketImpl
{
171 typedef Infiniband::MemoryManager::Chunk Chunk
;
172 typedef Infiniband::CompletionChannel CompletionChannel
;
173 typedef Infiniband::CompletionQueue CompletionQueue
;
177 Infiniband::QueuePair
*qp
;
178 uint32_t peer_qpn
= 0;
179 uint32_t local_qpn
= 0;
182 std::shared_ptr
<Infiniband
> ib
;
183 std::shared_ptr
<RDMADispatcher
> dispatcher
;
185 std::vector
<Chunk
*> buffers
;
187 ceph::buffer::list pending_bl
;
189 ceph::mutex lock
= ceph::make_mutex("RDMAConnectedSocketImpl::lock");
190 std::vector
<ibv_wc
> wc
;
192 EventCallbackRef read_handler
;
193 EventCallbackRef established_handler
;
195 bool active
;// qp is active ?
197 int post_backlog
= 0;
200 void buffer_prefetch(void);
201 ssize_t
read_buffers(char* buf
, size_t len
);
202 int post_work_request(std::vector
<Chunk
*>&);
203 size_t tx_copy_chunk(std::vector
<Chunk
*> &tx_buffers
, size_t req_copy_len
,
204 decltype(std::cbegin(pending_bl
.buffers()))& start
,
205 const decltype(std::cbegin(pending_bl
.buffers()))& end
);
208 RDMAConnectedSocketImpl(CephContext
*cct
, std::shared_ptr
<Infiniband
>& ib
,
209 std::shared_ptr
<RDMADispatcher
>& rdma_dispatcher
, RDMAWorker
*w
);
210 virtual ~RDMAConnectedSocketImpl();
212 void pass_wc(std::vector
<ibv_wc
> &&v
);
213 void get_wc(std::vector
<ibv_wc
> &w
);
// Override that returns the `connected` member, converted to int.
// NOTE(review): the declaration of `connected` is not visible in this chunk.
214 virtual int is_connected() override
{ return connected
; }
216 virtual ssize_t
read(char* buf
, size_t len
) override
;
217 virtual ssize_t
send(ceph::buffer::list
&bl
, bool more
) override
;
218 virtual void shutdown() override
;
219 virtual void close() override
;
// Override that returns the `notify_fd` member as this socket's descriptor.
// NOTE(review): the declaration of `notify_fd` is not visible in this chunk.
220 virtual int fd() const override
{ return notify_fd
; }
// Returns the string produced by Infiniband::qp_state_string() for the
// current qp->get_state(). `qp` is dereferenced without a null check, so it
// must point at a valid QueuePair when this is called.
222 const char* get_qp_state() { return Infiniband::qp_state_string(qp
->get_state()); }
// Returns the `peer_qpn` member, which is 0 by its in-class initializer until
// assigned elsewhere.
223 uint32_t get_peer_qpn () const { return peer_qpn
; }
// Returns the `local_qpn` member, which is 0 by its in-class initializer until
// assigned elsewhere.
224 uint32_t get_local_qpn () const { return local_qpn
; }
// Returns the raw `qp` pointer member. The member has no in-class initializer
// in its visible declaration, so its value depends on code outside this chunk.
225 Infiniband::QueuePair
* get_qp () const { return qp
; }
226 ssize_t
submit(bool more
);
229 void handle_connection();
230 int handle_connection_established(bool need_set_fault
= true);
232 void set_accept_fd(int sd
);
233 virtual int try_connect(const entity_addr_t
&, const SocketOptions
&opt
);
// Returns the `pending` flag.
// NOTE(review): the declaration of `pending` is not visible in this chunk.
234 bool is_pending() {return pending
;}
// Overwrites the `pending` flag with `val`; no locking is done here.
// NOTE(review): the declaration of `pending` is not visible in this chunk.
235 void set_pending(bool val
) {pending
= val
;}
236 void post_chunks_to_rq(int num
);
237 void update_post_backlog();
240 enum RDMA_CM_STATUS
{
252 class RDMAIWARPConnectedSocketImpl
: public RDMAConnectedSocketImpl
{
254 RDMAIWARPConnectedSocketImpl(CephContext
*cct
, std::shared_ptr
<Infiniband
>& ib
,
255 std::shared_ptr
<RDMADispatcher
>& rdma_dispatcher
,
256 RDMAWorker
*w
, RDMACMInfo
*info
= nullptr);
257 ~RDMAIWARPConnectedSocketImpl();
258 virtual int try_connect(const entity_addr_t
&, const SocketOptions
&opt
) override
;
259 virtual void close() override
;
260 virtual void shutdown() override
;
261 virtual void handle_cm_connection();
263 int alloc_resource();
267 rdma_cm_id
*cm_id
= nullptr;
268 rdma_event_channel
*cm_channel
= nullptr;
269 EventCallbackRef cm_con_handler
;
270 std::mutex close_mtx
;
271 std::condition_variable close_condition
;
273 RDMA_CM_STATUS status
= IDLE
;
// EventCallback subclass tied to a single RDMAIWARPConnectedSocketImpl:
// do_request() simply forwards to that socket's handle_cm_connection().
// NOTE(review): access specifiers and closing braces are not visible in this
// chunk.
276 class C_handle_cm_connection
: public EventCallback
{
// Socket implementation that do_request() forwards to (not owned here as far
// as the visible code shows — it is only stored and dereferenced).
277 RDMAIWARPConnectedSocketImpl
*csi
;
// Stores the socket pointer; note this single-argument constructor is not
// marked `explicit`.
279 C_handle_cm_connection(RDMAIWARPConnectedSocketImpl
*w
): csi(w
) {}
// The `fd` argument is not used by the visible body.
280 void do_request(uint64_t fd
) {
281 csi
->handle_cm_connection();
286 class RDMAServerSocketImpl
: public ServerSocketImpl
{
289 ceph::NetHandler net
;
290 int server_setup_socket
;
291 std::shared_ptr
<Infiniband
> ib
;
292 std::shared_ptr
<RDMADispatcher
> dispatcher
;
297 RDMAServerSocketImpl(CephContext
*cct
, std::shared_ptr
<Infiniband
>& ib
,
298 std::shared_ptr
<RDMADispatcher
>& rdma_dispatcher
,
299 RDMAWorker
*w
, entity_addr_t
& a
, unsigned slot
);
301 virtual int listen(entity_addr_t
&sa
, const SocketOptions
&opt
);
302 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
303 virtual void abort_accept() override
;
// Override that returns the `server_setup_socket` int member as this server
// socket's descriptor.
304 virtual int fd() const override
{ return server_setup_socket
; }
307 class RDMAIWARPServerSocketImpl
: public RDMAServerSocketImpl
{
309 RDMAIWARPServerSocketImpl(
310 CephContext
*cct
, std::shared_ptr
<Infiniband
>& ib
,
311 std::shared_ptr
<RDMADispatcher
>& rdma_dispatcher
,
312 RDMAWorker
* w
, entity_addr_t
& addr
, unsigned addr_slot
);
313 virtual int listen(entity_addr_t
&sa
, const SocketOptions
&opt
) override
;
314 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
315 virtual void abort_accept() override
;
317 rdma_cm_id
*cm_id
= nullptr;
318 rdma_event_channel
*cm_channel
= nullptr;
321 class RDMAStack
: public NetworkStack
{
322 std::vector
<std::thread
> threads
;
323 PerfCounters
*perf_counter
;
324 std::shared_ptr
<Infiniband
> ib
;
325 std::shared_ptr
<RDMADispatcher
> rdma_dispatcher
;
327 std::atomic
<bool> fork_finished
= {false};
329 virtual Worker
* create_worker(CephContext
*c
, unsigned worker_id
) override
;
332 explicit RDMAStack(CephContext
*cct
);
333 virtual ~RDMAStack();
// Override that unconditionally returns false for the RDMA stack.
334 virtual bool nonblock_connect_need_writable_event() const override
{ return false; }
336 virtual void spawn_worker(std::function
<void ()> &&func
) override
;
337 virtual void join_worker(unsigned i
) override
;
// Override that reports the current value of the atomic `fork_finished` flag
// (initialized to false in-class). The `;` after the body is a redundant
// empty declaration.
338 virtual bool is_ready() override
{ return fork_finished
.load(); };
// Override that sets the atomic `fork_finished` flag to true, after which
// is_ready() returns true. The `;` after the body is a redundant empty
// declaration.
339 virtual void ready() override
{ fork_finished
= true; };