]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/rdma/RDMAStack.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / async / rdma / RDMAStack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_RDMASTACK_H
18 #define CEPH_MSG_RDMASTACK_H
19
20 #include <sys/eventfd.h>
21
22 #include <list>
23 #include <vector>
24 #include <thread>
25
26 #include "common/ceph_context.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "msg/async/Stack.h"
30 #include "Infiniband.h"
31
32 class RDMAConnectedSocketImpl;
33 class RDMAServerSocketImpl;
34 class RDMAStack;
35 class RDMAWorker;
36
// Polls the shared RDMA completion queues and routes completed work
// requests back to the owning connections/workers.  One dispatcher (with
// its own polling thread `t`) serves the whole RDMAStack.
class RDMADispatcher {
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::QueuePair QueuePair;

  std::thread t;                  // runs polling() between polling_start()/polling_stop()
  CephContext *cct;
  Infiniband::CompletionQueue* tx_cq = nullptr;   // shared transmit CQ
  Infiniband::CompletionQueue* rx_cq = nullptr;   // shared receive CQ
  Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
  EventCallbackRef async_handler;  // C_handle_cq_async -> handle_async_event()
  bool done = false;               // terminates the polling loop
  std::atomic<uint64_t> num_dead_queue_pair = {0};  // size of dead_queue_pairs
  std::atomic<uint64_t> num_qp_conn = {0};          // size of qp_conns
  Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
  // qp_num -> InfRcConnection
  // The main usage of `qp_conns` is looking up connection by qp_num,
  // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
  //// make qp queue into dead state
  /**
   * 1. Connection call mark_down
   * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
   * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
   * 4. Wait for CQ to be empty(handle_tx_event)
   * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
   *
   * @param qp The qp needed to dead
   */
  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;

  /// if a queue pair is closed when transmit buffers are active
  /// on it, the transmit buffers never get returned via tx_cq.  To
  /// work around this problem, don't delete queue pairs immediately. Instead,
  /// save them in this vector and delete them at a safe time, when there are
  /// no outstanding transmit buffers to be lost.
  std::vector<QueuePair*> dead_queue_pairs;

  std::atomic<uint64_t> num_pending_workers = {0};  // size of pending_workers
  Mutex w_lock; // protect pending workers
  // fixme: lockfree
  std::list<RDMAWorker*> pending_workers;  // workers waiting to be notified (see make_pending_worker)
  RDMAStack* stack;

  // Event adapter: forwards readiness on the async-event fd to
  // handle_async_event().
  class C_handle_cq_async : public EventCallback {
    RDMADispatcher *dispatcher;
  public:
    explicit C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
    void do_request(uint64_t fd) {
      dispatcher->handle_async_event();
    }
  };

 public:
  PerfCounters *perf_logger;

  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
  virtual ~RDMADispatcher();
  // Process ibverbs asynchronous events (e.g. the LAST_WQE_REACHED step of
  // the QP teardown sequence documented above qp_conns).
  void handle_async_event();

  void polling_start();
  void polling_stop();
  void polling();
  // Track a freshly created QP so completions can be routed to `csi`.
  void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
  // Queue `w` to be woken when tx resources free up; duplicate entries
  // are silently ignored.
  void make_pending_worker(RDMAWorker* w) {
    Mutex::Locker l(w_lock);
    auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
    if (it != pending_workers.end())
      return;
    pending_workers.push_back(w);
    ++num_pending_workers;
  }
  RDMAStack* get_stack() { return stack; }
  // "_lockless" variants presumably require the caller to already hold
  // `lock` — TODO confirm against the .cc implementation.
  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
  QueuePair* get_qp(uint32_t qp);
  void erase_qpn_lockless(uint32_t qpn);
  void erase_qpn(uint32_t qpn);
  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
  void notify_pending_workers();
  // Drain `n` tx completions from `cqe`; also the safe point where dead
  // QPs are finally destroyed (steps 4-5 of the teardown sequence).
  void handle_tx_event(ibv_wc *cqe, int n);
  // Return transmit chunks to the memory manager's pool.
  void post_tx_buffer(std::vector<Chunk*> &chunks);

  std::atomic<uint64_t> inflight = {0};

  void post_chunk_to_pool(Chunk* chunk);
  // Post `num` receive buffers; a NULL qp targets the shared receive path.
  int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
};
124
// Per-thread network worker for the RDMA stack.  Extends the generic
// async Worker with RDMA connect/listen and with bookkeeping of
// connections parked in `pending_sent_conns` awaiting tx resources.
class RDMAWorker : public Worker {
  typedef Infiniband::CompletionQueue CompletionQueue;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::MemoryManager MemoryManager;
  typedef std::vector<Chunk*>::iterator ChunkIter;
  RDMAStack *stack;
  EventCallbackRef tx_handler;  // C_handle_cq_tx -> handle_pending_message()
  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
  RDMADispatcher* dispatcher = nullptr;
  Mutex lock;

  // Event adapter: a notify_worker() wakeup lands here and triggers
  // handle_pending_message() on the worker's event thread.
  class C_handle_cq_tx : public EventCallback {
    RDMAWorker *worker;
  public:
    explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {}
    void do_request(uint64_t fd) {
      worker->handle_pending_message();
    }
  };

 public:
  PerfCounters *perf_logger;
  explicit RDMAWorker(CephContext *c, unsigned i);
  virtual ~RDMAWorker();
  virtual int listen(entity_addr_t &addr,
		     unsigned addr_slot,
		     const SocketOptions &opts, ServerSocket *) override;
  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
  virtual void initialize() override;
  RDMAStack *get_stack() { return stack; }
  // Obtain up to `bytes` of registered tx memory for socket `o` into `c`.
  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
  // Must run on the worker's own event thread (asserted below).
  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
    ceph_assert(center.in_thread());
    pending_sent_conns.remove(o);
  }
  void handle_pending_message();
  void set_stack(RDMAStack *s) { stack = s; }
  // Cross-thread wakeup: schedule tx_handler on this worker's event center.
  void notify_worker() {
    center.dispatch_event_external(tx_handler);
  }
};
167
// Bundle of RDMA-CM state passed into RDMAIWARPConnectedSocketImpl when a
// server-side socket hands off an accepted connection (info == nullptr
// means the socket is being created for an outgoing connect).
struct RDMACMInfo {
  RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
    : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
  rdma_cm_id *cm_id;               // librdmacm connection identifier
  rdma_event_channel *cm_channel;  // channel delivering CM events for cm_id
  uint32_t qp_num;                 // queue pair number (presumably the peer's — confirm in .cc)
};
175
// RDMA implementation of ConnectedSocketImpl.  Connection parameters
// (IBSYNMsg) are exchanged out-of-band over a TCP socket (tcp_fd); the
// data path then runs over an Infiniband queue pair, with readability
// signalled to the event loop through notify_fd (returned by fd()).
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 public:
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::CompletionQueue CompletionQueue;

 protected:
  CephContext *cct;
  Infiniband::QueuePair *qp;   // the RDMA queue pair carrying this connection
  IBSYNMsg peer_msg;           // connection parameters received from the peer
  IBSYNMsg my_msg;             // our parameters sent during the handshake
  int connected;               // returned by is_connected()
  int error;
  Infiniband* infiniband;
  RDMADispatcher* dispatcher;
  RDMAWorker* worker;
  std::vector<Chunk*> buffers; // received chunks pending consumption — TODO confirm
  int notify_fd = -1;          // eventfd used to wake the event loop (see fd())
  bufferlist pending_bl;       // outgoing data not yet submitted (see send()/submit())

  Mutex lock;
  std::vector<ibv_wc> wc;      // completions handed over via pass_wc()/get_wc()
  bool is_server;
  EventCallbackRef con_handler;  // C_handle_connection -> handle_connection()
  int tcp_fd = -1;             // TCP socket used for the out-of-band handshake
  bool active;// qp is active ?
  bool pending;                // see is_pending()/set_pending()
  int post_backlog = 0;        // receive buffers owed to the RQ (see update_post_backlog)

  void notify();
  ssize_t read_buffers(char* buf, size_t len);
  int post_work_request(std::vector<Chunk*>&);

 public:
  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                          RDMAWorker *w);
  virtual ~RDMAConnectedSocketImpl();

  // Hand completed work requests from the dispatcher to this socket.
  void pass_wc(std::vector<ibv_wc> &&v);
  void get_wc(std::vector<ibv_wc> &w);
  virtual int is_connected() override { return connected; }

  virtual ssize_t read(char* buf, size_t len) override;
  virtual ssize_t zero_copy_read(bufferptr &data) override;
  virtual ssize_t send(bufferlist &bl, bool more) override;
  virtual void shutdown() override;
  virtual void close() override;
  // Note: fd() exposes the notification eventfd, not the TCP socket.
  virtual int fd() const override { return notify_fd; }
  virtual int socket_fd() const override { return tcp_fd; }
  void fault();
  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
  // Push pending_bl out over the queue pair.
  ssize_t submit(bool more);
  int activate();
  void fin();
  void handle_connection();
  void cleanup();
  // Server side: adopt an accepted TCP fd for the handshake.
  void set_accept_fd(int sd);
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
  bool is_pending() {return pending;}
  void set_pending(bool val) {pending = val;}
  void post_chunks_to_rq(int num);
  void update_post_backlog();

  // Event adapter for handshake progress; close() disarms it so a
  // late-dispatched event cannot touch a dying socket.
  class C_handle_connection : public EventCallback {
    RDMAConnectedSocketImpl *csi;
    bool active;
  public:
    explicit C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
    void do_request(uint64_t fd) {
      if (active)
        csi->handle_connection();
    }
    void close() {
      active = false;
    }
  };
};
253
// Progress states of an RDMA-CM (iWARP) connection; stored in
// RDMAIWARPConnectedSocketImpl::status.  Starts at 1 so that a
// zero-initialized value is distinguishable from IDLE.
enum RDMA_CM_STATUS {
  IDLE = 1,
  RDMA_ID_CREATED,      // rdma_cm_id allocated
  CHANNEL_FD_CREATED,   // event channel fd set up
  RESOURCE_ALLOCATED,   // see alloc_resource()
  ADDR_RESOLVED,        // address resolution completed
  ROUTE_RESOLVED,       // route resolution completed
  CONNECTED,
  DISCONNECTED,
  ERROR
};
265
// iWARP variant of the RDMA connected socket: connection establishment is
// driven by the librdmacm connection manager (cm_id / cm_channel events)
// rather than the TCP-based handshake of the base class.
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
 public:
  // `info` carries CM state from an accepting server socket; nullptr for
  // the client (outgoing-connect) path.
  RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
                               RDMAWorker *w, RDMACMInfo *info = nullptr);
  ~RDMAIWARPConnectedSocketImpl();
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
  virtual void close() override;
  virtual void shutdown() override;
  // State machine reacting to CM events (advances `status`).
  virtual void handle_cm_connection();
  uint32_t get_local_qpn() const { return local_qpn; }
  void activate();
  int alloc_resource();
  // Signal close completion to a waiter on close_condition.
  void close_notify();

 private:
  rdma_cm_id *cm_id;
  rdma_event_channel *cm_channel;
  uint32_t local_qpn;
  uint32_t remote_qpn;
  EventCallbackRef cm_con_handler;  // C_handle_cm_connection
  bool is_server;
  std::mutex close_mtx;             // with close_condition, synchronizes close vs `closed`
  std::condition_variable close_condition;
  bool closed;
  RDMA_CM_STATUS status;            // current CM connection state


  // Event adapter forwarding CM-channel readiness to handle_cm_connection().
  class C_handle_cm_connection : public EventCallback {
    RDMAIWARPConnectedSocketImpl *csi;
  public:
    C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
    void do_request(uint64_t fd) {
      csi->handle_cm_connection();
    }
  };
};
302
// Listening-socket implementation for the RDMA stack.  Accepts incoming
// TCP connections on server_setup_socket; accepted fds are handed to an
// RDMAConnectedSocketImpl for the RDMA handshake (set_accept_fd).
class RDMAServerSocketImpl : public ServerSocketImpl {
 protected:
  CephContext *cct;
  NetHandler net;
  int server_setup_socket;   // listening TCP fd, exposed via fd()
  Infiniband* infiniband;
  RDMADispatcher *dispatcher;
  RDMAWorker *worker;
  entity_addr_t sa;          // address this socket listens on

 public:
  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
                       RDMAWorker *w, entity_addr_t& a, unsigned slot);

  virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
  virtual int fd() const override { return server_setup_socket; }
  int get_fd() { return server_setup_socket; }
};
323
// iWARP listening socket: listens and accepts through the librdmacm
// connection manager (cm_id / cm_channel) instead of a plain TCP socket.
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
 public:
  RDMAIWARPServerSocketImpl(
    CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w,
    entity_addr_t& addr, unsigned addr_slot);
  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
 private:
  rdma_cm_id *cm_id;               // listening CM identifier
  rdma_event_channel *cm_channel;  // channel delivering CM events
};
336
// NetworkStack implementation backed by Infiniband/RDMA.  Owns the single
// Infiniband device wrapper and the RDMADispatcher shared by all workers.
class RDMAStack : public NetworkStack {
  vector<std::thread> threads;   // one per worker (spawn_worker/join_worker)
  PerfCounters *perf_counter;
  Infiniband ib;
  RDMADispatcher dispatcher;

  // Set by ready(); until then is_ready() keeps the stack unused so RDMA
  // resources are not created before forking completes.
  std::atomic<bool> fork_finished = {false};

 public:
  explicit RDMAStack(CephContext *cct, const string &t);
  virtual ~RDMAStack();
  virtual bool support_zero_copy_read() const override { return false; }
  virtual bool nonblock_connect_need_writable_event() const override { return false; }

  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
  virtual void join_worker(unsigned i) override;
  RDMADispatcher &get_dispatcher() { return dispatcher; }
  Infiniband &get_infiniband() { return ib; }
  virtual bool is_ready() override { return fork_finished.load(); };
  virtual void ready() override { fork_finished = true; };
};
358
359
360 #endif