1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include <arpa/inet.h>
35 #include <fcntl.h>
36 #include <errno.h>
37 #include <infiniband/verbs.h>
38 #include <rdma/rdma_cma.h>
39 #include <rdma/rdma_verbs.h>
40 #include <unistd.h>
41 #include <stdio.h>
42 #include <stdint.h>
43
44 #include "nvmf_internal.h"
45 #include "request.h"
46 #include "session.h"
47 #include "subsystem.h"
48 #include "transport.h"
49
50 #include "spdk/assert.h"
51 #include "spdk/nvmf.h"
52 #include "spdk/nvmf_spec.h"
53 #include "spdk/string.h"
54 #include "spdk/trace.h"
55 #include "spdk/util.h"
56
57 #include "spdk_internal/log.h"
58
59 /*
60 RDMA Connection Resource Defaults
61 */
62 #define NVMF_DEFAULT_TX_SGE 1
63 #define NVMF_DEFAULT_RX_SGE 2
64
65 struct spdk_nvmf_rdma_buf {
66 SLIST_ENTRY(spdk_nvmf_rdma_buf) link;
67 };
68
69 /* This structure holds commands as they are received off the wire.
70 * It must be dynamically paired with a full request object
71 * (spdk_nvmf_rdma_request) to service a request. It is separate
72 * from the request because RDMA does not appear to order
73 * completions, so occasionally we'll get a new incoming
74 * command when there aren't any free request objects.
75 */
76 struct spdk_nvmf_rdma_recv {
77 struct ibv_recv_wr wr;
78 struct ibv_sge sgl[NVMF_DEFAULT_RX_SGE];
79
80 /* In-capsule data buffer */
81 uint8_t *buf;
82
83 TAILQ_ENTRY(spdk_nvmf_rdma_recv) link;
84
85 #ifdef DEBUG
86 bool in_use;
87 #endif
88 };
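/*
 * Because completions are not paired one-to-one with free request objects
 * (see the comment above), each receive must be identifiable from its own
 * work completion; that is done by stashing the recv pointer in wr.wr_id when
 * the RECV is posted.  A minimal sketch of that round trip follows; the
 * example_* helper is hypothetical and not part of this file (the real
 * recovery happens in get_rdma_recv_from_wc() further down).
 */
#if 0	/* Illustrative sketch only. */
static struct spdk_nvmf_rdma_recv *
example_recv_from_wc(struct ibv_wc *wc)
{
	/* Assumes the completion came from a RECV posted with
	 * wr.wr_id = (uintptr_t)rdma_recv, as done in spdk_nvmf_rdma_conn_create(). */
	return (struct spdk_nvmf_rdma_recv *)(uintptr_t)wc->wr_id;
}
#endif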
89
90 struct spdk_nvmf_rdma_request {
91 struct spdk_nvmf_request req;
92 bool data_from_pool;
93
94 struct spdk_nvmf_rdma_recv *recv;
95
96 struct {
97 struct ibv_send_wr wr;
98 struct ibv_sge sgl[NVMF_DEFAULT_TX_SGE];
99 } rsp;
100
101 struct {
102 struct ibv_send_wr wr;
103 struct ibv_sge sgl[NVMF_DEFAULT_TX_SGE];
104 } data;
105
106 TAILQ_ENTRY(spdk_nvmf_rdma_request) link;
107 };
108
109 struct spdk_nvmf_rdma_conn {
110 struct spdk_nvmf_conn conn;
111
112 struct rdma_cm_id *cm_id;
113 struct ibv_cq *cq;
114
115 /* The maximum number of I/O outstanding on this connection at one time */
116 uint16_t max_queue_depth;
117
118 /* The maximum number of active RDMA READ and WRITE operations at one time */
119 uint16_t max_rw_depth;
120
121 /* The current number of I/O outstanding on this connection. This number
122 * includes all I/O from the time the capsule is first received until it is
123 * completed.
124 */
125 uint16_t cur_queue_depth;
126
127 /* The number of RDMA READ and WRITE requests that are outstanding */
128 uint16_t cur_rdma_rw_depth;
129
130 /* Receives that are waiting for a request object */
131 TAILQ_HEAD(, spdk_nvmf_rdma_recv) incoming_queue;
132
133 /* Requests that are not in use */
134 TAILQ_HEAD(, spdk_nvmf_rdma_request) free_queue;
135
136 /* Requests that are waiting to obtain a data buffer */
137 TAILQ_HEAD(, spdk_nvmf_rdma_request) pending_data_buf_queue;
138
139 /* Requests that are waiting to perform an RDMA READ or WRITE */
140 TAILQ_HEAD(, spdk_nvmf_rdma_request) pending_rdma_rw_queue;
141
142 /* Array of size "max_queue_depth" containing RDMA requests. */
143 struct spdk_nvmf_rdma_request *reqs;
144
145 /* Array of size "max_queue_depth" containing RDMA recvs. */
146 struct spdk_nvmf_rdma_recv *recvs;
147
148 /* Array of size "max_queue_depth" containing 64 byte capsules
149 * used for receive.
150 */
151 union nvmf_h2c_msg *cmds;
152 struct ibv_mr *cmds_mr;
153
154 /* Array of size "max_queue_depth" containing 16 byte completions
155 * to be sent back to the user.
156 */
157 union nvmf_c2h_msg *cpls;
158 struct ibv_mr *cpls_mr;
159
160 /* Array of size "max_queue_depth * InCapsuleDataSize" containing
161 * buffers to be used for in capsule data.
162 */
163 void *bufs;
164 struct ibv_mr *bufs_mr;
165
166 TAILQ_ENTRY(spdk_nvmf_rdma_conn) link;
167 };
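/*
 * The counters above implement two independent flow-control limits:
 * cur_queue_depth counts whole commands (from capsule reception until the
 * response SEND completes) against max_queue_depth, while cur_rdma_rw_depth
 * counts outstanding RDMA READ/WRITE work requests against max_rw_depth.
 * A minimal sketch of the admission check made before starting a data
 * transfer follows; it mirrors spdk_nvmf_rdma_request_transfer_data() below
 * and is illustrative only (the example_* helper is not part of this file).
 */
#if 0	/* Illustrative sketch only. */
static bool
example_can_start_rdma_rw(struct spdk_nvmf_rdma_conn *rdma_conn,
			  struct spdk_nvmf_rdma_request *rdma_req)
{
	if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
		/* Capacity available: the caller posts the READ or WRITE and
		 * increments cur_rdma_rw_depth. */
		return true;
	}

	/* No capacity: park the request until a READ/WRITE completion frees a slot. */
	TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
	return false;
}
#endif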
168
169 /* List of RDMA connections that have not yet received a CONNECT capsule */
170 static TAILQ_HEAD(, spdk_nvmf_rdma_conn) g_pending_conns = TAILQ_HEAD_INITIALIZER(g_pending_conns);
171
172 struct spdk_nvmf_rdma_session {
173 struct spdk_nvmf_session session;
174
175 SLIST_HEAD(, spdk_nvmf_rdma_buf) data_buf_pool;
176
177 struct ibv_context *verbs;
178
179 uint8_t *buf;
180 struct ibv_mr *buf_mr;
181 };
182
183 struct spdk_nvmf_rdma_listen_addr {
184 char *traddr;
185 char *trsvcid;
186 struct rdma_cm_id *id;
187 struct ibv_device_attr attr;
188 struct ibv_comp_channel *comp_channel;
189 uint32_t ref;
190 bool is_listened;
191 TAILQ_ENTRY(spdk_nvmf_rdma_listen_addr) link;
192 };
193
194 struct spdk_nvmf_rdma {
195 struct rdma_event_channel *event_channel;
196
197 pthread_mutex_t lock;
198
199 uint16_t max_queue_depth;
200 uint32_t max_io_size;
201 uint32_t in_capsule_data_size;
202
203 TAILQ_HEAD(, spdk_nvmf_rdma_listen_addr) listen_addrs;
204 };
205
206 static struct spdk_nvmf_rdma g_rdma = {
207 .lock = PTHREAD_MUTEX_INITIALIZER,
208 .listen_addrs = TAILQ_HEAD_INITIALIZER(g_rdma.listen_addrs),
209 };
210
211 static inline struct spdk_nvmf_rdma_conn *
212 get_rdma_conn(struct spdk_nvmf_conn *conn)
213 {
214 return (struct spdk_nvmf_rdma_conn *)((uintptr_t)conn - offsetof(struct spdk_nvmf_rdma_conn, conn));
215 }
216
217 static inline struct spdk_nvmf_rdma_request *
218 get_rdma_req(struct spdk_nvmf_request *req)
219 {
220 return (struct spdk_nvmf_rdma_request *)((uintptr_t)req - offsetof(struct spdk_nvmf_rdma_request,
221 req));
222 }
223
224 static inline struct spdk_nvmf_rdma_session *
225 get_rdma_sess(struct spdk_nvmf_session *sess)
226 {
227 return (struct spdk_nvmf_rdma_session *)((uintptr_t)sess - offsetof(struct spdk_nvmf_rdma_session,
228 session));
229 }
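/*
 * The three helpers above are the classic "container of" idiom: the generic
 * spdk_nvmf_conn/request/session object is embedded inside its RDMA-specific
 * wrapper, so subtracting the member offset from the member pointer recovers
 * the wrapper without any lookup table.  A hypothetical usage sketch follows;
 * example_qp_from_conn() is not a real function in this file.
 */
#if 0	/* Illustrative sketch only. */
static struct ibv_qp *
example_qp_from_conn(struct spdk_nvmf_conn *conn)
{
	/* conn points at the member embedded in struct spdk_nvmf_rdma_conn,
	 * so the transport-private state (here the queue pair) is reachable
	 * directly from the generic connection handle. */
	struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);

	return rdma_conn->cm_id->qp;
}
#endif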
230
231 static void
232 spdk_nvmf_rdma_conn_destroy(struct spdk_nvmf_rdma_conn *rdma_conn)
233 {
234 if (rdma_conn->cmds_mr) {
235 ibv_dereg_mr(rdma_conn->cmds_mr);
236 }
237
238 if (rdma_conn->cpls_mr) {
239 ibv_dereg_mr(rdma_conn->cpls_mr);
240 }
241
242 if (rdma_conn->bufs_mr) {
243 ibv_dereg_mr(rdma_conn->bufs_mr);
244 }
245
246 if (rdma_conn->cm_id) {
247 rdma_destroy_qp(rdma_conn->cm_id);
248 rdma_destroy_id(rdma_conn->cm_id);
249 }
250
251 if (rdma_conn->cq) {
252 ibv_destroy_cq(rdma_conn->cq);
253 }
254
255 /* Free all memory */
256 spdk_free(rdma_conn->cmds);
257 spdk_free(rdma_conn->cpls);
258 spdk_free(rdma_conn->bufs);
259 free(rdma_conn->reqs);
260 free(rdma_conn);
261 }
262
263 static struct spdk_nvmf_rdma_conn *
264 spdk_nvmf_rdma_conn_create(struct rdma_cm_id *id, struct ibv_comp_channel *channel,
265 uint16_t max_queue_depth, uint16_t max_rw_depth, uint32_t subsystem_id)
266 {
267 struct spdk_nvmf_rdma_conn *rdma_conn;
268 struct spdk_nvmf_conn *conn;
269 int rc, i;
270 struct ibv_qp_init_attr attr;
271 struct spdk_nvmf_rdma_recv *rdma_recv;
272 struct spdk_nvmf_rdma_request *rdma_req;
273
274 rdma_conn = calloc(1, sizeof(struct spdk_nvmf_rdma_conn));
275 if (rdma_conn == NULL) {
276 SPDK_ERRLOG("Could not allocate new connection.\n");
277 return NULL;
278 }
279
280 rdma_conn->max_queue_depth = max_queue_depth;
281 rdma_conn->max_rw_depth = max_rw_depth;
282 TAILQ_INIT(&rdma_conn->incoming_queue);
283 TAILQ_INIT(&rdma_conn->free_queue);
284 TAILQ_INIT(&rdma_conn->pending_data_buf_queue);
285 TAILQ_INIT(&rdma_conn->pending_rdma_rw_queue);
286
287 rdma_conn->cq = ibv_create_cq(id->verbs, max_queue_depth * 3, rdma_conn, channel, 0);
288 if (!rdma_conn->cq) {
289 SPDK_ERRLOG("Unable to create completion queue\n");
290 SPDK_ERRLOG("Completion Channel: %p Id: %p Verbs: %p\n", channel, id, id->verbs);
291 SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
292 rdma_destroy_id(id);
293 spdk_nvmf_rdma_conn_destroy(rdma_conn);
294 return NULL;
295 }
296
297 memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
298 attr.qp_type = IBV_QPT_RC;
299 attr.send_cq = rdma_conn->cq;
300 attr.recv_cq = rdma_conn->cq;
301 attr.cap.max_send_wr = max_queue_depth * 2; /* SEND, READ, and WRITE operations */
302 attr.cap.max_recv_wr = max_queue_depth; /* RECV operations */
303 attr.cap.max_send_sge = NVMF_DEFAULT_TX_SGE;
304 attr.cap.max_recv_sge = NVMF_DEFAULT_RX_SGE;
305
306 rc = rdma_create_qp(id, NULL, &attr);
307 if (rc) {
308 SPDK_ERRLOG("rdma_create_qp failed\n");
309 SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
310 rdma_destroy_id(id);
311 spdk_nvmf_rdma_conn_destroy(rdma_conn);
312 return NULL;
313 }
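/*
 * Worked sizing example (assumed numbers, illustrative only): each admitted
 * command may require at most one RECV, one RDMA READ or WRITE, and one SEND,
 * which is why the send queue above is sized at 2 * max_queue_depth, the
 * receive queue at max_queue_depth, and the shared completion queue at
 * 3 * max_queue_depth entries.
 */
#if 0	/* Illustrative sketch only; all values are assumptions. */
static void
example_queue_sizing(void)
{
	uint16_t max_queue_depth = 128;         /* assumed negotiated depth      */
	int cq_entries = max_queue_depth * 3;   /* 384: SEND + READ/WRITE + RECV */
	int send_wrs   = max_queue_depth * 2;   /* 256: data transfer + response */
	int recv_wrs   = max_queue_depth;       /* 128: one posted recv per slot */

	printf("cq=%d send_wr=%d recv_wr=%d\n", cq_entries, send_wrs, recv_wrs);
}
#endif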
314
315 conn = &rdma_conn->conn;
316 conn->transport = &spdk_nvmf_transport_rdma;
317 id->context = conn;
318 rdma_conn->cm_id = id;
319
320 SPDK_TRACELOG(SPDK_TRACE_RDMA, "New RDMA Connection: %p\n", conn);
321
322 rdma_conn->reqs = calloc(max_queue_depth, sizeof(*rdma_conn->reqs));
323 rdma_conn->recvs = calloc(max_queue_depth, sizeof(*rdma_conn->recvs));
324 rdma_conn->cmds = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cmds),
325 0x1000, NULL);
326 rdma_conn->cpls = spdk_zmalloc(max_queue_depth * sizeof(*rdma_conn->cpls),
327 0x1000, NULL);
328 rdma_conn->bufs = spdk_zmalloc(max_queue_depth * g_rdma.in_capsule_data_size,
329 0x1000, NULL);
330 if (!rdma_conn->reqs || !rdma_conn->recvs || !rdma_conn->cmds ||
331 !rdma_conn->cpls || !rdma_conn->bufs) {
332 SPDK_ERRLOG("Unable to allocate sufficient memory for RDMA queue.\n");
333 spdk_nvmf_rdma_conn_destroy(rdma_conn);
334 return NULL;
335 }
336
337 rdma_conn->cmds_mr = ibv_reg_mr(id->pd, rdma_conn->cmds,
338 max_queue_depth * sizeof(*rdma_conn->cmds),
339 IBV_ACCESS_LOCAL_WRITE);
340 rdma_conn->cpls_mr = ibv_reg_mr(id->pd, rdma_conn->cpls,
341 max_queue_depth * sizeof(*rdma_conn->cpls),
342 0);
343 rdma_conn->bufs_mr = ibv_reg_mr(id->pd, rdma_conn->bufs,
344 max_queue_depth * g_rdma.in_capsule_data_size,
345 IBV_ACCESS_LOCAL_WRITE |
346 IBV_ACCESS_REMOTE_WRITE);
347 if (!rdma_conn->cmds_mr || !rdma_conn->cpls_mr || !rdma_conn->bufs_mr) {
348 SPDK_ERRLOG("Unable to register required memory for RDMA queue.\n");
349 spdk_nvmf_rdma_conn_destroy(rdma_conn);
350 return NULL;
351 }
352 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Command Array: %p Length: %lx LKey: %x\n",
353 rdma_conn->cmds, max_queue_depth * sizeof(*rdma_conn->cmds), rdma_conn->cmds_mr->lkey);
354 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Completion Array: %p Length: %lx LKey: %x\n",
355 rdma_conn->cpls, max_queue_depth * sizeof(*rdma_conn->cpls), rdma_conn->cpls_mr->lkey);
356 SPDK_TRACELOG(SPDK_TRACE_RDMA, "In Capsule Data Array: %p Length: %x LKey: %x\n",
357 rdma_conn->bufs, max_queue_depth * g_rdma.in_capsule_data_size, rdma_conn->bufs_mr->lkey);
358
359 for (i = 0; i < max_queue_depth; i++) {
360 struct ibv_recv_wr *bad_wr = NULL;
361
362 rdma_recv = &rdma_conn->recvs[i];
363
364 /* Set up memory to receive commands */
365 rdma_recv->buf = (void *)((uintptr_t)rdma_conn->bufs + (i * g_rdma.in_capsule_data_size));
366
367 rdma_recv->sgl[0].addr = (uintptr_t)&rdma_conn->cmds[i];
368 rdma_recv->sgl[0].length = sizeof(rdma_conn->cmds[i]);
369 rdma_recv->sgl[0].lkey = rdma_conn->cmds_mr->lkey;
370
371 rdma_recv->sgl[1].addr = (uintptr_t)rdma_recv->buf;
372 rdma_recv->sgl[1].length = g_rdma.in_capsule_data_size;
373 rdma_recv->sgl[1].lkey = rdma_conn->bufs_mr->lkey;
374
375 rdma_recv->wr.wr_id = (uintptr_t)rdma_recv;
376 rdma_recv->wr.sg_list = rdma_recv->sgl;
377 rdma_recv->wr.num_sge = SPDK_COUNTOF(rdma_recv->sgl);
378 #ifdef DEBUG
379 rdma_recv->in_use = false;
380 #endif
381
382 rc = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_recv->wr, &bad_wr);
383 if (rc) {
384 SPDK_ERRLOG("Unable to post capsule for RDMA RECV\n");
385 spdk_nvmf_rdma_conn_destroy(rdma_conn);
386 return NULL;
387 }
388 }
389
390 for (i = 0; i < max_queue_depth; i++) {
391 rdma_req = &rdma_conn->reqs[i];
392
393 rdma_req->req.conn = &rdma_conn->conn;
394 rdma_req->req.cmd = NULL;
395
396 /* Set up memory to send responses */
397 rdma_req->req.rsp = &rdma_conn->cpls[i];
398
399 rdma_req->rsp.sgl[0].addr = (uintptr_t)&rdma_conn->cpls[i];
400 rdma_req->rsp.sgl[0].length = sizeof(rdma_conn->cpls[i]);
401 rdma_req->rsp.sgl[0].lkey = rdma_conn->cpls_mr->lkey;
402
403 rdma_req->rsp.wr.wr_id = (uintptr_t)rdma_req;
404 rdma_req->rsp.wr.next = NULL;
405 rdma_req->rsp.wr.opcode = IBV_WR_SEND;
406 rdma_req->rsp.wr.send_flags = IBV_SEND_SIGNALED;
407 rdma_req->rsp.wr.sg_list = rdma_req->rsp.sgl;
408 rdma_req->rsp.wr.num_sge = SPDK_COUNTOF(rdma_req->rsp.sgl);
409
410 /* Set up memory for data buffers */
411 rdma_req->data.wr.wr_id = (uint64_t)rdma_req;
412 rdma_req->data.wr.next = NULL;
413 rdma_req->data.wr.send_flags = IBV_SEND_SIGNALED;
414 rdma_req->data.wr.sg_list = rdma_req->data.sgl;
415 rdma_req->data.wr.num_sge = SPDK_COUNTOF(rdma_req->data.sgl);
416
417 TAILQ_INSERT_TAIL(&rdma_conn->free_queue, rdma_req, link);
418 }
419
420 return rdma_conn;
421 }
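/*
 * ibv_post_recv() and ibv_post_send() accept a list of work requests chained
 * through wr.next and, on failure, set bad_wr to the first request that could
 * not be posted (requests ahead of it were accepted).  The function above
 * posts one WR at a time so bad_wr is only checked implicitly, but the
 * general pattern looks like the sketch below; the example_* helper is
 * hypothetical and not part of this file.
 */
#if 0	/* Illustrative sketch only. */
static int
example_post_recv_chain(struct ibv_qp *qp, struct ibv_recv_wr *first_wr)
{
	struct ibv_recv_wr *bad_wr = NULL;
	int rc;

	rc = ibv_post_recv(qp, first_wr, &bad_wr);
	if (rc) {
		/* Everything from bad_wr to the end of the chain was not posted. */
		SPDK_ERRLOG("ibv_post_recv failed (rc %d) at wr_id %lu\n",
			    rc, bad_wr ? bad_wr->wr_id : 0UL);
	}

	return rc;
}
#endif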
422
423 static int
424 request_transfer_in(struct spdk_nvmf_request *req)
425 {
426 int rc;
427 struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
428 struct spdk_nvmf_conn *conn = req->conn;
429 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
430 struct ibv_send_wr *bad_wr = NULL;
431
432 assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
433
434 rdma_conn->cur_rdma_rw_depth++;
435
436 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ POSTED. Request: %p Connection: %p\n", req, conn);
437 spdk_trace_record(TRACE_RDMA_READ_START, 0, 0, (uintptr_t)req, 0);
438
439 rdma_req->data.wr.opcode = IBV_WR_RDMA_READ;
440 rdma_req->data.wr.next = NULL;
441 rc = ibv_post_send(rdma_conn->cm_id->qp, &rdma_req->data.wr, &bad_wr);
442 if (rc) {
443 SPDK_ERRLOG("Unable to transfer data from host to target\n");
444 return -1;
445 }
446
447 return 0;
448 }
449
450 static int
451 request_transfer_out(struct spdk_nvmf_request *req)
452 {
453 int rc;
454 struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
455 struct spdk_nvmf_conn *conn = req->conn;
456 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
457 struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
458 struct ibv_recv_wr *bad_recv_wr = NULL;
459 struct ibv_send_wr *send_wr, *bad_send_wr = NULL;
460
461 /* Advance our sq_head pointer */
462 if (conn->sq_head == conn->sq_head_max) {
463 conn->sq_head = 0;
464 } else {
465 conn->sq_head++;
466 }
467 rsp->sqhd = conn->sq_head;
468
469 /* Post the capsule to the recv buffer */
470 assert(rdma_req->recv != NULL);
471 #ifdef DEBUG
472 assert(rdma_req->recv->in_use == true);
473 rdma_req->recv->in_use = false;
474 #endif
475 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA RECV POSTED. Recv: %p Connection: %p\n", rdma_req->recv,
476 rdma_conn);
477 rc = ibv_post_recv(rdma_conn->cm_id->qp, &rdma_req->recv->wr, &bad_recv_wr);
478 if (rc) {
479 SPDK_ERRLOG("Unable to re-post rx descriptor\n");
480 return rc;
481 }
482 rdma_req->recv = NULL;
483
484 /* Build the response which consists of an optional
485 * RDMA WRITE to transfer data, plus an RDMA SEND
486 * containing the response.
487 */
488 send_wr = &rdma_req->rsp.wr;
489
490 if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
491 req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
492 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE POSTED. Request: %p Connection: %p\n", req, conn);
493 spdk_trace_record(TRACE_RDMA_WRITE_START, 0, 0, (uintptr_t)req, 0);
494
495 rdma_conn->cur_rdma_rw_depth++;
496 rdma_req->data.wr.opcode = IBV_WR_RDMA_WRITE;
497
498 rdma_req->data.wr.next = send_wr;
499 send_wr = &rdma_req->data.wr;
500 }
501
502 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA SEND POSTED. Request: %p Connection: %p\n", req, conn);
503 spdk_trace_record(TRACE_NVMF_IO_COMPLETE, 0, 0, (uintptr_t)req, 0);
504
505 /* Send the completion */
506 rc = ibv_post_send(rdma_conn->cm_id->qp, send_wr, &bad_send_wr);
507 if (rc) {
508 SPDK_ERRLOG("Unable to send response capsule\n");
509 }
510
511 return rc;
512 }
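/*
 * The "optional RDMA WRITE plus SEND" response built above relies on the fact
 * that work requests posted to a reliably-connected queue pair execute in
 * posting order: chaining the data WRITE ahead of the response SEND through
 * wr.next guarantees the host sees the data before the completion capsule,
 * and a single ibv_post_send() submits both.  A reduced sketch of the
 * chaining follows (illustrative only; it assumes the WRITE's sg_list and
 * wr.rdma fields were already filled in, as request preparation does above).
 */
#if 0	/* Illustrative sketch only. */
static struct ibv_send_wr *
example_chain_write_before_send(struct ibv_send_wr *write_wr,
				struct ibv_send_wr *send_wr)
{
	write_wr->opcode = IBV_WR_RDMA_WRITE;
	write_wr->next = send_wr;
	send_wr->next = NULL;

	return write_wr;	/* head of the chain handed to ibv_post_send() */
}
#endif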
513
514 static int
515 spdk_nvmf_rdma_request_transfer_data(struct spdk_nvmf_request *req)
516 {
517 struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
518 struct spdk_nvmf_conn *conn = req->conn;
519 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
520
521 if (req->xfer == SPDK_NVME_DATA_NONE) {
522 /* If no data transfer, this can bypass the queue */
523 return request_transfer_out(req);
524 }
525
526 if (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
527 if (req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
528 return request_transfer_out(req);
529 } else if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
530 return request_transfer_in(req);
531 }
532 } else {
533 TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
534 }
535
536 return 0;
537 }
538
539 static int
540 nvmf_rdma_connect(struct rdma_cm_event *event)
541 {
542 struct spdk_nvmf_rdma_conn *rdma_conn = NULL;
543 struct spdk_nvmf_rdma_listen_addr *addr;
544 struct rdma_conn_param *rdma_param = NULL;
545 struct rdma_conn_param ctrlr_event_data;
546 const struct spdk_nvmf_rdma_request_private_data *private_data = NULL;
547 struct spdk_nvmf_rdma_accept_private_data accept_data;
548 uint16_t sts = 0;
549 uint16_t max_queue_depth;
550 uint16_t max_rw_depth;
551 uint32_t subsystem_id = 0;
552 int rc;
553
554 if (event->id == NULL) {
555 SPDK_ERRLOG("connect request: missing cm_id\n");
556 goto err0;
557 }
558
559 if (event->id->verbs == NULL) {
560 SPDK_ERRLOG("connect request: missing cm_id ibv_context\n");
561 goto err0;
562 }
563
564 rdma_param = &event->param.conn;
565 if (rdma_param->private_data == NULL ||
566 rdma_param->private_data_len < sizeof(struct spdk_nvmf_rdma_request_private_data)) {
567 SPDK_ERRLOG("connect request: no private data provided\n");
568 goto err0;
569 }
570 private_data = rdma_param->private_data;
571
572 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Connect Recv on fabric intf name %s, dev_name %s\n",
573 event->id->verbs->device->name, event->id->verbs->device->dev_name);
574
575 addr = event->listen_id->context;
576 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Listen Id was %p with verbs %p. ListenAddr: %p\n",
577 event->listen_id, event->listen_id->verbs, addr);
578
579 /* Figure out the supported queue depth. This is a multi-step process
580 * that takes into account hardware maximums, host provided values,
581 * and our target's internal memory limits */
582
583 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Calculating Queue Depth\n");
584
585 /* Start with the maximum queue depth allowed by the target */
586 max_queue_depth = g_rdma.max_queue_depth;
587 max_rw_depth = g_rdma.max_queue_depth;
588 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Target Max Queue Depth: %d\n", g_rdma.max_queue_depth);
589
590 /* Next check the local NIC's hardware limitations */
591 SPDK_TRACELOG(SPDK_TRACE_RDMA,
592 "Local NIC Max Send/Recv Queue Depth: %d Max Read/Write Queue Depth: %d\n",
593 addr->attr.max_qp_wr, addr->attr.max_qp_rd_atom);
594 max_queue_depth = spdk_min(max_queue_depth, addr->attr.max_qp_wr);
595 max_rw_depth = spdk_min(max_rw_depth, addr->attr.max_qp_rd_atom);
596
597 /* Next check the remote NIC's hardware limitations */
598 SPDK_TRACELOG(SPDK_TRACE_RDMA,
599 "Host (Initiator) NIC Max Incoming RDMA R/W operations: %d Max Outgoing RDMA R/W operations: %d\n",
600 rdma_param->initiator_depth, rdma_param->responder_resources);
601 if (rdma_param->initiator_depth > 0) {
602 max_rw_depth = spdk_min(max_rw_depth, rdma_param->initiator_depth);
603 }
604
605 /* Finally check for the host software requested values, which are
606 * optional. */
607 if (rdma_param->private_data != NULL &&
608 rdma_param->private_data_len >= sizeof(struct spdk_nvmf_rdma_request_private_data)) {
609 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Receive Queue Size: %d\n", private_data->hrqsize);
610 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Host Send Queue Size: %d\n", private_data->hsqsize);
611 max_queue_depth = spdk_min(max_queue_depth, private_data->hrqsize);
612 max_queue_depth = spdk_min(max_queue_depth, private_data->hsqsize + 1);
613 }
614
615 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Final Negotiated Queue Depth: %d R/W Depth: %d\n",
616 max_queue_depth, max_rw_depth);
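/*
 * Worked example of the negotiation above with assumed inputs: target default
 * 128, local NIC max_qp_wr 16384 and max_qp_rd_atom 16, host initiator_depth
 * 8, host private data hrqsize 64 and hsqsize 63 (hsqsize is treated as a
 * 0's-based value, hence the + 1 above).  The result is a queue depth of 64
 * and an R/W depth of 8.  Sketch (not part of this file):
 */
#if 0	/* Illustrative sketch only; all values are assumptions. */
static void
example_negotiate_depths(void)
{
	uint16_t max_queue_depth = 128, max_rw_depth = 128;	/* target defaults   */
	int max_qp_wr = 16384, max_qp_rd_atom = 16;		/* local NIC limits  */
	uint8_t initiator_depth = 8;				/* from CM request   */
	uint16_t hrqsize = 64, hsqsize = 63;			/* from private data */

	max_queue_depth = spdk_min(max_queue_depth, max_qp_wr);		/* 128 */
	max_rw_depth    = spdk_min(max_rw_depth, max_qp_rd_atom);	/* 16  */
	max_rw_depth    = spdk_min(max_rw_depth, initiator_depth);	/* 8   */
	max_queue_depth = spdk_min(max_queue_depth, hrqsize);		/* 64  */
	max_queue_depth = spdk_min(max_queue_depth, hsqsize + 1);	/* 64  */

	printf("queue depth %d, rw depth %d\n", max_queue_depth, max_rw_depth);
}
#endif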
617
618 /* Init the NVMf rdma transport connection */
619 rdma_conn = spdk_nvmf_rdma_conn_create(event->id, addr->comp_channel, max_queue_depth,
620 max_rw_depth, subsystem_id);
621 if (rdma_conn == NULL) {
622 SPDK_ERRLOG("Error on nvmf connection creation\n");
623 goto err1;
624 }
625
626 accept_data.recfmt = 0;
627 accept_data.crqsize = max_queue_depth;
628 ctrlr_event_data = *rdma_param;
629 ctrlr_event_data.private_data = &accept_data;
630 ctrlr_event_data.private_data_len = sizeof(accept_data);
631 if (event->id->ps == RDMA_PS_TCP) {
632 ctrlr_event_data.responder_resources = 0; /* We accept 0 reads from the host */
633 ctrlr_event_data.initiator_depth = max_rw_depth;
634 }
635
636 rc = rdma_accept(event->id, &ctrlr_event_data);
637 if (rc) {
638 SPDK_ERRLOG("Error on rdma_accept\n");
639 goto err2;
640 }
641 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Sent back the accept\n");
642
643 /* Add this RDMA connection to the global list until a CONNECT capsule
644 * is received. */
645 TAILQ_INSERT_TAIL(&g_pending_conns, rdma_conn, link);
646
647 return 0;
648
649 err2:
650 spdk_nvmf_rdma_conn_destroy(rdma_conn);
651
652 err1: {
653 struct spdk_nvmf_rdma_reject_private_data rej_data;
654
655 rej_data.status.sc = sts;
656         rdma_reject(event->id, &rej_data, sizeof(rej_data));
657 }
658 err0:
659 return -1;
660 }
661
662 static int
663 nvmf_rdma_disconnect(struct rdma_cm_event *evt)
664 {
665 struct spdk_nvmf_conn *conn;
666 struct spdk_nvmf_session *session;
667 struct spdk_nvmf_subsystem *subsystem;
668 struct spdk_nvmf_rdma_conn *rdma_conn;
669
670 if (evt->id == NULL) {
671 SPDK_ERRLOG("disconnect request: missing cm_id\n");
672 return -1;
673 }
674
675 conn = evt->id->context;
676 if (conn == NULL) {
677 SPDK_ERRLOG("disconnect request: no active connection\n");
678 return -1;
679 }
680 /* ack the disconnect event before rdma_destroy_id */
681 rdma_ack_cm_event(evt);
682
683 rdma_conn = get_rdma_conn(conn);
684
685 session = conn->sess;
686 if (session == NULL) {
687 /* No session has been established yet. That means the conn
688 * must be in the pending connections list. Remove it. */
689 TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
690 spdk_nvmf_rdma_conn_destroy(rdma_conn);
691 return 0;
692 }
693
694 subsystem = session->subsys;
695
696 subsystem->disconnect_cb(subsystem->cb_ctx, conn);
697
698 return 0;
699 }
700
701 #ifdef DEBUG
702 static const char *CM_EVENT_STR[] = {
703 "RDMA_CM_EVENT_ADDR_RESOLVED",
704 "RDMA_CM_EVENT_ADDR_ERROR",
705 "RDMA_CM_EVENT_ROUTE_RESOLVED",
706 "RDMA_CM_EVENT_ROUTE_ERROR",
707 "RDMA_CM_EVENT_CONNECT_REQUEST",
708 "RDMA_CM_EVENT_CONNECT_RESPONSE",
709 "RDMA_CM_EVENT_CONNECT_ERROR",
710 "RDMA_CM_EVENT_UNREACHABLE",
711 "RDMA_CM_EVENT_REJECTED",
712 "RDMA_CM_EVENT_ESTABLISHED",
713 "RDMA_CM_EVENT_DISCONNECTED",
714 "RDMA_CM_EVENT_DEVICE_REMOVAL",
715 "RDMA_CM_EVENT_MULTICAST_JOIN",
716 "RDMA_CM_EVENT_MULTICAST_ERROR",
717 "RDMA_CM_EVENT_ADDR_CHANGE",
718 "RDMA_CM_EVENT_TIMEWAIT_EXIT"
719 };
720 #endif /* DEBUG */
721
722 typedef enum _spdk_nvmf_request_prep_type {
723 SPDK_NVMF_REQUEST_PREP_ERROR = -1,
724 SPDK_NVMF_REQUEST_PREP_READY = 0,
725 SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER = 1,
726 SPDK_NVMF_REQUEST_PREP_PENDING_DATA = 2,
727 } spdk_nvmf_request_prep_type;
728
729 static spdk_nvmf_request_prep_type
730 spdk_nvmf_request_prep_data(struct spdk_nvmf_request *req)
731 {
732 struct spdk_nvme_cmd *cmd = &req->cmd->nvme_cmd;
733 struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
734 struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
735 struct spdk_nvmf_rdma_session *rdma_sess;
736 struct spdk_nvme_sgl_descriptor *sgl;
737
738 req->length = 0;
739 req->data = NULL;
740
741 if (cmd->opc == SPDK_NVME_OPC_FABRIC) {
742 req->xfer = spdk_nvme_opc_get_data_transfer(req->cmd->nvmf_cmd.fctype);
743 } else {
744 req->xfer = spdk_nvme_opc_get_data_transfer(cmd->opc);
745 if ((req->conn->type == CONN_TYPE_AQ) &&
746 ((cmd->opc == SPDK_NVME_OPC_GET_FEATURES) ||
747 (cmd->opc == SPDK_NVME_OPC_SET_FEATURES))) {
748 switch (cmd->cdw10 & 0xff) {
749 case SPDK_NVME_FEAT_LBA_RANGE_TYPE:
750 case SPDK_NVME_FEAT_AUTONOMOUS_POWER_STATE_TRANSITION:
751 case SPDK_NVME_FEAT_HOST_IDENTIFIER:
752 break;
753 default:
754 req->xfer = SPDK_NVME_DATA_NONE;
755 break;
756 }
757 }
758 }
759
760 if (req->xfer == SPDK_NVME_DATA_NONE) {
761 return SPDK_NVMF_REQUEST_PREP_READY;
762 }
763
764 sgl = &cmd->dptr.sgl1;
765
766 if (sgl->generic.type == SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK &&
767 (sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_ADDRESS ||
768 sgl->keyed.subtype == SPDK_NVME_SGL_SUBTYPE_INVALIDATE_KEY)) {
769 if (sgl->keyed.length > g_rdma.max_io_size) {
770 SPDK_ERRLOG("SGL length 0x%x exceeds max io size 0x%x\n",
771 sgl->keyed.length, g_rdma.max_io_size);
772 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
773 return SPDK_NVMF_REQUEST_PREP_ERROR;
774 }
775
776 if (sgl->keyed.length == 0) {
777 req->xfer = SPDK_NVME_DATA_NONE;
778 return SPDK_NVMF_REQUEST_PREP_READY;
779 }
780
781 req->length = sgl->keyed.length;
782 rdma_req->data.sgl[0].length = sgl->keyed.length;
783 rdma_req->data.wr.wr.rdma.rkey = sgl->keyed.key;
784 rdma_req->data.wr.wr.rdma.remote_addr = sgl->address;
785
786 rdma_sess = get_rdma_sess(req->conn->sess);
787 if (!rdma_sess) {
788 /* The only time a connection won't have a session
789 * is when this is the CONNECT request.
790 */
791 assert(cmd->opc == SPDK_NVME_OPC_FABRIC);
792 assert(req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER);
793 assert(req->length <= g_rdma.in_capsule_data_size);
794
795 /* Use the in capsule data buffer, even though this isn't in capsule data. */
796 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request using in capsule buffer for non-capsule data\n");
797 req->data = rdma_req->recv->buf;
798 rdma_req->data.sgl[0].lkey = get_rdma_conn(req->conn)->bufs_mr->lkey;
799 rdma_req->data_from_pool = false;
800 } else {
801 req->data = SLIST_FIRST(&rdma_sess->data_buf_pool);
802 rdma_req->data.sgl[0].lkey = rdma_sess->buf_mr->lkey;
803 rdma_req->data_from_pool = true;
804 if (!req->data) {
805 /* No available buffers. Queue this request up. */
806 SPDK_TRACELOG(SPDK_TRACE_RDMA, "No available large data buffers. Queueing request %p\n", req);
807 /* This will get assigned when we actually obtain a buffer */
808 rdma_req->data.sgl[0].addr = (uintptr_t)NULL;
809 return SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER;
810 }
811
812 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p took buffer from central pool\n", req);
813 SLIST_REMOVE_HEAD(&rdma_sess->data_buf_pool, link);
814 }
815
816 rdma_req->data.sgl[0].addr = (uintptr_t)req->data;
817
818 if (req->xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
819 return SPDK_NVMF_REQUEST_PREP_PENDING_DATA;
820 } else {
821 return SPDK_NVMF_REQUEST_PREP_READY;
822 }
823 } else if (sgl->generic.type == SPDK_NVME_SGL_TYPE_DATA_BLOCK &&
824 sgl->unkeyed.subtype == SPDK_NVME_SGL_SUBTYPE_OFFSET) {
825 uint64_t offset = sgl->address;
826 uint32_t max_len = g_rdma.in_capsule_data_size;
827
828 SPDK_TRACELOG(SPDK_TRACE_NVMF, "In-capsule data: offset 0x%" PRIx64 ", length 0x%x\n",
829 offset, sgl->unkeyed.length);
830
831 if (offset > max_len) {
832 SPDK_ERRLOG("In-capsule offset 0x%" PRIx64 " exceeds capsule length 0x%x\n",
833 offset, max_len);
834 rsp->status.sc = SPDK_NVME_SC_INVALID_SGL_OFFSET;
835 return SPDK_NVMF_REQUEST_PREP_ERROR;
836 }
837 max_len -= (uint32_t)offset;
838
839 if (sgl->unkeyed.length > max_len) {
840 SPDK_ERRLOG("In-capsule data length 0x%x exceeds capsule length 0x%x\n",
841 sgl->unkeyed.length, max_len);
842 rsp->status.sc = SPDK_NVME_SC_DATA_SGL_LENGTH_INVALID;
843 return SPDK_NVMF_REQUEST_PREP_ERROR;
844 }
845
846 if (sgl->unkeyed.length == 0) {
847 req->xfer = SPDK_NVME_DATA_NONE;
848 return SPDK_NVMF_REQUEST_PREP_READY;
849 }
850
851 req->data = rdma_req->recv->buf + offset;
852 rdma_req->data_from_pool = false;
853 req->length = sgl->unkeyed.length;
854 return SPDK_NVMF_REQUEST_PREP_READY;
855 }
856
857 SPDK_ERRLOG("Invalid NVMf I/O Command SGL: Type 0x%x, Subtype 0x%x\n",
858 sgl->generic.type, sgl->generic.subtype);
859 rsp->status.sc = SPDK_NVME_SC_SGL_DESCRIPTOR_TYPE_INVALID;
860 return SPDK_NVMF_REQUEST_PREP_ERROR;
861 }
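/*
 * The function above accepts two SGL forms: a keyed data block, where the
 * host supplies a remote address, length, and rkey that are copied into the
 * RDMA READ/WRITE work request, and an offset-addressed data block resolved
 * inside the per-recv in-capsule buffer.  For reference, a hypothetical
 * host-side view of the keyed descriptor consumed here (the example_* helper
 * is not part of this file):
 */
#if 0	/* Illustrative sketch only. */
static void
example_fill_keyed_sgl(struct spdk_nvme_sgl_descriptor *sgl,
		       uint64_t host_addr, uint32_t length, uint32_t rkey)
{
	sgl->generic.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
	sgl->generic.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
	sgl->address = host_addr;	/* becomes data.wr.wr.rdma.remote_addr */
	sgl->keyed.length = length;	/* becomes req->length                 */
	sgl->keyed.key = rkey;		/* becomes data.wr.wr.rdma.rkey        */
}
#endif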
862
863 static int
864 spdk_nvmf_rdma_handle_pending_rdma_rw(struct spdk_nvmf_conn *conn)
865 {
866 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
867 struct spdk_nvmf_rdma_session *rdma_sess;
868 struct spdk_nvmf_rdma_request *rdma_req, *tmp;
869 int rc;
870 int count = 0;
871
872 /* First, try to assign free data buffers to requests that need one */
873 if (conn->sess) {
874 rdma_sess = get_rdma_sess(conn->sess);
875 TAILQ_FOREACH_SAFE(rdma_req, &rdma_conn->pending_data_buf_queue, link, tmp) {
876 assert(rdma_req->req.data == NULL);
877 rdma_req->req.data = SLIST_FIRST(&rdma_sess->data_buf_pool);
878 if (!rdma_req->req.data) {
879 break;
880 }
881 SLIST_REMOVE_HEAD(&rdma_sess->data_buf_pool, link);
882 rdma_req->data.sgl[0].addr = (uintptr_t)rdma_req->req.data;
883 TAILQ_REMOVE(&rdma_conn->pending_data_buf_queue, rdma_req, link);
884 if (rdma_req->req.xfer == SPDK_NVME_DATA_HOST_TO_CONTROLLER) {
885 TAILQ_INSERT_TAIL(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
886 } else {
887 rc = spdk_nvmf_request_exec(&rdma_req->req);
888 if (rc < 0) {
889 return -1;
890 }
891 count++;
892 }
893 }
894 }
895
896 /* Try to initiate RDMA Reads or Writes on requests that have data buffers */
897 while (rdma_conn->cur_rdma_rw_depth < rdma_conn->max_rw_depth) {
898 if (TAILQ_EMPTY(&rdma_conn->pending_rdma_rw_queue)) {
899 break;
900 }
901
902 rdma_req = TAILQ_FIRST(&rdma_conn->pending_rdma_rw_queue);
903 TAILQ_REMOVE(&rdma_conn->pending_rdma_rw_queue, rdma_req, link);
904
905                 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Submitting request %p previously queued for RDMA R/W\n", rdma_req);
906
907 rc = spdk_nvmf_rdma_request_transfer_data(&rdma_req->req);
908 if (rc) {
909 return -1;
910 }
911 }
912
913 return count;
914 }
915
916 /* Public API callbacks begin here */
917
918 static int
919 spdk_nvmf_rdma_init(uint16_t max_queue_depth, uint32_t max_io_size,
920 uint32_t in_capsule_data_size)
921 {
922 int rc;
923
924 SPDK_NOTICELOG("*** RDMA Transport Init ***\n");
925
926 pthread_mutex_lock(&g_rdma.lock);
927 g_rdma.max_queue_depth = max_queue_depth;
928 g_rdma.max_io_size = max_io_size;
929 g_rdma.in_capsule_data_size = in_capsule_data_size;
930
931 g_rdma.event_channel = rdma_create_event_channel();
932 if (g_rdma.event_channel == NULL) {
933 SPDK_ERRLOG("rdma_create_event_channel() failed, %s\n", strerror(errno));
934 pthread_mutex_unlock(&g_rdma.lock);
935 return -1;
936 }
937
938 rc = fcntl(g_rdma.event_channel->fd, F_SETFL, O_NONBLOCK);
939 if (rc < 0) {
940 SPDK_ERRLOG("fcntl to set fd to non-blocking failed\n");
941 pthread_mutex_unlock(&g_rdma.lock);
942 return -1;
943 }
944
945 pthread_mutex_unlock(&g_rdma.lock);
946 return 0;
947 }
948
949 static void
950 spdk_nvmf_rdma_listen_addr_free(struct spdk_nvmf_rdma_listen_addr *addr)
951 {
952 if (!addr) {
953 return;
954 }
955
956 free(addr->traddr);
957 free(addr->trsvcid);
958 free(addr);
959 }
960 static int
961 spdk_nvmf_rdma_fini(void)
962 {
963 pthread_mutex_lock(&g_rdma.lock);
964
965 assert(TAILQ_EMPTY(&g_rdma.listen_addrs));
966 if (g_rdma.event_channel != NULL) {
967 rdma_destroy_event_channel(g_rdma.event_channel);
968 }
969 pthread_mutex_unlock(&g_rdma.lock);
970
971 return 0;
972 }
973
974 static int
975 spdk_nvmf_rdma_listen_remove(struct spdk_nvmf_listen_addr *listen_addr)
976 {
977 struct spdk_nvmf_rdma_listen_addr *addr, *tmp;
978
979 pthread_mutex_lock(&g_rdma.lock);
980 TAILQ_FOREACH_SAFE(addr, &g_rdma.listen_addrs, link, tmp) {
981 if ((!strcasecmp(addr->traddr, listen_addr->traddr)) &&
982 (!strcasecmp(addr->trsvcid, listen_addr->trsvcid))) {
983 assert(addr->ref > 0);
984 addr->ref--;
985 if (!addr->ref) {
986 TAILQ_REMOVE(&g_rdma.listen_addrs, addr, link);
987 ibv_destroy_comp_channel(addr->comp_channel);
988 rdma_destroy_id(addr->id);
989 spdk_nvmf_rdma_listen_addr_free(addr);
990 }
991 break;
992 }
993 }
994
995 pthread_mutex_unlock(&g_rdma.lock);
996 return 0;
997 }
998
999 static int
1000 spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn);
1001
1002 static void
1003 spdk_nvmf_rdma_addr_listen_init(struct spdk_nvmf_rdma_listen_addr *addr)
1004 {
1005 int rc;
1006
1007 rc = rdma_listen(addr->id, 10); /* 10 = backlog */
1008 if (rc < 0) {
1009 SPDK_ERRLOG("rdma_listen() failed\n");
1010 addr->ref--;
1011 assert(addr->ref == 0);
1012 TAILQ_REMOVE(&g_rdma.listen_addrs, addr, link);
1013 ibv_destroy_comp_channel(addr->comp_channel);
1014 rdma_destroy_id(addr->id);
1015 spdk_nvmf_rdma_listen_addr_free(addr);
1016 return;
1017 }
1018
1019 addr->is_listened = true;
1020
1021 SPDK_NOTICELOG("*** NVMf Target Listening on %s port %d ***\n",
1022 addr->traddr, ntohs(rdma_get_src_port(addr->id)));
1023 }
1024
1025 static void
1026 spdk_nvmf_rdma_acceptor_poll(void)
1027 {
1028 struct rdma_cm_event *event;
1029 int rc;
1030 struct spdk_nvmf_rdma_conn *rdma_conn, *tmp;
1031 struct spdk_nvmf_rdma_listen_addr *addr = NULL, *addr_tmp;
1032
1033 if (g_rdma.event_channel == NULL) {
1034 return;
1035 }
1036
1037 pthread_mutex_lock(&g_rdma.lock);
1038 TAILQ_FOREACH_SAFE(addr, &g_rdma.listen_addrs, link, addr_tmp) {
1039 if (!addr->is_listened) {
1040 spdk_nvmf_rdma_addr_listen_init(addr);
1041 }
1042 }
1043 pthread_mutex_unlock(&g_rdma.lock);
1044
1045 /* Process pending connections for incoming capsules. The only capsule
1046 * this should ever find is a CONNECT request. */
1047 TAILQ_FOREACH_SAFE(rdma_conn, &g_pending_conns, link, tmp) {
1048 rc = spdk_nvmf_rdma_poll(&rdma_conn->conn);
1049 if (rc < 0) {
1050 TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
1051 spdk_nvmf_rdma_conn_destroy(rdma_conn);
1052 } else if (rc > 0) {
1053 /* At least one request was processed which is assumed to be
1054 * a CONNECT. Remove this connection from our list. */
1055 TAILQ_REMOVE(&g_pending_conns, rdma_conn, link);
1056 }
1057 }
1058
1059 while (1) {
1060 rc = rdma_get_cm_event(g_rdma.event_channel, &event);
1061 if (rc == 0) {
1062 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Acceptor Event: %s\n", CM_EVENT_STR[event->event]);
1063
1064 switch (event->event) {
1065 case RDMA_CM_EVENT_CONNECT_REQUEST:
1066 rc = nvmf_rdma_connect(event);
1067 if (rc < 0) {
1068 SPDK_ERRLOG("Unable to process connect event. rc: %d\n", rc);
1069 break;
1070 }
1071 break;
1072 case RDMA_CM_EVENT_ESTABLISHED:
1073 break;
1074 case RDMA_CM_EVENT_ADDR_CHANGE:
1075 case RDMA_CM_EVENT_DISCONNECTED:
1076 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1077 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1078 rc = nvmf_rdma_disconnect(event);
1079 if (rc < 0) {
1080 SPDK_ERRLOG("Unable to process disconnect event. rc: %d\n", rc);
1081 break;
1082 }
1083 continue;
1084 default:
1085 SPDK_ERRLOG("Unexpected Acceptor Event [%d]\n", event->event);
1086 break;
1087 }
1088
1089 rdma_ack_cm_event(event);
1090 } else {
1091 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1092 SPDK_ERRLOG("Acceptor Event Error: %s\n", strerror(errno));
1093 }
1094 break;
1095 }
1096 }
1097 }
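/*
 * Every event returned by rdma_get_cm_event() must eventually be released
 * with rdma_ack_cm_event(); rdma_destroy_id() blocks until all events on that
 * id have been acked, which is why nvmf_rdma_disconnect() above acks before
 * tearing the id down.  A reduced sketch of draining a non-blocking event
 * channel such as the one configured in spdk_nvmf_rdma_init() follows; the
 * example_* helper is hypothetical and not part of this file.
 */
#if 0	/* Illustrative sketch only. */
static int
example_drain_cm_events(struct rdma_event_channel *channel)
{
	struct rdma_cm_event *event;
	int count = 0;

	/* Returns 0 per event; with O_NONBLOCK set on the channel it fails
	 * with EAGAIN once the channel is empty. */
	while (rdma_get_cm_event(channel, &event) == 0) {
		count++;
		rdma_ack_cm_event(event);
	}

	return count;
}
#endif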
1098
1099 static int
1100 spdk_nvmf_rdma_listen(struct spdk_nvmf_listen_addr *listen_addr)
1101 {
1102 struct spdk_nvmf_rdma_listen_addr *addr;
1103 struct sockaddr_in saddr;
1104 int rc;
1105
1106 pthread_mutex_lock(&g_rdma.lock);
1107 assert(g_rdma.event_channel != NULL);
1108 TAILQ_FOREACH(addr, &g_rdma.listen_addrs, link) {
1109 if ((!strcasecmp(addr->traddr, listen_addr->traddr)) &&
1110 (!strcasecmp(addr->trsvcid, listen_addr->trsvcid))) {
1111 addr->ref++;
1112 /* Already listening at this address */
1113 pthread_mutex_unlock(&g_rdma.lock);
1114 return 0;
1115 }
1116 }
1117
1118 addr = calloc(1, sizeof(*addr));
1119 if (!addr) {
1120 pthread_mutex_unlock(&g_rdma.lock);
1121 return -1;
1122 }
1123
1124 addr->traddr = strdup(listen_addr->traddr);
1125 if (!addr->traddr) {
1126 spdk_nvmf_rdma_listen_addr_free(addr);
1127 pthread_mutex_unlock(&g_rdma.lock);
1128 return -1;
1129 }
1130
1131 addr->trsvcid = strdup(listen_addr->trsvcid);
1132 if (!addr->trsvcid) {
1133 spdk_nvmf_rdma_listen_addr_free(addr);
1134 pthread_mutex_unlock(&g_rdma.lock);
1135 return -1;
1136 }
1137
1138 rc = rdma_create_id(g_rdma.event_channel, &addr->id, addr, RDMA_PS_TCP);
1139 if (rc < 0) {
1140 SPDK_ERRLOG("rdma_create_id() failed\n");
1141 spdk_nvmf_rdma_listen_addr_free(addr);
1142 pthread_mutex_unlock(&g_rdma.lock);
1143 return -1;
1144 }
1145
1146 memset(&saddr, 0, sizeof(saddr));
1147 saddr.sin_family = AF_INET;
1148 saddr.sin_addr.s_addr = inet_addr(addr->traddr);
1149 saddr.sin_port = htons((uint16_t)strtoul(addr->trsvcid, NULL, 10));
1150 rc = rdma_bind_addr(addr->id, (struct sockaddr *)&saddr);
1151 if (rc < 0) {
1152 SPDK_ERRLOG("rdma_bind_addr() failed\n");
1153 rdma_destroy_id(addr->id);
1154 spdk_nvmf_rdma_listen_addr_free(addr);
1155 pthread_mutex_unlock(&g_rdma.lock);
1156 return -1;
1157 }
1158
1159 rc = ibv_query_device(addr->id->verbs, &addr->attr);
1160 if (rc < 0) {
1161 SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
1162 rdma_destroy_id(addr->id);
1163 spdk_nvmf_rdma_listen_addr_free(addr);
1164 pthread_mutex_unlock(&g_rdma.lock);
1165 return -1;
1166 }
1167
1168 addr->comp_channel = ibv_create_comp_channel(addr->id->verbs);
1169 if (!addr->comp_channel) {
1170 SPDK_ERRLOG("Failed to create completion channel\n");
1171 rdma_destroy_id(addr->id);
1172 spdk_nvmf_rdma_listen_addr_free(addr);
1173 pthread_mutex_unlock(&g_rdma.lock);
1174 return -1;
1175 }
1176 SPDK_TRACELOG(SPDK_TRACE_RDMA, "For listen id %p with context %p, created completion channel %p\n",
1177 addr->id, addr->id->verbs, addr->comp_channel);
1178
1179 rc = fcntl(addr->comp_channel->fd, F_SETFL, O_NONBLOCK);
1180 if (rc < 0) {
1181 SPDK_ERRLOG("fcntl to set comp channel to non-blocking failed\n");
1182 rdma_destroy_id(addr->id);
1183 ibv_destroy_comp_channel(addr->comp_channel);
1184 spdk_nvmf_rdma_listen_addr_free(addr);
1185 pthread_mutex_unlock(&g_rdma.lock);
1186 return -1;
1187 }
1188
1189
1190 addr->ref = 1;
1191 TAILQ_INSERT_TAIL(&g_rdma.listen_addrs, addr, link);
1192 pthread_mutex_unlock(&g_rdma.lock);
1193
1194
1195 return 0;
1196 }
1197
1198 static void
1199 spdk_nvmf_rdma_discover(struct spdk_nvmf_listen_addr *listen_addr,
1200 struct spdk_nvmf_discovery_log_page_entry *entry)
1201 {
1202 entry->trtype = SPDK_NVMF_TRTYPE_RDMA;
1203 entry->adrfam = SPDK_NVMF_ADRFAM_IPV4;
1204 entry->treq.secure_channel = SPDK_NVMF_TREQ_SECURE_CHANNEL_NOT_SPECIFIED;
1205
1206 spdk_strcpy_pad(entry->trsvcid, listen_addr->trsvcid, sizeof(entry->trsvcid), ' ');
1207 spdk_strcpy_pad(entry->traddr, listen_addr->traddr, sizeof(entry->traddr), ' ');
1208
1209 entry->tsas.rdma.rdma_qptype = SPDK_NVMF_RDMA_QPTYPE_RELIABLE_CONNECTED;
1210 entry->tsas.rdma.rdma_prtype = SPDK_NVMF_RDMA_PRTYPE_NONE;
1211 entry->tsas.rdma.rdma_cms = SPDK_NVMF_RDMA_CMS_RDMA_CM;
1212 }
1213
1214 static struct spdk_nvmf_session *
1215 spdk_nvmf_rdma_session_init(void)
1216 {
1217 struct spdk_nvmf_rdma_session *rdma_sess;
1218 int i;
1219 struct spdk_nvmf_rdma_buf *buf;
1220
1221 rdma_sess = calloc(1, sizeof(*rdma_sess));
1222 if (!rdma_sess) {
1223 return NULL;
1224 }
1225
1226 /* TODO: Make the number of elements in this pool configurable. For now, one full queue
1227 * worth seems reasonable.
1228 */
1229 rdma_sess->buf = spdk_zmalloc(g_rdma.max_queue_depth * g_rdma.max_io_size,
1230 0x20000, NULL);
1231 if (!rdma_sess->buf) {
1232 SPDK_ERRLOG("Large buffer pool allocation failed (%d x %d)\n",
1233 g_rdma.max_queue_depth, g_rdma.max_io_size);
1234 free(rdma_sess);
1235 return NULL;
1236 }
1237
1238 SLIST_INIT(&rdma_sess->data_buf_pool);
1239 for (i = 0; i < g_rdma.max_queue_depth; i++) {
1240 buf = (struct spdk_nvmf_rdma_buf *)(rdma_sess->buf + (i * g_rdma.max_io_size));
1241 SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
1242 }
1243
1244 rdma_sess->session.transport = &spdk_nvmf_transport_rdma;
1245
1246 return &rdma_sess->session;
1247 }
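/*
 * The pool above is a single large allocation carved into max_io_size chunks;
 * each free chunk's first bytes double as the SLIST linkage, which is why
 * struct spdk_nvmf_rdma_buf contains nothing but the link (an intrusive free
 * list).  Take and return then work as sketched below (illustrative only;
 * this mirrors spdk_nvmf_request_prep_data() above and
 * request_release_buffer() below, and the example_* helpers are not part of
 * this file):
 */
#if 0	/* Illustrative sketch only. */
static void *
example_take_data_buf(struct spdk_nvmf_rdma_session *rdma_sess)
{
	struct spdk_nvmf_rdma_buf *buf = SLIST_FIRST(&rdma_sess->data_buf_pool);

	if (buf != NULL) {
		/* The SLIST header lives inside the free chunk and is simply
		 * overwritten once the buffer is handed out. */
		SLIST_REMOVE_HEAD(&rdma_sess->data_buf_pool, link);
	}

	return buf;
}

static void
example_return_data_buf(struct spdk_nvmf_rdma_session *rdma_sess, void *data)
{
	SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, (struct spdk_nvmf_rdma_buf *)data, link);
}
#endif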
1248
1249 static void
1250 spdk_nvmf_rdma_session_fini(struct spdk_nvmf_session *session)
1251 {
1252 struct spdk_nvmf_rdma_session *rdma_sess = get_rdma_sess(session);
1253
1254 if (!rdma_sess) {
1255 return;
1256 }
1257
1258 ibv_dereg_mr(rdma_sess->buf_mr);
1259 spdk_free(rdma_sess->buf);
1260 free(rdma_sess);
1261 }
1262
1263 static int
1264 spdk_nvmf_rdma_session_add_conn(struct spdk_nvmf_session *session,
1265 struct spdk_nvmf_conn *conn)
1266 {
1267 struct spdk_nvmf_rdma_session *rdma_sess = get_rdma_sess(session);
1268 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
1269
1270 if (rdma_sess->verbs != NULL) {
1271 if (rdma_sess->verbs != rdma_conn->cm_id->verbs) {
1272 SPDK_ERRLOG("Two connections belonging to the same session cannot connect using different RDMA devices.\n");
1273 return -1;
1274 }
1275
1276 /* Nothing else to do. */
1277 return 0;
1278 }
1279
1280 rdma_sess->verbs = rdma_conn->cm_id->verbs;
1281 rdma_sess->buf_mr = ibv_reg_mr(rdma_conn->cm_id->pd, rdma_sess->buf,
1282 g_rdma.max_queue_depth * g_rdma.max_io_size,
1283 IBV_ACCESS_LOCAL_WRITE |
1284 IBV_ACCESS_REMOTE_WRITE);
1285 if (!rdma_sess->buf_mr) {
1286 SPDK_ERRLOG("Large buffer pool registration failed (%d x %d)\n",
1287 g_rdma.max_queue_depth, g_rdma.max_io_size);
1288 spdk_free(rdma_sess->buf);
1289 free(rdma_sess);
1290 return -1;
1291 }
1292
1293 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Session Shared Data Pool: %p Length: %x LKey: %x\n",
1294 rdma_sess->buf, g_rdma.max_queue_depth * g_rdma.max_io_size, rdma_sess->buf_mr->lkey);
1295
1296 return 0;
1297 }
1298
1299 static int
1300 spdk_nvmf_rdma_session_remove_conn(struct spdk_nvmf_session *session,
1301 struct spdk_nvmf_conn *conn)
1302 {
1303 return 0;
1304 }
1305
1306 static int
1307 spdk_nvmf_rdma_request_complete(struct spdk_nvmf_request *req)
1308 {
1309 struct spdk_nvme_cpl *rsp = &req->rsp->nvme_cpl;
1310 int rc;
1311
1312 if (rsp->status.sc == SPDK_NVME_SC_SUCCESS &&
1313 req->xfer == SPDK_NVME_DATA_CONTROLLER_TO_HOST) {
1314 rc = spdk_nvmf_rdma_request_transfer_data(req);
1315 } else {
1316 rc = request_transfer_out(req);
1317 }
1318
1319 return rc;
1320 }
1321
1322 static void
1323 request_release_buffer(struct spdk_nvmf_request *req)
1324 {
1325 struct spdk_nvmf_rdma_request *rdma_req = get_rdma_req(req);
1326 struct spdk_nvmf_conn *conn = req->conn;
1327 struct spdk_nvmf_rdma_session *rdma_sess;
1328 struct spdk_nvmf_rdma_buf *buf;
1329
1330 if (rdma_req->data_from_pool) {
1331 /* Put the buffer back in the pool */
1332 rdma_sess = get_rdma_sess(conn->sess);
1333 buf = req->data;
1334
1335 SLIST_INSERT_HEAD(&rdma_sess->data_buf_pool, buf, link);
1336 req->data = NULL;
1337 req->length = 0;
1338 rdma_req->data_from_pool = false;
1339 }
1340 }
1341
1342 static void
1343 spdk_nvmf_rdma_close_conn(struct spdk_nvmf_conn *conn)
1344 {
1345 spdk_nvmf_rdma_conn_destroy(get_rdma_conn(conn));
1346 }
1347
1348 static int
1349 process_incoming_queue(struct spdk_nvmf_rdma_conn *rdma_conn)
1350 {
1351 struct spdk_nvmf_rdma_recv *rdma_recv, *tmp;
1352 struct spdk_nvmf_rdma_request *rdma_req;
1353 struct spdk_nvmf_request *req;
1354 int rc, count;
1355 bool error = false;
1356
1357 count = 0;
1358 TAILQ_FOREACH_SAFE(rdma_recv, &rdma_conn->incoming_queue, link, tmp) {
1359 rdma_req = TAILQ_FIRST(&rdma_conn->free_queue);
1360 if (rdma_req == NULL) {
1361 /* Need to wait for more SEND completions */
1362 break;
1363 }
1364 TAILQ_REMOVE(&rdma_conn->free_queue, rdma_req, link);
1365 TAILQ_REMOVE(&rdma_conn->incoming_queue, rdma_recv, link);
1366 rdma_req->recv = rdma_recv;
1367 req = &rdma_req->req;
1368
1369 /* The first element of the SGL is the NVMe command */
1370 req->cmd = (union nvmf_h2c_msg *)rdma_recv->sgl[0].addr;
1371
1372 spdk_trace_record(TRACE_NVMF_IO_START, 0, 0, (uint64_t)req, 0);
1373
1374 memset(req->rsp, 0, sizeof(*req->rsp));
1375 rc = spdk_nvmf_request_prep_data(req);
1376 switch (rc) {
1377 case SPDK_NVMF_REQUEST_PREP_READY:
1378 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p is ready for execution\n", req);
1379 /* Data is immediately available */
1380 rc = spdk_nvmf_request_exec(req);
1381 if (rc < 0) {
1382 error = true;
1383 continue;
1384 }
1385 count++;
1386 break;
1387 case SPDK_NVMF_REQUEST_PREP_PENDING_BUFFER:
1388 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data buffer\n", req);
1389 TAILQ_INSERT_TAIL(&rdma_conn->pending_data_buf_queue, rdma_req, link);
1390 break;
1391 case SPDK_NVMF_REQUEST_PREP_PENDING_DATA:
1392 SPDK_TRACELOG(SPDK_TRACE_RDMA, "Request %p needs data transfer\n", req);
1393 rc = spdk_nvmf_rdma_request_transfer_data(req);
1394 if (rc < 0) {
1395 error = true;
1396 continue;
1397 }
1398 break;
1399 case SPDK_NVMF_REQUEST_PREP_ERROR:
1400 spdk_nvmf_request_complete(req);
1401 break;
1402 }
1403 }
1404
1405 if (error) {
1406 return -1;
1407 }
1408
1409 return count;
1410 }
1411
1412 static struct spdk_nvmf_rdma_request *
1413 get_rdma_req_from_wc(struct spdk_nvmf_rdma_conn *rdma_conn,
1414 struct ibv_wc *wc)
1415 {
1416 struct spdk_nvmf_rdma_request *rdma_req;
1417
1418 rdma_req = (struct spdk_nvmf_rdma_request *)wc->wr_id;
1419 assert(rdma_req != NULL);
1420 assert(rdma_req - rdma_conn->reqs >= 0);
1421 assert(rdma_req - rdma_conn->reqs < (ptrdiff_t)rdma_conn->max_queue_depth);
1422
1423 return rdma_req;
1424 }
1425
1426 static struct spdk_nvmf_rdma_recv *
1427 get_rdma_recv_from_wc(struct spdk_nvmf_rdma_conn *rdma_conn,
1428 struct ibv_wc *wc)
1429 {
1430 struct spdk_nvmf_rdma_recv *rdma_recv;
1431
1432 assert(wc->byte_len >= sizeof(struct spdk_nvmf_capsule_cmd));
1433
1434 rdma_recv = (struct spdk_nvmf_rdma_recv *)wc->wr_id;
1435 assert(rdma_recv != NULL);
1436 assert(rdma_recv - rdma_conn->recvs >= 0);
1437 assert(rdma_recv - rdma_conn->recvs < (ptrdiff_t)rdma_conn->max_queue_depth);
1438 #ifdef DEBUG
1439 assert(rdma_recv->in_use == false);
1440 rdma_recv->in_use = true;
1441 #endif
1442
1443 return rdma_recv;
1444 }
1445
1446 /* Returns the number of times that spdk_nvmf_request_exec was called,
1447 * or -1 on error.
1448 */
1449 static int
1450 spdk_nvmf_rdma_poll(struct spdk_nvmf_conn *conn)
1451 {
1452 struct ibv_wc wc[32];
1453 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
1454 struct spdk_nvmf_rdma_request *rdma_req;
1455 struct spdk_nvmf_rdma_recv *rdma_recv;
1456 struct spdk_nvmf_request *req;
1457 int reaped, i, rc;
1458 int count = 0;
1459 bool error = false;
1460
1461 /* Poll for completing operations. */
1462 rc = ibv_poll_cq(rdma_conn->cq, 32, wc);
1463 if (rc < 0) {
1464 SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
1465 errno, strerror(errno));
1466 return -1;
1467 }
1468
1469 reaped = rc;
1470 for (i = 0; i < reaped; i++) {
1471 if (wc[i].status) {
1472                         SPDK_ERRLOG("CQ error on Connection %p, Request 0x%lx (%d): %s\n",
1473 conn, wc[i].wr_id, wc[i].status, ibv_wc_status_str(wc[i].status));
1474 error = true;
1475 continue;
1476 }
1477
1478 switch (wc[i].opcode) {
1479 case IBV_WC_SEND:
1480 rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
1481 req = &rdma_req->req;
1482
1483 assert(rdma_conn->cur_queue_depth > 0);
1484 SPDK_TRACELOG(SPDK_TRACE_RDMA,
1485 "RDMA SEND Complete. Request: %p Connection: %p Outstanding I/O: %d\n",
1486 req, conn, rdma_conn->cur_queue_depth - 1);
1487 rdma_conn->cur_queue_depth--;
1488
1489 /* The request may still own a data buffer. Release it */
1490 request_release_buffer(req);
1491
1492 /* Put the request back on the free list */
1493 TAILQ_INSERT_TAIL(&rdma_conn->free_queue, rdma_req, link);
1494
1495 /* Try to process queued incoming requests */
1496 rc = process_incoming_queue(rdma_conn);
1497 if (rc < 0) {
1498 error = true;
1499 continue;
1500 }
1501 count += rc;
1502 break;
1503
1504 case IBV_WC_RDMA_WRITE:
1505 rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
1506 req = &rdma_req->req;
1507
1508 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA WRITE Complete. Request: %p Connection: %p\n",
1509 req, conn);
1510 spdk_trace_record(TRACE_RDMA_WRITE_COMPLETE, 0, 0, (uint64_t)req, 0);
1511
1512 /* Now that the write has completed, the data buffer can be released */
1513 request_release_buffer(req);
1514
1515 rdma_conn->cur_rdma_rw_depth--;
1516
1517 /* Since an RDMA R/W operation completed, try to submit from the pending list. */
1518 rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
1519 if (rc < 0) {
1520 error = true;
1521 continue;
1522 }
1523 count += rc;
1524 break;
1525
1526 case IBV_WC_RDMA_READ:
1527 rdma_req = get_rdma_req_from_wc(rdma_conn, &wc[i]);
1528 req = &rdma_req->req;
1529
1530 SPDK_TRACELOG(SPDK_TRACE_RDMA, "RDMA READ Complete. Request: %p Connection: %p\n",
1531 req, conn);
1532 spdk_trace_record(TRACE_RDMA_READ_COMPLETE, 0, 0, (uint64_t)req, 0);
1533 rc = spdk_nvmf_request_exec(req);
1534 if (rc) {
1535 error = true;
1536 continue;
1537 }
1538 count++;
1539
1540 /* Since an RDMA R/W operation completed, try to submit from the pending list. */
1541 rdma_conn->cur_rdma_rw_depth--;
1542 rc = spdk_nvmf_rdma_handle_pending_rdma_rw(conn);
1543 if (rc < 0) {
1544 error = true;
1545 continue;
1546 }
1547 count += rc;
1548 break;
1549
1550 case IBV_WC_RECV:
1551 rdma_recv = get_rdma_recv_from_wc(rdma_conn, &wc[i]);
1552
1553 rdma_conn->cur_queue_depth++;
1554 if (rdma_conn->cur_queue_depth > rdma_conn->max_queue_depth) {
1555 SPDK_TRACELOG(SPDK_TRACE_RDMA,
1556 "Temporarily exceeded maximum queue depth (%u). Queueing.\n",
1557 rdma_conn->cur_queue_depth);
1558 }
1559 SPDK_TRACELOG(SPDK_TRACE_RDMA,
1560 "RDMA RECV Complete. Recv: %p Connection: %p Outstanding I/O: %d\n",
1561 rdma_recv, conn, rdma_conn->cur_queue_depth);
1562
1563 TAILQ_INSERT_TAIL(&rdma_conn->incoming_queue, rdma_recv, link);
1564 rc = process_incoming_queue(rdma_conn);
1565 if (rc < 0) {
1566 error = true;
1567 continue;
1568 }
1569 count += rc;
1570 break;
1571
1572 default:
1573 SPDK_ERRLOG("Received an unknown opcode on the CQ: %d\n", wc[i].opcode);
1574 error = true;
1575 continue;
1576 }
1577 }
1578
1579 if (error == true) {
1580 return -1;
1581 }
1582
1583 return count;
1584 }
1585
1586 static bool
1587 spdk_nvmf_rdma_conn_is_idle(struct spdk_nvmf_conn *conn)
1588 {
1589 struct spdk_nvmf_rdma_conn *rdma_conn = get_rdma_conn(conn);
1590
1591 if (rdma_conn->cur_queue_depth == 0 && rdma_conn->cur_rdma_rw_depth == 0) {
1592 return true;
1593 }
1594 return false;
1595 }
1596
1597 const struct spdk_nvmf_transport spdk_nvmf_transport_rdma = {
1598 .name = "rdma",
1599 .transport_init = spdk_nvmf_rdma_init,
1600 .transport_fini = spdk_nvmf_rdma_fini,
1601
1602 .acceptor_poll = spdk_nvmf_rdma_acceptor_poll,
1603
1604 .listen_addr_add = spdk_nvmf_rdma_listen,
1605 .listen_addr_remove = spdk_nvmf_rdma_listen_remove,
1606 .listen_addr_discover = spdk_nvmf_rdma_discover,
1607
1608 .session_init = spdk_nvmf_rdma_session_init,
1609 .session_fini = spdk_nvmf_rdma_session_fini,
1610 .session_add_conn = spdk_nvmf_rdma_session_add_conn,
1611 .session_remove_conn = spdk_nvmf_rdma_session_remove_conn,
1612
1613 .req_complete = spdk_nvmf_rdma_request_complete,
1614
1615 .conn_fini = spdk_nvmf_rdma_close_conn,
1616 .conn_poll = spdk_nvmf_rdma_poll,
1617 .conn_is_idle = spdk_nvmf_rdma_conn_is_idle,
1618
1619 };
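/*
 * spdk_nvmf_transport_rdma is the operations table through which the generic
 * NVMe-oF target layer drives this transport; the RDMA functions above are
 * not called directly by that layer.  A hypothetical caller-side sketch of
 * the dispatch (illustrative only; example_drive_connection() is not a real
 * SPDK function):
 */
#if 0	/* Illustrative sketch only. */
static void
example_drive_connection(struct spdk_nvmf_conn *conn)
{
	/* conn->transport was set to &spdk_nvmf_transport_rdma when the
	 * connection was created, so this dispatches into the functions above. */
	if (conn->transport->conn_poll(conn) < 0) {
		conn->transport->conn_fini(conn);
	}
}
#endif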
1620
1621 SPDK_LOG_REGISTER_TRACE_FLAG("rdma", SPDK_TRACE_RDMA)