]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
16 | ||
17 | #ifndef CEPH_MSG_RDMASTACK_H | |
18 | #define CEPH_MSG_RDMASTACK_H | |
19 | ||
#include <sys/eventfd.h>

#include <algorithm>
#include <atomic>
#include <functional>
#include <list>
#include <string>
#include <thread>
#include <vector>

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"
7c673cae FG |
31 | |
// Forward declarations: the classes below refer to each other through
// pointers before their full definitions appear.
class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;
36 | ||
// Identifiers for the dispatcher-level RDMA counters.  The range starts at
// 94000; `_first` and `_last` are sentinels bracketing the usable ids.
// NOTE(review): presumably consumed when RDMADispatcher::perf_logger is
// built -- confirm against the implementation file.
enum {
  l_msgr_rdma_dispatcher_first = 94000,

  // polling / buffer occupancy
  l_msgr_rdma_polling,
  l_msgr_rdma_inflight_tx_chunks,
  l_msgr_rdma_inqueue_rx_chunks,

  // transmit work completions
  l_msgr_rdma_tx_total_wc,
  l_msgr_rdma_tx_total_wc_errors,
  l_msgr_rdma_tx_wc_retry_errors,
  l_msgr_rdma_tx_wc_wr_flush_errors,

  // receive work completions
  l_msgr_rdma_rx_total_wc,
  l_msgr_rdma_rx_total_wc_errors,
  l_msgr_rdma_rx_fin,

  l_msgr_rdma_handshake_errors,

  // asynchronous events
  l_msgr_rdma_total_async_events,
  l_msgr_rdma_async_last_wqe_events,

  // queue pair accounting
  l_msgr_rdma_created_queue_pair,
  l_msgr_rdma_active_queue_pair,

  l_msgr_rdma_dispatcher_last,
};
63 | ||
64 | ||
65 | class RDMADispatcher { | |
66 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
67 | typedef Infiniband::QueuePair QueuePair; | |
68 | ||
69 | std::thread t; | |
70 | CephContext *cct; | |
31f18b77 FG |
71 | Infiniband::CompletionQueue* tx_cq; |
72 | Infiniband::CompletionQueue* rx_cq; | |
73 | Infiniband::CompletionChannel *tx_cc, *rx_cc; | |
74 | EventCallbackRef async_handler; | |
7c673cae FG |
75 | bool done = false; |
76 | std::atomic<uint64_t> num_dead_queue_pair = {0}; | |
77 | std::atomic<uint64_t> num_qp_conn = {0}; | |
78 | Mutex lock; // protect `qp_conns`, `dead_queue_pairs` | |
79 | // qp_num -> InfRcConnection | |
80 | // The main usage of `qp_conns` is looking up connection by qp_num, | |
81 | // so the lifecycle of element in `qp_conns` is the lifecycle of qp. | |
82 | //// make qp queue into dead state | |
83 | /** | |
84 | * 1. Connection call mark_down | |
85 | * 2. Move the Queue Pair into the Error state(QueuePair::to_dead) | |
86 | * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event) | |
87 | * 4. Wait for CQ to be empty(handle_tx_event) | |
88 | * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event) | |
89 | * | |
90 | * @param qp The qp needed to dead | |
91 | */ | |
92 | ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns; | |
93 | ||
94 | /// if a queue pair is closed when transmit buffers are active | |
95 | /// on it, the transmit buffers never get returned via tx_cq. To | |
96 | /// work around this problem, don't delete queue pairs immediately. Instead, | |
97 | /// save them in this vector and delete them at a safe time, when there are | |
98 | /// no outstanding transmit buffers to be lost. | |
99 | std::vector<QueuePair*> dead_queue_pairs; | |
100 | ||
101 | std::atomic<uint64_t> num_pending_workers = {0}; | |
102 | Mutex w_lock; // protect pending workers | |
103 | // fixme: lockfree | |
104 | std::list<RDMAWorker*> pending_workers; | |
105 | RDMAStack* stack; | |
106 | ||
31f18b77 FG |
107 | class C_handle_cq_async : public EventCallback { |
108 | RDMADispatcher *dispatcher; | |
109 | public: | |
110 | C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {} | |
111 | void do_request(int fd) { | |
112 | // worker->handle_tx_event(); | |
113 | dispatcher->handle_async_event(); | |
114 | } | |
115 | }; | |
116 | ||
7c673cae FG |
117 | public: |
118 | PerfCounters *perf_logger; | |
119 | ||
120 | explicit RDMADispatcher(CephContext* c, RDMAStack* s); | |
121 | virtual ~RDMADispatcher(); | |
31f18b77 | 122 | void handle_async_event(); |
7c673cae FG |
123 | |
124 | void polling_start(); | |
125 | void polling_stop(); | |
126 | void polling(); | |
7c673cae FG |
127 | int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); |
128 | void make_pending_worker(RDMAWorker* w) { | |
129 | Mutex::Locker l(w_lock); | |
224ce89b WB |
130 | auto it = std::find(pending_workers.begin(), pending_workers.end(), w); |
131 | if (it != pending_workers.end()) | |
132 | return; | |
133 | pending_workers.push_back(w); | |
134 | ++num_pending_workers; | |
7c673cae FG |
135 | } |
136 | RDMAStack* get_stack() { return stack; } | |
137 | RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); | |
138 | void erase_qpn_lockless(uint32_t qpn); | |
139 | void erase_qpn(uint32_t qpn); | |
31f18b77 FG |
140 | Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } |
141 | Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } | |
7c673cae | 142 | void notify_pending_workers(); |
31f18b77 FG |
143 | void handle_tx_event(ibv_wc *cqe, int n); |
144 | void post_tx_buffer(std::vector<Chunk*> &chunks); | |
7c673cae FG |
145 | |
146 | std::atomic<uint64_t> inflight = {0}; | |
147 | }; | |
148 | ||
149 | ||
// Identifiers for the second group of RDMA counters.  The range starts at
// 95000; `_first` and `_last` are sentinels bracketing the usable ids.
// NOTE(review): presumably the per-worker counters behind
// RDMAWorker::perf_logger -- confirm against the implementation file.
enum {
  l_msgr_rdma_first = 95000,

  // transmit/receive failures
  l_msgr_rdma_tx_no_mem,
  l_msgr_rdma_tx_parital_mem,   // sic: spelling kept, the name is referenced elsewhere
  l_msgr_rdma_tx_failed,
  l_msgr_rdma_rx_no_registered_mem,

  // traffic volume
  l_msgr_rdma_tx_chunks,
  l_msgr_rdma_tx_bytes,
  l_msgr_rdma_rx_chunks,
  l_msgr_rdma_rx_bytes,
  l_msgr_rdma_pending_sent_conns,

  l_msgr_rdma_last,
};
166 | ||
167 | class RDMAWorker : public Worker { | |
168 | typedef Infiniband::CompletionQueue CompletionQueue; | |
169 | typedef Infiniband::CompletionChannel CompletionChannel; | |
170 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
171 | typedef Infiniband::MemoryManager MemoryManager; | |
172 | typedef std::vector<Chunk*>::iterator ChunkIter; | |
173 | RDMAStack *stack; | |
174 | EventCallbackRef tx_handler; | |
175 | std::list<RDMAConnectedSocketImpl*> pending_sent_conns; | |
176 | RDMADispatcher* dispatcher = nullptr; | |
177 | Mutex lock; | |
178 | ||
179 | class C_handle_cq_tx : public EventCallback { | |
180 | RDMAWorker *worker; | |
181 | public: | |
182 | C_handle_cq_tx(RDMAWorker *w): worker(w) {} | |
183 | void do_request(int fd) { | |
184 | worker->handle_pending_message(); | |
185 | } | |
186 | }; | |
187 | ||
188 | public: | |
189 | PerfCounters *perf_logger; | |
190 | explicit RDMAWorker(CephContext *c, unsigned i); | |
191 | virtual ~RDMAWorker(); | |
192 | virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; | |
193 | virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; | |
194 | virtual void initialize() override; | |
195 | RDMAStack *get_stack() { return stack; } | |
196 | int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes); | |
197 | void remove_pending_conn(RDMAConnectedSocketImpl *o) { | |
198 | assert(center.in_thread()); | |
199 | pending_sent_conns.remove(o); | |
200 | } | |
201 | void handle_pending_message(); | |
202 | void set_stack(RDMAStack *s) { stack = s; } | |
203 | void notify_worker() { | |
204 | center.dispatch_event_external(tx_handler); | |
205 | } | |
206 | }; | |
207 | ||
31f18b77 FG |
208 | class RDMAConnectedSocketImpl : public ConnectedSocketImpl { |
209 | public: | |
210 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
211 | typedef Infiniband::CompletionChannel CompletionChannel; | |
212 | typedef Infiniband::CompletionQueue CompletionQueue; | |
213 | ||
214 | private: | |
215 | CephContext *cct; | |
216 | Infiniband::QueuePair *qp; | |
217 | IBSYNMsg peer_msg; | |
218 | IBSYNMsg my_msg; | |
219 | int connected; | |
220 | int error; | |
221 | Infiniband* infiniband; | |
222 | RDMADispatcher* dispatcher; | |
223 | RDMAWorker* worker; | |
224 | std::vector<Chunk*> buffers; | |
225 | int notify_fd = -1; | |
226 | bufferlist pending_bl; | |
227 | ||
228 | Mutex lock; | |
229 | std::vector<ibv_wc> wc; | |
230 | bool is_server; | |
231 | EventCallbackRef con_handler; | |
232 | int tcp_fd = -1; | |
233 | bool active;// qp is active ? | |
224ce89b | 234 | bool pending; |
31f18b77 FG |
235 | |
236 | void notify(); | |
237 | ssize_t read_buffers(char* buf, size_t len); | |
238 | int post_work_request(std::vector<Chunk*>&); | |
239 | ||
240 | public: | |
241 | RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, | |
242 | RDMAWorker *w); | |
243 | virtual ~RDMAConnectedSocketImpl(); | |
244 | ||
245 | void pass_wc(std::vector<ibv_wc> &&v); | |
246 | void get_wc(std::vector<ibv_wc> &w); | |
247 | virtual int is_connected() override { return connected; } | |
248 | ||
249 | virtual ssize_t read(char* buf, size_t len) override; | |
250 | virtual ssize_t zero_copy_read(bufferptr &data) override; | |
251 | virtual ssize_t send(bufferlist &bl, bool more) override; | |
252 | virtual void shutdown() override; | |
253 | virtual void close() override; | |
254 | virtual int fd() const override { return notify_fd; } | |
255 | void fault(); | |
256 | const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } | |
257 | ssize_t submit(bool more); | |
258 | int activate(); | |
259 | void fin(); | |
260 | void handle_connection(); | |
261 | void cleanup(); | |
262 | void set_accept_fd(int sd); | |
263 | int try_connect(const entity_addr_t&, const SocketOptions &opt); | |
224ce89b WB |
264 | bool is_pending() {return pending;} |
265 | void set_pending(bool val) {pending = val;} | |
31f18b77 FG |
266 | class C_handle_connection : public EventCallback { |
267 | RDMAConnectedSocketImpl *csi; | |
268 | bool active; | |
269 | public: | |
270 | C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {} | |
271 | void do_request(int fd) { | |
272 | if (active) | |
273 | csi->handle_connection(); | |
274 | } | |
275 | void close() { | |
276 | active = false; | |
277 | } | |
278 | }; | |
279 | }; | |
280 | ||
281 | class RDMAServerSocketImpl : public ServerSocketImpl { | |
282 | CephContext *cct; | |
283 | NetHandler net; | |
284 | int server_setup_socket; | |
285 | Infiniband* infiniband; | |
286 | RDMADispatcher *dispatcher; | |
287 | RDMAWorker *worker; | |
288 | entity_addr_t sa; | |
289 | ||
290 | public: | |
291 | RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); | |
292 | ||
293 | int listen(entity_addr_t &sa, const SocketOptions &opt); | |
294 | virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; | |
295 | virtual void abort_accept() override; | |
296 | virtual int fd() const override { return server_setup_socket; } | |
297 | int get_fd() { return server_setup_socket; } | |
298 | }; | |
7c673cae FG |
299 | |
300 | class RDMAStack : public NetworkStack { | |
301 | vector<std::thread> threads; | |
302 | RDMADispatcher *dispatcher; | |
303 | PerfCounters *perf_counter; | |
304 | ||
305 | std::atomic<bool> fork_finished = {false}; | |
306 | ||
307 | public: | |
308 | explicit RDMAStack(CephContext *cct, const string &t); | |
309 | virtual ~RDMAStack(); | |
310 | virtual bool support_zero_copy_read() const override { return false; } | |
311 | virtual bool nonblock_connect_need_writable_event() const { return false; } | |
312 | ||
313 | virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override; | |
314 | virtual void join_worker(unsigned i) override; | |
315 | RDMADispatcher *get_dispatcher() { return dispatcher; } | |
316 | ||
317 | virtual bool is_ready() override { return fork_finished.load(); }; | |
318 | virtual void ready() override { fork_finished = true; }; | |
319 | }; | |
31f18b77 | 320 | |
7c673cae | 321 | #endif |