// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_MSG_RDMASTACK_H
#define CEPH_MSG_RDMASTACK_H

#include <sys/eventfd.h>

#include <list>
#include <vector>
#include <thread>

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"

class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;

enum {
  l_msgr_rdma_dispatcher_first = 94000,

  l_msgr_rdma_polling,
  l_msgr_rdma_inflight_tx_chunks,
  l_msgr_rdma_inqueue_rx_chunks,

  l_msgr_rdma_tx_total_wc,
  l_msgr_rdma_tx_total_wc_errors,
  l_msgr_rdma_tx_wc_retry_errors,
  l_msgr_rdma_tx_wc_wr_flush_errors,

  l_msgr_rdma_rx_total_wc,
  l_msgr_rdma_rx_total_wc_errors,
  l_msgr_rdma_rx_fin,

  l_msgr_rdma_handshake_errors,

  l_msgr_rdma_total_async_events,
  l_msgr_rdma_async_last_wqe_events,

  l_msgr_rdma_created_queue_pair,
  l_msgr_rdma_active_queue_pair,

  l_msgr_rdma_dispatcher_last,
};


class RDMADispatcher {
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::QueuePair QueuePair;

  std::thread t;
  CephContext *cct;
  Infiniband::CompletionQueue* tx_cq;
  Infiniband::CompletionQueue* rx_cq;
  Infiniband::CompletionChannel *tx_cc, *rx_cc;
  EventCallbackRef async_handler;
  bool done = false;
  std::atomic<uint64_t> num_dead_queue_pair = {0};
  std::atomic<uint64_t> num_qp_conn = {0};
  Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
  // qp_num -> InfRcConnection
  // `qp_conns` is mainly used to look up a connection by qp_num,
  // so the lifecycle of each element in `qp_conns` matches the lifecycle of its qp.
  //
  // A qp is moved into the dead state as follows:
  //  1. The connection calls mark_down.
  //  2. The queue pair is moved into the error state (QueuePair::to_dead).
  //  3. Wait for the affiliated IBV_EVENT_QP_LAST_WQE_REACHED event (handle_async_event).
  //  4. Wait for the CQ to drain (handle_tx_event).
  //  5. Destroy the qp by calling ibv_destroy_qp() (handle_tx_event).
  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
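  // A minimal sketch of the teardown path above, assuming QueuePair::to_dead()
  // and the handlers declared below behave as the comment describes. The helper
  // name example_mark_down and the exact division of work between
  // handle_async_event()/handle_tx_event() are assumptions, not the real code
  // in RDMAStack.cc:
  //
  //   void example_mark_down(RDMADispatcher *d, QueuePair *qp) {
  //     qp->to_dead();               // step 2: move the qp into the error state
  //     // step 3: the IBV_EVENT_QP_LAST_WQE_REACHED event is delivered to
  //     //         d->handle_async_event() on the async fd
  //     // steps 4-5: once d->handle_tx_event() sees the tx CQ drain, the qp is
  //     //            parked in dead_queue_pairs and eventually destroyed
  //   }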

  /// if a queue pair is closed when transmit buffers are active
  /// on it, the transmit buffers never get returned via tx_cq. To
  /// work around this problem, don't delete queue pairs immediately. Instead,
  /// save them in this vector and delete them at a safe time, when there are
  /// no outstanding transmit buffers to be lost.
  std::vector<QueuePair*> dead_queue_pairs;
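  // A minimal sketch of the deferred deletion described above, as it might run
  // from the polling/tx-completion path once no tx buffers are outstanding;
  // everything apart from dead_queue_pairs, num_dead_queue_pair, and lock is an
  // assumption about the implementation in RDMAStack.cc:
  //
  //   if (num_dead_queue_pair.load()) {
  //     Mutex::Locker l(lock);            // lock also protects dead_queue_pairs
  //     while (!dead_queue_pairs.empty()) {   // and the tx CQ is fully drained
  //       delete dead_queue_pairs.back();     // presumably releases the qp (ibv_destroy_qp())
  //       dead_queue_pairs.pop_back();
  //       --num_dead_queue_pair;
  //     }
  //   }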

  std::atomic<uint64_t> num_pending_workers = {0};
  Mutex w_lock; // protect pending workers
  // fixme: lockfree
  std::list<RDMAWorker*> pending_workers;
  RDMAStack* stack;

  class C_handle_cq_async : public EventCallback {
    RDMADispatcher *dispatcher;
   public:
    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
    void do_request(int fd) {
      // worker->handle_tx_event();
      dispatcher->handle_async_event();
    }
  };

 public:
  PerfCounters *perf_logger;

  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
  virtual ~RDMADispatcher();
  void handle_async_event();

  void polling_start();
  void polling_stop();
  void polling();
  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
  void make_pending_worker(RDMAWorker* w) {
    Mutex::Locker l(w_lock);
    // back() on an empty list is undefined behavior, so check for empty first
    if (pending_workers.empty() || pending_workers.back() != w) {
      pending_workers.push_back(w);
      ++num_pending_workers;
    }
  }
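  // A minimal sketch of how a worker is expected to use the pending-worker
  // machinery, inferred from the declarations in this header (get_reged_mem,
  // make_pending_worker, post_tx_buffer, notify_pending_workers,
  // RDMAWorker::notify_worker). The surrounding control flow and local names
  // are assumptions; the real logic lives in RDMAStack.cc:
  //
  //   // worker side: try to reserve registered tx chunks; if the pool is
  //   // exhausted, park this worker on the dispatcher's pending list
  //   std::vector<Chunk*> chunks;
  //   if (worker->get_reged_mem(csi, chunks, bytes) == 0)  // assumed: 0 => nothing available
  //     dispatcher->make_pending_worker(worker);
  //
  //   // dispatcher side: returned tx chunks go back to the pool and a parked
  //   // worker is woken, which re-runs RDMAWorker::handle_pending_message()
  //   dispatcher->post_tx_buffer(returned_chunks);
  //   dispatcher->notify_pending_workers();     // -> RDMAWorker::notify_worker()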
  RDMAStack* get_stack() { return stack; }
  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
  void erase_qpn_lockless(uint32_t qpn);
  void erase_qpn(uint32_t qpn);
  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
  void notify_pending_workers();
  void handle_tx_event(ibv_wc *cqe, int n);
  void post_tx_buffer(std::vector<Chunk*> &chunks);

  std::atomic<uint64_t> inflight = {0};
};


enum {
  l_msgr_rdma_first = 95000,

  l_msgr_rdma_tx_no_mem,
  l_msgr_rdma_tx_parital_mem,
  l_msgr_rdma_tx_failed,
  l_msgr_rdma_rx_no_registered_mem,

  l_msgr_rdma_tx_chunks,
  l_msgr_rdma_tx_bytes,
  l_msgr_rdma_rx_chunks,
  l_msgr_rdma_rx_bytes,

  l_msgr_rdma_last,
};

class RDMAWorker : public Worker {
  typedef Infiniband::CompletionQueue CompletionQueue;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::MemoryManager MemoryManager;
  typedef std::vector<Chunk*>::iterator ChunkIter;
  RDMAStack *stack;
  EventCallbackRef tx_handler;
  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
  RDMADispatcher* dispatcher = nullptr;
  Mutex lock;

  class C_handle_cq_tx : public EventCallback {
    RDMAWorker *worker;
   public:
    C_handle_cq_tx(RDMAWorker *w): worker(w) {}
    void do_request(int fd) {
      worker->handle_pending_message();
    }
  };

 public:
  PerfCounters *perf_logger;
  explicit RDMAWorker(CephContext *c, unsigned i);
  virtual ~RDMAWorker();
  virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
  virtual void initialize() override;
  RDMAStack *get_stack() { return stack; }
  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
    assert(center.in_thread());
    pending_sent_conns.remove(o);
  }
  void handle_pending_message();
  void set_stack(RDMAStack *s) { stack = s; }
  void notify_worker() {
    center.dispatch_event_external(tx_handler);
  }
};

class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 public:
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::CompletionQueue CompletionQueue;

 private:
  CephContext *cct;
  Infiniband::QueuePair *qp;
  IBSYNMsg peer_msg;
  IBSYNMsg my_msg;
  int connected;
  int error;
  Infiniband* infiniband;
  RDMADispatcher* dispatcher;
  RDMAWorker* worker;
  std::vector<Chunk*> buffers;
  int notify_fd = -1;
  bufferlist pending_bl;

  Mutex lock;
  std::vector<ibv_wc> wc;
  bool is_server;
  EventCallbackRef con_handler;
  int tcp_fd = -1;
  bool active; // whether the qp is in the active state

  void notify();
  ssize_t read_buffers(char* buf, size_t len);
  int post_work_request(std::vector<Chunk*>&);

 public:
  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                          RDMAWorker *w);
  virtual ~RDMAConnectedSocketImpl();

  void pass_wc(std::vector<ibv_wc> &&v);
  void get_wc(std::vector<ibv_wc> &w);
  virtual int is_connected() override { return connected; }

  virtual ssize_t read(char* buf, size_t len) override;
  virtual ssize_t zero_copy_read(bufferptr &data) override;
  virtual ssize_t send(bufferlist &bl, bool more) override;
  virtual void shutdown() override;
  virtual void close() override;
  virtual int fd() const override { return notify_fd; }
  void fault();
  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
  ssize_t submit(bool more);
  int activate();
  void fin();
  void handle_connection();
  void cleanup();
  void set_accept_fd(int sd);
  int try_connect(const entity_addr_t&, const SocketOptions &opt);

  class C_handle_connection : public EventCallback {
    RDMAConnectedSocketImpl *csi;
    bool active;
   public:
    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
    void do_request(int fd) {
      if (active)
        csi->handle_connection();
    }
    void close() {
      active = false;
    }
  };
};

class RDMAServerSocketImpl : public ServerSocketImpl {
  CephContext *cct;
  NetHandler net;
  int server_setup_socket;
  Infiniband* infiniband;
  RDMADispatcher *dispatcher;
  RDMAWorker *worker;
  entity_addr_t sa;

 public:
  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);

  int listen(entity_addr_t &sa, const SocketOptions &opt);
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
  virtual int fd() const override { return server_setup_socket; }
  int get_fd() { return server_setup_socket; }
};

class RDMAStack : public NetworkStack {
  vector<std::thread> threads;
  RDMADispatcher *dispatcher;
  PerfCounters *perf_counter;

  std::atomic<bool> fork_finished = {false};

 public:
  explicit RDMAStack(CephContext *cct, const string &t);
  virtual ~RDMAStack();
  virtual bool support_zero_copy_read() const override { return false; }
  virtual bool nonblock_connect_need_writable_event() const { return false; }

  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
  virtual void join_worker(unsigned i) override;
  RDMADispatcher *get_dispatcher() { return dispatcher; }

  virtual bool is_ready() override { return fork_finished.load(); }
  virtual void ready() override { fork_finished = true; }
};

#endif