// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_MSG_RDMASTACK_H
#define CEPH_MSG_RDMASTACK_H

#include <sys/eventfd.h>

#include <list>
#include <vector>
#include <thread>

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"

class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;

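/**
 * RDMADispatcher owns the polling thread for the shared completion queues:
 * polling() drains tx_cq/rx_cq and routes each completion back to the
 * owning RDMAConnectedSocketImpl via the qp_num -> connection map
 * (qp_conns), while torn-down QPs wait in dead_queue_pairs until their
 * outstanding completions have drained.
 */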
class RDMADispatcher {
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::QueuePair QueuePair;

  std::thread t;
  CephContext *cct;
  std::shared_ptr<Infiniband> ib;
  Infiniband::CompletionQueue* tx_cq = nullptr;
  Infiniband::CompletionQueue* rx_cq = nullptr;
  Infiniband::CompletionChannel *tx_cc = nullptr, *rx_cc = nullptr;
  bool done = false;
  std::atomic<uint64_t> num_qp_conn = {0};
  // protect `qp_conns`, `dead_queue_pairs`
  ceph::mutex lock = ceph::make_mutex("RDMADispatcher::lock");
  // qp_num -> InfRcConnection
  // The main usage of `qp_conns` is looking up a connection by qp_num,
  // so the lifecycle of an element in `qp_conns` is the lifecycle of its qp.
  // Sequence for moving a QP into the dead state:
  /**
   * 1. The connection calls mark_down()
   * 2. Move the Queue Pair into the Error state (QueuePair::to_dead)
   * 3. Post a beacon
   * 4. Wait for the beacon, which indicates the queues are drained
   * 5. Destroy the QP by calling ibv_destroy_qp()
   *
   * @param qp the QP to tear down
   */
  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;

  /// if a queue pair is closed when transmit buffers are active
  /// on it, the transmit buffers never get returned via tx_cq. To
  /// work around this problem, don't delete queue pairs immediately. Instead,
  /// save them in this vector and delete them at a safe time, when there are
  /// no outstanding transmit buffers to be lost.
  std::vector<QueuePair*> dead_queue_pairs;

  std::atomic<uint64_t> num_pending_workers = {0};
  // protect pending workers
  ceph::mutex w_lock =
    ceph::make_mutex("RDMADispatcher::for worker pending list");
  // fixme: lockfree
  std::list<RDMAWorker*> pending_workers;
  void enqueue_dead_qp_lockless(uint32_t qp);
  void enqueue_dead_qp(uint32_t qpn);

 public:
  PerfCounters *perf_logger;

  explicit RDMADispatcher(CephContext* c, std::shared_ptr<Infiniband>& ib);
  virtual ~RDMADispatcher();
  void handle_async_event();

  void polling_start();
  void polling_stop();
  void polling();
  void register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
  void make_pending_worker(RDMAWorker* w) {
    std::lock_guard l{w_lock};
    auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
    if (it != pending_workers.end())
      return;
    pending_workers.push_back(w);
    ++num_pending_workers;
  }
  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
  QueuePair* get_qp_lockless(uint32_t qp);
  QueuePair* get_qp(uint32_t qp);
  void schedule_qp_destroy(uint32_t qp);
  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
  void notify_pending_workers();
  void handle_tx_event(ibv_wc *cqe, int n);
  void post_tx_buffer(std::vector<Chunk*> &chunks);
  void handle_rx_event(ibv_wc *cqe, int rx_number);

  std::atomic<uint64_t> inflight = {0};

  void post_chunk_to_pool(Chunk* chunk);
  int post_chunks_to_rq(int num, QueuePair *qp = nullptr);
};

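/**
 * RDMAWorker is the RDMA stack's per-thread Worker: it creates server and
 * connected sockets (listen/connect), hands registered memory chunks to
 * sockets with queued sends (get_reged_mem), and runs
 * handle_pending_message() whenever it is woken through tx_handler.
 */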
class RDMAWorker : public Worker {
  typedef Infiniband::CompletionQueue CompletionQueue;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::MemoryManager MemoryManager;
  typedef std::vector<Chunk*>::iterator ChunkIter;
  std::shared_ptr<Infiniband> ib;
  EventCallbackRef tx_handler;
  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
  std::shared_ptr<RDMADispatcher> dispatcher;
  ceph::mutex lock = ceph::make_mutex("RDMAWorker::lock");

  class C_handle_cq_tx : public EventCallback {
    RDMAWorker *worker;
   public:
    explicit C_handle_cq_tx(RDMAWorker *w): worker(w) {}
    void do_request(uint64_t fd) {
      worker->handle_pending_message();
    }
  };

 public:
  PerfCounters *perf_logger;
  explicit RDMAWorker(CephContext *c, unsigned i);
  virtual ~RDMAWorker();
  virtual int listen(entity_addr_t &addr,
                     unsigned addr_slot,
                     const SocketOptions &opts, ServerSocket *) override;
  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
  virtual void initialize() override;
  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
    ceph_assert(center.in_thread());
    pending_sent_conns.remove(o);
  }
  void handle_pending_message();
  void set_dispatcher(std::shared_ptr<RDMADispatcher>& dispatcher) { this->dispatcher = dispatcher; }
  void set_ib(std::shared_ptr<Infiniband> &ib) { this->ib = ib; }
  void notify_worker() {
    center.dispatch_event_external(tx_handler);
  }
};

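// Bundle of librdmacm state (id, event channel, qp_num) handed to an
// RDMAIWARPConnectedSocketImpl that is created on top of an already
// established rdma_cm connection, e.g. on the accept path.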
struct RDMACMInfo {
  RDMACMInfo(rdma_cm_id *cid, rdma_event_channel *cm_channel_, uint32_t qp_num_)
    : cm_id(cid), cm_channel(cm_channel_), qp_num(qp_num_) {}
  rdma_cm_id *cm_id;
  rdma_event_channel *cm_channel;
  uint32_t qp_num;
};

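/**
 * RDMAConnectedSocketImpl implements ConnectedSocket on top of an RDMA
 * QueuePair: received chunks are prefetched into `buffers` and consumed by
 * read(), outgoing data is staged in pending_bl and flushed by submit(),
 * and notify_fd (returned by fd()) is the descriptor the event center
 * polls to learn about socket activity.
 */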
class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
 public:
  typedef Infiniband::MemoryManager::Chunk Chunk;
  typedef Infiniband::CompletionChannel CompletionChannel;
  typedef Infiniband::CompletionQueue CompletionQueue;

 protected:
  CephContext *cct;
  Infiniband::QueuePair *qp;
  uint32_t peer_qpn = 0;
  uint32_t local_qpn = 0;
  int connected;
  int error;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> dispatcher;
  RDMAWorker* worker;
  std::vector<Chunk*> buffers;
  int notify_fd = -1;
  ceph::buffer::list pending_bl;

  ceph::mutex lock = ceph::make_mutex("RDMAConnectedSocketImpl::lock");
  std::vector<ibv_wc> wc;
  bool is_server;
  EventCallbackRef read_handler;
  EventCallbackRef established_handler;
194 int tcp_fd = -1;
195 bool active;// qp is active ?
224ce89b 196 bool pending;
11fdf7f2 197 int post_backlog = 0;

  void notify();
  void buffer_prefetch(void);
  ssize_t read_buffers(char* buf, size_t len);
  int post_work_request(std::vector<Chunk*>&);
  size_t tx_copy_chunk(std::vector<Chunk*> &tx_buffers, size_t req_copy_len,
                       decltype(std::cbegin(pending_bl.buffers()))& start,
                       const decltype(std::cbegin(pending_bl.buffers()))& end);

 public:
  RDMAConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                          std::shared_ptr<RDMADispatcher>& rdma_dispatcher, RDMAWorker *w);
  virtual ~RDMAConnectedSocketImpl();

  void pass_wc(std::vector<ibv_wc> &&v);
  void get_wc(std::vector<ibv_wc> &w);
  virtual int is_connected() override { return connected; }

  virtual ssize_t read(char* buf, size_t len) override;
  virtual ssize_t send(ceph::buffer::list &bl, bool more) override;
  virtual void shutdown() override;
  virtual void close() override;
  virtual int fd() const override { return notify_fd; }
  virtual void set_priority(int sd, int prio, int domain) override;
  void fault();
  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
  uint32_t get_peer_qpn() const { return peer_qpn; }
  uint32_t get_local_qpn() const { return local_qpn; }
  Infiniband::QueuePair* get_qp() const { return qp; }
  ssize_t submit(bool more);
  int activate();
  void fin();
  void handle_connection();
  int handle_connection_established(bool need_set_fault = true);
  void cleanup();
  void set_accept_fd(int sd);
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
  bool is_pending() { return pending; }
  void set_pending(bool val) { pending = val; }
  void post_chunks_to_rq(int num);
  void update_post_backlog();
};

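// Lifecycle of an RDMA-CM (iWARP) connection, from rdma_cm_id creation
// through address and route resolution to connected/disconnected/error.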
enum RDMA_CM_STATUS {
  IDLE = 1,
  RDMA_ID_CREATED,
  CHANNEL_FD_CREATED,
  RESOURCE_ALLOCATED,
  ADDR_RESOLVED,
  ROUTE_RESOLVED,
  CONNECTED,
  DISCONNECTED,
  ERROR
};

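/**
 * iWARP variant of the connected socket: connection setup runs through
 * librdmacm (cm_id + event channel) instead of the base class's TCP-based
 * exchange, with handle_cm_connection() advancing the RDMA_CM_STATUS state
 * machine as cm events arrive.
 */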
class RDMAIWARPConnectedSocketImpl : public RDMAConnectedSocketImpl {
 public:
  RDMAIWARPConnectedSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                               std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
                               RDMAWorker *w, RDMACMInfo *info = nullptr);
  ~RDMAIWARPConnectedSocketImpl();
  virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
  virtual void close() override;
  virtual void shutdown() override;
  virtual void handle_cm_connection();
  void activate();
  int alloc_resource();
  void close_notify();

 private:
  rdma_cm_id *cm_id = nullptr;
  rdma_event_channel *cm_channel = nullptr;
  EventCallbackRef cm_con_handler;
  std::mutex close_mtx;
  std::condition_variable close_condition;
  bool closed = false;
  RDMA_CM_STATUS status = IDLE;

  class C_handle_cm_connection : public EventCallback {
    RDMAIWARPConnectedSocketImpl *csi;
   public:
    C_handle_cm_connection(RDMAIWARPConnectedSocketImpl *w): csi(w) {}
    void do_request(uint64_t fd) {
      csi->handle_cm_connection();
    }
  };
};

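/**
 * RDMAServerSocketImpl implements ServerSocket: it listens on a TCP setup
 * socket (server_setup_socket, created via ceph::NetHandler) and accept()
 * turns incoming connections into RDMAConnectedSocketImpl instances on the
 * chosen worker.
 */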
class RDMAServerSocketImpl : public ServerSocketImpl {
 protected:
  CephContext *cct;
  ceph::NetHandler net;
  int server_setup_socket;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> dispatcher;
  RDMAWorker *worker;
  entity_addr_t sa;

 public:
  RDMAServerSocketImpl(CephContext *cct, std::shared_ptr<Infiniband>& ib,
                       std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
                       RDMAWorker *w, entity_addr_t& a, unsigned slot);

  virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
  virtual int fd() const override { return server_setup_socket; }
};

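// iWARP variant of the server socket: listens and accepts through an
// rdma_cm_id and event channel rather than the TCP setup socket.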
class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
 public:
  RDMAIWARPServerSocketImpl(
    CephContext *cct, std::shared_ptr<Infiniband>& ib,
    std::shared_ptr<RDMADispatcher>& rdma_dispatcher,
    RDMAWorker* w, entity_addr_t& addr, unsigned addr_slot);
  virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
  virtual void abort_accept() override;
 private:
  rdma_cm_id *cm_id = nullptr;
  rdma_event_channel *cm_channel = nullptr;
};

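/**
 * RDMAStack is the NetworkStack implementation for RDMA: it owns the
 * shared Infiniband instance and the RDMADispatcher and spawns/joins the
 * worker threads; is_ready() reports true only after ready() has set
 * fork_finished.
 *
 * A minimal configuration sketch for selecting this stack (values are
 * illustrative, not taken from this file):
 *
 *   ms_type = async+rdma
 *   ms_async_rdma_device_name = mlx5_0
 */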
class RDMAStack : public NetworkStack {
  std::vector<std::thread> threads;
  PerfCounters *perf_counter;
  std::shared_ptr<Infiniband> ib;
  std::shared_ptr<RDMADispatcher> rdma_dispatcher;

  std::atomic<bool> fork_finished = {false};

  virtual Worker* create_worker(CephContext *c, unsigned worker_id) override;

 public:
  explicit RDMAStack(CephContext *cct);
  virtual ~RDMAStack();
  virtual bool nonblock_connect_need_writable_event() const override { return false; }

  virtual void spawn_worker(std::function<void ()> &&func) override;
  virtual void join_worker(unsigned i) override;
  virtual bool is_ready() override { return fork_finished.load(); }
  virtual void ready() override { fork_finished = true; }
};


#endif