// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
16 | ||
17 | #ifndef CEPH_MSG_RDMASTACK_H | |
18 | #define CEPH_MSG_RDMASTACK_H | |
19 | ||
20 | #include <sys/eventfd.h> | |
21 | ||
22 | #include <list> | |
23 | #include <vector> | |
24 | #include <thread> | |
25 | ||
26 | #include "common/ceph_context.h" | |
27 | #include "common/debug.h" | |
28 | #include "common/errno.h" | |
29 | #include "msg/async/Stack.h" | |
30 | #include "Infiniband.h" | |
7c673cae FG |
31 | |
32 | class RDMAConnectedSocketImpl; | |
33 | class RDMAServerSocketImpl; | |
34 | class RDMAStack; | |
35 | class RDMAWorker; | |
36 | ||
7c673cae FG |
37 | class RDMADispatcher { |
38 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
39 | typedef Infiniband::QueuePair QueuePair; | |
40 | ||
41 | std::thread t; | |
42 | CephContext *cct; | |
f67539c2 | 43 | std::shared_ptr<Infiniband> ib; |
11fdf7f2 TL |
44 | Infiniband::CompletionQueue* tx_cq = nullptr; |
45 | Infiniband::CompletionQueue* rx_cq = nullptr; | |
46 | Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr; | |
7c673cae | 47 | bool done = false; |
7c673cae | 48 | std::atomic<uint64_t> num_qp_conn = {0}; |
9f95a23c TL |
49 | // protect `qp_conns`, `dead_queue_pairs` |
50 | ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock"); | |
7c673cae FG |
51 | // qp_num -> InfRcConnection |
52 | // The main usage of `qp_conns` is looking up connection by qp_num, | |
53 | // so the lifecycle of element in `qp_conns` is the lifecycle of qp. | |
54 | //// make qp queue into dead state | |
55 | /** | |
56 | * 1. Connection call mark_down | |
57 | * 2. Move the Queue Pair into the Error state(QueuePair::to_dead) | |
9f95a23c TL |
58 | * 3. Post a beacon |
59 | * 4. Wait for beacon which indicates queues are drained | |
60 | * 5. Destroy the QP by calling ibv_destroy_qp() | |
7c673cae FG |
61 | * |
62 | * @param qp The qp needed to dead | |
63 | */ | |
64 | ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns; | |
65 | ||
66 | /// if a queue pair is closed when transmit buffers are active | |
67 | /// on it, the transmit buffers never get returned via tx_cq. To | |
68 | /// work around this problem, don't delete queue pairs immediately. Instead, | |
69 | /// save them in this vector and delete them at a safe time, when there are | |
70 | /// no outstanding transmit buffers to be lost. | |
71 | std::vector<QueuePair*> dead_queue_pairs; | |
72 | ||
73 | std::atomic<uint64_t> num_pending_workers = {0}; | |
9f95a23c TL |
74 | // protect pending workers |
75 | ceph::mutex w_lock = | |
76 | ceph::make_mutex("RDMADispatcher::for worker pending list"); | |
7c673cae FG |
77 | // fixme: lockfree |
78 | std::list<RDMAWorker*> pending_workers; | |
f67539c2 TL |
79 | void enqueue_dead_qp_lockless(uint32_t qp); |
80 | void enqueue_dead_qp(uint32_t qpn); | |
31f18b77 | 81 | |
7c673cae FG |
82 | public: |
83 | PerfCounters *perf_logger; | |
84 | ||
f67539c2 | 85 | explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib); |
7c673cae | 86 | virtual ~RDMADispatcher(); |
31f18b77 | 87 | void handle_async_event(); |
7c673cae FG |
88 | |
89 | void polling_start(); | |
90 | void polling_stop(); | |
91 | void polling(); | |
11fdf7f2 | 92 | void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); |
7c673cae | 93 | void make_pending_worker(RDMAWorker* w) { |
9f95a23c | 94 | std::lock_guard l{w_lock}; |
224ce89b WB |
95 | auto it = std::find(pending_workers.begin(), pending_workers.end(), w); |
96 | if (it != pending_workers.end()) | |
97 | return; | |
98 | pending_workers.push_back(w); | |
99 | ++num_pending_workers; | |
7c673cae | 100 | } |
7c673cae | 101 | RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); |
9f95a23c | 102 | QueuePair* get_qp_lockless(uint32_t qp); |
11fdf7f2 | 103 | QueuePair* get_qp(uint32_t qp); |
9f95a23c | 104 | void schedule_qp_destroy(uint32_t qp); |
31f18b77 FG |
105 | Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } |
106 | Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } | |
7c673cae | 107 | void notify_pending_workers(); |
31f18b77 FG |
108 | void handle_tx_event(ibv_wc *cqe, int n); |
109 | void post_tx_buffer(std::vector<Chunk*> &chunks); | |
9f95a23c | 110 | void handle_rx_event(ibv_wc *cqe, int rx_number); |
7c673cae FG |
111 | |
112 | std::atomic<uint64_t> inflight = {0}; | |
7c673cae | 113 | |
11fdf7f2 | 114 | void post_chunk_to_pool(Chunk* chunk); |
9f95a23c | 115 | int post_chunks_to_rq(int num, QueuePair *qp = nullptr); |
7c673cae FG |
116 | }; |
117 | ||
118 | class RDMAWorker : public Worker { | |
119 | typedef Infiniband::CompletionQueue CompletionQueue; | |
120 | typedef Infiniband::CompletionChannel CompletionChannel; | |
121 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
122 | typedef Infiniband::MemoryManager MemoryManager; | |
123 | typedef std::vector<Chunk*>::iterator ChunkIter; | |
f67539c2 | 124 | std::shared_ptr<Infiniband> ib; |
7c673cae FG |
125 | EventCallbackRef tx_handler; |
126 | std::list<RDMAConnectedSocketImpl*> pending_sent_conns; | |
f67539c2 | 127 | std::shared_ptr<RDMADispatcher> dispatcher; |
9f95a23c | 128 | ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock"); |
7c673cae FG |
129 | |
130 | class C_handle_cq_tx : public EventCallback { | |
131 | RDMAWorker *worker; | |
132 | public: | |
11fdf7f2 TL |
133 | explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {} |
134 | void do_request(uint64_t fd) { | |
7c673cae FG |
135 | worker->handle_pending_message(); |
136 | } | |
137 | }; | |
138 | ||
139 | public: | |
140 | PerfCounters *perf_logger; | |
141 | explicit RDMAWorker(CephContext *c, unsigned i); | |
142 | virtual ~RDMAWorker(); | |
11fdf7f2 TL |
143 | virtual int listen(entity_addr_t &addr, |
144 | unsigned addr_slot, | |
145 | const SocketOptions &opts, ServerSocket *) override; | |
7c673cae FG |
146 | virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; |
147 | virtual void initialize() override; | |
7c673cae FG |
148 | int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes); |
149 | void remove_pending_conn(RDMAConnectedSocketImpl *o) { | |
11fdf7f2 | 150 | ceph_assert(center.in_thread()); |
7c673cae FG |
151 | pending_sent_conns.remove(o); |
152 | } | |
153 | void handle_pending_message(); | |
f67539c2 TL |
154 | void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; } |
155 | void set_ib(std::shared_ptr<Infiniband> &ib) {this->ib = ib;} | |
7c673cae FG |
156 | void notify_worker() { |
157 | center.dispatch_event_external(tx_handler); | |
158 | } | |
159 | }; | |
160 | ||
11fdf7f2 TL |
161 | struct RDMACMInfo { |
162 | RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_) | |
163 | : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {} | |
164 | rdma_cm_id *cm_id; | |
165 | rdma_event_channel *cm_channel; | |
166 | uint32_t qp_num; | |
167 | }; | |
168 | ||
31f18b77 FG |
169 | class RDMAConnectedSocketImpl : public ConnectedSocketImpl { |
170 | public: | |
171 | typedef Infiniband::MemoryManager::Chunk Chunk; | |
172 | typedef Infiniband::CompletionChannel CompletionChannel; | |
173 | typedef Infiniband::CompletionQueue CompletionQueue; | |
174 | ||
11fdf7f2 | 175 | protected: |
31f18b77 FG |
176 | CephContext *cct; |
177 | Infiniband::QueuePair *qp; | |
9f95a23c TL |
178 | uint32_t peer_qpn = 0; |
179 | uint32_t local_qpn = 0; | |
31f18b77 FG |
180 | int connected; |
181 | int error; | |
f67539c2 TL |
182 | std::shared_ptr<Infiniband> ib; |
183 | std::shared_ptr<RDMADispatcher> dispatcher; | |
31f18b77 FG |
184 | RDMAWorker* worker; |
185 | std::vector<Chunk*> buffers; | |
186 | int notify_fd = -1; | |
f67539c2 | 187 | ceph::buffer::list pending_bl; |
31f18b77 | 188 | |
9f95a23c | 189 | ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock"); |
31f18b77 FG |
190 | std::vector<ibv_wc> wc; |
191 | bool is_server; | |
9f95a23c TL |
192 | EventCallbackRef read_handler; |
193 | EventCallbackRef established_handler; | |
31f18b77 FG |
194 | int tcp_fd = -1; |
195 | bool active;// qp is active ? | |
224ce89b | 196 | bool pending; |
11fdf7f2 | 197 | int post_backlog = 0; |
31f18b77 FG |
198 | |
199 | void notify(); | |
9f95a23c | 200 | void buffer_prefetch(void); |
31f18b77 FG |
201 | ssize_t read_buffers(char* buf, size_t len); |
202 | int post_work_request(std::vector<Chunk*>&); | |
9f95a23c TL |
203 | size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len, |
204 | decltype(std::cbegin(pending_bl.buffers()))& start, | |
205 | const decltype(std::cbegin(pending_bl.buffers()))& end); | |
31f18b77 FG |
206 | |
207 | public: | |
f67539c2 TL |
208 | RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, |
209 | std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w); | |
31f18b77 FG |
210 | virtual ~RDMAConnectedSocketImpl(); |
211 | ||
212 | void pass_wc(std::vector<ibv_wc> &&v); | |
213 | void get_wc(std::vector<ibv_wc> &w); | |
214 | virtual int is_connected() override { return connected; } | |
215 | ||
216 | virtual ssize_t read(char* buf, size_t len) override; | |
f67539c2 | 217 | virtual ssize_t send(ceph::buffer::list &bl, bool more) override; |
31f18b77 FG |
218 | virtual void shutdown() override; |
219 | virtual void close() override; | |
220 | virtual int fd() const override { return notify_fd; } | |
1e59de90 | 221 | virtual void set_priority(int sd, int prio, int domain) override; |
31f18b77 FG |
222 | void fault(); |
223 | const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } | |
9f95a23c TL |
224 | uint32_t get_peer_qpn () const { return peer_qpn; } |
225 | uint32_t get_local_qpn () const { return local_qpn; } | |
f67539c2 | 226 | Infiniband::QueuePair* get_qp () const { return qp; } |
31f18b77 FG |
227 | ssize_t submit(bool more); |
228 | int activate(); | |
229 | void fin(); | |
230 | void handle_connection(); | |
9f95a23c | 231 | int handle_connection_established(bool need_set_fault = true); |
31f18b77 FG |
232 | void cleanup(); |
233 | void set_accept_fd(int sd); | |
11fdf7f2 | 234 | virtual int try_connect(const entity_addr_t&, const SocketOptions &opt); |
224ce89b WB |
235 | bool is_pending() {return pending;} |
236 | void set_pending(bool val) {pending = val;} | |
11fdf7f2 TL |
237 | void post_chunks_to_rq(int num); |
238 | void update_post_backlog(); | |
31f18b77 FG |
239 | }; |
240 | ||
11fdf7f2 TL |
241 | enum RDMA_CM_STATUS { |
242 | IDLE = 1, | |
243 | RDMA_ID_CREATED, | |
244 | CHANNEL_FD_CREATED, | |
245 | RESOURCE_ALLOCATED, | |
246 | ADDR_RESOLVED, | |
247 | ROUTE_RESOLVED, | |
248 | CONNECTED, | |
249 | DISCONNECTED, | |
250 | ERROR | |
251 | }; | |
252 | ||
253 | class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl { | |
254 | public: | |
f67539c2 TL |
255 | RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, |
256 | std::shared_ptr<RDMADispatcher>& rdma_dispatcher, | |
257 | RDMAWorker *w, RDMACMInfo *info = nullptr); | |
11fdf7f2 TL |
258 | ~RDMAIWARPConnectedSocketImpl(); |
259 | virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override; | |
260 | virtual void close() override; | |
261 | virtual void shutdown() override; | |
262 | virtual void handle_cm_connection(); | |
11fdf7f2 TL |
263 | void activate(); |
264 | int alloc_resource(); | |
265 | void close_notify(); | |
266 | ||
267 | private: | |
9f95a23c TL |
268 | rdma_cm_id *cm_id = nullptr; |
269 | rdma_event_channel *cm_channel = nullptr; | |
11fdf7f2 | 270 | EventCallbackRef cm_con_handler; |
11fdf7f2 TL |
271 | std::mutex close_mtx; |
272 | std::condition_variable close_condition; | |
9f95a23c TL |
273 | bool closed = false; |
274 | RDMA_CM_STATUS status = IDLE; | |
11fdf7f2 TL |
275 | |
276 | ||
277 | class C_handle_cm_connection : public EventCallback { | |
278 | RDMAIWARPConnectedSocketImpl *csi; | |
279 | public: | |
280 | C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {} | |
281 | void do_request(uint64_t fd) { | |
282 | csi->handle_cm_connection(); | |
283 | } | |
284 | }; | |
285 | }; | |
286 | ||
31f18b77 | 287 | class RDMAServerSocketImpl : public ServerSocketImpl { |
11fdf7f2 TL |
288 | protected: |
289 | CephContext *cct; | |
f67539c2 | 290 | ceph::NetHandler net; |
11fdf7f2 | 291 | int server_setup_socket; |
f67539c2 TL |
292 | std::shared_ptr<Infiniband> ib; |
293 | std::shared_ptr<RDMADispatcher> dispatcher; | |
11fdf7f2 TL |
294 | RDMAWorker *worker; |
295 | entity_addr_t sa; | |
31f18b77 FG |
296 | |
297 | public: | |
f67539c2 TL |
298 | RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib, |
299 | std::shared_ptr<RDMADispatcher>& rdma_dispatcher, | |
11fdf7f2 | 300 | RDMAWorker *w, entity_addr_t& a, unsigned slot); |
31f18b77 | 301 | |
11fdf7f2 | 302 | virtual int listen(entity_addr_t &sa, const SocketOptions &opt); |
31f18b77 FG |
303 | virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; |
304 | virtual void abort_accept() override; | |
305 | virtual int fd() const override { return server_setup_socket; } | |
31f18b77 | 306 | }; |
7c673cae | 307 | |
11fdf7f2 TL |
308 | class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { |
309 | public: | |
310 | RDMAIWARPServerSocketImpl( | |
f67539c2 TL |
311 | CephContext *cct, std::shared_ptr<Infiniband>& ib, |
312 | std::shared_ptr<RDMADispatcher>& rdma_dispatcher, | |
9f95a23c | 313 | RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot); |
11fdf7f2 TL |
314 | virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override; |
315 | virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; | |
316 | virtual void abort_accept() override; | |
317 | private: | |
9f95a23c TL |
318 | rdma_cm_id *cm_id = nullptr; |
319 | rdma_event_channel *cm_channel = nullptr; | |
11fdf7f2 TL |
320 | }; |
321 | ||
7c673cae | 322 | class RDMAStack : public NetworkStack { |
f67539c2 | 323 | std::vector<std::thread> threads; |
11fdf7f2 | 324 | PerfCounters *perf_counter; |
f67539c2 TL |
325 | std::shared_ptr<Infiniband> ib; |
326 | std::shared_ptr<RDMADispatcher> rdma_dispatcher; | |
7c673cae FG |
327 | |
328 | std::atomic<bool> fork_finished = {false}; | |
329 | ||
20effc67 | 330 | virtual Worker* create_worker(CephContext *c, unsigned worker_id) override; |
f67539c2 | 331 | |
7c673cae | 332 | public: |
f67539c2 | 333 | explicit RDMAStack(CephContext *cct); |
7c673cae | 334 | virtual ~RDMAStack(); |
11fdf7f2 | 335 | virtual bool nonblock_connect_need_writable_event() const override { return false; } |
7c673cae | 336 | |
20effc67 | 337 | virtual void spawn_worker(std::function<void ()> &&func) override; |
7c673cae | 338 | virtual void join_worker(unsigned i) override; |
7c673cae FG |
339 | virtual bool is_ready() override { return fork_finished.load(); }; |
340 | virtual void ready() override { fork_finished = true; }; | |
341 | }; | |
31f18b77 | 342 | |
11fdf7f2 | 343 | |
7c673cae | 344 | #endif |