]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / msg / async / rdma / RDMAIWARPConnectedSocketImpl.cc
CommitLineData
11fdf7f2
TL
1#include "RDMAStack.h"
2
3#define dout_subsys ceph_subsys_ms
4#undef dout_prefix
5#define dout_prefix *_dout << " RDMAIWARPConnectedSocketImpl "
6
7#define TIMEOUT_MS 3000
8#define RETRY_COUNT 7
9
10RDMAIWARPConnectedSocketImpl::RDMAIWARPConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
11 RDMAWorker *w, RDMACMInfo *info)
12 : RDMAConnectedSocketImpl(cct, ib, s, w), cm_con_handler(new C_handle_cm_connection(this))
13{
14 status = IDLE;
15 notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
16 if (info) {
17 is_server = true;
18 cm_id = info->cm_id;
19 cm_channel = info->cm_channel;
20 status = RDMA_ID_CREATED;
21 remote_qpn = info->qp_num;
22 if (alloc_resource()) {
23 close_notify();
24 return;
25 }
26 worker->center.submit_to(worker->center.get_id(), [this]() {
27 worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
28 status = CHANNEL_FD_CREATED;
29 }, false);
30 status = RESOURCE_ALLOCATED;
31 local_qpn = qp->get_local_qp_number();
32 my_msg.qpn = local_qpn;
33 } else {
34 is_server = false;
35 cm_channel = rdma_create_event_channel();
36 rdma_create_id(cm_channel, &cm_id, NULL, RDMA_PS_TCP);
37 status = RDMA_ID_CREATED;
38 ldout(cct, 20) << __func__ << " successfully created cm id: " << cm_id << dendl;
39 }
40}
41
42RDMAIWARPConnectedSocketImpl::~RDMAIWARPConnectedSocketImpl() {
43 ldout(cct, 20) << __func__ << " destruct." << dendl;
44 std::unique_lock l(close_mtx);
45 close_condition.wait(l, [&] { return closed; });
46 if (status >= RDMA_ID_CREATED) {
47 rdma_destroy_id(cm_id);
48 rdma_destroy_event_channel(cm_channel);
49 }
50}
51
52int RDMAIWARPConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
53 worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
54 status = CHANNEL_FD_CREATED;
55 if (rdma_resolve_addr(cm_id, NULL, const_cast<struct sockaddr*>(peer_addr.get_sockaddr()), TIMEOUT_MS)) {
56 lderr(cct) << __func__ << " failed to resolve addr" << dendl;
57 return -1;
58 }
59 return 0;
60}
61
62void RDMAIWARPConnectedSocketImpl::close() {
63 error = ECONNRESET;
64 active = false;
65 if (status >= CONNECTED) {
66 rdma_disconnect(cm_id);
67 }
68 close_notify();
69}
70
71void RDMAIWARPConnectedSocketImpl::shutdown() {
72 error = ECONNRESET;
73 active = false;
74}
75
/**
 * Event-center callback for the RDMA CM event channel.
 *
 * Fetches one CM event and advances the connection state machine:
 *   ADDR_RESOLVED   -> start route resolution
 *   ROUTE_RESOLVED  -> allocate the queue pair and issue rdma_connect()
 *   ESTABLISHED     -> mark connected; the client side activates and notifies
 *   error/rejected  -> record -ECONNREFUSED and notify
 *   DISCONNECTED    -> signal closure and report ECONNRESET once
 * Every successfully fetched event is acknowledged before returning.
 */
void RDMAIWARPConnectedSocketImpl::handle_cm_connection() {
  struct rdma_cm_event *event = nullptr;
  // rdma_get_cm_event() returns non-zero on failure and then leaves `event`
  // unset; bail out instead of dereferencing it. There is nothing to ack
  // in that case.
  if (rdma_get_cm_event(cm_channel, &event) || !event) {
    lderr(cct) << __func__ << " failed to get rdma cm event" << dendl;
    return;
  }
  ldout(cct, 20) << __func__ << " event name: " << rdma_event_str(event->event)
                 << " (cm id: " << cm_id << ")" << dendl;
  struct rdma_conn_param cm_params;
  switch (event->event) {
    case RDMA_CM_EVENT_ADDR_RESOLVED:
      status = ADDR_RESOLVED;
      if (rdma_resolve_route(cm_id, TIMEOUT_MS)) {
        lderr(cct) << __func__ << " failed to resolve rdma addr" << dendl;
        // Record the failure before waking the waiter, as every other
        // error path in this function does; otherwise the waiter sees no
        // error state after being notified.
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ROUTE_RESOLVED:
      status = ROUTE_RESOLVED;
      if (alloc_resource()) {
        lderr(cct) << __func__ << " failed to alloc resource while resolving the route" << dendl;
        connected = -ECONNREFUSED;
        notify();
        break;
      }
      local_qpn = qp->get_local_qp_number();
      my_msg.qpn = local_qpn;

      // FIPS zeroization audit 20191115: this memset is not security related.
      memset(&cm_params, 0, sizeof(cm_params));
      cm_params.retry_count = RETRY_COUNT;
      cm_params.qp_num = local_qpn;
      if (rdma_connect(cm_id, &cm_params)) {
        lderr(cct) << __func__ << " failed to connect remote rdma port" << dendl;
        connected = -ECONNREFUSED;
        notify();
      }
      break;

    case RDMA_CM_EVENT_ESTABLISHED:
      ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
      status = CONNECTED;
      // Only the client learns the remote QP number from this event; the
      // server side received it through RDMACMInfo at construction.
      if (!is_server) {
        remote_qpn = event->param.conn.qp_num;
        activate();
        notify();
      }
      break;

    case RDMA_CM_EVENT_ADDR_ERROR:
    case RDMA_CM_EVENT_ROUTE_ERROR:
    case RDMA_CM_EVENT_CONNECT_ERROR:
    case RDMA_CM_EVENT_UNREACHABLE:
    case RDMA_CM_EVENT_REJECTED:
      lderr(cct) << __func__ << " rdma connection rejected" << dendl;
      connected = -ECONNREFUSED;
      notify();
      break;

    case RDMA_CM_EVENT_DISCONNECTED:
      status = DISCONNECTED;
      close_notify();
      // Report the reset only if no error has been recorded yet.
      if (!error) {
        error = ECONNRESET;
        notify();
      }
      break;

    case RDMA_CM_EVENT_DEVICE_REMOVAL:
      break;

    default:
      ceph_abort_msg("unhandled event");
      break;
  }
  rdma_ack_cm_event(event);
}
151
152void RDMAIWARPConnectedSocketImpl::activate() {
153 ldout(cct, 30) << __func__ << dendl;
154 active = true;
155 connected = 1;
156}
157
158int RDMAIWARPConnectedSocketImpl::alloc_resource() {
159 ldout(cct, 30) << __func__ << dendl;
160 qp = infiniband->create_queue_pair(cct, dispatcher->get_tx_cq(),
161 dispatcher->get_rx_cq(), IBV_QPT_RC, cm_id);
162 if (!qp) {
163 return -1;
164 }
165 if (!cct->_conf->ms_async_rdma_support_srq)
166 dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
167 dispatcher->register_qp(qp, this);
168 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
169 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
170 return 0;
171}
172
173void RDMAIWARPConnectedSocketImpl::close_notify() {
174 ldout(cct, 30) << __func__ << dendl;
175 if (status >= CHANNEL_FD_CREATED) {
176 worker->center.delete_file_event(cm_channel->fd, EVENT_READABLE);
177 }
178 std::unique_lock l(close_mtx);
179 if (!closed) {
180 closed = true;
181 close_condition.notify_all();
182 }
183}