git.proxmox.com Git - ceph.git/blobdiff - ceph/src/spdk/lib/nvme/nvme_rdma.c
import 15.2.0 Octopus source
index b356e3a16e9945721557b0425d591c2abcdeb138..fd9598b97c52435778feab469628cc9b15d5de8d 100644 (file)
@@ -71,6 +71,8 @@ struct spdk_nvmf_cmd {
        struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
 };
 
+struct spdk_nvme_rdma_hooks g_nvme_hooks = {};
+
 /* Mapping from virtual address to ibv_mr pointer for a protection domain */
 struct spdk_nvme_rdma_mr_map {
        struct ibv_pd                           *pd;
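
The new g_nvme_hooks table lets an application take over RDMA memory management. Only two callbacks are consulted by this patch: get_ibv_pd when a queue pair is created, and get_rkey when buffers are registered with or translated through the memory map. A sketch of the shape implied by those call sites follows; the field list is inferred from this file, not copied from the SPDK public header, which may carry additional members.

/* Hook points consulted below (shape inferred from the call sites in this
 * file; the real spdk_nvme_rdma_hooks declaration may differ). */
struct spdk_nvme_rdma_hooks {
	/* Return the protection domain to use for a controller on the given
	 * device, or NULL to let the transport pick one. */
	struct ibv_pd *(*get_ibv_pd)(const struct spdk_nvme_transport_id *trid,
				     struct ibv_context *verbs);

	/* Return the key covering [buf, buf + size) within pd; the value is
	 * stored verbatim as the spdk_mem_map translation. */
	uint64_t (*get_rkey)(struct ibv_pd *pd, void *buf, size_t size);
};
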
@@ -82,20 +84,24 @@ struct spdk_nvme_rdma_mr_map {
 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
 struct nvme_rdma_ctrlr {
        struct spdk_nvme_ctrlr                  ctrlr;
+
+       struct ibv_pd                           *pd;
 };
 
 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
 struct nvme_rdma_qpair {
        struct spdk_nvme_qpair                  qpair;
 
-       struct rdma_event_channel               *cm_channel;
-
        struct rdma_cm_id                       *cm_id;
 
        struct ibv_cq                           *cq;
 
        struct  spdk_nvme_rdma_req              *rdma_reqs;
 
+       uint32_t                                max_send_sge;
+
+       uint32_t                                max_recv_sge;
+
        uint16_t                                num_entries;
 
        /* Parallel arrays of response buffers + response SGLs of size num_entries */
@@ -120,6 +126,9 @@ struct nvme_rdma_qpair {
 
        TAILQ_HEAD(, spdk_nvme_rdma_req)        free_reqs;
        TAILQ_HEAD(, spdk_nvme_rdma_req)        outstanding_reqs;
+
+       /* Placed at the end of the struct since it is not used frequently */
+       struct rdma_event_channel               *cm_channel;
 };
 
 struct spdk_nvme_rdma_req {
@@ -132,6 +141,8 @@ struct spdk_nvme_rdma_req {
        struct ibv_sge                          send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
 
        TAILQ_ENTRY(spdk_nvme_rdma_req)         link;
+
+       bool                                    request_ready_to_put;
 };
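
The new request_ready_to_put flag coordinates the two completions every request produces: the IBV_WC_SEND work completion for the command capsule and the NVMe response received on the recv queue. Whichever event arrives first only marks the flag; the second one returns the request to the free list, so a request can never be recycled while its send work request is still outstanding. A condensed sketch of the pattern, assuming a helper name that does not exist in the patch (both call sites below inline the equivalent logic):

/* Illustrative only: the send-completion and response paths added by this
 * patch both perform logic equivalent to this helper. */
static void
nvme_rdma_req_put_half(struct nvme_rdma_qpair *rqpair,
		       struct spdk_nvme_rdma_req *rdma_req)
{
	if (rdma_req->request_ready_to_put) {
		/* The other completion already arrived; safe to recycle.
		 * nvme_rdma_req_put() clears the flag for the next user. */
		nvme_rdma_req_put(rqpair, rdma_req);
	} else {
		/* First completion for this request; wait for the other. */
		rdma_req->request_ready_to_put = true;
	}
}
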
 
 static const char *rdma_cm_event_str[] = {
@@ -189,6 +200,7 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
 static void
 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
 {
+       rdma_req->request_ready_to_put = false;
        TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
        TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
 }
@@ -197,7 +209,7 @@ static void
 nvme_rdma_req_complete(struct nvme_request *req,
                       struct spdk_nvme_cpl *rsp)
 {
-       nvme_complete_request(req, rsp);
+       nvme_complete_request(req->cb_fn, req->cb_arg, req->qpair, req, rsp);
        nvme_free_request(req);
 }
 
@@ -241,6 +253,14 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
 {
        int                     rc;
        struct ibv_qp_init_attr attr;
+       struct ibv_device_attr  dev_attr;
+       struct nvme_rdma_ctrlr  *rctrlr;
+
+       rc = ibv_query_device(rqpair->cm_id->verbs, &dev_attr);
+       if (rc != 0) {
+               SPDK_ERRLOG("Failed to query RDMA device attributes.\n");
+               return -1;
+       }
 
        rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
        if (!rqpair->cq) {
@@ -248,21 +268,35 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
+       rctrlr = nvme_rdma_ctrlr(rqpair->qpair.ctrlr);
+       if (g_nvme_hooks.get_ibv_pd) {
+               rctrlr->pd = g_nvme_hooks.get_ibv_pd(&rctrlr->ctrlr.trid, rqpair->cm_id->verbs);
+       } else {
+               rctrlr->pd = NULL;
+       }
+
        memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
        attr.qp_type            = IBV_QPT_RC;
        attr.send_cq            = rqpair->cq;
        attr.recv_cq            = rqpair->cq;
        attr.cap.max_send_wr    = rqpair->num_entries; /* SEND operations */
        attr.cap.max_recv_wr    = rqpair->num_entries; /* RECV operations */
-       attr.cap.max_send_sge   = NVME_RDMA_DEFAULT_TX_SGE;
-       attr.cap.max_recv_sge   = NVME_RDMA_DEFAULT_RX_SGE;
+       attr.cap.max_send_sge   = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, dev_attr.max_sge);
+       attr.cap.max_recv_sge   = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, dev_attr.max_sge);
+
+       rc = rdma_create_qp(rqpair->cm_id, rctrlr->pd, &attr);
 
-       rc = rdma_create_qp(rqpair->cm_id, NULL, &attr);
        if (rc) {
                SPDK_ERRLOG("rdma_create_qp failed\n");
                return -1;
        }
 
+       /* ibv_create_qp will change the values in attr.cap. Make sure we store the proper value. */
+       rqpair->max_send_sge = spdk_min(NVME_RDMA_DEFAULT_TX_SGE, attr.cap.max_send_sge);
+       rqpair->max_recv_sge = spdk_min(NVME_RDMA_DEFAULT_RX_SGE, attr.cap.max_recv_sge);
+
+       rctrlr->pd = rqpair->cm_id->qp->pd;
+
        rqpair->cm_id->context = &rqpair->qpair;
 
        return 0;
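
nvme_rdma_qpair_init() now sizes its scatter-gather limits against the device (ibv_query_device() plus spdk_min(), re-clamped after rdma_create_qp() returns the values actually granted) and picks the protection domain through the get_ibv_pd hook when one is installed; passing NULL to rdma_create_qp() keeps the old behaviour of letting librdmacm supply the device's default PD. In either case rctrlr->pd is refreshed from cm_id->qp->pd afterwards, so the rest of the transport sees the PD actually bound to the queue pair. A hedged sketch of a minimal get_ibv_pd hook an application might supply (app_get_ibv_pd and g_app_pd are illustrative names, not part of this patch):

/* Hypothetical hook: allocate one PD lazily and share it across all queue
 * pairs on the device (single-device assumption for brevity). */
static struct ibv_pd *g_app_pd;

static struct ibv_pd *
app_get_ibv_pd(const struct spdk_nvme_transport_id *trid,
	       struct ibv_context *verbs)
{
	(void)trid;

	if (g_app_pd == NULL) {
		g_app_pd = ibv_alloc_pd(verbs);
	}
	return g_app_pd;
}
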
@@ -292,13 +326,17 @@ nvme_rdma_post_recv(struct nvme_rdma_qpair *rqpair, uint16_t rsp_idx)
 }
 
 static void
-nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
+nvme_rdma_unregister_rsps(struct nvme_rdma_qpair *rqpair)
 {
        if (rqpair->rsp_mr && rdma_dereg_mr(rqpair->rsp_mr)) {
                SPDK_ERRLOG("Unable to de-register rsp_mr\n");
        }
        rqpair->rsp_mr = NULL;
+}
 
+static void
+nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
+{
        free(rqpair->rsps);
        rqpair->rsps = NULL;
        free(rqpair->rsp_sgls);
@@ -310,9 +348,6 @@ nvme_rdma_free_rsps(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
 {
-       uint16_t i;
-
-       rqpair->rsp_mr = NULL;
        rqpair->rsps = NULL;
        rqpair->rsp_recv_wrs = NULL;
 
@@ -335,6 +370,17 @@ nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
                goto fail;
        }
 
+       return 0;
+fail:
+       nvme_rdma_free_rsps(rqpair);
+       return -ENOMEM;
+}
+
+static int
+nvme_rdma_register_rsps(struct nvme_rdma_qpair *rqpair)
+{
+       int i;
+
        rqpair->rsp_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->rsps,
                                       rqpair->num_entries * sizeof(*rqpair->rsps));
        if (rqpair->rsp_mr == NULL) {
@@ -363,21 +409,25 @@ nvme_rdma_alloc_rsps(struct nvme_rdma_qpair *rqpair)
        return 0;
 
 fail:
-       nvme_rdma_free_rsps(rqpair);
+       nvme_rdma_unregister_rsps(rqpair);
        return -ENOMEM;
 }
 
 static void
-nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
+nvme_rdma_unregister_reqs(struct nvme_rdma_qpair *rqpair)
 {
-       if (!rqpair->rdma_reqs) {
-               return;
-       }
-
        if (rqpair->cmd_mr && rdma_dereg_mr(rqpair->cmd_mr)) {
                SPDK_ERRLOG("Unable to de-register cmd_mr\n");
        }
        rqpair->cmd_mr = NULL;
+}
+
+static void
+nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
+{
+       if (!rqpair->rdma_reqs) {
+               return;
+       }
 
        free(rqpair->cmds);
        rqpair->cmds = NULL;
@@ -389,8 +439,6 @@ nvme_rdma_free_reqs(struct nvme_rdma_qpair *rqpair)
 static int
 nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
 {
-       int i;
-
        rqpair->rdma_reqs = calloc(rqpair->num_entries, sizeof(struct spdk_nvme_rdma_req));
        if (rqpair->rdma_reqs == NULL) {
                SPDK_ERRLOG("Failed to allocate rdma_reqs\n");
@@ -403,6 +451,17 @@ nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
                goto fail;
        }
 
+       return 0;
+fail:
+       nvme_rdma_free_reqs(rqpair);
+       return -ENOMEM;
+}
+
+static int
+nvme_rdma_register_reqs(struct nvme_rdma_qpair *rqpair)
+{
+       int i;
+
        rqpair->cmd_mr = rdma_reg_msgs(rqpair->cm_id, rqpair->cmds,
                                       rqpair->num_entries * sizeof(*rqpair->cmds));
        if (!rqpair->cmd_mr) {
@@ -441,7 +500,7 @@ nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
        return 0;
 
 fail:
-       nvme_rdma_free_reqs(rqpair);
+       nvme_rdma_unregister_reqs(rqpair);
        return -ENOMEM;
 }
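
The former allocation routines are split in two: nvme_rdma_alloc_reqs()/nvme_rdma_alloc_rsps() only obtain host memory (request tracking structures, command capsules, response buffers and their SGLs), while the new nvme_rdma_register_reqs()/nvme_rdma_register_rsps() perform the rdma_reg_msgs() registrations, which require a live cm_id. Teardown is split the same way, so the buffers can outlive a disconnect and be re-registered on the next connect. The resulting pairing, condensed from the call sites added later in this patch:

/*
 * nvme_rdma_ctrlr_create_qpair()  -> nvme_rdma_alloc_reqs()      + nvme_rdma_alloc_rsps()
 * nvme_rdma_qpair_connect()       -> nvme_rdma_register_reqs()   + nvme_rdma_register_rsps()
 * nvme_rdma_qpair_disconnect()    -> nvme_rdma_unregister_reqs() + nvme_rdma_unregister_rsps()
 * nvme_rdma_qpair_destroy()       -> nvme_rdma_free_reqs()       + nvme_rdma_free_rsps()
 */
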
 
@@ -460,7 +519,12 @@ nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
        req = rdma_req->req;
        nvme_rdma_req_complete(req, rsp);
 
-       nvme_rdma_req_put(rqpair, rdma_req);
+       if (rdma_req->request_ready_to_put) {
+               nvme_rdma_req_put(rqpair, rdma_req);
+       } else {
+               rdma_req->request_ready_to_put = true;
+       }
+
        if (nvme_rdma_post_recv(rqpair, rsp_idx)) {
                SPDK_ERRLOG("Unable to re-post rx descriptor\n");
                return -1;
@@ -617,23 +681,30 @@ nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
 
        switch (action) {
        case SPDK_MEM_MAP_NOTIFY_REGISTER:
-               mr = ibv_reg_mr(pd, vaddr, size,
-                               IBV_ACCESS_LOCAL_WRITE |
-                               IBV_ACCESS_REMOTE_READ |
-                               IBV_ACCESS_REMOTE_WRITE);
-               if (mr == NULL) {
-                       SPDK_ERRLOG("ibv_reg_mr() failed\n");
-                       return -EFAULT;
+               if (!g_nvme_hooks.get_rkey) {
+                       mr = ibv_reg_mr(pd, vaddr, size,
+                                       IBV_ACCESS_LOCAL_WRITE |
+                                       IBV_ACCESS_REMOTE_READ |
+                                       IBV_ACCESS_REMOTE_WRITE);
+                       if (mr == NULL) {
+                               SPDK_ERRLOG("ibv_reg_mr() failed\n");
+                               return -EFAULT;
+                       } else {
+                               rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
+                       }
                } else {
-                       rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
+                       rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size,
+                                                         g_nvme_hooks.get_rkey(pd, vaddr, size));
                }
                break;
        case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
-               mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
-               rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
-               if (mr) {
-                       ibv_dereg_mr(mr);
+               if (!g_nvme_hooks.get_rkey) {
+                       mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
+                       if (mr) {
+                               ibv_dereg_mr(mr);
+                       }
                }
+               rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
                break;
        default:
                SPDK_UNREACHABLE();
@@ -642,6 +713,13 @@ nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
        return rc;
 }
 
+static int
+nvme_rdma_check_contiguous_entries(uint64_t addr_1, uint64_t addr_2)
+{
+       /* Two contiguous mappings will point to the same address which is the start of the RDMA MR. */
+       return addr_1 == addr_2;
+}
+
 static int
 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
 {
@@ -649,7 +727,7 @@ nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
        struct spdk_nvme_rdma_mr_map *mr_map;
        const struct spdk_mem_map_ops nvme_rdma_map_ops = {
                .notify_cb = nvme_rdma_mr_map_notify,
-               .are_contiguous = NULL
+               .are_contiguous = nvme_rdma_check_contiguous_entries
        };
 
        pthread_mutex_lock(&g_rdma_mr_maps_mutex);
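
When a get_rkey hook is installed, the memory-map notifier no longer creates an ibv_mr per registered region; it stores the hook's return value directly as the spdk_mem_map translation, and the request builders later use that value as the remote key in keyed SGL descriptors (and as the lkey for inline sends). Because every address inside one region translates to the same value, the new are_contiguous callback can merge adjacent 2 MB map entries by simple equality. A hedged sketch of such a hook backed by one application-registered MR (g_app_mr and app_get_rkey are illustrative names, not part of this patch):

/* Hypothetical hook: the application pre-registers one large region and
 * returns its rkey for every buffer that falls inside it. */
static struct ibv_mr *g_app_mr;

static uint64_t
app_get_rkey(struct ibv_pd *pd, void *buf, size_t size)
{
	(void)pd;

	assert((char *)buf >= (char *)g_app_mr->addr &&
	       (char *)buf + size <= (char *)g_app_mr->addr + g_app_mr->length);

	/* Constant within the region, which is what lets
	 * nvme_rdma_check_contiguous_entries() treat neighbouring
	 * translations as one contiguous mapping. */
	return g_app_mr->rkey;
}
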
@@ -793,21 +871,21 @@ nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
-       rc = nvme_rdma_alloc_reqs(rqpair);
+       rc = nvme_rdma_register_reqs(rqpair);
        SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
        if (rc) {
-               SPDK_ERRLOG("Unable to allocate rqpair  RDMA requests\n");
+               SPDK_ERRLOG("Unable to register rqpair RDMA requests\n");
                return -1;
        }
-       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests registered\n");
 
-       rc = nvme_rdma_alloc_rsps(rqpair);
+       rc = nvme_rdma_register_rsps(rqpair);
        SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
        if (rc < 0) {
-               SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
+               SPDK_ERRLOG("Unable to register rqpair RDMA responses\n");
                return -1;
        }
-       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses registered\n");
 
        rc = nvme_rdma_register_mem(rqpair);
        if (rc < 0) {
@@ -869,11 +947,23 @@ nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
        assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
 
        requested_size = req->payload_size;
-       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
-                       (uint64_t)payload, &requested_size);
 
-       if (mr == NULL || requested_size < req->payload_size) {
-               return -EINVAL;
+       if (!g_nvme_hooks.get_rkey) {
+               mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
+                               (uint64_t)payload, &requested_size);
+
+               if (mr == NULL || requested_size < req->payload_size) {
+                       if (mr) {
+                               SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
+                       }
+                       return -EINVAL;
+               }
+               rdma_req->send_sgl[1].lkey = mr->lkey;
+       } else {
+               rdma_req->send_sgl[1].lkey = spdk_mem_map_translate(rqpair->mr_map->map,
+                                            (uint64_t)payload,
+                                            &requested_size);
+
        }
 
        /* The first element of this SGL is pointing at an
@@ -884,7 +974,6 @@ nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
 
        rdma_req->send_sgl[1].addr = (uint64_t)payload;
        rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
-       rdma_req->send_sgl[1].lkey = mr->lkey;
 
        /* The RDMA SGL contains two elements. The first describes
         * the NVMe command and the second describes the data
@@ -918,9 +1007,22 @@ nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
        assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
 
        requested_size = req->payload_size;
-       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
-                       &requested_size);
-       if (mr == NULL || requested_size < req->payload_size) {
+       if (!g_nvme_hooks.get_rkey) {
+
+               mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
+                               &requested_size);
+               if (mr == NULL) {
+                       return -1;
+               }
+               req->cmd.dptr.sgl1.keyed.key = mr->rkey;
+       } else {
+               req->cmd.dptr.sgl1.keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
+                                              (uint64_t)payload,
+                                              &requested_size);
+       }
+
+       if (requested_size < req->payload_size) {
+               SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
                return -1;
        }
 
@@ -937,7 +1039,6 @@ nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
        req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
        req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
        req->cmd.dptr.sgl1.keyed.length = req->payload_size;
-       req->cmd.dptr.sgl1.keyed.key = mr->rkey;
        req->cmd.dptr.sgl1.address = (uint64_t)payload;
 
        return 0;
@@ -977,17 +1078,28 @@ nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
                sge_length = spdk_min(remaining_size, sge_length);
                mr_length = sge_length;
 
-               mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
-                               &mr_length);
+               if (!g_nvme_hooks.get_rkey) {
+                       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
+                                       (uint64_t)virt_addr,
+                                       &mr_length);
+                       if (mr == NULL) {
+                               return -1;
+                       }
+                       cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
+               } else {
+                       cmd->sgl[num_sgl_desc].keyed.key = spdk_mem_map_translate(rqpair->mr_map->map,
+                                                          (uint64_t)virt_addr,
+                                                          &mr_length);
+               }
 
-               if (mr == NULL || mr_length < sge_length) {
+               if (mr_length < sge_length) {
+                       SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
                        return -1;
                }
 
                cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
                cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
                cmd->sgl[num_sgl_desc].keyed.length = sge_length;
-               cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
                cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
 
                remaining_size -= sge_length;
@@ -1017,11 +1129,11 @@ nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
                 * the NVMe command. */
                rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
 
-               req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
-               req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
-               req->cmd.dptr.sgl1.keyed.length = req->payload_size;
-               req->cmd.dptr.sgl1.keyed.key = mr->rkey;
-               req->cmd.dptr.sgl1.address = rqpair->cmds[rdma_req->id].sgl[0].address;
+               req->cmd.dptr.sgl1.keyed.type = cmd->sgl[0].keyed.type;
+               req->cmd.dptr.sgl1.keyed.subtype = cmd->sgl[0].keyed.subtype;
+               req->cmd.dptr.sgl1.keyed.length = cmd->sgl[0].keyed.length;
+               req->cmd.dptr.sgl1.keyed.key = cmd->sgl[0].keyed.key;
+               req->cmd.dptr.sgl1.address = cmd->sgl[0].address;
        } else {
                /*
                 * Otherwise, The SGL descriptor embedded in the command must point to the list of
@@ -1051,7 +1163,7 @@ nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
        uint32_t length;
        uint64_t requested_size;
        void *virt_addr;
-       int rc;
+       int rc, i;
 
        assert(req->payload_size != 0);
        assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
@@ -1059,39 +1171,48 @@ nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
        assert(req->payload.next_sge_fn != NULL);
        req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
 
-       /* TODO: for now, we only support a single SGL entry */
        rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
        if (rc) {
                return -1;
        }
 
        if (length < req->payload_size) {
-               SPDK_ERRLOG("multi-element SGL currently not supported for RDMA\n");
-               return -1;
+               SPDK_DEBUGLOG(SPDK_LOG_NVME, "Inline SGL request split so sending separately.\n");
+               return nvme_rdma_build_sgl_request(rqpair, rdma_req);
        }
 
-       requested_size = req->payload_size;
+       if (length > req->payload_size) {
+               length = req->payload_size;
+       }
+
+       requested_size = length;
        mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
                        &requested_size);
-       if (mr == NULL || requested_size < req->payload_size) {
+       if (mr == NULL || requested_size < length) {
+               for (i = 1; i < rdma_req->send_wr.num_sge; i++) {
+                       rdma_req->send_sgl[i].addr = 0;
+                       rdma_req->send_sgl[i].length = 0;
+                       rdma_req->send_sgl[i].lkey = 0;
+               }
+
+               if (mr) {
+                       SPDK_ERRLOG("Data buffer split over multiple RDMA Memory Regions\n");
+               }
                return -1;
        }
 
+       rdma_req->send_sgl[1].addr = (uint64_t)virt_addr;
+       rdma_req->send_sgl[1].length = length;
+       rdma_req->send_sgl[1].lkey = mr->lkey;
+
+       rdma_req->send_wr.num_sge = 2;
+
        /* The first element of this SGL is pointing at an
         * spdk_nvmf_cmd object. For this particular command,
         * we only need the first 64 bytes corresponding to
         * the NVMe command. */
        rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
 
-       rdma_req->send_sgl[1].addr = (uint64_t)virt_addr;
-       rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
-       rdma_req->send_sgl[1].lkey = mr->lkey;
-
-       /* The RDMA SGL contains two elements. The first describes
-        * the NVMe command and the second describes the data
-        * payload. */
-       rdma_req->send_wr.num_sge = 2;
-
        req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
        req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
        req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
@@ -1180,6 +1301,22 @@ nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
                return NULL;
        }
 
+       rc = nvme_rdma_alloc_reqs(rqpair);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
+       if (rc) {
+               SPDK_ERRLOG("Unable to allocate rqpair RDMA requests\n");
+               return NULL;
+       }
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
+
+       rc = nvme_rdma_alloc_rsps(rqpair);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
+       if (rc < 0) {
+               SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
+               return NULL;
+       }
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
+
        rc = nvme_rdma_qpair_connect(rqpair);
        if (rc < 0) {
                nvme_rdma_qpair_destroy(qpair);
@@ -1189,21 +1326,14 @@ nvme_rdma_ctrlr_create_qpair(struct spdk_nvme_ctrlr *ctrlr,
        return qpair;
 }
 
-static int
-nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
+static void
+nvme_rdma_qpair_disconnect(struct spdk_nvme_qpair *qpair)
 {
-       struct nvme_rdma_qpair *rqpair;
-
-       if (!qpair) {
-               return -1;
-       }
-       nvme_qpair_deinit(qpair);
-
-       rqpair = nvme_rdma_qpair(qpair);
+       struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
 
        nvme_rdma_unregister_mem(rqpair);
-       nvme_rdma_free_reqs(rqpair);
-       nvme_rdma_free_rsps(rqpair);
+       nvme_rdma_unregister_reqs(rqpair);
+       nvme_rdma_unregister_rsps(rqpair);
 
        if (rqpair->cm_id) {
                if (rqpair->cm_id->qp) {
@@ -1219,7 +1349,24 @@ nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
        if (rqpair->cm_channel) {
                rdma_destroy_event_channel(rqpair->cm_channel);
        }
+}
 
+static int
+nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
+{
+       struct nvme_rdma_qpair *rqpair;
+
+       if (!qpair) {
+               return -1;
+       }
+       nvme_rdma_qpair_disconnect(qpair);
+       nvme_rdma_qpair_abort_reqs(qpair, 1);
+       nvme_qpair_deinit(qpair);
+
+       rqpair = nvme_rdma_qpair(qpair);
+
+       nvme_rdma_free_reqs(rqpair);
+       nvme_rdma_free_rsps(rqpair);
        free(rqpair);
 
        return 0;
@@ -1242,10 +1389,7 @@ nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
 
 /* This function must only be called while holding g_spdk_nvme_driver->lock */
 int
-nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
-                    void *cb_ctx,
-                    spdk_nvme_probe_cb probe_cb,
-                    spdk_nvme_remove_cb remove_cb,
+nvme_rdma_ctrlr_scan(struct spdk_nvme_probe_ctx *probe_ctx,
                     bool direct_connect)
 {
        struct spdk_nvme_ctrlr_opts discovery_opts;
@@ -1254,9 +1398,9 @@ nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
        int rc;
        struct nvme_completion_poll_status status;
 
-       if (strcmp(discovery_trid->subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
+       if (strcmp(probe_ctx->trid.subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
                /* It is not a discovery_ctrlr info and try to directly connect it */
-               rc = nvme_ctrlr_probe(discovery_trid, NULL, probe_cb, cb_ctx);
+               rc = nvme_ctrlr_probe(&probe_ctx->trid, probe_ctx, NULL);
                return rc;
        }
 
@@ -1264,7 +1408,7 @@ nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
        /* For discovery_ctrlr set the timeout to 0 */
        discovery_opts.keep_alive_timeout_ms = 0;
 
-       discovery_ctrlr = nvme_rdma_ctrlr_construct(discovery_trid, &discovery_opts, NULL);
+       discovery_ctrlr = nvme_rdma_ctrlr_construct(&probe_ctx->trid, &discovery_opts, NULL);
        if (discovery_ctrlr == NULL) {
                return -1;
        }
@@ -1282,30 +1426,30 @@ nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
                return -1;
        }
 
-       /* get the cdata info */
-       rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
-                                    &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
-                                    nvme_completion_poll_cb, &status);
-       if (rc != 0) {
-               SPDK_ERRLOG("Failed to identify cdata\n");
-               return rc;
-       }
-
-       if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
-               SPDK_ERRLOG("nvme_identify_controller failed!\n");
-               return -ENXIO;
-       }
-
        /* Direct attach through spdk_nvme_connect() API */
        if (direct_connect == true) {
+               /* get the cdata info */
+               rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
+                                            &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
+                                            nvme_completion_poll_cb, &status);
+               if (rc != 0) {
+                       SPDK_ERRLOG("Failed to identify cdata\n");
+                       return rc;
+               }
+
+               if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
+                       SPDK_ERRLOG("nvme_identify_controller failed!\n");
+                       return -ENXIO;
+               }
+
                /* Set the ready state to skip the normal init process */
                discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
-               nvme_ctrlr_connected(discovery_ctrlr);
+               nvme_ctrlr_connected(probe_ctx, discovery_ctrlr);
                nvme_ctrlr_add_process(discovery_ctrlr, 0);
                return 0;
        }
 
-       rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, cb_ctx, probe_cb);
+       rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, probe_ctx);
        nvme_ctrlr_destruct(discovery_ctrlr);
        return rc;
 }
@@ -1423,7 +1567,8 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
        rdma_req = nvme_rdma_req_get(rqpair);
        if (!rdma_req) {
                /*
-                * No rdma_req is available.  Queue the request to be processed later.
+                * No rdma_req is available, so queue the request to be
+                *  processed later.
                 */
                STAILQ_INSERT_TAIL(&qpair->queued_req, req, stailq);
                return 0;
@@ -1435,13 +1580,6 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
                return -1;
        }
 
-       req->timed_out = false;
-       if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
-               req->submit_tick = spdk_get_ticks();
-       } else {
-               req->submit_tick = 0;
-       }
-
        wr = &rdma_req->send_wr;
 
        nvme_rdma_trace_ibv_sge(wr->sg_list);
@@ -1461,23 +1599,15 @@ nvme_rdma_ctrlr_delete_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_
 }
 
 int
-nvme_rdma_ctrlr_reinit_io_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
+nvme_rdma_ctrlr_connect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
 {
        return nvme_rdma_qpair_connect(nvme_rdma_qpair(qpair));
 }
 
-int
-nvme_rdma_qpair_enable(struct spdk_nvme_qpair *qpair)
+void
+nvme_rdma_ctrlr_disconnect_qpair(struct spdk_nvme_ctrlr *ctrlr, struct spdk_nvme_qpair *qpair)
 {
-       /* Currently, doing nothing here */
-       return 0;
-}
-
-int
-nvme_rdma_qpair_disable(struct spdk_nvme_qpair *qpair)
-{
-       /* Currently, doing nothing here */
-       return 0;
+       nvme_rdma_qpair_disconnect(qpair);
 }
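
The per-qpair enable/disable stubs are gone; the transport now exposes an explicit connect/disconnect pair (nvme_rdma_ctrlr_connect_qpair() is the renamed reinit entry point, and nvme_rdma_ctrlr_disconnect_qpair() wraps the new nvme_rdma_qpair_disconnect()). A hedged sketch of how a caller could cycle a queue pair with these entry points; the real callers live in the generic transport layer, not in this file, and the function below is illustrative only:

/* Illustrative composition, not part of this patch. */
static int
nvme_rdma_example_reconnect_qpair(struct spdk_nvme_ctrlr *ctrlr,
				  struct spdk_nvme_qpair *qpair)
{
	/* Tear down the RDMA resources but keep the request/response
	 * buffers allocated by nvme_rdma_ctrlr_create_qpair(). */
	nvme_rdma_ctrlr_disconnect_qpair(ctrlr, qpair);

	/* Fail outstanding requests without the DNR bit so the upper
	 * layer may retry them once the queue pair is back. */
	nvme_rdma_qpair_abort_reqs(qpair, 0);

	return nvme_rdma_ctrlr_connect_qpair(ctrlr, qpair);
}
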
 
 int
@@ -1487,11 +1617,25 @@ nvme_rdma_qpair_reset(struct spdk_nvme_qpair *qpair)
        return 0;
 }
 
-int
-nvme_rdma_qpair_fail(struct spdk_nvme_qpair *qpair)
+void
+nvme_rdma_qpair_abort_reqs(struct spdk_nvme_qpair *qpair, uint32_t dnr)
 {
-       /* Currently, doing nothing here */
-       return 0;
+       struct spdk_nvme_rdma_req *rdma_req, *tmp;
+       struct nvme_request *req;
+       struct spdk_nvme_cpl cpl;
+       struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
+
+       cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
+       cpl.status.sct = SPDK_NVME_SCT_GENERIC;
+       cpl.status.dnr = dnr;
+
+       TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
+               assert(rdma_req->req != NULL);
+               req = rdma_req->req;
+
+               nvme_rdma_req_complete(req, &cpl);
+               nvme_rdma_req_put(rqpair, rdma_req);
+       }
 }
 
 static void
@@ -1539,11 +1683,12 @@ int
 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
                                    uint32_t max_completions)
 {
-       struct nvme_rdma_qpair  *rqpair = nvme_rdma_qpair(qpair);
-       struct ibv_wc           wc[MAX_COMPLETIONS_PER_POLL];
-       int                     i, rc, batch_size;
-       uint32_t                reaped;
-       struct ibv_cq           *cq;
+       struct nvme_rdma_qpair          *rqpair = nvme_rdma_qpair(qpair);
+       struct ibv_wc                   wc[MAX_COMPLETIONS_PER_POLL];
+       int                             i, rc, batch_size;
+       uint32_t                        reaped;
+       struct ibv_cq                   *cq;
+       struct spdk_nvme_rdma_req       *rdma_req;
 
        if (max_completions == 0) {
                max_completions = rqpair->num_entries;
@@ -1592,6 +1737,13 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
                                break;
 
                        case IBV_WC_SEND:
+                               rdma_req = (struct spdk_nvme_rdma_req *)wc[i].wr_id;
+
+                               if (rdma_req->request_ready_to_put) {
+                                       nvme_rdma_req_put(rqpair, rdma_req);
+                               } else {
+                                       rdma_req->request_ready_to_put = true;
+                               }
                                break;
 
                        default:
@@ -1621,6 +1773,12 @@ nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
        return spdk_min(ctrlr->cdata.nvmf_specific.msdbd, NVME_RDMA_MAX_SGL_DESCRIPTORS);
 }
 
+volatile struct spdk_nvme_registers *
+nvme_rdma_ctrlr_get_registers(struct spdk_nvme_ctrlr *ctrlr)
+{
+       return NULL;
+}
+
 void *
 nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
 {
@@ -1632,3 +1790,32 @@ nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, siz
 {
        return 0;
 }
+
+void
+nvme_rdma_admin_qpair_abort_aers(struct spdk_nvme_qpair *qpair)
+{
+       struct spdk_nvme_rdma_req *rdma_req, *tmp;
+       struct nvme_request *req;
+       struct spdk_nvme_cpl cpl;
+       struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
+
+       cpl.status.sc = SPDK_NVME_SC_ABORTED_SQ_DELETION;
+       cpl.status.sct = SPDK_NVME_SCT_GENERIC;
+
+       TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
+               if (rdma_req->req->cmd.opc != SPDK_NVME_OPC_ASYNC_EVENT_REQUEST) {
+                       continue;
+               }
+               assert(rdma_req->req != NULL);
+               req = rdma_req->req;
+
+               nvme_rdma_req_complete(req, &cpl);
+               nvme_rdma_req_put(rqpair, rdma_req);
+       }
+}
+
+void
+spdk_nvme_rdma_init_hooks(struct spdk_nvme_rdma_hooks *hooks)
+{
+       g_nvme_hooks = *hooks;
+}
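
spdk_nvme_rdma_init_hooks() simply copies the caller's table into the file-scope g_nvme_hooks, so it must run before any controller is probed or connected; nvme_rdma_qpair_init() and the memory-map notifier read the global directly. A hedged usage sketch, reusing the illustrative app_get_ibv_pd()/app_get_rkey() hooks sketched above (none of the app_* names exist in SPDK or in this patch, and the struct field names are as inferred earlier):

static struct ibv_pd *app_get_ibv_pd(const struct spdk_nvme_transport_id *trid,
				     struct ibv_context *verbs);
static uint64_t app_get_rkey(struct ibv_pd *pd, void *buf, size_t size);

static void
app_install_rdma_hooks(void)
{
	struct spdk_nvme_rdma_hooks hooks = {
		.get_ibv_pd = app_get_ibv_pd,
		.get_rkey   = app_get_rkey,
	};

	/* The table is copied by value, so a stack variable is fine.
	 * Call this before spdk_nvme_probe()/spdk_nvme_connect() so every
	 * RDMA queue pair and memory map sees the hooks. */
	spdk_nvme_rdma_init_hooks(&hooks);
}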