]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include "Infiniband.h" | |
18 | #include "common/errno.h" | |
19 | #include "common/debug.h" | |
20 | #include "RDMAStack.h" | |
21 | #include <sys/time.h> | |
22 | #include <sys/resource.h> | |
23 | ||
24 | #define dout_subsys ceph_subsys_ms | |
25 | #undef dout_prefix | |
26 | #define dout_prefix *_dout << "Infiniband " | |
27 | ||
// At most one scatter-gather element per shared-receive-queue work request.
static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
// Inline send data is disabled; payloads always go through registered buffers.
static const uint32_t MAX_INLINE_DATA = 0;
// Fixed length of the out-of-band TCP handshake message (colon-separated
// hex fields); sizeof includes the trailing NUL.
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
// Completion-queue depth (entries) requested at CQ creation.
static const uint32_t CQ_DEPTH = 30000;
32 | ||
// Query the attributes of one physical HCA port and resolve the LID/GID
// this process will advertise to peers.  With experimental verbs
// (HAVE_IBV_EXP) the configured local GID (ms_async_rdma_local_gid) is
// searched for in the port's GID table, filtered by the configured RoCE
// version; a malformed/absent GID falls back to index 0.  Without
// experimental verbs, GID index 0 is always used.  Aborts on any verbs
// query failure — the messenger cannot run without valid port attributes.
Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr), gid_idx(0)
{
#ifdef HAVE_IBV_EXP
  union ibv_gid cgid;
  struct ibv_exp_gid_attr gid_attr;
  bool malformed = false;

  ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;

  // search for requested GID in GIDs table
  ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
                << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
  // Parse the configured GID string (eight colon-separated 16-bit hex
  // groups) into the 16 raw bytes of cgid.
  r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
             "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
             ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
             &cgid.raw[ 0], &cgid.raw[ 1],
             &cgid.raw[ 2], &cgid.raw[ 3],
             &cgid.raw[ 4], &cgid.raw[ 5],
             &cgid.raw[ 6], &cgid.raw[ 7],
             &cgid.raw[ 8], &cgid.raw[ 9],
             &cgid.raw[10], &cgid.raw[11],
             &cgid.raw[12], &cgid.raw[13],
             &cgid.raw[14], &cgid.raw[15]);

  if (r != 16) {
    ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
    malformed = true;
  }

  gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;

  // Walk the GID table until the configured GID (with matching RoCE
  // version) is found.  When the configured GID was malformed, the loop
  // is exited on the first iteration, leaving gid_idx == 0.
  for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
    r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
    if (r) {
      lderr(cct) << __func__ << " query gid of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }
    r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
    if (r) {
      lderr(cct) << __func__ << " query gid attributes of port " << port_num << " index " << gid_idx << " failed " << cpp_strerror(errno) << dendl;
      ceph_abort();
    }

    if (malformed) break; // stay with gid_idx=0
    if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
         (memcmp(&gid, &cgid, 16) == 0) ) {
      ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
      break;
    }
  }

  if (gid_idx == port_attr->gid_tbl_len) {
    lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
    ceph_abort();
  }
#else
  // Plain verbs path: take the port's LID and GID index 0.
  int r = ibv_query_port(ctxt, port_num, port_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " query port failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }

  lid = port_attr->lid;
  r = ibv_query_gid(ctxt, port_num, 0, &gid);
  if (r) {
    lderr(cct) << __func__ << " query gid failed " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
#endif
}
110 | ||
111 | ||
// Wrap one ibv_device.  When RDMA-CM is enabled the caller supplies an
// already-opened device context (dc); otherwise the device is opened
// here.  Device attributes are queried eagerly.  Aborts on any failure —
// there is no way to proceed without an opened, queryable device.
Device::Device(CephContext *cct, ibv_device* d, struct ibv_context *dc)
  : device(d), device_attr(new ibv_device_attr), active_port(nullptr)
{
  if (device == NULL) {
    lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
  name = ibv_get_device_name(device);
  if (cct->_conf->ms_async_rdma_cm) {
    ctxt = dc;
  } else {
    ctxt = ibv_open_device(device);
  }
  if (ctxt == NULL) {
    lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
  int r = ibv_query_device(ctxt, device_attr);
  if (r == -1) {
    lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

// Probe physical ports and keep the requested one (1-based port_num) in
// active_port iff it is in the ACTIVE state.  Non-matching/inactive ports
// are logged and released.  Asserts if no usable port is found.
void Device::binding_port(CephContext *cct, int port_num) {
  port_cnt = device_attr->phys_port_cnt;
  for (uint8_t i = 0; i < port_cnt; ++i) {
    Port *port = new Port(cct, ctxt, i+1);
    if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
      active_port = port;
      ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
      break;  // keep ownership of this Port; skip the delete below
    } else {
      ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
    }
    delete port;
  }
  if (nullptr == active_port) {
    lderr(cct) << __func__ << " port not found" << dendl;
    ceph_assert(active_port);
  }
}
154 | ||
155 | ||
156 | Infiniband::QueuePair::QueuePair( | |
157 | CephContext *c, Infiniband& infiniband, ibv_qp_type type, | |
158 | int port, ibv_srq *srq, | |
159 | Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq, | |
160 | uint32_t tx_queue_len, uint32_t rx_queue_len, struct rdma_cm_id *cid, uint32_t q_key) | |
161 | : cct(c), infiniband(infiniband), | |
162 | type(type), | |
163 | ctxt(infiniband.device->ctxt), | |
164 | ib_physical_port(port), | |
165 | pd(infiniband.pd->pd), | |
166 | srq(srq), | |
167 | qp(NULL), | |
168 | cm_id(cid), | |
169 | txcq(txcq), | |
170 | rxcq(rxcq), | |
171 | initial_psn(0), | |
172 | max_send_wr(tx_queue_len), | |
173 | max_recv_wr(rx_queue_len), | |
174 | q_key(q_key), | |
175 | dead(false) | |
176 | { | |
177 | initial_psn = lrand48() & 0xffffff; | |
178 | if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) { | |
179 | lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl; | |
180 | ceph_abort(); | |
181 | } | |
182 | pd = infiniband.pd->pd; | |
183 | } | |
184 | ||
// Create the verbs QP — either directly (ibv_create_qp) or through the
// rdmacm library — and, for the non-CM path, transition it from RESET to
// INIT with access flags appropriate for the transport type.
// Returns 0 on success, -1 on failure (on a failed INIT transition the QP
// is destroyed before returning).
int Infiniband::QueuePair::init()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  ibv_qp_init_attr qpia;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpia, 0, sizeof(qpia));
  qpia.send_cq = txcq->get_cq();
  qpia.recv_cq = rxcq->get_cq();
  if (srq) {
    qpia.srq = srq;                     // use the same shared receive queue
  } else {
    qpia.cap.max_recv_wr = max_recv_wr; // private receive queue sizing
    qpia.cap.max_recv_sge = 1;
  }
  qpia.cap.max_send_wr = max_send_wr;   // max outstanding send requests
  qpia.cap.max_send_sge = 1;            // max send scatter-gather elements
  qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
  qpia.qp_type = type;                  // RC, UC, UD, or XRC
  qpia.sq_sig_all = 0;                  // only generate CQEs on requested WQEs

  if (!cct->_conf->ms_async_rdma_cm) {
    qp = ibv_create_qp(pd, &qpia);
    if (qp == NULL) {
      lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
      if (errno == ENOMEM) {
        // ENOMEM usually means the requested queue sizes exceed device limits
        lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_length, "
                      " ms_async_rdma_send_buffers or"
                      " ms_async_rdma_buffer_size" << dendl;
      }
      return -1;
    }
  } else {
    // rdmacm path: the cm_id must belong to the same device context as the PD
    ceph_assert(cm_id->verbs == pd->context);
    if (rdma_create_qp(cm_id, pd, &qpia)) {
      lderr(cct) << __func__ << " failed to create queue pair with rdmacm library"
                 << cpp_strerror(errno) << dendl;
      return -1;
    }
    qp = cm_id->qp;
  }
  ldout(cct, 20) << __func__ << " successfully create queue pair: "
                 << "qp=" << qp << dendl;

  // QPs created via rdma_create_qp() are not transitioned manually here.
  if (cct->_conf->ms_async_rdma_cm)
    return 0;

  // move from RESET to INIT state
  ibv_qp_attr qpa;
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(&qpa, 0, sizeof(qpa));
  qpa.qp_state = IBV_QPS_INIT;
  qpa.pkey_index = 0;
  qpa.port_num = (uint8_t)(ib_physical_port);
  qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
  qpa.qkey = q_key;

  // which attributes are valid for the modify call depends on the transport
  int mask = IBV_QP_STATE | IBV_QP_PORT;
  switch (type) {
    case IBV_QPT_RC:
      mask |= IBV_QP_ACCESS_FLAGS;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_UD:
      mask |= IBV_QP_QKEY;
      mask |= IBV_QP_PKEY_INDEX;
      break;
    case IBV_QPT_RAW_PACKET:
      break;
    default:
      ceph_abort();
  }

  int ret = ibv_modify_qp(qp, &qpa, mask);
  if (ret) {
    ibv_destroy_qp(qp);
    lderr(cct) << __func__ << " failed to transition to INIT state: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
                 << " qp=" << qp << dendl;
  return 0;
}
267 | ||
268 | /** | |
269 | * Change RC QueuePair into the ERROR state. This is necessary modify | |
270 | * the Queue Pair into the Error state and poll all of the relevant | |
271 | * Work Completions prior to destroying a Queue Pair. | |
272 | * Since destroying a Queue Pair does not guarantee that its Work | |
273 | * Completions are removed from the CQ upon destruction. Even if the | |
274 | * Work Completions are already in the CQ, it might not be possible to | |
275 | * retrieve them. If the Queue Pair is associated with an SRQ, it is | |
276 | * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED | |
277 | * | |
278 | * \return | |
279 | * -errno if the QueuePair can't switch to ERROR | |
280 | * 0 for success. | |
281 | */ | |
282 | int Infiniband::QueuePair::to_dead() | |
283 | { | |
284 | if (dead) | |
285 | return 0; | |
286 | ibv_qp_attr qpa; | |
287 | memset(&qpa, 0, sizeof(qpa)); | |
288 | qpa.qp_state = IBV_QPS_ERR; | |
289 | ||
290 | int mask = IBV_QP_STATE; | |
291 | int ret = ibv_modify_qp(qp, &qpa, mask); | |
292 | if (ret) { | |
293 | lderr(cct) << __func__ << " failed to transition to ERROR state: " | |
294 | << cpp_strerror(errno) << dendl; | |
295 | return -errno; | |
296 | } | |
297 | dead = true; | |
298 | return ret; | |
299 | } | |
300 | ||
301 | int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const | |
302 | { | |
303 | ibv_qp_attr qpa; | |
304 | ibv_qp_init_attr qpia; | |
305 | ||
306 | int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia); | |
307 | if (r) { | |
308 | lderr(cct) << __func__ << " failed to query qp: " | |
309 | << cpp_strerror(errno) << dendl; | |
310 | return -1; | |
311 | } | |
312 | ||
313 | if (rqp) | |
314 | *rqp = qpa.dest_qp_num; | |
315 | return 0; | |
316 | } | |
317 | ||
318 | /** | |
319 | * Get the remote infiniband address for this QueuePair, as set in #plumb(). | |
320 | * LIDs are "local IDs" in infiniband terminology. They are short, locally | |
321 | * routable addresses. | |
322 | */ | |
323 | int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const | |
324 | { | |
325 | ibv_qp_attr qpa; | |
326 | ibv_qp_init_attr qpia; | |
327 | ||
328 | int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia); | |
329 | if (r) { | |
330 | lderr(cct) << __func__ << " failed to query qp: " | |
331 | << cpp_strerror(errno) << dendl; | |
332 | return -1; | |
333 | } | |
334 | ||
335 | if (lid) | |
336 | *lid = qpa.ah_attr.dlid; | |
337 | return 0; | |
338 | } | |
339 | ||
340 | /** | |
341 | * Get the state of a QueuePair. | |
342 | */ | |
343 | int Infiniband::QueuePair::get_state() const | |
344 | { | |
345 | ibv_qp_attr qpa; | |
346 | ibv_qp_init_attr qpia; | |
347 | ||
348 | int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia); | |
349 | if (r) { | |
350 | lderr(cct) << __func__ << " failed to get state: " | |
351 | << cpp_strerror(errno) << dendl; | |
352 | return -1; | |
353 | } | |
354 | return qpa.qp_state; | |
355 | } | |
356 | ||
/**
 * Return true if the queue pair is in an error state, false otherwise.
 * An attr_mask of -1 asks ibv_query_qp for every attribute; only
 * cur_qp_state is inspected here.  A failed query is conservatively
 * reported as an error state.
 */
bool Infiniband::QueuePair::is_error() const
{
  ibv_qp_attr qpa;
  ibv_qp_init_attr qpia;

  int r = ibv_query_qp(qp, &qpa, -1, &qpia);
  if (r) {
    lderr(cct) << __func__ << " failed to get state: "
               << cpp_strerror(errno) << dendl;
    return true;
  }
  return qpa.cur_qp_state == IBV_QPS_ERR;
}
373 | ||
374 | ||
375 | Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib) | |
376 | : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0) | |
377 | { | |
378 | } | |
379 | ||
380 | Infiniband::CompletionChannel::~CompletionChannel() | |
381 | { | |
382 | if (channel) { | |
383 | int r = ibv_destroy_comp_channel(channel); | |
384 | if (r < 0) | |
385 | lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl; | |
386 | ceph_assert(r == 0); | |
387 | } | |
388 | } | |
389 | ||
390 | int Infiniband::CompletionChannel::init() | |
391 | { | |
392 | ldout(cct, 20) << __func__ << " started." << dendl; | |
393 | channel = ibv_create_comp_channel(infiniband.device->ctxt); | |
394 | if (!channel) { | |
395 | lderr(cct) << __func__ << " failed to create receive completion channel: " | |
396 | << cpp_strerror(errno) << dendl; | |
397 | return -1; | |
398 | } | |
399 | int rc = NetHandler(cct).set_nonblock(channel->fd); | |
400 | if (rc < 0) { | |
401 | ibv_destroy_comp_channel(channel); | |
402 | return -1; | |
403 | } | |
404 | return 0; | |
405 | } | |
406 | ||
407 | void Infiniband::CompletionChannel::ack_events() | |
408 | { | |
409 | ibv_ack_cq_events(cq, cq_events_that_need_ack); | |
410 | cq_events_that_need_ack = 0; | |
411 | } | |
412 | ||
413 | bool Infiniband::CompletionChannel::get_cq_event() | |
414 | { | |
415 | ibv_cq *cq = NULL; | |
416 | void *ev_ctx; | |
417 | if (ibv_get_cq_event(channel, &cq, &ev_ctx)) { | |
418 | if (errno != EAGAIN && errno != EINTR) | |
419 | lderr(cct) << __func__ << " failed to retrieve CQ event: " | |
420 | << cpp_strerror(errno) << dendl; | |
421 | return false; | |
422 | } | |
423 | ||
424 | /* accumulate number of cq events that need to | |
425 | * * be acked, and periodically ack them | |
426 | * */ | |
427 | if (++cq_events_that_need_ack == MAX_ACK_EVENT) { | |
428 | ldout(cct, 20) << __func__ << " ack aq events." << dendl; | |
429 | ibv_ack_cq_events(cq, MAX_ACK_EVENT); | |
430 | cq_events_that_need_ack = 0; | |
431 | } | |
432 | ||
433 | return true; | |
434 | } | |
435 | ||
436 | ||
Infiniband::CompletionQueue::~CompletionQueue()
{
  if (cq) {
    int r = ibv_destroy_cq(cq);
    if (r < 0)
      lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
    ceph_assert(r == 0);
  }
}

// Create the CQ, request the first completion notification and register
// this CQ with its completion channel.  Returns 0 on success, -1 on error.
int Infiniband::CompletionQueue::init()
{
  cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
  if (!cq) {
    lderr(cct) << __func__ << " failed to create receive completion queue: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }

  if (ibv_req_notify_cq(cq, 0)) {
    lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
    ibv_destroy_cq(cq);
    cq = nullptr;  // prevent double destroy in the destructor
    return -1;
  }

  channel->bind_cq(cq);
  ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
  return 0;
}

// Re-arm the completion notification after an event has been consumed.
// NOTE(review): solicite_only is currently ignored — 0 (notify on any
// completion) is always passed to ibv_req_notify_cq; confirm intent.
int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  int r = ibv_req_notify_cq(cq, 0);
  if (r < 0)
    lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
  return r;
}

// Poll up to num_entries work completions into ret_wc_array.
// Returns the count polled (possibly 0), or -1 on error.
int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
  int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
  if (r < 0) {
    lderr(cct) << __func__ << " poll_completion_queue occur met error: "
               << cpp_strerror(errno) << dendl;
    return -1;
  }
  return r;
}
486 | ||
487 | ||
// RAII wrapper for a verbs protection domain.  Aborts if allocation
// fails — nothing can be registered or created without a PD.
Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
  : pd(ibv_alloc_pd(device->ctxt))
{
  if (pd == NULL) {
    lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
    ceph_abort();
  }
}

Infiniband::ProtectionDomain::~ProtectionDomain()
{
  ibv_dealloc_pd(pd);
}
501 | ||
502 | ||
// A Chunk is one slice of registered memory: 'bytes' is the capacity,
// 'offset' the read/write cursor, 'bound' the amount of valid data.
Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
  : mr(m), bytes(len), offset(0), buffer(b)
{
}

Infiniband::MemoryManager::Chunk::~Chunk()
{
}

// Position the cursor explicitly.
void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
{
  offset = o;
}

uint32_t Infiniband::MemoryManager::Chunk::get_offset()
{
  return offset;
}

// Record how many bytes of the buffer hold valid data.
void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
{
  bound = b;
}

// Reset the cursor and set the valid-data bound before consuming data.
void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
{
  offset = 0;
  bound = b;
}

uint32_t Infiniband::MemoryManager::Chunk::get_bound()
{
  return bound;
}
537 | ||
538 | uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len) | |
539 | { | |
540 | uint32_t left = bound - offset; | |
541 | if (left >= len) { | |
542 | memcpy(buf, buffer+offset, len); | |
543 | offset += len; | |
544 | return len; | |
545 | } else { | |
546 | memcpy(buf, buffer+offset, left); | |
547 | offset = 0; | |
548 | bound = 0; | |
549 | return left; | |
550 | } | |
551 | } | |
552 | ||
553 | uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len) | |
554 | { | |
555 | uint32_t left = bytes - offset; | |
556 | if (left >= len) { | |
557 | memcpy(buffer+offset, buf, len); | |
558 | offset += len; | |
559 | return len; | |
560 | } else { | |
561 | memcpy(buffer+offset, buf, left); | |
562 | offset = bytes; | |
563 | return left; | |
564 | } | |
565 | } | |
566 | ||
567 | bool Infiniband::MemoryManager::Chunk::full() | |
568 | { | |
569 | return offset == bytes; | |
570 | } | |
571 | ||
572 | bool Infiniband::MemoryManager::Chunk::over() | |
573 | { | |
574 | return Infiniband::MemoryManager::Chunk::offset == bound; | |
575 | } | |
576 | ||
577 | void Infiniband::MemoryManager::Chunk::clear() | |
578 | { | |
579 | offset = 0; | |
580 | bound = 0; | |
581 | } | |
582 | ||
Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
  : manager(m), buffer_size(s), lock("cluster_lock")
{
}

// All chunks share a single memory region, so deregister it once (via the
// first chunk), run the placement-new'd Chunk destructors, then release
// the chunk array and the backing buffer.
Infiniband::MemoryManager::Cluster::~Cluster()
{
  int r = ibv_dereg_mr(chunk_base->mr);
  ceph_assert(r == 0);
  const auto chunk_end = chunk_base + num_chunk;
  for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
    chunk->~Chunk();
  }

  ::free(chunk_base);
  manager.free(base);
}

// Allocate 'num' buffers of buffer_size bytes, register them as one MR,
// and carve them into Chunks via placement-new into a raw array.
// Must be called exactly once (asserts base was unset).  Always returns 0.
int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
  ceph_assert(!base);
  num_chunk = num;
  uint32_t bytes = buffer_size * num;

  base = (char*)manager.malloc(bytes);
  end = base + bytes;
  ceph_assert(base);
  chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
  // FIPS zeroization audit 20191115: this memset is not security related.
  memset(static_cast<void*>(chunk_base), 0, sizeof(Chunk) * num);
  free_chunks.reserve(num);
  ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  ceph_assert(m);
  Chunk* chunk = chunk_base;
  // every chunk shares the single MR 'm'
  for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
    new(chunk) Chunk(m, buffer_size, base+offset);
    free_chunks.push_back(chunk);
    chunk++;
  }
  return 0;
}

// Return a batch of chunks to the free list, resetting their state.
void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
{
  Mutex::Locker l(lock);
  for (auto c : ck) {
    c->clear();
    free_chunks.push_back(c);
  }
}

// Hand out enough free chunks to cover 'bytes' (bytes == 0 means "take
// everything").  May return fewer than requested if the free list runs
// short; returns the number of chunks appended to 'chunks'.
int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
  uint32_t num = bytes / buffer_size + 1;
  if (bytes % buffer_size == 0)
    --num;   // exact multiple: the +1 above overshot by one
  int r = num;
  Mutex::Locker l(lock);
  if (free_chunks.empty())
    return 0;
  if (!bytes) {
    // special case: grab the entire free list
    r = free_chunks.size();
    for (auto c : free_chunks)
      chunks.push_back(c);
    free_chunks.clear();
    return r;
  }
  if (free_chunks.size() < num) {
    num = free_chunks.size();
    r = num;
  }
  for (uint32_t i = 0; i < num; ++i) {
    chunks.push_back(free_chunks.back());
    free_chunks.pop_back();
  }
  return r;
}
660 | ||
// Check whether 'nbufs' additional rx buffers may be allocated without
// exceeding ms_async_rdma_receive_buffers (a value <= 0 means unlimited).
bool Infiniband::MemoryManager::MemPoolContext::can_alloc(unsigned nbufs)
{
  /* unlimited */
  if (manager->cct->_conf->ms_async_rdma_receive_buffers <= 0)
    return true;

  if (n_bufs_allocated + nbufs > (unsigned)manager->cct->_conf->ms_async_rdma_receive_buffers) {
    lderr(manager->cct) << __func__ << " WARNING: OUT OF RX BUFFERS: allocated: " <<
      n_bufs_allocated << " requested: " << nbufs <<
      " limit: " << manager->cct->_conf->ms_async_rdma_receive_buffers << dendl;
    return false;
  }

  return true;
}

// Attach (or detach, with nullptr) a perf-counter sink and seed the
// rx-buffer-total gauge with the current allocation count.
void Infiniband::MemoryManager::MemPoolContext::set_stat_logger(PerfCounters *logger) {
  perf_logger = logger;
  if (perf_logger != nullptr)
    perf_logger->set(l_msgr_rdma_rx_bufs_total, n_bufs_allocated);
}

// Track buffer allocation (positive nbufs) or release (negative nbufs)
// and mirror the change into the perf counters when a logger is attached.
void Infiniband::MemoryManager::MemPoolContext::update_stats(int nbufs)
{
  n_bufs_allocated += nbufs;

  if (!perf_logger)
    return;

  if (nbufs > 0) {
    perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);
  } else {
    perf_logger->dec(l_msgr_rdma_rx_bufs_total, -nbufs);
  }
}

// Slow path of the rx buffer pool: expand the boost pool.  boost's
// allocator hooks are static, so our MemPoolContext is handed to
// PoolAllocator::malloc() through the g_ctx global, under the allocator
// lock.
void *Infiniband::MemoryManager::mem_pool::slow_malloc()
{
  void *p;

  Mutex::Locker l(PoolAllocator::lock);
  PoolAllocator::g_ctx = ctx;
  // this will trigger pool expansion via PoolAllocator::malloc()
  p = boost::pool<PoolAllocator>::malloc();
  PoolAllocator::g_ctx = nullptr;
  return p;
}
708 | ||
// Static context handoff for PoolAllocator::malloc() (set by
// mem_pool::slow_malloc()); both are guarded by 'lock'.
Infiniband::MemoryManager::MemPoolContext *Infiniband::MemoryManager::PoolAllocator::g_ctx = nullptr;
Mutex Infiniband::MemoryManager::PoolAllocator::lock("pool-alloc-lock");

// lock is taken by mem_pool::slow_malloc()
// Allocate and register one slab of rx chunks for the boost pool.
// Layout: [mem_info header][nbufs x (Chunk header + rdma_buffer_size)].
// Returns a pointer to the first Chunk, or NULL if the rx buffer budget
// is exhausted, the allocation fails, or the MR registration fails.
char *Infiniband::MemoryManager::PoolAllocator::malloc(const size_type bytes)
{
  mem_info *m;
  Chunk *ch;
  size_t rx_buf_size;
  unsigned nbufs;
  MemoryManager *manager;
  CephContext *cct;

  ceph_assert(g_ctx);
  manager = g_ctx->manager;
  cct = manager->cct;
  rx_buf_size = sizeof(Chunk) + cct->_conf->ms_async_rdma_buffer_size;
  nbufs = bytes/rx_buf_size;

  // enforce the configured rx buffer budget before allocating
  if (!g_ctx->can_alloc(nbufs))
    return NULL;

  m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
  if (!m) {
    lderr(cct) << __func__ << " failed to allocate " <<
      bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
    return NULL;
  }

  // register only the chunk area (m->chunks), not the mem_info header
  m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
  if (m->mr == NULL) {
    lderr(cct) << __func__ << " failed to register " <<
      bytes << " + " << sizeof(*m) << " bytes of memory for " << nbufs << dendl;
    manager->free(m);
    return NULL;
  }

  m->nbufs = nbufs;
  // save this chunk context
  m->ctx = g_ctx;

  // note that the memory can be allocated before perf logger is set
  g_ctx->update_stats(nbufs);

  /* initialize chunks */
  ch = m->chunks;
  for (unsigned i = 0; i < nbufs; i++) {
    ch->lkey = m->mr->lkey;
    ch->bytes = cct->_conf->ms_async_rdma_buffer_size;
    ch->offset = 0;
    ch->buffer = ch->data; // TODO: refactor tx and remove buffer
    // chunks are laid out back-to-back at rx_buf_size strides
    ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
  }

  return reinterpret_cast<char *>(m->chunks);
}


// Undo PoolAllocator::malloc(): update the stats, deregister the MR and
// return the whole slab (including the mem_info header) to the manager.
void Infiniband::MemoryManager::PoolAllocator::free(char * const block)
{
  mem_info *m;
  Mutex::Locker l(lock);

  // the mem_info header sits immediately before the chunk area
  m = reinterpret_cast<mem_info *>(block) - 1;
  m->ctx->update_stats(-m->nbufs);
  ibv_dereg_mr(m->mr);
  m->ctx->manager->free(m);
}
777 | ||
Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
  : cct(c), device(d), pd(p),
    rxbuf_pool_ctx(this),
    rxbuf_pool(&rxbuf_pool_ctx, sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
               c->_conf->ms_async_rdma_receive_buffers > 0 ?
               // if possible make initial pool size 2 * receive_queue_len
               // that way there will be no pool expansion upon receive of the
               // first packet.
               (c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
                c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
               // rx pool is infinite, we can set any initial size that we want
               2 * c->_conf->ms_async_rdma_receive_queue_len)
{
}

Infiniband::MemoryManager::~MemoryManager()
{
  // the tx cluster is only created by create_tx_pool(); may still be unset
  if (send)
    delete send;
}

// Allocate 'size' usable bytes, backed by huge pages when available.
// Layout: [real_size header ... HUGE_PAGE_SIZE padding][user data].
// The mapped length (or 0 for the std::malloc fallback) is stored at the
// start of the block so huge_pages_free() knows how to release it.
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
  size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
  char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
  if (ptr == MAP_FAILED) {
    // huge page mapping failed; fall back to a regular allocation
    ptr = (char *)std::malloc(real_size);
    if (ptr == NULL) return NULL;
    real_size = 0;  // marker: block came from std::malloc
  }
  *((size_t *)ptr) = real_size;
  return ptr + HUGE_PAGE_SIZE;
}

// Release a block from huge_pages_malloc(), dispatching on the stored
// real_size (0 => std::malloc fallback, otherwise munmap).
void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
  if (ptr == NULL) return;
  void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
  size_t real_size = *((size_t *)real_ptr);
  ceph_assert(real_size % HUGE_PAGE_SIZE == 0);
  if (real_size != 0)
    munmap(real_ptr, real_size);
  else
    std::free(real_ptr);
}


// Allocation front-end: route through huge pages when configured.
void* Infiniband::MemoryManager::malloc(size_t size)
{
  if (cct->_conf->ms_async_rdma_enable_hugepage)
    return huge_pages_malloc(size);
  else
    return std::malloc(size);
}

// Matching free front-end for MemoryManager::malloc().
void Infiniband::MemoryManager::free(void *ptr)
{
  if (cct->_conf->ms_async_rdma_enable_hugepage)
    huge_pages_free(ptr);
  else
    std::free(ptr);
}

// Create the send-buffer Cluster: tx_num buffers of 'size' bytes each.
void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
  ceph_assert(device);
  ceph_assert(pd);

  send = new Cluster(*this, size);
  send->fill(tx_num);
}

// Return send chunks to the tx cluster's free list.
void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
{
  send->take_back(chunks);
}

// Fetch send chunks covering 'bytes'; see Cluster::get_buffers().
int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return send->get_buffers(c, bytes);
}
859 | ||
860 | static std::atomic<bool> init_prereq = {false}; | |
861 | ||
862 | void Infiniband::verify_prereq(CephContext *cct) { | |
863 | ||
864 | //On RDMA MUST be called before fork | |
865 | int rc = ibv_fork_init(); | |
866 | if (rc) { | |
867 | lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl; | |
868 | ceph_abort(); | |
869 | } | |
870 | ||
871 | ldout(cct, 20) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage << dendl; | |
872 | if (cct->_conf->ms_async_rdma_enable_hugepage){ | |
873 | rc = setenv("RDMAV_HUGEPAGES_SAFE","1",1); | |
874 | ldout(cct, 0) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") << dendl; | |
875 | if (rc) { | |
876 | lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl; | |
877 | ceph_abort(); | |
878 | } | |
879 | } | |
880 | ||
881 | //Check ulimit | |
882 | struct rlimit limit; | |
883 | getrlimit(RLIMIT_MEMLOCK, &limit); | |
884 | if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) { | |
885 | lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory." | |
886 | " We recommend setting this parameter to infinity" << dendl; | |
887 | } | |
888 | init_prereq = true; | |
889 | } | |
890 | ||
// Lightweight constructor: records the configured device name and port;
// the actual device/PD setup is deferred to init().
Infiniband::Infiniband(CephContext *cct)
  : cct(cct), lock("IB lock"),
    device_name(cct->_conf->ms_async_rdma_device_name),
    port_num( cct->_conf->ms_async_rdma_port_num)
{
  // run the process-wide checks once if no one has done so yet
  if (!init_prereq)
    verify_prereq(cct);
  ldout(cct, 20) << __func__ << " constructing Infiniband..." << dendl;
}
900 | ||
// One-time verbs setup: open the device/port, create the protection
// domain, size the rx/tx queues against both device limits and
// configuration, build the tx memory pool and (optionally) the shared
// receive queue. Idempotent: guarded by `initialized` under `lock`.
void Infiniband::init()
{
  Mutex::Locker l(lock);

  if (initialized)
    return;

  device_list = new DeviceList(cct);
  initialized = true;

  device = device_list->get_device(device_name.c_str());
  ceph_assert(device);
  device->binding_port(cct, port_num);
  ib_physical_port = device->active_port->get_port_num();
  pd = new ProtectionDomain(cct, device);
  // NOTE: the set_nonblock() side effect lives inside ceph_assert; this is
  // safe only because ceph_assert is always compiled in (unlike assert).
  ceph_assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);

  support_srq = cct->_conf->ms_async_rdma_support_srq;
  // Start from the device's hard limit for the relevant queue type...
  if (support_srq)
    rx_queue_len = device->device_attr->max_srq_wr;
  else
    rx_queue_len = device->device_attr->max_qp_wr;
  // ...then clamp to the configured length. If the configuration asks for
  // more than the hardware allows, keep the hardware maximum and warn.
  if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
    rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
    ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " requested receive queue length " <<
      cct->_conf->ms_async_rdma_receive_queue_len <<
      " is too big. Setting " << rx_queue_len << dendl;
  }

  // check for the misconfiguration: the receive buffer pool must be at
  // least as large as the receive queue, or we could never fill the queue.
  if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
      rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
    lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
      rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
      cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
    ceph_abort();
  }

  // Same clamp-to-device-limit logic for the send side.
  tx_queue_len = device->device_attr->max_qp_wr;
  if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
    tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
    ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
  } else {
    ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
  }

  ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
		<< " completion entries" << dendl;

  memory_manager = new MemoryManager(cct, device, pd);
  memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);

  if (support_srq) {
    srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
    post_chunks_to_rq(rx_queue_len, NULL); //add to srq
  }
}
960 | ||
// Tear down verbs resources in reverse order of creation. The SRQ must be
// destroyed before the memory manager that owns the chunks posted to it.
// A never-init()ed instance owns nothing and skips cleanup entirely.
Infiniband::~Infiniband()
{
  if (!initialized)
    return;
  if (support_srq)
    ibv_destroy_srq(srq);
  delete memory_manager;
  delete pd;
}
970 | ||
971 | /** | |
972 | * Create a shared receive queue. This basically wraps the verbs call. | |
973 | * | |
974 | * \param[in] max_wr | |
975 | * The max number of outstanding work requests in the SRQ. | |
976 | * \param[in] max_sge | |
977 | * The max number of scatter elements per WR. | |
978 | * \return | |
979 | * A valid ibv_srq pointer, or NULL on error. | |
980 | */ | |
981 | ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge) | |
982 | { | |
983 | ibv_srq_init_attr sia; | |
984 | // FIPS zeroization audit 20191115: this memset is not security related. | |
985 | memset(&sia, 0, sizeof(sia)); | |
986 | sia.srq_context = device->ctxt; | |
987 | sia.attr.max_wr = max_wr; | |
988 | sia.attr.max_sge = max_sge; | |
989 | return ibv_create_srq(pd->pd, &sia); | |
990 | } | |
991 | ||
// Convenience forwarder to the memory manager's send-buffer allocator:
// collect chunks covering `bytes` bytes into `c`.
int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
{
  return memory_manager->get_send_buffers(c, bytes);
}
996 | ||
997 | /** | |
998 | * Create a new QueuePair. This factory should be used in preference to | |
999 | * the QueuePair constructor directly, since this lets derivatives of | |
1000 | * Infiniband, e.g. MockInfiniband (if it existed), | |
1001 | * return mocked out QueuePair derivatives. | |
1002 | * | |
1003 | * \return | |
1004 | * QueuePair on success or NULL if init fails | |
1005 | * See QueuePair::QueuePair for parameter documentation. | |
1006 | */ | |
1007 | Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, | |
1008 | CompletionQueue* rx, ibv_qp_type type, struct rdma_cm_id *cm_id) | |
1009 | { | |
1010 | Infiniband::QueuePair *qp = new QueuePair( | |
1011 | cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len, cm_id); | |
1012 | if (qp->init()) { | |
1013 | delete qp; | |
1014 | return NULL; | |
1015 | } | |
1016 | return qp; | |
1017 | } | |
1018 | ||
// Post up to `num` receive buffers to the SRQ (when support_srq) or to the
// given QP's receive queue. Returns the number of buffers actually posted
// (0 if the rx pool was completely exhausted). The chunk pointer is stashed
// in each work request's wr_id so the completion handler can recover it.
int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
{
  int ret, i = 0;
  // NOTE(review): variable-length arrays are a GCC/Clang extension, not
  // standard C++; also assumes num > 0 — confirm callers never pass 0.
  ibv_sge isge[num];
  Chunk *chunk;
  ibv_recv_wr rx_work_request[num];

  while (i < num) {
    chunk = get_memory_manager()->get_rx_buffer();
    if (chunk == NULL) {
      lderr(cct) << __func__ << " WARNING: out of memory. Requested " << num <<
        " rx buffers. Got " << i << dendl;
      if (i == 0)
        return 0;
      // if we got some buffers post them and hope for the best
      // (terminate the partially-built WR chain at the last filled entry)
      rx_work_request[i-1].next = 0;
      break;
    }

    isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
    isge[i].length = chunk->bytes;
    isge[i].lkey = chunk->lkey;

    memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
    rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
    // Link the WRs into a NULL-terminated chain so one verbs call posts all.
    if (i == num - 1) {
      rx_work_request[i].next = 0;
    } else {
      rx_work_request[i].next = &rx_work_request[i+1];
    }
    rx_work_request[i].sg_list = &isge[i];
    rx_work_request[i].num_sge = 1;
    i++;
  }
  // badworkrequest receives the first WR that failed to post; unused here
  // because any posting failure is fatal (asserted below).
  ibv_recv_wr *badworkrequest;
  if (support_srq) {
    ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
    ceph_assert(ret == 0);
  } else {
    ceph_assert(qp);
    ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
    ceph_assert(ret == 0);
  }
  return i;
}
1064 | ||
1065 | Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c) | |
1066 | { | |
1067 | Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this); | |
1068 | if (cc->init()) { | |
1069 | delete cc; | |
1070 | return NULL; | |
1071 | } | |
1072 | return cc; | |
1073 | } | |
1074 | ||
1075 | Infiniband::CompletionQueue* Infiniband::create_comp_queue( | |
1076 | CephContext *cct, CompletionChannel *cc) | |
1077 | { | |
1078 | Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue( | |
1079 | cct, *this, CQ_DEPTH, cc); | |
1080 | if (cq->init()) { | |
1081 | delete cq; | |
1082 | return NULL; | |
1083 | } | |
1084 | return cq; | |
1085 | } | |
1086 | ||
1087 | // 1 means no valid buffer read, 0 means got enough buffer | |
1088 | // else return < 0 means error | |
1089 | int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im) | |
1090 | { | |
1091 | char msg[TCP_MSG_LEN]; | |
1092 | char gid[33]; | |
1093 | ssize_t r = ::read(sd, &msg, sizeof(msg)); | |
1094 | // Drop incoming qpt | |
1095 | if (cct->_conf->ms_inject_socket_failures && sd >= 0) { | |
1096 | if (rand() % cct->_conf->ms_inject_socket_failures == 0) { | |
1097 | ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; | |
1098 | return -EINVAL; | |
1099 | } | |
1100 | } | |
1101 | if (r < 0) { | |
1102 | r = -errno; | |
1103 | lderr(cct) << __func__ << " got error " << r << ": " | |
1104 | << cpp_strerror(r) << dendl; | |
1105 | } else if (r == 0) { // valid disconnect message of length 0 | |
1106 | ldout(cct, 10) << __func__ << " got disconnect message " << dendl; | |
1107 | } else if ((size_t)r != sizeof(msg)) { // invalid message | |
1108 | ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl; | |
1109 | r = -EINVAL; | |
1110 | } else { // valid message | |
1111 | sscanf(msg, "%hx:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid); | |
1112 | wire_gid_to_gid(gid, &(im.gid)); | |
1113 | ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl; | |
1114 | } | |
1115 | return r; | |
1116 | } | |
1117 | ||
1118 | int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im) | |
1119 | { | |
1120 | int retry = 0; | |
1121 | ssize_t r; | |
1122 | ||
1123 | char msg[TCP_MSG_LEN]; | |
1124 | char gid[33]; | |
1125 | retry: | |
1126 | gid_to_wire_gid(&(im.gid), gid); | |
1127 | sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid); | |
1128 | ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn | |
1129 | << ", " << im.peer_qpn << ", " << gid << dendl; | |
1130 | r = ::write(sd, msg, sizeof(msg)); | |
1131 | // Drop incoming qpt | |
1132 | if (cct->_conf->ms_inject_socket_failures && sd >= 0) { | |
1133 | if (rand() % cct->_conf->ms_inject_socket_failures == 0) { | |
1134 | ldout(cct, 0) << __func__ << " injecting socket failure" << dendl; | |
1135 | return -EINVAL; | |
1136 | } | |
1137 | } | |
1138 | ||
1139 | if ((size_t)r != sizeof(msg)) { | |
1140 | // FIXME need to handle EAGAIN instead of retry | |
1141 | if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) { | |
1142 | retry++; | |
1143 | goto retry; | |
1144 | } | |
1145 | if (r < 0) | |
1146 | lderr(cct) << __func__ << " send returned error " << errno << ": " | |
1147 | << cpp_strerror(errno) << dendl; | |
1148 | else | |
1149 | lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl; | |
1150 | return -errno; | |
1151 | } | |
1152 | return 0; | |
1153 | } | |
1154 | ||
1155 | void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid) | |
1156 | { | |
1157 | char tmp[9]; | |
1158 | uint32_t v32; | |
1159 | int i; | |
1160 | ||
1161 | for (tmp[8] = 0, i = 0; i < 4; ++i) { | |
1162 | memcpy(tmp, wgid + i * 8, 8); | |
1163 | sscanf(tmp, "%x", &v32); | |
1164 | *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32); | |
1165 | } | |
1166 | } | |
1167 | ||
1168 | void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[]) | |
1169 | { | |
1170 | for (int i = 0; i < 4; ++i) | |
1171 | sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4))); | |
1172 | } | |
1173 | ||
// Destroy the underlying verbs QP, if one was created.
// NOTE: ibv_destroy_qp() is called inside ceph_assert; this side effect is
// safe only because ceph_assert is always compiled in (unlike assert).
Infiniband::QueuePair::~QueuePair()
{
  if (qp) {
    ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
    ceph_assert(!ibv_destroy_qp(qp));
  }
}
1181 | ||
1182 | /** | |
1183 | * Given a string representation of the `status' field from Verbs | |
1184 | * struct `ibv_wc'. | |
1185 | * | |
1186 | * \param[in] status | |
1187 | * The integer status obtained in ibv_wc.status. | |
1188 | * \return | |
1189 | * A string corresponding to the given status. | |
1190 | */ | |
1191 | const char* Infiniband::wc_status_to_string(int status) | |
1192 | { | |
1193 | static const char *lookup[] = { | |
1194 | "SUCCESS", | |
1195 | "LOC_LEN_ERR", | |
1196 | "LOC_QP_OP_ERR", | |
1197 | "LOC_EEC_OP_ERR", | |
1198 | "LOC_PROT_ERR", | |
1199 | "WR_FLUSH_ERR", | |
1200 | "MW_BIND_ERR", | |
1201 | "BAD_RESP_ERR", | |
1202 | "LOC_ACCESS_ERR", | |
1203 | "REM_INV_REQ_ERR", | |
1204 | "REM_ACCESS_ERR", | |
1205 | "REM_OP_ERR", | |
1206 | "RETRY_EXC_ERR", | |
1207 | "RNR_RETRY_EXC_ERR", | |
1208 | "LOC_RDD_VIOL_ERR", | |
1209 | "REM_INV_RD_REQ_ERR", | |
1210 | "REM_ABORT_ERR", | |
1211 | "INV_EECN_ERR", | |
1212 | "INV_EEC_STATE_ERR", | |
1213 | "FATAL_ERR", | |
1214 | "RESP_TIMEOUT_ERR", | |
1215 | "GENERAL_ERR" | |
1216 | }; | |
1217 | ||
1218 | if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR) | |
1219 | return "<status out of range!>"; | |
1220 | return lookup[status]; | |
1221 | } | |
1222 | ||
1223 | const char* Infiniband::qp_state_string(int status) { | |
1224 | switch(status) { | |
1225 | case IBV_QPS_RESET : return "IBV_QPS_RESET"; | |
1226 | case IBV_QPS_INIT : return "IBV_QPS_INIT"; | |
1227 | case IBV_QPS_RTR : return "IBV_QPS_RTR"; | |
1228 | case IBV_QPS_RTS : return "IBV_QPS_RTS"; | |
1229 | case IBV_QPS_SQD : return "IBV_QPS_SQD"; | |
1230 | case IBV_QPS_SQE : return "IBV_QPS_SQE"; | |
1231 | case IBV_QPS_ERR : return "IBV_QPS_ERR"; | |
1232 | default: return " out of range."; | |
1233 | } | |
1234 | } |