]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.h
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
18 #define CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
19
20 #include "common/ceph_context.h"
21 #include "common/debug.h"
22 #include "common/errno.h"
23 #include "msg/async/Stack.h"
24 #include "Infiniband.h"
25
26 class RDMAWorker;
27 class RDMADispatcher;
28 class RDMAConnectedSocketImpl;
29
30 typedef Infiniband::QueuePair QueuePair;
31
32 class RDMAConnMgr {
33 friend class RDMAConnectedSocketImpl;
34
35 protected:
36 CephContext *cct;
37 RDMAConnectedSocketImpl *socket;
38 Infiniband* infiniband;
39 RDMADispatcher* dispatcher;
40 RDMAWorker* worker;
41
42 bool is_server;
43 bool active;// qp is active ?
44 int connected;
45
46 public:
47 RDMAConnMgr(CephContext *cct, RDMAConnectedSocketImpl *sock,
48 Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
49 virtual ~RDMAConnMgr() { };
50
51 virtual ostream &print(ostream &out) const = 0;
52
53 virtual void cleanup() = 0;
54 virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) = 0;
55
56 void post_read();
57
58 void shutdown();
59 void close();
60 };
61 inline ostream& operator<<(ostream& out, const RDMAConnMgr &m)
62 {
63 return m.print(out);
64 }
65
66 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
67 friend class RDMAConnMgr;
68
69 protected:
70 CephContext *cct;
71 Infiniband* infiniband;
72 RDMADispatcher* dispatcher;
73 RDMAWorker* worker;
74 Device *ibdev = nullptr;
75 int ibport = -1;
76 QueuePair *qp = nullptr;
77
78 public:
79 typedef Infiniband::MemoryManager::Chunk Chunk;
80 typedef Infiniband::CompletionChannel CompletionChannel;
81 typedef Infiniband::CompletionQueue CompletionQueue;
82
83 private:
84 RDMAConnMgr *cmgr;
85 int error;
86 std::vector<Chunk*> buffers;
87 int notify_fd = -1;
88 bufferlist pending_bl;
89
90 Mutex lock;
91 std::vector<ibv_wc> wc;
92
93 ssize_t read_buffers(char* buf, size_t len);
94 int post_work_request(std::vector<Chunk*>&);
95
96 public:
97 uint32_t local_qpn = 0;
98 uint32_t remote_qpn = 0;
99
100 RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
101 RDMAWorker *w, void *info = nullptr);
102 virtual ~RDMAConnectedSocketImpl();
103
104 ostream &print(ostream &out) const {
105 return out << "socket {lqpn: " << local_qpn << " rqpn: " << remote_qpn << " " << *cmgr << "}";
106 };
107
108 Device *get_device() { return ibdev; };
109 int get_ibport() { return ibport; };
110
111 void pass_wc(std::vector<ibv_wc> &&v);
112 void get_wc(std::vector<ibv_wc> &w);
113 virtual int is_connected() override { return cmgr->connected; }
114
115 virtual ssize_t read(char* buf, size_t len) override;
116 virtual ssize_t zero_copy_read(bufferptr &data) override;
117 virtual ssize_t send(bufferlist &bl, bool more) override;
118 virtual void shutdown() override { cmgr->shutdown(); };
119 virtual void close() override { cmgr->close(); };
120 virtual int fd() const override { return notify_fd; }
121 void fault();
122 const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
123 QueuePair *get_qp() { return qp; };
124 ssize_t submit(bool more);
125 void fin();
126 void register_qp(QueuePair *qp);
127 void notify();
128
129 QueuePair *create_queue_pair(Device *d, int p);
130 int try_connect(const entity_addr_t &sa, const SocketOptions &opt) { return cmgr->try_connect(sa, opt); };
131 };
132 inline ostream& operator<<(ostream& out, const RDMAConnectedSocketImpl &s)
133 {
134 return s.print(out);
135 }
136
137
138 class RDMAServerSocketImpl : public ServerSocketImpl {
139 protected:
140 CephContext *cct;
141 Device *ibdev;
142 int ibport;
143 Infiniband* infiniband;
144 RDMADispatcher *dispatcher;
145 RDMAWorker *worker;
146 entity_addr_t sa;
147
148 public:
149 RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
150
151 virtual int listen(entity_addr_t &sa, const SocketOptions &opt) = 0;
152 virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) = 0;
153 virtual void abort_accept() = 0;
154 virtual int fd() const = 0;
155 };
156
157 #endif
158