]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "Infiniband.h" | |
7c673cae FG |
18 | #include "common/errno.h" |
19 | #include "common/debug.h" | |
31f18b77 | 20 | #include "RDMAStack.h" |
11fdf7f2 TL |
21 | #include <sys/time.h> |
22 | #include <sys/resource.h> | |
7c673cae FG |
23 | |
24 | #define dout_subsys ceph_subsys_ms | |
25 | #undef dout_prefix | |
26 | #define dout_prefix *_dout << "Infiniband " | |
27 | ||
31f18b77 | 28 | static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1; |
7c673cae | 29 | static const uint32_t MAX_INLINE_DATA = 0; |
31f18b77 FG |
30 | static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000"); |
31 | static const uint32_t CQ_DEPTH = 30000; | |
32 | ||
9f95a23c TL |
33 | Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), |
34 | gid_idx(cct->_conf.get_val<int64_t>("ms_async_rdma_gid_idx")) | |
31f18b77 | 35 | { |
9f95a23c TL |
36 | int r = ibv_query_port(ctxt, port_num, &port_attr); |
37 | if (r == -1) { | |
38 | lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl; | |
39 | ceph_abort(); | |
40 | } | |
41 | ||
42 | lid = port_attr.lid; | |
43 | ceph_assert(gid_idx < port_attr.gid_tbl_len); | |
31f18b77 FG |
44 | #ifdef HAVE_IBV_EXP |
45 | union ibv_gid cgid; | |
46 | struct ibv_exp_gid_attr gid_attr; | |
47 | bool malformed = false; | |
48 | ||
49 | ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl; | |
31f18b77 | 50 | |
31f18b77 FG |
51 | |
52 | // search for requested GID in GIDs table | |
53 | ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid) | |
54 | << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl; | |
55 | r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(), | |
56 | "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx" | |
57 | ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx", | |
58 | &cgid.raw[ 0], &cgid.raw[ 1], | |
59 | &cgid.raw[ 2], &cgid.raw[ 3], | |
60 | &cgid.raw[ 4], &cgid.raw[ 5], | |
61 | &cgid.raw[ 6], &cgid.raw[ 7], | |
62 | &cgid.raw[ 8], &cgid.raw[ 9], | |
63 | &cgid.raw[10], &cgid.raw[11], | |
64 | &cgid.raw[12], &cgid.raw[13], | |
65 | &cgid.raw[14], &cgid.raw[15]); | |
66 | ||
67 | if (r != 16) { | |
68 | ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl; | |
69 | malformed = true; | |
70 | } | |
71 | ||
72 | gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE; | |
73 | ||
9f95a23c | 74 | for (gid_idx = 0; gid_idx < port_attr.gid_tbl_len; gid_idx++) { |
31f18b77 FG |
75 | r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); |
76 | if (r) { | |
77 | lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; | |
78 | ceph_abort(); | |
79 | } | |
80 | r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr); | |
81 | if (r) { | |
82 | lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl; | |
83 | ceph_abort(); | |
84 | } | |
85 | ||
86 | if (malformed) break; // stay with gid_idx=0 | |
87 | if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) && | |
88 | (memcmp(&gid, &cgid, 16) == 0) ) { | |
89 | ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl; | |
90 | break; | |
91 | } | |
92 | } | |
93 | ||
9f95a23c | 94 | if (gid_idx == port_attr.gid_tbl_len) { |
31f18b77 FG |
95 | lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl; |
96 | ceph_abort(); | |
97 | } | |
98 | #else | |
9f95a23c | 99 | r = ibv_query_gid(ctxt, port_num, gid_idx, &gid); |
31f18b77 FG |
100 | if (r) { |
101 | lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl; | |
102 | ceph_abort(); | |
103 | } | |
104 | #endif | |
105 | } | |
106 | ||
9f95a23c | 107 | Device::Device(CephContext *cct, ibv_device* ib_dev): device(ib_dev), active_port(nullptr) |
31f18b77 | 108 | { |
9f95a23c TL |
109 | ceph_assert(device); |
110 | ctxt = ibv_open_device(device); | |
111 | ceph_assert(ctxt); | |
112 | ||
31f18b77 | 113 | name = ibv_get_device_name(device); |
9f95a23c TL |
114 | |
115 | int r = ibv_query_device(ctxt, &device_attr); | |
116 | if (r) { | |
117 | lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; | |
31f18b77 FG |
118 | ceph_abort(); |
119 | } | |
9f95a23c TL |
120 | } |
121 | ||
122 | Device::Device(CephContext *cct, struct ibv_context *ib_ctx): device(ib_ctx->device), | |
123 | active_port(nullptr) | |
124 | { | |
125 | ceph_assert(device); | |
126 | ctxt = ib_ctx; | |
127 | ceph_assert(ctxt); | |
128 | ||
129 | name = ibv_get_device_name(device); | |
130 | ||
131 | int r = ibv_query_device(ctxt, &device_attr); | |
132 | if (r) { | |
31f18b77 FG |
133 | lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl; |
134 | ceph_abort(); | |
135 | } | |
136 | } | |
137 | ||
138 | void Device::binding_port(CephContext *cct, int port_num) { | |
9f95a23c TL |
139 | port_cnt = device_attr.phys_port_cnt; |
140 | for (uint8_t port_id = 1; port_id <= port_cnt; ++port_id) { | |
141 | Port *port = new Port(cct, ctxt, port_id); | |
142 | if (port_id == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) { | |
31f18b77 | 143 | active_port = port; |
9f95a23c | 144 | ldout(cct, 1) << __func__ << " found active port " << static_cast<int>(port_id) << dendl; |
31f18b77 FG |
145 | break; |
146 | } else { | |
9f95a23c TL |
147 | ldout(cct, 10) << __func__ << " port " << port_id << " is not what we want. state: " |
148 | << ibv_port_state_str(port->get_port_attr()->state) << dendl; | |
149 | delete port; | |
31f18b77 | 150 | } |
31f18b77 FG |
151 | } |
152 | if (nullptr == active_port) { | |
153 | lderr(cct) << __func__ << " port not found" << dendl; | |
11fdf7f2 | 154 | ceph_assert(active_port); |
31f18b77 FG |
155 | } |
156 | } | |
157 | ||
7c673cae FG |
158 | |
// Wraps an ibv_qp plus the connection metadata needed to pair it with a
// remote QP (initial PSN, queue depths, optional SRQ and rdma-cm id).
// The QP itself is not created here -- see QueuePair::init().
Infiniband::QueuePair::QueuePair(
  CephContext *c, Infiniband& infiniband, ibv_qp_type type,
  int port, ibv_srq *srq,
  Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
  uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key)
  : cct(c), infiniband(infiniband),
    type(type),
    ctxt(infiniband.device->ctxt),
    ib_physical_port(port),
    pd(infiniband.pd->pd),
    srq(srq),
    qp(NULL),
    cm_id(cid), peer_cm_meta{0}, local_cm_meta{0},
    txcq(txcq),
    rxcq(rxcq),
    initial_psn(lrand48() & PSN_MSK),  // random starting packet sequence number
    // One extra WR for beacon
    max_send_wr(tx_queue_len + 1),
    max_recv_wr(rx_queue_len),
    q_key(q_key),
    dead(false)
{
  // only reliable-connected, unreliable-datagram and raw-packet QPs are supported
  if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
    lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}
186 | ||
187 | int Infiniband::QueuePair::modify_qp_to_error(void) | |
188 | { | |
189 | ibv_qp_attr qpa; | |
190 | // FIPS zeroization audit 20191115: this memset is not security related. | |
191 | memset(&qpa, 0, sizeof(qpa)); | |
192 | qpa.qp_state = IBV_QPS_ERR; | |
193 | if (ibv_modify_qp(qp, &qpa, IBV_QP_STATE)) { | |
194 | lderr(cct) << __func__ << " failed to transition to ERROR state: " << cpp_strerror(errno) << dendl; | |
195 | return -1; | |
196 | } | |
197 | ldout(cct, 20) << __func__ << " transition to ERROR state successfully." << dendl; | |
198 | return 0; | |
199 | } | |
200 | ||
// Transition the QP from RTR to RTS (ready-to-send).  Must follow
// modify_qp_to_rtr(); afterwards send work requests may be posted.
// Returns 0 on success, -1 on failure.
int Infiniband::QueuePair::modify_qp_to_rts(void)
{
  // move from RTR state RTS
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_RTS;
  /*
   * How long to wait before retrying if packet lost or server dead.
   * Supposedly the timeout is 4.096us*2^timeout.  However, the actual
   * timeout appears to be 4.096us*2^(timeout+1), so the setting
   * below creates a 135ms timeout.
   */
  qpa.timeout = 0x12;
  // How many times to retry after timeouts before giving up.
  qpa.retry_cnt = 7;
  /*
   * How many times to retry after RNR (receiver not ready) condition
   * before giving up.  Occurs when the remote side has not yet posted
   * a receive request.
   */
  qpa.rnr_retry = 7; // 7 is infinite retry.
  qpa.sq_psn = local_cm_meta.psn;  // our initial send PSN, as advertised to the peer
  qpa.max_rd_atomic = 1;

  int attr_mask = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC;
  int r = ibv_modify_qp(qp, &qpa, attr_mask);
  if (r) {
    lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
  return 0;
}
235 | ||
// Transition the QP from INIT to RTR (ready-to-receive) using the peer
// metadata exchanged over the TCP handshake: destination QPN, initial
// PSN, LID and GID.  Returns 0 on success, -1 on failure.
int Infiniband::QueuePair::modify_qp_to_rtr(void)
{
  // move from INIT to RTR state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_RTR;
  qpa.path_mtu = IBV_MTU_1024;
  qpa.dest_qp_num = peer_cm_meta.local_qpn;  // the peer's QP number
  qpa.rq_psn = peer_cm_meta.psn;             // the peer's initial send PSN
  qpa.max_dest_rd_atomic = 1;
  qpa.min_rnr_timer = 0x12;
  // global routing header, addressed by the peer's GID
  qpa.ah_attr.is_global = 1;
  qpa.ah_attr.grh.hop_limit = 6;
  qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
  qpa.ah_attr.grh.sgid_index = infiniband.get_device()->get_gid_idx();
  qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
  //qpa.ah_attr.grh.flow_label = 0;

  qpa.ah_attr.dlid = peer_cm_meta.lid;
  qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;  // service level (QoS)
  qpa.ah_attr.src_path_bits = 0;
  qpa.ah_attr.port_num = (uint8_t)(ib_physical_port);

  ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;

  int attr_mask = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC;

  int r = ibv_modify_qp(qp, &qpa, attr_mask);
  if (r) {
    lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
  return 0;
}
272 | ||
// Transition the QP from RESET to INIT.  The attribute mask depends on
// the QP type: RC needs access flags, UD needs the qkey, raw-packet QPs
// take neither.  Returns 0 on success, -1 on failure.
int Infiniband::QueuePair::modify_qp_to_init(void)
{
  // move from RESET to INIT state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_INIT;
  qpa.pkey_index = 0;
  qpa.port_num = (uint8_t)(ib_physical_port);
  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
  qpa.qkey = q_key;

  int mask = IBV_QP_STATE | IBV_QP_PORT;
  // only pass the attributes that are valid for this QP type
  switch (type) {
    case IBV_QPT_RC:
      mask |= IBV_QP_ACCESS_FLAGS;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_UD:
      mask |= IBV_QP_QKEY;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_RAW_PACKET:
      break;
    default:
      ceph_abort();
  }

  if (ibv_modify_qp(qp, &qpa, mask)) {
    lderr(cct) << __func__ << " failed to switch to INIT state Queue Pair, qp number: " << qp->qp_num
               << " Error: " << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully switch to INIT state Queue Pair, qp number: " << qp->qp_num << dendl;
  return 0;
}
309 | ||
// Create the ibv_qp (either directly via verbs or through the rdma-cm
// library), move it to INIT, record the local connection metadata that
// will be advertised to the peer, and -- when no shared receive queue is
// used -- pre-post receive buffers.  Returns 0 on success, -1 on failure.
int Infiniband::QueuePair::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  ibv_qp_init_attr qpia;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpia, 0, sizeof(qpia));
  qpia.send_cq = txcq->get_cq();
  qpia.recv_cq = rxcq->get_cq();
  if (srq) {
    qpia.srq = srq;                      // use the same shared receive queue
  } else {
    qpia.cap.max_recv_wr = max_recv_wr;  // per-QP receive queue instead
    qpia.cap.max_recv_sge = 1;
  }
  qpia.cap.max_send_wr = max_send_wr;  // max outstanding send requests
  qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
  qpia.cap.max_inline_data = MAX_INLINE_DATA;  // max bytes of immediate data on send q
  qpia.qp_type = type;                 // RC, UC, UD, or XRC
  qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs

  if (!cct->_conf->ms_async_rdma_cm) {
    // plain verbs path: create the QP on our PD and drive the state
    // machine manually (RESET -> INIT here; RTR/RTS happen later)
    qp = ibv_create_qp(pd, &qpia);
    if (qp == NULL) {
      lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
      if (errno == ENOMEM) {
        lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
                   " ms_async_rdma_send_buffers or"
                   " ms_async_rdma_buffer_size" << dendl;
      }
      return -1;
    }
    if (modify_qp_to_init() != 0) {
      ibv_destroy_qp(qp);  // don't leak the QP on a failed transition
      return -1;
    }
  } else {
    // rdma-cm path: the CM library owns QP creation and state transitions
    ceph_assert(cm_id->verbs == pd->context);
    if (rdma_create_qp(cm_id, pd, &qpia)) {
      lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
                 << cpp_strerror(errno) << dendl;
      return -1;
    }
    qp = cm_id->qp;
  }
  ldout(cct, 20) << __func__ << " successfully create queue pair: "
                 << "qp=" << qp << dendl;
  // metadata advertised to the peer during the TCP handshake
  // (see send_cm_meta())
  local_cm_meta.local_qpn = get_local_qp_number();
  local_cm_meta.psn = get_initial_psn();
  local_cm_meta.lid = infiniband.get_lid();
  local_cm_meta.peer_qpn = 0;
  local_cm_meta.gid = infiniband.get_gid();
  if (!srq) {
    // pre-fill the receive queue so the peer can start sending immediately
    int rq_wrs = infiniband.post_chunks_to_rq(max_recv_wr, this);
    if (rq_wrs == 0) {
      lderr(cct) << __func__ << " intialize no SRQ Queue Pair, qp number: " << qp->qp_num
                 << " fatal error: can't post SQ WR " << dendl;
      return -1;
    }
    ldout(cct, 20) << __func__ << " initialize no SRQ Queue Pair, qp number: "
                   << qp->qp_num << " post SQ WR " << rq_wrs << dendl;
  }
  return 0;
}
7c673cae | 373 | |
9f95a23c TL |
374 | void Infiniband::QueuePair::wire_gid_to_gid(const char *wgid, ib_cm_meta_t* cm_meta_data) |
375 | { | |
376 | char tmp[9]; | |
377 | uint32_t v32; | |
378 | int i; | |
11fdf7f2 | 379 | |
9f95a23c TL |
380 | for (tmp[8] = 0, i = 0; i < 4; ++i) { |
381 | memcpy(tmp, wgid + i * 8, 8); | |
382 | sscanf(tmp, "%x", &v32); | |
383 | *(uint32_t *)(&cm_meta_data->gid.raw[i * 4]) = ntohl(v32); | |
384 | } | |
385 | } | |
7c673cae | 386 | |
9f95a23c TL |
387 | void Infiniband::QueuePair::gid_to_wire_gid(const ib_cm_meta_t& cm_meta_data, char wgid[]) |
388 | { | |
389 | for (int i = 0; i < 4; ++i) | |
390 | sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(cm_meta_data.gid.raw + i * 4))); | |
391 | } | |
392 | ||
/*
 * Read the peer's connection metadata (lid, qpn, psn, peer qpn, gid)
 * from the TCP handshake socket into peer_cm_meta.
 *
 * return value
 *   > 0: full fixed-size message received and parsed (byte count)
 *     0: peer performed an orderly shutdown (read returned 0)
 *   < 0: error -- negative errno from read, or -EINVAL on a short/bad
 *        message or injected failure
 */
int Infiniband::QueuePair::recv_cm_meta(CephContext *cct, int socket_fd)
{
  char msg[TCP_MSG_LEN];
  char gid[33];
  ssize_t r = ::read(socket_fd, &msg, sizeof(msg));
  // Drop incoming qpt
  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }
  if (r < 0) {
    r = -errno;
    lderr(cct) << __func__ << " got error " << r << ": "
               << cpp_strerror(r) << dendl;
  } else if (r == 0) { // valid disconnect message of length 0
    ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
  } else if ((size_t)r != sizeof(msg)) { // invalid message
    ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
    r = -EINVAL;
  } else { // valid message
    // fixed layout written by send_cm_meta(): lid:qpn:psn:peer_qpn:gid
    sscanf(msg, "%hx:%x:%x:%x:%s", &(peer_cm_meta.lid), &(peer_cm_meta.local_qpn), &(peer_cm_meta.psn), &(peer_cm_meta.peer_qpn), gid);
    wire_gid_to_gid(gid, &peer_cm_meta);
    ldout(cct, 5) << __func__ << " recevd: " << peer_cm_meta.lid << ", " << peer_cm_meta.local_qpn
                  << ", " << peer_cm_meta.psn << ", " << peer_cm_meta.peer_qpn << ", " << gid << dendl;
  }
  return r;
}
7c673cae | 428 | |
9f95a23c TL |
// Send our connection metadata (lid, qpn, psn, peer qpn, gid) over the
// TCP handshake socket as one fixed-size ASCII message; the peer parses
// it in recv_cm_meta().  Retries up to 3 times on EINTR/EAGAIN.
// Returns 0 on success, -EINVAL on injected failure, otherwise -errno.
int Infiniband::QueuePair::send_cm_meta(CephContext *cct, int socket_fd)
{
  int retry = 0;
  ssize_t r;

  char msg[TCP_MSG_LEN];
  char gid[33];
retry:
  gid_to_wire_gid(local_cm_meta, gid);
  sprintf(msg, "%04x:%08x:%08x:%08x:%s", local_cm_meta.lid, local_cm_meta.local_qpn, local_cm_meta.psn, local_cm_meta.peer_qpn, gid);
  ldout(cct, 10) << __func__ << " sending: " << local_cm_meta.lid << ", " << local_cm_meta.local_qpn
                 << ", " << local_cm_meta.psn << ", " << local_cm_meta.peer_qpn << ", " << gid << dendl;
  r = ::write(socket_fd, msg, sizeof(msg));
  // Drop incoming qpt
  if (cct->_conf->ms_inject_socket_failures && socket_fd >= 0) {
    if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
      ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
      return -EINVAL;
    }
  }

  if ((size_t)r != sizeof(msg)) {
    // FIXME need to handle EAGAIN instead of retry
    if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
      retry++;
      goto retry;
    }
    if (r < 0)
      lderr(cct) << __func__ << " send returned error " << errno << ": "
                 << cpp_strerror(errno) << dendl;
    else
      // short write: NOTE(review) errno may be stale/zero here, so the
      // logged reason and returned -errno are best-effort
      lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
    return -errno;
  }
  return 0;
}
465 | ||
/**
 * Switch QP to ERROR state and then post a beacon to be able to drain all
 * WCEs and then safely destroy QP. See RDMADispatcher::handle_tx_event()
 * for details.
 *
 * \return
 *   negative value (-1 if the QP can't switch to ERROR, -errno if the
 *   beacon can't be posted); 0 for success.
 */
int Infiniband::QueuePair::to_dead()
{
  if (dead)
    return 0;  // idempotent: already marked dead

  if (modify_qp_to_error()) {
    return -1;
  }
  ldout(cct, 20) << __func__ << " force trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn
                 << " bound remote QueuePair, qp number: " << local_cm_meta.peer_qpn << dendl;

  // post a signaled zero-length send carrying the magic BEACON_WRID; its
  // completion marks the point where all prior WRs have drained
  struct ibv_send_wr *bad_wr = nullptr, beacon;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&beacon, 0, sizeof(beacon));
  beacon.wr_id = BEACON_WRID;
  beacon.opcode = IBV_WR_SEND;
  beacon.send_flags = IBV_SEND_SIGNALED;
  if (ibv_post_send(qp, &beacon, &bad_wr)) {
    lderr(cct) << __func__ << " failed to send a beacon: " << cpp_strerror(errno) << dendl;
    return -errno;
  }
  ldout(cct, 20) << __func__ << " trigger error state Queue Pair, qp number: " << local_cm_meta.local_qpn << " Beacon sent " << dendl;
  dead = true;

  return 0;
}
501 | ||
502 | int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const | |
503 | { | |
504 | ibv_qp_attr qpa; | |
505 | ibv_qp_init_attr qpia; | |
506 | ||
507 | int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); | |
508 | if (r) { | |
509 | lderr(cct) << __func__ << " failed to query qp: " | |
510 | << cpp_strerror(errno) << dendl; | |
511 | return -1; | |
512 | } | |
513 | ||
514 | if (rqp) | |
515 | *rqp = qpa.dest_qp_num; | |
516 | return 0; | |
517 | } | |
518 | ||
519 | /** | |
520 | * Get the remote infiniband address for this QueuePair, as set in #plumb(). | |
521 | * LIDs are "local IDs" in infiniband terminology. They are short, locally | |
522 | * routable addresses. | |
523 | */ | |
524 | int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const | |
525 | { | |
526 | ibv_qp_attr qpa; | |
527 | ibv_qp_init_attr qpia; | |
528 | ||
529 | int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); | |
530 | if (r) { | |
531 | lderr(cct) << __func__ << " failed to query qp: " | |
532 | << cpp_strerror(errno) << dendl; | |
533 | return -1; | |
534 | } | |
535 | ||
536 | if (lid) | |
537 | *lid = qpa.ah_attr.dlid; | |
538 | return 0; | |
539 | } | |
540 | ||
541 | /** | |
542 | * Get the state of a QueuePair. | |
543 | */ | |
544 | int Infiniband::QueuePair::get_state() const | |
545 | { | |
546 | ibv_qp_attr qpa; | |
547 | ibv_qp_init_attr qpia; | |
548 | ||
549 | int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); | |
550 | if (r) { | |
551 | lderr(cct) << __func__ << " failed to get state: " | |
552 | << cpp_strerror(errno) << dendl; | |
553 | return -1; | |
554 | } | |
555 | return qpa.qp_state; | |
556 | } | |
557 | ||
31f18b77 FG |
// A CompletionChannel wraps an ibv_comp_channel: an fd-based mechanism
// for being notified when a bound CQ has new completions.  The channel is
// created lazily in init().
Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
  : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
{
}
562 | ||
563 | Infiniband::CompletionChannel::~CompletionChannel() | |
564 | { | |
565 | if (channel) { | |
566 | int r = ibv_destroy_comp_channel(channel); | |
567 | if (r < 0) | |
568 | lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; | |
11fdf7f2 | 569 | ceph_assert(r == 0); |
7c673cae FG |
570 | } |
571 | } | |
572 | ||
573 | int Infiniband::CompletionChannel::init() | |
574 | { | |
575 | ldout(cct, 20) << __func__ << " started." << dendl; | |
31f18b77 | 576 | channel = ibv_create_comp_channel(infiniband.device->ctxt); |
7c673cae FG |
577 | if (!channel) { |
578 | lderr(cct) << __func__ << " failed to create receive completion channel: " | |
579 | << cpp_strerror(errno) << dendl; | |
580 | return -1; | |
581 | } | |
582 | int rc = NetHandler(cct).set_nonblock(channel->fd); | |
583 | if (rc < 0) { | |
584 | ibv_destroy_comp_channel(channel); | |
585 | return -1; | |
586 | } | |
587 | return 0; | |
588 | } | |
589 | ||
// Acknowledge, in one batch, all CQ events accumulated by get_cq_event()
// that have not been acked yet.
void Infiniband::CompletionChannel::ack_events()
{
  ibv_ack_cq_events(cq, cq_events_that_need_ack);
  cq_events_that_need_ack = 0;
}
595 | ||
// Consume one event from the completion channel.  Returns true when an
// event was read; false on EAGAIN/EINTR (nothing pending on the
// non-blocking fd) or on a real error (which is logged).
bool Infiniband::CompletionChannel::get_cq_event()
{
  ibv_cq *cq = NULL;
  void *ev_ctx;
  if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
    if (errno != EAGAIN && errno != EINTR)
      lderr(cct) << __func__ << " failed to retrieve CQ event: "
                 << cpp_strerror(errno) << dendl;
    return false;
  }

  /* accumulate number of cq events that need to
   * be acked, and periodically ack them
   */
  if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
    ldout(cct, 20) << __func__ << " ack aq events." << dendl;
    ibv_ack_cq_events(cq, MAX_ACK_EVENT);
    cq_events_that_need_ack = 0;
  }

  return true;
}
618 | ||
619 | ||
620 | Infiniband::CompletionQueue::~CompletionQueue() | |
621 | { | |
622 | if (cq) { | |
623 | int r = ibv_destroy_cq(cq); | |
624 | if (r < 0) | |
625 | lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl; | |
11fdf7f2 | 626 | ceph_assert(r == 0); |
7c673cae FG |
627 | } |
628 | } | |
629 | ||
630 | int Infiniband::CompletionQueue::init() | |
631 | { | |
31f18b77 | 632 | cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0); |
7c673cae FG |
633 | if (!cq) { |
634 | lderr(cct) << __func__ << " failed to create receive completion queue: " | |
635 | << cpp_strerror(errno) << dendl; | |
636 | return -1; | |
637 | } | |
638 | ||
639 | if (ibv_req_notify_cq(cq, 0)) { | |
640 | lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl; | |
641 | ibv_destroy_cq(cq); | |
642 | cq = nullptr; | |
643 | return -1; | |
644 | } | |
645 | ||
646 | channel->bind_cq(cq); | |
647 | ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl; | |
648 | return 0; | |
649 | } | |
650 | ||
// Re-arm the CQ so the next completion generates a channel event.
// NOTE(review): the solicite_only parameter is currently ignored --
// notification is always requested for all completions (second argument
// 0); confirm whether solicited-only arming was intended.
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r < 0)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
  return r;
}
659 | ||
660 | int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) { | |
661 | int r = ibv_poll_cq(cq, num_entries, ret_wc_array); | |
662 | if (r < 0) { | |
663 | lderr(cct) << __func__ << " poll_completion_queue occur met error: " | |
664 | << cpp_strerror(errno) << dendl; | |
665 | return -1; | |
666 | } | |
667 | return r; | |
668 | } | |
669 | ||
670 | ||
671 | Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device) | |
672 | : pd(ibv_alloc_pd(device->ctxt)) | |
673 | { | |
674 | if (pd == NULL) { | |
675 | lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl; | |
676 | ceph_abort(); | |
677 | } | |
678 | } | |
679 | ||
// Release the protection domain; all MRs/QPs on it must already be gone.
Infiniband::ProtectionDomain::~ProtectionDomain()
{
  ibv_dealloc_pd(pd);
}
684 | ||
685 | ||
9f95a23c TL |
// A Chunk is a fixed-size slice of a registered memory region:
//   bytes          - capacity of the slice
//   offset / bound - read/write cursor and end of valid data
//   lkey           - local key of the enclosing ibv_mr
//   qp             - the QP this chunk is currently posted on (if any)
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t bytes, char* buffer,
                                        uint32_t offset, uint32_t bound, uint32_t lkey, QueuePair* qp)
  : mr(m), qp(qp), lkey(lkey), bytes(bytes), offset(offset), bound(bound), buffer(buffer)
{
}
691 | ||
// Nothing to release: the underlying buffer and MR are owned by Cluster.
Infiniband::MemoryManager::Chunk::~Chunk()
{
}
695 | ||
7c673cae FG |
// Current read/write cursor within the chunk.
uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}
700 | ||
// Number of valid bytes not yet consumed: bound - offset.
uint32_t Infiniband::MemoryManager::Chunk::get_size() const
{
  return bound - offset;
}
705 | ||
// Rewind the cursor and mark the first b bytes as the readable region.
void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}
711 | ||
// End of the valid data region within the chunk.
uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}
716 | ||
717 | uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len) | |
718 | { | |
9f95a23c TL |
719 | uint32_t left = get_size(); |
720 | uint32_t read_len = left <= len ? left : len; | |
721 | memcpy(buf, buffer + offset, read_len); | |
722 | offset += read_len; | |
723 | return read_len; | |
7c673cae FG |
724 | } |
725 | ||
726 | uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len) | |
727 | { | |
9f95a23c TL |
728 | uint32_t write_len = (bytes - offset) <= len ? (bytes - offset) : len; |
729 | memcpy(buffer + offset, buf, write_len); | |
730 | offset += write_len; | |
731 | return write_len; | |
7c673cae FG |
732 | } |
733 | ||
// True once the write cursor has reached the chunk's capacity.
bool Infiniband::MemoryManager::Chunk::full()
{
  return offset == bytes;
}
738 | ||
9f95a23c | 739 | void Infiniband::MemoryManager::Chunk::reset_read_chunk() |
7c673cae | 740 | { |
9f95a23c TL |
741 | offset = 0; |
742 | bound = 0; | |
7c673cae FG |
743 | } |
744 | ||
9f95a23c | 745 | void Infiniband::MemoryManager::Chunk::reset_write_chunk() |
7c673cae FG |
746 | { |
747 | offset = 0; | |
9f95a23c | 748 | bound = bytes; |
7c673cae FG |
749 | } |
750 | ||
// A Cluster is a pool of equally-sized registered buffers (chunks) carved
// out of one contiguous allocation; see fill().
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s)
{
}
755 | ||
// Tear down in the reverse order of fill(): deregister the single shared
// MR (every chunk references chunk_base->mr), run each Chunk's destructor
// manually -- they were placement-new'ed into a raw ::malloc'ed array --
// then free the chunk array and the registered buffer region.
Infiniband::MemoryManager::Cluster::~Cluster()
{
  int r = ibv_dereg_mr(chunk_base->mr);
  ceph_assert(r == 0);
  const auto chunk_end = chunk_base + num_chunk;
  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
    chunk->~Chunk();
  }

  ::free(chunk_base);
  manager.free(base);
}
768 | ||
// Allocate one large region, register it once, and carve it into `num`
// fixed-size chunks that all share the single ibv_mr's lkey.  May only be
// called once per cluster (base must still be null).  Returns 0.
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
  ceph_assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;

  base = (char*)manager.malloc(bytes);
  end = base + bytes;
  ceph_assert(base);
  // raw array for the Chunk objects; constructed below via placement new
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
  free_chunks.reserve(num);
  // one registration for the whole region; every chunk reuses its lkey
  ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  ceph_assert(m);
  Chunk* chunk = chunk_base;
  for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
    new(chunk) Chunk(m, buffer_size, base + offset, 0, buffer_size, m->lkey);
    free_chunks.push_back(chunk);
    chunk++;
  }
  return 0;
}
792 | ||
793 | void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck) | |
794 | { | |
9f95a23c | 795 | std::lock_guard l{lock}; |
7c673cae | 796 | for (auto c : ck) { |
9f95a23c | 797 | c->reset_write_chunk(); |
7c673cae FG |
798 | free_chunks.push_back(c); |
799 | } | |
800 | } | |
801 | ||
9f95a23c | 802 | int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t block_size) |
7c673cae | 803 | { |
9f95a23c TL |
804 | std::lock_guard l{lock}; |
805 | uint32_t chunk_buffer_number = (block_size + buffer_size - 1) / buffer_size; | |
806 | chunk_buffer_number = free_chunks.size() < chunk_buffer_number ? free_chunks.size(): chunk_buffer_number; | |
807 | uint32_t r = 0; | |
808 | ||
809 | for (r = 0; r < chunk_buffer_number; ++r) { | |
7c673cae FG |
810 | chunks.push_back(free_chunks.back()); |
811 | free_chunks.pop_back(); | |
812 | } | |
813 | return r; | |
814 | } | |
815 | ||
11fdf7f2 TL |
816 | bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs) |
817 | { | |
818 | /* unlimited */ | |
819 | if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0) | |
820 | return true; | |
821 | ||
822 | if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) { | |
823 | lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " << | |
824 | n_bufs_allocated << " requested: " << nbufs << | |
825 | " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl; | |
826 | return false; | |
827 | } | |
828 | ||
829 | return true; | |
830 | } | |
831 | ||
832 | void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) { | |
833 | perf_logger = logger; | |
834 | if (perf_logger != nullptr) | |
835 | perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated); | |
836 | } | |
837 | ||
838 | void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs) | |
839 | { | |
840 | n_bufs_allocated += nbufs; | |
841 | ||
842 | if (!perf_logger) | |
843 | return; | |
844 | ||
845 | if (nbufs > 0) { | |
846 | perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs); | |
847 | } else { | |
848 | perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs); | |
849 | } | |
850 | } | |
851 | ||
// Called by boost::pool when its fast free list is exhausted and the
// pool must grow.
void *Infiniband::MemoryManager::mem_pool::slow_malloc()
{
  // this will trigger pool expansion via PoolAllocator::malloc().
  // with_context() publishes our MemPoolContext through the static
  // PoolAllocator::g_ctx (under the allocator lock) so the expansion
  // can account the new buffers against this context.
  return PoolAllocator::with_context(ctx, [this] {
    return boost::pool<PoolAllocator>::malloc();
  });
}
859 | ||
// Context for the current PoolAllocator::malloc() call; published by
// with_context() while the allocator lock is held (see get_lock()).
Infiniband::MemoryManager::MemPoolContext*
Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
11fdf7f2 TL |
862 | |
// Returns the process-wide allocator lock. The lock is taken by
// mem_pool::slow_malloc() (via with_context()) and by
// PoolAllocator::free() to serialize access to g_ctx and the
// underlying pool bookkeeping.
ceph::mutex& Infiniband::MemoryManager::PoolAllocator::get_lock()
{
  // Function-local static: constructed on first use, thread-safe init.
  static ceph::mutex lock = ceph::make_mutex("pool-alloc-lock");
  return lock;
}
11fdf7f2 | 869 | |
9f95a23c TL |
870 | char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type block_size) |
871 | { | |
11fdf7f2 | 872 | ceph_assert(g_ctx); |
9f95a23c TL |
873 | MemoryManager *manager = g_ctx->manager; |
874 | CephContext *cct = manager->cct; | |
875 | size_t chunk_buffer_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size; | |
876 | size_t chunk_buffer_number = block_size / chunk_buffer_size; | |
11fdf7f2 | 877 | |
9f95a23c | 878 | if (!g_ctx->can_alloc(chunk_buffer_number)) |
11fdf7f2 TL |
879 | return NULL; |
880 | ||
9f95a23c TL |
881 | mem_info *minfo= static_cast<mem_info *>(manager->malloc(block_size + sizeof(mem_info))); |
882 | if (!minfo) { | |
883 | lderr(cct) << __func__ << " failed to allocate " << chunk_buffer_number << " buffers " | |
884 | " Its block size is : " << block_size + sizeof(mem_info) << dendl; | |
11fdf7f2 TL |
885 | return NULL; |
886 | } | |
887 | ||
9f95a23c TL |
888 | minfo->mr = ibv_reg_mr(manager->pd->pd, minfo->chunks, block_size, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE); |
889 | if (minfo->mr == NULL) { | |
890 | lderr(cct) << __func__ << " failed to do rdma memory registration " << block_size << " bytes. " | |
891 | " relase allocated memory now." << dendl; | |
892 | manager->free(minfo); | |
11fdf7f2 TL |
893 | return NULL; |
894 | } | |
895 | ||
9f95a23c | 896 | minfo->nbufs = chunk_buffer_number; |
11fdf7f2 | 897 | // save this chunk context |
9f95a23c | 898 | minfo->ctx = g_ctx; |
11fdf7f2 TL |
899 | |
900 | // note that the memory can be allocated before perf logger is set | |
9f95a23c | 901 | g_ctx->update_stats(chunk_buffer_number); |
11fdf7f2 TL |
902 | |
903 | /* initialize chunks */ | |
9f95a23c TL |
904 | Chunk *chunk = minfo->chunks; |
905 | for (unsigned i = 0; i < chunk_buffer_number; i++) { | |
906 | new(chunk) Chunk(minfo->mr, cct->_conf->ms_async_rdma_buffer_size, chunk->data, 0, 0, minfo->mr->lkey); | |
907 | chunk = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(chunk) + chunk_buffer_size); | |
11fdf7f2 TL |
908 | } |
909 | ||
9f95a23c | 910 | return reinterpret_cast<char *>(minfo->chunks); |
11fdf7f2 TL |
911 | } |
912 | ||
7c673cae | 913 | |
11fdf7f2 TL |
914 | void Infiniband::MemoryManager::PoolAllocator::free(char * const block) |
915 | { | |
916 | mem_info *m; | |
9f95a23c | 917 | std::lock_guard l{get_lock()}; |
11fdf7f2 | 918 | |
9f95a23c TL |
919 | Chunk *mem_info_chunk = reinterpret_cast<Chunk *>(block); |
920 | m = reinterpret_cast<mem_info *>(reinterpret_cast<char *>(mem_info_chunk) - offsetof(mem_info, chunks)); | |
11fdf7f2 TL |
921 | m->ctx->update_stats(-m->nbufs); |
922 | ibv_dereg_mr(m->mr); | |
923 | m->ctx->manager->free(m); | |
924 | } | |
925 | ||
// The rx buffer pool grows in (sizeof(Chunk) + rdma_buffer_size) units
// via PoolAllocator. Its initial size is chosen so that, when a finite
// receive-buffer limit is configured, the first received packet does not
// force a pool expansion; the final ctor argument caps the pool by the
// device's max_mr_size.
Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
  : cct(c), device(d), pd(p),
    rxbuf_pool_ctx(this),
    rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
        c->_conf->ms_async_rdma_receive_buffers > 0 ?
          // if possible make initial pool size 2 * receive_queue_len
          // that way there will be no pool expansion upon receive of the
          // first packet.
          (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
           c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
          // rx pool is infinite, we can set any initial size that we want
          2 * c->_conf->ms_async_rdma_receive_queue_len,
        device->device_attr.max_mr_size / (sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size))
{
}
941 | ||
942 | Infiniband::MemoryManager::~MemoryManager() | |
943 | { | |
7c673cae FG |
944 | if (send) |
945 | delete send; | |
946 | } | |
947 | ||
// Allocate `size` bytes, preferring 2MB huge pages.
//
// Layout: [size_t real_size][padding][user data] — the returned pointer
// is HUGE_PAGE_SIZE_2MB past the allocation start so the header slot can
// record how the block was obtained:
//   real_size != 0 -> mmap'ed with MAP_HUGETLB (munmap on free)
//   real_size == 0 -> std::malloc fallback (std::free on free)
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
  size_t real_size = ALIGN_TO_PAGE_2MB(size) + HUGE_PAGE_SIZE_2MB;
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
  if (ptr == MAP_FAILED) {
    // hugepage mapping failed; fall back to the regular heap
    ptr = (char *)std::malloc(real_size);
    if (ptr == NULL) return NULL;
    real_size = 0;
  }
  *((size_t *)ptr) = real_size;
  return ptr + HUGE_PAGE_SIZE_2MB;
}
960 | ||
// Release memory obtained from huge_pages_malloc(): step back to the
// header slot and use the stashed real_size to pick munmap vs std::free.
void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
  if (ptr == NULL) return;
  void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE_2MB;
  size_t real_size = *((size_t *)real_ptr);
  // real_size is either 0 (malloc fallback) or a 2MB-aligned mmap length
  ceph_assert(real_size % HUGE_PAGE_SIZE_2MB == 0);
  if (real_size != 0)
    munmap(real_ptr, real_size);
  else
    std::free(real_ptr);
}
972 | ||
973 | ||
974 | void* Infiniband::MemoryManager::malloc(size_t size) | |
975 | { | |
976 | if (cct->_conf->ms_async_rdma_enable_hugepage) | |
977 | return huge_pages_malloc(size); | |
978 | else | |
979 | return std::malloc(size); | |
980 | } | |
981 | ||
982 | void Infiniband::MemoryManager::free(void *ptr) | |
983 | { | |
984 | if (cct->_conf->ms_async_rdma_enable_hugepage) | |
985 | huge_pages_free(ptr); | |
986 | else | |
987 | std::free(ptr); | |
7c673cae FG |
988 | } |
989 | ||
11fdf7f2 | 990 | void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num) |
7c673cae | 991 | { |
11fdf7f2 TL |
992 | ceph_assert(device); |
993 | ceph_assert(pd); | |
7c673cae FG |
994 | |
995 | send = new Cluster(*this, size); | |
996 | send->fill(tx_num); | |
997 | } | |
998 | ||
// Return transmit chunks to the send Cluster's free list.
void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}
1003 | ||
// Grab registered send buffers covering `bytes` from the tx pool.
// Returns the number of chunks obtained (may be fewer than requested
// when the pool is running low).
int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}
1008 | ||
// Latched to true once verify_prereq() has run, so the process-wide
// setup is not repeated for every Infiniband instance.
static std::atomic<bool> init_prereq = {false};
7c673cae | 1010 | |
11fdf7f2 | 1011 | void Infiniband::verify_prereq(CephContext *cct) { |
9f95a23c | 1012 | int rc = 0; |
11fdf7f2 TL |
1013 | ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl; |
1014 | if (cct->_conf->ms_async_rdma_enable_hugepage){ | |
1015 | rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1); | |
1016 | ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl; | |
1017 | if (rc) { | |
1018 | lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl; | |
1019 | ceph_abort(); | |
1020 | } | |
1021 | } | |
1022 | ||
9f95a23c TL |
1023 | //On RDMA MUST be called before fork |
1024 | rc = ibv_fork_init(); | |
1025 | if (rc) { | |
1026 | lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl; | |
1027 | ceph_abort(); | |
1028 | } | |
1029 | ||
11fdf7f2 TL |
1030 | //Check ulimit |
1031 | struct rlimit limit; | |
1032 | getrlimit(RLIMIT_MEMLOCK, &limit); | |
1033 | if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) { | |
1034 | lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory." | |
1035 | " We recommend setting this parameter to infinity" << dendl; | |
1036 | } | |
1037 | init_prereq = true; | |
1038 | } | |
7c673cae | 1039 | |
// Lightweight constructor: records the configured device name and port;
// the heavy device setup is deferred to init().
Infiniband::Infiniband(CephContext *cct)
  : cct(cct),
    device_name(cct->_conf->ms_async_rdma_device_name),
    port_num( cct->_conf->ms_async_rdma_port_num)
{
  // process-wide prerequisites (ibv_fork_init, env, ulimits) run once,
  // guarded by the init_prereq flag
  if (!init_prereq)
    verify_prereq(cct);
  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}
1049 | ||
7c673cae FG |
// Idempotent heavy initialization: open the device, bind the port,
// create the protection domain, size the rx/tx queues against both the
// configuration and the device limits, build the memory pools and
// (optionally) the shared receive queue. Aborts on misconfiguration.
void Infiniband::init()
{
  std::lock_guard l{lock};

  if (initialized)
    return;

  device_list = new DeviceList(cct);
  initialized = true;

  device = device_list->get_device(device_name.c_str());
  ceph_assert(device);
  device->binding_port(cct, port_num);
  ib_physical_port = device->active_port->get_port_num();
  pd = new ProtectionDomain(cct, device);
  // async_fd delivers device events; it must not block the event loop
  ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);

  support_srq = cct->_conf->ms_async_rdma_support_srq;
  // start from the device's hard limit for the chosen receive mode...
  if (support_srq) {
    ceph_assert(device->device_attr.max_srq);
    rx_queue_len = device->device_attr.max_srq_wr;
  }
  else
    rx_queue_len = device->device_attr.max_qp_wr;
  // ...then clamp to the configured queue length
  if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
    rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
    ldout(cct, 1) << __func__ << " assigning: " << rx_queue_len << " receive buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " using the max allowed receive buffers: " << rx_queue_len << dendl;
  }

  // check for the misconfiguration: the queue cannot be deeper than the
  // buffer pool that feeds it
  if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
      rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
    lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
        rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
        cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
    ceph_abort();
  }

  // Keep extra one WR for a beacon to indicate all WCEs were consumed
  tx_queue_len = device->device_attr.max_qp_wr - 1;
  if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
    tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
    ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
  }

  //check for the memory region size misconfiguration
  if ((uint64_t)cct->_conf->ms_async_rdma_buffer_size * tx_queue_len > device->device_attr.max_mr_size) {
    lderr(cct) << __func__ << " Out of max memory region size " << dendl;
    ceph_abort();
  }

  ldout(cct, 1) << __func__ << " device allow " << device->device_attr.max_cqe
      << " completion entries" << dendl;

  memory_manager = new MemoryManager(cct, device, pd);
  memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);

  if (support_srq) {
    srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
    post_chunks_to_rq(rx_queue_len, NULL); //add to srq
  }
}
1116 | ||
31f18b77 FG |
Infiniband::~Infiniband()
{
  // nothing was created if init() never ran
  if (!initialized)
    return;
  // destroy the SRQ before the memory manager that owns its buffers
  if (support_srq)
    ibv_destroy_srq(srq);
  delete memory_manager;
  delete pd;
  delete device_list;
}
1127 | ||
31f18b77 FG |
1128 | /** |
1129 | * Create a shared receive queue. This basically wraps the verbs call. | |
1130 | * | |
1131 | * \param[in] max_wr | |
1132 | * The max number of outstanding work requests in the SRQ. | |
1133 | * \param[in] max_sge | |
1134 | * The max number of scatter elements per WR. | |
1135 | * \return | |
1136 | * A valid ibv_srq pointer, or NULL on error. | |
1137 | */ | |
1138 | ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) | |
1139 | { | |
1140 | ibv_srq_init_attr sia; | |
92f5a8d4 | 1141 | // FIPS zeroization audit 20191115: this memset is not security related. |
31f18b77 FG |
1142 | memset(&sia, 0, sizeof(sia)); |
1143 | sia.srq_context = device->ctxt; | |
1144 | sia.attr.max_wr = max_wr; | |
1145 | sia.attr.max_sge = max_sge; | |
1146 | return ibv_create_srq(pd->pd, &sia); | |
1147 | } | |
1148 | ||
// Fetch up to `bytes` worth of registered tx chunks into `c`;
// returns the number of chunks obtained.
int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}
1153 | ||
1154 | /** | |
1155 | * Create a new QueuePair. This factory should be used in preference to | |
1156 | * the QueuePair constructor directly, since this lets derivatives of | |
1157 | * Infiniband, e.g. MockInfiniband (if it existed), | |
1158 | * return mocked out QueuePair derivatives. | |
1159 | * | |
1160 | * \return | |
1161 | * QueuePair on success or NULL if init fails | |
1162 | * See QueuePair::QueuePair for parameter documentation. | |
1163 | */ | |
11fdf7f2 TL |
1164 | Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, |
1165 | CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id) | |
31f18b77 FG |
1166 | { |
1167 | Infiniband::QueuePair *qp = new QueuePair( | |
11fdf7f2 | 1168 | cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id); |
31f18b77 FG |
1169 | if (qp->init()) { |
1170 | delete qp; | |
1171 | return NULL; | |
1172 | } | |
1173 | return qp; | |
1174 | } | |
1175 | ||
// Post up to rq_wr_num receive work requests, each backed by one rx
// chunk from the memory pool. When `qp` is NULL (or uses the SRQ) the
// WRs go to the shared receive queue; otherwise to the QP's own RQ.
// Returns the number of WRs actually posted (0 if no buffers were
// available).
int Infiniband::post_chunks_to_rq(int rq_wr_num, QueuePair *qp)
{
  int ret = 0;
  Chunk *chunk = nullptr;

  // WR and SGE arrays are zero-initialized: calloc leaves every
  // `next` pointer NULL, which terminates the WR chain built below.
  ibv_recv_wr *rx_work_request = static_cast<ibv_recv_wr*>(::calloc(rq_wr_num, sizeof(ibv_recv_wr)));
  ibv_sge *isge = static_cast<ibv_sge*>(::calloc(rq_wr_num, sizeof(ibv_sge)));
  ceph_assert(rx_work_request);
  ceph_assert(isge);

  int i = 0;
  while (i < rq_wr_num) {
    chunk = get_memory_manager()->get_rx_buffer();
    if (chunk == nullptr) {
      lderr(cct) << __func__ << " WARNING: out of memory. Request " << rq_wr_num <<
          " rx buffers. Only get " << i << " rx buffers." << dendl;
      if (i == 0) {
        ::free(rx_work_request);
        ::free(isge);
        return 0;
      }
      break; // got some buffers, so we need to post them to the receive queue
    }

    isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
    isge[i].length = chunk->bytes;
    isge[i].lkey = chunk->lkey;

    rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// assign chunk address as work request id

    // link this WR to the previous one so the whole batch posts in one call
    if (i != 0) {
      rx_work_request[i - 1].next = &rx_work_request[i];
    }
    rx_work_request[i].sg_list = &isge[i];
    rx_work_request[i].num_sge = 1;

    // without an SRQ, the QP tracks its own outstanding rx buffers
    if (qp && !qp->get_srq()) {
      chunk->set_qp(qp);
      qp->add_rq_wr(chunk);
    }
    i++;
  }

  ibv_recv_wr *badworkrequest = nullptr;
  if (support_srq) {
    ret = ibv_post_srq_recv(srq, rx_work_request, &badworkrequest);
  } else {
    ceph_assert(qp);
    ret = ibv_post_recv(qp->get_qp(), rx_work_request, &badworkrequest);
  }

  // the WR/SGE arrays may be freed once the verbs post call returns
  ::free(rx_work_request);
  ::free(isge);
  ceph_assert(badworkrequest == nullptr && ret == 0);
  return i;
}
1232 | ||
1233 | Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) | |
7c673cae | 1234 | { |
31f18b77 FG |
1235 | Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); |
1236 | if (cc->init()) { | |
1237 | delete cc; | |
1238 | return NULL; | |
1239 | } | |
1240 | return cc; | |
7c673cae FG |
1241 | } |
1242 | ||
31f18b77 FG |
1243 | Infiniband::CompletionQueue* Infiniband::create_comp_queue( |
1244 | CephContext *cct, CompletionChannel *cc) | |
7c673cae | 1245 | { |
31f18b77 FG |
1246 | Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( |
1247 | cct, *this, CQ_DEPTH, cc); | |
1248 | if (cq->init()) { | |
1249 | delete cq; | |
1250 | return NULL; | |
1251 | } | |
1252 | return cq; | |
1253 | } | |
1254 | ||
7c673cae FG |
1255 | Infiniband::QueuePair::~QueuePair() |
1256 | { | |
9f95a23c | 1257 | ldout(cct, 20) << __func__ << " destroy Queue Pair, qp number: " << qp->qp_num << " left SQ WR " << recv_queue.size() << dendl; |
7c673cae FG |
1258 | if (qp) { |
1259 | ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl; | |
11fdf7f2 | 1260 | ceph_assert(!ibv_destroy_qp(qp)); |
7c673cae | 1261 | } |
9f95a23c TL |
1262 | |
1263 | for (auto& chunk: recv_queue) { | |
1264 | infiniband.get_memory_manager()->release_rx_buffer(chunk); | |
1265 | } | |
1266 | recv_queue.clear(); | |
7c673cae FG |
1267 | } |
1268 | ||
1269 | /** | |
1270 | * Given a string representation of the `status' field from Verbs | |
1271 | * struct `ibv_wc'. | |
1272 | * | |
1273 | * \param[in] status | |
1274 | * The integer status obtained in ibv_wc.status. | |
1275 | * \return | |
1276 | * A string corresponding to the given status. | |
1277 | */ | |
1278 | const char* Infiniband::wc_status_to_string(int status) | |
1279 | { | |
1280 | static const char *lookup[] = { | |
1281 | "SUCCESS", | |
1282 | "LOC_LEN_ERR", | |
1283 | "LOC_QP_OP_ERR", | |
1284 | "LOC_EEC_OP_ERR", | |
1285 | "LOC_PROT_ERR", | |
1286 | "WR_FLUSH_ERR", | |
1287 | "MW_BIND_ERR", | |
1288 | "BAD_RESP_ERR", | |
1289 | "LOC_ACCESS_ERR", | |
1290 | "REM_INV_REQ_ERR", | |
1291 | "REM_ACCESS_ERR", | |
1292 | "REM_OP_ERR", | |
1293 | "RETRY_EXC_ERR", | |
1294 | "RNR_RETRY_EXC_ERR", | |
1295 | "LOC_RDD_VIOL_ERR", | |
1296 | "REM_INV_RD_REQ_ERR", | |
1297 | "REM_ABORT_ERR", | |
1298 | "INV_EECN_ERR", | |
1299 | "INV_EEC_STATE_ERR", | |
1300 | "FATAL_ERR", | |
1301 | "RESP_TIMEOUT_ERR", | |
1302 | "GENERAL_ERR" | |
1303 | }; | |
1304 | ||
1305 | if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) | |
1306 | return "<status out of range!>"; | |
1307 | return lookup[status]; | |
1308 | } | |
1309 | ||
1310 | const char* Infiniband::qp_state_string(int status) { | |
1311 | switch(status) { | |
1312 | case IBV_QPS_RESET : return "IBV_QPS_RESET"; | |
1313 | case IBV_QPS_INIT : return "IBV_QPS_INIT"; | |
1314 | case IBV_QPS_RTR : return "IBV_QPS_RTR"; | |
1315 | case IBV_QPS_RTS : return "IBV_QPS_RTS"; | |
1316 | case IBV_QPS_SQD : return "IBV_QPS_SQD"; | |
1317 | case IBV_QPS_SQE : return "IBV_QPS_SQE"; | |
1318 | case IBV_QPS_ERR : return "IBV_QPS_ERR"; | |
1319 | default: return " out of range."; | |
1320 | } | |
1321 | } |