1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
18 #define CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
20 #include "common/ceph_context.h"
21 #include "common/debug.h"
22 #include "common/errno.h"
23 #include "msg/async/Stack.h"
24 #include "Infiniband.h"
28 class RDMAConnectedSocketImpl
;
30 typedef Infiniband::QueuePair QueuePair
;
33 friend class RDMAConnectedSocketImpl
;
37 RDMAConnectedSocketImpl
*socket
;
38 Infiniband
* infiniband
;
39 RDMADispatcher
* dispatcher
;
43 bool active
;// qp is active ?
47 RDMAConnMgr(CephContext
*cct
, RDMAConnectedSocketImpl
*sock
,
48 Infiniband
* ib
, RDMADispatcher
* s
, RDMAWorker
*w
);
49 virtual ~RDMAConnMgr() { };
51 virtual ostream
&print(ostream
&out
) const = 0;
53 virtual void cleanup() = 0;
54 virtual int try_connect(const entity_addr_t
&, const SocketOptions
&opt
) = 0;
61 inline ostream
& operator<<(ostream
& out
, const RDMAConnMgr
&m
)
66 class RDMAConnectedSocketImpl
: public ConnectedSocketImpl
{
67 friend class RDMAConnMgr
;
71 Infiniband
* infiniband
;
72 RDMADispatcher
* dispatcher
;
74 Device
*ibdev
= nullptr;
76 QueuePair
*qp
= nullptr;
79 typedef Infiniband::MemoryManager::Chunk Chunk
;
80 typedef Infiniband::CompletionChannel CompletionChannel
;
81 typedef Infiniband::CompletionQueue CompletionQueue
;
86 std::vector
<Chunk
*> buffers
;
88 bufferlist pending_bl
;
91 std::vector
<ibv_wc
> wc
;
93 ssize_t
read_buffers(char* buf
, size_t len
);
94 int post_work_request(std::vector
<Chunk
*>&);
97 uint32_t local_qpn
= 0;
98 uint32_t remote_qpn
= 0;
100 RDMAConnectedSocketImpl(CephContext
*cct
, Infiniband
* ib
, RDMADispatcher
* s
,
101 RDMAWorker
*w
, void *info
= nullptr);
102 virtual ~RDMAConnectedSocketImpl();
104 ostream
&print(ostream
&out
) const {
105 return out
<< "socket {lqpn: " << local_qpn
<< " rqpn: " << remote_qpn
<< " " << *cmgr
<< "}";
108 Device
*get_device() { return ibdev
; };
109 int get_ibport() { return ibport
; };
111 void pass_wc(std::vector
<ibv_wc
> &&v
);
112 void get_wc(std::vector
<ibv_wc
> &w
);
113 virtual int is_connected() override
{ return cmgr
->connected
; }
115 virtual ssize_t
read(char* buf
, size_t len
) override
;
116 virtual ssize_t
zero_copy_read(bufferptr
&data
) override
;
117 virtual ssize_t
send(bufferlist
&bl
, bool more
) override
;
118 virtual void shutdown() override
{ cmgr
->shutdown(); };
119 virtual void close() override
{ cmgr
->close(); };
120 virtual int fd() const override
{ return notify_fd
; }
122 const char* get_qp_state() { return Infiniband::qp_state_string(qp
->get_state()); }
123 QueuePair
*get_qp() { return qp
; };
124 ssize_t
submit(bool more
);
126 void register_qp(QueuePair
*qp
);
129 QueuePair
*create_queue_pair(Device
*d
, int p
);
130 int try_connect(const entity_addr_t
&sa
, const SocketOptions
&opt
) { return cmgr
->try_connect(sa
, opt
); };
132 inline ostream
& operator<<(ostream
& out
, const RDMAConnectedSocketImpl
&s
)
138 class RDMAServerSocketImpl
: public ServerSocketImpl
{
143 Infiniband
* infiniband
;
144 RDMADispatcher
*dispatcher
;
149 RDMAServerSocketImpl(CephContext
*cct
, Infiniband
* i
, RDMADispatcher
*s
, RDMAWorker
*w
, entity_addr_t
& a
);
151 virtual int listen(entity_addr_t
&sa
, const SocketOptions
&opt
) = 0;
152 virtual int accept(ConnectedSocket
*s
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) = 0;
153 virtual void abort_accept() = 0;
154 virtual int fd() const = 0;