update sources to ceph Nautilus 14.2.1
diff --git a/ceph/src/spdk/lib/nvme/nvme_rdma.c b/ceph/src/spdk/lib/nvme/nvme_rdma.c
index 1fefd5fd95f55849b03b7032a5348d8c6f0edce3..b356e3a16e9945721557b0425d591c2abcdeb138 100644
--- a/ceph/src/spdk/lib/nvme/nvme_rdma.c
+++ b/ceph/src/spdk/lib/nvme/nvme_rdma.c
  * NVMe over RDMA transport
  */
 
-#include <arpa/inet.h>
-#include <fcntl.h>
+#include "spdk/stdinc.h"
+
 #include <infiniband/verbs.h>
 #include <rdma/rdma_cma.h>
 #include <rdma/rdma_verbs.h>
-#include <unistd.h>
-#include <stdint.h>
-#include <sys/types.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <netdb.h>
 
 #include "spdk/assert.h"
 #include "spdk/log.h"
 #include "spdk/nvme.h"
 #include "spdk/nvmf_spec.h"
 #include "spdk/string.h"
+#include "spdk/endian.h"
+#include "spdk/likely.h"
 
 #include "nvme_internal.h"
 
 #define NVME_RDMA_TIME_OUT_IN_MS 2000
 #define NVME_RDMA_RW_BUFFER_SIZE 131072
-#define NVME_HOST_ID_DEFAULT "12345679890"
-
-#define NVME_HOST_MAX_ENTRIES_PER_QUEUE (128)
 
 /*
-NVME RDMA qpair Resouce Defaults
+ * NVME RDMA qpair Resource Defaults
  */
 #define NVME_RDMA_DEFAULT_TX_SGE               2
 #define NVME_RDMA_DEFAULT_RX_SGE               1
 
+
+/* Max number of NVMe-oF SGL descriptors supported by the host */
+#define NVME_RDMA_MAX_SGL_DESCRIPTORS          16
+struct spdk_nvmf_cmd {
+       struct spdk_nvme_cmd cmd;
+       struct spdk_nvme_sgl_descriptor sgl[NVME_RDMA_MAX_SGL_DESCRIPTORS];
+};
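
The struct above is the capsule sent in a single RDMA SEND: one 64-byte NVMe submission queue entry followed by up to 16 in-capsule SGL descriptors of 16 bytes each. A standalone sketch of the size arithmetic (the struct names below are stand-ins, not SPDK types):

    #include <assert.h>

    /* Stand-in types sized like spdk_nvme_cmd (64 B) and
     * spdk_nvme_sgl_descriptor (16 B). */
    struct cmd_sketch { unsigned char bytes[64]; };
    struct sgl_sketch { unsigned char bytes[16]; };

    struct nvmf_cmd_sketch {
        struct cmd_sketch cmd;
        struct sgl_sketch sgl[16];
    };

    /* One full capsule is 64 + 16 * 16 = 320 bytes. */
    static_assert(sizeof(struct nvmf_cmd_sketch) == 320, "capsule size");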
+
+/* Mapping from virtual address to ibv_mr pointer for a protection domain */
+struct spdk_nvme_rdma_mr_map {
+       struct ibv_pd                           *pd;
+       struct spdk_mem_map                     *map;
+       uint64_t                                ref;
+       LIST_ENTRY(spdk_nvme_rdma_mr_map)       link;
+};
+
 /* NVMe RDMA transport extensions for spdk_nvme_ctrlr */
 struct nvme_rdma_ctrlr {
        struct spdk_nvme_ctrlr                  ctrlr;
-
-       uint16_t                                cntlid;
 };
 
 /* NVMe RDMA qpair extensions for spdk_nvme_qpair */
@@ -104,15 +111,15 @@ struct nvme_rdma_qpair {
         * Array of num_entries NVMe commands registered as RDMA message buffers.
         * Indexed by rdma_req->id.
         */
-       struct spdk_nvme_cmd                    *cmds;
+       struct spdk_nvmf_cmd                    *cmds;
 
        /* Memory region describing all cmds for this qpair */
        struct ibv_mr                           *cmd_mr;
 
-       /* Mapping from virtual address to ibv_mr pointer */
-       struct spdk_mem_map                     *mr_map;
+       struct spdk_nvme_rdma_mr_map            *mr_map;
 
-       STAILQ_HEAD(, spdk_nvme_rdma_req)       free_reqs;
+       TAILQ_HEAD(, spdk_nvme_rdma_req)        free_reqs;
+       TAILQ_HEAD(, spdk_nvme_rdma_req)        outstanding_reqs;
 };
 
 struct spdk_nvme_rdma_req {
@@ -120,27 +127,49 @@ struct spdk_nvme_rdma_req {
 
        struct ibv_send_wr                      send_wr;
 
-       struct nvme_request                     *req;
+       struct nvme_request                     *req;
+
+       struct ibv_sge                          send_sgl[NVME_RDMA_DEFAULT_TX_SGE];
 
-       struct ibv_sge                          send_sgl;
+       TAILQ_ENTRY(spdk_nvme_rdma_req)         link;
+};
 
-       STAILQ_ENTRY(spdk_nvme_rdma_req)        link;
+static const char *rdma_cm_event_str[] = {
+       "RDMA_CM_EVENT_ADDR_RESOLVED",
+       "RDMA_CM_EVENT_ADDR_ERROR",
+       "RDMA_CM_EVENT_ROUTE_RESOLVED",
+       "RDMA_CM_EVENT_ROUTE_ERROR",
+       "RDMA_CM_EVENT_CONNECT_REQUEST",
+       "RDMA_CM_EVENT_CONNECT_RESPONSE",
+       "RDMA_CM_EVENT_CONNECT_ERROR",
+       "RDMA_CM_EVENT_UNREACHABLE",
+       "RDMA_CM_EVENT_REJECTED",
+       "RDMA_CM_EVENT_ESTABLISHED",
+       "RDMA_CM_EVENT_DISCONNECTED",
+       "RDMA_CM_EVENT_DEVICE_REMOVAL",
+       "RDMA_CM_EVENT_MULTICAST_JOIN",
+       "RDMA_CM_EVENT_MULTICAST_ERROR",
+       "RDMA_CM_EVENT_ADDR_CHANGE",
+       "RDMA_CM_EVENT_TIMEWAIT_EXIT"
 };
 
+static LIST_HEAD(, spdk_nvme_rdma_mr_map) g_rdma_mr_maps = LIST_HEAD_INITIALIZER(&g_rdma_mr_maps);
+static pthread_mutex_t g_rdma_mr_maps_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 static int nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair);
 
 static inline struct nvme_rdma_qpair *
 nvme_rdma_qpair(struct spdk_nvme_qpair *qpair)
 {
        assert(qpair->trtype == SPDK_NVME_TRANSPORT_RDMA);
-       return (struct nvme_rdma_qpair *)((uintptr_t)qpair - offsetof(struct nvme_rdma_qpair, qpair));
+       return SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair);
 }
 
 static inline struct nvme_rdma_ctrlr *
 nvme_rdma_ctrlr(struct spdk_nvme_ctrlr *ctrlr)
 {
        assert(ctrlr->trid.trtype == SPDK_NVME_TRANSPORT_RDMA);
-       return (struct nvme_rdma_ctrlr *)((uintptr_t)ctrlr - offsetof(struct nvme_rdma_ctrlr, ctrlr));
+       return SPDK_CONTAINEROF(ctrlr, struct nvme_rdma_ctrlr, ctrlr);
 }
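
SPDK_CONTAINEROF replaces the hand-rolled offsetof arithmetic on the removed lines. A minimal sketch of the same idiom, using only the C standard library:

    #include <stddef.h>
    #include <stdint.h>

    /* Recover a pointer to the enclosing struct from a pointer to one of
     * its members; this is the same arithmetic the deleted lines spelled
     * out by hand. */
    #define CONTAINEROF_SKETCH(ptr, type, member) \
        ((type *)((uintptr_t)(ptr) - offsetof(type, member)))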
 
 static struct spdk_nvme_rdma_req *
@@ -148,9 +177,10 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
 {
        struct spdk_nvme_rdma_req *rdma_req;
 
-       rdma_req = STAILQ_FIRST(&rqpair->free_reqs);
+       rdma_req = TAILQ_FIRST(&rqpair->free_reqs);
        if (rdma_req) {
-               STAILQ_REMOVE_HEAD(&rqpair->free_reqs, link);
+               TAILQ_REMOVE(&rqpair->free_reqs, rdma_req, link);
+               TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link);
        }
 
        return rdma_req;
@@ -159,17 +189,28 @@ nvme_rdma_req_get(struct nvme_rdma_qpair *rqpair)
 static void
 nvme_rdma_req_put(struct nvme_rdma_qpair *rqpair, struct spdk_nvme_rdma_req *rdma_req)
 {
-       STAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
+       TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
+       TAILQ_INSERT_HEAD(&rqpair->free_reqs, rdma_req, link);
 }
 
 static void
 nvme_rdma_req_complete(struct nvme_request *req,
                       struct spdk_nvme_cpl *rsp)
 {
-       req->cb_fn(req->cb_arg, rsp);
+       nvme_complete_request(req, rsp);
        nvme_free_request(req);
 }
 
+static const char *
+nvme_rdma_cm_event_str_get(uint32_t event)
+{
+       if (event < SPDK_COUNTOF(rdma_cm_event_str)) {
+               return rdma_cm_event_str[event];
+       } else {
+               return "Undefined";
+       }
+}
+
 static struct rdma_cm_event *
 nvme_rdma_get_event(struct rdma_event_channel *channel,
                    enum rdma_cm_event_type evt)
@@ -180,13 +221,14 @@ nvme_rdma_get_event(struct rdma_event_channel *channel,
        rc = rdma_get_cm_event(channel, &event);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to get event from CM event channel. Error %d (%s)\n",
-                           errno, strerror(errno));
+                           errno, spdk_strerror(errno));
                return NULL;
        }
 
        if (event->event != evt) {
-               SPDK_ERRLOG("Received event %d from CM event channel, but expected event %d\n",
-                           event->event, evt);
+               SPDK_ERRLOG("Expected %s but received %s (%d) from CM event channel (status = %d)\n",
+                           nvme_rdma_cm_event_str_get(evt),
+                           nvme_rdma_cm_event_str_get(event->event), event->event, event->status);
                rdma_ack_cm_event(event);
                return NULL;
        }
@@ -202,8 +244,7 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
 
        rqpair->cq = ibv_create_cq(rqpair->cm_id->verbs, rqpair->num_entries * 2, rqpair, NULL, 0);
        if (!rqpair->cq) {
-               SPDK_ERRLOG("Unable to create completion queue\n");
-               SPDK_ERRLOG("Errno %d: %s\n", errno, strerror(errno));
+               SPDK_ERRLOG("Unable to create completion queue: errno %d: %s\n", errno, spdk_strerror(errno));
                return -1;
        }
 
@@ -229,7 +270,7 @@ nvme_rdma_qpair_init(struct nvme_rdma_qpair *rqpair)
 
 #define nvme_rdma_trace_ibv_sge(sg_list) \
        if (sg_list) { \
-               SPDK_TRACELOG(SPDK_TRACE_DEBUG, "local addr %p length 0x%x lkey 0x%x\n", \
+               SPDK_DEBUGLOG(SPDK_LOG_NVME, "local addr %p length 0x%x lkey 0x%x\n", \
                              (void *)(sg_list)->addr, (sg_list)->length, (sg_list)->lkey); \
        }
 
@@ -369,29 +410,32 @@ nvme_rdma_alloc_reqs(struct nvme_rdma_qpair *rqpair)
                goto fail;
        }
 
-       STAILQ_INIT(&rqpair->free_reqs);
+       TAILQ_INIT(&rqpair->free_reqs);
+       TAILQ_INIT(&rqpair->outstanding_reqs);
        for (i = 0; i < rqpair->num_entries; i++) {
                struct spdk_nvme_rdma_req       *rdma_req;
-               struct spdk_nvme_cmd            *cmd;
+               struct spdk_nvmf_cmd            *cmd;
 
                rdma_req = &rqpair->rdma_reqs[i];
                cmd = &rqpair->cmds[i];
 
                rdma_req->id = i;
 
-               rdma_req->send_sgl.addr = (uint64_t)cmd;
-               rdma_req->send_sgl.length = sizeof(*cmd);
-               rdma_req->send_sgl.lkey = rqpair->cmd_mr->lkey;
+               /* The first RDMA sgl element will always point
+                * at this data structure. Depending on whether
+                * an NVMe-oF SGL is required, the length of
+                * this element may change. */
+               rdma_req->send_sgl[0].addr = (uint64_t)cmd;
+               rdma_req->send_sgl[0].lkey = rqpair->cmd_mr->lkey;
 
                rdma_req->send_wr.wr_id = (uint64_t)rdma_req;
                rdma_req->send_wr.next = NULL;
                rdma_req->send_wr.opcode = IBV_WR_SEND;
                rdma_req->send_wr.send_flags = IBV_SEND_SIGNALED;
-               rdma_req->send_wr.sg_list = &rdma_req->send_sgl;
-               rdma_req->send_wr.num_sge = 1;
+               rdma_req->send_wr.sg_list = rdma_req->send_sgl;
                rdma_req->send_wr.imm_data = 0;
 
-               STAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
+               TAILQ_INSERT_TAIL(&rqpair->free_reqs, rdma_req, link);
        }
 
        return 0;
@@ -433,13 +477,14 @@ nvme_rdma_recv(struct nvme_rdma_qpair *rqpair, uint64_t rsp_idx)
 
 static int
 nvme_rdma_resolve_addr(struct nvme_rdma_qpair *rqpair,
-                      struct sockaddr_storage *sin,
+                      struct sockaddr *src_addr,
+                      struct sockaddr *dst_addr,
                       struct rdma_event_channel *cm_channel)
 {
        int ret;
        struct rdma_cm_event *event;
 
-       ret = rdma_resolve_addr(rqpair->cm_id, NULL, (struct sockaddr *) sin,
+       ret = rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr,
                                NVME_RDMA_TIME_OUT_IN_MS);
        if (ret) {
                SPDK_ERRLOG("rdma_resolve_addr, %d\n", errno);
@@ -473,13 +518,12 @@ static int
 nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
 {
        struct rdma_conn_param                          param = {};
-       struct spdk_nvmf_rdma_request_private_data      request_data = {};
+       struct spdk_nvmf_rdma_request_private_data      request_data = {};
        struct spdk_nvmf_rdma_accept_private_data       *accept_data;
-       struct ibv_device_attr                          attr;
-       int                                             ret;
-       struct rdma_cm_event                            *event;
-       struct spdk_nvme_ctrlr                          *ctrlr;
-       struct nvme_rdma_ctrlr                          *rctrlr;
+       struct ibv_device_attr                          attr;
+       int                                             ret;
+       struct rdma_cm_event                            *event;
+       struct spdk_nvme_ctrlr                          *ctrlr;
 
        ret = ibv_query_device(rqpair->cm_id->verbs, &attr);
        if (ret != 0) {
@@ -494,15 +538,15 @@ nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
-       rctrlr = nvme_rdma_ctrlr(ctrlr);
-
        request_data.qid = rqpair->qpair.id;
        request_data.hrqsize = rqpair->num_entries;
        request_data.hsqsize = rqpair->num_entries - 1;
-       request_data.cntlid = rctrlr->cntlid;
+       request_data.cntlid = ctrlr->cntlid;
 
        param.private_data = &request_data;
        param.private_data_len = sizeof(request_data);
+       param.retry_count = 7;
+       param.rnr_retry_count = 7;
 
        ret = rdma_connect(rqpair->cm_id, &param);
        if (ret) {
@@ -523,7 +567,7 @@ nvme_rdma_connect(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
-       SPDK_TRACELOG(SPDK_TRACE_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "Requested queue depth %d. Actually got queue depth %d.\n",
                      rqpair->num_entries, accept_data->crqsize);
 
        rqpair->num_entries = spdk_min(rqpair->num_entries, accept_data->crqsize);
@@ -547,7 +591,7 @@ nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr,
 
        ret = getaddrinfo(addr, service, &hints, &res);
        if (ret) {
-               SPDK_ERRLOG("getaddrinfo failed - invalid hostname or IP address\n");
+               SPDK_ERRLOG("getaddrinfo failed: %s (%d)\n", gai_strerror(ret), ret);
                return ret;
        }
 
@@ -563,83 +607,13 @@ nvme_rdma_parse_addr(struct sockaddr_storage *sa, int family, const char *addr,
 }
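
The getaddrinfo() call whose error reporting is improved above resolves the traddr/trsvcid pair into a sockaddr. A self-contained sketch of that pattern, reduced to its essentials (parse_addr_sketch is an illustrative name, not the SPDK symbol):

    #include <netdb.h>
    #include <string.h>
    #include <sys/socket.h>

    static int parse_addr_sketch(struct sockaddr_storage *sa, int family,
                                 const char *addr, const char *service)
    {
        struct addrinfo hints = { .ai_family = family };
        struct addrinfo *res = NULL;
        int rc;

        rc = getaddrinfo(addr, service, &hints, &res);
        if (rc != 0) {
            /* caller logs gai_strerror(rc), as in the hunk above */
            return rc;
        }
        memcpy(sa, res->ai_addr, res->ai_addrlen);
        freeaddrinfo(res);
        return 0;
    }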
 
 static int
-nvme_rdma_qpair_fabric_connect(struct nvme_rdma_qpair *rqpair)
-{
-       struct nvme_completion_poll_status status;
-       struct spdk_nvmf_fabric_connect_rsp *rsp;
-       struct spdk_nvmf_fabric_connect_cmd cmd;
-       struct spdk_nvmf_fabric_connect_data *nvmf_data;
-       struct spdk_nvme_ctrlr *ctrlr;
-       struct nvme_rdma_ctrlr *rctrlr;
-       int rc = 0;
-
-       ctrlr = rqpair->qpair.ctrlr;
-       if (!ctrlr) {
-               return -1;
-       }
-
-       rctrlr = nvme_rdma_ctrlr(ctrlr);
-
-       nvmf_data = spdk_zmalloc(sizeof(*nvmf_data), 0, NULL);
-       if (!nvmf_data) {
-               SPDK_ERRLOG("nvmf_data allocation error\n");
-               rc = -1;
-               return rc;
-       }
-
-       memset(&cmd, 0, sizeof(cmd));
-       memset(&status, 0, sizeof(struct nvme_completion_poll_status));
-
-       cmd.opcode = SPDK_NVME_OPC_FABRIC;
-       cmd.fctype = SPDK_NVMF_FABRIC_COMMAND_CONNECT;
-       cmd.qid = rqpair->qpair.id;
-       cmd.sqsize = rqpair->num_entries - 1;
-       cmd.kato = ctrlr->opts.keep_alive_timeout_ms;
-
-       if (nvme_qpair_is_admin_queue(&rqpair->qpair)) {
-               nvmf_data->cntlid = 0xFFFF;
-       } else {
-               nvmf_data->cntlid = rctrlr->cntlid;
-       }
-
-       strncpy((char *)&nvmf_data->hostid, (char *)NVME_HOST_ID_DEFAULT,
-               strlen((char *)NVME_HOST_ID_DEFAULT));
-       strncpy((char *)nvmf_data->hostnqn, ctrlr->opts.hostnqn, sizeof(nvmf_data->hostnqn));
-       strncpy((char *)nvmf_data->subnqn, ctrlr->trid.subnqn, sizeof(nvmf_data->subnqn));
-
-       rc = spdk_nvme_ctrlr_cmd_io_raw(ctrlr, &rqpair->qpair,
-                                       (struct spdk_nvme_cmd *)&cmd,
-                                       nvmf_data, sizeof(*nvmf_data),
-                                       nvme_completion_poll_cb, &status);
-       if (rc < 0) {
-               SPDK_ERRLOG("spdk_nvme_rdma_req_fabric_connect failed\n");
-               rc = -1;
-               goto ret;
-       }
-
-       while (status.done == false) {
-               spdk_nvme_qpair_process_completions(&rqpair->qpair, 0);
-       }
-
-       if (spdk_nvme_cpl_is_error(&status.cpl)) {
-               SPDK_ERRLOG("Connect command failed\n");
-               return -1;
-       }
-
-       rsp = (struct spdk_nvmf_fabric_connect_rsp *)&status.cpl;
-       rctrlr->cntlid = rsp->status_code_specific.success.cntlid;
-ret:
-       spdk_free(nvmf_data);
-       return rc;
-}
-
-static void
 nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
                        enum spdk_mem_map_notify_action action,
                        void *vaddr, size_t size)
 {
        struct ibv_pd *pd = cb_ctx;
        struct ibv_mr *mr;
+       int rc;
 
        switch (action) {
        case SPDK_MEM_MAP_NOTIFY_REGISTER:
@@ -649,36 +623,68 @@ nvme_rdma_mr_map_notify(void *cb_ctx, struct spdk_mem_map *map,
                                IBV_ACCESS_REMOTE_WRITE);
                if (mr == NULL) {
                        SPDK_ERRLOG("ibv_reg_mr() failed\n");
+                       return -EFAULT;
                } else {
-                       spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
+                       rc = spdk_mem_map_set_translation(map, (uint64_t)vaddr, size, (uint64_t)mr);
                }
                break;
        case SPDK_MEM_MAP_NOTIFY_UNREGISTER:
-               mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr);
-               spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
+               mr = (struct ibv_mr *)spdk_mem_map_translate(map, (uint64_t)vaddr, NULL);
+               rc = spdk_mem_map_clear_translation(map, (uint64_t)vaddr, size);
                if (mr) {
                        ibv_dereg_mr(mr);
                }
                break;
+       default:
+               SPDK_UNREACHABLE();
        }
-}
 
+       return rc;
+}
 
 static int
 nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
 {
        struct ibv_pd *pd = rqpair->cm_id->qp->pd;
-       struct spdk_mem_map *mr_map;
-
-       // TODO: look up existing mem map registration for this pd
+       struct spdk_nvme_rdma_mr_map *mr_map;
+       const struct spdk_mem_map_ops nvme_rdma_map_ops = {
+               .notify_cb = nvme_rdma_mr_map_notify,
+               .are_contiguous = NULL
+       };
+
+       pthread_mutex_lock(&g_rdma_mr_maps_mutex);
+
+       /* Look up existing mem map registration for this pd */
+       LIST_FOREACH(mr_map, &g_rdma_mr_maps, link) {
+               if (mr_map->pd == pd) {
+                       mr_map->ref++;
+                       rqpair->mr_map = mr_map;
+                       pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
+                       return 0;
+               }
+       }
 
-       mr_map = spdk_mem_map_alloc((uint64_t)NULL, nvme_rdma_mr_map_notify, pd);
+       mr_map = calloc(1, sizeof(*mr_map));
        if (mr_map == NULL) {
+               SPDK_ERRLOG("calloc() failed\n");
+               pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
+               return -1;
+       }
+
+       mr_map->ref = 1;
+       mr_map->pd = pd;
+       mr_map->map = spdk_mem_map_alloc((uint64_t)NULL, &nvme_rdma_map_ops, pd);
+       if (mr_map->map == NULL) {
                SPDK_ERRLOG("spdk_mem_map_alloc() failed\n");
+               free(mr_map);
+               pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
                return -1;
        }
 
        rqpair->mr_map = mr_map;
+       LIST_INSERT_HEAD(&g_rdma_mr_maps, mr_map, link);
+
+       pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
 
        return 0;
 }
@@ -686,13 +692,34 @@ nvme_rdma_register_mem(struct nvme_rdma_qpair *rqpair)
 static void
 nvme_rdma_unregister_mem(struct nvme_rdma_qpair *rqpair)
 {
-       spdk_mem_map_free(&rqpair->mr_map);
+       struct spdk_nvme_rdma_mr_map *mr_map;
+
+       mr_map = rqpair->mr_map;
+       rqpair->mr_map = NULL;
+
+       if (mr_map == NULL) {
+               return;
+       }
+
+       pthread_mutex_lock(&g_rdma_mr_maps_mutex);
+
+       assert(mr_map->ref > 0);
+       mr_map->ref--;
+       if (mr_map->ref == 0) {
+               LIST_REMOVE(mr_map, link);
+               spdk_mem_map_free(&mr_map->map);
+               free(mr_map);
+       }
+
+       pthread_mutex_unlock(&g_rdma_mr_maps_mutex);
 }
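
nvme_rdma_register_mem() and nvme_rdma_unregister_mem() form a get/put pair on the per-protection-domain map: the map and its ibv_mr registrations are torn down only when the last qpair using that pd drops its reference. The discipline in miniature (illustrative names only):

    struct map_sketch { int ref; };

    static void map_get_sketch(struct map_sketch *m) { m->ref++; }

    static void map_put_sketch(struct map_sketch *m)
    {
        if (--m->ref == 0) {
            /* last user gone: remove from the global list and free,
             * mirroring nvme_rdma_unregister_mem() above */
        }
    }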
 
 static int
 nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
 {
-       struct sockaddr_storage  sin;
+       struct sockaddr_storage dst_addr;
+       struct sockaddr_storage src_addr;
+       bool src_addr_specified;
        int rc;
        struct spdk_nvme_ctrlr *ctrlr;
        int family;
@@ -717,24 +744,38 @@ nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
-       SPDK_TRACELOG(SPDK_TRACE_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "adrfam %d ai_family %d\n", ctrlr->trid.adrfam, family);
 
-       memset(&sin, 0, sizeof(struct sockaddr_storage));
+       memset(&dst_addr, 0, sizeof(dst_addr));
 
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "trsvcid is %s\n", ctrlr->trid.trsvcid);
-       rc = nvme_rdma_parse_addr(&sin, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "trsvcid is %s\n", ctrlr->trid.trsvcid);
+       rc = nvme_rdma_parse_addr(&dst_addr, family, ctrlr->trid.traddr, ctrlr->trid.trsvcid);
        if (rc != 0) {
-               SPDK_ERRLOG("nvme_rdma_parse_addr() failed\n");
+               SPDK_ERRLOG("dst_addr nvme_rdma_parse_addr() failed\n");
                return -1;
        }
 
+       if (ctrlr->opts.src_addr[0] || ctrlr->opts.src_svcid[0]) {
+               memset(&src_addr, 0, sizeof(src_addr));
+               rc = nvme_rdma_parse_addr(&src_addr, family, ctrlr->opts.src_addr, ctrlr->opts.src_svcid);
+               if (rc != 0) {
+                       SPDK_ERRLOG("src_addr nvme_rdma_parse_addr() failed\n");
+                       return -1;
+               }
+               src_addr_specified = true;
+       } else {
+               src_addr_specified = false;
+       }
+
        rc = rdma_create_id(rqpair->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP);
        if (rc < 0) {
                SPDK_ERRLOG("rdma_create_id() failed\n");
                return -1;
        }
 
-       rc = nvme_rdma_resolve_addr(rqpair, &sin, rqpair->cm_channel);
+       rc = nvme_rdma_resolve_addr(rqpair,
+                                   src_addr_specified ? (struct sockaddr *)&src_addr : NULL,
+                                   (struct sockaddr *)&dst_addr, rqpair->cm_channel);
        if (rc < 0) {
                SPDK_ERRLOG("nvme_rdma_resolve_addr() failed\n");
                return -1;
@@ -753,20 +794,20 @@ nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
        }
 
        rc = nvme_rdma_alloc_reqs(rqpair);
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "rc =%d\n", rc);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
        if (rc) {
                SPDK_ERRLOG("Unable to allocate rqpair  RDMA requests\n");
                return -1;
        }
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "RDMA requests allocated\n");
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA requests allocated\n");
 
        rc = nvme_rdma_alloc_rsps(rqpair);
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "rc =%d\n", rc);
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "rc =%d\n", rc);
        if (rc < 0) {
                SPDK_ERRLOG("Unable to allocate rqpair RDMA responses\n");
                return -1;
        }
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "RDMA responses allocated\n");
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "RDMA responses allocated\n");
 
        rc = nvme_rdma_register_mem(rqpair);
        if (rc < 0) {
@@ -774,7 +815,7 @@ nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
                return -1;
        }
 
-       rc = nvme_rdma_qpair_fabric_connect(rqpair);
+       rc = nvme_fabric_qpair_connect(&rqpair->qpair, rqpair->num_entries);
        if (rc < 0) {
                SPDK_ERRLOG("Failed to send an NVMe-oF Fabric CONNECT command\n");
                return -1;
@@ -787,194 +828,330 @@ nvme_rdma_qpair_connect(struct nvme_rdma_qpair *rqpair)
  * Build SGL describing empty payload.
  */
 static int
-nvme_rdma_build_null_request(struct nvme_request *req)
+nvme_rdma_build_null_request(struct spdk_nvme_rdma_req *rdma_req)
 {
-       struct spdk_nvme_sgl_descriptor *nvme_sgl;
+       struct nvme_request *req = rdma_req->req;
 
-       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_SGL;
+       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
 
-       nvme_sgl = &req->cmd.dptr.sgl1;
-       nvme_sgl->keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
-       nvme_sgl->keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
-       nvme_sgl->keyed.length = 0;
-       nvme_sgl->keyed.key = 0;
-       nvme_sgl->address = 0;
+       /* The first element of this SGL is pointing at an
+        * spdk_nvmf_cmd object. For this particular command,
+        * we only need the first 64 bytes corresponding to
+        * the NVMe command. */
+       rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
+
+       /* The RDMA SGL needs one element describing the NVMe command. */
+       rdma_req->send_wr.num_sge = 1;
+
+       req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
+       req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
+       req->cmd.dptr.sgl1.keyed.length = 0;
+       req->cmd.dptr.sgl1.keyed.key = 0;
+       req->cmd.dptr.sgl1.address = 0;
 
        return 0;
 }
 
 /*
- * Build SGL describing contiguous payload buffer.
+ * Build inline SGL describing contiguous payload buffer.
  */
 static int
-nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair, struct nvme_request *req)
+nvme_rdma_build_contig_inline_request(struct nvme_rdma_qpair *rqpair,
+                                     struct spdk_nvme_rdma_req *rdma_req)
 {
-       void *payload = req->payload.u.contig + req->payload_offset;
+       struct nvme_request *req = rdma_req->req;
        struct ibv_mr *mr;
+       void *payload;
+       uint64_t requested_size;
 
+       payload = req->payload.contig_or_cb_arg + req->payload_offset;
        assert(req->payload_size != 0);
-       assert(req->payload.type == NVME_PAYLOAD_TYPE_CONTIG);
+       assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
 
-       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map, (uint64_t)payload);
-       if (mr == NULL) {
-               return -1;
+       requested_size = req->payload_size;
+       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map,
+                       (uint64_t)payload, &requested_size);
+
+       if (mr == NULL || requested_size < req->payload_size) {
+               return -EINVAL;
        }
 
-       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_SGL;
-       req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
-       req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
-       req->cmd.dptr.sgl1.keyed.length = req->payload_size;
-       req->cmd.dptr.sgl1.keyed.key = mr->rkey;
-       req->cmd.dptr.sgl1.address = (uint64_t)payload;
+       /* The first element of this SGL is pointing at an
+        * spdk_nvmf_cmd object. For this particular command,
+        * we only need the first 64 bytes corresponding to
+        * the NVMe command. */
+       rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
+
+       rdma_req->send_sgl[1].addr = (uint64_t)payload;
+       rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
+       rdma_req->send_sgl[1].lkey = mr->lkey;
+
+       /* The RDMA SGL contains two elements. The first describes
+        * the NVMe command and the second describes the data
+        * payload. */
+       rdma_req->send_wr.num_sge = 2;
+
+       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
+       req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
+       req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
+       req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
+       /* Inline only supported for icdoff == 0 currently.  This function will
+        * not get called for controllers with other values. */
+       req->cmd.dptr.sgl1.address = (uint64_t)0;
 
        return 0;
 }
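
In-capsule (inline) data means the payload travels in the same RDMA SEND as the command: the second gather element points at the data, and the target expects it at offset 0 past the SQE, hence the icdoff == 0 restriction. A bare-verbs sketch of such a two-element send, assuming already-registered buffers cmd_buf and data_buf, a payload length data_len, and a connected qp:

    #include <infiniband/verbs.h>

    struct ibv_sge sge[2] = {
        { .addr = (uint64_t)cmd_buf,  .length = 64,       .lkey = cmd_mr->lkey  },
        { .addr = (uint64_t)data_buf, .length = data_len, .lkey = data_mr->lkey },
    };
    struct ibv_send_wr wr = {
        .sg_list    = sge,
        .num_sge    = 2,
        .opcode     = IBV_WR_SEND,
        .send_flags = IBV_SEND_SIGNALED,
    };
    struct ibv_send_wr *bad_wr = NULL;

    int rc = ibv_post_send(qp, &wr, &bad_wr);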
 
 /*
- * Build SGL describing scattered payload buffer.
+ * Build SGL describing contiguous payload buffer.
  */
 static int
-nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair, struct nvme_request *req)
+nvme_rdma_build_contig_request(struct nvme_rdma_qpair *rqpair,
+                              struct spdk_nvme_rdma_req *rdma_req)
 {
-       int rc;
-       void *virt_addr;
+       struct nvme_request *req = rdma_req->req;
+       void *payload = req->payload.contig_or_cb_arg + req->payload_offset;
        struct ibv_mr *mr;
-       uint32_t length;
+       uint64_t requested_size;
 
        assert(req->payload_size != 0);
-       assert(req->payload.type == NVME_PAYLOAD_TYPE_SGL);
-       assert(req->payload.u.sgl.reset_sgl_fn != NULL);
-       assert(req->payload.u.sgl.next_sge_fn != NULL);
-       req->payload.u.sgl.reset_sgl_fn(req->payload.u.sgl.cb_arg, req->payload_offset);
+       assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG);
 
-       /* TODO: for now, we only support a single SGL entry */
-       rc = req->payload.u.sgl.next_sge_fn(req->payload.u.sgl.cb_arg, &virt_addr, &length);
-       if (rc) {
+       requested_size = req->payload_size;
+       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)payload,
+                       &requested_size);
+       if (mr == NULL || requested_size < req->payload_size) {
                return -1;
        }
 
-       if (length != req->payload_size) {
-               SPDK_ERRLOG("multi-element SGL currently not supported for RDMA\n");
-               return -1;
-       }
+       /* The first element of this SGL is pointing at an
+        * spdk_nvmf_cmd object. For this particular command,
+        * we only need the first 64 bytes corresponding to
+        * the NVMe command. */
+       rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
 
-       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map, (uint64_t)virt_addr);
-       if (mr == NULL) {
-               return -1;
-       }
+       /* The RDMA SGL needs one element describing the NVMe command. */
+       rdma_req->send_wr.num_sge = 1;
 
-       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_SGL;
+       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
        req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
        req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
-       req->cmd.dptr.sgl1.keyed.length = length;
+       req->cmd.dptr.sgl1.keyed.length = req->payload_size;
        req->cmd.dptr.sgl1.keyed.key = mr->rkey;
-       req->cmd.dptr.sgl1.address = (uint64_t)virt_addr;
+       req->cmd.dptr.sgl1.address = (uint64_t)payload;
 
        return 0;
 }
 
+/*
+ * Build SGL describing scattered payload buffer.
+ */
 static int
-nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
-                  struct spdk_nvme_rdma_req *rdma_req)
+nvme_rdma_build_sgl_request(struct nvme_rdma_qpair *rqpair,
+                           struct spdk_nvme_rdma_req *rdma_req)
 {
-       int rc;
+       struct nvme_request *req = rdma_req->req;
+       struct spdk_nvmf_cmd *cmd = &rqpair->cmds[rdma_req->id];
+       struct ibv_mr *mr = NULL;
+       void *virt_addr;
+       uint64_t remaining_size, mr_length;
+       uint32_t sge_length;
+       int rc, max_num_sgl, num_sgl_desc;
 
-       rdma_req->req = req;
-       req->cmd.cid = rdma_req->id;
+       assert(req->payload_size != 0);
+       assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
+       assert(req->payload.reset_sgl_fn != NULL);
+       assert(req->payload.next_sge_fn != NULL);
+       req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
 
-       if (req->payload_size == 0) {
-               rc = nvme_rdma_build_null_request(req);
-       } else if (req->payload.type == NVME_PAYLOAD_TYPE_CONTIG) {
-               rc = nvme_rdma_build_contig_request(rqpair, req);
-       } else if (req->payload.type == NVME_PAYLOAD_TYPE_SGL) {
-               rc = nvme_rdma_build_sgl_request(rqpair, req);
-       } else {
-               rc = -1;
-       }
+       max_num_sgl = req->qpair->ctrlr->max_sges;
 
-       if (rc) {
-               return rc;
-       }
+       remaining_size = req->payload_size;
+       num_sgl_desc = 0;
+       do {
+               rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &sge_length);
+               if (rc) {
+                       return -1;
+               }
 
-       memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
-       return 0;
-}
+               sge_length = spdk_min(remaining_size, sge_length);
+               mr_length = sge_length;
 
-static int
-nvme_rdma_fabric_prop_set_cmd(struct spdk_nvme_ctrlr *ctrlr,
-                             uint32_t offset, uint8_t size, uint64_t value)
-{
-       struct spdk_nvmf_fabric_prop_set_cmd cmd = {};
-       struct nvme_completion_poll_status status = {};
-       int rc;
+               mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
+                               &mr_length);
 
-       cmd.opcode = SPDK_NVME_OPC_FABRIC;
-       cmd.fctype = SPDK_NVMF_FABRIC_COMMAND_PROPERTY_SET;
-       cmd.ofst = offset;
-       cmd.attrib.size = size;
-       cmd.value.u64 = value;
+               if (mr == NULL || mr_length < sge_length) {
+                       return -1;
+               }
 
-       rc = spdk_nvme_ctrlr_cmd_admin_raw(ctrlr, (struct spdk_nvme_cmd *)&cmd,
-                                          NULL, 0,
-                                          nvme_completion_poll_cb, &status);
+               cmd->sgl[num_sgl_desc].keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
+               cmd->sgl[num_sgl_desc].keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
+               cmd->sgl[num_sgl_desc].keyed.length = sge_length;
+               cmd->sgl[num_sgl_desc].keyed.key = mr->rkey;
+               cmd->sgl[num_sgl_desc].address = (uint64_t)virt_addr;
 
-       if (rc < 0) {
-               SPDK_ERRLOG("failed to send nvmf_fabric_prop_set_cmd\n");
+               remaining_size -= sge_length;
+               num_sgl_desc++;
+       } while (remaining_size > 0 && num_sgl_desc < max_num_sgl);
+
+
+       /* Should be impossible if we did our sgl checks properly up the stack, but do a sanity check here. */
+       if (remaining_size > 0) {
                return -1;
        }
 
-       while (status.done == false) {
-               spdk_nvme_qpair_process_completions(ctrlr->adminq, 0);
-       }
+       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
 
-       if (spdk_nvme_cpl_is_error(&status.cpl)) {
-               SPDK_ERRLOG("nvme_rdma_fabric_prop_get_cmd failed\n");
-               return -1;
+       /* The RDMA SGL needs one element describing some portion
+        * of the spdk_nvmf_cmd structure. */
+       rdma_req->send_wr.num_sge = 1;
+
+       /*
+        * If only one SGL descriptor is required, it can be embedded directly in the command
+        * as a data block descriptor.
+        */
+       if (num_sgl_desc == 1) {
+               /* The first element of this SGL is pointing at an
+                * spdk_nvmf_cmd object. For this particular command,
+                * we only need the first 64 bytes corresponding to
+                * the NVMe command. */
+               rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
+
+               req->cmd.dptr.sgl1.keyed.type = SPDK_NVME_SGL_TYPE_KEYED_DATA_BLOCK;
+               req->cmd.dptr.sgl1.keyed.subtype = SPDK_NVME_SGL_SUBTYPE_ADDRESS;
+               req->cmd.dptr.sgl1.keyed.length = req->payload_size;
+               req->cmd.dptr.sgl1.keyed.key = mr->rkey;
+               req->cmd.dptr.sgl1.address = rqpair->cmds[rdma_req->id].sgl[0].address;
+       } else {
+               /*
+                * Otherwise, the SGL descriptor embedded in the command must point to the list of
+                * SGL descriptors used to describe the operation. In that case it is a last segment descriptor.
+                */
+               rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd) + sizeof(struct
+                                              spdk_nvme_sgl_descriptor) * num_sgl_desc;
+
+               req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_LAST_SEGMENT;
+               req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
+               req->cmd.dptr.sgl1.unkeyed.length = num_sgl_desc * sizeof(struct spdk_nvme_sgl_descriptor);
+               req->cmd.dptr.sgl1.address = (uint64_t)0;
        }
 
        return 0;
 }
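
send_sgl[0].length above is sized to cover exactly the descriptors in use. A worked example, assuming num_sgl_desc == 4:

    unsigned num_sgl_desc = 4;                      /* assumed                    */
    unsigned capsule_len  = 64 + num_sgl_desc * 16; /* 128 bytes in the RDMA SEND */
    unsigned sgl1_len     = num_sgl_desc * 16;      /* 64: sgl1.unkeyed.length    */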
 
+/*
+ * Build inline SGL describing sgl payload buffer.
+ */
 static int
-nvme_rdma_fabric_prop_get_cmd(struct spdk_nvme_ctrlr *ctrlr,
-                             uint32_t offset, uint8_t size, uint64_t *value)
+nvme_rdma_build_sgl_inline_request(struct nvme_rdma_qpair *rqpair,
+                                  struct spdk_nvme_rdma_req *rdma_req)
 {
-       struct spdk_nvmf_fabric_prop_set_cmd cmd = {};
-       struct nvme_completion_poll_status status = {};
-       struct spdk_nvmf_fabric_prop_get_rsp *response;
+       struct nvme_request *req = rdma_req->req;
+       struct ibv_mr *mr;
+       uint32_t length;
+       uint64_t requested_size;
+       void *virt_addr;
        int rc;
 
-       cmd.opcode = SPDK_NVME_OPC_FABRIC;
-       cmd.fctype = SPDK_NVMF_FABRIC_COMMAND_PROPERTY_GET;
-       cmd.ofst = offset;
-       cmd.attrib.size = size;
-
-       rc = spdk_nvme_ctrlr_cmd_admin_raw(ctrlr, (struct spdk_nvme_cmd *)&cmd,
-                                          NULL, 0, nvme_completion_poll_cb,
-                                          &status);
+       assert(req->payload_size != 0);
+       assert(nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL);
+       assert(req->payload.reset_sgl_fn != NULL);
+       assert(req->payload.next_sge_fn != NULL);
+       req->payload.reset_sgl_fn(req->payload.contig_or_cb_arg, req->payload_offset);
 
-       if (rc < 0) {
-               SPDK_ERRLOG("failed to send nvme_rdma_fabric_prop_get_cmd\n");
+       /* TODO: for now, we only support a single SGL entry */
+       rc = req->payload.next_sge_fn(req->payload.contig_or_cb_arg, &virt_addr, &length);
+       if (rc) {
                return -1;
        }
 
-       while (status.done == false) {
-               spdk_nvme_qpair_process_completions(ctrlr->adminq, 0);
+       if (length < req->payload_size) {
+               SPDK_ERRLOG("multi-element SGL currently not supported for RDMA\n");
+               return -1;
        }
 
-       if (spdk_nvme_cpl_is_error(&status.cpl)) {
-               SPDK_ERRLOG("nvme_rdma_fabric_prop_get_cmd failed\n");
+       requested_size = req->payload_size;
+       mr = (struct ibv_mr *)spdk_mem_map_translate(rqpair->mr_map->map, (uint64_t)virt_addr,
+                       &requested_size);
+       if (mr == NULL || requested_size < req->payload_size) {
                return -1;
        }
 
-       response = (struct spdk_nvmf_fabric_prop_get_rsp *)&status.cpl;
+       /* The first element of this SGL is pointing at an
+        * spdk_nvmf_cmd object. For this particular command,
+        * we only need the first 64 bytes corresponding to
+        * the NVMe command. */
+       rdma_req->send_sgl[0].length = sizeof(struct spdk_nvme_cmd);
+
+       rdma_req->send_sgl[1].addr = (uint64_t)virt_addr;
+       rdma_req->send_sgl[1].length = (uint32_t)req->payload_size;
+       rdma_req->send_sgl[1].lkey = mr->lkey;
+
+       /* The RDMA SGL contains two elements. The first describes
+        * the NVMe command and the second describes the data
+        * payload. */
+       rdma_req->send_wr.num_sge = 2;
+
+       req->cmd.psdt = SPDK_NVME_PSDT_SGL_MPTR_CONTIG;
+       req->cmd.dptr.sgl1.unkeyed.type = SPDK_NVME_SGL_TYPE_DATA_BLOCK;
+       req->cmd.dptr.sgl1.unkeyed.subtype = SPDK_NVME_SGL_SUBTYPE_OFFSET;
+       req->cmd.dptr.sgl1.unkeyed.length = (uint32_t)req->payload_size;
+       /* Inline only supported for icdoff == 0 currently.  This function will
+        * not get called for controllers with other values. */
+       req->cmd.dptr.sgl1.address = (uint64_t)0;
+
+       return 0;
+}
+
+static inline unsigned int
+nvme_rdma_icdsz_bytes(struct spdk_nvme_ctrlr *ctrlr)
+{
+       return (ctrlr->cdata.nvmf_specific.ioccsz * 16 - sizeof(struct spdk_nvme_cmd));
+}
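+
+ioccsz is reported by the target in 16-byte units and covers the whole capsule, so the helper subtracts the 64-byte command. A worked example with an assumed value:
+
+    unsigned ioccsz    = 260;              /* assumed target value          */
+    unsigned icd_bytes = ioccsz * 16 - 64; /* 4096: up to 4 KiB sent inline */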
+
+static int
+nvme_rdma_req_init(struct nvme_rdma_qpair *rqpair, struct nvme_request *req,
+                  struct spdk_nvme_rdma_req *rdma_req)
+{
+       struct spdk_nvme_ctrlr *ctrlr = rqpair->qpair.ctrlr;
+       int rc;
+
+       rdma_req->req = req;
+       req->cmd.cid = rdma_req->id;
 
-       if (!size) {
-               *value = response->value.u32.low;
+       if (req->payload_size == 0) {
+               rc = nvme_rdma_build_null_request(rdma_req);
+       } else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_CONTIG) {
+               /*
+                * Check if icdoff is non zero, to avoid interop conflicts with
+                * targets with non-zero icdoff.  Both SPDK and the Linux kernel
+                * targets use icdoff = 0.  For targets with non-zero icdoff, we
+                * will currently just not use inline data for now.
+                */
+               if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
+                   req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
+                   (ctrlr->cdata.nvmf_specific.icdoff == 0)) {
+                       rc = nvme_rdma_build_contig_inline_request(rqpair, rdma_req);
+               } else {
+                       rc = nvme_rdma_build_contig_request(rqpair, rdma_req);
+               }
+       } else if (nvme_payload_type(&req->payload) == NVME_PAYLOAD_TYPE_SGL) {
+               if (req->cmd.opc == SPDK_NVME_OPC_WRITE &&
+                   req->payload_size <= nvme_rdma_icdsz_bytes(ctrlr) &&
+                   ctrlr->cdata.nvmf_specific.icdoff == 0) {
+                       rc = nvme_rdma_build_sgl_inline_request(rqpair, rdma_req);
+               } else {
+                       rc = nvme_rdma_build_sgl_request(rqpair, rdma_req);
+               }
        } else {
-               *value = response->value.u64;
+               rc = -1;
        }
 
+       if (rc) {
+               return rc;
+       }
+
+       memcpy(&rqpair->cmds[rdma_req->id], &req->cmd, sizeof(req->cmd));
        return 0;
 }
 
@@ -1020,6 +1197,7 @@ nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
        if (!qpair) {
                return -1;
        }
+       nvme_qpair_deinit(qpair);
 
        rqpair = nvme_rdma_qpair(qpair);
 
@@ -1049,10 +1227,10 @@ nvme_rdma_qpair_destroy(struct spdk_nvme_qpair *qpair)
 
 struct spdk_nvme_qpair *
 nvme_rdma_ctrlr_create_io_qpair(struct spdk_nvme_ctrlr *ctrlr, uint16_t qid,
-                               enum spdk_nvme_qprio qprio)
+                               const struct spdk_nvme_io_qpair_opts *opts)
 {
-       return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, ctrlr->opts.io_queue_size, qprio,
-                                           ctrlr->opts.io_queue_requests);
+       return nvme_rdma_ctrlr_create_qpair(ctrlr, qid, opts->io_queue_size, opts->qprio,
+                                           opts->io_queue_requests);
 }
 
 int
@@ -1062,103 +1240,30 @@ nvme_rdma_ctrlr_enable(struct spdk_nvme_ctrlr *ctrlr)
        return 0;
 }
 
-static int
-nvme_fabrics_get_log_discovery_page(struct spdk_nvme_ctrlr *ctrlr,
-                                   void *log_page, uint32_t size)
-{
-       struct nvme_completion_poll_status status;
-       int rc;
-
-       status.done = false;
-       rc = spdk_nvme_ctrlr_cmd_get_log_page(ctrlr, SPDK_NVME_LOG_DISCOVERY, 0, log_page, size, 0,
-                                             nvme_completion_poll_cb, &status);
-       if (rc < 0) {
-               return -1;
-       }
-
-       while (status.done == false) {
-               spdk_nvme_qpair_process_completions(ctrlr->adminq, 0);
-       }
-
-       if (spdk_nvme_cpl_is_error(&status.cpl)) {
-               return -1;
-       }
-
-       return 0;
-}
-
-static void
-nvme_rdma_discovery_probe(struct spdk_nvmf_discovery_log_page_entry *entry,
-                         void *cb_ctx, spdk_nvme_probe_cb probe_cb)
-{
-       struct spdk_nvme_transport_id trid;
-       uint8_t *end;
-       size_t len;
-
-       memset(&trid, 0, sizeof(trid));
-
-       if (entry->subtype == SPDK_NVMF_SUBTYPE_DISCOVERY) {
-               SPDK_WARNLOG("Skipping unsupported discovery service referral\n");
-               return;
-       } else if (entry->subtype != SPDK_NVMF_SUBTYPE_NVME) {
-               SPDK_WARNLOG("Skipping unknown subtype %u\n", entry->subtype);
-               return;
-       }
-
-       trid.trtype = entry->trtype;
-       if (!spdk_nvme_transport_available(trid.trtype)) {
-               SPDK_WARNLOG("NVMe transport type %u not available; skipping probe\n",
-                            trid.trtype);
-               return;
-       }
-
-       trid.adrfam = entry->adrfam;
-
-       /* Ensure that subnqn is null terminated. */
-       end = memchr(entry->subnqn, '\0', SPDK_NVMF_NQN_MAX_LEN);
-       if (!end) {
-               SPDK_ERRLOG("Discovery entry SUBNQN is not null terminated\n");
-               return;
-       }
-       len = end - entry->subnqn;
-       memcpy(trid.subnqn, entry->subnqn, len);
-       trid.subnqn[len] = '\0';
-
-       /* Convert traddr to a null terminated string. */
-       len = spdk_strlen_pad(entry->traddr, sizeof(entry->traddr), ' ');
-       memcpy(trid.traddr, entry->traddr, len);
-
-       /* Convert trsvcid to a null terminated string. */
-       len = spdk_strlen_pad(entry->trsvcid, sizeof(entry->trsvcid), ' ');
-       memcpy(trid.trsvcid, entry->trsvcid, len);
-
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "subnqn=%s, trtype=%u, traddr=%s, trsvcid=%s\n",
-                     trid.subnqn, trid.trtype,
-                     trid.traddr, trid.trsvcid);
-
-       nvme_ctrlr_probe(&trid, NULL, probe_cb, cb_ctx);
-}
-
 /* This function must only be called while holding g_spdk_nvme_driver->lock */
 int
 nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
                     void *cb_ctx,
                     spdk_nvme_probe_cb probe_cb,
-                    spdk_nvme_remove_cb remove_cb)
+                    spdk_nvme_remove_cb remove_cb,
+                    bool direct_connect)
 {
        struct spdk_nvme_ctrlr_opts discovery_opts;
        struct spdk_nvme_ctrlr *discovery_ctrlr;
-       struct spdk_nvmf_discovery_log_page *log_page;
        union spdk_nvme_cc_register cc;
-       char buffer[4096];
        int rc;
-       uint64_t i, numrec, buffer_max_entries;
+       struct nvme_completion_poll_status status;
 
-       spdk_nvme_ctrlr_opts_set_defaults(&discovery_opts);
+       if (strcmp(discovery_trid->subnqn, SPDK_NVMF_DISCOVERY_NQN) != 0) {
+               /* Not a discovery controller; try to connect to it directly. */
+               rc = nvme_ctrlr_probe(discovery_trid, NULL, probe_cb, cb_ctx);
+               return rc;
+       }
+
+       spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, sizeof(discovery_opts));
        /* For discovery_ctrlr set the timeout to 0 */
        discovery_opts.keep_alive_timeout_ms = 0;
 
-       memset(buffer, 0x0, 4096);
        discovery_ctrlr = nvme_rdma_ctrlr_construct(discovery_trid, &discovery_opts, NULL);
        if (discovery_ctrlr == NULL) {
                return -1;
@@ -1177,36 +1282,32 @@ nvme_rdma_ctrlr_scan(const struct spdk_nvme_transport_id *discovery_trid,
                return -1;
        }
 
-       rc = nvme_fabrics_get_log_discovery_page(discovery_ctrlr, buffer, sizeof(buffer));
-       if (rc < 0) {
-               SPDK_TRACELOG(SPDK_TRACE_NVME, "nvme_fabrics_get_log_discovery_page error\n");
-               nvme_ctrlr_destruct(discovery_ctrlr);
-               /* It is not a discovery_ctrlr info and try to directly connect it */
-               rc = nvme_ctrlr_probe(discovery_trid, NULL, probe_cb, cb_ctx);
+       /* get the cdata info */
+       rc = nvme_ctrlr_cmd_identify(discovery_ctrlr, SPDK_NVME_IDENTIFY_CTRLR, 0, 0,
+                                    &discovery_ctrlr->cdata, sizeof(discovery_ctrlr->cdata),
+                                    nvme_completion_poll_cb, &status);
+       if (rc != 0) {
+               SPDK_ERRLOG("Failed to identify cdata\n");
                return rc;
        }
 
-       log_page = (struct spdk_nvmf_discovery_log_page *)buffer;
-
-       /*
-        * For now, only support retrieving one buffer of discovery entries.
-        * This could be extended to call Get Log Page multiple times as needed.
-        */
-       buffer_max_entries = (sizeof(buffer) - offsetof(struct spdk_nvmf_discovery_log_page, entries[0])) /
-                            sizeof(struct spdk_nvmf_discovery_log_page_entry);
-       numrec = spdk_min(log_page->numrec, buffer_max_entries);
-       if (numrec != log_page->numrec) {
-               SPDK_WARNLOG("Discovery service returned %" PRIu64 " entries,"
-                            "but buffer can only hold %" PRIu64 "\n",
-                            log_page->numrec, numrec);
+       if (spdk_nvme_wait_for_completion(discovery_ctrlr->adminq, &status)) {
+               SPDK_ERRLOG("nvme_identify_controller failed!\n");
+               return -ENXIO;
        }
 
-       for (i = 0; i < numrec; i++) {
-               nvme_rdma_discovery_probe(&log_page->entries[i], cb_ctx, probe_cb);
+       /* Direct attach through spdk_nvme_connect() API */
+       if (direct_connect == true) {
+               /* Set the ready state to skip the normal init process */
+               discovery_ctrlr->state = NVME_CTRLR_STATE_READY;
+               nvme_ctrlr_connected(discovery_ctrlr);
+               nvme_ctrlr_add_process(discovery_ctrlr, 0);
+               return 0;
        }
 
+       rc = nvme_fabric_ctrlr_discover(discovery_ctrlr, cb_ctx, probe_cb);
        nvme_ctrlr_destruct(discovery_ctrlr);
-       return 0;
+       return rc;
 }
 
 struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transport_id *trid,
@@ -1215,6 +1316,7 @@ struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transpo
 {
        struct nvme_rdma_ctrlr *rctrlr;
        union spdk_nvme_cap_register cap;
+       union spdk_nvme_vs_register vs;
        int rc;
 
        rctrlr = calloc(1, sizeof(struct nvme_rdma_ctrlr));
@@ -1229,7 +1331,7 @@ struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transpo
 
        rc = nvme_ctrlr_construct(&rctrlr->ctrlr);
        if (rc != 0) {
-               nvme_ctrlr_destruct(&rctrlr->ctrlr);
+               free(rctrlr);
                return NULL;
        }
 
@@ -1237,6 +1339,7 @@ struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transpo
                               SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES, 0, SPDK_NVMF_MIN_ADMIN_QUEUE_ENTRIES);
        if (!rctrlr->ctrlr.adminq) {
                SPDK_ERRLOG("failed to create admin qpair\n");
+               nvme_rdma_ctrlr_destruct(&rctrlr->ctrlr);
                return NULL;
        }
 
@@ -1246,9 +1349,21 @@ struct spdk_nvme_ctrlr *nvme_rdma_ctrlr_construct(const struct spdk_nvme_transpo
                return NULL;
        }
 
-       nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap);
+       if (nvme_ctrlr_get_vs(&rctrlr->ctrlr, &vs)) {
+               SPDK_ERRLOG("get_vs() failed\n");
+               nvme_ctrlr_destruct(&rctrlr->ctrlr);
+               return NULL;
+       }
+
+       if (nvme_ctrlr_add_process(&rctrlr->ctrlr, 0) != 0) {
+               SPDK_ERRLOG("nvme_ctrlr_add_process() failed\n");
+               nvme_ctrlr_destruct(&rctrlr->ctrlr);
+               return NULL;
+       }
+
+       nvme_ctrlr_init_cap(&rctrlr->ctrlr, &cap, &vs);
 
-       SPDK_TRACELOG(SPDK_TRACE_DEBUG, "succesully initialized the nvmf ctrlr\n");
+       SPDK_DEBUGLOG(SPDK_LOG_NVME, "successfully initialized the nvmf ctrlr\n");
        return &rctrlr->ctrlr;
 }
 
@@ -1261,6 +1376,8 @@ nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
                nvme_rdma_qpair_destroy(ctrlr->adminq);
        }
 
+       nvme_ctrlr_destruct_finish(ctrlr);
+
        free(rctrlr);
 
        return 0;
@@ -1269,32 +1386,25 @@ nvme_rdma_ctrlr_destruct(struct spdk_nvme_ctrlr *ctrlr)
 int
 nvme_rdma_ctrlr_set_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t value)
 {
-       return nvme_rdma_fabric_prop_set_cmd(ctrlr, offset, SPDK_NVMF_PROP_SIZE_4, value);
+       return nvme_fabric_ctrlr_set_reg_4(ctrlr, offset, value);
 }
 
 int
 nvme_rdma_ctrlr_set_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t value)
 {
-       return nvme_rdma_fabric_prop_set_cmd(ctrlr, offset, SPDK_NVMF_PROP_SIZE_8, value);
+       return nvme_fabric_ctrlr_set_reg_8(ctrlr, offset, value);
 }
 
 int
 nvme_rdma_ctrlr_get_reg_4(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint32_t *value)
 {
-       uint64_t tmp_value;
-       int rc;
-       rc = nvme_rdma_fabric_prop_get_cmd(ctrlr, offset, SPDK_NVMF_PROP_SIZE_4, &tmp_value);
-
-       if (!rc) {
-               *value = (uint32_t)tmp_value;
-       }
-       return rc;
+       return nvme_fabric_ctrlr_get_reg_4(ctrlr, offset, value);
 }
 
 int
 nvme_rdma_ctrlr_get_reg_8(struct spdk_nvme_ctrlr *ctrlr, uint32_t offset, uint64_t *value)
 {
-       return nvme_rdma_fabric_prop_get_cmd(ctrlr, offset, SPDK_NVMF_PROP_SIZE_8, value);
+       return nvme_fabric_ctrlr_get_reg_8(ctrlr, offset, value);
 }
 
 int
@@ -1325,13 +1435,20 @@ nvme_rdma_qpair_submit_request(struct spdk_nvme_qpair *qpair,
                return -1;
        }
 
+       req->timed_out = false;
+       if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
+               req->submit_tick = spdk_get_ticks();
+       } else {
+               req->submit_tick = 0;
+       }
+
        wr = &rdma_req->send_wr;
 
        nvme_rdma_trace_ibv_sge(wr->sg_list);
 
        rc = ibv_post_send(rqpair->cm_id->qp, wr, &bad_wr);
        if (rc) {
-               SPDK_ERRLOG("Failure posting rdma send for NVMf completion: %d (%s)\n", rc, strerror(rc));
+               SPDK_ERRLOG("Failure posting rdma send for NVMf completion: %d (%s)\n", rc, spdk_strerror(rc));
        }
 
        return rc;
@@ -1377,16 +1494,55 @@ nvme_rdma_qpair_fail(struct spdk_nvme_qpair *qpair)
        return 0;
 }
 
+static void
+nvme_rdma_qpair_check_timeout(struct spdk_nvme_qpair *qpair)
+{
+       uint64_t t02;
+       struct spdk_nvme_rdma_req *rdma_req, *tmp;
+       struct nvme_rdma_qpair *rqpair = nvme_rdma_qpair(qpair);
+       struct spdk_nvme_ctrlr *ctrlr = qpair->ctrlr;
+       struct spdk_nvme_ctrlr_process *active_proc;
+
+       /* Don't check timeouts during controller initialization. */
+       if (ctrlr->state != NVME_CTRLR_STATE_READY) {
+               return;
+       }
+
+       if (nvme_qpair_is_admin_queue(qpair)) {
+               active_proc = spdk_nvme_ctrlr_get_current_process(ctrlr);
+       } else {
+               active_proc = qpair->active_proc;
+       }
+
+       /* Only check timeouts if the current process has a timeout callback. */
+       if (active_proc == NULL || active_proc->timeout_cb_fn == NULL) {
+               return;
+       }
+
+       t02 = spdk_get_ticks();
+       TAILQ_FOREACH_SAFE(rdma_req, &rqpair->outstanding_reqs, link, tmp) {
+               assert(rdma_req->req != NULL);
+
+               if (nvme_request_check_timeout(rdma_req->req, rdma_req->id, active_proc, t02)) {
+                       /*
+                        * The requests are in order, so as soon as one has not timed out,
+                        * stop iterating.
+                        */
+                       break;
+               }
+       }
+}
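+
+nvme_request_check_timeout() comes from the common NVMe code; the tick comparison behind such a check looks roughly like the following sketch (the body is an assumption, not the shared helper):
+
+    #include <stdbool.h>
+    #include <stdint.h>
+
+    static bool request_timed_out_sketch(uint64_t submit_tick,
+                                         uint64_t timeout_ticks, uint64_t now)
+    {
+        /* submit_tick == 0 marks requests submitted with timeouts off */
+        return submit_tick != 0 && now >= submit_tick + timeout_ticks;
+    }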
+
 #define MAX_COMPLETIONS_PER_POLL 128
 
 int
 nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
                                    uint32_t max_completions)
 {
-       struct nvme_rdma_qpair  *rqpair = nvme_rdma_qpair(qpair);
-       struct ibv_wc           wc[MAX_COMPLETIONS_PER_POLL];
+       struct nvme_rdma_qpair  *rqpair = nvme_rdma_qpair(qpair);
+       struct ibv_wc           wc[MAX_COMPLETIONS_PER_POLL];
        int                     i, rc, batch_size;
-       uint32_t                reaped;
+       uint32_t                reaped;
        struct ibv_cq           *cq;
 
        if (max_completions == 0) {
@@ -1404,7 +1560,7 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
                rc = ibv_poll_cq(cq, batch_size, wc);
                if (rc < 0) {
                        SPDK_ERRLOG("Error polling CQ! (%d): %s\n",
-                                   errno, strerror(errno));
+                                   errno, spdk_strerror(errno));
                        return -1;
                } else if (rc == 0) {
                        /* Ran out of completions */
@@ -1420,7 +1576,7 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
 
                        switch (wc[i].opcode) {
                        case IBV_WC_RECV:
-                               SPDK_TRACELOG(SPDK_TRACE_DEBUG, "CQ recv completion\n");
+                               SPDK_DEBUGLOG(SPDK_LOG_NVME, "CQ recv completion\n");
 
                                reaped++;
 
@@ -1445,6 +1601,10 @@ nvme_rdma_qpair_process_completions(struct spdk_nvme_qpair *qpair,
                }
        } while (reaped < max_completions);
 
+       if (spdk_unlikely(rqpair->qpair.ctrlr->timeout_enabled)) {
+               nvme_rdma_qpair_check_timeout(qpair);
+       }
+
        return reaped;
 }
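
The loop above drains the CQ in batches of at most MAX_COMPLETIONS_PER_POLL. Reduced to its core, the ibv_poll_cq() pattern is (assuming a valid struct ibv_cq *cq):

    #include <infiniband/verbs.h>

    struct ibv_wc wc[128];
    int n = ibv_poll_cq(cq, 128, wc);  /* completions reaped, < 0 on error */

    for (int i = 0; i < n; i++) {
        if (wc[i].status != IBV_WC_SUCCESS) {
            /* failed completion: wc[i].wr_id identifies the request */
        }
    }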
 
@@ -1455,8 +1615,20 @@ nvme_rdma_ctrlr_get_max_xfer_size(struct spdk_nvme_ctrlr *ctrlr)
        return NVME_RDMA_RW_BUFFER_SIZE;
 }
 
-uint32_t
-nvme_rdma_ctrlr_get_max_io_queue_size(struct spdk_nvme_ctrlr *ctrlr)
+uint16_t
+nvme_rdma_ctrlr_get_max_sges(struct spdk_nvme_ctrlr *ctrlr)
 {
-       return NVME_HOST_MAX_ENTRIES_PER_QUEUE;
+       return spdk_min(ctrlr->cdata.nvmf_specific.msdbd, NVME_RDMA_MAX_SGL_DESCRIPTORS);
+}
+
+void *
+nvme_rdma_ctrlr_alloc_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, size_t size)
+{
+       return NULL;
+}
+
+int
+nvme_rdma_ctrlr_free_cmb_io_buffer(struct spdk_nvme_ctrlr *ctrlr, void *buf, size_t size)
+{
+       return 0;
 }