// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "Infiniband.h"
#include "common/errno.h"
#include "common/debug.h"
#include "RDMAStack.h"

#include <sys/time.h>
#include <sys/resource.h>

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "Infiniband "

static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
static const uint32_t MAX_INLINE_DATA = 0;
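// Size of the fixed-width connection-management message exchanged over TCP.
// Its layout is "%04x:%08x:%08x:%08x:%s": lid, local qpn, psn, peer qpn, then
// the GID as 32 hex characters (see send_cm_meta()/recv_cm_meta() below).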
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
static const uint32_t CQ_DEPTH = 30000;

Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn),
  gid_idx(cct->_conf.get_val<int64_t>("ms_async_rdma_gid_idx"))
{
  int r = ibv_query_port(ctxt, port_num, &port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr.lid;
  ceph_assert(gid_idx < port_attr.gid_tbl_len);
#ifdef HAVE_IBV_EXP
  union ibv_gid cgid;
  struct ibv_exp_gid_attr gid_attr;
  bool malformed = false;

  ldout(cct, 1) << __func__ << " using experimental verbs for gid" << dendl;

  // search for requested GID in GIDs table
  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
                << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
             "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
             ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
             &cgid.raw[ 0], &cgid.raw[ 1],
             &cgid.raw[ 2], &cgid.raw[ 3],
             &cgid.raw[ 4], &cgid.raw[ 5],
             &cgid.raw[ 6], &cgid.raw[ 7],
             &cgid.raw[ 8], &cgid.raw[ 9],
             &cgid.raw[10], &cgid.raw[11],
             &cgid.raw[12], &cgid.raw[13],
             &cgid.raw[14], &cgid.raw[15]);

  if (r != 16) {
    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
    malformed = true;
  }

  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;

  for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) {
    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
    if (r) {
      lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx
                 << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }
    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
    if (r) {
      lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx
                 << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }

    if (malformed) break; // stay with gid_idx=0
    if ((gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
        (memcmp(&gid, &cgid, 16) == 0)) {
      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
      break;
    }
  }

  if (gid_idx == port_attr.gid_tbl_len) {
    lderr(cct) << __func__ << " requested local GID was not found in GID table" << dendl;
    ceph_abort();
  }
#else
  r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
  if (r) {
    lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
#endif
}

Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr)
{
  ceph_assert(device);
  ctxt = ibv_open_device(device);
  ceph_assert(ctxt);

  name = ibv_get_device_name(device);

  int r = ibv_query_device(ctxt, &device_attr);
  if (r) {
    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device),
                                                              active_port(nullptr)
{
  ceph_assert(device);
  ctxt = ib_ctx;
  ceph_assert(ctxt);

  name = ibv_get_device_name(device);

  int r = ibv_query_device(ctxt, &device_attr);
  if (r) {
    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

void Device::binding_port(CephContext *cct, int port_num) {
  port_cnt = device_attr.phys_port_cnt;
  for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) {
    Port *port = new Port(cct, ctxt, port_id);
    if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
      active_port = port;
      ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl;
      break;
    } else {
      ldout(cct, 10) << __func__ << " port " << static_cast<int>(port_id) << " is not what we want. state: "
                     << ibv_port_state_str(port->get_port_attr()->state) << dendl;
      delete port;
    }
  }
  if (nullptr == active_port) {
    lderr(cct) << __func__ << " port not found" << dendl;
    ceph_assert(active_port);
  }
}

Infiniband::QueuePair::QueuePair(
  CephContext *c, Infiniband& infiniband, ibv_qp_type type,
  int port, ibv_srq *srq,
  Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
  uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
: cct(c), infiniband(infiniband),
  type(type),
  ctxt(infiniband.device->ctxt),
  ib_physical_port(port),
  pd(infiniband.pd->pd),
  srq(srq),
  qp(NULL),
  cm_id(cid), peer_cm_meta{0}, local_cm_meta{0},
  txcq(txcq),
  rxcq(rxcq),
  initial_psn(lrand48() & PSN_MSK),
  // One extra WR for the beacon
  max_send_wr(tx_queue_len + 1),
  max_recv_wr(rx_queue_len),
  q_key(q_key),
  dead(false)
{
  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
    lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

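// Queue Pair lifecycle, as driven by the helpers below: RESET -> INIT
// (modify_qp_to_init) -> RTR (modify_qp_to_rtr) -> RTS (modify_qp_to_rts).
// The RTR/RTS transitions need peer_cm_meta, which the two sides exchange
// over TCP via send_cm_meta()/recv_cm_meta(). modify_qp_to_error() is used
// by to_dead() to flush outstanding work requests before the QP is destroyed.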
int Infiniband::QueuePair::modify_qp_to_error(void)
{
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_ERR;
  if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) {
    lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " transitioned to ERROR state successfully." << dendl;
  return 0;
}

int Infiniband::QueuePair::modify_qp_to_rts(void)
{
  // move from RTR to RTS state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_RTS;
  /*
   * How long to wait before retrying if packet lost or server dead.
   * Supposedly the timeout is 4.096us*2^timeout. However, the actual
   * timeout appears to be 4.096us*2^(timeout+1), so the 0x12 setting
   * below creates a roughly 2.1s timeout.
   */
  qpa.timeout = 0x12;
  // How many times to retry after timeouts before giving up.
  qpa.retry_cnt = 7;
  /*
   * How many times to retry after RNR (receiver not ready) condition
   * before giving up. Occurs when the remote side has not yet posted
   * a receive request.
   */
  qpa.rnr_retry = 7; // 7 is infinite retry.
  qpa.sq_psn = local_cm_meta.psn;
  qpa.max_rd_atomic = 1;

  int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
  int r = ibv_modify_qp(qp, &qpa, attr_mask);
  if (r) {
    lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " transitioned to RTS state successfully." << dendl;
  return 0;
}

int Infiniband::QueuePair::modify_qp_to_rtr(void)
{
  // move from INIT to RTR state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_RTR;
  qpa.path_mtu = IBV_MTU_1024;
  qpa.dest_qp_num = peer_cm_meta.local_qpn;
  qpa.rq_psn = peer_cm_meta.psn;
  qpa.max_dest_rd_atomic = 1;
  qpa.min_rnr_timer = 0x12;
  qpa.ah_attr.is_global = 1;
  qpa.ah_attr.grh.hop_limit = 6;
  qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
  qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
  qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
  //qpa.ah_attr.grh.flow_label = 0;

  qpa.ah_attr.dlid = peer_cm_meta.lid;
  qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
  qpa.ah_attr.src_path_bits = 0;
  qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);

  ldout(cct, 20) << __func__ << " choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;

  int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;

  int r = ibv_modify_qp(qp, &qpa, attr_mask);
  if (r) {
    lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " transitioned to RTR state successfully." << dendl;
  return 0;
}

int Infiniband::QueuePair::modify_qp_to_init(void)
{
  // move from RESET to INIT state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_INIT;
  qpa.pkey_index = 0;
  qpa.port_num = (uint8_t)(ib_physical_port);
  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
  qpa.qkey = q_key;

  int mask = IBV_QP_STATE | IBV_QP_PORT;
  switch (type) {
    case IBV_QPT_RC:
      mask |= IBV_QP_ACCESS_FLAGS;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_UD:
      mask |= IBV_QP_QKEY;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_RAW_PACKET:
      break;
    default:
      ceph_abort();
  }

  if (ibv_modify_qp(qp, &qpa, mask)) {
    lderr(cct) << __func__ << " failed to switch Queue Pair to INIT state, qp number: " << qp->qp_num
               << " error: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully switched Queue Pair to INIT state, qp number: " << qp->qp_num << dendl;
  return 0;
}

int Infiniband::QueuePair::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  ibv_qp_init_attr qpia;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpia, 0, sizeof(qpia));
  qpia.send_cq = txcq->get_cq();
  qpia.recv_cq = rxcq->get_cq();
  if (srq) {
    qpia.srq = srq;                    // use the same shared receive queue
  } else {
    qpia.cap.max_recv_wr = max_recv_wr;
    qpia.cap.max_recv_sge = 1;
  }
  qpia.cap.max_send_wr = max_send_wr;  // max outstanding send requests
  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
  qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
  qpia.qp_type = type;                 // RC, UC, UD, or XRC
  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs

  if (!cct->_conf->ms_async_rdma_cm) {
    qp = ibv_create_qp(pd, &qpia);
    if (qp == NULL) {
      lderr(cct) << __func__ << " failed to create queue pair: " << cpp_strerror(errno) << dendl;
      if (errno == ENOMEM) {
        lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length,"
                      " ms_async_rdma_send_buffers or"
                      " ms_async_rdma_buffer_size" << dendl;
      }
      return -1;
    }
    if (modify_qp_to_init() != 0) {
      ibv_destroy_qp(qp);
      return -1;
    }
  } else {
    ceph_assert(cm_id->verbs == pd->context);
    if (rdma_create_qp(cm_id, pd, &qpia)) {
      lderr(cct) << __func__ << " failed to create queue pair with rdmacm library: "
                 << cpp_strerror(errno) << dendl;
      return -1;
    }
    qp = cm_id->qp;
  }
  ldout(cct, 20) << __func__ << " successfully created queue pair: "
                 << "qp=" << qp << dendl;
  local_cm_meta.local_qpn = get_local_qp_number();
  local_cm_meta.psn = get_initial_psn();
  local_cm_meta.lid = infiniband.get_lid();
  local_cm_meta.peer_qpn = 0;
  local_cm_meta.gid = infiniband.get_gid();
  if (!srq) {
    int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this);
    if (rq_wrs == 0) {
      lderr(cct) << __func__ << " failed to initialize no-SRQ Queue Pair, qp number: " << qp->qp_num
                 << " fatal error: can't post RQ WR" << dendl;
      return -1;
    }
    ldout(cct, 20) << __func__ << " initialized no-SRQ Queue Pair, qp number: "
                   << qp->qp_num << ", posted RQ WR: " << rq_wrs << dendl;
  }
  return 0;
}

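// The 128-bit GID travels in the CM message as 32 hex characters: the raw
// GID bytes hex-encoded in order, converted 32 bits at a time in network
// byte order by the two helpers below.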
void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data)
{
  char tmp[9];
  uint32_t v32;
  int i;

  for (tmp[8] = 0, i = 0; i < 4; ++i) {
    memcpy(tmp, wgid + i * 8, 8);
    sscanf(tmp, "%x", &v32);
    *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32);
  }
}

void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[])
{
  for (int i = 0; i < 4; ++i)
    sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4)));
}

/*
 * return value
 *   > 0: success; the whole fixed-size message was read
 *     0: got a valid disconnect message (peer closed the socket)
 *   < 0: error
 */
int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
{
  char msg[TCP_MSG_LEN];
  char gid[33];
  ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
  // drop the incoming message to simulate a socket failure, if configured
  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }
  if (r < 0) {
    r = -errno;
    lderr(cct) << __func__ << " got error " << r << ": "
               << cpp_strerror(r) << dendl;
  } else if (r == 0) { // valid disconnect message of length 0
    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
  } else if ((size_t)r != sizeof(msg)) { // invalid message
    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
    r = -EINVAL;
  } else { // valid message
    sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
    wire_gid_to_gid(gid, &peer_cm_meta);
    ldout(cct, 5) << __func__ << " received: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
                  << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
  }
  return r;
}

int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
{
  int retry = 0;
  ssize_t r;

  char msg[TCP_MSG_LEN];
  char gid[33];
retry:
  gid_to_wire_gid(local_cm_meta, gid);
  sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
  ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
                 << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl;
  r = ::write(socket_fd, msg, sizeof(msg));
  // drop the outgoing message to simulate a socket failure, if configured
  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }

  if ((size_t)r != sizeof(msg)) {
    // FIXME need to handle EAGAIN instead of retry
    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
      retry++;
      goto retry;
    }
    if (r < 0)
      lderr(cct) << __func__ << " send returned error " << errno << ": "
                 << cpp_strerror(errno) << dendl;
    else
      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
    return -errno;
  }
  return 0;
}

/**
 * Switch the QP to the ERROR state and then post a beacon, so that all WCEs
 * can be drained and the QP destroyed safely. See
 * RDMADispatcher::handle_tx_event() for details.
 *
 * \return
 *      -errno if the QueuePair can't switch to ERROR
 *      0 for success.
 */
int Infiniband::QueuePair::to_dead()
{
  if (dead)
    return 0;

  if (modify_qp_to_error()) {
    return -1;
  }
  ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
                 << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;

  struct ibv_send_wr *bad_wr = nullptr, beacon;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&beacon, 0, sizeof(beacon));
  beacon.wr_id = BEACON_WRID;
  beacon.opcode = IBV_WR_SEND;
  beacon.send_flags = IBV_SEND_SIGNALED;
  if (ibv_post_send(qp, &beacon, &bad_wr)) {
    lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
    return -errno;
  }
  ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " beacon sent " << dendl;
  dead = true;

  return 0;
}

int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (rqp)
    *rqp = qpa.dest_qp_num;
  return 0;
}

/**
 * Get the remote infiniband address for this QueuePair, as set during
 * connection setup. LIDs are "local IDs" in infiniband terminology. They
 * are short, locally routable addresses.
 */
int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to query qp: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (lid)
    *lid = qpa.ah_attr.dlid;
  return 0;
}

/**
 * Get the state of a QueuePair.
 */
int Infiniband::QueuePair::get_state() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return qpa.qp_state;
}

Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}

Infiniband::CompletionChannel::~CompletionChannel()
{
  if (channel) {
    int r = ibv_destroy_comp_channel(channel);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
    ceph_assert(r == 0);
  }
}

int Infiniband::CompletionChannel::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  channel = ibv_create_comp_channel(infiniband.device->ctxt);
  if (!channel) {
    lderr(cct) << __func__ << " failed to create receive completion channel: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  int rc = NetHandler(cct).set_nonblock(channel->fd);
  if (rc < 0) {
    ibv_destroy_comp_channel(channel);
    return -1;
  }
  return 0;
}

void Infiniband::CompletionChannel::ack_events()
{
  ibv_ack_cq_events(cq, cq_events_that_need_ack);
  cq_events_that_need_ack = 0;
}

bool Infiniband::CompletionChannel::get_cq_event()
{
  ibv_cq *cq = NULL;
  void *ev_ctx;
  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
    if (errno != EAGAIN && errno != EINTR)
      lderr(cct) << __func__ << " failed to retrieve CQ event: "
                 << cpp_strerror(errno) << dendl;
    return false;
  }

  /* accumulate number of cq events that need to
   * be acked, and periodically ack them
   */
  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
    ldout(cct, 20) << __func__ << " ack cq events." << dendl;
    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
    cq_events_that_need_ack = 0;
  }

  return true;
}


Infiniband::CompletionQueue::~CompletionQueue()
{
  if (cq) {
    int r = ibv_destroy_cq(cq);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
    ceph_assert(r == 0);
  }
}

int Infiniband::CompletionQueue::init()
{
  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
  if (!cq) {
    lderr(cct) << __func__ << " failed to create receive completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (ibv_req_notify_cq(cq, 0)) {
    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
    ibv_destroy_cq(cq);
    cq = nullptr;
    return -1;
  }

  channel->bind_cq(cq);
  ldout(cct, 20) << __func__ << " successfully created cq=" << cq << dendl;
  return 0;
}

int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r < 0)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
  return r;
}

int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
  if (r < 0) {
    lderr(cct) << __func__ << " failed to poll completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return r;
}


Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
  : pd(ibv_alloc_pd(device->ctxt))
{
  if (pd == NULL) {
    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

Infiniband::ProtectionDomain::~ProtectionDomain()
{
  ibv_dealloc_pd(pd);
}


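// A Chunk is a fixed-size slice of a registered memory region together with
// that region's lkey, so it can be referenced directly as an SGE in send and
// receive work requests.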
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer,
                                        uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp)
  : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer)
{
}

Infiniband::MemoryManager::Chunk::~Chunk()
{
}

uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}

uint32_t Infiniband::MemoryManager::Chunk::get_size() const
{
  return bound - offset;
}

void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}

uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}

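// Chunk accounting: when reading (rx), [offset, bound) is the region not yet
// consumed; when writing (tx), [0, offset) is the region already filled and
// full() reports offset == bytes. reset_read_chunk()/reset_write_chunk()
// re-arm the chunk for the respective direction.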
uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
{
  uint32_t left = get_size();
  uint32_t read_len = left <= len ? left : len;
  memcpy(buf, buffer + offset, read_len);
  offset += read_len;
  return read_len;
}

uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
{
  uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len;
  memcpy(buffer + offset, buf, write_len);
  offset += write_len;
  return write_len;
}

bool Infiniband::MemoryManager::Chunk::full()
{
  return offset == bytes;
}

void Infiniband::MemoryManager::Chunk::reset_read_chunk()
{
  offset = 0;
  bound = 0;
}

void Infiniband::MemoryManager::Chunk::reset_write_chunk()
{
  offset = 0;
  bound = bytes;
}

Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s)
{
}

Infiniband::MemoryManager::Cluster::~Cluster()
{
  int r = ibv_dereg_mr(chunk_base->mr);
  ceph_assert(r == 0);
  const auto chunk_end = chunk_base + num_chunk;
  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
    chunk->~Chunk();
  }

  ::free(chunk_base);
  manager.free(base);
}

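// fill() backs the whole cluster with one allocation: a single ibv_reg_mr()
// covers buffer_size * num bytes, and the Chunk objects are placement-new'ed
// into a separate array, all sharing that MR's lkey.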
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
  ceph_assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;

  base = (char*)manager.malloc(bytes);
  end = base + bytes;
  ceph_assert(base);
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
  free_chunks.reserve(num);
  ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  ceph_assert(m);
  Chunk* chunk = chunk_base;
  for (uint32_t offset = 0; offset < bytes; offset += buffer_size) {
    new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey);
    free_chunks.push_back(chunk);
    chunk++;
  }
  return 0;
}

void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
{
  std::lock_guard l{lock};
  for (auto c : ck) {
    c->reset_write_chunk();
    free_chunks.push_back(c);
  }
}

int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size)
{
  std::lock_guard l{lock};
  uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size;
  chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size() : chunk_buffer_number;
  uint32_t r = 0;

  for (r = 0; r < chunk_buffer_number; ++r) {
    chunks.push_back(free_chunks.back());
    free_chunks.pop_back();
  }
  return r;
}

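// MemPoolContext tracks how many rx buffers its pool has handed out, so the
// ms_async_rdma_receive_buffers cap can be enforced and the count exported
// through the perf counters.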
bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
{
  /* unlimited */
  if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
    return true;

  if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
    lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
        n_bufs_allocated << " requested: " << nbufs <<
        " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
    return false;
  }

  return true;
}

void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
  perf_logger = logger;
  if (perf_logger != nullptr)
    perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
}

void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
{
  n_bufs_allocated += nbufs;

  if (!perf_logger)
    return;

  if (nbufs > 0) {
    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
  } else {
    perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
  }
}

void *Infiniband::MemoryManager::mem_pool::slow_malloc()
{
  // this will trigger pool expansion via PoolAllocator::malloc()
  return PoolAllocator::with_context(ctx, [this] {
    return boost::pool<PoolAllocator>::malloc();
  });
}

Infiniband::MemoryManager::MemPoolContext*
Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;

// lock is taken by mem_pool::slow_malloc()
ceph::mutex& Infiniband::MemoryManager::PoolAllocator::get_lock()
{
  static ceph::mutex lock = ceph::make_mutex("pool-alloc-lock");
  return lock;
}

char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size)
{
  ceph_assert(g_ctx);
  MemoryManager *manager = g_ctx->manager;
  CephContext *cct = manager->cct;
  size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
  size_t chunk_buffer_number = block_size / chunk_buffer_size;

  if (!g_ctx->can_alloc(chunk_buffer_number))
    return NULL;

  mem_info *minfo = static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info)));
  if (!minfo) {
    lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers"
                  " of block size " << block_size + sizeof(mem_info) << dendl;
    return NULL;
  }

  minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  if (minfo->mr == NULL) {
    lderr(cct) << __func__ << " failed to do rdma memory registration " << block_size << " bytes."
                  " releasing allocated memory now." << dendl;
    manager->free(minfo);
    return NULL;
  }

  minfo->nbufs = chunk_buffer_number;
  // save this chunk context
  minfo->ctx = g_ctx;

  // note that the memory can be allocated before perf logger is set
  g_ctx->update_stats(chunk_buffer_number);

  /* initialize chunks */
  Chunk *chunk = minfo->chunks;
  for (unsigned i = 0; i < chunk_buffer_number; i++) {
    new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey);
    chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size);
  }

  return reinterpret_cast<char *>(minfo->chunks);
}


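// Memory handed back by PoolAllocator::malloc() is laid out as a mem_info
// header followed by the Chunk array (the registered MR covers only the
// chunks). boost::pool sees only the chunk array; free() below recovers the
// header with offsetof(mem_info, chunks) to deregister and release it.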
void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
  mem_info *m;
  std::lock_guard l{get_lock()};

  Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block);
  m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks));
  m->ctx->update_stats(-m->nbufs);
  ibv_dereg_mr(m->mr);
  m->ctx->manager->free(m);
}

Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
  : cct(c), device(d), pd(p),
    rxbuf_pool_ctx(this),
    rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
               c->_conf->ms_async_rdma_receive_buffers > 0 ?
                   // if possible make initial pool size 2 * receive_queue_len
                   // that way there will be no pool expansion upon receive of the
                   // first packet.
                   (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
                    c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
                   // rx pool is infinite, we can set any initial size that we want
                   2 * c->_conf->ms_async_rdma_receive_queue_len,
               device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size))
{
}

Infiniband::MemoryManager::~MemoryManager()
{
  if (send)
    delete send;
}

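// huge_pages_malloc() over-allocates by one 2MB huge page, stores the mmap'ed
// size in the first bytes of the mapping, and returns the address one huge
// page in; huge_pages_free() reads that size back to decide between munmap()
// and std::free() (0 marks the std::malloc fallback path).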
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
  size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
  if (ptr == MAP_FAILED) {
    ptr = (char *)std::malloc(real_size);
    if (ptr == NULL) return NULL;
    real_size = 0;
  }
  *((size_t *)ptr) = real_size;
  return ptr + HUGE_PAGE_SIZE_2MB;
}

void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
  if (ptr == NULL) return;
  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
  size_t real_size = *((size_t *)real_ptr);
  ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
  if (real_size != 0)
    munmap(real_ptr, real_size);
  else
    std::free(real_ptr);
}


void* Infiniband::MemoryManager::malloc(size_t size)
{
  if (cct->_conf->ms_async_rdma_enable_hugepage)
    return huge_pages_malloc(size);
  else
    return std::malloc(size);
}

void Infiniband::MemoryManager::free(void *ptr)
{
  if (cct->_conf->ms_async_rdma_enable_hugepage)
    huge_pages_free(ptr);
  else
    std::free(ptr);
}

void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
  ceph_assert(device);
  ceph_assert(pd);

  send = new Cluster(*this, size);
  send->fill(tx_num);
}

void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}

int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}

static std::atomic<bool> init_prereq = {false};

void Infiniband::verify_prereq(CephContext *cct) {
  int rc = 0;
  ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl;
  if (cct->_conf->ms_async_rdma_enable_hugepage) {
    rc = setenv("RDMAV_HUGEPAGES_SAFE", "1", 1);
    ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl;
    if (rc) {
      lderr(cct) << __func__ << " failed to export RDMAV_HUGEPAGES_SAFE. It must be exported before using huge pages with RDMA. Application aborts." << dendl;
      ceph_abort();
    }
  }

  // must be called before fork when using RDMA
  rc = ibv_fork_init();
  if (rc) {
    lderr(cct) << __func__ << " failed to call ibv_fork_init(). It must be called before fork when using RDMA. Application aborts." << dendl;
    ceph_abort();
  }

  // check ulimit
  struct rlimit limit;
  getrlimit(RLIMIT_MEMLOCK, &limit);
  if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
    lderr(cct) << __func__ << " !!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
                  " We recommend setting this parameter to infinity" << dendl;
  }
  init_prereq = true;
}

Infiniband::Infiniband(CephContext *cct)
  : cct(cct),
    device_name(cct->_conf->ms_async_rdma_device_name),
    port_num(cct->_conf->ms_async_rdma_port_num)
{
  if (!init_prereq)
    verify_prereq(cct);
  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}

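// One-time lazy initialization: pick the RDMA device, bind the configured
// port, allocate a protection domain, clamp the rx/tx queue lengths against
// both the device caps and the ms_async_rdma_* options, then create the tx
// buffer pool (and the SRQ when ms_async_rdma_support_srq is enabled).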
void Infiniband::init()
{
  std::lock_guard l{lock};

  if (initialized)
    return;

  device_list = new DeviceList(cct);
  initialized = true;

  device = device_list->get_device(device_name.c_str());
  ceph_assert(device);
  device->binding_port(cct, port_num);
  ib_physical_port = device->active_port->get_port_num();
  pd = new ProtectionDomain(cct, device);
  ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);

  support_srq = cct->_conf->ms_async_rdma_support_srq;
  if (support_srq) {
    ceph_assert(device->device_attr.max_srq);
    rx_queue_len = device->device_attr.max_srq_wr;
  } else {
    rx_queue_len = device->device_attr.max_qp_wr;
  }
  if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
    rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
    ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl;
  }

  // check for the misconfiguration
  if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
      rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
    lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
        rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
        cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
    ceph_abort();
  }

  // keep one extra WR for a beacon to indicate all WCEs were consumed
  tx_queue_len = device->device_attr.max_qp_wr - 1;
  if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
    tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
    ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
  }

  // check for the memory region size misconfiguration
  if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) {
    lderr(cct) << __func__ << " out of max memory region size " << dendl;
    ceph_abort();
  }

  ldout(cct, 1) << __func__ << " device allows " << device->device_attr.max_cqe
                << " completion entries" << dendl;

  memory_manager = new MemoryManager(cct, device, pd);
  memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);

  if (support_srq) {
    srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
    post_chunks_to_rq(rx_queue_len, NULL); // add to srq
  }
}

Infiniband::~Infiniband()
{
  if (!initialized)
    return;
  if (support_srq)
    ibv_destroy_srq(srq);
  delete memory_manager;
  delete pd;
  delete device_list;
}

/**
 * Create a shared receive queue. This basically wraps the verbs call.
 *
 * \param[in] max_wr
 *      The max number of outstanding work requests in the SRQ.
 * \param[in] max_sge
 *      The max number of scatter elements per WR.
 * \return
 *      A valid ibv_srq pointer, or NULL on error.
 */
ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
{
  ibv_srq_init_attr sia;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&sia, 0, sizeof(sia));
  sia.srq_context = device->ctxt;
  sia.attr.max_wr = max_wr;
  sia.attr.max_sge = max_sge;
  return ibv_create_srq(pd->pd, &sia);
}

int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}

/**
 * Create a new QueuePair. This factory should be used in preference to
 * the QueuePair constructor directly, since this lets derivatives of
 * Infiniband, e.g. MockInfiniband (if it existed),
 * return mocked out QueuePair derivatives.
 *
 * \return
 *      QueuePair on success or NULL if init fails
 * See QueuePair::QueuePair for parameter documentation.
 */
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx,
    CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id)
{
  Infiniband::QueuePair *qp = new QueuePair(
      cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id);
  if (qp->init()) {
    delete qp;
    return NULL;
  }
  return qp;
}

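// post_chunks_to_rq() builds one linked chain of ibv_recv_wr entries (each
// wr_id is the Chunk's address, so a completion can be mapped back to its
// buffer) and posts the whole chain with a single verbs call, either to the
// SRQ or to the given QP's own receive queue.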
int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp)
{
  int ret = 0;
  Chunk *chunk = nullptr;

  ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr)));
  ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge)));
  ceph_assert(rx_work_request);
  ceph_assert(isge);

  int i = 0;
  while (i < rq_wr_num) {
    chunk = get_memory_manager()->get_rx_buffer();
    if (chunk == nullptr) {
      lderr(cct) << __func__ << " WARNING: out of memory. Requested " << rq_wr_num <<
          " rx buffers. Only got " << i << " rx buffers." << dendl;
      if (i == 0) {
        ::free(rx_work_request);
        ::free(isge);
        return 0;
      }
      break; // got some buffers, so post them to the receive queue
    }

    isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
    isge[i].length = chunk->bytes;
    isge[i].lkey = chunk->lkey;

    rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk); // assign chunk address as work request id

    if (i != 0) {
      rx_work_request[i - 1].next = &rx_work_request[i];
    }
    rx_work_request[i].sg_list = &isge[i];
    rx_work_request[i].num_sge = 1;

    if (qp && !qp->get_srq()) {
      chunk->set_qp(qp);
      qp->add_rq_wr(chunk);
    }
    i++;
  }

  ibv_recv_wr *badworkrequest = nullptr;
  if (support_srq) {
    ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest);
  } else {
    ceph_assert(qp);
    ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest);
  }

  ::free(rx_work_request);
  ::free(isge);
  ceph_assert(badworkrequest == nullptr && ret == 0);
  return i;
}

Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
{
  Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
  if (cc->init()) {
    delete cc;
    return NULL;
  }
  return cc;
}

Infiniband::CompletionQueue* Infiniband::create_comp_queue(
    CephContext *cct, CompletionChannel *cc)
{
  Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
      cct, *this, CQ_DEPTH, cc);
  if (cq->init()) {
    delete cq;
    return NULL;
  }
  return cq;
}

Infiniband::QueuePair::~QueuePair()
{
  ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << qp->qp_num << " left RQ WR " << recv_queue.size() << dendl;
  if (qp) {
    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
    ceph_assert(!ibv_destroy_qp(qp));
  }

  for (auto& chunk : recv_queue) {
    infiniband.get_memory_manager()->release_rx_buffer(chunk);
  }
  recv_queue.clear();
}

/**
 * Return a string representation of the `status' field from the Verbs
 * struct `ibv_wc'.
 *
 * \param[in] status
 *      The integer status obtained in ibv_wc.status.
 * \return
 *      A string corresponding to the given status.
 */
const char* Infiniband::wc_status_to_string(int status)
{
  static const char *lookup[] = {
    "SUCCESS",
    "LOC_LEN_ERR",
    "LOC_QP_OP_ERR",
    "LOC_EEC_OP_ERR",
    "LOC_PROT_ERR",
    "WR_FLUSH_ERR",
    "MW_BIND_ERR",
    "BAD_RESP_ERR",
    "LOC_ACCESS_ERR",
    "REM_INV_REQ_ERR",
    "REM_ACCESS_ERR",
    "REM_OP_ERR",
    "RETRY_EXC_ERR",
    "RNR_RETRY_EXC_ERR",
    "LOC_RDD_VIOL_ERR",
    "REM_INV_RD_REQ_ERR",
    "REM_ABORT_ERR",
    "INV_EECN_ERR",
    "INV_EEC_STATE_ERR",
    "FATAL_ERR",
    "RESP_TIMEOUT_ERR",
    "GENERAL_ERR"
  };

  if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
    return "<status out of range!>";
  return lookup[status];
}

const char* Infiniband::qp_state_string(int status) {
  switch (status) {
    case IBV_QPS_RESET : return "IBV_QPS_RESET";
    case IBV_QPS_INIT  : return "IBV_QPS_INIT";
    case IBV_QPS_RTR   : return "IBV_QPS_RTR";
    case IBV_QPS_RTS   : return "IBV_QPS_RTS";
    case IBV_QPS_SQD   : return "IBV_QPS_SQD";
    case IBV_QPS_SQE   : return "IBV_QPS_SQE";
    case IBV_QPS_ERR   : return "IBV_QPS_ERR";
    default: return "out of range.";
  }
}